1. 이벤트 흐름
지난 게시글에서는 결제를 생성한 뒤, "payment-completed" 토픽에 이벤트를 발행하는 것 까지 했다.
https://griotold.tistory.com/81
이번 게시글에서는 해당 토픽에서 이벤트를 수신하여 주문의 상태를 COMPLETE 로 바꾸고,
"order-completed" 토픽에 이벤트를 발행하여 알림 서비스에서 알림 발송하는 것까지 해보자.
https://github.com/Griotold/kafka-practice-sparta
2. OrderConsumer
package com.spartacoding.msa.order.infrastructure.messaging;
import com.spartacoding.msa.order.EventSerializer;
import com.spartacoding.msa.order.OrderTopic;
import com.spartacoding.msa.order.application.OrderApplicationService;
import com.spartacoding.msa.order.domain.Order;
import com.spartacoding.msa.order.domain.OrderRepository;
import com.spartacoding.msa.order.domain.OrderStatus;
import com.spartacoding.msa.order.events.OrderCompletedEvent;
import com.spartacoding.msa.order.events.PaymentSuccessEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.repository.PagingAndSortingRepository;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
@Slf4j
@Component
@RequiredArgsConstructor
public class OrderConsumer {
private final OrderRepository orderRepository;
private final OrderProducer orderProducer;
// private final OrderApplicationService orderApplicationService;
/**
* 결제 성공에 대한 이벤트 구독, 처리
* */
@Transactional
@KafkaListener(topics = "payment-completed")
public void listen(String kafkaMessage) {
log.info("OrderConsumer.listen.kafkaMessage: {}", kafkaMessage);
// 1. Order COMPLETED 로 업데이트
PaymentSuccessEvent paymentSuccessEvent = EventSerializer.deserialize(kafkaMessage, PaymentSuccessEvent.class);
Order order = orderRepository.findById(paymentSuccessEvent.id())
.orElseThrow(() -> new RuntimeException("Order not found"));
if ("COMPLETED".equals(paymentSuccessEvent.status())) {
order.updateStatus(OrderStatus.COMPLETED);
}
// 2. "order-completed" 토픽에 이벤트 발행
orderProducer.send(OrderTopic.ORDER_COMPLETED.getTopic(), OrderCompletedEvent.from(order));
}
}
- order의 상태를 COMPLETE로 업데이트 해준다.
- "order-completed" 토픽에 주문이 완료되었다는 이벤트를 발행한다.
3. OrderProducer
package com.spartacoding.msa.order.infrastructure.messaging;
import com.spartacoding.msa.order.EventSerializer;
import com.spartacoding.msa.order.events.OrderCompletedEvent;
import com.spartacoding.msa.order.events.OrderCreatedEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RequiredArgsConstructor
public class OrderProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
public OrderCreatedEvent send(String topic, OrderCreatedEvent event) {
String eventToJson = EventSerializer.serialize(event);
kafkaTemplate.send(topic, eventToJson);
log.info("OrderProducer Sent OrderCreatedEvent: " + event);
return event;
}
public void send(String topic, OrderCompletedEvent event) {
String eventToJson = EventSerializer.serialize(event);
kafkaTemplate.send(topic, eventToJson);
log.info("OrderProducer Sent OrderCompletedEvent: " + event);
}
}
OrderCompletedEvent 를 파라미터로 받는 send() 메서드를 통해 "order-completed" 토픽에 이벤트를 발행한다.
4. OrderCompletedEvent
package com.spartacoding.msa.order.events;
import com.spartacoding.msa.order.domain.Order;
import java.math.BigDecimal;
public record OrderCompletedEvent(
Long orderId,
String productId,
Integer quantity,
BigDecimal totalPrice,
String customerEmail,
String status
) {
public static OrderCompletedEvent from(Order order) {
return new OrderCompletedEvent(
order.getId(),
order.getProductId(),
order.getQuantity(),
order.getTotalPrice(),
order.getCustomerEmail(),
order.getStatus().name()
);
}
}
OrderCompletedEvent 에는 status 까지 포함되어 있다.
여기에는 무조건 COMPLETE 이 들어오게 된다.
5. NotificationConsumer
package com.spartacoding.msa.notification.infrastructure.messaging;
import com.spartacoding.msa.notification.EventSerializer;
import com.spartacoding.msa.notification.events.OrderCompletedEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RequiredArgsConstructor
public class NotificationConsumer {
/**
* 주문 완료에 대한 이벤트를 구독, 처리
* 주문 완료에 대한 알림 메시지는 로그로 출력
* */
@KafkaListener(topics = "order-completed")
public void listen(String kafkaMessage) {
log.info("NotificationConsumer.listen.kafkaMessage: {}", kafkaMessage);
OrderCompletedEvent orderCompletedEvent = EventSerializer.deserialize(kafkaMessage, OrderCompletedEvent.class);
log.info("NotificationConsumer.listen.orderCompletedEvent: {}", orderCompletedEvent);
}
}
알림 발송은 로그로 대체한다.
6. 결과 확인
주문을 등록하고 받은 응답은 status가 CREATED 이다.
하지만 주문을 등록하자마자 메시징 시스템에 의해 payment 서비스를 거쳐서 COMPLETED 가 된다.
6 - 1. Kafka UI
총 3개의 토픽이 생성되었다.
- order-completed
- order-created
- payment-completed
새롭게 생긴 order-completed 에는 status가 COMPLETED 라는 정보까지 들고 있다.
notification-msa 에서 로그도 잘 찍히고 있다.
'메시지큐 > kafka' 카테고리의 다른 글
Kafka를 사용하여 이벤트 중심 아키텍쳐 구현(Spring, MSA) - 2 (0) | 2025.01.15 |
---|---|
Kafka를 사용하여 이벤트 중심 아키텍쳐 구현(Spring, MSA) - 1 (0) | 2025.01.14 |