개요
이 글에서는 실무에서 자주 접하는 세 가지 배치 시나리오를 전체 코드와 함께 설명한다. 주문 정산 배치, 대용량 DB 마이그레이션, FTP 파일 가져오기 배치를 통해 Spring Batch의 실전 활용 패턴을 익힌다.
예제 1: 주문 정산 배치 (전체 코드)
시나리오
매일 새벽 2시, 전날 완료된 주문을 집계해 정산 테이블에 저장한다. 정산 실패한 주문은 별도 오류 테이블에 기록하고, 정산 완료 후 담당자에게 Slack 알림을 발송한다.
도메인 모델
// Source row from the `orders` table: one order to be settled by the batch.
@Data
@Builder
public class Order {
    private Long id;
    private Long userId;
    private BigDecimal amount;       // gross order amount
    private String orderType;        // NORMAL, SUBSCRIPTION, B2B
    private String status;           // COMPLETED, CANCELLED
    private boolean settled;         // set to true once the settlement batch has processed this order
    private LocalDateTime createdAt;
}
// Settlement result written to the `settlements` table (one row per order).
@Data
@Builder
public class Settlement {
    private Long orderId;
    private Long userId;
    private BigDecimal grossAmount;  // original order amount
    private BigDecimal fee;          // platform fee deducted
    private BigDecimal netAmount;    // grossAmount - fee
    private LocalDate targetDate;    // settlement target date (from the targetDate job parameter)
    private LocalDateTime settledAt;
}
Job 전체 구성
/**
 * Daily order-settlement batch configuration.
 * Aggregates completed orders for the targetDate job parameter into the
 * settlements table, marks the source orders as settled, then reports the
 * result to Slack.
 */
@Configuration
@RequiredArgsConstructor
@Slf4j
public class SettlementJobConfig {

    private final JobRepository jobRepository;
    private final PlatformTransactionManager transactionManager;

    /**
     * Three-step job:
     * Step 1: aggregate orders and persist settlements
     * Step 2: mark the source orders as settled
     * Step 3: send a Slack notification with the results
     */
    @Bean
    public Job settlementJob(Step aggregationStep,
                             Step statusUpdateStep,
                             Step notificationStep,
                             SettlementJobListener jobListener) {
        return new JobBuilder("settlementJob", jobRepository)
                .start(aggregationStep)
                .next(statusUpdateStep)
                .next(notificationStep)
                .listener(jobListener)
                .build();
    }

    // ===== Step 1: settlement aggregation =====
    @Bean
    public Step aggregationStep(
            SettlementItemReader reader,
            SettlementItemProcessor processor,
            SettlementItemWriter writer,
            SettlementSkipListener skipListener) {
        return new StepBuilder("aggregationStep", jobRepository)
                .<Order, Settlement>chunk(500, transactionManager)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .faultTolerant()
                .skipLimit(1000)                            // allow up to 1,000 skips
                .skip(InvalidOrderException.class)          // skip invalid orders
                .skip(SettlementCalculationException.class) // skip calculation errors
                .noSkip(DataAccessException.class)          // never skip DB errors
                .listener(skipListener)
                .build();
    }

    // ===== Step 2: mark orders as settled =====
    @Bean
    public Step statusUpdateStep(JdbcTemplate jdbcTemplate) {
        return new StepBuilder("statusUpdateStep", jobRepository)
                .tasklet((contribution, chunkContext) -> {
                    String targetDate = (String) chunkContext.getStepContext()
                            .getJobParameters().get("targetDate");
                    int updatedCount = jdbcTemplate.update("""
                            UPDATE orders
                            SET settled = true, settled_at = NOW()
                            WHERE DATE(created_at) = ?
                            AND status = 'COMPLETED'
                            AND settled = false
                            """, targetDate);
                    log.info("정산 완료 상태 업데이트 - targetDate={}, count={}", targetDate, updatedCount);
                    // Stash the count in the job-level ExecutionContext for the notification step.
                    chunkContext.getStepContext()
                            .getStepExecution()
                            .getJobExecution()
                            .getExecutionContext()
                            .putInt("updatedOrderCount", updatedCount);
                    return RepeatStatus.FINISHED;
                }, transactionManager)
                .build();
    }

    // ===== Step 3: Slack notification =====
    @Bean
    public Step notificationStep(SlackNotificationService slackService) {
        return new StepBuilder("notificationStep", jobRepository)
                .tasklet((contribution, chunkContext) -> {
                    ExecutionContext jobContext = chunkContext.getStepContext()
                            .getStepExecution()
                            .getJobExecution()
                            .getExecutionContext();
                    String targetDate = (String) chunkContext.getStepContext()
                            .getJobParameters().get("targetDate");
                    int settledCount = jobContext.getInt("updatedOrderCount", 0);
                    // Sum skip counts over the aggregation step's executions.
                    long skipCount = chunkContext.getStepContext()
                            .getStepExecution()
                            .getJobExecution()
                            .getStepExecutions()
                            .stream()
                            .filter(s -> s.getStepName().equals("aggregationStep"))
                            .mapToLong(s -> s.getSkipCount())
                            .sum();
                    slackService.sendMessage("""
                            [정산 배치 완료] 📊
                            - 대상 날짜: %s
                            - 정산 건수: %,d건
                            - Skip 건수: %,d건
                            """.formatted(targetDate, settledCount, skipCount));
                    return RepeatStatus.FINISHED;
                }, transactionManager)
                .build();
    }
}
Reader: @StepScope와 날짜 기반 쿼리
/**
 * Step-scoped reader for the settlement aggregation step.
 * Wraps a JdbcCursorItemReader that streams completed, not-yet-settled
 * orders for the targetDate job parameter, and delegates the ItemStream
 * lifecycle so cursor/restart state is preserved.
 */
@Component
@StepScope
@RequiredArgsConstructor
@Slf4j
public class SettlementItemReader implements ItemStreamReader<Order> {

    private final DataSource dataSource;

    @Value("#{jobParameters['targetDate']}")
    private String targetDate; // 5.x: could also be injected as LocalDate

    private JdbcCursorItemReader<Order> delegate;

    // Runs after step-scope property injection, so targetDate is resolved here.
    @PostConstruct
    public void init() {
        log.info("SettlementItemReader 초기화 - targetDate={}", targetDate);
        this.delegate = new JdbcCursorItemReaderBuilder<Order>()
                .name("settlementOrderReader")
                .dataSource(dataSource)
                .sql("""
                        SELECT id, user_id, amount, order_type, status, created_at
                        FROM orders
                        WHERE DATE(created_at) = ?
                        AND status = 'COMPLETED'
                        AND settled = false
                        ORDER BY id
                        """)
                .preparedStatementSetter(ps -> ps.setString(1, targetDate))
                .rowMapper(new BeanPropertyRowMapper<>(Order.class))
                .fetchSize(500)
                .build();
    }

    @Override
    public Order read() throws Exception { return delegate.read(); }

    @Override
    public void open(ExecutionContext ctx) throws ItemStreamException { delegate.open(ctx); }

    @Override
    public void update(ExecutionContext ctx) throws ItemStreamException { delegate.update(ctx); }

    @Override
    public void close() throws ItemStreamException { delegate.close(); }
}
Processor: 정산 금액 계산
/**
 * Computes the settlement (fee + net amount) for a single completed order.
 * Throws InvalidOrderException for missing/non-positive amounts, which the
 * step configuration treats as skippable (recorded by the SkipListener).
 */
@Component
@StepScope // fix: #{jobParameters[...]} SpEL only resolves inside a step/job scope
@RequiredArgsConstructor
public class SettlementItemProcessor implements ItemProcessor<Order, Settlement> {

    private final FeeCalculationService feeService;

    @Value("#{jobParameters['targetDate']}")
    private String targetDate;

    @Override
    public Settlement process(Order order) throws Exception {
        // Validation: reject missing or non-positive amounts (skippable).
        if (order.getAmount() == null || order.getAmount().compareTo(BigDecimal.ZERO) <= 0) {
            throw new InvalidOrderException(
                    "유효하지 않은 주문 금액: orderId=" + order.getId());
        }
        // Fee calculation (rate differs by order type).
        BigDecimal fee = feeService.calculateFee(order.getAmount(), order.getOrderType());
        BigDecimal netAmount = order.getAmount().subtract(fee);
        return Settlement.builder()
                .orderId(order.getId())
                .userId(order.getUserId())
                .grossAmount(order.getAmount())
                .fee(fee)
                .netAmount(netAmount)
                .targetDate(LocalDate.parse(targetDate))
                .settledAt(LocalDateTime.now())
                .build();
    }
}
Writer: UPSERT로 멱등성 보장
/**
 * Persists settlements with a MySQL UPSERT so re-running the job for the
 * same date is idempotent (a duplicate order_id updates the existing row).
 */
@Component
@RequiredArgsConstructor
@Slf4j // fix: was missing, but the class logs via `log` below (compile error)
public class SettlementItemWriter implements ItemWriter<Settlement> {

    private final JdbcTemplate jdbcTemplate; // NOTE(review): unused here — candidate for removal
    private final NamedParameterJdbcTemplate namedJdbc;

    @Override
    public void write(Chunk<? extends Settlement> chunk) throws Exception {
        List<? extends Settlement> items = chunk.getItems();
        // MySQL UPSERT: safe to re-run with the same order_id.
        namedJdbc.batchUpdate("""
                INSERT INTO settlements (order_id, user_id, gross_amount, fee, net_amount,
                target_date, settled_at)
                VALUES (:orderId, :userId, :grossAmount, :fee, :netAmount,
                :targetDate, :settledAt)
                ON DUPLICATE KEY UPDATE
                gross_amount = VALUES(gross_amount),
                fee = VALUES(fee),
                net_amount = VALUES(net_amount),
                settled_at = VALUES(settled_at)
                """,
                items.stream()
                        .map(BeanPropertySqlParameterSource::new)
                        .toArray(SqlParameterSource[]::new)
        );
        log.debug("정산 저장 완료 - count={}", items.size());
    }
}
SkipListener: 정산 실패 주문 기록
/**
 * Records every skipped order into settlement_errors so failed settlements
 * can be reviewed and re-processed manually.
 */
@Component
@RequiredArgsConstructor
@Slf4j
public class SettlementSkipListener implements SkipListener<Order, Settlement> {

    // Upsert used for processor-phase skips: re-running the job refreshes the row.
    private static final String UPSERT_ERROR_SQL = """
            INSERT INTO settlement_errors (order_id, error_message, skipped_at)
            VALUES (?, ?, NOW())
            ON DUPLICATE KEY UPDATE
            error_message = VALUES(error_message),
            skipped_at = VALUES(skipped_at)
            """;

    // Plain insert used for writer-phase skips.
    private static final String INSERT_ERROR_SQL = """
            INSERT INTO settlement_errors (order_id, error_message, skipped_at)
            VALUES (?, ?, NOW())
            """;

    private final JdbcTemplate jdbcTemplate;

    /** Processor skip: warn and upsert the failure record. */
    @Override
    public void onSkipInProcess(Order item, Throwable t) {
        log.warn("정산 Skip - orderId={}, reason={}", item.getId(), t.getMessage());
        jdbcTemplate.update(UPSERT_ERROR_SQL, item.getId(), t.getMessage());
    }

    /** Writer skip: log as error and insert the failure record. */
    @Override
    public void onSkipInWrite(Settlement item, Throwable t) {
        log.error("정산 쓰기 Skip - orderId={}, error={}", item.getOrderId(), t.getMessage());
        jdbcTemplate.update(INSERT_ERROR_SQL, item.getOrderId(), t.getMessage());
    }
}
예제 2: 대용량 사용자 데이터 마이그레이션 배치
시나리오
레거시 DB의 legacy_users 테이블(1,000만 건)을 신규 DB의 users 테이블로 마이그레이션한다. ID 범위로 10개 파티션을 나눠 병렬 처리한다.
ColumnRangePartitioner 구현
/**
 * Splits a numeric ID column into contiguous [minId, maxId] ranges, producing
 * one ExecutionContext per partition for a partitioned manager step.
 */
@Component
@RequiredArgsConstructor
@Slf4j // fix: was missing, but partition() logs via `log` (compile error)
public class ColumnRangePartitioner implements Partitioner {

    private final JdbcTemplate jdbcTemplate;

    // Fix: these were `final` constructor-injected Strings, which Spring cannot
    // inject and which broke the setTableName/setColumnName calls made by the
    // job configuration. They are now configured via setters before the step runs.
    @Setter
    private String tableName;
    @Setter
    private String columnName;

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        // Look up the full ID range of the source table.
        Map<String, Object> minMax = jdbcTemplate.queryForMap(
                "SELECT MIN(%s) AS min_id, MAX(%s) AS max_id FROM %s"
                        .formatted(columnName, columnName, tableName)
        );
        // Empty table: MIN/MAX are NULL — return no partitions instead of NPE-ing.
        if (minMax.get("min_id") == null || minMax.get("max_id") == null) {
            return Map.of();
        }
        long minId = ((Number) minMax.get("min_id")).longValue();
        long maxId = ((Number) minMax.get("max_id")).longValue();
        long rangeSize = (maxId - minId) / gridSize + 1;
        Map<String, ExecutionContext> partitions = new LinkedHashMap<>();
        for (int i = 0; i < gridSize; i++) {
            long partitionMin = minId + (rangeSize * i);
            long partitionMax = Math.min(partitionMin + rangeSize - 1, maxId);
            ExecutionContext context = new ExecutionContext();
            context.putLong("minId", partitionMin);
            context.putLong("maxId", partitionMax);
            context.putInt("partitionIndex", i);
            partitions.put("partition-" + i, context);
            log.info("파티션 생성 - index={}, min={}, max={}", i, partitionMin, partitionMax);
        }
        return partitions;
    }
}
마이그레이션 Job 전체 구성
/**
 * Partitioned migration job: copies legacy_users (10M rows) into the new
 * users table using 10 ID-range partitions processed in parallel.
 */
@Configuration
@RequiredArgsConstructor
@Slf4j
public class UserMigrationJobConfig {

    private final JobRepository jobRepository;
    private final PlatformTransactionManager transactionManager;
    private final ColumnRangePartitioner partitioner;

    @Bean
    public Job userMigrationJob(Step partitionedMigrationStep,
                                JobExecutionListener migrationJobListener) {
        // Fix: the listener is now injected as a bean; the original called an
        // undefined migrationJobListener() method (compile error).
        return new JobBuilder("userMigrationJob", jobRepository)
                .start(partitionedMigrationStep)
                .listener(migrationJobListener)
                .build();
    }

    /**
     * Manager step: splits the ID range into partitions and delegates to workers.
     */
    @Bean
    public Step partitionedMigrationStep(Step workerMigrationStep) {
        // Tell the partitioner which table/column to split.
        partitioner.setTableName("legacy_users");
        partitioner.setColumnName("id");
        return new StepBuilder("partitionedMigrationStep", jobRepository)
                .partitioner("workerMigrationStep", partitioner)
                .step(workerMigrationStep)
                .gridSize(10)                          // 10 partitions
                .taskExecutor(migrationTaskExecutor()) // run workers in parallel
                .build();
    }

    /**
     * Worker step: processes one partition's ID range in 1,000-item chunks,
     * retrying transient DB errors up to 3 times.
     */
    @Bean
    public Step workerMigrationStep(
            ItemReader<LegacyUser> legacyUserReader,
            ItemProcessor<LegacyUser, User> userTransformProcessor,
            ItemWriter<User> newUserWriter) {
        return new StepBuilder("workerMigrationStep", jobRepository)
                .<LegacyUser, User>chunk(1000, transactionManager)
                .reader(legacyUserReader)
                .processor(userTransformProcessor)
                .writer(newUserWriter)
                .faultTolerant()
                .retryLimit(3)
                .retry(TransientDataAccessException.class)
                .build();
    }

    @Bean
    public TaskExecutor migrationTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10); // equal to the partition count
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(0);
        executor.setThreadNamePrefix("migration-");
        executor.initialize();
        return executor;
    }
}
Worker Reader: 파티션 범위 기반 읽기
/**
 * Step-scoped worker reader: streams one partition's ID range
 * (minId..maxId, late-bound from the stepExecutionContext written by the
 * partitioner) from the legacy database.
 */
@Bean
@StepScope
public JdbcCursorItemReader<LegacyUser> legacyUserReader(
        @Qualifier("legacyDataSource") DataSource legacyDataSource,
        @Value("#{stepExecutionContext['minId']}") Long minId,
        @Value("#{stepExecutionContext['maxId']}") Long maxId) {
    log.info("LegacyUserReader 생성 - minId={}, maxId={}", minId, maxId);
    return new JdbcCursorItemReaderBuilder<LegacyUser>()
            .name("legacyUserReader")
            .dataSource(legacyDataSource)
            .sql("""
                    SELECT id, username, email, phone, created_date, status
                    FROM legacy_users
                    WHERE id BETWEEN ? AND ?
                    ORDER BY id
                    """)
            .preparedStatementSetter(ps -> {
                ps.setLong(1, minId);
                ps.setLong(2, maxId);
            })
            .rowMapper(new BeanPropertyRowMapper<>(LegacyUser.class))
            .fetchSize(1000)
            .build();
}
Processor: 레거시 → 신규 도메인 변환
/**
 * Transforms a legacy user row into the new User domain object, normalizing
 * phone numbers and mapping legacy status codes.
 * Returns null (= item filtered out by Spring Batch) for accounts without an email.
 */
@Component
@Slf4j // fix: was missing, but process() logs via `log` (compile error)
public class UserTransformProcessor implements ItemProcessor<LegacyUser, User> {

    @Override
    public User process(LegacyUser legacy) throws Exception {
        // Clean up legacy data.
        String normalizedPhone = normalizePhone(legacy.getPhone());
        UserStatus status = mapStatus(legacy.getStatus());
        // Legacy accounts without an email are dropped (null = filter, not skip).
        if (legacy.getEmail() == null || legacy.getEmail().isBlank()) {
            log.warn("이메일 없는 레거시 유저 Skip - legacyId={}", legacy.getId());
            return null;
        }
        return User.builder()
                .legacyId(legacy.getId())
                .username(legacy.getUsername().trim())
                .email(legacy.getEmail().toLowerCase().trim())
                .phone(normalizedPhone)
                .status(status)
                .createdAt(legacy.getCreatedDate().toLocalDateTime())
                .migratedAt(LocalDateTime.now())
                .build();
    }

    /** Strips every non-digit character; null stays null. */
    private String normalizePhone(String phone) {
        if (phone == null) return null;
        return phone.replaceAll("[^0-9]", ""); // digits only
    }

    /** Maps legacy status codes; null and unknown codes default to INACTIVE. */
    private UserStatus mapStatus(String legacyStatus) {
        if (legacyStatus == null) return UserStatus.INACTIVE; // fix: switch on null would NPE
        return switch (legacyStatus) {
            case "A", "ACTIVE" -> UserStatus.ACTIVE;
            case "I", "INACTIVE" -> UserStatus.INACTIVE;
            case "D", "DELETED" -> UserStatus.WITHDRAWN;
            default -> UserStatus.INACTIVE;
        };
    }
}
진행 상황 모니터링 쿼리
-- Overall progress of currently running jobs (one row per step).
-- NOTE(review): PROGRESS_PCT divides writes by reads+writes; since reads and
-- writes advance together during a run, verify this formula matches intent.
SELECT
    ji.JOB_NAME,
    je.JOB_EXECUTION_ID,
    je.STATUS,
    je.START_TIME,
    je.END_TIME,
    se.STEP_NAME,
    se.STATUS AS STEP_STATUS,
    se.READ_COUNT,
    se.WRITE_COUNT,
    se.SKIP_COUNT,
    ROUND(se.WRITE_COUNT * 100.0 /
        NULLIF(se.READ_COUNT + se.WRITE_COUNT, 0), 1) AS PROGRESS_PCT
FROM BATCH_JOB_EXECUTION je
JOIN BATCH_JOB_INSTANCE ji ON je.JOB_INSTANCE_ID = ji.JOB_INSTANCE_ID
JOIN BATCH_STEP_EXECUTION se ON je.JOB_EXECUTION_ID = se.JOB_EXECUTION_ID
WHERE je.STATUS = 'STARTED'
ORDER BY se.START_TIME DESC;
-- Per-partition progress for one job execution.
SELECT
    se.STEP_NAME,
    se.STATUS,
    se.READ_COUNT,
    se.WRITE_COUNT,
    se.SKIP_COUNT,
    TIMESTAMPDIFF(SECOND, se.START_TIME, COALESCE(se.END_TIME, NOW())) AS ELAPSED_SECONDS
FROM BATCH_STEP_EXECUTION se
JOIN BATCH_JOB_EXECUTION je ON se.JOB_EXECUTION_ID = je.JOB_EXECUTION_ID
WHERE je.JOB_EXECUTION_ID = :jobExecutionId
  AND se.STEP_NAME LIKE 'workerMigrationStep%'
ORDER BY se.STEP_NAME;
예상 처리 시간 계산
/**
 * Estimates migration progress, throughput, and remaining time from the
 * BATCH_STEP_EXECUTION metadata of the partitioned worker steps.
 */
@Component
@RequiredArgsConstructor
public class MigrationProgressEstimator {

    private final JdbcTemplate jdbcTemplate;

    public MigrationEstimate estimate(long jobExecutionId) {
        Map<String, Object> stats = jdbcTemplate.queryForMap("""
                SELECT
                SUM(READ_COUNT) AS total_read,
                SUM(WRITE_COUNT) AS total_written,
                MAX(TIMESTAMPDIFF(SECOND, START_TIME, NOW())) AS elapsed_seconds
                FROM BATCH_STEP_EXECUTION
                WHERE JOB_EXECUTION_ID = ?
                AND STEP_NAME LIKE 'workerMigrationStep%'
                """, jobExecutionId);
        // Fix: SUM/MAX return NULL when no worker rows exist yet — treat as 0
        // instead of throwing an NPE on the Number cast.
        long totalWritten = toLong(stats.get("total_written"));
        long elapsedSeconds = toLong(stats.get("elapsed_seconds"));
        long totalTarget = getTotalTarget(); // total item count to migrate (defined elsewhere)
        double throughput = elapsedSeconds > 0
                ? (double) totalWritten / elapsedSeconds : 0;
        long remainingItems = totalTarget - totalWritten;
        long estimatedRemainingSeconds = throughput > 0
                ? (long) (remainingItems / throughput) : Long.MAX_VALUE;
        return MigrationEstimate.builder()
                .totalWritten(totalWritten)
                .totalTarget(totalTarget)
                .progressPercent(totalTarget > 0 ? totalWritten * 100.0 / totalTarget : 0)
                .throughputPerSecond((long) throughput)
                .estimatedRemainingSeconds(estimatedRemainingSeconds)
                .build();
    }

    /** Null-safe Number → long (aggregate columns are NULL on empty result sets). */
    private static long toLong(Object value) {
        return value instanceof Number n ? n.longValue() : 0L;
    }
}
예제 3: 외부 파일 가져오기 배치 (FTP → DB)
시나리오
매일 오전 6시 파트너사 FTP 서버에서 주문 CSV 파일을 내려받아 DB에 저장한다. 잘못된 행은 Skip하고 오류 리포트를 생성한다.
전체 Job 구성 (4단계)
/**
 * FTP import job: download the partner's daily order CSV, parse/validate/
 * persist it, then move the processed file out of the temp directory.
 */
@Configuration
@RequiredArgsConstructor
@Slf4j
public class FtpImportJobConfig {

    private final JobRepository jobRepository;
    private final PlatformTransactionManager transactionManager;

    @Bean
    public Job ftpImportJob(Step downloadStep,
                            Step importStep,
                            Step moveFileStep) {
        return new JobBuilder("ftpImportJob", jobRepository)
                // download → parse/persist → move file
                .start(downloadStep)
                .next(importStep)
                .next(moveFileStep)
                .build();
    }

    // ===== Step 1: FTP file download =====
    @Bean
    public Step downloadStep(FtpDownloadTasklet downloadTasklet) {
        return new StepBuilder("downloadStep", jobRepository)
                .tasklet(downloadTasklet, transactionManager)
                .build();
    }

    // ===== Step 2: parse the file and persist to DB =====
    @Bean
    public Step importStep(
            FlatFileItemReader<PartnerOrder> fileReader,
            PartnerOrderValidationProcessor validationProcessor,
            JdbcBatchItemWriter<PartnerOrder> orderWriter,
            ImportErrorListener errorListener) {
        return new StepBuilder("importStep", jobRepository)
                .<PartnerOrder, PartnerOrder>chunk(1000, transactionManager)
                .reader(fileReader)
                .processor(validationProcessor)
                .writer(orderWriter)
                .faultTolerant()
                .skipLimit(500)                           // skip at most 500 rows
                .skip(FlatFileParseException.class)       // skip parse errors
                .skip(InvalidPartnerOrderException.class) // skip validation failures
                .listener(errorListener)                  // builds the error report
                .build();
    }

    // ===== Step 3: move the processed file =====
    @Bean
    public Step moveFileStep(FileMoveTasklet moveTasklet) {
        return new StepBuilder("moveFileStep", jobRepository)
                .tasklet(moveTasklet, transactionManager)
                .build();
    }
}
Tasklet: FTP 파일 다운로드
/**
 * Step 1 tasklet: downloads the partner's daily order CSV from the FTP server
 * into a local temp directory and publishes the local path via the job
 * ExecutionContext for the import and move steps.
 */
@Component
@StepScope
@RequiredArgsConstructor
@Slf4j
public class FtpDownloadTasklet implements Tasklet {

    private final FTPClient ftpClient;

    @Value("${ftp.host}")
    private String ftpHost;
    // Fix: ftpUser/ftpPassword were referenced in execute() but never declared.
    @Value("${ftp.username}")
    private String ftpUser;
    @Value("${ftp.password}")
    private String ftpPassword;
    @Value("${ftp.remote-path}")
    private String remotePath;
    @Value("${batch.local-temp-dir}")
    private String localTempDir;
    @Value("#{jobParameters['targetDate']}")
    private String targetDate;

    @Override
    public RepeatStatus execute(StepContribution contribution,
                                ChunkContext chunkContext) throws Exception {
        String remoteFile = remotePath + "/orders_" + targetDate + ".csv";
        String localFile = localTempDir + "/orders_" + targetDate + ".csv";
        try {
            ftpClient.connect(ftpHost);
            ftpClient.login(ftpUser, ftpPassword);
            ftpClient.enterLocalPassiveMode();
            ftpClient.setFileType(FTP.BINARY_FILE_TYPE);
            log.info("FTP 파일 다운로드 시작 - remote={}", remoteFile);
            try (FileOutputStream fos = new FileOutputStream(localFile)) {
                boolean success = ftpClient.retrieveFile(remoteFile, fos);
                if (!success) {
                    throw new FtpDownloadException("파일 다운로드 실패: " + remoteFile);
                }
            }
            // Publish the downloaded file path for the import/move steps.
            chunkContext.getStepContext()
                    .getStepExecution()
                    .getJobExecution()
                    .getExecutionContext()
                    .putString("localFilePath", localFile);
            log.info("FTP 다운로드 완료 - local={}", localFile);
            contribution.incrementWriteCount(1);
        } finally {
            // Always close the control connection, even on failure.
            if (ftpClient.isConnected()) {
                ftpClient.logout();
                ftpClient.disconnect();
            }
        }
        return RepeatStatus.FINISHED;
    }
}
FlatFileItemReader: CSV 파일 읽기
/**
 * Step-scoped CSV reader for the downloaded partner order file.
 * The file path is late-bound from the job ExecutionContext entry written
 * by the download step.
 */
@Bean
@StepScope
public FlatFileItemReader<PartnerOrder> partnerOrderFileReader(
        @Value("#{jobExecutionContext['localFilePath']}") String filePath) {
    log.info("CSV 파일 Reader 생성 - file={}", filePath);
    return new FlatFileItemReaderBuilder<PartnerOrder>()
            .name("partnerOrderFileReader")
            .resource(new FileSystemResource(filePath))
            .linesToSkip(1) // skip the header row
            .delimited()
            .delimiter(",")
            .names("partnerOrderId", "userId", "productCode", "quantity",
                    "unitPrice", "totalAmount", "orderDate")
            // Map each CSV column onto the PartnerOrder builder.
            .fieldSetMapper(fieldSet -> PartnerOrder.builder()
                    .partnerOrderId(fieldSet.readString("partnerOrderId"))
                    .userId(fieldSet.readLong("userId"))
                    .productCode(fieldSet.readString("productCode"))
                    .quantity(fieldSet.readInt("quantity"))
                    .unitPrice(fieldSet.readBigDecimal("unitPrice"))
                    .totalAmount(fieldSet.readBigDecimal("totalAmount"))
                    .orderDate(LocalDate.parse(fieldSet.readString("orderDate")))
                    .build())
            .build();
}
Processor: 데이터 검증
/**
 * Validates each parsed partner order row. Collects every failed rule and
 * throws InvalidPartnerOrderException (configured as skippable) so the error
 * report shows all problems for the row at once.
 */
@Component
@Slf4j
public class PartnerOrderValidationProcessor
        implements ItemProcessor<PartnerOrder, PartnerOrder> {

    @Override
    public PartnerOrder process(PartnerOrder order) throws Exception {
        List<String> errors = new ArrayList<>();
        // Required fields.
        if (order.getPartnerOrderId() == null || order.getPartnerOrderId().isBlank()) {
            errors.add("partnerOrderId가 비어있음");
        }
        if (order.getUserId() == null || order.getUserId() <= 0) {
            errors.add("유효하지 않은 userId: " + order.getUserId());
        }
        // Value checks.
        if (order.getQuantity() <= 0) {
            errors.add("수량은 0보다 커야 함: " + order.getQuantity());
        }
        // Fix: null-guard the amounts — a missing amount must become a skippable
        // validation failure, not an NPE that fails the whole chunk.
        if (order.getUnitPrice() == null || order.getTotalAmount() == null) {
            errors.add("단가/합계 금액 누락");
        } else if (order.getTotalAmount().compareTo(
                order.getUnitPrice().multiply(BigDecimal.valueOf(order.getQuantity()))) != 0) {
            errors.add("합계 금액 불일치");
        }
        // Date check (orders dated in the future are rejected).
        if (order.getOrderDate() == null) {
            errors.add("주문 날짜 누락");
        } else if (order.getOrderDate().isAfter(LocalDate.now())) {
            errors.add("미래 날짜 주문: " + order.getOrderDate());
        }
        if (!errors.isEmpty()) {
            throw new InvalidPartnerOrderException(
                    "주문 검증 실패 - orderId=" + order.getPartnerOrderId() +
                    ", errors=" + String.join(", ", errors)
            );
        }
        return order;
    }
}
SkipListener: 오류 리포트 생성
/**
 * Collects skipped rows during the import step and writes a CSV error report
 * when the step finishes.
 */
@Component
@RequiredArgsConstructor
@Slf4j
public class ImportErrorListener
        implements SkipListener<PartnerOrder, PartnerOrder>, StepExecutionListener {

    private final List<ErrorReport> errorReports = new ArrayList<>();

    @Value("${batch.error-report-dir}")
    private String errorReportDir;

    private String targetDate;

    @Override
    public void beforeStep(StepExecution stepExecution) {
        this.targetDate = stepExecution.getJobParameters().getString("targetDate");
        // Fix: this bean is a singleton — clear state left over from a previous
        // job execution so reports do not accumulate across runs.
        errorReports.clear();
    }

    /** A line failed CSV parsing: record line number and raw input. */
    @Override
    public void onSkipInRead(Throwable t) {
        if (t instanceof FlatFileParseException e) {
            errorReports.add(ErrorReport.builder()
                    .lineNumber(e.getLineNumber())
                    .input(e.getInput())
                    .reason("파싱 실패: " + t.getMessage())
                    .build());
        }
    }

    /** A row failed validation: record the order id and reason. */
    @Override
    public void onSkipInProcess(PartnerOrder item, Throwable t) {
        errorReports.add(ErrorReport.builder()
                .orderId(item.getPartnerOrderId())
                .reason("검증 실패: " + t.getMessage())
                .build());
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        if (!errorReports.isEmpty()) {
            writeErrorReport();
            log.warn("오류 리포트 생성 - targetDate={}, errorCount={}",
                    targetDate, errorReports.size());
        }
        return null; // null = keep the step's own exit status
    }

    // Writes the collected errors as a simple CSV into the report directory.
    private void writeErrorReport() {
        String reportPath = errorReportDir + "/error_report_" + targetDate + ".csv";
        try (PrintWriter writer = new PrintWriter(new FileWriter(reportPath))) {
            writer.println("line_number,order_id,reason");
            errorReports.forEach(r ->
                    writer.printf("%s,%s,%s%n", r.getLineNumber(), r.getOrderId(), r.getReason())
            );
        } catch (IOException e) {
            log.error("오류 리포트 쓰기 실패", e);
        }
    }
}
Tasklet: 처리 완료 후 파일 이동
/**
 * Final step: moves the processed CSV from the temp directory into the
 * processed-files directory. The source path is read from the job
 * ExecutionContext entry written by the download step.
 */
@Component
@StepScope
@RequiredArgsConstructor
@Slf4j
public class FileMoveTasklet implements Tasklet {

    @Value("${batch.processed-dir}")
    private String processedDir;

    @Override
    public RepeatStatus execute(StepContribution contribution,
                                ChunkContext chunkContext) throws Exception {
        String downloadedPath = chunkContext.getStepContext()
                .getStepExecution()
                .getJobExecution()
                .getExecutionContext()
                .getString("localFilePath");

        Path from = Paths.get(downloadedPath);
        Path to = Paths.get(processedDir, from.getFileName().toString());

        // Ensure the destination directory exists, then move (overwrite if present).
        Files.createDirectories(to.getParent());
        Files.move(from, to, StandardCopyOption.REPLACE_EXISTING);

        log.info("파일 이동 완료 - {} → {}", from, to);
        return RepeatStatus.FINISHED;
    }
}
공통 유틸리티 패턴
배치 실행 시간 측정 AOP
/**
 * Cross-cutting batch instrumentation: warns when a single chunk takes
 * longer than 5 seconds, and logs total job duration.
 */
@Aspect
@Component
@Slf4j
public class BatchPerformanceAspect {

    // Intercepts every chunk execution of every chunk-oriented step.
    @Around("execution(* org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(..))")
    public Object measureChunkTime(ProceedingJoinPoint pjp) throws Throwable {
        long start = System.currentTimeMillis();
        Object result = pjp.proceed();
        long elapsed = System.currentTimeMillis() - start;
        if (elapsed > 5000) { // warn on chunks slower than 5 seconds
            log.warn("청크 처리 지연 감지 - elapsed={}ms", elapsed);
        }
        return result;
    }

    // Measures total job execution time.
    @Component
    public static class JobTimingListener implements JobExecutionListener {
        @Override
        public void afterJob(JobExecution jobExecution) {
            // NOTE(review): getStartTime()/getEndTime() can be null if the job
            // never started or is still running — confirm before relying on this.
            Duration duration = Duration.between(
                    jobExecution.getStartTime(), jobExecution.getEndTime());
            log.info("Job 완료 - name={}, status={}, duration={}분 {}초",
                    jobExecution.getJobInstance().getJobName(),
                    jobExecution.getStatus(),
                    duration.toMinutes(),
                    duration.toSecondsPart());
        }
    }
}
배치 메타데이터 DB와 업무 DB 분리 설정
대규모 배치에서는 메타데이터 DB와 업무 DB를 분리해 서로 영향을 주지 않도록 한다.
/**
 * Separates the Spring Batch metadata store (BATCH_* tables) from the
 * business database so heavy business traffic and batch bookkeeping do not
 * affect each other.
 */
@Configuration
public class DataSourceConfig {

    /**
     * DataSource dedicated to batch metadata (BATCH_* tables).
     * Marked @Primary so Spring Batch auto-configuration picks it up.
     */
    @Bean
    @Primary
    @ConfigurationProperties("spring.datasource.batch-meta")
    public DataSource batchMetaDataSource() {
        return DataSourceBuilder.create().build();
    }

    /**
     * Business-DB DataSource (orders, users, settlements, ...).
     */
    @Bean
    @ConfigurationProperties("spring.datasource.business")
    public DataSource businessDataSource() {
        return DataSourceBuilder.create().build();
    }

    /**
     * TransactionManager for the batch metadata store.
     */
    @Bean
    @Primary
    public PlatformTransactionManager batchTransactionManager(
            @Qualifier("batchMetaDataSource") DataSource batchMetaDataSource) {
        return new DataSourceTransactionManager(batchMetaDataSource);
    }

    /**
     * TransactionManager for the business database.
     */
    @Bean
    public PlatformTransactionManager businessTransactionManager(
            @Qualifier("businessDataSource") DataSource businessDataSource) {
        return new DataSourceTransactionManager(businessDataSource);
    }
}
# application.yml
spring:
  datasource:
    batch-meta:
      url: jdbc:mysql://batch-meta-db:3306/batch_metadata
      username: batch_user
      password: ${BATCH_META_DB_PASSWORD}
      hikari:
        maximum-pool-size: 10
        pool-name: BatchMetaPool
    business:
      url: jdbc:mysql://business-db:3306/business?rewriteBatchedStatements=true
      username: batch_reader
      password: ${BUSINESS_DB_PASSWORD}
      hikari:
        maximum-pool-size: 30 # partition count (10) * 3 headroom
        pool-name: BusinessPool
  batch:
    job:
      enabled: false # do not auto-run jobs on application startup
    jdbc:
      initialize-schema: never # 스키마 자동 생성 비활성화 (운영환경)
주의: Spring Batch가 @Primary DataSource를 자동으로 메타데이터 저장소로 사용한다. DefaultBatchConfiguration을 상속해 명시적으로 getDataSource()를 오버라이드하는 것이 더 안전하다.
/**
 * Explicitly pins the Spring Batch metadata repository to the batch-meta
 * DataSource by overriding DefaultBatchConfiguration, instead of relying on
 * the @Primary DataSource being picked up implicitly.
 */
@Configuration
public class BatchConfig extends DefaultBatchConfiguration {

    @Autowired
    @Qualifier("batchMetaDataSource")
    private DataSource batchMetaDataSource;

    @Override
    protected DataSource getDataSource() {
        return batchMetaDataSource; // metadata tables live in the batch-meta DB
    }
}