안녕하세요. 저는 홈디벨로퍼 금입니다.
최근에는 Kafka가 발행한 메시지를 외부 시스템에서 수신하는 기능을 개발했습니다.
(지금까지는 SQS SQS-SNS를 주로 사용했는데 회사 정책 변경으로 인해 kafka로 마이그레이션 중입니다.
)
Spring Boot를 사용하지 않는 상황에서는 Bean을 등록하여 사용해야 합니다.
하지만 요즘은 대부분의 시스템이 Spring Boot로 구축되기 때문에,
설정이 쉽고 옵션을 변경하기 쉽습니다.
나는 주로 @KafkaListener를 사용합니다.
이 기사에서는 @KafkaListener를 사용하여 구현을 요약합니다.
Kotlin + SpringBoot + kafka 소비자의 기본 사용법
가장 먼저 @KafkaListener를 사용하여 기본 소비자를 구현해 봅시다.
– 먼저 spring-kafka 라이브러리 추가
* Spring Boot를 사용할 때(프로젝트를 생성하기 위해 start.spring.io를 사용하지 않음) 버전을 생략하면 Boot는 자동으로 Boot 버전과 호환되는 올바른 버전을 가져옵니다.
compile 'org.springframework.kafka:spring-kafka'
– application.yml 설정
기본 리스너 컨테이너를 사용하는 경우 yaml 구성 파일에서 소비자 및 리스너 옵션을 설정할 수 있으며 옵션 값은 애플리케이션 실행 시 자동으로 재정의됩니다.
Spring Kafka에서 사용하는 옵션에 대한 설명은 다음과 같습니다.
스프링 공식 홈페이지에서 찾을 수 있습니다
spring:
kafka:
consumer:
bootstrap-servers: localhost:9092 //kafka 메시지를 송신하는 서버 URL
listener:
type: BATCH
ack-mode: MANUAL_IMMEDIATE
* 소비자 옵션
* 리스너 옵션
spring.kafka.listener.ack-count | ackMode가 “COUNT” 또는 “COUNT_TIME”인 경우 오프셋 커밋 사이의 레코드 수입니다. |
spring.kafka.listener.ack 모드 | 리스너 AckMode. spring-kafka 문서를 참조하십시오. |
spring.kafka.listener.ack-시간 | ackMode가 “TIME” 또는 “COUNT_TIME”인 경우 오프셋 커밋 사이의 시간입니다. |
spring.kafka.listener.async-acks | 비동기 기록 확인 지원. spring.kafka.listener.ack-mode가 manual 또는 manual-immediate인 경우에만 적용됩니다. |
spring.kafka.listener.client-id | 수신기 소비자 client.id 속성의 접두사입니다. |
spring.kafka.listener.concurrency | 리스너 컨테이너에서 실행할 스레드 수입니다. |
spring.kafka.listener.idle-between-polls | Consumer.poll(Duration)에 대한 호출 사이의 휴면 간격. (기본: 0) |
spring.kafka.listener.idle 이벤트 간격 | 유휴 소비자 이벤트 게시 사이의 시간입니다(수신된 데이터 없음). |
spring.kafka.listener.idle-파티션-이벤트-간격 | 유휴 파티션 소비자 이벤트 게시물 사이의 시간입니다(파티션에 대해 수신된 데이터 없음). |
spring.kafka.listener.immediate-stop | 현재 레코드가 처리된 후 또는 이전 폴링의 모든 레코드가 처리된 후 컨테이너가 중지되는지 여부입니다. (기본: 거짓) |
spring.kafka.listener.log-container-config | 초기화 중 컨테이너 구성을 기록할지 여부(INFO 수준). |
spring.kafka.listener.missing-topics-fatal | 하나 이상의 구성된 항목이 브로커에 없는 경우 컨테이너 시작 실패 여부입니다. (기본: 거짓) |
spring.kafka.listener.monitor-interval | 응답하지 않는 소비자에 대한 확인 사이의 시간입니다. 마침표 접미사를 지정하지 않으면 초가 사용됩니다. |
spring.kafka.listener.no-poll-threshold | 소비자가 응답하지 않는지 확인하기 위해 “pollTimeout”에 적용되는 승수입니다. |
spring.kafka.listener.poll-timeout | 소비자를 폴링할 때 사용할 제한 시간입니다. |
spring.kafka.listener.type | 리스너 유형. (기본: 하나의) |
– @KafkaListener 사용
@Component
class ConsumerTest {
@KafkaListener(topics = "topic-name", groupId = "group-01")
public void onMessage(kafkaSchema: kafkaSchema, Acknowledgment ack) {
//kafkaSchema 사용 처리
ack.acknowledge(); // 커밋 수행
}
}
data class KafkaSchema(
val name: String,
val columnNames: List<String> = listOf()
)
리스너 유형 및 커밋 모드(AcksMode)
이전에 기본 소비자를 구현했습니다.
@kafkaListener 생성 방식의 파라미터는 리스너 유형과 커밋 방식에 따라 변경할 수 있습니다.
그리고 커밋 모드(AcksMode)에 따라 커밋 방식을 선택할 수 있습니다.
– 리스너 유형 및 리스너 유형
- 레코드 리스너(MessageListener): 1개의 레코드만 처리합니다.
(Spring Kafka 소비자의 기본 리스너 유형) - Batch Listener(BatchMessageListener): 한 번에 여러 레코드를 처리합니다.
기록 | 메시지 리스너 | 한 번에 1개 레코드 처리 | onMessage(소비자 기록< K, V > 데이터) |
onMessage(V 데이터) | |||
확인 메시지 리스너 | 하나의 레코드가 처리됨에 따라 수동 커밋 |
onMessage(소비자 기록< K, V > 데이터, 승인 승인) | |
onMessage(V 데이터, 확인 확인) | |||
ConsumerAwareMessageListener | 하나의 레코드가 처리됨에 따라 KafkaConsumer 인스턴스에 직접적인 접근으로 제어할 수 있는 방법 |
onMessage(소비자 기록< K, V > 데이터, 소비자< K, V > 소비자) | |
onMessage(V 데이터, 소비자< K, V > 소비자) | |||
AcknowledgegingConsumerAwareMessageListener | 하나의 레코드가 처리됨에 따라 수동 커밋입니다. KafkaConsumer 인스턴스에 직접적인 접근으로 제어할 수 있는 방법 |
onMessage(소비자 기록< K, V > 데이터, 확인 승인, 소비자 소비자) | |
onMessage(V 데이터, 확인 확인, 소비자< K, V > 소비자) | |||
일괄 | 배치 메시지 리스너 | 한 번에 여러 레코드 처리 | onMessage(소비자 기록< K, V > 데이터) |
onMessage(목록< V > 데이터) | |||
BatchAcknowledgingMessageListener | 한 번에 여러 레코드 처리 수동 커밋 |
onMessage(소비자 기록< K, V > 데이터, 승인 승인) | |
onMessage(목록< V > 데이터, 승인 승인) | |||
BatchConsumerAwareMessageListener | KafkaConsumer 인스턴스에 직접적인 접근으로 제어할 수 있는 방법 |
onMessage(소비자 기록< K, V > 데이터, 소비자< K, V > 소비자) | |
onMessage(목록< V > 데이터, 소비자< K, V > 소비자) | |||
BatchAcknowledgingConsumerAwareMessageListener | 수동 커밋으로 KafkaConsumer 인스턴스에 직접적인 접근으로 제어할 수 있는 방법 |
onMessage(소비자 기록< K, V > 데이터, 확인 승인, 소비자< K, V > 소비자) | |
onMessage(목록< V > 데이터, 확인 승인, 소비자< K, V > 소비자) |
– AcksMode 유형
* spring-kafka Consumer의 기본 AckMode는 BATCH이고 enable.auto.commit 옵션은 false로 설정되어 있습니다.
기록 | 레코드별 처리 후 커밋 |
일괄 | poll() 메서드에 의해 호출된 모든 레코드가 처리된 후 커밋 |
시간 | 일정 시간 후 커밋 이 옵션을 사용하면 시간 간격을 선언하는 `AckTime` 옵션을 설정해야 합니다. |
세다 | 특정 수의 레코드가 처리된 후 커밋 이 옵션을 사용할 때 레코드 수를 선언하는 `AckCount` 옵션을 설정해야 합니다. |
COUNT_TIME | TIME 및 COUNT 옵션 중 하나라도 충족되면 커밋 |
수동 | Acknowledgement.acknowledge() 메서드가 호출될 때 다음 poll() 시간에 커밋합니다. acknowledge() 메서드를 매번 호출하면 BATCH 옵션과 동일하게 작동합니다. 이 옵션을 사용하는 경우 AcknowledgingMessageListener 또는 BatchAcknowledgeMessageListener를 리스너로 사용해야 합니다. |
MANUAL_IMMEDIATE | Acknowledgement.acknowledge() 메서드를 호출한 직후 커밋합니다. 이 옵션을 사용하는 경우 AcknowledgingMessageListener 또는 BatchAcknowledgeMessageListener를 리스너로 사용해야 합니다. |