Spring Boot로 Producer와 Consumer 서버를 생성하고 Kafka Server에 메시지를 넣어 통신하는 것을 실습했다.
준비물
- Producer 프로젝트 (Spring Boot)
- Consumer 프로젝트 (Spring Boot)
- Kafka Server(EC2)
구현 로직
- 클라이언트가 REST API(`/api/emails`)로 이메일 발송 요청(JSON)을 보냄
- Producer(Spring Boot 서비스)가 요청을 받아서 Kafka 토픽(“email.send”)에 메시지를 발행
- Kafka 브로커는 그 메시지를 해당 토픽에 저장
- Consumer(Spring Boot 서비스)가 토픽을 구독하다가 새 메시지를 읽음
- Consumer가 JSON을 EmailSendMessage 객체로 변환
- Consumer에서 실제 이메일 발송 작업
Producer
<application.yml>
spring:
kafka:
bootstrap-servers: 43.200.43.104:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
bootstrap-server에 Kafka 서버의 IP주소 : Port Number를 적어준다.
그리고 Kafka Producer는 Key와 Value를 byte배열로 전송해야 하므로`org.apache.kafka.common.serialization.StringSerializer`를 이용해서 String을 byte[]로 변환 해준다.
<EmailSendMessage>
package com.example.emailsendproducer;
public class EmailSendMessage { //카프카로 보내는 Message 객체
private String from; //발신자 이메일
private String to; //수신자 이메일
private String subject; //이메일 제목
private String body; //이메일 본문
public EmailSendMessage(String body, String subject, String to, String from) {
this.body = body;
this.subject = subject;
this.to = to;
this.from = from;
}
public String getFrom() {
return from;
}
public String getTo() {
return to;
}
public String getSubject() {
return subject;
}
public String getBody() {
return body;
}
}
EmailSendMessage는 Kafka로 보낼 메시지 객체이다.
<SendEmailRequestDto>
package com.example.emailsendproducer;
public class SendEmailRequestDto { //서버로 보낼 JSON 데이터를 담는 DTO
private String from;
private String to;
private String subject;
private String body;
public String getFrom() {
return from;
}
public void setFrom(String from) {
this.from = from;
}
public String getTo() {
return to;
}
public void setTo(String to) {
this.to = to;
}
public String getSubject() {
return subject;
}
public void setSubject(String subject) {
this.subject = subject;
}
public String getBody() {
return body;
}
public void setBody(String body) {
this.body = body;
}
}
Controller에서 JSON 요청 본문을 받을 때 사용하기 위해서 DTO객체를 생성했다.
<EmailService>
package com.example.emailsendproducer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class EmailService {
private final KafkaTemplate<String, String> kafkaTemplate; //카프카로 메시지를 보내는 객체
public EmailService(KafkaTemplate<String, String> kafkaTemplate){
this.kafkaTemplate=kafkaTemplate;
}
public void sendEmail(SendEmailRequestDto request){ //DTO를 EmailSendMessage 객체로 변환
EmailSendMessage emailSendMessage = new EmailSendMessage(
request.getFrom(),
request.getTo(),
request.getSubject(),
request.getBody()
);
this.kafkaTemplate.send("email.send",toJsonString(emailSendMessage)); //email.send라는 토픽에 전달
}
private String toJsonString(Object object) { //객체를 JSON 문자열로 직렬화 (카프카의 key-value 형식이 String형식이기 때문)
ObjectMapper objectMapper = new ObjectMapper();
try {
String message = objectMapper.writeValueAsString(object);
return message;
}catch(JsonProcessingException e){
throw new RuntimeException("Json 직렬화 실패");
}
}
}
KafkaTemplate은 Spring 에서 Kafka에 메시지를 쉽게 보낼 수 있게 해주는 핵심 클래스이다.
주로 Kafka Topic에 메시지를 전송할 때 사용되고, 핵심 메서드는 send()메서드이다.
kafkaTemplate.send("토픽이름", "메시지내용");
KafkaTemplate<String, String> => 메시지의 Key와 Value를 모두 String 형식으로 보낸다는 의미이다.
Controller에서 사용하게 될 SendEmail메서드는 DTO를 EmailSendMessage객체로 변환하고 toJsonString메서드로 JSON 문자열로 직렬화하여 Kafka Server로 전송하도록 구현했다.
<EmailController>
package com.example.emailsendproducer;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/api/emails")
public class EmailController {
private final EmailService emailService;
public EmailController(EmailService emailService){
this.emailService = emailService;
}
@PostMapping
public ResponseEntity<String> sendEmail(
@RequestBody SendEmailRequestDto sendEmailRequestDto
){
emailService.sendEmail(sendEmailRequestDto);
return ResponseEntity.ok("이메일 발송 요청 완료");
}
}
Consumer
<application.yml>
server:
port: 0 # 사용 가능한 랜덤 포트를 찾아서 서버를 실행 (Producer 서버와의 포트 충돌을 방지)
spring:
kafka:
# Kafka 서버 주소 (EC2에 카프카를 설치했기 때문에 EC2 주소를 입력해야 한다.)
bootstrap-servers: 43.200.43.104:9092
consumer:
# 메시지의 key 역직렬화 방식 : Kafka에서 받아온 메시지를 String으로 변환
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 메시지의 value 역직렬화 방식 : Kafka에서 받아온 메시지를 String으로 변환
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 컨슈머 그룹이 미리 안 만들어져있는 경우에, 컨슈머 그룹을 직접 생성해서 메시지를 처음부터 읽음.
# 만약 컨슈머 그룹이 이미 만들어져있다면, 해당 컨슈머 그룹이 읽었던 메시지부터 읽음.
# 이 옵션을 주지 않으면 컨슈머 그룹을 직접 생성해서 메시지를 읽을 때,
# 기존에 쌓여있던 메시지를 읽지 않고 컨슈머 그룹이 생성된 이후에 들어온 메시지부터 읽어버린다.
# 그럼 컨슈머 그룹이 생성되기 전에 쌓여있던 메시지들이 처리되지 않고 누락돼버린다.
auto-offset-reset: earliest
<EmailSendMessage>
package com.example.emailsendconsumer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
public class EmailSendMessage {
private String from;
private String to;
private String subject;
private String body;
public EmailSendMessage() {
}
public EmailSendMessage(String body, String subject, String to, String from) {
this.body = body;
this.subject = subject;
this.to = to;
this.from = from;
}
public String getFrom() {
return from;
}
public String getTo() {
return to;
}
public String getSubject() {
return subject;
}
public String getBody() {
return body;
}
public static EmailSendMessage fromJson(String json){
try{
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.readValue(json, EmailSendMessage.class);
}catch(JsonProcessingException e){
throw new RuntimeException("Json 파싱 실패");
}
}
}
Kafka로부터 받은 JSON 문자열을 자바 객체로 변환하기 위한 클래스이다.
<EmailSendConsumer>
package com.example.emailsendconsumer;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class EmailSendConsumer {
@KafkaListener(
topics = "email.send",
groupId = "email-send-group"
)
public void consume(String message){
System.out.println("Kafka로부터 받아온 메시지 : " + message);
EmailSendMessage emailSendMessage = EmailSendMessage.fromJson(message);
// 실제 이메일 발송 로직 생략
try {
Thread.sleep(3000);
}catch (InterruptedException e){
throw new RuntimeException("이메일 발송 실패");
}
System.out.println("이메일 발송 완료");
}
}
@KafkaListener
- topics에 메시지가 도착하게 되면 메서드를 자동으로 실행한다.
- 같은 groupId에 해당하는 Consumer들이 메시지를 나눠서 처리
해당 클래스에서 비동기 처리로 인해 실제 처리 완료와 상관없이 응답을 받을 수 있는지 테스트 해보기 위해 Sleep 함수를 넣었다.
Postman으로 메시지를 전송한 결과 Consumer에서 메시지 내용을 정상적으로 확인 가능했다.
그리고 Kafka Server에서도 메시지가 정상적으로 전송된것을 확인할 수 있었다.
'BackEnd > Kafka' 카테고리의 다른 글
[Kafka] Kafka 메시지 처리 성능 향상 (Partition) (0) | 2025.08.15 |
---|---|
[Kafka] Dead Letter Topic (2) | 2025.08.14 |
Kafka란? (1) | 2025.08.12 |