1. 문제 배경
- 3편까지 Kafka, Redis, MySQL, Spark Streaming을 포함한 기반 파이프라인 구성을 완료했다.
- 각 구성 요소가 개별적으로 실행되는 상태는 확인했지만, 실제로 데이터가 흐르는지는 별도로 검증이 필요했다.
- 이번 단계에서는 Airflow DAG 실행을 통해 데이터가 생성된 이후, Kafka, Spark, Redis, MySQL로 이어지는 흐름이 정상적으로 연결되는지를 확인한다.
2. 목표
- Airflow DAG 실행을 통해 데이터 적재
- Kafka → Spark → Redis / MySQL로 이어지는 흐름 검증
- 각 구성 요소가 실제로 연결되어 동작하는지 확인
3. Airflow 구성 및 실행
- 데이터 적재를 위해 Airflow를 Kubernetes 환경에 배포했다.
- Airflow는 Scheduler와 Webserver를 분리된 리소스로 구성했으며, 각 구성 요소는 YAML 기반으로 적용했다.
kubectl apply -f airflow-rbac.yaml
kubectl apply -f airflow-init.yaml
kubectl apply -f airflow-logs-pvc.yaml
kubectl apply -f airflow-scheduler.yaml
kubectl apply -f airflow-webserver.yaml
kubectl apply -f airflow-service.yaml
- Airflow Webserver에 접속하기 위해 Kubernetes Service에 대해 포트 포워딩을 수행했다.
kubectl port-forward svc/airflow-webserver 8080:8080 -n airflow

- 배포 이후 Webserver에 접속해 DAG를 실행했다.이 DAG는 외부 데이터를 수집한 뒤 Kafka topic에 적재하는 역할을 한다.

4. 데이터 흐름 확인
1) Kafka 데이터 유입 확인
- public_api_producer_dag(공공데이터 API 호출 DAG) 실행

- Kafka outbreak_topic 메시지 수신 확인
kubectl exec -it kafka-0 -- bash
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic outbreak_topic \
--from-beginning
- DAG 실행 이후 Kafka topic에 데이터가 적재되는 것을 확인

이를 통해 데이터가 파이프라인의 시작 지점까지 정상적으로 전달되는 것을 확인할 수 있었다.
2) Spark Streaming 처리 확인
- Spark driver / executor Pod가 정상적으로 실행 중인지 확인
kubectl get pods | findstr streaming

- log를 통해 Kafka 데이터를 지속적으로 소비하면서 처리하는 흐름을 확인
kubectl logs outbreak-streaming-driver

3) Redis 반영 확인
- Redis에 데이터가 정상적으로 적재되었는지 확인하기 위해 redis-cli를 통해 키 목록을 조회
kubectl exec -it <redis-pod-name> -- redis-cli
KEYS *

저장된 key 목록을 확인했다.
4) MySQL 반영 확인
- mysql pod 접속
kubectl exec -it mysql-0 -- mysql -u user -p
- 데이터베이스 선택
SHOW DATABASES;
USE toy_project;

- 테이블 확인
SHOW TABLES;

- 테이블 값 확인
SELECT * FROM OUTBREAK_OCCURRENCE;

최종적으로 데이터가 MySQL에 적재된 것을 확인했다.
5. 핵심 포인트
이번 단계에서 중요한 점은 각 서비스를 개별적으로 실행하는 것이 아니라, 전체 흐름이 하나의 파이프라인으로 연결되어 동작하는지를 확인했다는 점이다.
- Airflow는 주기적으로 데이터를 생성하고
- Kafka는 데이터 전달 경로 역할을 수행하며
- Spark는 이를 지속적으로 처리하고
- Redis와 MySQL은 처리 결과를 저장하는 구조로 역할을 분리했다
특히 Spark는 DAG에 의해 매번 실행되는 작업이 아니라, Kafka topic을 지속적으로 구독하는 상시 실행 서비스로 구성했다.
6. 전체 구조 정리

- Airflow에서 1분 주기로 외부 API를 호출해 데이터를 수집하고 Kafka로 전달하는 구조로 구성함
- Kafka를 통해 수집 데이터를 메시지 단위로 분리하고, PySpark에서 토픽을 구독하여 전처리 및 배치 처리를 수행함
- 처리된 데이터는 Redis에 저장하여 실시간 조회 및 임시 저장 용도로 활용함
- 이후 배치 작업을 통해 Redis 데이터를 MySQL로 적재하여 이력 관리 및 분석용 데이터로 활용함
- 전체적으로 수집–처리–저장 흐름을 분리하여 각 단계의 역할을 명확히 하고, 데이터 흐름 기준으로 구조를 구성함
다음 글
- 데이터 파이프라인이 정상적으로 동작하는 것을 확인한 이후, 다음 단계에서는 위치 기반 데이터 처리와 경로 탐색을 위한 데이터베이스 구조를 확장했다.
- 기존 MySQL 기반 저장 구조에서는 좌표 연산이나 경로 계산이 제한적이었기 때문에, PostgreSQL에 PostGIS와 pgRouting 확장을 적용해 공간 데이터 처리와 최단 경로 탐색이 가능한 환경을 구성했다.
- 또한 Kubernetes 환경에서 PostgreSQL을 StatefulSet으로 배포하고, 도로 노드·링크 데이터를 직접 적재해 실제 경로 계산이 가능한 구조까지 연결했다.
다음 글에서는
- PostgreSQL + PostGIS + pgRouting 구성 과정
- shapefile 데이터 적재 방법
- 경로 탐색을 위한 데이터 구조 설계
를 순서대로 정리할 예정이다.
Kubernetes 환경 구성 (5) - PostgreSQL + PostGIS + pgRouting 구성
1. 문제 배경NaviSafe 프로젝트에서는 도로 링크와 노드 데이터를 기반으로 위치 정보 처리와 경로 탐색 기능이 필요했다.기존 MySQL 중심 구조에서는 데이터 저장은 가능했지만, 좌표 기반 연산이
jjaehyeok.tistory.com
'Navisafe > Infrastructure' 카테고리의 다른 글
| AWS S3 및 IAM 설정 - 클라우드 데이터 접근 환경 구성 (0) | 2026.04.29 |
|---|---|
| Kubernetes 환경 구성 (5) - PostgreSQL + PostGIS + pgRouting 구성 (0) | 2026.04.29 |
| Kubernetes 환경 구성 (3) - NaviSafe 서비스 배포와 Kafka Topic 초기화 (0) | 2026.04.20 |
| Kubernetes 환경 구성 (2) - NaviSafe 배포를 위한 사전 준비 (이미지, Helm, 공통 리소스) (1) | 2026.04.20 |
| Kubernetes 환경 구성 (1) - Windows에서 kind로 NaviSafe 클러스터 생성 (0) | 2026.04.20 |