ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 07. 스프링 카프카 프로듀서(Spring kafka Producer)
    BackEnd/kafka 2021. 11. 5. 20:30
    반응형

      스프링 카프카(Spring kafka)는 카프카를 스프링 프레임워크에서 효과적으로 사용할 수 있도록 만들어진 라이브러리입니다. 기존 카프카 클라이언트 라이브러리를 래핑하여 만든 스프링 카프카 라이브러리는 카프카 클라이언트에서 사용하는 여러 가지 패턴을 미리 제공합니다.

     

      스프링 카프카 라이브러리를 추가하면 관련 디펜던시로 카프카 클라이언트 라이브러리가 추가됩니다. 스프링 카프카 라이브러리는 어드민, 컨슈머, 프로듀서, 스트림즈 기능을 제공합니다.

    // build.gradle에 디펜던시 추가
    dependencies {
        compile 'org.springframework.kafka:spring-kafka:2.5.10.RELEASE'
    }

     

      스프링 카프카 프로듀서는 '카프카 템플릿(Kafka Template)' 클래스를 사용하여 데이터를 전송할 수 있습니다. 카프카 템플릿은 프로듀서 팩토리(ProducerFactory) 클래스를 통해 생성할 수 있습니다.

     

    1. 기본 카프카 템플릿

      기본 카프카 템플릿은 기본 프로듀서 팩토리를 통해 생성된 카프카 템플릿을 사용합니다. 기본 카프카 템플릿을 사용할 때는 application.yaml에 프로듀서 옵션을 넣고 사용할 수 있습니다. application.yaml에 설정한 프로듀서 옵션값은 애플리케이션이 실행될 때 자동으로 오버라이드되어 설정됩니다. 스프링 카프카 프로듀서를 사용할 경우에는 필수 옵션이 없습니다. 옵션을 설정하지 않으면 bootstrap-servers는 localhost:9092, key-serializer와 value-serializer는 StringSerializer로 자동 설정되어 실행됩니다.

    # application.yaml 프로듀서 옵션값
    spring.kafka.producer.acks
    spring.kafka.producer.batch-size
    spring.kafka.producer.bootstrap-servers
    spring.kafka.producer.buffer-memory
    spring.kafka.producer.client-id
    spring.kafka.producer.compression-type
    spring.kafka.producer.key-serializer
    spring.kafka.producer.properties.*
    spring.kafka.producer.retries
    spring.kafka.producer.transaction-id-prefix
    spring.kafka.producer.value-serializer

      test0부터 test9까지 메시지 값을 클러스터로 보내는 프로듀서 애플리케이션입니다.

    package com.example;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.kafka.core.KafkaTemplate;
    
    @SpringBootApplication
    public class SpringProducerApplication implements CommandLineRunner {
    
        private static String TOPIC_NAME = "test";
    
        @Autowired
        private KafkaTemplate<Integer, String> template;
    
        public static void main(String[] args) {
            SpringApplication application = new SpringApplication(SpringProducerApplication.class);
            application.run(args);
        }
    
        @Override
        public void run(String... args) {
            for (int i = 0; i < 10; i++) {
                template.send(TOPIC_NAME, "test" + i);
            }
            System.exit(0);
        }
    }

     

      KafkaTemplate은 send(String topic, V data) 이외에도 여러 가지 데이터 전송 메서드들을 오버로딩하여 제공합니다.

    • send(String topic, K key, V data): 메시지 키, 값을 포함하여 특정 토픽으로 전달
    • send(String topic, Integer partition, K key, V data): 메시지 키, 값이 포함된 레코드를 특정 토픽의 특정 파티션으로 전달
    • send(String topic, Integer partition, Long timestamp, K key, V data): 메시지 키, 값 타임스탬프가 포함된 레코드를 특정 토픽의 특정 파티션으로 전달
    • send(ProducerRecord<K, V> record): 프로듀서 레코드(ProducerRecord) 객체를 전송

     

    2. 커스텀 카프카 템플릿

      커스텀 카프카 템플릿은 프로듀서 팩토리를 통해 만든 카프카 템플릿 객체를 빈으로 등록하여 사용하는 것입니다. 프로듀서에 필요한 각종 옵션을 선언하여 사용할 수 있습니다. 한 스프링 카프카 애플리케이션 내부에 다양한 종류의 카프카 프로듀서 인스턴스를 생성하고 싶다면 이 방식을 사용하면 됩니다. 즉, A클러스터로 전송하는 카프카 프로듀서와 B클러스터로 전송하는 카프카 프로듀서를 동시에 사용하고 싶다면 커스텀 카프카 템플릿을 사용하여 2개의 카프카 템플릿을 빈으로 등록하여 사용할 수 있습니다.

    package com.example;
    
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.core.*;
    import java.util.HashMap;
    import java.util.Map;
    
    // KafkaTemplate 빈 객체를 등록하기 위해 @Configuration 선언
    @Configuration
    public class KafkaTemplateConfiguration {
    
        @Bean
        public KafkaTemplate<String, String> customKafkaTemplate() {
    
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.ACKS_CONFIG, "all");
    
            // ProducerFactory를 사용하여 KafkaTemplate 객체를 만들 때는 프로듀서 옵션을 직접 넣음
            ProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(props);
    
            return new KafkaTemplate<>(pf);
        }
    }

      스프링 카프카에서는 KafkaTemplate 외에도 아래 템플릿을 제공합니다.

    • ReplyingKafkaTemplate: 컨슈머가 특정 데이터를 전달받았는지 여부 확인 가능
    • RoutingKafkaTemplate: 전송하는 토픽별 옵션을 다르게 설정 가능
    package com.example;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.WebApplicationType;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.kafka.core.KafkaProducerException;
    import org.springframework.kafka.core.KafkaSendCallback;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.SendResult;
    import org.springframework.util.concurrent.ListenableFuture;
    
    @SpringBootApplication
    public class SpringProducerApplication implements CommandLineRunner {
    
        private static String TOPIC_NAME = "test";
    
        // Bean 객체로 등록한 customKafkaTemplate 주입받도록 메서드 이름과 동일한 변수명 선언
        @Autowired
        private KafkaTemplate<String, String> customKafkaTemplate;
    
        public static void main(String[] args) {
            SpringApplication application = new SpringApplication(SpringProducerApplication.class);
            application.run(args);
        }
    
        @Override
        public void run(String... args) {
            // ListenableFuture: 전송 이후 정상 적재됐는지 여부 확인
            ListenableFuture<SendResult<String, String>> future = customKafkaTemplate.send(TOPIC_NAME, "test");
            
            // 비동기 확인 방법
            future.addCallback(new KafkaSendCallback<String, String>() {
                @Override
                public void onSuccess(SendResult<String, String> result) {
                  // 브로커에 정상 적재 시 수행
                }
    
                @Override
                public void onFailure(KafkaProducerException ex) {
                  // 적재되지 않고 이슈 발생 시 수행
                }
            });
            System.exit(0);
        }
    }

     

    반응형

    댓글

Designed by Tistory.