@Service@RequiredArgsConstructorpublic class KafkaOrderProducer { private final KafkaTemplate<String, OrderCreatedEvent> kafkaTemplate; public void sendOrderCreated(OrderCreatedEvent event) { kafkaTemplate.send("order-created", event.orderId().toString(), event) .thenAccept(result -> log.info("Sent order {} to partition {}", event.orderId(), result.getRecordMetadata().partition())) .exceptionally(ex -> { log.error("Failed to send order {}", event.orderId(), ex); return null; }); } // 트랜잭션 발행 (Exactly-once) @Transactional("kafkaTransactionManager") public void sendWithTransaction(List<OrderCreatedEvent> events) { events.forEach(event -> kafkaTemplate.send("order-created", event.orderId().toString(), event) ); }}
메시지 소비
@Component@RequiredArgsConstructor@Slf4jpublic class KafkaOrderConsumer { @KafkaListener( topics = "order-created", groupId = "order-processor", concurrency = "3" // 파티션 수만큼 병렬 소비 ) public void handleOrderCreated( @Payload OrderCreatedEvent event, @Header(KafkaHeaders.RECEIVED_PARTITION) int partition, @Header(KafkaHeaders.OFFSET) long offset, Acknowledgment acknowledgment ) { log.info("Processing order {} from partition {} offset {}", event.orderId(), partition, offset); try { orderProcessingService.process(event); acknowledgment.acknowledge(); // 수동 커밋 } catch (Exception e) { log.error("Failed to process order {}", event.orderId(), e); // 재처리 로직 또는 DLT로 전송 } } // 배치 소비 @KafkaListener(topics = "order-analytics", containerFactory = "batchListenerContainerFactory") public void handleBatch(List<OrderCreatedEvent> events, Acknowledgment acknowledgment) { log.info("Batch processing {} events", events.size()); analyticsService.processBatch(events); acknowledgment.acknowledge(); }}
에러 처리 & DLT
@Configurationpublic class KafkaErrorConfig { @Bean public DefaultErrorHandler errorHandler(KafkaTemplate<?, ?> template) { // 실패한 메시지를 Dead Letter Topic으로 전송 DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template, (record, ex) -> new TopicPartition(record.topic() + ".DLT", record.partition()) ); // 지수 백오프: 1초, 2초, 4초 재시도 후 DLT로 ExponentialBackOff backOff = new ExponentialBackOff(1000L, 2.0); backOff.setMaxAttempts(3); DefaultErrorHandler handler = new DefaultErrorHandler(recoverer, backOff); handler.addNotRetryableExceptions(BusinessException.class); // 즉시 DLT return handler; }}