카프카(Kafka)란?
카프카(Kafka)는 파이프라인, 스트리밍 분석, 데이터 통합 및 미션 크리티컬 애플리케이션을 위해 설계된 고성능 분산 이벤트 스트리밍 플랫폼이다.
Kafka 구성요소
- Producer : Kafka에 이벤트를 Publish(게시)하는 클라이언트, topic에 데이터를 생성하고 전송하는 역할
- Consumer : 이벤트를 Subscribe(읽고 처리)하는 클라이언트, topic내부 파티션에 저장된 데이터를 가져오는 역할
Kafka에서는 Producer와 Consumer가 완전히 분리되어 있다.
이는 Kafka의 높은 확장성을 위한 핵심 요소이다.
- Topic : 데이터가 들어갈 수 있는 공간을 이야기하며, 여러 개의 토픽이 생성될 수 있다.
topic은 한개 이상의 파티션으로 구성되어 있다. 이때 파티션은 데이터를 분산처리하기 위한 단위로 볼 수 있다.
하나의 파티션은 큐(queue)와 같이 파티션 끝에서부터 데이터가 쌓이게 되고, consumer는 가장 오래된 순으로 데이터를 가져가게 된다.
kafka는 consumer가 데이터를 가져가도 파티션에 있는 데이터가 삭제되지 않는다는 특징이 있다.
때문에 동일 데이터에 대해 여러 번 처리를 할 수 있으며, 이는 Kafka를 사용하는 중요한 이유 중 하나이다.
Kafka Cluster
Kafka의 주요 구성 요소중 Broker, Zookeeper는 카프카 클러스터를 구성하는 것들이다. Apache Zookeeper는 카프카 클러스터 윗 단에서 역할을 하고 Broker는 카프카 클러스터 내에서 역할을 한다.
Zookeeper의 역할
- 브로커 상태 관리 (Broker Discovery & Coordination)
- 리더 선출 (Leader Election) :각 파티션에는 리더가 존재하는데, 이 리더를 Zookeeper가 선출한다.
- 파티션 메타데이터 저장
- 컨피규레이션 저장소
Broker의 역할
- Producer가 전송한 메시지를 저장
- Consumer가 요청하면 저장된 메시지를 전송
Broker는 쉽게 말해서 Producer와 Consumer 사이에서 전달자(중계자) 역할을 한다.
Kafka 와 Spring Boot 애플리케이션 연동
Kafka Broker를 Docker로 실행하기 위해 docker-compose.yml을 만들어주고 Docker에서 실행해준다.
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.4.0
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:7.4.0
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
Consumer
<application.yml>
server:
port: 8090
spring:
application:
name: consumer_application
kafka:
consumer:
bootstrap-servers: localhost:9092
group-id: consumer_group01
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
<KafkaConsumerConfig>
// Kafka 관련 설정을 위한 클래스
@EnableKafka // Kafka 리스너 사용을 위한 어노테이션
@Configuration
public class KafkaConsumerConfig {
private final Environment env;
// 생성자를 통해 application.yml 환경 변수 주입
public KafkaConsumerConfig(Environment env) {
this.env = env;
}
// Kafka Consumer 설정값 정의
@Bean
public Map<String, Object> consumerConfig() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("spring.kafka.consumer.bootstrap-servers")); // Kafka 서버 주소
props.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty("spring.kafka.consumer.group-id")); // Consumer 그룹 ID
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, env.getProperty("spring.kafka.consumer.auto-offset-reset")); // 메시지 읽기 시작 위치 설정
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // 메시지 키 역직렬화 방식
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // 메시지 값 역직렬화 방식
return props;
}
// Kafka ConsumerFactory 생성 (Consumer 인스턴스 생성 담당)
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfig());
}
// Kafka Listener Container Factory 생성
// @KafkaListener가 메시지를 수신할 수 있도록 컨테이너 설정
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
var factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
<KafkaConsumerService>
// Kafka에서 메시지를 수신하는 서비스 클래스
@Service
@Slf4j
public class KafkaConsumerService {
// Kafka에서 "my-topic"으로 들어오는 메시지를 수신
// consumer_group01 그룹에 속함
@KafkaListener(topics = "my-topic", groupId = "consumer_group01")
public void consume(String message) {
// 수신한 메시지를 로그로 출력
log.info("Consumed Message : {}", message);
}
}
Producer
<application.yml>
server:
port: 8080
spring:
application:
name: producer_application
kafka:
producer:
bootstrap-servers: localhost:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
topic:
name: my-topic
<KafkaProducerConfig>
// Kafka Producer 설정 클래스
@Configuration
public class KafkaProducerConfig {
// application.yml 등의 설정 값을 가져오기 위한 Environment 주입
private final Environment env;
public KafkaProducerConfig(Environment environment) {
this.env = environment;
}
// Kafka Producer 관련 설정을 담은 Bean
@Bean
public Map<String, Object> producerConfig() {
Map<String, Object> props = new HashMap<>();
// Kafka 서버 주소 설정 (e.g., localhost:9092)
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
env.getProperty("spring.kafka.producer.bootstrap-servers"));
// 메시지 키 직렬화 방식 (문자열)
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 메시지 값 직렬화 방식 (문자열)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
// ProducerFactory Bean 생성 (KafkaTemplate에서 사용됨)
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(this.producerConfig());
}
// KafkaTemplate Bean 생성: 실제 메시지를 전송할 때 사용
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(this.producerFactory());
}
}
이 설정을 통해 Spring Boot 애플리케이션에서 Kafka로 메시지를 전송할 수 있는 환경이 갖추게 된다.
<KafkaProducerService>
// Kafka로 메시지를 전송하는 서비스 클래스
@Service
@RequiredArgsConstructor
@Slf4j
public class KafkaProducerService {
// application.yml에서 설정한 topic.name 값을 주입
@Value("${topic.name}")
private String topicName;
// 메시지를 실제로 전송하는 KafkaTemplate
private final KafkaTemplate<String, String> kafkaTemplate;
// 메시지를 Kafka로 전송하는 메서드
public void sendMessageToKafka(String message) {
// 전송할 메시지를 로그로 출력
log.info("Producer Message : {}", message);
// 지정된 토픽으로 메시지 전송
kafkaTemplate.send(topicName, message);
}
}
<KafkaProducerController>
@RestController
@RequiredArgsConstructor
public class KafkaProducerController {
private final KafkaProducerService kafkaProducerService;
@PostMapping("/api/comment")
public ResponseEntity<Void> sendMessage(@RequestParam String message) {
kafkaProducerService.sendMessageToKafka(message);
return ResponseEntity.ok().build();
}
}
테스트
실행 결과 Consumer에서 정상적으로 메시지가 도착한 것을 확인할 수 있었다.
'9oormthonUNIV 스터디 프로젝트' 카테고리의 다른 글
GitHub Actions + Docker로 CI/CD 구축 (0) | 2025.05.25 |
---|---|
Jenkins + Docker를 활용한 기본 CI/CD 파이프라인 구축 (0) | 2025.05.10 |
Spring Boot 프로젝트를 AWS EC2에 Docker로 배포하기 (0) | 2025.04.30 |
S3를 이용한 이미지 관리[SpringBoot] (0) | 2025.04.15 |