1. 문제 배경
Kafka, Redis, MySQL, Spark Streaming을 포함한 기반 서비스 구성을 완료했지만, 각 구성 요소가 개별적으로 실행되는 상태만으로는 실제 데이터 흐름이 정상 동작하는지 확인하기 어려웠다.
특히 이번 구조에서는 다음 흐름이 실제로 연결되어 동작해야 했다.
Airflow
→ Kafka
→ Spark Streaming
→ Redis
→ MySQL, S3
즉, 단순 Pod 실행 여부가 아니라 실제 데이터가 생성되고 처리되는 흐름 자체를 검증할 필요가 있었다.
이번 단계에서는 Airflow DAG 실행을 통해 데이터가 생성된 이후, Kafka, Spark, Redis, MySQL, S3 로 이어지는 흐름이 정상적으로 연결되는지를 확인해야한다.
2. 목표
- Airflow DAG 실행을 통한 데이터 적재
- Kafka → Spark → Redis / MySQL,S3 흐름 검증
- 각 구성 요소 간 실제 연결 상태 확인
- 데이터 파이프라인 정상 동작 여부 확인
3. Airflow 구성 및 실행
데이터 적재를 위해 Airflow를 Kubernetes 환경에 배포했다.
Airflow는 Scheduler와 Webserver를 분리된 리소스로 구성했으며, 각 구성 요소는 YAML 기준으로 적용했다.
kubectl apply -f airflow-rbac.yaml
kubectl apply -f airflow-init.yaml
kubectl apply -f airflow-logs-pvc.yaml
kubectl apply -f airflow-scheduler.yaml
kubectl apply -f airflow-webserver.yaml
kubectl apply -f airflow-service.yaml
3-1) Airflow Webserver 접속
Airflow UI에 접근하기 위해 Kubernetes Service 기준으로 포트 포워딩을 수행했다.
kubectl port-forward svc/airflow-webserver 8080:8080 -n airflow
하지만 현재 Airflow는 원격 미니 PC 서버 내부에서 실행 중이었기 때문에, 로컬 PC 브라우저에서 직접 접근할 수는 없는 상태였다.
따라서 Windows 로컬 PC에서 SSH Local Port Forwarding을 추가로 구성했다.
ssh -L 8080:localhost:8080 <접속ID>@<미니PC-IP> -p <포트번호>
이후 로컬 브라우저에서 다음 주소로 접속했다.
localhost:8080
이 구조를 통해 로컬 PC 브라우저 요청이 SSH 터널을 거쳐 미니 PC 내부의 Airflow Webserver로 전달되도록 구성했다.

3-2) airflow hook 설정

Airflow에서 외부 API를 호출할 때 API 키나 기본 URL을 코드에 직접 작성하면 관리가 어려워진다.
특히 API 키가 DAG 코드에 포함되면 Git에 노출될 위험이 있고, 환경이 바뀔 때마다 코드를 수정해야 한다.
이를 방지하기 위해 Airflow의 Connection 기능을 사용했다.
Connection에 API 접속 정보를 등록하고, DAG에서는 Hook을 통해 필요한 값을 가져오도록 구성했다.
4. 데이터 흐름 확인
4-1) Kafka 데이터 유입 확인
Airflow DAG 실행 이후 Kafka topic에 데이터가 정상적으로 적재되는지 확인했다.
Kafka Pod 접속
kubectl exec -it kafka-0 -- bash
Topic 데이터 확인
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic outbreak_topic \
--from-beginning

확인 결과 DAG 실행 이후 Kafka topic에 메시지가 적재되는 것을 확인했다.
이를 통해 데이터가 파이프라인 시작 지점까지 정상적으로 전달되는 것을 확인할 수 있었다.
4-2) Spark Streaming 처리 확인
Spark Streaming Driver 및 Executor Pod 상태를 확인했다.
kubectl get pods | grep streaming
또한 로그를 통해 Kafka 데이터를 지속적으로 소비하는 흐름을 확인했다.
kubectl logs outbreak-streaming-driver
로그 확인 결과 Spark Streaming이 Kafka 메시지를 정상적으로 읽어서 배치 처리하는 것을 확인할 수 있었다.
특히 이번 구조에서는 Spark 작업을 DAG 실행 시마다 생성하는 방식이 아니라, Kafka topic을 지속적으로 구독하는 상시 실행 서비스 형태로 구성했다는 점이 중요했다.
4-3) Redis 반영 확인
Redis에 데이터가 정상적으로 저장되었는지 확인하기 위해 redis-cli로 key 목록을 조회했다.
kubectl exec -it redis-xxxxx -- redis-cli
KEYS *
조회 결과 Spark 처리 이후 Redis에 key가 저장된 것을 확인했다.
이를 통해 Spark 처리 결과가 실시간 저장 영역까지 정상적으로 전달되는 것을 확인할 수 있었다.
4-4) MySQL 반영 확인
최종적으로 MySQL에 데이터가 저장되는지 확인했다.
MySQL Pod 접속
kubectl exec -it mysql-0 -- mysql -u user -p
Database 확인

SHOW DATABASES;
USE toy_project;
테이블 확인
SHOW TABLES;

데이터 확인
SELECT * FROM OUTBREAK_OCCURRENCE;

확인 결과 돌발상황 ID(ACC_ID), 발생 시각, 종료 시각 데이터가 정상적으로 저장된 것을 확인했다.
4-5) S3 저장 확인

이를 통해 최종적으로 전처리된 데이터는 MySQL, 원본데이터는 S3에 정상적으로 전달되는 것을 검증할 수 있었다.
5. 핵심 포인트
이번 단계에서 중요한 점은 각 서비스를 단순 실행한 것이 아니라, 전체 흐름이 하나의 데이터 파이프라인으로 연결되어 동작하는지를 검증했다는 점이다.
각 서비스 역할은 다음과 같이 분리했다.
- Airflow → 외부 데이터 수집 및 Kafka 적재
- Kafka → 데이터 전달 경로 역할
- Spark Streaming → Kafka 데이터 지속 처리
- Redis → 실시간 저장 및 중간 버퍼 역할
- MySQL → 최종 이력 데이터 저장
- S3 → 원본 데이터 저장
특히 Spark Streaming은 DAG 실행 시마다 생성되는 구조가 아니라, Kafka topic을 지속적으로 구독하는 상시 실행 서비스 형태로 구성했다.
6. 전체 구조 정리
전체 데이터 흐름은 다음과 같이 구성했다.
Airflow
→ Kafka
→ Spark Streaming
→ Redis
→ MySQL
구체적으로는 다음과 같은 역할로 분리했다.
- Airflow에서 1분 주기로 외부 API 호출
- Kafka를 통해 메시지 단위 데이터 전달
- PySpark에서 Kafka topic 지속 구독 및 전처리
- Redis에 실시간 저장
- MySQL에 최종 이력 데이터 저장
전체적으로 수집–처리–저장 흐름을 분리해 각 단계 역할을 명확하게 구성했다.
7. 마치며
이번 단계에서는 Kubernetes 위에서 실행 중인 각 서비스를 실제 데이터 흐름 기준으로 연결해 검증했다.
특히 단순 Pod 실행 상태만 확인하는 것이 아니라,
- 데이터 생성
- Kafka 적재
- Spark 처리
- Redis 저장
- MySQL 최종 적재
과정을 순차적으로 확인하면서 실제 데이터 파이프라인이 정상적으로 동작하는지를 검증할 수 있었다.
또한 Spark Streaming을 상시 실행 구조로 구성하면서, 실시간 데이터 처리 흐름도 함께 확인할 수 있었다.
'Navisafe > Infrastructure' 카테고리의 다른 글
| 미니 PC 서버 Kubernetes 배포 (1) - Kafka · Redis · MySQL 기반 서비스 배포 (0) | 2026.05.12 |
|---|---|
| 미니 PC 서버 Kubernetes 배포 준비 - Namespace · Secret · Spark Operator 구성 (0) | 2026.05.12 |
| 미니 PC 서버에 k3s 기반 Kubernetes 운영 환경 구성 (1) | 2026.05.11 |
| Spark 연동을 위한 AWS RDS(MySQL) 환경 구성 (0) | 2026.05.03 |
| AWS S3를 활용한 데이터 저장 구조 설계 (0) | 2026.05.02 |