-
11. 스프링 카프카 프로젝트(Spring Kafka Project)BackEnd/kafka 2021. 11. 20. 11:37반응형
Spring Boot와 Kafka를 이용하여 프로젝트를 구성해 보겠습니다. 제목과 내용을 입력하고 전송 버튼을 누르면 해당 이벤트를 카프카 토픽으로 전달하고, 컨슈머는 이를 파일로 저장합니다. 컨슈머는 파일이 아니라 HDFS(Hadoop Distributed File System), ElasticSearch, Redis 등 다양한 스토리지에 데이터를 적재할 수 있습니다. 전체소스는 github(https://github.com/HanseomKim/kafka/tree/master)을 참고하시면 됩니다.
1. 주키퍼와 카프카를 실행하고 토픽을 생성합니다.
PS C:\kafka\kafka_2.12-2.8.1> bin\windows\kafka-topics.bat --create ` >> --bootstrap-server localhost:9092 ` >> --replication-factor 1 ` >> --partitions 3 ` >> --topic test Created topic test.
2. Spring Boot 프로젝트를 https://start.spring.io/ 에서 생성합니다. [GENERATE]
3. IntelliJ에서 다운로드 받은 프로젝트를 Open 합니다. (File > Open > 작업폴더\프로젝트\build.gradle)
4. 웹페이지 개발
Spring Boot는 static/index.html을 올려두면 Welcome Page 기능을 제공합니다. 웹페이지에서는 단순하게 제목과 내용을 입력받고 프로듀서 API를 호출합니다.
5. Producer 개발
웹 페이지에서 생성된 이벤트를 받는 REST API 클라이언트를 만들고 전달받은 이벤트를 토픽으로 전달합니다. REST API는 RestController를 사용하여 구현합니다.
1) application.yaml
application.yaml 파일은 스프링 부트에서 사용할 리소스를 사람이 쉽게 읽을 수 있는 형태로 만든 설정 파일입니다. 스프링 카프카에서 사용하는 클러스터 정보, 직렬화 방식, 옵션들을 설정합니다.
spring: kafka: producer: bootstrap-servers: localhost:9092 batch-size: 10 retries: 1 acks: 1 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.springframework.kafka.support.serializer.JsonSerializer key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer topic: test: test
2) ProducerController.java
카프카 프로듀서는 기본 프로듀서 팩토리를 통해 생성된 카프카 템플릿을 사용합니다. 기본 카프카 템플릿을 사용할 때는 application.yaml에 프로듀서 옵션을 넣고 사용할 수 있습니다. application.yaml에 설정한 프로듀서 옵션값은 애플리케이션이 실행될 때 자동으로 오버라이드되어 설정됩니다. 별도 구성이 필요할 경우 KafkaProducerConfiguration.java 파일을 참고하시면 됩니다.
package com.spring.kafka.controller; import com.spring.kafka.model.dto.EventDto; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.util.concurrent.ListenableFutureCallback; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @RestController public class ProducerController { @Value("${spring.kafka.topic.test}") private String topic; private final Logger logger = LoggerFactory.getLogger(ProducerController.class); @Autowired KafkaTemplate kafkaTemplate; @GetMapping("/api/test") public void setKafkaRequest(@RequestParam String title, @RequestParam String content) { EventDto event = new EventDto(); event.setTitle(title); event.setContent(content); kafkaTemplate.send(topic, event).addCallback(new ListenableFutureCallback<SendResult<String, String>>() { @Override public void onFailure(Throwable ex) { logger.error(ex.getMessage(), ex); } @Override public void onSuccess(SendResult<String, String> result) { logger.info(result.toString()); } }); } }
6. Consumer 개발
1) KafkaConsumerConfiguration.java
Bean으로 등록한 kafkaListenerContainerFactory 메서드명이 ConsumerController.java에서 @KafkaListener 컨테이너 팩토리 등록 시 사용됩니다.
package com.spring.kafka.config; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.env.Environment; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.*; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import java.util.HashMap; import java.util.Map; @Configuration @EnableKafka public class KafkaConsumerConfiguration { @Value("${spring.kafka.producer.bootstrap-servers}") private String servers; @Value("${spring.kafka.producer.batch-size}") private String size; @Value("${spring.kafka.producer.retries}") private String retries; @Value("${spring.kafka.producer.key-deserializer}") private String keyDeserializer; @Value("${spring.kafka.producer.value-deserializer}") private String valueDeserializer; public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, true); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer); return props; } @Bean public ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } }
2) ConsumerController.java
package com.spring.kafka.controller; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import java.io.File; import java.io.FileWriter; import java.io.IOException; @Component public class ConsumerController { private final Logger logger = LoggerFactory.getLogger(ConsumerController.class); @KafkaListener(topics = "${spring.kafka.topic.test}", groupId = "test-group", containerFactory = "kafkaListenerContainerFactory") public void listen(ConsumerRecord consumerRecord) throws Exception { try { // 파일이 아닌 HDFS(Hadoop Distributed File System), ElasticSearch, Redis 등 다양한 스토리지에 적재 가능 File file = new File("C:\\kafka\\test.txt"); FileWriter writer = null; try { // 기존 파일의 내용에 이어서 쓰려면 true, 기존 내용을 없애고 새로 쓰려면 false writer = new FileWriter(file, true); writer.write(consumerRecord.value().toString()); writer.write("\n"); writer.flush(); System.out.println("DONE"); } catch(IOException e) { e.printStackTrace(); } finally { try { if(writer != null) writer.close(); } catch(IOException e) { e.printStackTrace(); } } } catch (Exception e) { logger.warn("exception:{},{}", e.getMessage(), e.toString()); } } }
7. 기능 확인
웹페이지에서 Title(hello ~ hello3), Content(kafka ~ kafka3) 입력하고 전송 시 지정한 경로에 파일로 저장된 것을 확인할 수 있습니다.
반응형'BackEnd > kafka' 카테고리의 다른 글
10. 카프카 명령어(Command) (0) 2021.11.14 09. 카프카 설치(Kafka Installation) (0) 2021.11.13 08. 스프링 카프카 컨슈머(Spring Kafka Consumer) (0) 2021.11.06 07. 스프링 카프카 프로듀서(Spring kafka Producer) (0) 2021.11.05 06. 카프카 클라이언트(Kafka Client with JAVA) (0) 2021.11.05