kafka-sink-connect

메인 인덱스 | bob-yamong

실행 골격

  • 이 레포는 Kafka Connect REST 플러그인이 아니라, main.pyHeartbeatTracePoint 커넥터를 각각 별도 Process로 띄우는 상시 실행기입니다.
  • main.py는 커넥터가 죽었는지 계속 감시하고, 죽은 프로세스는 terminate()한 뒤 다시 생성합니다.
  • KeyboardInterrupt나 예외가 나면 실행 중이던 프로세스를 정리하고 종료합니다.

커넥터 흐름

  • create_heartbeat_connector()heartbeat 토픽을 읽어 PostgreSQL로 적재합니다.
  • HeartbeatTransformer는 호스트 정보와 컨테이너 stats를 server, system_info, containers, container_ids, container_stats, heartbeat 레코드로 분해합니다.
  • HeartbeatWriterpsycopg2.extras.execute_batchexecute_values를 사용해 서버 upsert, 시스템 정보 삽입, 컨테이너 upsert, ID 매핑, heartbeat 적재를 처리합니다.
  • create_tracepoint_connector()tracepoint 토픽을 ContainerLog 테이블에 적재합니다.
  • ContainerLogTransformer는 microsecond timestamp를 called_at으로 바꾸고, PostgresWriterContainerLog INSERT를 수행합니다.

공통 실행기

  • utils/bulk_data_connector.pyKafkaConsumer, Queue, Process를 묶은 일반화된 bulk connector입니다.
  • RoundRobinPartitionAssignor를 사용하고, consumer/writer/monitor 프로세스를 분리한 뒤 죽으면 다시 시작합니다.
  • monitor는 처리량, read/write rate, 살아 있는 프로세스 수를 로그에 남깁니다.

로깅

  • utils/logger.pylogs/monitor.log에 회전 로그를 남기고 콘솔에도 같은 포맷으로 출력합니다.

의존성

  • requirements.txt에는 kafka-python==2.0.2, psycopg2-binary, pyspark, py4j가 들어갑니다.