9oormthonUNIV 스터디 프로젝트

Spring Boot + Kafka + Docker로 비동기 메시징 구현하기

연향동큰손 2025. 5. 5. 13:58

카프카(Kafka)란?

카프카(Kafka)는 파이프라인, 스트리밍 분석, 데이터 통합 및 미션 크리티컬 애플리케이션을 위해 설계된 고성능 분산 이벤트 스트리밍 플랫폼이다.

 

 

Kafka 구성요소

 

  • Producer :  Kafka에 이벤트를 Publish(게시)하는 클라이언트, topic에 데이터를 생성하고 전송하는 역할
  • Consumer :  이벤트를 Subscribe(읽고 처리)하는 클라이언트, topic내부 파티션에 저장된 데이터를 가져오는 역할 

Kafka에서는 Producer와 Consumer가 완전히 분리되어 있다.

이는 Kafka의 높은 확장성을 위한 핵심 요소이다.

 

  • Topic :  데이터가 들어갈 수 있는 공간을 이야기하며, 여러 개의 토픽이 생성될 수 있다.

topic은 한개 이상의 파티션으로 구성되어 있다. 이때 파티션은 데이터를 분산처리하기 위한 단위로 볼 수 있다.

 

하나의 파티션은 큐(queue)와 같이 파티션 끝에서부터 데이터가 쌓이게 되고, consumer는 가장 오래된 순으로 데이터를 가져가게 된다.

 

kafka는 consumer가 데이터를 가져가도 파티션에 있는 데이터가 삭제되지 않는다는 특징이 있다.

때문에 동일 데이터에 대해 여러 번 처리를 할 수 있으며, 이는 Kafka를 사용하는 중요한 이유 중 하나이다.

 

 

Kafka Cluster

Kafka의 주요 구성 요소중 Broker, Zookeeper는 카프카 클러스터를 구성하는 것들이다. Apache Zookeeper는 카프카 클러스터 윗 단에서 역할을 하고 Broker는 카프카 클러스터 내에서 역할을 한다.

 

Zookeeper의 역할

  • 브로커 상태 관리 (Broker Discovery & Coordination)
  • 리더 선출 (Leader Election) :각 파티션에는 리더가 존재하는데, 이 리더를 Zookeeper가 선출한다.
  • 파티션 메타데이터 저장
  • 컨피규레이션 저장소

 

Broker의 역할

  • Producer가 전송한 메시지를 저장
  • Consumer가 요청하면 저장된 메시지를 전송

Broker는 쉽게 말해서 Producer와 Consumer 사이에서 전달자(중계자) 역할을 한다. 

 


Kafka 와 Spring Boot 애플리케이션 연동

 

Kafka Broker를 Docker로 실행하기 위해 docker-compose.yml을 만들어주고 Docker에서 실행해준다.

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.4.0
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

 

 

 

 

 

Consumer

<application.yml>

server:
  port: 8090

spring:
  application:
    name: consumer_application

  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: consumer_group01
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

 

<KafkaConsumerConfig>

// Kafka 관련 설정을 위한 클래스
@EnableKafka // Kafka 리스너 사용을 위한 어노테이션
@Configuration
public class KafkaConsumerConfig {

    private final Environment env;

    // 생성자를 통해 application.yml 환경 변수 주입
    public KafkaConsumerConfig(Environment env) {
        this.env = env;
    }

    // Kafka Consumer 설정값 정의
    @Bean
    public Map<String, Object> consumerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("spring.kafka.consumer.bootstrap-servers")); // Kafka 서버 주소
        props.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty("spring.kafka.consumer.group-id")); // Consumer 그룹 ID
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, env.getProperty("spring.kafka.consumer.auto-offset-reset")); // 메시지 읽기 시작 위치 설정
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // 메시지 키 역직렬화 방식
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // 메시지 값 역직렬화 방식
        return props;
    }

    // Kafka ConsumerFactory 생성 (Consumer 인스턴스 생성 담당)
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfig());
    }

    // Kafka Listener Container Factory 생성
    // @KafkaListener가 메시지를 수신할 수 있도록 컨테이너 설정
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        var factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

 

 

 

<KafkaConsumerService>

// Kafka에서 메시지를 수신하는 서비스 클래스
@Service
@Slf4j
public class KafkaConsumerService {

    // Kafka에서 "my-topic"으로 들어오는 메시지를 수신
    // consumer_group01 그룹에 속함
    @KafkaListener(topics = "my-topic", groupId = "consumer_group01")
    public void consume(String message) {
        // 수신한 메시지를 로그로 출력
        log.info("Consumed Message : {}", message);
    }
}

 

 

Producer

 

<application.yml>

server:
  port: 8080

spring:
  application:
    name: producer_application

  kafka:
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

topic:
  name: my-topic

 

<KafkaProducerConfig>

// Kafka Producer 설정 클래스
@Configuration
public class KafkaProducerConfig {

    // application.yml 등의 설정 값을 가져오기 위한 Environment 주입
    private final Environment env;

    public KafkaProducerConfig(Environment environment) {
        this.env = environment;
    }

    // Kafka Producer 관련 설정을 담은 Bean
    @Bean
    public Map<String, Object> producerConfig() {
        Map<String, Object> props = new HashMap<>();
        
        // Kafka 서버 주소 설정 (e.g., localhost:9092)
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                env.getProperty("spring.kafka.producer.bootstrap-servers"));

        // 메시지 키 직렬화 방식 (문자열)
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // 메시지 값 직렬화 방식 (문자열)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return props;
    }

    // ProducerFactory Bean 생성 (KafkaTemplate에서 사용됨)
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(this.producerConfig());
    }

    // KafkaTemplate Bean 생성: 실제 메시지를 전송할 때 사용
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(this.producerFactory());
    }
}

이 설정을 통해 Spring Boot 애플리케이션에서 Kafka로 메시지를 전송할 수 있는 환경이 갖추게 된다.

 

 

<KafkaProducerService>

// Kafka로 메시지를 전송하는 서비스 클래스
@Service
@RequiredArgsConstructor 
@Slf4j 
public class KafkaProducerService {

    // application.yml에서 설정한 topic.name 값을 주입
    @Value("${topic.name}")
    private String topicName;

    // 메시지를 실제로 전송하는 KafkaTemplate
    private final KafkaTemplate<String, String> kafkaTemplate;

    // 메시지를 Kafka로 전송하는 메서드
    public void sendMessageToKafka(String message) {
        // 전송할 메시지를 로그로 출력
        log.info("Producer Message : {}", message);

        // 지정된 토픽으로 메시지 전송
        kafkaTemplate.send(topicName, message);
    }
}

 

 

<KafkaProducerController>

@RestController
@RequiredArgsConstructor
public class KafkaProducerController {

    private final KafkaProducerService kafkaProducerService;

    @PostMapping("/api/comment")
    public ResponseEntity<Void> sendMessage(@RequestParam String message) {
        kafkaProducerService.sendMessageToKafka(message);
        return ResponseEntity.ok().build();
    }
}

 

 

테스트

실행 결과 Consumer에서 정상적으로 메시지가 도착한 것을 확인할 수 있었다.