BackEnd/Kafka

[Kafka] MSA 구조에서의 Kafka 사용

연향동큰손 2025. 8. 19. 16:37

 

MSA(Microservice Architecture)란?

MSA는 큰 애플리케이션을 여러 개의 작고 독립적인 서비스로 나누어 개발하고 운영하는 아키텍처이다.

 

각 서비스는 독립적으로 배포와 확장이 가능하고, 명확히 분리된 API로 통신을 하게 된다.

 

MSA의 특징

  • 독립적으로 개발, 배포, 확장 가능
  • API 기반의 통신으로 서비스 간 느슨한 결합 유지
  • 서비스 마다 서로 다른 언어나 기술 스택 사용 가능
  • 장애 격리 : 특정 서비스 장애가 전체 시스템에 영향 최소화

 

이러한 MSA와 같은 분산환경에서 Kafka가 유용하게 사용될 수 있다.

 

서비스간 직접 호출 대신 Kafka 메시지 큐를 통해 이벤트 기반의 통신을 통해 서로 의존성을 줄일 수 있고,

 

비동기 작업 처리로 응답속도를 향상시킬 수 있다.

 

또한  Kafka의 토픽은 여러 개의 파티션으로 나뉘며, 각 파티션은 독립적으로 읽기/쓰기가 가능하여 클러스터 내 다른 브로커에 I/O부하를 분산하여 처리량을 높일 수 있다.

 

 

MSA 프로젝트에 Kafka 적용해보기

 

https://github.com/yangwoohyeon/MSA_Kafka

 

GitHub - yangwoohyeon/MSA_Kafka

Contribute to yangwoohyeon/MSA_Kafka development by creating an account on GitHub.

github.com

 

구현 기능

  • 회원가입 서비스, 프로젝트에서는 Producer의 역할 (localhost:8080)
  • 회원가입에 성공하면, DB에 저장 후, 회원정보(userId, email)를 Kafka 서버로 전송
  • Kafka 서버에서 Email Service로 회원정보 전달
  • Email Service에서는 회원가입 축하 이메일 전송(이메일 전송 로직 구현 생략)
  • Kafka 서버는 AWS EC2에 구현 ( Partition개수: 3개, Replication : 3개)

 

 

 

 

1. 토픽 생성

메시지 분산 저장 및 병렬 처리를 위해 3개의 파티션을 확보하고, 데이터 안정성과 가용성을 위해 각 파티션을 3개의 브로커에 복제

bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--create \
--topic user.signed-up \
--partitions 3 \
--replication-factor 3

 

 

2. DLT 토픽 생성

메시지 처리 중 오류가 발생하여 정상적으로 메시지를 처리하지 못할 경우, 처리하지 못한 메시지를 별도로 분리하여 관리하고, 시스템의 안정성과 신뢰성을 높이기 위해 DLT 토픽을 생성해준다.

bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--create \
--topic user.signed-up.dlt \
--partitions 1 \
--replication-factor 3

 

 

3. 회원가입 서비스 user-service 구현

 

<UsersignedUpEvent>

Kafka에 회원정보를 전달할때 Key와 Value를 String 타입으로 직렬화 하도록 지정했기 때문에 회원정보를 문자열로 변환하여 전송해야 한다.

따라서 Kafka로 보낼 정보를 객체로 캡슐화 하고, 객체를 JSON 형식의 String으로 변환하여 전송하기 위해 회원 정보를 저장할 UsersignedUpEvent 클래스를 생성해줬다.

package com.example.userservice;


//Kafka에 넣을 메시지 객체
public class UsersignedUpEvent {

    private Long userId;

    private String email;

    private String name;

    /*비밀번호는 이메일 서버에서 필요하지 않기 때문에 포함 X*/
    public UsersignedUpEvent(Long userId, String email, String name) {
        this.userId = userId;
        this.email = email;
        this.name = name;
    }

    public Long getUserId() {
        return userId;
    }

    public String getEmail() {
        return email;
    }

    public String getName() {
        return name;
    }
}

 

 

<UserService>

회원가입에 성공할 경우 회원 정보를 DB에 저장하고, toJsonString메서드를 이용해 Kafka에 전송할 객체를 JSON 형식의 String으로 변환하여 user.signed-up 토픽으로 전송한다.

package com.example.userservice;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class UserService {

    private final UserRepository userRepository;
    private final KafkaTemplate<String,String> kafkaTemplate; //Kafka 메시지 발행 객체

    public UserService(UserRepository userRepository, KafkaTemplate<String, String> kafkaTemplate) {
        this.userRepository = userRepository;
        this.kafkaTemplate = kafkaTemplate;
    }

    public void signUp(SignUpRequestDto signUpRequestDto){
        User user = new User( //유저 생성
                signUpRequestDto.getEmail(),
                signUpRequestDto.getName(),
                signUpRequestDto.getPassword()
        );
        User savedUser = userRepository.save(user); //유저 정보 DB에 저장

        UsersignedUpEvent usersignedUpEvent = new UsersignedUpEvent(
                savedUser.getId(),
                savedUser.getEmail(),
                savedUser.getName()
        );
        this.kafkaTemplate.send("user.signed-up",toJsonString(usersignedUpEvent));
    }

    private String toJsonString(Object object){ //Kafka에 전송할 객체를 Json형식의 String 으로 변환하기 위해 사용
        ObjectMapper objectMapper = new ObjectMapper();
        try{
            String message = objectMapper.writeValueAsString(object);
            return message;
        }catch(JsonProcessingException e){
            throw new RuntimeException("Json 직렬화 실패");
        }
    }
}

 

 

4. consumer 역할을 하는 email-service 구현

 

<UserSignedUpEvent>

Kafka에서 받은 메시지를 객체로 역직렬화 하기위한 DTO이다.

 

package com.example.email_service;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

//Kafka에서 받은 이벤트 메시지를 JSON -> 객체로 역질렬화하는 DTO
public class UserSignedUpEvent {
    private Long userId;
    private String email;
    private String name;

    public UserSignedUpEvent(){ //역직렬화에 필요

    }

    public UserSignedUpEvent(Long userId, String email, String name) {
        this.userId = userId;
        this.email = email;
        this.name = name;
    }

    public static UserSignedUpEvent fromJson(String json){ //수신한 JSON 문자열을 객체로 변환
        try{
            ObjectMapper objectMapper = new ObjectMapper();
            return objectMapper.readValue(json, UserSignedUpEvent.class);
        }catch(JsonProcessingException e){
            throw new RuntimeException("JSON 파싱 실패");
        }
    }

    public Long getUserId() {
        return userId;
    }

    public String getEmail() {
        return email;
    }

    public String getName() {
        return name;
    }
}

 

 

<UserSignedUpEventConsumer>

user.signed-up 토픽을 구독하여 메시지를 수신할 수 있도록 했다.

메시지 처리에 실패할 경우 재시도 정책을 통해 5번까지 메시지 재처리하도록 하고, 모든 재시도에 실패할 경우 user.signed-up.dlt라는 DLT로 메시지를 이동 시킨다.

3개의 파티션에서 메시지를 병렬처리 하도록 하기 위해 concurrency값은 3으로 설정 하였다.

package com.example.email_service;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.retry.annotation.Backoff;
import org.springframework.stereotype.Service;

@Service
public class UserSignedUpEventConsumer {

    private EmailLogRepository emailLogRepository;

    public UserSignedUpEventConsumer(EmailLogRepository emailLogRepository) {
        this.emailLogRepository = emailLogRepository;
    }

    @KafkaListener(
            topics = "user.signed-up", //이 토픽에서 메시지를 수신
            groupId = "email-service",
            concurrency="3" //파티션 개수만큼 설정
    )
    @RetryableTopic( //재시도 정책 정의
            attempts = "5",
            backoff = @Backoff(delay=1000, multiplier = 2),
            dltTopicSuffix = ".dlt"
    )
    public void consume(String message) throws InterruptedException {
        UserSignedUpEvent userSignedUpEvent = UserSignedUpEvent.fromJson(message); //수신한 메시지를 객체로 변환

        //실제 이메일 발송 로직은 생략
        String receiverEmail = userSignedUpEvent.getEmail();
        String subject = userSignedUpEvent.getName()+ "님, 회원 가입을 축하드립니다!";;
        Thread.sleep(3000);
        System.out.println("이메일 발송 완료");

        EmailLog emailLog = new EmailLog(
                userSignedUpEvent.getUserId(),
                receiverEmail,
                subject
                );

        emailLogRepository.save(emailLog);
    }
}

'BackEnd > Kafka' 카테고리의 다른 글

[Kafka] Kafka의 고가용성  (1) 2025.08.18
[Kafka] Kafka 메시지 처리 성능 향상 (Partition)  (2) 2025.08.15
[Kafka] Dead Letter Topic  (2) 2025.08.14
Spring Boot + Kafka 메시지 테스트  (2) 2025.08.14
Kafka란?  (1) 2025.08.12