1. 이벤트 흐름
https://github.com/Griotold/kafka-practice-sparta
https://griotold.tistory.com/80
지난 게시글에서는 order MSA에서 주문 생성시 "order-created" 토픽에 메시지를 전달하는 것 까지했다.
이번 게시글에서는 payment MSA에서 메시지를 수신하고,
결제를 생성한 뒤 "payment-completed" 토픽에 메시지를 전달하는 것을 구현해보자.
2. payment MSA의 application.yml
spring:
application:
name: payment-msa
datasource:
url: jdbc:h2:mem:testdb
driverClassName: org.h2.Driver
username: sa
password:
h2:
console:
enabled: true
jpa:
hibernate:
ddl-auto: update
show-sql: true
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: ${spring.application.name}-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring.json.trusted.packages: '*'
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
eureka:
client:
service-url:
defaultZone: http://localhost:8761/eureka/
server:
port: 8082
logging:
level:
root: INFO
order-msa 와 비교해볼 때, 바뀐 부분은 application.name 밖에 없다.
3. PaymentTopic
package com.spartacoding.msa.payment;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* 토픽 이름이 많아지면 enum 으로 관리한다.
* */
@Getter
@AllArgsConstructor
public enum PaymentTopic {
// 주문 생성 이벤트 (Order MSA가 발행)
ORDER_CREATED("order-created"),
// 결제 완료 이벤트 (Payment MSA가 발행)
PAYMENT_COMPLETED("payment-completed")
;
private final String topic;
}
"order-created" 토픽에서 메시지를 수신해야 하고,
"payment-completed" 토픽에 메시지를 발행해야 한다.
4. OrderCreatedEvent
package com.spartacoding.msa.payment.events;
import java.math.BigDecimal;
public record OrderCreatedEvent(
Long orderId,
String productId,
Integer quantity,
BigDecimal totalPrice,
String customerEmail
) {
}
order-msa의 있는 객체와 동일하게 맞춰준다.
5. PaymentSuccessEvent
package com.spartacoding.msa.payment.events;
import com.spartacoding.msa.payment.domain.Payment;
import java.math.BigDecimal;
public record PaymentSuccessEvent(
Long id,
Long orderId,
BigDecimal amount,
String status
) {
public static PaymentSuccessEvent from(Payment payment) {
return new PaymentSuccessEvent(
payment.getId(),
payment.getOrderId(),
payment.getAmount(),
payment.getStatus().name()
);
}
}
"payment-completed" 토픽에 발행할 이벤트이다.
6. PaymentConsumer
package com.spartacoding.msa.payment.infrastructure.messaging;
import com.spartacoding.msa.payment.EventSerializer;
import com.spartacoding.msa.payment.PaymentTopic;
import com.spartacoding.msa.payment.application.PaymentApplicationService;
import com.spartacoding.msa.payment.domain.Payment;
import com.spartacoding.msa.payment.domain.PaymentRepository;
import com.spartacoding.msa.payment.events.OrderCreatedEvent;
import com.spartacoding.msa.payment.events.PaymentSuccessEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RequiredArgsConstructor
public class PaymentConsumer {
private final PaymentRepository paymentRepository;
private final PaymentProducer paymentProducer;
/**
* 주문 생성에 대한 이벤트 구독, 처리
* */
@KafkaListener(topics = "order-created")
public void processPayment(String kafkaMessage) {
log.info("processPayment.kafkaMessage = {}", kafkaMessage);
// 1. DB 저장
OrderCreatedEvent orderCreatedEvent = EventSerializer.deserialize(kafkaMessage, OrderCreatedEvent.class);
Payment payment = Payment.create(orderCreatedEvent.orderId(), orderCreatedEvent.totalPrice());
paymentRepository.save(payment);
PaymentSuccessEvent paymentSuccessEvent = PaymentSuccessEvent.from(payment);
// 2. 결제 성공 이벤트 발행
paymentProducer.send(PaymentTopic.PAYMENT_COMPLETED.getTopic(), paymentSuccessEvent);
}
}
"order-created" 를 수신하는 Consumer이다.
- DB에 Payment를 저장하고
- "payment-complted" 토픽에 결제 성공 이벤트를 발행한다.
7. PaymentProducer
package com.spartacoding.msa.payment.infrastructure.messaging;
import com.spartacoding.msa.payment.EventSerializer;
import com.spartacoding.msa.payment.events.PaymentSuccessEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
@Slf4j
@Configuration
@RequiredArgsConstructor
public class PaymentProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
public void send(String topic, PaymentSuccessEvent event) {
String eventToJson = EventSerializer.serialize(event);
kafkaTemplate.send(topic, eventToJson);
log.info("Payment Producer Sent payment event: " + eventToJson);
}
}
"payment-completed" 토픽에 이벤트를 발행한다.
해당 토픽은 order-msa 와 notification-msa가 수신할 것이다.
8. EveneSerializer
package com.spartacoding.msa.payment;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class EventSerializer {
private static final Logger logger = LoggerFactory.getLogger(EventSerializer.class);
private static final ObjectMapper objectMapper = new ObjectMapper();
// 직렬화 (객체 -> JSON 문자열)
public static <T> String serialize(T object) {
try {
return objectMapper.writeValueAsString(object);
} catch (JsonProcessingException e) {
logger.error("Failed to serialize object: {}", object, e);
throw new RuntimeException("Serialization error", e);
}
}
// 역직렬화 (JSON 문자열 -> 객체)
public static <T> T deserialize(String json, Class<T> clazz) {
try {
return objectMapper.readValue(json, clazz);
} catch (JsonProcessingException e) {
logger.error("Failed to deserialize JSON: {}", json, e);
throw new RuntimeException("Deserialization error", e);
}
}
}
지난 블로그에도 있던 클래스라 설명은 생략하기로 한다.
9. 동작 확인
주문을 등록했을 때 "order-created" 토픽에 이벤트가 발행되고,
그 이벤트를 수신하여 결제 생성 -> "payment-completed" 토픽에 이벤트가 발행되는지를 확인해보자.
일단, "payment-completed" 토픽이 생겼다.
kafka-ui에 대해서는 지난 블로그에 적어놨으니 지난 블로그를 참고하자.
"payment-completed" 토픽에 메세지가 정상적으로 발행되었다.
"order-created" 토픽에서 이벤트를 수신했다고 로그가 찍혔고,
"payment-completed" 토픽에 이벤트가 발행되었따고 로그가 기록되었다.
'메시지큐 > kafka' 카테고리의 다른 글
Kafka를 사용하여 이벤트 중심 아키텍쳐 구현(Spring, MSA) - 2 (1) | 2025.01.16 |
---|---|
Kafka를 사용하여 이벤트 중심 아키텍쳐 구현(Spring, MSA) - 1 (0) | 2025.01.14 |