BackEnd/Kafka

Spring Boot + Kafka 메시지 테스트

연향동큰손 2025. 8. 14. 15:59

 

Spring Boot로 Producer와 Consumer 서버를 생성하고 Kafka Server에 메시지를 넣어 통신하는 것을 실습했다.

 

준비물

  • Producer 프로젝트 (Spring Boot)
  • Consumer 프로젝트 (Spring Boot)
  • Kafka Server(EC2)

 

구현 로직

  1. 클라이언트가 REST API(`/api/emails`)로 이메일 발송 요청(JSON)을 보냄
  2. Producer(Spring Boot 서비스)가 요청을 받아서 Kafka 토픽(“email.send”)에 메시지를 발행
  3. Kafka 브로커는 그 메시지를 해당 토픽에 저장
  4. Consumer(Spring Boot 서비스)가 토픽을 구독하다가 새 메시지를 읽음
  5. Consumer가 JSON을 EmailSendMessage 객체로 변환
  6. Consumer에서 실제 이메일 발송 작업

 

Producer

<application.yml>

spring:
  kafka:
    bootstrap-servers: 43.200.43.104:9092
    producer:

      key-serializer: org.apache.kafka.common.serialization.StringSerializer

      value-serializer: org.apache.kafka.common.serialization.StringSerializer

 

bootstrap-server에 Kafka 서버의 IP주소 : Port Number를 적어준다.

 

그리고 Kafka Producer는 KeyValuebyte배열로 전송해야 하므로`org.apache.kafka.common.serialization.StringSerializer`를 이용해서 String을 byte[]로 변환 해준다.

 

 

<EmailSendMessage>

package com.example.emailsendproducer;

public class EmailSendMessage { //카프카로 보내는 Message 객체
    private String from; //발신자 이메일
    private String to; //수신자 이메일
    private String subject;  //이메일 제목
    private String body;  //이메일 본문

    public EmailSendMessage(String body, String subject, String to, String from) {
        this.body = body;
        this.subject = subject;
        this.to = to;
        this.from = from;
    }

    public String getFrom() {
        return from;
    }

    public String getTo() {
        return to;
    }

    public String getSubject() {
        return subject;
    }

    public String getBody() {
        return body;
    }
}

 

EmailSendMessage는 Kafka로 보낼 메시지 객체이다.

 

 

<SendEmailRequestDto>

package com.example.emailsendproducer;

public class SendEmailRequestDto {  //서버로 보낼 JSON 데이터를 담는 DTO
    private String from;
    private String to;
    private String subject;
    private String body;

    public String getFrom() {
        return from;
    }

    public void setFrom(String from) {
        this.from = from;
    }

    public String getTo() {
        return to;
    }

    public void setTo(String to) {
        this.to = to;
    }

    public String getSubject() {
        return subject;
    }

    public void setSubject(String subject) {
        this.subject = subject;
    }

    public String getBody() {
        return body;
    }

    public void setBody(String body) {
        this.body = body;
    }

}

 

Controller에서 JSON 요청 본문을 받을 때 사용하기 위해서 DTO객체를 생성했다.

 

 

<EmailService>

package com.example.emailsendproducer;


import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class EmailService {
    private final KafkaTemplate<String, String> kafkaTemplate; //카프카로 메시지를 보내는 객체

    public EmailService(KafkaTemplate<String, String> kafkaTemplate){
        this.kafkaTemplate=kafkaTemplate;
    }

    public void sendEmail(SendEmailRequestDto request){ //DTO를 EmailSendMessage 객체로 변환
        EmailSendMessage emailSendMessage = new EmailSendMessage(
            request.getFrom(),
            request.getTo(),
            request.getSubject(),
            request.getBody()
        );

        this.kafkaTemplate.send("email.send",toJsonString(emailSendMessage)); //email.send라는 토픽에 전달
    }
    private String toJsonString(Object object) { //객체를 JSON 문자열로 직렬화 (카프카의 key-value 형식이 String형식이기 때문)
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            String message = objectMapper.writeValueAsString(object);
            return message;
        }catch(JsonProcessingException e){
            throw new RuntimeException("Json 직렬화 실패");
        }
    }
}

 

KafkaTemplate은 Spring 에서 Kafka에 메시지를 쉽게 보낼 수 있게 해주는 핵심 클래스이다.

 

주로 Kafka Topic에 메시지를 전송할 때 사용되고, 핵심 메서드는 send()메서드이다.

kafkaTemplate.send("토픽이름", "메시지내용");

 

KafkaTemplate<String, String> => 메시지의 Key와 Value를 모두 String 형식으로 보낸다는 의미이다.

 

Controller에서 사용하게 될 SendEmail메서드는 DTO를 EmailSendMessage객체로 변환하고 toJsonString메서드로 JSON 문자열로 직렬화하여 Kafka Server로 전송하도록 구현했다.

 

<EmailController>

package com.example.emailsendproducer;

import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api/emails")
public class EmailController {
    private final EmailService emailService;

    public EmailController(EmailService emailService){
        this.emailService = emailService;
    }

    @PostMapping
    public ResponseEntity<String> sendEmail(
            @RequestBody SendEmailRequestDto sendEmailRequestDto
    ){
        emailService.sendEmail(sendEmailRequestDto);
        return ResponseEntity.ok("이메일 발송 요청 완료");
    }

}

 

 

Consumer

<application.yml>

server:
  port: 0 # 사용 가능한 랜덤 포트를 찾아서 서버를 실행 (Producer 서버와의 포트 충돌을 방지)
spring:
  kafka:
    # Kafka 서버 주소 (EC2에 카프카를 설치했기 때문에 EC2 주소를 입력해야 한다.)
    bootstrap-servers: 43.200.43.104:9092
    consumer:
    # 메시지의 key 역직렬화 방식 : Kafka에서 받아온 메시지를 String으로 변환
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    # 메시지의 value 역직렬화 방식 : Kafka에서 받아온 메시지를 String으로 변환
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    # 컨슈머 그룹이 미리 안 만들어져있는 경우에, 컨슈머 그룹을 직접 생성해서 메시지를 처음부터 읽음.
    # 만약 컨슈머 그룹이 이미 만들어져있다면, 해당 컨슈머 그룹이 읽었던 메시지부터 읽음.
    # 이 옵션을 주지 않으면 컨슈머 그룹을 직접 생성해서 메시지를 읽을 때,
    # 기존에 쌓여있던 메시지를 읽지 않고 컨슈머 그룹이 생성된 이후에 들어온 메시지부터 읽어버린다.
    # 그럼 컨슈머 그룹이 생성되기 전에 쌓여있던 메시지들이 처리되지 않고 누락돼버린다.
      auto-offset-reset: earliest

 

 

 

 

<EmailSendMessage>

package com.example.emailsendconsumer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;


public class EmailSendMessage {
    private String from;
    private String to;
    private String subject;
    private String body;

    public EmailSendMessage() {
    }

    public EmailSendMessage(String body, String subject, String to, String from) {
        this.body = body;
        this.subject = subject;
        this.to = to;
        this.from = from;
    }

    public String getFrom() {
        return from;
    }

    public String getTo() {
        return to;
    }

    public String getSubject() {
        return subject;
    }

    public String getBody() {
        return body;
    }

    public static EmailSendMessage fromJson(String json){
        try{
            ObjectMapper objectMapper = new ObjectMapper();
            return objectMapper.readValue(json, EmailSendMessage.class);
        }catch(JsonProcessingException e){
            throw new RuntimeException("Json 파싱 실패");
        }
    }
}

 

Kafka로부터 받은 JSON 문자열을 자바 객체로 변환하기 위한 클래스이다.

 

 

<EmailSendConsumer>

package com.example.emailsendconsumer;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class EmailSendConsumer {
    @KafkaListener(
            topics = "email.send",
            groupId = "email-send-group"
    )
    public void consume(String message){
        System.out.println("Kafka로부터 받아온 메시지 : " + message);
        EmailSendMessage emailSendMessage = EmailSendMessage.fromJson(message);

        // 실제 이메일 발송 로직 생략
        try {
            Thread.sleep(3000);
        }catch (InterruptedException e){
            throw new RuntimeException("이메일 발송 실패");
        }
        System.out.println("이메일 발송 완료");
    }
}

 

@KafkaListener

  • topics에 메시지가 도착하게 되면 메서드를 자동으로 실행한다.
  • 같은 groupId에 해당하는 Consumer들이 메시지를 나눠서 처리

해당 클래스에서 비동기 처리로 인해 실제 처리 완료와 상관없이 응답을 받을 수 있는지 테스트 해보기 위해 Sleep 함수를 넣었다.

 

 

Postman으로 메시지를 전송한 결과 Consumer에서 메시지 내용을 정상적으로 확인 가능했다.

 

그리고 Kafka Server에서도 메시지가 정상적으로 전송된것을 확인할 수 있었다.

'BackEnd > Kafka' 카테고리의 다른 글

[Kafka] Kafka 메시지 처리 성능 향상 (Partition)  (0) 2025.08.15
[Kafka] Dead Letter Topic  (2) 2025.08.14
Kafka란?  (1) 2025.08.12