안녕하세요. 저는 홈디벨로퍼 금입니다.
최근에는 Kafka가 발행한 메시지를 외부 시스템에서 수신하는 기능을 개발했습니다.
(지금까지는 SQS SQS-SNS를 주로 사용했는데 회사 정책 변경으로 인해 kafka로 마이그레이션 중입니다.)
Spring Boot를 사용하지 않는 상황에서는 Bean을 등록하여 사용해야 합니다. 하지만 요즘은 대부분의 시스템이 Spring Boot로 구축되기 때문에,
설정이 쉽고 옵션을 변경하기 쉽습니다. 나는 주로 @KafkaListener를 사용합니다. 이 기사에서는 @KafkaListener를 사용하여 구현을 요약합니다.
Kotlin + SpringBoot + kafka 소비자의 기본 사용법
가장 먼저 @KafkaListener를 사용하여 기본 소비자를 구현해 봅시다.
– 먼저 spring-kafka 라이브러리 추가
* Spring Boot를 사용할 때(프로젝트를 생성하기 위해 start.spring.io를 사용하지 않음) 버전을 생략하면 Boot는 자동으로 Boot 버전과 호환되는 올바른 버전을 가져옵니다.
compile 'org.springframework.kafka:spring-kafka'
– application.yml 설정
기본 리스너 컨테이너를 사용하는 경우 yaml 구성 파일에서 소비자 및 리스너 옵션을 설정할 수 있으며 옵션 값은 애플리케이션 실행 시 자동으로 재정의됩니다. Spring Kafka에서 사용하는 옵션에 대한 설명은 다음과 같습니다. 스프링 공식 홈페이지에서 찾을 수 있습니다
spring:
kafka:
consumer:
bootstrap-servers: localhost:9092 //kafka 메시지를 송신하는 서버 URL
listener:
type: BATCH
ack-mode: MANUAL_IMMEDIATE
* 소비자 옵션
* 리스너 옵션
spring.kafka.listener.ack-count | ackMode가 “COUNT” 또는 “COUNT_TIME”인 경우 오프셋 커밋 사이의 레코드 수입니다. |
spring.kafka.listener.ack 모드 | 리스너 AckMode. spring-kafka 문서를 참조하십시오. |
spring.kafka.listener.ack-시간 | ackMode가 “TIME” 또는 “COUNT_TIME”인 경우 오프셋 커밋 사이의 시간입니다. |
spring.kafka.listener.async-acks | 비동기 기록 확인 지원. spring.kafka.listener.ack-mode가 manual 또는 manual-immediate인 경우에만 적용됩니다. |
spring.kafka.listener.client-id | 수신기 소비자 client.id 속성의 접두사입니다. |
spring.kafka.listener.concurrency | 리스너 컨테이너에서 실행할 스레드 수입니다. |
spring.kafka.listener.idle-between-polls | Consumer.poll(Duration)에 대한 호출 사이의 휴면 간격. (기본: 0) |
spring.kafka.listener.idle 이벤트 간격 | 유휴 소비자 이벤트 게시 사이의 시간입니다(수신된 데이터 없음). |
spring.kafka.listener.idle-파티션-이벤트-간격 | 유휴 파티션 소비자 이벤트 게시물 사이의 시간입니다(파티션에 대해 수신된 데이터 없음). |
spring.kafka.listener.immediate-stop | 현재 레코드가 처리된 후 또는 이전 폴링의 모든 레코드가 처리된 후 컨테이너가 중지되는지 여부입니다.(기본: 거짓) |
spring.kafka.listener.log-container-config | 초기화 중 컨테이너 구성을 기록할지 여부(INFO 수준). |
spring.kafka.listener.missing-topics-fatal | 하나 이상의 구성된 항목이 브로커에 없는 경우 컨테이너 시작 실패 여부입니다. (기본: 거짓) |
spring.kafka.listener.monitor-interval | 응답하지 않는 소비자에 대한 확인 사이의 시간입니다. 마침표 접미사를 지정하지 않으면 초가 사용됩니다. |
spring.kafka.listener.no-poll-threshold | 소비자가 응답하지 않는지 확인하기 위해 “pollTimeout”에 적용되는 승수입니다. |
spring.kafka.listener.poll-timeout | 소비자를 폴링할 때 사용할 제한 시간입니다. |
spring.kafka.listener.type | 리스너 유형. (기본: 하나의) |
– @KafkaListener 사용
@Component
class ConsumerTest {
@KafkaListener(topics = "topic-name", groupId = "group-01")
public void onMessage(kafkaSchema: kafkaSchema, Acknowledgment ack) {
//kafkaSchema 사용 처리
ack.acknowledge(); // 커밋 수행
}
}
data class KafkaSchema(
val name: String,
val columnNames: List<String> = listOf()
)
리스너 유형 및 커밋 모드(AcksMode)
이전에 기본 소비자를 구현했습니다. @kafkaListener 생성 방식의 파라미터는 리스너 유형과 커밋 방식에 따라 변경할 수 있습니다. 그리고 커밋 모드(AcksMode)에 따라 커밋 방식을 선택할 수 있습니다.
– 리스너 유형 및 리스너 유형
- 레코드 리스너(MessageListener): 1개의 레코드만 처리합니다. (Spring Kafka 소비자의 기본 리스너 유형)
- Batch Listener(BatchMessageListener): 한 번에 여러 레코드를 처리합니다.
기록 | 메시지 리스너 | 한 번에 1개 레코드 처리 | onMessage(소비자 기록< K, V > 데이터) |
onMessage(V 데이터) | |||
확인 메시지 리스너 | 하나의 레코드가 처리됨에 따라 수동 커밋 |
onMessage(소비자 기록< K, V > 데이터, 승인 승인) | |
onMessage(V 데이터, 확인 확인) | |||
ConsumerAwareMessageListener | 하나의 레코드가 처리됨에 따라 KafkaConsumer 인스턴스에 직접적인 접근으로 제어할 수 있는 방법 |
onMessage(소비자 기록< K, V > 데이터, 소비자< K, V > 소비자) | |
onMessage(V 데이터, 소비자< K, V > 소비자) | |||
AcknowledgegingConsumerAwareMessageListener | 하나의 레코드가 처리됨에 따라 수동 커밋입니다. KafkaConsumer 인스턴스에 직접적인 접근으로 제어할 수 있는 방법 |
onMessage(소비자 기록< K, V > 데이터, 확인 승인, 소비자 소비자) | |
onMessage(V 데이터, 확인 확인, 소비자< K, V > 소비자) | |||
일괄 | 배치 메시지 리스너 | 한 번에 여러 레코드 처리 | onMessage(소비자 기록< K, V > 데이터) |
onMessage(목록< V > 데이터) | |||
BatchAcknowledgingMessageListener | 한 번에 여러 레코드 처리 수동 커밋 |
onMessage(소비자 기록< K, V > 데이터, 승인 승인) | |
onMessage(목록< V > 데이터, 승인 승인) | |||
BatchConsumerAwareMessageListener | KafkaConsumer 인스턴스에 직접적인 접근으로 제어할 수 있는 방법 |
onMessage(소비자 기록< K, V > 데이터, 소비자< K, V > 소비자) | |
onMessage(목록< V > 데이터, 소비자< K, V > 소비자) | |||
BatchAcknowledgingConsumerAwareMessageListener | 수동 커밋으로 KafkaConsumer 인스턴스에 직접적인 접근으로 제어할 수 있는 방법 |
onMessage(소비자 기록< K, V > 데이터, 확인 승인, 소비자< K, V > 소비자) | |
onMessage(목록< V > 데이터, 확인 승인, 소비자< K, V > 소비자) |
– AcksMode 유형
* spring-kafka Consumer의 기본 AckMode는 BATCH이고 enable.auto.commit 옵션은 false로 설정되어 있습니다.
기록 | 레코드별 처리 후 커밋 |
일괄 | poll() 메서드에 의해 호출된 모든 레코드가 처리된 후 커밋 |
시간 | 일정 시간 후 커밋 이 옵션을 사용하면 시간 간격을 선언하는 `AckTime` 옵션을 설정해야 합니다. |
세다 | 특정 수의 레코드가 처리된 후 커밋 이 옵션을 사용할 때 레코드 수를 선언하는 `AckCount` 옵션을 설정해야 합니다. |
COUNT_TIME | TIME 및 COUNT 옵션 중 하나라도 충족되면 커밋 |
수동 | Acknowledgement.acknowledge() 메서드가 호출될 때 다음 poll() 시간에 커밋합니다. acknowledge() 메서드를 매번 호출하면 BATCH 옵션과 동일하게 작동합니다. 이 옵션을 사용하는 경우 AcknowledgingMessageListener 또는 BatchAcknowledgeMessageListener를 리스너로 사용해야 합니다. |
MANUAL_IMMEDIATE | Acknowledgement.acknowledge() 메서드를 호출한 직후 커밋합니다. 이 옵션을 사용하는 경우 AcknowledgingMessageListener 또는 BatchAcknowledgeMessageListener를 리스너로 사용해야 합니다. |