고급 패턴 — Event Sourcing, CQRS, Saga, Outbox, DLQ
Event Sourcing
개념
상태를 저장하는 대신 상태 변경 이벤트를 순서대로 저장.
전통적 방식:
orders 테이블
id=1, status=SHIPPED, amount=50000 ← 현재 상태만 저장
Event Sourcing:
order-events 토픽 (또는 이벤트 스토어)
key=order-1 → OrderCreated {amount: 50000}
key=order-1 → OrderPaid {paymentId: "P123"}
key=order-1 → OrderShipped {trackingNo: "T456"}
현재 상태 = 이벤트 순차 재생으로 복원
Kafka로 구현
// 이벤트 발행 (Log Compaction 토픽)
sealed class OrderEvent {
data class OrderCreated(
val orderId: String,
val userId: String,
val amount: Long,
val items: List<OrderItem>,
val timestamp: Instant,
) : OrderEvent()
data class OrderPaid(
val orderId: String,
val paymentId: String,
val timestamp: Instant,
) : OrderEvent()
data class OrderShipped(
val orderId: String,
val trackingNumber: String,
val timestamp: Instant,
) : OrderEvent()
data class OrderCancelled(
val orderId: String,
val reason: String,
val timestamp: Instant,
) : OrderEvent()
}
// 상태 복원 (Projection)
fun replayOrderState(events: List<OrderEvent>): OrderState {
return events.fold(OrderState.Empty) { state, event ->
when (event) {
is OrderEvent.OrderCreated -> state.apply(event)
is OrderEvent.OrderPaid -> state.apply(event)
is OrderEvent.OrderShipped -> state.apply(event)
is OrderEvent.OrderCancelled -> state.apply(event)
}
}
}이벤트 소싱 토픽 설정
# Log Compaction으로 설정 (최신 상태 보존)
kafka-topics.sh --bootstrap-server localhost:9092 \
--create --topic order-events \
--partitions 12 \
--replication-factor 3 \
--config cleanup.policy=compact \
--config retention.ms=-1 # 영구 보존CQRS (Command Query Responsibility Segregation)
개념
**쓰기(Command)**와 읽기(Query) 모델을 분리.
Command Side: Query Side:
OrderService → Kafka → OrderProjectionService
(이벤트 발행) (다양한 읽기 모델 생성)
EventStore Elasticsearch (검색)
Redis (캐시)
PostgreSQL (통계)
Kafka Streams로 Projection 생성
val builder = StreamsBuilder()
// 주문 이벤트 스트림에서 다양한 읽기 모델 생성
// 1. 사용자별 주문 집계 (KTable)
val ordersByUser: KTable<String, UserOrderSummary> = builder
.stream<String, OrderEvent>("order-events")
.filter { _, event -> event is OrderEvent.OrderCreated }
.groupBy { _, event -> (event as OrderEvent.OrderCreated).userId }
.aggregate(
{ UserOrderSummary() },
{ userId, event, summary ->
summary.addOrder((event as OrderEvent.OrderCreated))
},
Materialized.`as`("user-order-summaries")
)
// 2. 일별 매출 (Windowed)
val dailySales: KTable<Windowed<String>, Long> = builder
.stream<String, OrderEvent>("order-events")
.filter { _, event -> event is OrderEvent.OrderPaid }
.mapValues { event -> (event as OrderEvent.OrderPaid) }
.groupBy { _, _ -> "global" }
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofDays(1)))
.aggregate(
{ 0L },
{ _, event, total -> total + lookupOrderAmount(event.orderId) },
Materialized.`as`("daily-sales")
)Saga 패턴
문제: 분산 트랜잭션
주문 생성 시나리오:
1. Order Service: 주문 생성
2. Payment Service: 결제 처리
3. Inventory Service: 재고 차감
4. Shipping Service: 배송 준비
→ 2번 성공, 3번 실패 시?
→ 전통적 2PC는 Kafka에서 비실용적
→ Saga 패턴으로 해결
Choreography Saga (안무 방식)
각 서비스가 이벤트를 듣고 다음 단계를 트리거. 중앙 조정자 없음.
Order Service
→ order.created 발행
Payment Service (order.created 구독)
→ 결제 성공: payment.completed 발행
→ 결제 실패: payment.failed 발행
Inventory Service (payment.completed 구독)
→ 재고 차감 성공: inventory.reserved 발행
→ 재고 부족: inventory.failed 발행
Order Service (inventory.failed 구독)
→ 주문 취소 처리, order.cancelled 발행
Payment Service (order.cancelled 구독)
→ 결제 환불 (보상 트랜잭션)
// Order Service
@EventHandler("payment.failed", "inventory.failed")
fun handleFailure(event: FailureEvent) {
orderService.cancel(event.orderId, event.reason)
producer.send(ProducerRecord("order.cancelled", event.orderId,
OrderCancelled(event.orderId, event.reason)))
}Orchestration Saga (오케스트레이션 방식)
중앙 Saga Orchestrator가 각 서비스에 직접 명령.
class OrderSagaOrchestrator(
private val producer: KafkaProducer<String, String>,
) {
fun execute(orderId: String) {
val saga = OrderSagaState(orderId)
// 1단계: 결제 요청
producer.send(ProducerRecord(
"payment.commands",
orderId,
ProcessPaymentCommand(orderId).toJson()
))
saga.currentStep = SagaStep.PAYMENT_PROCESSING
}
@EventHandler("payment.result")
fun onPaymentResult(event: PaymentResult) {
if (event.success) {
// 2단계: 재고 차감
producer.send(ProducerRecord(
"inventory.commands",
event.orderId,
ReserveInventoryCommand(event.orderId).toJson()
))
} else {
// 보상: 주문 취소
compensate(event.orderId)
}
}
private fun compensate(orderId: String) {
producer.send(ProducerRecord(
"order.commands", orderId,
CancelOrderCommand(orderId).toJson()
))
}
}Transactional Outbox 패턴
문제: 이중 쓰기
// 위험한 코드
db.save(order) // DB 저장 성공
kafka.send(orderEvent) // Kafka 전송 실패 → 불일치!
또는:
kafka.send(orderEvent) // Kafka 전송 성공
db.save(order) // DB 저장 실패 → 이벤트만 발행됨!
해결: Outbox 테이블
1. DB 트랜잭션 안에서:
- orders 테이블에 주문 저장
- outbox 테이블에 이벤트 저장
두 작업이 하나의 DB 트랜잭션 → 원자적
2. 별도 프로세스 (Outbox Publisher 또는 Debezium):
outbox 테이블의 미발행 이벤트 → Kafka 발행
발행 성공 → outbox 레코드 삭제/마킹
// outbox 테이블
data class OutboxEvent(
val id: UUID = UUID.randomUUID(),
val aggregateType: String, // "Order"
val aggregateId: String, // orderId
val eventType: String, // "OrderCreated"
val payload: String, // JSON
val createdAt: Instant = Instant.now(),
val publishedAt: Instant? = null,
)
// 서비스 코드
@Transactional
fun createOrder(request: CreateOrderRequest): Order {
val order = orderRepository.save(Order.from(request))
outboxRepository.save(OutboxEvent(
aggregateType = "Order",
aggregateId = order.id,
eventType = "OrderCreated",
payload = OrderCreated(order.id, order.userId, order.amount).toJson(),
))
return order
// 트랜잭션 커밋 시 orders + outbox 동시에 저장
}Debezium으로 Outbox 발행
{
"name": "outbox-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.route.by.field": "aggregate_type",
"transforms.outbox.route.topic.replacement": "${routedByValue}.events",
"transforms.outbox.table.fields.additional.placement": "type:header:eventType"
}
}Dead Letter Queue (DLQ)
처리 실패 메시지를 별도 토픽으로 보내 나중에 재처리.
DLQ 패턴
orders 토픽 → Consumer → 처리 성공 → 커밋
→ 처리 실패 → orders.DLQ 토픽으로 이동
orders.DLQ → DLQ Consumer → 원인 분석 → 수동 재처리 또는 폐기
구현
class OrderConsumerWithDLQ(
private val consumer: KafkaConsumer<String, String>,
private val dlqProducer: KafkaProducer<String, String>,
) {
private val maxRetries = 3
private val retryCount = mutableMapOf<TopicPartition, MutableMap<Long, Int>>()
fun run() {
consumer.subscribe(listOf("orders"))
while (true) {
val records = consumer.poll(Duration.ofMillis(100))
for (record in records) {
val tp = TopicPartition(record.topic(), record.partition())
val retries = retryCount.getOrPut(tp) { mutableMapOf() }
.getOrDefault(record.offset(), 0)
try {
processOrder(record.value())
retries.remove(record.offset())
} catch (e: Exception) {
if (retries < maxRetries) {
retries[record.offset()] = retries + 1
logger.warn("재시도 ${retries + 1}/$maxRetries: ${record.offset()}", e)
// 이 레코드부터 다시 읽기
consumer.seek(tp, record.offset())
break
} else {
// DLQ로 이동
sendToDLQ(record, e)
retries.remove(record.offset())
}
}
}
consumer.commitSync()
}
}
private fun sendToDLQ(record: ConsumerRecord<String, String>, error: Exception) {
val dlqRecord = ProducerRecord(
"orders.DLQ",
record.key(),
record.value(),
).apply {
headers().add("original-topic", record.topic().toByteArray())
headers().add("original-partition", record.partition().toString().toByteArray())
headers().add("original-offset", record.offset().toString().toByteArray())
headers().add("error-message", (error.message ?: "unknown").toByteArray())
headers().add("error-class", error::class.java.name.toByteArray())
headers().add("failed-at", Instant.now().toString().toByteArray())
}
dlqProducer.send(dlqRecord)
logger.error("DLQ로 이동: topic=${record.topic()}, offset=${record.offset()}", error)
}
}DLQ 재처리
# DLQ 메시지 확인
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic orders.DLQ \
--from-beginning \
--property print.headers=true
# DLQ → 원본 토픽으로 재전송 (수정 후)
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic orders.DLQ \
--from-beginning | \
kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic orders정리
- Event Sourcing: 상태 대신 이벤트 저장 → 이벤트 재생으로 상태 복원, Log Compaction 토픽 활용
- CQRS: Kafka Streams로 이벤트 스트림을 다양한 읽기 모델로 변환
- Saga: 분산 트랜잭션 대안 — Choreography(이벤트 기반), Orchestration(중앙 조정)
- Outbox: DB 트랜잭션과 Kafka 발행의 원자성 보장 — Debezium CDC 활용 권장
- DLQ: 처리 실패 메시지를 분리하여 나중에 재처리 — 헤더에 원본 정보 포함