kafka-sink-connect
메인 인덱스 | bob-yamong
실행 골격
- 이 레포는 Kafka Connect REST 플러그인이 아니라,
main.py가 Heartbeat와 TracePoint 커넥터를 각각 별도 Process로 띄우는 상시 실행기입니다.
main.py는 커넥터가 죽었는지 계속 감시하고, 죽은 프로세스는 terminate()한 뒤 다시 생성합니다.
KeyboardInterrupt나 예외가 나면 실행 중이던 프로세스를 정리하고 종료합니다.
커넥터 흐름
create_heartbeat_connector()는 heartbeat 토픽을 읽어 PostgreSQL로 적재합니다.
HeartbeatTransformer는 호스트 정보와 컨테이너 stats를 server, system_info, containers, container_ids, container_stats, heartbeat 레코드로 분해합니다.
HeartbeatWriter는 psycopg2.extras.execute_batch와 execute_values를 사용해 서버 upsert, 시스템 정보 삽입, 컨테이너 upsert, ID 매핑, heartbeat 적재를 처리합니다.
create_tracepoint_connector()는 tracepoint 토픽을 ContainerLog 테이블에 적재합니다.
ContainerLogTransformer는 microsecond timestamp를 called_at으로 바꾸고, PostgresWriter가 ContainerLog INSERT를 수행합니다.
공통 실행기
utils/bulk_data_connector.py는 KafkaConsumer, Queue, Process를 묶은 일반화된 bulk connector입니다.
RoundRobinPartitionAssignor를 사용하고, consumer/writer/monitor 프로세스를 분리한 뒤 죽으면 다시 시작합니다.
- monitor는 처리량, read/write rate, 살아 있는 프로세스 수를 로그에 남깁니다.
로깅
utils/logger.py는 logs/monitor.log에 회전 로그를 남기고 콘솔에도 같은 포맷으로 출력합니다.
의존성
requirements.txt에는 kafka-python==2.0.2, psycopg2-binary, pyspark, py4j가 들어갑니다.