메시지큐/kafka

Kafka를 사용하여 이벤트 중심 아키텍쳐 구현(Spring, MSA) - 3

Griotold 2025. 1. 16. 12:40

1. 이벤트 흐름

지난 게시글에서는 결제를 생성한 뒤, "payment-completed" 토픽에 이벤트를 발행하는 것 까지 했다.

https://griotold.tistory.com/81

 

이번 게시글에서는 해당 토픽에서 이벤트를 수신하여 주문의 상태를 COMPLETE 로 바꾸고,

"order-completed" 토픽에 이벤트를 발행하여 알림 서비스에서 알림 발송하는 것까지 해보자.

https://github.com/Griotold/kafka-practice-sparta

 

GitHub - Griotold/kafka-practice-sparta

Contribute to Griotold/kafka-practice-sparta development by creating an account on GitHub.

github.com

 

2. OrderConsumer

package com.spartacoding.msa.order.infrastructure.messaging;

import com.spartacoding.msa.order.EventSerializer;
import com.spartacoding.msa.order.OrderTopic;
import com.spartacoding.msa.order.application.OrderApplicationService;
import com.spartacoding.msa.order.domain.Order;
import com.spartacoding.msa.order.domain.OrderRepository;
import com.spartacoding.msa.order.domain.OrderStatus;
import com.spartacoding.msa.order.events.OrderCompletedEvent;
import com.spartacoding.msa.order.events.PaymentSuccessEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.repository.PagingAndSortingRepository;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

@Slf4j
@Component
@RequiredArgsConstructor
public class OrderConsumer {

    private final OrderRepository orderRepository;
    private final OrderProducer orderProducer;
//    private final OrderApplicationService orderApplicationService;

    /**
     * 결제 성공에 대한 이벤트 구독, 처리
     * */
    @Transactional
    @KafkaListener(topics = "payment-completed")
    public void listen(String kafkaMessage) {
        log.info("OrderConsumer.listen.kafkaMessage: {}", kafkaMessage);

        // 1. Order COMPLETED 로 업데이트
        PaymentSuccessEvent paymentSuccessEvent = EventSerializer.deserialize(kafkaMessage, PaymentSuccessEvent.class);

        Order order = orderRepository.findById(paymentSuccessEvent.id())
                .orElseThrow(() -> new RuntimeException("Order not found"));

        if ("COMPLETED".equals(paymentSuccessEvent.status())) {
            order.updateStatus(OrderStatus.COMPLETED);
        }

        // 2. "order-completed" 토픽에 이벤트 발행
        orderProducer.send(OrderTopic.ORDER_COMPLETED.getTopic(), OrderCompletedEvent.from(order));
    }
}

 

  1. order의 상태를 COMPLETE로 업데이트 해준다.
  2. "order-completed" 토픽에 주문이 완료되었다는 이벤트를 발행한다.

 

3. OrderProducer

package com.spartacoding.msa.order.infrastructure.messaging;

import com.spartacoding.msa.order.EventSerializer;
import com.spartacoding.msa.order.events.OrderCompletedEvent;
import com.spartacoding.msa.order.events.OrderCreatedEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;


@Slf4j
@Component
@RequiredArgsConstructor
public class OrderProducer {
    private final KafkaTemplate<String, String> kafkaTemplate;

    public OrderCreatedEvent send(String topic, OrderCreatedEvent event) {

        String eventToJson = EventSerializer.serialize(event);
        kafkaTemplate.send(topic, eventToJson);
        log.info("OrderProducer Sent OrderCreatedEvent: " + event);

        return event;
    }

    public void send(String topic, OrderCompletedEvent event) {

        String eventToJson = EventSerializer.serialize(event);
        kafkaTemplate.send(topic, eventToJson);
        log.info("OrderProducer Sent OrderCompletedEvent: " + event);
    }

}

 

OrderCompletedEvent 를 파라미터로 받는 send() 메서드를 통해 "order-completed" 토픽에 이벤트를 발행한다.

 

4. OrderCompletedEvent

package com.spartacoding.msa.order.events;

import com.spartacoding.msa.order.domain.Order;

import java.math.BigDecimal;

public record OrderCompletedEvent(
        Long orderId,
        String productId,
        Integer quantity,
        BigDecimal totalPrice,
        String customerEmail,
        String status
) {
    public static OrderCompletedEvent from(Order order) {
        return new OrderCompletedEvent(
                order.getId(),
                order.getProductId(),
                order.getQuantity(),
                order.getTotalPrice(),
                order.getCustomerEmail(),
                order.getStatus().name()
        );
    }
}

 

OrderCompletedEvent 에는 status 까지 포함되어 있다. 

여기에는 무조건 COMPLETE 이 들어오게 된다.

 

5. NotificationConsumer

package com.spartacoding.msa.notification.infrastructure.messaging;

import com.spartacoding.msa.notification.EventSerializer;
import com.spartacoding.msa.notification.events.OrderCompletedEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RequiredArgsConstructor
public class NotificationConsumer {

    /**
     * 주문 완료에 대한 이벤트를 구독, 처리
     * 주문 완료에 대한 알림 메시지는 로그로 출력
     * */

    @KafkaListener(topics = "order-completed")
    public void listen(String kafkaMessage) {
        log.info("NotificationConsumer.listen.kafkaMessage: {}", kafkaMessage);

        OrderCompletedEvent orderCompletedEvent = EventSerializer.deserialize(kafkaMessage, OrderCompletedEvent.class);

        log.info("NotificationConsumer.listen.orderCompletedEvent: {}", orderCompletedEvent);
    }
}

 

알림 발송은 로그로 대체한다.

 

6. 결과 확인

 

주문을 등록하고 받은 응답은 status가 CREATED 이다.

하지만 주문을 등록하자마자 메시징 시스템에 의해 payment 서비스를 거쳐서 COMPLETED 가 된다.

 

6 - 1. Kafka UI

 

총 3개의 토픽이 생성되었다.

 

  • order-completed
  • order-created
  • payment-completed

 

새롭게 생긴 order-completed 에는 status가 COMPLETED 라는 정보까지 들고 있다.

 

 

notification-msa 에서 로그도 잘 찍히고 있다.