메시지큐/kafka

Kafka를 사용하여 이벤트 중심 아키텍쳐 구현(Spring, MSA) - 2

Griotold 2025. 1. 15. 12:07

 

1. 이벤트 흐름

https://github.com/Griotold/kafka-practice-sparta

 

 

https://griotold.tistory.com/80

 

지난 게시글에서는  order MSA에서 주문 생성시 "order-created" 토픽에 메시지를 전달하는 것 까지했다.

 

이번 게시글에서는 payment MSA에서 메시지를 수신하고,

결제를 생성한 뒤 "payment-completed" 토픽에 메시지를 전달하는 것을 구현해보자.

 

2. payment MSA의 application.yml

spring:
  application:
    name: payment-msa
  datasource:
    url: jdbc:h2:mem:testdb
    driverClassName: org.h2.Driver
    username: sa
    password:
  h2:
    console:
      enabled: true
  jpa:
    hibernate:
      ddl-auto: update
    show-sql: true
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: ${spring.application.name}-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: '*'
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
eureka:
  client:
    service-url:
      defaultZone: http://localhost:8761/eureka/
server:
  port: 8082
logging:
  level:
    root: INFO

 

order-msa 와 비교해볼 때, 바뀐 부분은 application.name 밖에 없다.

 

3. PaymentTopic

package com.spartacoding.msa.payment;

import lombok.AllArgsConstructor;
import lombok.Getter;

/**
 * 토픽 이름이 많아지면 enum 으로 관리한다.
 * */
@Getter
@AllArgsConstructor
public enum PaymentTopic {
    // 주문 생성 이벤트 (Order MSA가 발행)
    ORDER_CREATED("order-created"),

    // 결제 완료 이벤트 (Payment MSA가 발행)
    PAYMENT_COMPLETED("payment-completed")
    ;

    private final String topic;

}

 

"order-created" 토픽에서 메시지를 수신해야 하고,

"payment-completed" 토픽에 메시지를 발행해야 한다.

 

4. OrderCreatedEvent

package com.spartacoding.msa.payment.events;

import java.math.BigDecimal;

public record OrderCreatedEvent(
        Long orderId,
        String productId,
        Integer quantity,
        BigDecimal totalPrice,
        String customerEmail
) {
}

 

order-msa의 있는 객체와 동일하게 맞춰준다.

 

5. PaymentSuccessEvent

package com.spartacoding.msa.payment.events;

import com.spartacoding.msa.payment.domain.Payment;

import java.math.BigDecimal;

public record PaymentSuccessEvent(
        Long id,
        Long orderId,
        BigDecimal amount,
        String status
) {

    public static PaymentSuccessEvent from(Payment payment) {
        return new PaymentSuccessEvent(
                payment.getId(),
                payment.getOrderId(),
                payment.getAmount(),
                payment.getStatus().name()
        );
    }
}

 

"payment-completed" 토픽에 발행할 이벤트이다.

 

6. PaymentConsumer

package com.spartacoding.msa.payment.infrastructure.messaging;

import com.spartacoding.msa.payment.EventSerializer;
import com.spartacoding.msa.payment.PaymentTopic;
import com.spartacoding.msa.payment.application.PaymentApplicationService;
import com.spartacoding.msa.payment.domain.Payment;
import com.spartacoding.msa.payment.domain.PaymentRepository;
import com.spartacoding.msa.payment.events.OrderCreatedEvent;
import com.spartacoding.msa.payment.events.PaymentSuccessEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RequiredArgsConstructor
public class PaymentConsumer {

    private final PaymentRepository paymentRepository;
    private final PaymentProducer paymentProducer;

    /**
     * 주문 생성에 대한 이벤트 구독, 처리
     * */
    @KafkaListener(topics = "order-created")
    public void processPayment(String kafkaMessage) {
        log.info("processPayment.kafkaMessage =  {}", kafkaMessage);

        // 1. DB 저장
        OrderCreatedEvent orderCreatedEvent = EventSerializer.deserialize(kafkaMessage, OrderCreatedEvent.class);

        Payment payment = Payment.create(orderCreatedEvent.orderId(), orderCreatedEvent.totalPrice());

        paymentRepository.save(payment);

        PaymentSuccessEvent paymentSuccessEvent = PaymentSuccessEvent.from(payment);

        // 2. 결제 성공 이벤트 발행
        paymentProducer.send(PaymentTopic.PAYMENT_COMPLETED.getTopic(), paymentSuccessEvent);
    }
}

 

"order-created" 를 수신하는 Consumer이다.

  1. DB에 Payment를 저장하고
  2. "payment-complted" 토픽에 결제 성공 이벤트를 발행한다.

 

7. PaymentProducer

package com.spartacoding.msa.payment.infrastructure.messaging;

import com.spartacoding.msa.payment.EventSerializer;
import com.spartacoding.msa.payment.events.PaymentSuccessEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;

@Slf4j
@Configuration
@RequiredArgsConstructor
public class PaymentProducer {
    private final KafkaTemplate<String, String> kafkaTemplate;

    public void send(String topic, PaymentSuccessEvent event) {
        String eventToJson = EventSerializer.serialize(event);
        kafkaTemplate.send(topic, eventToJson);
        log.info("Payment Producer Sent payment event: " + eventToJson);
    }
}

 

"payment-completed" 토픽에 이벤트를 발행한다.

해당 토픽은 order-msa 와 notification-msa가 수신할 것이다.

 

8. EveneSerializer

package com.spartacoding.msa.payment;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EventSerializer {

    private static final Logger logger = LoggerFactory.getLogger(EventSerializer.class);
    private static final ObjectMapper objectMapper = new ObjectMapper();

    // 직렬화 (객체 -> JSON 문자열)
    public static <T> String serialize(T object) {
        try {
            return objectMapper.writeValueAsString(object);
        } catch (JsonProcessingException e) {
            logger.error("Failed to serialize object: {}", object, e);
            throw new RuntimeException("Serialization error", e);
        }
    }

    // 역직렬화 (JSON 문자열 -> 객체)
    public static <T> T deserialize(String json, Class<T> clazz) {
        try {
            return objectMapper.readValue(json, clazz);
        } catch (JsonProcessingException e) {
            logger.error("Failed to deserialize JSON: {}", json, e);
            throw new RuntimeException("Deserialization error", e);
        }
    }
}

 

지난 블로그에도 있던 클래스라 설명은 생략하기로 한다.

 

9. 동작 확인

 

 

주문을 등록했을 때 "order-created" 토픽에 이벤트가 발행되고, 

그 이벤트를 수신하여 결제 생성 -> "payment-completed" 토픽에 이벤트가 발행되는지를 확인해보자.

 

 

일단, "payment-completed" 토픽이 생겼다.

kafka-ui에 대해서는 지난 블로그에 적어놨으니 지난 블로그를 참고하자.

 

 

"payment-completed" 토픽에 메세지가 정상적으로 발행되었다.

 

 

"order-created" 토픽에서 이벤트를 수신했다고 로그가 찍혔고,

"payment-completed" 토픽에 이벤트가 발행되었따고 로그가 기록되었다.