(SpringBoot) (Kotlin) (Kafka) 기본 Kafka 소비자

안녕하세요. 저는 홈디벨로퍼 금입니다.


최근에는 Kafka가 발행한 메시지를 외부 시스템에서 수신하는 기능을 개발했습니다.
(지금까지는 SQS SQS-SNS를 주로 사용했는데 회사 정책 변경으로 인해 kafka로 마이그레이션 중입니다.)

Spring Boot를 사용하지 않는 상황에서는 Bean을 등록하여 사용해야 합니다. 하지만 요즘은 대부분의 시스템이 Spring Boot로 구축되기 때문에,

설정이 쉽고 옵션을 변경하기 쉽습니다. 나는 주로 @KafkaListener를 사용합니다. 이 기사에서는 @KafkaListener를 사용하여 구현을 요약합니다.

Kotlin + SpringBoot + kafka 소비자의 기본 사용법

가장 먼저 @KafkaListener를 사용하여 기본 소비자를 구현해 봅시다.

– 먼저 spring-kafka 라이브러리 추가
* Spring Boot를 사용할 때(프로젝트를 생성하기 위해 start.spring.io를 사용하지 않음) 버전을 생략하면 Boot는 자동으로 Boot 버전과 호환되는 올바른 버전을 가져옵니다.

compile 'org.springframework.kafka:spring-kafka'

– application.yml 설정

기본 리스너 컨테이너를 사용하는 경우 yaml 구성 파일에서 소비자 및 리스너 옵션을 설정할 수 있으며 옵션 값은 애플리케이션 실행 시 자동으로 재정의됩니다. Spring Kafka에서 사용하는 옵션에 대한 설명은 다음과 같습니다. 스프링 공식 홈페이지에서 찾을 수 있습니다

spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092 //kafka 메시지를 송신하는 서버 URL
    listener:
      type: BATCH
      ack-mode: MANUAL_IMMEDIATE

* 소비자 옵션

더보기

spring.kafka.consumer.auto-commit-interval ‘enable.auto.commit’이 true로 설정된 경우 소비자 오프셋이 Kafka에 자동 커밋되는 빈도입니다.
spring.kafka.consumer.auto-offset-reset Kafka에 초기 오프셋이 없거나 현재 오프셋이 서버에 더 이상 존재하지 않는 경우 수행할 작업입니다.
spring.kafka.consumer.bootstrap-servers Kafka 클러스터에 대한 초기 연결을 설정하는 데 사용할 호스트:포트 쌍의 쉼표로 구분된 목록입니다. 소비자의 전역 속성을 재정의합니다.
spring.kafka.consumer.client-id 요청 시 서버에 전달할 ID입니다. 서버 측 로깅에 사용됩니다.
spring.kafka.consumer.enable-auto-commit 소비자의 오프셋이 백그라운드에서 주기적으로 커밋되는지 여부입니다.
spring.kafka.consumer.fetch-max-wait “fetch-min-size”에 의해 주어진 요구 사항을 즉시 충족하기에 충분한 데이터가 없는 경우 가져오기 요청에 응답하기 전에 서버가 차단하는 최대 시간입니다.
spring.kafka.consumer.fetch-min-size 가져오기 요청에 대해 서버가 반환해야 하는 최소 데이터 양입니다.
spring.kafka.consumer.group-id 이 소비자가 속한 소비자 그룹을 식별하는 고유한 문자열입니다.
spring.kafka.consumer.heartbeat-간격 소비자 조정자에 대한 하트비트 사이의 예상 시간입니다.
spring.kafka.consumer.isolation-level 트랜잭션 방식으로 작성된 메시지를 읽기 위한 격리 수준입니다.
(기본: 커밋되지 않은 읽기)
spring.kafka.consumer.key-deserializer 키에 대한 Deserializer 클래스입니다.
spring.kafka.consumer.max-poll-records poll()에 대한 단일 호출에서 반환되는 최대 레코드 수입니다.
spring.kafka.consumer.properties.* 클라이언트를 구성하는 데 사용되는 추가 소비자별 속성입니다.
spring.kafka.consumer.security.protocol 브로커와 통신하는 데 사용되는 보안 프로토콜입니다.
spring.kafka.consumer.ssl.key-암호 키 저장소 키 또는 키 저장소 파일의 개인 키에 대한 비밀번호입니다.
spring.kafka.consumer.ssl.key-store-certificate-chain X.509 인증서 목록이 있는 PEM 형식의 인증서 체인.
spring.kafka.consumer.ssl.key-store-key PKCS#8 키가 있는 PEM 형식의 개인 키.
spring.kafka.consumer.ssl.key-store-location 키 저장소 파일의 위치입니다.
spring.kafka.consumer.ssl.key-store-password 키 저장소 파일의 비밀번호를 저장합니다.
spring.kafka.consumer.ssl.key-store-type 키 저장소의 유형입니다.
spring.kafka.consumer.ssl.protocol 사용할 SSL 프로토콜입니다.
spring.kafka.consumer.ssl.trust-store-certificates X.509 인증서를 포함하는 PEM 형식의 신뢰할 수 있는 인증서입니다.
spring.kafka.consumer.ssl.trust-store-location truststore 파일의 위치입니다.
spring.kafka.consumer.ssl.trust-store-password truststore 파일의 암호를 저장합니다.
spring.kafka.consumer.ssl.trust-store-type 신뢰 저장소의 유형입니다.
spring.kafka.consumer.value-deserializer 값에 대한 Deserializer 클래스입니다.

* 리스너 옵션

더보기

spring.kafka.listener.ack-count ackMode가 “COUNT” 또는 “COUNT_TIME”인 경우 오프셋 커밋 사이의 레코드 수입니다.
spring.kafka.listener.ack 모드 리스너 AckMode. spring-kafka 문서를 참조하십시오.
spring.kafka.listener.ack-시간 ackMode가 “TIME” 또는 “COUNT_TIME”인 경우 오프셋 커밋 사이의 시간입니다.
spring.kafka.listener.async-acks 비동기 기록 확인 지원. spring.kafka.listener.ack-mode가 manual 또는 manual-immediate인 경우에만 적용됩니다.
spring.kafka.listener.client-id 수신기 소비자 client.id 속성의 접두사입니다.
spring.kafka.listener.concurrency 리스너 컨테이너에서 실행할 스레드 수입니다.
spring.kafka.listener.idle-between-polls Consumer.poll(Duration)에 대한 호출 사이의 휴면 간격. (기본: 0)
spring.kafka.listener.idle 이벤트 간격 유휴 소비자 이벤트 게시 사이의 시간입니다(수신된 데이터 없음).
spring.kafka.listener.idle-파티션-이벤트-간격 유휴 파티션 소비자 이벤트 게시물 사이의 시간입니다(파티션에 대해 수신된 데이터 없음).
spring.kafka.listener.immediate-stop 현재 레코드가 처리된 후 또는 이전 폴링의 모든 레코드가 처리된 후 컨테이너가 중지되는지 여부입니다.(기본: 거짓)
spring.kafka.listener.log-container-config 초기화 중 컨테이너 구성을 기록할지 여부(INFO 수준).
spring.kafka.listener.missing-topics-fatal 하나 이상의 구성된 항목이 브로커에 없는 경우 컨테이너 시작 실패 여부입니다. (기본: 거짓)
spring.kafka.listener.monitor-interval 응답하지 않는 소비자에 대한 확인 사이의 시간입니다. 마침표 접미사를 지정하지 않으면 초가 사용됩니다.
spring.kafka.listener.no-poll-threshold 소비자가 응답하지 않는지 확인하기 위해 “pollTimeout”에 적용되는 승수입니다.
spring.kafka.listener.poll-timeout 소비자를 폴링할 때 사용할 제한 시간입니다.
spring.kafka.listener.type 리스너 유형. (기본: 하나의)

@KafkaListener 사용

@Component
class ConsumerTest {
    @KafkaListener(topics = "topic-name", groupId = "group-01")
    public void onMessage(kafkaSchema: kafkaSchema, Acknowledgment ack) {
        //kafkaSchema 사용 처리
        ack.acknowledge(); // 커밋 수행
    }
}

data class KafkaSchema(
    val name: String,
    val columnNames: List<String> = listOf() 
)

리스너 유형 및 커밋 모드(AcksMode)

이전에 기본 소비자를 구현했습니다. @kafkaListener 생성 방식의 파라미터는 리스너 유형과 커밋 방식에 따라 변경할 수 있습니다. 그리고 커밋 모드(AcksMode)에 따라 커밋 방식을 선택할 수 있습니다.

– 리스너 유형 및 리스너 유형

  • 레코드 리스너(MessageListener): 1개의 레코드만 처리합니다. (Spring Kafka 소비자의 기본 리스너 유형)
  • Batch Listener(BatchMessageListener): 한 번에 여러 레코드를 처리합니다.
기록 메시지 리스너 한 번에 1개 레코드 처리 onMessage(소비자 기록< K, V > 데이터)
onMessage(V 데이터)
확인 메시지 리스너 하나의 레코드가 처리됨에 따라
수동 커밋
onMessage(소비자 기록< K, V > 데이터, 승인 승인)
onMessage(V 데이터, 확인 확인)
ConsumerAwareMessageListener 하나의 레코드가 처리됨에 따라
KafkaConsumer 인스턴스에
직접적인 접근으로
제어할 수 있는 방법
onMessage(소비자 기록< K, V > 데이터, 소비자< K, V > 소비자)
onMessage(V 데이터, 소비자< K, V > 소비자)
AcknowledgegingConsumerAwareMessageListener 하나의 레코드가 처리됨에 따라
수동 커밋입니다.
KafkaConsumer 인스턴스에
직접적인 접근으로
제어할 수 있는 방법
onMessage(소비자 기록< K, V > 데이터, 확인 승인, 소비자 소비자)
onMessage(V 데이터, 확인 확인, 소비자< K, V > 소비자)
일괄 배치 메시지 리스너 한 번에 여러 레코드 처리 onMessage(소비자 기록< K, V > 데이터)
onMessage(목록< V > 데이터)
BatchAcknowledgingMessageListener 한 번에 여러 레코드 처리
수동 커밋
onMessage(소비자 기록< K, V > 데이터, 승인 승인)
onMessage(목록< V > 데이터, 승인 승인)
BatchConsumerAwareMessageListener KafkaConsumer 인스턴스에
직접적인 접근으로
제어할 수 있는 방법
onMessage(소비자 기록< K, V > 데이터, 소비자< K, V > 소비자)
onMessage(목록< V > 데이터, 소비자< K, V > 소비자)
BatchAcknowledgingConsumerAwareMessageListener 수동 커밋으로
KafkaConsumer 인스턴스에
직접적인 접근으로
제어할 수 있는 방법
onMessage(소비자 기록< K, V > 데이터, 확인 승인, 소비자< K, V > 소비자)
onMessage(목록< V > 데이터, 확인 승인, 소비자< K, V > 소비자)


– AcksMode 유형

* spring-kafka Consumer의 기본 AckMode는 BATCH이고 enable.auto.commit 옵션은 false로 설정되어 있습니다.

기록 레코드별 처리 후 커밋
일괄 poll() 메서드에 의해 호출된 모든 레코드가 처리된 후 커밋
시간 일정 시간 후 커밋
이 옵션을 사용하면
시간 간격을 선언하는 `AckTime` 옵션을 설정해야 합니다.
세다 특정 수의 레코드가 처리된 후 커밋
이 옵션을 사용할 때 레코드 수를 선언하는 `AckCount` 옵션을 설정해야 합니다.
COUNT_TIME TIME 및 COUNT 옵션 중 하나라도 충족되면 커밋
수동 Acknowledgement.acknowledge() 메서드가 호출될 때
다음 poll() 시간에 커밋합니다.
acknowledge() 메서드를 매번 호출하면 BATCH 옵션과 동일하게 작동합니다.
이 옵션을 사용하는 경우 AcknowledgingMessageListener 또는 BatchAcknowledgeMessageListener를 리스너로 사용해야 합니다.
MANUAL_IMMEDIATE Acknowledgement.acknowledge() 메서드를 호출한 직후 커밋합니다.
이 옵션을 사용하는 경우 AcknowledgingMessageListener 또는 BatchAcknowledgeMessageListener를 리스너로 사용해야 합니다.

참조
https://docs.spring.io/spring-kafka/