Kafka Consumer는 메시지를 구독하고 처리하는 역할을 담당하는 클라이언트이다.
1. Consumer Group
Consumer는 단 하나의 Consumer Group에 소속되며, Consumer Group은 고유한 그룹아이디를 가지고 1개 이상의 Consumer를 가질 수 있다.
토픽 메시지는 Consumer Group에서 단 하나의 Consumer 에만 할당된다.
2. Kafka Consumer Subscribe, Poll, Commit 과정
1. 토픽 구독 (Subscribe)
Consumer는 subscribe를 호출하여 읽을 토픽을 등록한다.
2. 토픽 메시지 소비 (poll)
Consumer는 poll 메소드를 이용하여 주기적으로 브로커의 토픽 파티션에서 메시지를 가져온다.
Fetcher는 Consumer 내부의 Queue에서 데이터를 가져오는데, Queue에 데이터가 없을 경우 ConsumerNetworkClient에서 데이터를 가져오도록 요청한다.
Poll 동작 과정
1. ConsumerNetworkClient가 주기적으로 Kafka Borker에게 데이터 요청하여 Queue에 데이터를 저장
2. Consumer 애플리케이션에서 Poll 호출
3. Fetcher가 Queue 확인
- Queue에 데이터가 있는 경우 애플리케이션으로 데이터 전달
- Queue에 데이터가 없는 경우 ConsumerNetworkClient에 의해 Queue가 쌓일 때까지 대기
3. 오프셋 관리 (commit)
Consumer가 poll 메소드를 통해 성공적으로 메시지를 가져오면 commit을 통해 __consumer_offset에 다음에 읽을 offset 위치를 기록한다.
4. Offset
Consumer가 토픽에서 메시지를 읽을 때, 읽은 위치를 offset으로 기록한다.
Consumer가 Topic에 처음 접근할 때 어떤 메시지 부터 가져올지 설정 할 수 있다.
- earliest : 토픽의 앞의 offset 부터 읽음
- latest: 토픽의 가장 마지막 offset 부터 시작
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earlist");
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
Conssumer Group로 새로운 Consumer가 접속할 경우 __consumer_offsets에 있는 offset 정보를 기반으로 메시지를 읽는다. Consumer Group의 Consumer가 모두 종료 되어도 Consumer Group이 읽어들인 offset 정보는 7일동안 __consumer_offsets에 저장되어 있다.
5. Heart Beat Thread
HeartBeat Thread는 Consumer의 정상적으로 활동하고 있는지 Group Coordinator에 보고하는 역할을 수행한다.
KafkaConsumer는 Fetcher, ConsumerClientNetwork 등의 주요 내부 객체와 별도의 Heart Beat Thread를 생성한다.
Heart Beat와 관련된 주요 파라미터
- heartbeat.interval.ms (default 3000ms)
- HeartBeat Thread 가 HeartBeat를 보내는 간격
- Session.timeout.ms 보다 낮게 설정되어야 함
- Session.timeout.ms의 1/3 보다 낮게 설정 권장
- session.timeout.ms (default 45000ms)
- Broker가 Consumer로 HeartBeat를 기다리는 최대 시간
- Broker는 설정된 시간 동안 HeartBeat를 받지 못하면 Rebalancing 명령을 지시
- max.poll.interval.ms (default 300000ms)
- 이전 poll() 호출 후 다음 호출 poll()까지 broker가 기다리는 시간
- 설정 시간 동안 poll() 호출이 이뤄지지 않으면 해당 Consumer가 문제가 있는 것으로 판단하여 Rebalancing 명령
6. Group Coordinator
Group Coordinator는 Broker 내부에 위치하며 Consumer Group의 상태를 체크하는 역할을 담당한다.
Consumer Group에 변동이 생기거나 컨슈머에 장애가 발생한 경우 Rebalancing 명령을 내린다.
7. Kafka Consumer 주요 옵션
- Heart Beat와 관련된 주요 파라미터 이 외
옵션 | 설명 |
bootstrap.servers | Kafka 브로커의 호스트와 포트 정보 |
group.id | Consumer Group를 식별하기 위한 ID |
client.id | Consumer를 식별하기 위한 ID |
key.deserializer | 메시지 키를 역직렬화하는 클래스 |
value.deserializer | 메시지 값을 역직렬화하는 클래스 |
enable.auto.commit | 메시지 소비후 offset을 자동으로 commit 할지 여부 (default: true) |
auto.offset.reset | Consumer가 초기 offset을 찾을 때 어떤 위치에서 시작할지 옵션 - latest: 가장 마지막 offset에서 시작 - earliest: 가장 앞의 offset에서 시작 - none : offest 정보가 없으면 예외 발생 |
fetch.min.bytes | 한 번의 Fetch 요청에서 가져올 데이터의 최소 크기 (default: 1Bytes) |
fetch.max.wait.ms | Broker가 Fetch 요청을 처리하기 전에 대기할 최대 시간 (default: 500ms) |
fetch.max.bytes | 한 번의 Fetch 요청에서 가져올 수 있는 전체 데이터 크기 (default: 50mb) |
max.partition.fetch.bytes | 한 파티션에서 한 번의 Fetch 요청으로 가져올 수 있는 최대 데이터 크기 (default: 1mb) - 설정 값이 너무 작으면 Fetch 요청이 자주 발생하여 네트워크 부하 증가 - 설정 값이 너무 크면 메모리 사용량 증가 |
max.poll.records | 한 번의 poll 호출에서 반환할 최대 레코드 수 (default: 500) - 처리량과 메시지 소비 지연 간의 조정 필요 |
max.poll.interval.ms | Consumer가 Poll 호출 후 대기할 수 있는 최대 시간 (default: 30000ms) |
connections.max.idle.ms | Broker와의 유휴 연결 최대 시간 (default: 540000ms) |
partition.assignment.strategy | Consumer Group 파티션 할당 전략 - kafka 2.3.0 이전 default: RangeAssignor - kafka 2.4.0 이후 default: StickyAssignor |
'Kafka' 카테고리의 다른 글
[Apache Kafka] Kafka Group Consumer Rebalance (0) | 2024.12.28 |
---|---|
[Apache Kafka] Kafka Producer (0) | 2024.12.26 |
[Apache Kafka] Kafka Replication란? (0) | 2024.09.18 |
[Apache Kafka] 카프카란 무엇인가? (0) | 2024.09.18 |