ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 05. 카프카 컨슈머(Kafka Consumer)
    BackEnd/kafka 2021. 11. 4. 21:10
    반응형

      프로듀서가 전송한 데이터는 카프카 브로커에 적재됩니다. 컨슈머는 적재된 데이터를 사용하기 위해 브로커로부터 데이터를 가져와서 필요한 처리를 합니다. 컨슈머 그룹으로 묶인 컨슈머가 토픽을 구독해서 데이터를 가져갈 때, 1개의 파티션은 최대 1개의 컨슈머에 할당 가능합니다. 그리고 1개 컨슈머는 여러 개의 파티션에 할당될 수 있습니다. 이러한 특징으로 컨슈머 그룹의 컨슈머 개수는 가져가고자 하는 토픽의 파티션 개수보다 같거나 작아야 합니다. 컨슈머 그룹의 컨슈머에 장애가 발생하면 장애가 발생한 컨슈머에 할당된 파티션은 장애가 발생하지 않은 컨슈머에 소유권이 넘어갑니다. 이러한 과정을 '리밸런싱(rebalancing)'이라고 합니다. 리밸런싱은 컨슈머가 데이터를 처리하는 도중에 언제든지 발생할 수 있으므로 데이터 처리 중 발생한 리밸런싱에 대응하는 코드를 작성해야 합니다.

     

      컨슈머는 카프카 브로커로부터 데이터를 어디까지 가져갔는지 커밋(commit)을 통해 기록합니다. 특정 토픽의 파티션을 어떤 컨슈머 그룹이 몇 번째 가져갔는지 카프카 브로커 내부에서 사용되는 내부 토픽(__consumer_offsets)에 기록됩니다. 오프셋 커밋이 기록되지 못했다면 데이터 처리의 중복이 발생할 수 있습니다. 오프셋 커밋은 컨슈머 애플리케이션에서 명시적, 비명시적으로 수행할 수 있습니다. 기본 옵션은 비명시 '오프셋 커밋'으로 poll() 메서드가 수행될 때 일정 간격(auto.commit.interval.ms에 설정된 값 이상이 지났을 때)마다 오프셋을 커밋하도록 enable.auto.commit=true로 설정되어 있습니다. 비명시 오프셋은 편리하지만, 데이터 중복이나 유실을 허용하지 않는 서비스라면 명시적으로 오프셋을 커밋해야 합니다. 명시적으로 오프셋을 커밋하려면 poll() 메서드 호출 이후 반환받은 데이터의 처리가 완료되고 commitSync(), commitAsync() 메서드를 호출하면 됩니다. commitSync() 메서드는 동기 방식으로 데이터 처리량이 줄어들고, commitAsync()는 비동기 방식으로 데이터 처리량은 향상되지만 커밋 요청이 실패했을 경우 현재 처리 중인 데이터의 순서를 보장하지 않으며 데이터의 중복 처리가 발생할 수 있습니다.

     

      컨슈머 그룹은 다른 컨슈머 그룹과 격리되는 특징을 가지고 있습니다. 따라서 카프카 프로듀서가 보낸 데이터를 각기 다른 역할을 하는 컨슈머 그룹끼리 영향을 받지 않게 처리할 수 있다는 장점을 가집니다. 예를 들어, 실시간 리소스를 시간순으로 확인하기 위해 데이터를 엘라스틱서치에 저장하고 동시에 대용량 적재를 위해 하둡에 적재할 경우 각기 다른 저장소에 저장하는 컨슈머를 다른 컨슈머 그룹으로 묶음으로써 각 저장소의 장애에 격리되어 운영할 수 있습니다.

     

    1. 멀티 스레드 컨슈머

      카프카는 처리량을 늘리기 위해 파티션과 컨슈머 개수를 늘려서 운영할 수 있습니다. 데이터를 병렬처리하기 위해 파티션 개수와 컨슈머 개수를 동일하게 맞추는 것이 가장 좋은 방법입니다.

      멀티코어 CPU를 가진 가상/물리 서버 환경에서 멀티 컨슈머 스레드를 운영하면 제한된 리소스 내에서 최상의 성능을 발휘할 수 있습니다. 단, 레코드 처리에 있어 중복이 발생하거나 데이터의 역전현상이 발생할 수 있습니다. 스레드의 생성은 순서대로 진행되지만 처리 시간은 다를 수 있기 때문입니다.

      컨슈머를 멀티 스레드로 활용하는 방식은 크게 두 가지로 나뉩니다. 첫 번째는 컨슈머 스레드는 1개만 실행하고 데이터 처리를 담당하는 워커 스레드(worker thread)를 여러 개 실행하는 방법인 멀티 워커 스레드 전략입니다. 두 번째는 컨슈머 인스턴스에서 poll() 메서드를 호출하는 스레드를 여러 개 띄워서 사용하는 컨슈머 멀티 스레드 전략입니다. 아래 샘플코드에 대한 상세 내용은 이후 게시물인 카프카 클라이언트를 참고하시면 됩니다.

     

    1) 카프카 컨슈머 멀티 워커 스레드 전략

    package com.example;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class ConsumerWorker implements Runnable {
    
        private final static Logger logger = LoggerFactory.getLogger(ConsumerWorker.class);
        private String recordValue;
    
        ConsumerWorker(String recordValue) {
            this.recordValue = recordValue;
        }
    
        @Override
        public void run() {
            // 데이터 처리 로직
            logger.info("thread:{}\trecord:{}", Thread.currentThread().getName(), recordValue);
        }
    }
    package com.example;
    
    import com.google.gson.Gson;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class ConsumerWithMultiWorkerThread {
        private final static Logger logger = LoggerFactory.getLogger(ConsumerWithMultiWorkerThread.class);
        private final static String TOPIC_NAME = "test";
        private final static String BOOTSTRAP_SERVERS = "localhost:9092";
        private final static String GROUP_ID = "test-group";
    
        public static void main(String[] args) {
            Properties configs = new Properties();
            configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
            configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
            configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
            configs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000);
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
            consumer.subscribe(Arrays.asList(TOPIC_NAME));
            // 작업 이후 스레드가 종료되도록 CachedThreadPool을 사용하여 스레드를 실행합니다.
            ExecutorService executorService = Executors.newCachedThreadPool();
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
                for (ConsumerRecord<String, String> record : records) {
                    // 스레드 개별 생성
                    ConsumerWorker worker = new ConsumerWorker(record.value());
                    executorService.execute(worker);
                }
            }
        }
    }

     

    2) 카프카 컨슈머 멀티 스레드 전략

      1개의 애플리케이션에 n개 컨슈머 스레드를 띄우는 방법입니다. 컨슈머 스레드를 늘려서 운영하면 각 스레드에 각 파티션이 할당되며, 파티션의 레코드들을 병렬처리할 수 있습니다.

    package com.example;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.errors.WakeupException;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;
    
    public class ConsumerWorker implements Runnable {
        private final static Logger logger = LoggerFactory.getLogger(ConsumerWorker.class);
        private Properties prop;
        private String topic;
        private String threadName;
        private KafkaConsumer<String, String> consumer;
    
        ConsumerWorker(Properties prop, String topic, int number) {
            this.prop = prop;
            this.topic = topic;
            this.threadName = "consumer-thread-" + number;
        }
    
        @Override
        public void run() {
            // KafkaConsumer는 스레드 세이프하지 않으므로 스레드별 인스턴스를 생성합니다.
            consumer = new KafkaConsumer<>(prop);
            consumer.subscribe(Arrays.asList(topic)); // 구독
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // 데이터 처리 로직
                    logger.info("{}", record);
                }
                consumer.commitSync();
            }
        }
    }
    package com.example;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class MultiConsumerThread {
    
        private final static String TOPIC_NAME = "test";
        private final static String BOOTSTRAP_SERVERS = "localhost:9092";
        private final static String GROUP_ID = "test-group";
        private final static int CONSUMER_COUNT = 3; // 스레드 개수
    
        public static void main(String[] args) {
            // 컨슈머 옵션 지정
            Properties configs = new Properties();
            configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
            configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
            configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    
            ExecutorService executorService = Executors.newCachedThreadPool();
            for (int i = 0; i < CONSUMER_COUNT; i++) {
                ConsumerWorker worker = new ConsumerWorker(configs, TOPIC_NAME, i);
                executorService.execute(worker);
            }
        }
    }

     

    2. 컨슈머 랙

      컨슈머 랙(LAG)은 토픽의 최신 오프셋(LOG-END-OFFSET)과 컨슈머 오프셋(CURRENT-OFFSET) 간의 차이입니다. 프로듀서는 계속해서 새로운 데이터를 파티션에 저장하고 컨슈머는 자신이 처리할 수 있는 만큼 데이터를 가져갑니다. 컨슈머 랙은 컨슈머가 정상 동작하는지 여부를 확인할 수 있기 때문에 컨슈머 애플리케이션을 운영한다면 필수적으로 모니터링해야 하는 지표입니다. 컨슈머 랙은 컨슈머 그룹과 토픽, 파티션별로 생성됩니다. 1개의 토픽에 3개의 파티션이 있고 1개의 컨슈머 그룹이 토픽을 구독하여 데이터를 가져가면 컨슈머 랙은 총 3개가 됩니다.

      프로듀서가 보내는 데이터 양이 컨슈머의 데이터 처리량보다 크다면 컨슈머 랙은 늘어납니다. 반대로 프로듀서가 보내는 데이터양이 컨슈머의 데이터 처리량보다 적으면 컨슈머 랙은 줄어들고 최솟값은 0으로 지연이 없음을 뜻합니다.

     

    [컨슈머 랙을 확인하는 방법]

    1) 카프카 명령어를 사용하여 컨슈머 랙 조회 (일회성)

    bin/kafka-consumer-group.sh --bootstrap-server localhost:9092 \
    --group my-group --describe

     

    2) 컨슈머 metrics() 메서드를 사용하여 컨슈머 랙 조회

      컨슈머 애플리케이션에서 KafkaConsumer 인스턴스의 metrics() 메서드를 활용하면 컨슈머 랙 지표를 확인할 수 있습니다. 컨슈머 인스턴스가 제공하는 컨슈머 랙 관련 모니터링 지표는 3가지로 records-lag-max, records-lag, records-lag-avg입니다.

      (1) 컨슈머 애플리케이션 비정상적 종료 시 모니터링 불가, (2) 모든 컨슈머 애플리케이션에 모니터링 코드 중복 작성, (3) 컨슈머 랙 모니터링 코드를 추가할 수 없는 카프카 서드 파티(third party) 애플리케이션의 컨슈머 랙 모니터링 불가능의 문제점을 가집니다.

     

    3) 외부 모니터링 툴을 사용하여 컨슈머 랙 조회 (최선의 방법)

      데이터독(Datadog), 컨플루언트 컨트롤 센터(Confluent Control Center)와 같은 카프카 클러스터 종합 모니터링 툴을 사용하면 카프카 운영에 필요한 다양한 지표(클러스터 모니터링, 컨슈머 랙 등)를 모니터링할 수 있습니다. 컨슈머 랙 모니터링만을 위한 툴로 오픈소스로 공개되어 있는 버로우(Burrow)가 있습니다. 버로우는 링크드인에서 개발한 툴로 REST API를 통해 컨슈머 그룹별로 컨슈머 랙을 확인할 수 있습니다. 그러나 모니터링을 위해 컨슈머 랙 지표를 수집, 적재, 알람 설정을 하고 싶다면 별도의 저장소와 대시보드를 구축해야 합니다.

     

    (참고) 무료 컨슈머 랙 모니터링 아키텍처

    • 버로우 : REST API를 통해 컨슈머 랙을 조회
    • 텔레그래프 : 데이터 수집 및 전달에 특화된 툴로 버로우를 조회하여 데이터를 엘라스틱서치에 전달
    • 엘라스틱서치 : 컨슈머 랙 정보를 담는 저장소
    • 그라파타 : 엘라스틱서치의 정보를 시각화하고 특정 조건에 따라 슬랙 알람을 보낼 수 있는 웹 대시보드 툴

    (참고) 컨슈머 애플리케이션 무중단 배포

    • 블루/그린 배포 : 이전 버전 애플리케이션과 신규 버전 애플리케이션을 동시에 띄어놓고 트래픽을 전환하는 방법
    • 롤링 배포 : 2개의 인스턴스 중 1개의 인스턴스를 신규 버전으로 실행하고 모니터링한 이후에 나머지 1개의 인스턴스를 신규 버전으로 배포하여 롤링 업그레이드를 진행하는 방법
    • 카나리 배포 : 소수 파티션에 신규 버전 카프카 애플리케이션을 할당함으로 사전 테스트를 진행하는 방법
    반응형

    댓글

Designed by Tistory.