1. 이벤트 흐름
Eureka, gateway, order MSA, Payment MSA, Notification MSA
로 동작하는 MSA를 개발하고, 카프카를 통해 이벤트 중심 아키텍쳐를 구현해보자
이번 블로그에서는 order MSA를 중심으로 다룰 것이다.
https://github.com/Griotold/kafka-practice-sparta
2. docker-compose.yml
version: '3.7'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
container_name: sparta-zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_SERVER_ID: 1
kafka:
image: confluentinc/cp-kafka:latest
container_name: sparta-kafka
ports:
- "9092:9092"
- "29092:29092"
environment:
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_BROKER_ID: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
depends_on:
- zookeeper
kafka-ui:
image: provectuslabs/kafka-ui:latest
container_name: sparta-kafka-ui
depends_on:
- kafka
ports:
- 8080:8080
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
zipkin:
image: openzipkin/zipkin:latest
container_name: sparta-zipkin
ports:
- "9411:9411"
실습을 위해 kafka, kafka-ui, zookeeper, zipkin을 docker-compose 기술을 활용하여 띄워둔다.
docker-compose up
2 - 1. 잠깐! Kafka의 포트는 왜 9092, 29092 두 개 일까?
`9092`와 `29092`는 서로 다른 네트워크 환경에서 클라이언트 연결을 처리하기 위해 사용된다.
9092: 외부 접근용
- 9092 포트는 Docker 컨테이너 외부에서 Kafka에 연결하는 클라이언트를 위해 사용된다.
- 예를 들어, 호스트 머신에서 실행중인 애플리케이션이나 외부 시스템이 Kafka와 통신할 때 9092를 사용한다.
29092: 내부 접근용
29092 포트는 Docker 네트워크 내부에서 Kafka에 연결하는 클라이언트를 위해 사용된다.
예를 들어, Docker Compose 환경 내의 kafka-ui가 Kafka와 통신할 때 이 포트를 사용하고 있다.
3. order MSA 의 build.gradle
plugins {
id 'java'
id 'org.springframework.boot' version '3.3.3'
id 'io.spring.dependency-management' version '1.1.6'
}
group = 'com.spartacoding.msa'
version = '0.0.1-SNAPSHOT'
java {
toolchain {
languageVersion = JavaLanguageVersion.of(17)
}
}
configurations {
compileOnly {
extendsFrom annotationProcessor
}
}
repositories {
mavenCentral()
}
ext {
set('springCloudVersion', "2023.0.3")
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.cloud:spring-cloud-starter-netflix-eureka-client'
implementation 'org.springframework.kafka:spring-kafka'
implementation 'io.zipkin.reporter2:zipkin-reporter-brave'
implementation 'org.springframework.boot:spring-boot-starter-actuator'
implementation 'io.micrometer:micrometer-tracing-bridge-brave'
compileOnly 'org.projectlombok:lombok'
runtimeOnly 'com.h2database:h2'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}
dependencyManagement {
imports {
mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
}
}
tasks.named('test') {
useJUnitPlatform()
}
spring-kafka 의존성이 들어가 있다.
4. application.yml
spring:
application:
name: order-msa
datasource:
url: jdbc:h2:mem:testdb
driverClassName: org.h2.Driver
username: sa
password:
h2:
console:
enabled: true
jpa:
hibernate:
ddl-auto: update
show-sql: true
kafka:
bootstrap-servers: localhost:9092
consumer: # TODO kafka consumer, producer에 대한 설정값 수정은 여기에서
group-id: ${spring.application.name}-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring.json.trusted.packages: '*'
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
eureka:
client:
service-url:
defaultZone: http://localhost:8761/eureka/
server:
port: 8083
kafka로 시작하는 부분을 보면, kafka 를 사용하기 위한 설정값들을 지정할 수 있다.
위 처럼, application.yml에서 지정해줘도 되지만, 자바 코드를 사용하여 아래처럼 사용할 수도 있다.
둘 중 하나를 선택하면 된다.
참고) KafkaProducerConfig
package com.example.order.messagequeue;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
@EnableKafka
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(properties);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
5. OrderTopic
package com.spartacoding.msa.order;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* 토픽 이름이 많아지면 enum으로 관리한다.
* */
@Getter
@AllArgsConstructor
public enum OrderTopic {
// 주문 생성 이벤트(Order MSA가 발행)
ORDER_CREATED("order-created"),
// 결제 완료 이벤트 (Payment MSA가 발행)
PAYMENT_COMPLETED("payment-completed"),
;
private final String topic;
}
토픽은 많아질 것을 대비하여 enum으로 관리하는 것이 좋다.
주문이 생성될 때, "order-created" 토픽에 이벤트를 발행할 것이기 때문에 미리 지정해놓았고,
payment MSA가 결제를 완료하면, "payment-completed" 토픽에 이벤트를 발행할 것이고,
order MSA가 이를 처리할 것이기 때문에 미리 지정해 놓았다.
6. EventSerializer
package com.spartacoding.msa.order;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class EventSerializer {
private static final Logger logger = LoggerFactory.getLogger(EventSerializer.class);
private static final ObjectMapper objectMapper = new ObjectMapper();
// 직렬화 (객체 -> JSON 문자열)
public static <T> String serialize(T object) {
try {
return objectMapper.writeValueAsString(object);
} catch (JsonProcessingException e) {
logger.error("Failed to serialize object: {}", object, e);
throw new RuntimeException("Serialization error", e);
}
}
// 역직렬화 (JSON 문자열 -> 객체)
public static <T> T deserialize(String json, Class<T> clazz) {
try {
return objectMapper.readValue(json, clazz);
} catch (JsonProcessingException e) {
logger.error("Failed to deserialize JSON: {}", json, e);
throw new RuntimeException("Deserialization error", e);
}
}
}
스프링 앱과 카프카끼리 통신하기 위해서 자바 객체 <---> JSON 으로 변환을 해줘야한다.
이를 직렬화, 역직렬화라고 한다.
위 처럼, 유틸 클래스를 선언해서 사용하면 편한다.
7. OrderProducer
package com.spartacoding.msa.order.infrastructure.messaging;
import com.spartacoding.msa.order.EventSerializer;
import com.spartacoding.msa.order.events.OrderCreatedEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RequiredArgsConstructor
public class OrderProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
public OrderCreatedEvent send(String topic, OrderCreatedEvent event) {
String eventToJson = EventSerializer.serialize(event);
kafkaTemplate.send(topic, event.toString());
log.info("Order Producer Sent order event: " + event);
return event;
}
/**
* 재시도 로직
* */
public void send2(String topic, OrderCreatedEvent event) {
String eventToJson = EventSerializer.serialize(event);
int maxRetries = 3;
int retryDelay = 1000; // 1초
for (int attempt = 1; attempt <= maxRetries; attempt++) {
try {
kafkaTemplate.send(topic, eventToJson).get(); // 동기 호출로 결과 확인
log.info("Order Producer Sent order event: {}", eventToJson);
return; // 성공하면 메서드 종료
} catch (Exception e) {
log.error("Attempt {} failed to send message to topic {}: {}", attempt, topic, e.getMessage());
if (attempt == maxRetries) {
throw new RuntimeException("Failed to send message after retries", e);
}
try {
Thread.sleep(retryDelay); // 재시도 간격
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException("Retry interrupted", ie);
}
}
}
}
//@Retryable(
// value = {Exception.class}, // 재시도 대상 예외
// maxAttempts = 3, // 최대 재시도 횟수
// backoff = @Backoff(delay = 2000) // 재시도 간격 (2초)
// )
// public OrderCreatedEvent send3(String topic, OrderCreatedEvent event) {
// String eventToJson = EventSerializer.serialize(event);
//
// // Kafka 메시지 전송
// kafkaTemplate.send(topic, eventToJson);
// log.info("Order Producer Sent order event: {}", eventToJson);
//
// return event;
// }
}
7 - 1. send()
KafkaTemplate를 통해 지정한 토픽으로 이벤트를 발행할 수 있다.
이벤트를 발행하기 전에 위에서 본 EventSerializer를 통해 변환해주고 발행해준다.
7 - 2. send2()
토픽에 이벤트를 발행하는 것이 실패했을 때를 대비해서 재시도 로직을 추가한 메서드이다.
1초를 쉬었다가 다시 발행하고, 3번동안 문제가 발생하면 예외를 발생시킨다.
7 - 3. send3()
주석 처리 되어 있는 send3() Spring-retry 프로젝트를 의존해서 재시도 로직을 수행할 수 있는 코드이다.
https://www.baeldung.com/spring-retry
8. OrderConsumer
package com.spartacoding.msa.order.infrastructure.messaging;
import com.spartacoding.msa.order.application.OrderApplicationService;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
public class OrderConsumer {
private final OrderApplicationService orderApplicationService;
// TODO order-msa가 구독하는 이벤트를 처리하는 로직을 개발해주세요
}
이거는 차차 구현하기로 하자.
결제가 완료되면 결제 완료 이벤트를 처리하는 로직이 들어가야 할 것이다.
9. OrderApplicationService
package com.spartacoding.msa.order.application;
import com.spartacoding.msa.order.OrderTopic;
import com.spartacoding.msa.order.domain.Order;
import com.spartacoding.msa.order.domain.OrderRepository;
import com.spartacoding.msa.order.domain.OrderStatus;
import com.spartacoding.msa.order.dto.OrderCreateDto;
import com.spartacoding.msa.order.dto.OrderResponseDto;
import com.spartacoding.msa.order.events.OrderCreatedEvent;
import com.spartacoding.msa.order.infrastructure.messaging.OrderProducer;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
import java.util.stream.Collectors;
@Slf4j
@Transactional(readOnly = true)
@Service
@RequiredArgsConstructor
public class OrderApplicationService {
private final OrderRepository orderRepository;
private final OrderProducer orderProducer;
@Transactional
public OrderResponseDto createOrder(OrderCreateDto orderCreateDto) {
log.info("createOrder.OrderCreateDto = {}", orderCreateDto);
// 1. DB 저장
Order order = Order.create(
orderCreateDto.getProductId(),
orderCreateDto.getQuantity(),
OrderStatus.CREATED,
orderCreateDto.getCustomerEmail(),
orderCreateDto.getTotalPrice());
orderRepository.save(order);
// 2. 카프카 발행
OrderCreatedEvent event = OrderCreatedEvent.from(order);
log.info("createOrder.OrderCreatedEvent = {}", event);
orderProducer.send(OrderTopic.ORDER_CREATED.getTopic(), event);
return OrderResponseDto.from(order);
}
public void completeOrder() {
// TODO 주문이 성공적으로 완료된 로직과, 이벤트를 발행해 주세요
}
public List<OrderResponseDto> getAllOrders() {
List<Order> orders = orderRepository.findAll();
return orders.stream()
.map(this::convertToDto)
.collect(Collectors.toList());
}
private OrderResponseDto convertToDto(Order order) {
return new OrderResponseDto(
order.getId(),
order.getProductId(),
order.getQuantity(),
order.getTotalPrice(),
order.getStatus().name(),
order.getCustomerEmail()
);
}
}
createOrder
주문 요청이 들어오면, 1. DB 저장, 2. Kafka Topic에 이벤트 발행
completeOrder
결제 처리가 된 경우 이벤트를 수신해서 처리하는 로직이 구현되어야 하고, 그거는 차차 하기로 하자.
10. 동작 확인
서버를 동작 시킨 후, 주문 등록을 호출해보자
OrderProducer 에서 OrderCreatedEvent 를 보냈다고 로그가 찍혔고,
Kafka -UI 에 도 order-created 토픽에 이벤트가 동일하게 들어 있음을 확인할 수 있다.
/localhost:8080
'메시지큐 > kafka' 카테고리의 다른 글
Kafka를 사용하여 이벤트 중심 아키텍쳐 구현(Spring, MSA) - 2 (1) | 2025.01.16 |
---|---|
Kafka를 사용하여 이벤트 중심 아키텍쳐 구현(Spring, MSA) - 2 (0) | 2025.01.15 |