ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 06. 카프카 클라이언트(Kafka Client with JAVA)
    BackEnd/kafka 2021. 11. 5. 09:35
    반응형

      카프카 클러스터에 명령을 내리거나 데이터를 송수신하기 위해 카프카 프로듀서, 컨슈머, 어드민 클라이언트를 제공하는 카프카 클라이언트를 사용하여 애플리케이션을 개발합니다. 카프카 클라이언트는 라이브러리이기 때문에 자체 라이프사이클을 가진 프레임워크나 애플리케이션 위에서 구현하고 실행해야 합니다. 아래 자바 애플리케이션에 대한 전체 소스코드는 각 제목에 링크가 걸려 있습니다.

     

    1. 프로듀서 API

      아래 SimpleProducer.java는 프로듀서의 가장 기본적인 형태로 개발된 애플리케이션입니다. 프로듀서 사용환경에 따라 특정 데이터를 가지는 레코드를 특정 파티션으로 보내야 하는 경우(kafka-producer-custom-partitioner)나 브로커 정상 전송 여부를 확인하는 프로듀서가 필요한 경우(동기 : kafka-producer-sync-callback, 비동기 : kafka-producer-async-callback)는 링크가 걸린 제목을 눌러 github의 소스코드를 참고하면 됩니다.

     

    1) 라이브러리 추가

    // build.gradle
    dependencies {
        compile 'org.apache.kafka:kafka-clients:2.5.0'
        compile 'org.slf4j:slf4j-simple:1.7.30'
    }

    2) SimpleProducer.java 파일 생성

    package com.example;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.Properties;
    
    public class SimpleProducer {
        private final static Logger logger = LoggerFactory.getLogger(SimpleProducer.class);
        private final static String TOPIC_NAME = "test";
        private final static String BOOTSTRAP_SERVERS = "localhost:9092";
    
        public static void main(String[] args) {
    
            // Properties에는 KafkaProducer 인스턴스를 생성하기 위한 프로듀서 옵션들을 key/value 값으로 선언
            Properties configs = new Properties();
            configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
            
            // 메시지 키, 값을 직렬화하기 위한 직렬화 클래스 선언
            configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    
            // KafkaProducer 인스턴스 생성
            KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
    
            String messageValue = "testMessage";
            
            // 카프카 브로커에 데이터를 보내기 위해 ProducerRecord 생성
            // 생성자의 2개 제네릭 값(메시지 키/값) 타입은 직렬화 클래스와 동일하게 설정 <String, String>
            // ProducerRecord 생성 시 추가 파라미터를 사용하여 파티션 번호, 타임스탬프, 메시지 키 설정 가능
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, messageValue);
            // 메시지 키를 가진 ProducerRecord 생성
            // ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "Pangyo", "23");
            
            // 프로듀서 전송 : 배치 전송(프로듀서 내부에 가지고 있다가 배치 형태로 묶어서 브로커에 전송)
            producer.send(record);
            logger.info("{}", record);
            
            // flush를 통해 프로듀서 내부 버퍼에 가지고 있던 레코드 배치를 브로커로 전송
            producer.flush();
            
            // producer 인스턴스의 리소스들 종료
            producer.close();
        }
    }

    3) 토픽 생성

    $ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create \
    --topic test \
    --partitions 3

    4) 애플리케이션 실행 후 토픽 데이터 확인

    $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
    
    # 메시지 키 확인
    $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic test \
    --property print.key=true \
    --property key.separator="-" \
    --from-beginning

     

      프로듀서 애플리케이션을 실행할 때 설정해야 하는 필수 옵션과 선택 옵션이 있습니다. 주요 옵션은 아래와 같습니다. 추가 옵션에 대한 내용은 카프카 공식 도큐먼트 페이지에서 확인할 수 있습니다.

     

    필수 옵션

    • bootstrap.servers: 프로듀서가 데이터를 전송할 대상 카프카 클러스터에 속한 브로커의 호스트 이름:포트를 1개 이상 작성합니다.
    • key.serializer: 레코드의 메시지 키를 직렬화하는 클래스를 지정합니다.
    • value.serializer: 레코드의 메시지 값을 직렬화하는 클래스를 지정합니다.

     

    선택 옵션

    • acks: 프로듀서가 전송한 데이터가 브로커들에 정상적으로 저장되었는지 전송 성공 여부를 확인하는 데에 사용하는 옵션입니다. 0, 1, -1(all) 중 하나로 설정할 수 있으며 기본값은 1로 리더 파티션에 데이터가 저장되면 전송 성공으로 판단합니다. 상세 내용은 04. 카프카 프로듀서에서 확인할 수 있습니다.
    • buffer.memory: 브로커로 전송할 데이터를 배치로 모으기 위해 설정할 버퍼 메모리양을 지정합니다. 기본값은 33554432(32MB)입니다.
    • retries: 프로듀서가 브로커로부터 에러를 받고 난 뒤 재전송을 시도하는 횟수를 지정합니다. 기본값은 2147483647입니다.
    • batch.size: 배치로 전송할 레코드 최대 용량을 지정합니다. 너무 작게 설정하면 프로듀서가 브로커로 더 자주 보내기 때문에 네트워크 부담이 있고 너무 크게 설정하면 메모리를 더 많이 사용하게 되는 점을 주의해야 합니다. 기본값은 16384입니다.
    • linger.ms: 배치를 전송하기 전까지 기다리는 최소 시간입니다. 기본값은 0입니다.
    • partitioner.class: 레코드를 파티션에 전송할 때 적용하는 파티셔너 클래스를 지정합니다. 기본값은 org.apache.kafka.clients.producer.internals.DefaultPartitioner입니다.
    • enable.idempotence: 멱등성 프로듀서로 동작할지 여부를 설정합니다.
    • transactional.id: 프로듀서가 레코드를 전송할 때 레코드를 트랜잭션 단위로 묶을지 여부를 설정합니다.

     

    2. 컨슈머 API

      아래 SimpleConsumer.java는 기본 설정으로 생성할 수 있는 오토 커밋(auto commit) 카프카 컨슈머 애플리케이션입니다. 동기 오프셋 커밋(kafka-consumer-sync-commit), 개별 레코드 단위 오프셋 커밋(kafka-consumer-sync-offset-commit), 비동기 오프셋 커밋(kafka-consumer-async-commit)에 대한 전체 소스코드는 링크가 걸린 제목을 눌러 확인하면 됩니다.

    package com.example;
    
    import com.google.gson.Gson;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;
    
    public class SimpleConsumer {
        private final static Logger logger = LoggerFactory.getLogger(SimpleConsumer.class);
        private final static String TOPIC_NAME = "test";
        private final static String BOOTSTRAP_SERVERS = "localhost:9092";
        private final static String GROUP_ID = "test-group";
    
        public static void main(String[] args) {
            Properties configs = new Properties();
            configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
            // 컨슈머 그룹
            configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
            // 역직렬화 클래스 지정 (반드시 프로듀서에서 직렬화한 타입으로 역직렬화해야 함)
            configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    
            // KafkaConsumer 인스턴스 생성
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
    
            // 컨슈머에게 토픽 할당 (1개 이상의 토픽 이름을 받을 수 있음)
            consumer.subscribe(Arrays.asList(TOPIC_NAME));
            /* 1) 0번 파티션을 할당하여 레코드들을 가져오는 구문
                  > 직접 컨슈머가 특정 토픽/파티션에 할당되므로 리밸런싱 과정 없음
               consumer.assign(Collections.singleton(new TopicPartition(TOPIC_NAME, 0)));
               
               2) 컨슈머에 할당된 파티션 확인 방법
               Set<TopicPartition> assignedTopicPartition = consumer.assignment();
            */
    
            while (true) {
                // poll() 메서드를 호출하여 데이터를 가져옴
                // Duration 타입 인자 : 데이터를 가져올 때 컨슈머 버퍼에 데이터를 기다리기 위한 타임아웃 간격
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                
                // 데이터 처리
                for (ConsumerRecord<String, String> record : records) {
                    logger.info("record:{}", record);
                }
                /* 동기 오프셋 커밋 consumer.commitSync();
                   비동기 오프셋 커밋 1) consumer.commitAsync();
                   2)
                   consumer.commitAsync(new OffsetCommitCallback() {
                       public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
                           if (e != null)
                               System.err.println("Commit failed");
                           else
                               System.out.println("Commit succeeded");
                           if (e != null)
                               logger.error("Commit failed for offsets {}", offsets, e);
                       }
                   });
                */
            }
        }
    }

     

      컨슈머 애플리케이션을 실행할 때 설정해야 하는 필수 옵션과 선택 옵션이 있습니다. 주요 옵션은 아래와 같습니다. 추가 옵션에 대한 내용은 카프카 공식 도큐먼트 페이지에서 확인할 수 있습니다.

     

    필수 옵션

    • bootstrap.servers: 프로듀서가 데이터를 전송할 대상 카프카 클러스터에 속한 브로커의 호스트 이름:포트를 1개 이상 작성합니다.
    • key.deserializer: 레코드의 메시지 키를 역직렬화하는 클래스를 지정합니다.
    • value.deserializer: 레코드의 메시지 값을 역직렬화하는 클래스를 지정합니다.

     

    선택 옵션

    • group.id: 컨슈머 그룹 아이디를 지정합니다. subscribe() 메서드로 토픽을 구독하여 사용할 때 이 옵션을 필수로 넣어야 합니다. 기본값은 null입니다.
    • auto.offset.reset: 컨슈머 그룹이 특정 파티션을 읽을 때 저장된 컨슈머 오프셋이 없는 경우 어느 오프셋부터 읽을지 선택하는 옵션입니다. 이미 컨슈머 오프셋이 있다면 이 옵션값은 무시됩니다. 이 옵션은 latest, earliest, none 중 1개를 설정할 수 있습니다. latest로 설정하면 가장 최근에 넣은 오프셋부터, earliest로 설정하면 가장 오래전에 넣은 오프셋부터 읽기 시작합니다. none으로 설정하면 컨슈머 그룹이 커밋한 기록이 없으면 오류를 반환하고, 커밋 기록이 있다면 기존 커밋 기록 이후 오프셋부터 읽기 시작합니다. 기본값은 latest입니다.
    • enable.auto.commit: 자동 커밋으로 할지 수동 커밋으로 할지 선택합니다. 기본값은 true입니다.
    • auto.commit.interval.ms: 자동커밋일 경우 오프셋 커밋 간격을 지정합니다. 기본값은 5000(5초)입니다.
    • max.poll.records: poll() 메서드를 통해 반환되는 레코드 개수를 지정합니다. 기본값은 500입니다.
    • session.timeout.ms: 컨슈머가 브로커와 연결이 끊기는 최대 시간입니다. 이 시간 내에 하트비트(heartbeat)를 전송하지 않으면 브로커는 컨슈머에 이슈가 발생했다고 가정하고 리밸런싱을 시작합니다. 보통 하트비트 시간 간격의 3배로 설정합니다. 기본값은 10000(10초)입니다.
    • heartbeat.interval.ms: 하트비트를 전송하는 시간 간격입니다. 기본값은 3000(3초)입니다.
    • max.poll.interval.ms: poll() 메서드를 호출하는 간격의 최대 시간을 지정합니다. poll() 메서드를 호출한 이후에 데이터를 처리하는 데에 시간이 너무 많이 걸리는 경우 비정상으로 판단하고 리밸런싱을 시작합니다. 기본값은 300000(5분)입니다.
    • isolation.level: 트랜잭션 프로듀서가 레코드를 트랜잭션 단위로 보낼 경우 사용합니다. 이 옵션은 read_committed, read_uncommitted로 설정할 수 있습니다. 기본값은 read_uncommitted입니다.

     

    리밸런스 리스너를 가진 컨슈머

      리밸런스 발생 시 데이터를 중복 처리하지 않게 하기 위해서는 리밸런스 발생 시 처리한 데이터를 기준으로 커밋을 시도해야 합니다. 리밸런스 발생을 감지하기 위해 카프카 라이브러리는 ConsumerRebalanceListener 인터페이스를 지원합니다.

    package com.example;
    
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.time.Duration;
    import java.util.*;
    
    public class ConsumerWithRebalanceListener {
        private final static Logger logger = LoggerFactory.getLogger(ConsumerWithRebalanceListener.class);
        private final static String TOPIC_NAME = "test";
        private final static String BOOTSTRAP_SERVERS = "localhost:9092";
        private final static String GROUP_ID = "test-group";
    
    
        private static KafkaConsumer<String, String> consumer;
        private static Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap();
    
        public static void main(String[] args) {
            Properties configs = new Properties();
            configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
            configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
            configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            // 리밸런스 발생 시 수동 커밋을 하기 위해 config 값 설정
            configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    
            consumer = new KafkaConsumer<>(configs);
            
            // RebalanceListener는 subscribe() 메서드에 오버라이드 변수로 포함
            consumer.subscribe(Arrays.asList(TOPIC_NAME), new RebalanceListener());
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    logger.info("{}", record);
                    
                    /* 컨슈머 재시작 시 파티션에서 가장 마지막으로 커밋된 오프셋부터 레코드를
                       읽기 시작하기 때문에 오프셋 지정 커밋 시 offset+1 값을 넣음
                    */
                    currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1, null));
                    consumer.commitSync(currentOffsets);
                }
            }
        }
    
        private static class RebalanceListener implements ConsumerRebalanceListener {
            // 리밸런스가 끝난 뒤 파티션이 할당 완료되면 호출되는 메서드
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                logger.warn("Partitions are assigned");
    
            }
            
            // 리밸런스가 시작되기 직전에 호출되는 메서드
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                logger.warn("Partitions are revoked");
                
                // 리밸런스 발생 시 커밋
                consumer.commitSync(currentOffsets);
            }
        }
    }

     

    컨슈머 종료

      컨슈머를 안전하게 종료하기 위해 KafkaConsumer 클래스는 wakeup() 메서드를 지원합니다. wakeup() 메서드가 실행된 이후 poll() 메서드가 호출되면 WakeupException 예외가 발생합니다. Wakeup Exception 예외를 받은 뒤에 데이터 처리를 위해 사용한 자원들을 해제하면 됩니다. 마지막에는 close() 메서드를 호출하여 카프카 클러스터에 컨슈머가 안전하게 종료되었음을 명시적으로 알려주면 종료가 완료되었다고 볼 수 있습니다. 자바 애플리케이션의 경우 코드 내부에 셧다운 훅(shutdown hook)을 구현하여 안전한 종료를 명시적으로 구현할 수 있습니다. 셧다운 훅이란 사용자 또는 운영체제로부터 종료 요청을 받으면 실행하는 스레드를 뜻합니다.

    package com.example;
    
    import com.google.gson.Gson;
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.errors.WakeupException;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    
    public class ConsumerWithSyncOffsetCommit {
        private final static Logger logger = LoggerFactory.getLogger(ConsumerWithSyncOffsetCommit.class);
        private final static String TOPIC_NAME = "test";
        private final static String BOOTSTRAP_SERVERS = "localhost:9092";
        private final static String GROUP_ID = "test-group";
        private static KafkaConsumer<String, String> consumer;
    
        public static void main(String[] args) {
            Runtime.getRuntime().addShutdownHook(new ShutdownThread());
    
            Properties configs = new Properties();
            configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
            configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
            configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    
            consumer = new KafkaConsumer<>(configs);
            consumer.subscribe(Arrays.asList(TOPIC_NAME));
    
            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    for (ConsumerRecord<String, String> record : records) {
                        logger.info("{}", record);
                    }
                }
            } catch (WakeupException e) {
                logger.warn("Wakeup consumer");
            } finally {
                consumer.close();
            }
        }
    
        static class ShutdownThread extends Thread {
            public void run() {
                logger.info("Shutdown hook");
                consumer.wakeup();
            }
        }
    }

     

    3. 어드민 API

      프로듀서와 컨슈머를 통해 데이터를 주고받는 것만큼 카프카에 설정된 내부 옵션을 설정하고 확인하는 것이 중요합니다. 카프카 커맨드 라인 인터페이스로 명령을 내려 확인하는 방법도 있지만, 카프카 클라이언트에서는 내부 옵션들을 설정하고 조회하는 AdminClient 클래스를 제공합니다. Admin 클래스를 활용하면 클러스터의 옵션과 관련된 부분을 자동화할 수 있습니다. 다음은 활용 예시입니다.

    • 카프카 컨슈머를 멀티 스레드로 생성할 때, 구독하는 토픽의 파티션 개수만큼 스레드를 생성하고 싶을 때, 스레드 생성 전에 해당 토픽의 파티션 개수를 어드민 API를 통해 가져올 수 있습니다.
    • AdminClient 클래스로 구현한 웹 대시보드를 통해 ACL(Access Control List)이 적용된 클러스터의 리소스 접근 권한 규칙 추가를 할 수 있습니다.
    • 특정 토픽의 데이터양이 늘어남을 감지하고 AdminClient 클래스로 해당 토픽의 파티션을 늘릴 수 있습니다.
      // 어드민 API 선언 (다른 추가 설정 없이 클러스터 정보에 대한 설정만 하면 됨)
      Properties configs = new Properties();
      configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
      AdminClient admin = AdminClient.create(configs);
    KafkaAdminClient 메서드명 설명
    describeCluster(DescribeClusterOptions options) 브로커의 정보 조회
    listTopics(ListTopicsOptions options) 토픽 리스트 조회
    listConsumerGroups(ListConsumerGroupsOptions options) 컨슈머 그룹 조회
    createTopics(Collection <NewTopic> newTopics, CreateTopicsOptions options) 신규 토픽 생성
    createPartitions(Map<String, NewPartitions> newPartitions, CreatePartitionsOptions options) 파티션 개수 변경
    createAcls(Collection<AclBinding> acls, CreateAclsOptions options) 접근 제어 규칙 생성
    package com.example;
    
    import org.apache.kafka.clients.admin.*;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.Node;
    import org.apache.kafka.common.config.ConfigResource;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.Collections;
    import java.util.Map;
    import java.util.Optional;
    import java.util.Properties;
    
    public class KafkaAdminClient {
        private final static Logger logger = LoggerFactory.getLogger(KafkaAdminClient.class);
        private final static String BOOTSTRAP_SERVERS = "localhost:9092";
    
        public static void main(String[] args) throws Exception {
    
            Properties configs = new Properties();
            configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            AdminClient admin = AdminClient.create(configs);
            
            logger.info("== Get broker information");
            for (Node node : admin.describeCluster().nodes().get()) {
                logger.info("node : {}", node);
                ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, node.idString());
                DescribeConfigsResult describeConfigs = admin.describeConfigs(Collections.singleton(cr));
                describeConfigs.all().get().forEach((broker, config) -> {
                    config.entries().forEach(configEntry -> logger.info(configEntry.name() + "= " + configEntry.value()));
                });
            }
    
            logger.info("== Get default num.partitions");
            for (Node node : admin.describeCluster().nodes().get()) {
                ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, node.idString());
                DescribeConfigsResult describeConfigs = admin.describeConfigs(Collections.singleton(cr));
                Config config = describeConfigs.all().get().get(cr);
                Optional<ConfigEntry> optionalConfigEntry = config.entries().stream().filter(v -> v.name().equals("num.partitions")).findFirst();
                ConfigEntry numPartitionConfig = optionalConfigEntry.orElseThrow(Exception::new);
                logger.info("{}", numPartitionConfig.value());
            }
    
            logger.info("== Topic list");
            for (TopicListing topicListing : admin.listTopics().listings().get()) {
                logger.info("{}", topicListing.toString());
            }
    
            logger.info("== test topic information");
            Map<String, TopicDescription> topicInformation = admin.describeTopics(Collections.singletonList("test")).all().get();
            logger.info("{}", topicInformation);
    
            logger.info("== Consumer group list");
            ListConsumerGroupsResult listConsumerGroups = admin.listConsumerGroups();
            listConsumerGroups.all().get().forEach(v -> {
                logger.info("{}", v);
            });
    
            admin.close();
        }
    }
    반응형

    댓글

Designed by Tistory.