Saga Pattern(사가 패턴)
분산된 여러 서비스가 하나의 트랜잭션처럼 동작해야 하는 상황이 있습니다. 그러나 전통적인 데이터베이스 트랜잭션과 달리, 마이크로서비스 환경에서는 이를 일관되게 처리하기 어렵습니다.
Saga Pattern은 이러한 문제를 해결하기 위해 도입되었습니다. 이 패턴은 일련의 작은 트랜잭션으로 대규모 트랜잭션을 나눕니다. 각 작은 트랜잭션은 독립적으로 커밋되고, 만약 중간에 실패가 발생하면 보상 트랜잭션을 실행하여 상태를 이전으로 복구합니다.
사용 코드 및 아키텍처
Saga 패턴의 동작을 확인하기 위한 코드는 크게 3개의 애플리케이션이 존재합니다.
- OrderApplication - 주문 App
- ProductApplication - 상품 App
- PaymentApplication - 결제 App
아키텍처는 위와 같습니다.
-
- Order 저장
- Order 애플리케이션에서 주문 데이터를 Map 메모리에 저장합니다.
- Order → Product
- Order 애플리케이션이 market.product 큐에 주문 데이터를 전송합니다.
- Product 컨슈밍
- Product 애플리케이션이 market.product 큐에서 데이터를 컨슈밍하여 처리합니다.
- Product → Payment
- Product 애플리케이션이 주문 데이터를 market.payment 큐에 전송합니다.
- Payment 컨슈밍
- Payment 애플리케이션이 market.payment 큐에서 데이터를 컨슈밍하여 결제 처리를 시도합니다.
- Payment 에러 발생 시
- 결제 처리 중 에러가 발생하면 Payment 애플리케이션이 에러 메시지를 market.err.product 에러 큐에 전송합니다.
- Product 에러 처리
- Product 애플리케이션이 market.err.product 큐에서 에러 메시지를 컨슈밍하여 이를 market.err.order 에러 큐로 전송합니다.
- Order 에러 처리
- Order 애플리케이션이 market.err.order 큐에서 에러 메시지를 컨슈밍하여 해당 주문의 취소 작업(보상 트랜잭션)을 수행하고, Order Store의 데이터를 업데이트합니다.
- Order 저장
각 Exchange를 추가하면 위와 같은 아키텍처가 형성됩니다.
OrderApplication
dependencies
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-amqp'
implementation 'org.springframework.boot:spring-boot-starter-web'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.amqp:spring-rabbit-test'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}
application.properties
- Spring 애플리케이션 이름
- order
- RabbitMQ 익스체인지
- market
- market.err (에러 처리용)
- RabbitMQ 큐
- market.product
- market.payment
- market.err.order (에러 처리용)
- market.err.product (에러 처리용)
- RabbitMQ 서버
- 호스트: localhost
- 포트: 5672
- 사용자 이름: guest
- 비밀번호: guest
Order
- 주요 필드
- UUID orderId: 주문 ID
- String userId: 사용자 ID
- String orderStatus: 주문 상태
- String errorType: 에러 유형 (취소 시 어떤 에러로 인해 취소되었는지 기록)
- 주요 메서드
- cancleOrder(String receiveErrorType): 주문을 취소하고, 에러 유형을 기록하는 메서드
OrderApplicationQueueConfig
OrderApplicationQueueConfig 클래스는 RabbitMQ의 큐, 익스체인지, 바인딩 설정을 구성하는 코드입니다. 이 코드는 RabbitMQ를 사용하여 메시지를 주고받기 위한 설정을 정의합니다.
- Jackson2JsonMessageConverter
- 메시지를 JSON 형식으로 직렬화/역직렬화하기 위해 Jackson2JsonMessageConverter를 Bean으로 등록합니다.
- 이 설정을 통해 메시지를 JSON으로 변환하여 RabbitMQ에 보내거나, RabbitMQ로부터 받은 JSON 메시지를 객체로 변환할 수 있습니다.
- Exchange와 Queue의 설정
- RabbitMQ에서 사용하는 TopicExchange와 Queue를 Bean으로 정의합니다.
- @Value 어노테이션을 사용해 application.properties파일에서 값을 주입받습니다.
- Binding 설정
- Exchange와 Queue를 바인딩합니다. 바인딩은 메시지가 특정 Exchange에 도착했을 때, 어떤 Queue로 라우팅될지를 정의합니다.
OrderEndpoint
OrderEndpoint 클래스에서는 주문을 생성하고 조회하는 API와 에러 메시지를 수신하여 주문을 롤백하는 기능을 포함하고 있습니다.
- RabbitMQ 메시지 리스너
- errOrder 메서드는 RabbitMQ의 에러 큐에서 메시지를 수신하여 주문을 롤백합니다.
- REST API 엔드포인트
- getOrder와 order 메서드는 각각 주문 조회와 생성 기능을 제공합니다.
OrderService
OrderService 클래스는 주문 생성, 조회, 롤백 기능을 제공하며 RabbitMQ를 사용하여 메시지를 전송하는 역할을 수행합니다.
createOrder(OrderRequestDto orderRequestDto)
- OrderRequestDto로부터 Order 객체를 생성합니다.
- 생성된 주문을 orderStore(현재는 Map 추후 DB로 변경하여 영속성 관리 필요!) 에 저장합니다.
- DeliveryMessage 객체를 생성하여, productQueue 큐로 RabbitMQ 메시지를 전송합니다.
- 최종적으로 생성된 주문 객체를 반환합니다.
getOrder(UUID orderId)
- orderId에 해당하는 주문을 orderStore에서 조회하고 반환합니다.
rollbackOrder(DeliveryMessage deliveryMessage)
- 전달된 DeliveryMessage에 포함된 주문 ID를 사용해 해당 주문을 조회합니다.
- 조회된 주문의 상태를 CANCELED로 변경하고, 에러 타입을 기록합니다.
공통 코드 : DeliveryMessage(DTO)
DeliveryMessage 클래스는 주문 관련 데이터를 RabbitMQ를 통해 다른 마이크로서비스로 전송하기 위해 사용하는 데이터 전송 객체(DTO)입니다.
앞서 언급했듯이, 현재는 데이터를 메모리 내의 Map으로 관리하고 있으므로, 추후에 데이터베이스를 추가하여 영속성을 관리하는 것이 필요합니다.
ProductApplication
dependencies
dependencies {
// Jackson 의존성
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.core:jackson-core'
implementation 'com.fasterxml.jackson.core:jackson-annotations'
implementation 'org.springframework.boot:spring-boot-starter-amqp'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.amqp:spring-rabbit-test'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}
메시지 전송과 전달을 JSON 형식으로 처리하기 위해 Jackson 라이브러리를 사용했습니다.
application.properties
ProductApplicationQueueConfig
ProductApplicationQueueConfig 클래스는 RabbitMQ와 함께 사용하는 메시지 컨버터를 설정하는 Spring Configuration 클래스입니다. 메시지 컨버터는 객체와 JSON 간의 변환을 처리하는 데 사용됩니다.
ProductEndpoint
ProductEndpoint 클래스는 RabbitMQ에서 수신한 메시지를 처리하는 컴포넌트입니다.
이 클래스는 ProductService와 통합되어 메시지를 수신하고 적절한 처리를 수행합니다. 두 개의 @RabbitListener 메서드를 통해 메시지를 처리하며, 각각의 큐에서 메시지를 수신합니다.
메시지 수신
- @RabbitListener(queues = "${message.queue.product}")
- message.queue.product 큐에서 메시지를 수신합니다.
- receiveMessage() 메서드
- 수신된 DeliveryMessage 객체를 로그로 출력하고, productService.reduceProductAmount(deliveryMessage)를 호출하여 상품 수량을 감소시킵니다.
- @RabbitListener(queues = "${message.queue.err.product}"): message.queue.err.product 큐에서 에러 메시지를 수신합니다.
- receiveErrorMessage() 메서드
- 수신된 에러 메시지를 로그로 출력하고, productService.rollbackProduct(deliveryMessage)를 호출하여 상품 롤백 처리를 수행합니다.
ProductService
ProductService 클래스는 상품의 수량을 조정하고, 문제가 발생한 경우 롤백 처리를 하는 서비스를 담당합니다. 이 클래스는 RabbitTemplate을 사용하여 RabbitMQ 큐에 메시지를 전송합니다.
메시지 전송
- reduceProductAmount(DeliveryMessage deliveryMessage)
상품 ID가 1이 아니거나 수량이 1보다 큰 경우 롤백 처리 (rollbackProduct 호출).- 그렇지 않으면 정상적으로 결제 큐(paymentQueue)에 메시지를 전송합니다.
- rabbitTemplate.convertAndSend(paymentQueue, deliveryMessage)는 deliveryMessage 객체(Dto)를 paymentQueue 큐로 전송합니다.
- rollbackProduct(DeliveryMessage deliveryMessage)
- 에러 타입이 비어있는 경우, 기본 에러 메시지를 설정합니다. 이는
결제 처리 과정에서 발생한 에러가 아니라 Product 애플리케이션 내의 에러를 나타내므로, 이후에 롤백 큐(errOrderQueue)로 해당 메시지를 전송합니다. - rabbitTemplate.convertAndSend(errOrderQueue, deliveryMessage)는 deliveryMessage 객체를 errOrderQueue 큐로 전송합니다.
- 에러 타입이 비어있는 경우, 기본 에러 메시지를 설정합니다. 이는
- 로깅
- log.info("PRODUCT ROLLBACK!!!"): 롤백 처리 시 로그를 남깁니다.
PaymentApplication
dependencies
dependencies {
// Jackson 의존성
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.core:jackson-core'
implementation 'com.fasterxml.jackson.core:jackson-annotations'
implementation 'org.springframework.boot:spring-boot-starter-amqp'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.amqp:spring-rabbit-test'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}
Payment에서도 마찬가지로 메시지 전송과 전달을 JSON 형식으로 처리하기 위해 Jackson 라이브러리를 사용했습니다.
application.properties
Payment(Entity)
Payment 클래스는 결제 정보를 나타내는 도메인 객체입니다. 현재는 단순한 POJO (Plain Old Java Object)로 정의되어 있으며, 후에 JPA 엔티티로 변환할 예정입니다.
PaymentApplicationQueueConfig
마찬가지로 PaymentApplicationQueueConfig 클래스도 Spring 애플리케이션에서 메시지 전송 및 수신을 JSON 형식으로 처리하기 위한 설정을 포함합니다.
이 설정은 RabbitMQ와의 메시지 전송 및 수신 과정에서 JSON 메시지 컨버터를 사용하도록 구성되어 있습니다.
PaymentEndpoint
PaymentEndpoint 클래스는 RabbitMQ에서 전송된 결제 관련 메시지를 수신하여 처리하는 역할을 합니다. 이 클래스는 PaymentService를 이용하여 수신한 메시지를 기반으로 결제 작업을 수행합니다.
PaymentService
PaymentService 클래스는 결제 처리 및 오류 발생 시 롤백을 담당합니다. 이 클래스는 결제 메시지를 수신하여 Payment 객체를 생성하고, 특정 조건(10000원 이상)에 따라 에러 큐로 메시지를 전송합니다.
- creatPayment 메서드
- DeliveryMessage 객체를 받아서 Payment 객체를 생성하고, 결제 금액이 10,000 이상일 경우 에러 큐로 메시지를 전송합니다. (오타: creatPayment → createPayment)
- rollbackPayment 메서드
- 에러 메시지를 지정된 에러 큐로 전송합니다.
동작 확인
주문 요청
먼저, http://localhost:8080/order 엔드포인트를 통해 OrderRequestDto 객체를 포함한 주문 요청을 해보겠습니다.
주문 조회
GET /order/{orderId} 요청을 보내면, 생성한 주문에 대한 동일한 데이터를 반환받을 수 있습니다. 이 엔드포인트를 통해 주문 ID를 기반으로 주문 정보를 조회할 수 있습니다.
주문 Queue 확인
RabbitMQ의 관리자 페이지에 접속하여 확인하면, market.product 큐에 메시지가 적재된 것을 볼 수 있습니다.
현재 컨슈머가 설정되어 있지 않기 때문에, Order 서비스에서 메시지가 발행되고 market.product 큐에 쌓여 있는 상태입니다. 이는 메시지가 큐에 저장되고, 후속 처리나 컨슈머가 메시지를 소비할 준비가 되어야 함을 의미합니다.
주문 Queue 데이터 확인
market.product 큐에 들어가서 'Get Message' 기능을 사용해 메시지를 확인해보면, 메시지가 Jackson 라이브러리를 통해 직렬화되어 JSON 형식으로 저장된 것을 볼 수 있습니다.
이 JSON 메시지는 MessageDelivery를 통해 생성된 Delivery 객체의 내용을 포함하고 있습니다.
Product Consumer 실행
product 컨슈머를 실행하면, 이전에 market.product 큐에 적재된 메시지를 즉시 받아서 처리하게 됩니다. 이로 인해 로그에 메시지 처리 결과가 기록됩니다.
즉, 이 로그는 product 컨슈머가 큐에 저장된 메시지를 성공적으로 받아서 처리한 것을 의미합니다.
Product -> Payment로 메시지 전달
Order에 요청을 하면 Product Application이 받아서 다시 payment 큐에 메시지를 전달하는 것을 확인해보겠습니다.
먼저 order->product로 요청을 보내줍니다.
receiveMessage() 메서드를 통해 deliveryMessage를 market.payment 큐로 전달합니다.
Product 애플리케이션에서 위와 같이 로그가 기록된 것을 보면, receiveMessage() 메서드가 정상적으로 호출되어 deliveryMessage의 내용이 로그로 찍혔음을 확인할 수 있습니다.
RabbitMQ 관리자 페이지로 가서 메시지를 확인해보면, Payment 컨슈머가 아직 설정되지 않았기 때문에 market.payment 큐에 메시지가 적재된 상태를 볼 수 있습니다. 이는 메시지가 order에서 product를 거쳐 payment 큐로 정상적으로 전달되었음을 의미합니다.
Payment Consumer(성공)
paymentApp을 실행하면, payment 큐에 적재되어 있던 메시지가 즉시 처리됩니다.
실행 후 로그에서 'PAYMENT MESSAGE'라는 메시지가 기록된 것을 확인할 수 있습니다. 이는 paymentApp이 payment 큐의 메시지를 성공적으로 처리했음을 의미합니다.
들어온 데이터는 먼저 로그에 기록됩니다. 이후, createPayment() 메서드로 넘어가서 Payment 객체가 생성되고, 결제 금액이 확인됩니다. 이 과정이 정상적으로 수행되면 전체 패턴이 올바르게 작동하는 것입니다.
Payment Consumer(실패)
Payment App이 프로듀서 역할을 하며, 에러가 발생할 경우 RabbitTemplate를 통해 에러 큐(market.err.product)로 메시지를 전송합니다.
만약 결제 금액이 10,000원을 초과하면, 에러가 발생하여 메시지가 market.err.product 큐에 적재됩니다.
이제, Order 애플리케이션에서 10,000원이 넘는 결제 금액을 요청해 보겠습니다. 이 요청이 에러를 발생시키고 market.err.product 큐로 메시지가 전송되는 과정을 확인할 수 있습니다.
결제 금액이 10,001로 요청되면 에러가 발생하여 rollbackPayment() 메서드가 실행됩니다. 이 과정에서 log.error를 통해 에러 로그가 기록된 것을 확인할 수 있습니다.
RabbitMQ 관리자 페이지에서 큐를 확인하면, market.err.product 큐에 메시지가 하나 적재된 것을 볼 수 있습니다. 이는 결제 금액이 10,000원을 초과하여 에러가 발생했으며, 발생한 에러 메시지가 market.err.product 큐에 적재되었음을 의미합니다.
결제 에러 확인
위 아키텍처를 보면, Product 애플리케이션이 market.err.product 큐를 리스닝하여 에러 메시지를 수신하고, 이를 다시 market.err.order 큐로 전달하는 과정을 확인해 보겠습니다.
Product 애플리케이션은 에러 이벤트를 리스닝하기 위해 @RabbitListener 어노테이션을 사용하여 market.err.product 큐를 바라봅니다.
이전에 결제 금액이 10,000원을 초과하여 에러가 발생했고, 이로 인해 market.err.product 큐에 에러 메시지가 적재되었습니다.
ProductApp이 실행되면, receiveErrorMessage() 메서드가 market.err.product 큐를 모니터링하고 있기 때문에 'ERROR RECEIVE !!!' 로그가 기록됩니다.
이어서 rollbackProduct() 메서드가 호출되어 'PRODUCT ROLLBACK !!!' 로그가 찍히는 것을 확인할 수 있습니다.
RabbitMQ 관리자 페이지에서 ProductApp이 실행되면, ProductEndpoint의 receiveErrorMessage() 메서드가 market.err.product 큐를 리스닝하여 에러 메시지를 수신하고, 이를 다시 market.err.order 큐로 전달된 것을 확인할 수 있습니다.
또한, 'Get Message' 기능을 사용하여 큐의 메시지를 확인하면, 동일한 orderID(6f57bce4~~)의 데이터가 적재되어 있는 것을 볼 수 있습니다.
상품 에러 확인
Product 애플리케이션에서 productId가 1이 아니거나 productQuantity가 1 이상인 경우, 이는 Product 애플리케이션 자체에서 에러가 발생했음을 의미합니다.
이 경우, Payment 애플리케이션에 전달할 필요 없이 바로 market.err.order 큐에 에러 메시지를 전송합니다.
즉, Payment 애플리케이션으로는 가지 않고, Order 애플리케이션에서 직접 Product 애플리케이션으로 온 객체입니다.
deliveryMessage에서 getErrorType()으로 에러 메시지를 가져오고 hasText() 메서드를 통해 체크했을 때, 에러 메시지가 존재하지 않으면 이는 Order 애플리케이션에서 온 순수한 에러 메시지라는 의미입니다.
즉, Product Error입니다.
이 과정은 아키텍처의 빨간색 부분에 해당합니다.
정리를 해보면 다음과 같습니다.
- Product 애플리케이션에서 에러가 발생한 경우
- 조건
- productId != 1 또는 productQuantity > 1
- 처리
- 에러 메시지를 market.err.order 큐로 직접 전송합니다.
Payment 애플리케이션에서 에러가 발생한 경우
- 조건
- payment.getPayAmount() >= 10,000
- 처리
- market.err.order 큐에 "PAYMENT_LIMIT_EXCEEDS" 에러 메시지가 적재됩니다.
RabbitMQ 관리자 페이지를 확인하면, market.err.order 큐에 두 개의 에러 메시지가 적재된 것을 볼 수 있습니다."
Get Message’ 기능을 사용하여 큐의 메시지를 확인해보면, 다음과 같은 두 가지 에러 메시지를 확인할 수 있습니다:
- 첫 번째 메시지는 Payment 애플리케이션에서 발생한 에러입니다.
- 두 번째 메시지는 Product 애플리케이션에서 발생한 에러입니다."
상품 rollback
마지막으로, market.err.order 큐의 적재된 에러 메시지를 처리하는 로직을 구성해보겠습니다.
Order 애플리케이션에서 market.err.order 큐를 리스닝하여 에러를 처리하는 방식을 설정합니다.
OrderEndpoint 설정
- OrderEndpoint에서 @RabbitListener를 사용하여 market.err.order 큐를 리스닝합니다.
- orderService의 rollbackOrder() 메서드를 호출하여 에러 메시지를 처리합니다.
rollbackOrder() 메서드
- 현재 Order는 메모리의 Map (orderStore)에 저장되고 있으므로, rollbackOrder() 메서드에서 해당 Order를 가져옵니다.
- cancleOrder() 메서드를 호출하여 에러 타입을 기록합니다.
메모리에 있던 orderStore가 초기화 되었기 때문에 에러 메세지를 받아서 제대로 처리가 안될 것이기 때문에 Purge를 해줍니다.
전체 로직 동작 확인
정상 요청
정상적인 요청을 처리하면 errorType 필드에 null 값이 들어가며, 이는 요청이 정상적으로 처리되었음을 확인할 수 있습니다
Product 에러 발생(수량 에러)
Product 애플리케이션에서 에러가 발생했기 때문에 errorType 필드에 "PRODUCT ERROR"가 설정됩니다.
보상 트랜잭션으로 인해, orderStatus는 OrderStore에 저장된 Order 객체에서 cancleOrder() 메서드를 호출하여 "CANCELED"로 변경된 것을 확인할 수 있습니다.
Payment 에러 발생(금액 에러)
위와 같이 errorType 필드에는 "PAYMENT_LIMIT_EXCEEDS"가 설정되며, 이는 지불 금액이 설정된 한도를 초과했기 때문에 발생한 에러입니다.
이 에러로 인해 OrderStore에 저장된 Order 객체는 cancleOrder() 메서드를 호출하여 orderStatus가 "CANCELED"로 변경됩니다. 이는 보상 트랜잭션을 통해 주문을 취소하고 상태를 정리하는 과정입니다."
'Architecture > MSA' 카테고리의 다른 글
[MSA] 대규모 시스템에서 이벤트 소싱과 CQRS로 일관성 확보하기 (0) | 2024.08.15 |
---|---|
[MSA] 분산 시스템에서 2단계 커밋(2PC)과 SAGA 패턴을 통한 데이터 일관성 유지 (0) | 2024.08.14 |
[MSA] MSA란 무엇일까❓ (1) | 2024.07.31 |