ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 08. 스프링 카프카 컨슈머(Spring Kafka Consumer)
    BackEnd/kafka 2021. 11. 6. 11:27
    반응형

      스프링 카프카 컨슈머는 기존 컨슈머를 2개의 타입으로 나누고 커밋을 7가지로 나누어 세분화 했습니다.

     

    1. 타입

    • 레코드 리스너(MessageListener): 단 1개의 레코드를 처리합니다. (스프링 카프카 컨슈머의 기본 리스너 타입)
    • 배치 리스너(BatchMessageListener): 한 번에 여러 개 레코드들을 처리합니다.

      매뉴얼 커밋(스프링 카프카에서는 커밋이라고 부르지 않고 'AckMode'라고 합니다. AcksMode: MANUAL, MANUAL_IMMEDIATE)을 사용할 경우 Acknowledging이 붙은 리스너를 사용하고, KafkaConsumer 인스턴스에 직접 접근하여 컨트롤하고 싶다면 ConsumerAware가 붙은 리스너를 사용하면 됩니다.

     

    2. 메시지 리스너 종류와 파라미터

    RECORD Type: Record 인스턴스 단위로 프로세싱

    리스너 이름 생성 메서드 파라미터 설명
    MessageListener onMessage(ConsumerRecord<K, V> data)

    onMessage(V data)
    오토 커밋 또는 컨슈머 컨테이너의 AckMode를 사용하는 경우
    AcknowledgingMessageListener onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment)

    onMessage(V data, Acknowledgment acknowledgment)
    매뉴얼 커밋을 사용하는 경우
    ConsumerAwareMessageListener onMessage(ConsumerRecord<K, V> data, Consumer<K, V> consumer)

    onMessage(V data, Consumer<K, V> consumer)
    컨슈머 객체를 활용하고 싶은 경우
    AcknowledgingConsumerAwareMessageListener onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer)

    onMessage(V data, Acknowledgment acknowledgment, Consumer<K, V> consumer)
    매뉴얼 커밋을 사용하고 컨슈머 객체를 활용하고 싶은 경우

    BATCH Type: Records 인스턴스 단위로 프로세싱

    리스너 이름 파라미터 설명
    BatchMessageListener onMessage(ConsumerRecords<K, V> data)

    onMessage(List<V> data)
    오토 커밋 ㄸ도는 컨슈머 컨테이너의 AckMode를 사용하는 경우
    BatchAcknowledgingMessageListener onMessage(ConsumerRecords<K, V> data, Acknowledgment acknowledgment)

    onMessage(List<V> data, Acknowledgment acknowledgment)
    매뉴얼 커밋을 사용하는 경우
    BatchConsumerAwareMessageListener onMessage(ConsumerRecords<K, V> data, Consumer<K, V> consumer)

    onMessage(List<V> data, Consumer<K, V> consumer)
    컨슈머 객체를 활용하고 싶은 경우
    BatchAcknowledgingConsumerAwareMessageListener onMessage(ConsumerRecords<K, V> data, Acknowledgment acknowledgment, Consumer<K, V> consumer)

    onMessage(List<V> data, Acknowledgment acknowledgment, Consumer<K, V> consumer)
    매뉴얼 커밋을 사용하고 컨슈머 객체를 활용하고 싶은 경우

     

    3. AcksMode

      스프링 카프카 컨슈머의 AckMode 기본값은 BATCH이고 컨슈머의 enable.auto.commit 옵션은 false로 지정됩니다.

    RECORD 레코드 단위로 프로세싱 이후 커밋
    BATCH poll() 메서드로 호출된 레코드가 모두 처리된 이후 커밋
    스프링 카프카 컨슈머의 AckMode 기본값
    TIME 특정 시간 이후에 커밋
    이 옵션을 사용할 경우 시간 간격을 선언하는 AckTime 옵션을 설정해야 한다.
    COUNT 특정 개수만큼 레코드가 처리된 이후 커밋
    이 옵션을 사용할 경우에는 레코드 개수를 선언하는 AckCount 옵션을 설정해야 한다.
    COUNT_TIME TIME, COUNT 옵션 중 맞는 조건이 하나라도 나올 경우 커밋
    MANUAL Acknowledgement.acknowledge() 메서드가 호출되면 다음번 poll() 때 커밋을 한다. 매번 acknowledge() 메서드를 호출하면 BATCH 옵션과 동일하게 동작한다. 이 옵션을 사용할 경우에는 AcknowledgingMessageListener 또는 BatchAcknowledgingMessageListener를 리스너로 사용해야 한다.
    MANUAL_IMMEDIATE Acknowledgement.acknowledge() 메서드를 호출한 즉시 커밋한다. 이 옵션을 사용할 경우에는 AcknowledgingMessageListener 또는 BatchAcknowledgingMessageListener를 리스너로 사용해야 한다.

     

    4. 기본 리스너 컨테이너

      기본 리스너 컨테이너는 기본 리스너 컨테이너 팩토리를 통해 생성된 리스너 컨테이너를 사용합니다. 기본 리스너 컨테이너를 사용할 때는 application.yaml에 컨슈머와 리스너 옵션을 넣고 사용할 수 있으며 설정한 옵션값은 애플리케이션이 실행될 때 자동으로 오버라이드되어 설정됩니다. 스프링 카프카에서 사용되는 옵션에 대한 설명은 스프링 공식 홈페이지에서 확인할 수 있습니다.

    # application.yaml 컨슈머와 리스너 옵션값
    spring.kafka.consumer.auto-commit-interval
    spring.kafka.consumer.auto-offset-reset
    spring.kafka.consumer.bootstrap-servers
    spring.kafka.consumer.client-id
    spring.kafka.consumer.enable-auto-commit
    spring.kafka.consumer.fetch-max-wait
    spring.kafka.consumer.fetch-min-size
    spring.kafka.consumer.group-id
    spring.kafka.consumer.heartbeat-interval
    spring.kafka.consumer.key-deserializer
    spring.kafka.consumer.max-poll-records
    spring.kafka.consumer.properties.*
    spring.kafka.consumer.value-deserializer
    
    spring.kafka.listener.ack-count
    spring.kafka.listener.ack-mode
    spring.kafka.listener.ack-time
    spring.kafka.listener.client-id
    spring.kafka.listener.concurrency
    spring.kafka.listener.idle-event-interval
    spring.kafka.listener.log-container-config
    spring.kafka.listener.monitor-interval
    spring.kafka.listener.no-poll-threshold
    spring.kafka.listener.poll-timeout
    spring.kafka.listener.type

     

    1) 레코드 리스너(MessageListener)

    # application.yaml
    spring:
      kafka:
        consumer:
          bootstrap-servers: localhost:9092
        listener:
          type: RECORD

      리스너를 사용하기 위해서는 KafkaListener 어노테이션을 포함한 메서드를 선언해야 합니다. KafkaListener 어노테이션에 포함된 파라미터에 따라 메서드에 필요한 파라미터 종류가 달라집니다.

    package com.example;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.annotation.PartitionOffset;
    import org.springframework.kafka.annotation.TopicPartition;
    
    @SpringBootApplication
    public class SpringConsumerApplication {
        public static Logger logger = LoggerFactory.getLogger(SpringConsumerApplication.class);
    
    
        public static void main(String[] args) {
            SpringApplication application = new SpringApplication(SpringConsumerApplication.class);
            application.run(args);
        }
        
        // 가장 기본적인 리스너 선언
        @KafkaListener(topics = "test",
                groupId = "test-group-00")
        public void recordListener(ConsumerRecord<String,String> record) {
            // 메시지 키, 메시지 값에 대한 처리
            logger.info(record.toString());
        }
    
        // 메시지 값을 파라미터로 받는 리스너
        @KafkaListener(topics = "test",
                groupId = "test-group-01")
        public void singleTopicListener(String messageValue) {
            logger.info(messageValue);
        }
    
        // 개별 리스너에 카프카 컨슈머 옵션값 부여 (properties 옵션)
        @KafkaListener(topics = "test",
                groupId = "test-group-02", properties = {
                "max.poll.interval.ms:60000",
                "auto.offset.reset:earliest"
        })
        public void singleTopicWithPropertiesListener(String messageValue) {
            logger.info(messageValue);
        }
    
        // concurrency 옵션값에 해당하는 만큼 컨슈머 스레드 생성 (병렬처리)
        @KafkaListener(topics = "test",
                groupId = "test-group-03",
                concurrency = "3")
        public void concurrentTopicListener(String messageValue) {
            logger.info(messageValue);
        }
    
        // 특정 토픽의 특정 파티션 구독 (topicPartitions 파라미터 사용)
        // PartitionOffset: 특정 오프셋 지정, 그룹 아이디에 관계없이 항상 설정한 오프셋의 데이터부터 가져옴
        @KafkaListener(topicPartitions =
                {
                        @TopicPartition(topic = "test01", partitions = {"0", "1"}),
                        @TopicPartition(topic = "test02", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "3"))
                },
                groupId = "test-group-04")
        public void listenSpecificPartition(ConsumerRecord<String, String> record) {
            logger.info(record.toString());
        }
    }

     

    2) 배치 리스너(BatchMessageListener)

    # application.yaml
    spring:
      kafka:
        consumer:
          bootstrap-servers: localhost:9092
        listener:
          type: BATCH

      배치 리스너는 레코드 리스너와 다르게 KafkaListener로 사용되는 메서드의 파라미터를 List 또는 ConsumerRecords로 받습니다.

    package com.example;
    
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.kafka.annotation.KafkaListener;
    
    import java.util.List;
    
    @SpringBootApplication
    public class SpringConsumerApplication {
        public static Logger logger = LoggerFactory.getLogger(SpringConsumerApplication.class);
    
        public static void main(String[] args) {
            SpringApplication application = new SpringApplication(SpringConsumerApplication.class);
            application.run(args);
        }
    
        // ConsumerRecords
        @KafkaListener(topics = "test",
                groupId = "test-group-01")
        public void batchListener(ConsumerRecords<String, String> records) {
            records.forEach(record -> logger.info(record.toString()));
        }
    
        // 메시지 값들을 List 자료구조로 받아서 처리
        @KafkaListener(topics = "test",
                groupId = "test-group-02")
        public void batchListener(List<String> list) {
            list.forEach(recordValue -> logger.info(recordValue));
        }
    
        // concurrency = "3": 3개의 컨슈머 스레드 생성
        @KafkaListener(topics = "test",
                groupId = "test-group-03",
                concurrency = "3")
        public void concurrentBatchListener(ConsumerRecords<String, String> records) {
            records.forEach(record -> logger.info(record.toString()));
        }
    
    }

     

    3) 배치 커밋 리스너(BatchAcknowledgingMessageListener)와 배치 컨슈머 리스터(BatchConsumerAwareMessageListener)

      동기, 비동기 커밋이나 컨슈머 인스턴스에서 제공하는 메서드들을 활용하고 싶다면 배치 컨슈머 리스너를 사용하고, 컨슈머 컨테이너에서 관리하는 AckMode를 사용하여 커밋하고 싶다면 배치 커밋 리스너를 사용하면 됩니다. 만약 AckMode도 사용하고 컨슈머도 사용하고 싶다면 배치 커밋 컨슈머 리스너(BatchAcknowledgingConsumerAwareMessageListener)를 사용하면 됩니다.

    # application.yaml
    spring:
      kafka:
        consumer:
          bootstrap-servers: localhost:9092
        listener:
          type: BATCH
          ack-mode: MANUAL_IMMEDIATE
    package com.example;
    
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.support.Acknowledgment;
    
    @SpringBootApplication
    public class SpringConsumerApplication {
        public static Logger logger = LoggerFactory.getLogger(SpringConsumerApplication.class);
    
    
        public static void main(String[] args) {
            SpringApplication application = new SpringApplication(SpringConsumerApplication.class);
            application.run(args);
        }
    
        // BatchAcknowledgingMessageListener
        @KafkaListener(topics = "test", groupId = "test-group-01")
        public void commitListener(ConsumerRecords<String, String> records, Acknowledgment ack) {
            records.forEach(record -> logger.info(record.toString()));
            ack.acknowledge(); // 커밋 수행
        }
    
        // BatchConsumerAwareMessageListener: 리스너가 커밋하지 않도록 AckMode: MANUAL or MANUAL_IMMEDIATE 설정
        @KafkaListener(topics = "test", groupId = "test-group-02")
        public void consumerCommitListener(ConsumerRecords<String, String> records, Consumer<String, String> consumer) {
            records.forEach(record -> logger.info(record.toString()));
            consumer.commitSync();
        }
    }

     

    5. 커스텀 리스너 컨테이너

      서로 다른 설정을 가진 2개 이상의 리스너를 구현하거나 리밸런스 리스너를 구현하기 위해서는 커스텀 리스너 컨테이너를 사용해야 합니다. 커스텀 리스너 컨테이너를 만들기 위해서 스프링 카프카에서 카프카 리스너 컨테이너 팩토리(KafkaListenerContainerFactory) 인스턴스를 생성해야 합니다. 카프카 리스너 컨테이너 팩토리를 빈으로 등록하고 KafkaListener 어노테이션에서 커스텀 리스너 컨테이너 팩토리를 등록하면 커스텀 리스너 컨테이너를 사용할 수 있습니다.

    package com.example;
    
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.config.KafkaListenerContainerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.listener.*;
    import org.apache.kafka.common.TopicPartition;
    
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.Map;
    
    @Configuration
    public class ListenerContainerConfiguration {
    
        // customContainerFactory메서드명: KafkaListener 어노테이션 컨테이너 팩토리 등록 시 사용
        @Bean
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> customContainerFactory() {
    
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    
            // DefaultKafkaConsumerFactory: 컨슈머 기본 옵션을 설정하는 용도
            DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props);
    
            // 리스너 컨테이너를 만들기 위해 사용 (2개 이상의 컨슈머 리스너를 만들 때 사용)
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            
            // 리밸런스 리스너를 선언하기 위해 setConsumerRebalanceListener 메서드 호출
            factory.getContainerProperties().setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
                @Override
                public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
                  // 커밋이 되기 전, 리밸런스 발생 시 호출된다.
                }
    
                @Override
                public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
                  // 커밋이 일어난 이후, 리밸런스 발생 시 호출된다.
                }
    
                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    
                }
    
                @Override
                public void onPartitionsLost(Collection<TopicPartition> partitions) {
    
                }
            });
            factory.setBatchListener(false);
            // AckMode 설정
            factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
            
            // 컨슈머 설정값을 가지고 있는 DefaultKafkaConsumerFactory 인스턴스를 팩토리에 설정
            factory.setConsumerFactory(cf);
            return factory;
        }
    }
    package com.example;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.kafka.annotation.KafkaListener;
    
    @SpringBootApplication
    public class SpringConsumerApplication {
        public static Logger logger = LoggerFactory.getLogger(SpringConsumerApplication.class);
    
        public static void main(String[] args) {
            SpringApplication application = new SpringApplication(SpringConsumerApplication.class);
            application.run(args);
        }
    
        @KafkaListener(topics = "test",
                groupId = "test-group",
                containerFactory = "customContainerFactory")
        public void customListener(String data) {
            logger.info(data);
        }
    }

     

    반응형

    댓글

Designed by Tistory.