(SpringBoot) (Kotlin) (Kafka) 기본 Kafka 소비자

안녕하세요. 저는 홈디벨로퍼 금입니다.


최근에는 Kafka가 발행한 메시지를 외부 시스템에서 수신하는 기능을 개발했습니다.


(지금까지는 SQS SQS-SNS를 주로 사용했는데 회사 정책 변경으로 인해 kafka로 마이그레이션 중입니다.

)

Spring Boot를 사용하지 않는 상황에서는 Bean을 등록하여 사용해야 합니다.

하지만 요즘은 대부분의 시스템이 Spring Boot로 구축되기 때문에,

설정이 쉽고 옵션을 변경하기 쉽습니다.

나는 주로 @KafkaListener를 사용합니다.

이 기사에서는 @KafkaListener를 사용하여 구현을 요약합니다.

Kotlin + SpringBoot + kafka 소비자의 기본 사용법

가장 먼저 @KafkaListener를 사용하여 기본 소비자를 구현해 봅시다.


– 먼저 spring-kafka 라이브러리 추가
* Spring Boot를 사용할 때(프로젝트를 생성하기 위해 start.spring.io를 사용하지 않음) 버전을 생략하면 Boot는 자동으로 Boot 버전과 호환되는 올바른 버전을 가져옵니다.

compile 'org.springframework.kafka:spring-kafka'

– application.yml 설정

기본 리스너 컨테이너를 사용하는 경우 yaml 구성 파일에서 소비자 및 리스너 옵션을 설정할 수 있으며 옵션 값은 애플리케이션 실행 시 자동으로 재정의됩니다.

Spring Kafka에서 사용하는 옵션에 대한 설명은 다음과 같습니다.

스프링 공식 홈페이지에서 찾을 수 있습니다

spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092 //kafka 메시지를 송신하는 서버 URL
    listener:
      type: BATCH
      ack-mode: MANUAL_IMMEDIATE

* 소비자 옵션

더보기

spring.kafka.consumer.auto-commit-interval ‘enable.auto.commit’이 true로 설정된 경우 소비자 오프셋이 Kafka에 자동 커밋되는 빈도입니다.

spring.kafka.consumer.auto-offset-reset Kafka에 초기 오프셋이 없거나 현재 오프셋이 서버에 더 이상 존재하지 않는 경우 수행할 작업입니다.

spring.kafka.consumer.bootstrap-servers Kafka 클러스터에 대한 초기 연결을 설정하는 데 사용할 호스트:포트 쌍의 쉼표로 구분된 목록입니다.

소비자의 전역 속성을 재정의합니다.

spring.kafka.consumer.client-id 요청 시 서버에 전달할 ID입니다.

서버 측 로깅에 사용됩니다.

spring.kafka.consumer.enable-auto-commit 소비자의 오프셋이 백그라운드에서 주기적으로 커밋되는지 여부입니다.

spring.kafka.consumer.fetch-max-wait “fetch-min-size”에 의해 주어진 요구 사항을 즉시 충족하기에 충분한 데이터가 없는 경우 가져오기 요청에 응답하기 전에 서버가 차단하는 최대 시간입니다.

spring.kafka.consumer.fetch-min-size 가져오기 요청에 대해 서버가 반환해야 하는 최소 데이터 양입니다.

spring.kafka.consumer.group-id 이 소비자가 속한 소비자 그룹을 식별하는 고유한 문자열입니다.

spring.kafka.consumer.heartbeat-간격 소비자 조정자에 대한 하트비트 사이의 예상 시간입니다.

spring.kafka.consumer.isolation-level 트랜잭션 방식으로 작성된 메시지를 읽기 위한 격리 수준입니다.


(기본: 커밋되지 않은 읽기)
spring.kafka.consumer.key-deserializer 키에 대한 Deserializer 클래스입니다.

spring.kafka.consumer.max-poll-records poll()에 대한 단일 호출에서 반환되는 최대 레코드 수입니다.

spring.kafka.consumer.properties.* 클라이언트를 구성하는 데 사용되는 추가 소비자별 속성입니다.

spring.kafka.consumer.security.protocol 브로커와 통신하는 데 사용되는 보안 프로토콜입니다.

spring.kafka.consumer.ssl.key-암호 키 저장소 키 또는 키 저장소 파일의 개인 키에 대한 비밀번호입니다.

spring.kafka.consumer.ssl.key-store-certificate-chain X.509 인증서 목록이 있는 PEM 형식의 인증서 체인.
spring.kafka.consumer.ssl.key-store-key PKCS#8 키가 있는 PEM 형식의 개인 키.
spring.kafka.consumer.ssl.key-store-location 키 저장소 파일의 위치입니다.

spring.kafka.consumer.ssl.key-store-password 키 저장소 파일의 비밀번호를 저장합니다.

spring.kafka.consumer.ssl.key-store-type 키 저장소의 유형입니다.

spring.kafka.consumer.ssl.protocol 사용할 SSL 프로토콜입니다.

spring.kafka.consumer.ssl.trust-store-certificates X.509 인증서를 포함하는 PEM 형식의 신뢰할 수 있는 인증서입니다.

spring.kafka.consumer.ssl.trust-store-location truststore 파일의 위치입니다.

spring.kafka.consumer.ssl.trust-store-password truststore 파일의 암호를 저장합니다.

spring.kafka.consumer.ssl.trust-store-type 신뢰 저장소의 유형입니다.

spring.kafka.consumer.value-deserializer 값에 대한 Deserializer 클래스입니다.

* 리스너 옵션

더보기

spring.kafka.listener.ack-count ackMode가 “COUNT” 또는 “COUNT_TIME”인 경우 오프셋 커밋 사이의 레코드 수입니다.

spring.kafka.listener.ack 모드 리스너 AckMode. spring-kafka 문서를 참조하십시오.
spring.kafka.listener.ack-시간 ackMode가 “TIME” 또는 “COUNT_TIME”인 경우 오프셋 커밋 사이의 시간입니다.

spring.kafka.listener.async-acks 비동기 기록 확인 지원. spring.kafka.listener.ack-mode가 manual 또는 manual-immediate인 경우에만 적용됩니다.

spring.kafka.listener.client-id 수신기 소비자 client.id 속성의 접두사입니다.

spring.kafka.listener.concurrency 리스너 컨테이너에서 실행할 스레드 수입니다.

spring.kafka.listener.idle-between-polls Consumer.poll(Duration)에 대한 호출 사이의 휴면 간격. (기본: 0)
spring.kafka.listener.idle 이벤트 간격 유휴 소비자 이벤트 게시 사이의 시간입니다(수신된 데이터 없음).
spring.kafka.listener.idle-파티션-이벤트-간격 유휴 파티션 소비자 이벤트 게시물 사이의 시간입니다(파티션에 대해 수신된 데이터 없음).
spring.kafka.listener.immediate-stop 현재 레코드가 처리된 후 또는 이전 폴링의 모든 레코드가 처리된 후 컨테이너가 중지되는지 여부입니다.

(기본: 거짓)
spring.kafka.listener.log-container-config 초기화 중 컨테이너 구성을 기록할지 여부(INFO 수준).
spring.kafka.listener.missing-topics-fatal 하나 이상의 구성된 항목이 브로커에 없는 경우 컨테이너 시작 실패 여부입니다.

(기본: 거짓)
spring.kafka.listener.monitor-interval 응답하지 않는 소비자에 대한 확인 사이의 시간입니다.

마침표 접미사를 지정하지 않으면 초가 사용됩니다.

spring.kafka.listener.no-poll-threshold 소비자가 응답하지 않는지 확인하기 위해 “pollTimeout”에 적용되는 승수입니다.

spring.kafka.listener.poll-timeout 소비자를 폴링할 때 사용할 제한 시간입니다.

spring.kafka.listener.type 리스너 유형. (기본: 하나의)

@KafkaListener 사용

@Component
class ConsumerTest {
    @KafkaListener(topics = "topic-name", groupId = "group-01")
    public void onMessage(kafkaSchema: kafkaSchema, Acknowledgment ack) {
        //kafkaSchema 사용 처리
        ack.acknowledge(); // 커밋 수행
    }
}

data class KafkaSchema(
    val name: String,
    val columnNames: List<String> = listOf() 
)

리스너 유형 및 커밋 모드(AcksMode)

이전에 기본 소비자를 구현했습니다.

@kafkaListener 생성 방식의 파라미터는 리스너 유형과 커밋 방식에 따라 변경할 수 있습니다.

그리고 커밋 모드(AcksMode)에 따라 커밋 방식을 선택할 수 있습니다.

– 리스너 유형 및 리스너 유형

  • 레코드 리스너(MessageListener): 1개의 레코드만 처리합니다.

    (Spring Kafka 소비자의 기본 리스너 유형)
  • Batch Listener(BatchMessageListener): 한 번에 여러 레코드를 처리합니다.

기록 메시지 리스너 한 번에 1개 레코드 처리 onMessage(소비자 기록< K, V > 데이터)
onMessage(V 데이터)
확인 메시지 리스너 하나의 레코드가 처리됨에 따라
수동 커밋
onMessage(소비자 기록< K, V > 데이터, 승인 승인)
onMessage(V 데이터, 확인 확인)
ConsumerAwareMessageListener 하나의 레코드가 처리됨에 따라
KafkaConsumer 인스턴스에
직접적인 접근으로
제어할 수 있는 방법
onMessage(소비자 기록< K, V > 데이터, 소비자< K, V > 소비자)
onMessage(V 데이터, 소비자< K, V > 소비자)
AcknowledgegingConsumerAwareMessageListener 하나의 레코드가 처리됨에 따라
수동 커밋입니다.


KafkaConsumer 인스턴스에
직접적인 접근으로
제어할 수 있는 방법
onMessage(소비자 기록< K, V > 데이터, 확인 승인, 소비자 소비자)
onMessage(V 데이터, 확인 확인, 소비자< K, V > 소비자)
일괄 배치 메시지 리스너 한 번에 여러 레코드 처리 onMessage(소비자 기록< K, V > 데이터)
onMessage(목록< V > 데이터)
BatchAcknowledgingMessageListener 한 번에 여러 레코드 처리
수동 커밋
onMessage(소비자 기록< K, V > 데이터, 승인 승인)
onMessage(목록< V > 데이터, 승인 승인)
BatchConsumerAwareMessageListener KafkaConsumer 인스턴스에
직접적인 접근으로
제어할 수 있는 방법
onMessage(소비자 기록< K, V > 데이터, 소비자< K, V > 소비자)
onMessage(목록< V > 데이터, 소비자< K, V > 소비자)
BatchAcknowledgingConsumerAwareMessageListener 수동 커밋으로
KafkaConsumer 인스턴스에
직접적인 접근으로
제어할 수 있는 방법
onMessage(소비자 기록< K, V > 데이터, 확인 승인, 소비자< K, V > 소비자)
onMessage(목록< V > 데이터, 확인 승인, 소비자< K, V > 소비자)


– AcksMode 유형

* spring-kafka Consumer의 기본 AckMode는 BATCH이고 enable.auto.commit 옵션은 false로 설정되어 있습니다.

기록 레코드별 처리 후 커밋
일괄 poll() 메서드에 의해 호출된 모든 레코드가 처리된 후 커밋
시간 일정 시간 후 커밋
이 옵션을 사용하면
시간 간격을 선언하는 `AckTime` 옵션을 설정해야 합니다.

세다 특정 수의 레코드가 처리된 후 커밋
이 옵션을 사용할 때 레코드 수를 선언하는 `AckCount` 옵션을 설정해야 합니다.

COUNT_TIME TIME 및 COUNT 옵션 중 하나라도 충족되면 커밋
수동 Acknowledgement.acknowledge() 메서드가 호출될 때
다음 poll() 시간에 커밋합니다.


acknowledge() 메서드를 매번 호출하면 BATCH 옵션과 동일하게 작동합니다.


이 옵션을 사용하는 경우 AcknowledgingMessageListener 또는 BatchAcknowledgeMessageListener를 리스너로 사용해야 합니다.

MANUAL_IMMEDIATE Acknowledgement.acknowledge() 메서드를 호출한 직후 커밋합니다.


이 옵션을 사용하는 경우 AcknowledgingMessageListener 또는 BatchAcknowledgeMessageListener를 리스너로 사용해야 합니다.

참조
https://docs.spring.io/spring-kafka/