Navisafe/Infrastructure

Kubernetes 환경 구성 (4) - 데이터 파이프라인 동작 검증

jjaehyeok 2026. 4. 20. 17:19

1. 문제 배경

  • 3편까지 Kafka, Redis, MySQL, Spark Streaming을 포함한 기반 파이프라인 구성을 완료했다.
  • 각 구성 요소가 개별적으로 실행되는 상태는 확인했지만, 실제로 데이터가 흐르는지는 별도로 검증이 필요했다.
  • 이번 단계에서는 Airflow DAG 실행을 통해 데이터가 생성된 이후, Kafka, Spark, Redis, MySQL로 이어지는 흐름이 정상적으로 연결되는지를 확인한다.

2. 목표

  • Airflow DAG 실행을 통해 데이터 적재
  • Kafka → Spark → Redis / MySQL로 이어지는 흐름 검증
  • 각 구성 요소가 실제로 연결되어 동작하는지 확인

3. Airflow 구성 및 실행

  • 데이터 적재를 위해 Airflow를 Kubernetes 환경에 배포했다.
  • Airflow는 Scheduler와 Webserver를 분리된 리소스로 구성했으며, 각 구성 요소는 YAML 기반으로 적용했다.
kubectl apply -f airflow-rbac.yaml
kubectl apply -f airflow-init.yaml
kubectl apply -f airflow-logs-pvc.yaml
kubectl apply -f airflow-scheduler.yaml
kubectl apply -f airflow-webserver.yaml
kubectl apply -f airflow-service.yaml
 
  • Airflow Webserver에 접속하기 위해 Kubernetes Service에 대해 포트 포워딩을 수행했다.
kubectl port-forward svc/airflow-webserver 8080:8080 -n airflow

 

  • 배포 이후 Webserver에 접속해 DAG를 실행했다.이 DAG는 외부 데이터를 수집한 뒤 Kafka topic에 적재하는 역할을 한다.


4. 데이터 흐름 확인

1) Kafka 데이터 유입 확인

  • public_api_producer_dag(공공데이터 API 호출 DAG) 실행

  • Kafka outbreak_topic 메시지 수신 확인
kubectl exec -it kafka-0 -- bash

kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic outbreak_topic \
--from-beginning
 
  • DAG 실행 이후 Kafka topic에 데이터가 적재되는 것을 확인

이를 통해 데이터가 파이프라인의 시작 지점까지 정상적으로 전달되는 것을 확인할 수 있었다.

2) Spark Streaming 처리 확인

  • Spark driver / executor Pod가 정상적으로 실행 중인지 확인
kubectl get pods | findstr streaming
  • log를 통해 Kafka 데이터를 지속적으로 소비하면서 처리하는 흐름을 확인
kubectl logs outbreak-streaming-driver
Spark 스트리밍이 Kafka 메시지 3건을 정상적으로 읽어서 배치 처리

3) Redis 반영 확인

  • Redis에 데이터가 정상적으로 적재되었는지 확인하기 위해 redis-cli를 통해 키 목록을 조회
kubectl exec -it <redis-pod-name> -- redis-cli
KEYS *
 

저장된 key 목록을 확인했다.

4) MySQL 반영 확인

  • mysql pod 접속
kubectl exec -it mysql-0 -- mysql -u user -p
  • 데이터베이스 선택
SHOW DATABASES;
USE toy_project;
  • 테이블 확인
SHOW TABLES;​
  • 테이블 값 확인
    SELECT * FROM OUTBREAK_OCCURRENCE;
돌발상황 ID(ACC_ID) 와 발생 시각 및 종료 시각

최종적으로 데이터가 MySQL에 적재된 것을 확인했다.


5. 핵심 포인트

이번 단계에서 중요한 점은 각 서비스를 개별적으로 실행하는 것이 아니라, 전체 흐름이 하나의 파이프라인으로 연결되어 동작하는지를 확인했다는 점이다.

  • Airflow는 주기적으로 데이터를 생성하고
  • Kafka는 데이터 전달 경로 역할을 수행하며
  • Spark는 이를 지속적으로 처리하고
  • Redis와 MySQL은 처리 결과를 저장하는 구조로 역할을 분리했다

특히 Spark는 DAG에 의해 매번 실행되는 작업이 아니라, Kafka topic을 지속적으로 구독하는 상시 실행 서비스로 구성했다.


6. 전체 구조 정리

  • Airflow에서 1분 주기로 외부 API를 호출해 데이터를 수집하고 Kafka로 전달하는 구조로 구성함
  • Kafka를 통해 수집 데이터를 메시지 단위로 분리하고, PySpark에서 토픽을 구독하여 전처리 및 배치 처리를 수행함
  • 처리된 데이터는 Redis에 저장하여 실시간 조회 및 임시 저장 용도로 활용함
  • 이후 배치 작업을 통해 Redis 데이터를 MySQL로 적재하여 이력 관리 및 분석용 데이터로 활용함
  • 전체적으로 수집–처리–저장 흐름을 분리하여 각 단계의 역할을 명확히 하고, 데이터 흐름 기준으로 구조를 구성함

다음 글

  • 데이터 파이프라인이 정상적으로 동작하는 것을 확인한 이후, 다음 단계에서는 위치 기반 데이터 처리와 경로 탐색을 위한 데이터베이스 구조를 확장했다.
  • 기존 MySQL 기반 저장 구조에서는 좌표 연산이나 경로 계산이 제한적이었기 때문에, PostgreSQL에 PostGIS와 pgRouting 확장을 적용해 공간 데이터 처리와 최단 경로 탐색이 가능한 환경을 구성했다.
  • 또한 Kubernetes 환경에서 PostgreSQL을 StatefulSet으로 배포하고, 도로 노드·링크 데이터를 직접 적재해 실제 경로 계산이 가능한 구조까지 연결했다.

다음 글에서는

  • PostgreSQL + PostGIS + pgRouting 구성 과정
  • shapefile 데이터 적재 방법
  • 경로 탐색을 위한 데이터 구조 설계

를 순서대로 정리할 예정이다.

 

Kubernetes 환경 구성 (5) - PostgreSQL + PostGIS + pgRouting 구성

1. 문제 배경NaviSafe 프로젝트에서는 도로 링크와 노드 데이터를 기반으로 위치 정보 처리와 경로 탐색 기능이 필요했다.기존 MySQL 중심 구조에서는 데이터 저장은 가능했지만, 좌표 기반 연산이

jjaehyeok.tistory.com