병렬 처리 전략 비교

Spring Batch에서 제공하는 병렬 처리 방식은 크게 4가지다. 상황에 따라 적절한 전략을 선택해야 성능과 안정성을 모두 잡을 수 있다.

전략핵심 개념적합한 상황재시작 지원복잡도
Multi-threaded Step단일 Step에서 청크를 여러 스레드로 처리I/O 바운드 작업, Reader가 Thread-safe할 때제한적낮음
Parallel Steps (Split Flow)독립적인 Step들을 병렬 실행독립적인 비즈니스 플로우가 여러 개일 때완전 지원낮음
Partitioning데이터를 나누어 Worker Step이 병렬 처리대용량 단일 데이터셋 처리완전 지원중간
Remote ChunkingManager가 읽기, Worker가 처리/쓰기 (네트워크)처리 로직이 극도로 무거울 때복잡높음

1. Multi-threaded Step

개념

하나의 Step 내에서 청크(Chunk)를 여러 스레드가 동시에 처리한다. 구현이 단순하며, TaskExecutor만 Step에 연결하면 된다. 단, Reader가 Thread-safe하지 않으면 데이터 중복 또는 손실이 발생한다.

ThreadPoolTaskExecutor 설정

@Configuration
public class ThreadPoolConfig {
 
    @Bean
    public ThreadPoolTaskExecutor batchTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);        // 기본 스레드 수
        executor.setMaxPoolSize(8);         // 최대 스레드 수
        executor.setQueueCapacity(25);      // 큐 대기 용량
        executor.setThreadNamePrefix("batch-thread-");
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(60);
        executor.initialize();
        return executor;
    }
}

Multi-threaded Step 전체 코드

@Configuration
@RequiredArgsConstructor
public class MultiThreadedJobConfig {
 
    private final JobRepository jobRepository;
    private final PlatformTransactionManager transactionManager;
    private final DataSource dataSource;
    private final ThreadPoolTaskExecutor batchTaskExecutor;
 
    @Bean
    public Job multiThreadedJob() {
        return new JobBuilder("multiThreadedJob", jobRepository)
                .start(multiThreadedStep())
                .build();
    }
 
    @Bean
    public Step multiThreadedStep() {
        return new StepBuilder("multiThreadedStep", jobRepository)
                .<User, UserDto>chunk(100, transactionManager)
                .reader(threadSafeUserReader())
                .processor(userProcessor())
                .writer(userWriter())
                .taskExecutor(batchTaskExecutor)  // TaskExecutor 적용
                .build();
    }
 
    @Bean
    @StepScope
    public SynchronizedItemStreamReader<User> threadSafeUserReader() {
        // Thread-safe하지 않은 JdbcCursorItemReader를 래핑
        JdbcCursorItemReader<User> reader = new JdbcCursorItemReaderBuilder<User>()
                .name("userReader")
                .dataSource(dataSource)
                .sql("SELECT id, name, email FROM users WHERE status = 'ACTIVE'")
                .rowMapper(new BeanPropertyRowMapper<>(User.class))
                .build();
 
        // SynchronizedItemStreamReader로 동기화 처리
        SynchronizedItemStreamReader<User> synchronizedReader = new SynchronizedItemStreamReader<>();
        synchronizedReader.setDelegate(reader);
        return synchronizedReader;
    }
 
    @Bean
    public ItemProcessor<User, UserDto> userProcessor() {
        return user -> {
            // Thread-safe한 처리 로직 (stateless 권장)
            return new UserDto(user.getId(), user.getName().toUpperCase(), user.getEmail());
        };
    }
 
    @Bean
    public JdbcBatchItemWriter<UserDto> userWriter() {
        return new JdbcBatchItemWriterBuilder<UserDto>()
                .dataSource(dataSource)
                .sql("INSERT INTO user_dto (id, name, email) VALUES (:id, :name, :email) " +
                     "ON DUPLICATE KEY UPDATE name = :name, email = :email")
                .beanMapped()
                .build();
    }
}

Thread-safe하지 않은 Reader 문제

JdbcCursorItemReader는 DB 커서를 내부적으로 유지하므로 멀티스레드 환경에서 사용하면 안 된다. 두 스레드가 동시에 read()를 호출하면 커서 위치가 꼬인다.

// 잘못된 방식 - JdbcCursorItemReader를 그대로 사용
@Bean
public JdbcCursorItemReader<User> unsafeReader() {
    return new JdbcCursorItemReaderBuilder<User>()
            .name("unsafeReader")
            .dataSource(dataSource)
            .sql("SELECT * FROM users")
            .rowMapper(new BeanPropertyRowMapper<>(User.class))
            .build();
    // Multi-threaded Step에서 사용하면 데이터 중복/누락 발생!
}
 
// 올바른 방식 1 - SynchronizedItemStreamReader로 래핑
@Bean
public SynchronizedItemStreamReader<User> safeReader() {
    JdbcCursorItemReader<User> delegate = new JdbcCursorItemReaderBuilder<User>()
            .name("userCursorReader")
            .dataSource(dataSource)
            .sql("SELECT * FROM users")
            .rowMapper(new BeanPropertyRowMapper<>(User.class))
            .build();
 
    SynchronizedItemStreamReader<User> reader = new SynchronizedItemStreamReader<>();
    reader.setDelegate(delegate);
    return reader;
}
 
// 올바른 방식 2 - JdbcPagingItemReader 사용 (페이지 단위라 본질적으로 Thread-safe)
@Bean
public JdbcPagingItemReader<User> pagingReader(PagingQueryProvider queryProvider) {
    return new JdbcPagingItemReaderBuilder<User>()
            .name("userPagingReader")
            .dataSource(dataSource)
            .queryProvider(queryProvider)
            .rowMapper(new BeanPropertyRowMapper<>(User.class))
            .pageSize(100)
            .build();
}

재시작 지원 제한 이유

Multi-threaded Step은 실패 후 재시작 시 어느 청크부터 다시 처리해야 할지 특정하기 어렵다. 청크들이 순서 없이 병렬 처리되므로 ExecutionContext에 정확한 재시작 위치를 저장하기 불가능하다.

// 재시작 불필요 설정 (멱등성이 보장된 경우)
@Bean
public Step multiThreadedStep() {
    return new StepBuilder("multiThreadedStep", jobRepository)
            .<User, UserDto>chunk(100, transactionManager)
            .reader(threadSafeUserReader())
            .processor(userProcessor())
            .writer(userWriter())
            .taskExecutor(batchTaskExecutor)
            .allowStartIfComplete(true)  // 이미 완료된 Step도 재실행 허용
            .build();
}

언제 쓰는가: I/O 바운드 작업(파일 읽기/쓰기, 외부 API 호출)이 병목일 때, Reader를 Thread-safe하게 만들 수 있을 때, 구현을 단순하게 유지하고 싶을 때.


2. Parallel Steps (Split Flow)

개념

서로 독립적인 Step들을 동시에 실행한다. Split Flow를 사용하면 모든 병렬 Step이 완료된 후 다음 Step으로 진행한다.

전체 코드: 파일 처리 + 이메일 발송 병렬 실행

@Configuration
@RequiredArgsConstructor
public class ParallelStepsJobConfig {
 
    private final JobRepository jobRepository;
    private final PlatformTransactionManager transactionManager;
    private final ThreadPoolTaskExecutor batchTaskExecutor;
 
    @Bean
    public Job parallelStepsJob() {
        // flow1과 flow2를 병렬로 실행한 뒤, 마지막 Step 실행
        return new JobBuilder("parallelStepsJob", jobRepository)
                .start(splitFlow())
                .next(finalStep())  // 두 Flow가 모두 완료된 후 실행
                .end()
                .build();
    }
 
    @Bean
    public Flow splitFlow() {
        return new FlowBuilder<SimpleFlow>("splitFlow")
                .split(batchTaskExecutor)
                .add(fileProcessingFlow(), emailSendingFlow())
                .build();
    }
 
    // Flow 1: 파일 처리
    @Bean
    public Flow fileProcessingFlow() {
        return new FlowBuilder<SimpleFlow>("fileProcessingFlow")
                .start(fileProcessingStep())
                .build();
    }
 
    @Bean
    public Step fileProcessingStep() {
        return new StepBuilder("fileProcessingStep", jobRepository)
                .<Order, ProcessedOrder>chunk(200, transactionManager)
                .reader(orderFileReader())
                .processor(orderProcessor())
                .writer(processedOrderWriter())
                .build();
    }
 
    // Flow 2: 이메일 발송
    @Bean
    public Flow emailSendingFlow() {
        return new FlowBuilder<SimpleFlow>("emailSendingFlow")
                .start(emailSendingStep())
                .build();
    }
 
    @Bean
    public Step emailSendingStep() {
        return new StepBuilder("emailSendingStep", jobRepository)
                .<Member, EmailResult>chunk(50, transactionManager)
                .reader(pendingEmailReader())
                .processor(emailProcessor())
                .writer(emailResultWriter())
                .build();
    }
 
    // 병렬 처리 완료 후 최종 집계 Step
    @Bean
    public Step finalStep() {
        return new StepBuilder("finalStep", jobRepository)
                .tasklet((contribution, chunkContext) -> {
                    // 두 Flow의 결과를 종합
                    log.info("파일 처리 및 이메일 발송 모두 완료. 집계 시작.");
                    return RepeatStatus.FINISHED;
                }, transactionManager)
                .build();
    }
 
    // Reader/Processor/Writer 빈들은 생략...
}

핵심 동작: split(taskExecutor).add(flow1, flow2)에서 두 Flow는 각각 별도 스레드에서 실행된다. 둘 중 하나라도 실패하면 전체 Job이 FAILED 상태가 된다.


3. Partitioning (핵심 전략)

개념

Manager Step이 전체 데이터를 여러 파티션으로 분할하고, 각 파티션을 Worker Step에 할당하여 병렬 처리한다. 각 Worker는 독립적인 StepExecution을 갖는다.

Manager Step
    ├── 파티션 1 (ID: 1 ~ 1000)   → Worker Step (Thread 1)
    ├── 파티션 2 (ID: 1001 ~ 2000) → Worker Step (Thread 2)
    ├── 파티션 3 (ID: 2001 ~ 3000) → Worker Step (Thread 3)
    └── 파티션 4 (ID: 3001 ~ 4000) → Worker Step (Thread 4)

Partitioner 인터페이스 구현: ColumnRangePartitioner

@Component
@RequiredArgsConstructor
public class ColumnRangePartitioner implements Partitioner {
 
    private final JdbcTemplate jdbcTemplate;
 
    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        // 처리 대상 데이터의 min/max ID 조회
        Map<String, Object> result = jdbcTemplate.queryForMap(
                "SELECT MIN(id) AS min_id, MAX(id) AS max_id FROM orders WHERE status = 'PENDING'"
        );
 
        long minId = ((Number) result.get("min_id")).longValue();
        long maxId = ((Number) result.get("max_id")).longValue();
 
        // 파티션당 처리할 데이터 범위 계산
        long targetSize = (maxId - minId) / gridSize + 1;
 
        Map<String, ExecutionContext> partitions = new HashMap<>();
 
        long start = minId;
        long end = start + targetSize - 1;
 
        for (int i = 0; i < gridSize; i++) {
            ExecutionContext context = new ExecutionContext();
            context.putLong("minId", start);
            context.putLong("maxId", Math.min(end, maxId));
            context.putInt("partitionNumber", i);
 
            partitions.put("partition" + i, context);
 
            start += targetSize;
            end += targetSize;
 
            if (start > maxId) {
                break;
            }
        }
 
        return partitions;
    }
}

TaskExecutorPartitionHandler 설정

@Bean
public PartitionHandler partitionHandler() {
    TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
    handler.setTaskExecutor(batchTaskExecutor);  // 병렬 실행 executor
    handler.setStep(workerStep());               // Worker Step 지정
    handler.setGridSize(4);                      // 파티션 수 (스레드 수)
    return handler;
}

Worker Step: @StepScope + ExecutionContext에서 범위 읽기

@Bean
@StepScope
public JdbcPagingItemReader<Order> partitionedOrderReader(
        @Value("#{stepExecutionContext['minId']}") Long minId,
        @Value("#{stepExecutionContext['maxId']}") Long maxId) {
 
    log.info("파티션 처리 범위: {} ~ {}", minId, maxId);
 
    SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
    queryProvider.setDataSource(dataSource);
    queryProvider.setSelectClause("SELECT id, product_id, amount, status");
    queryProvider.setFromClause("FROM orders");
    queryProvider.setWhereClause("WHERE id >= :minId AND id <= :maxId AND status = 'PENDING'");
    queryProvider.setSortKey("id");
 
    Map<String, Object> parameterValues = new HashMap<>();
    parameterValues.put("minId", minId);
    parameterValues.put("maxId", maxId);
 
    try {
        return new JdbcPagingItemReaderBuilder<Order>()
                .name("partitionedOrderReader")
                .dataSource(dataSource)
                .queryProvider(queryProvider.getObject())
                .parameterValues(parameterValues)
                .rowMapper(new BeanPropertyRowMapper<>(Order.class))
                .pageSize(100)
                .build();
    } catch (Exception e) {
        throw new RuntimeException("Reader 생성 실패", e);
    }
}

Manager Step + Worker Step 전체 @Configuration

@Configuration
@RequiredArgsConstructor
public class PartitioningJobConfig {
 
    private final JobRepository jobRepository;
    private final PlatformTransactionManager transactionManager;
    private final DataSource dataSource;
    private final ColumnRangePartitioner columnRangePartitioner;
    private final ThreadPoolTaskExecutor batchTaskExecutor;
 
    @Bean
    public Job partitioningJob() {
        return new JobBuilder("partitioningJob", jobRepository)
                .start(managerStep())
                .build();
    }
 
    // Manager Step: 파티션 분할 후 Worker에 위임
    @Bean
    public Step managerStep() {
        return new StepBuilder("managerStep", jobRepository)
                .partitioner("workerStep", columnRangePartitioner)  // Partitioner 지정
                .partitionHandler(partitionHandler())
                .build();
    }
 
    @Bean
    public PartitionHandler partitionHandler() {
        TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
        handler.setTaskExecutor(batchTaskExecutor);
        handler.setStep(workerStep());
        handler.setGridSize(4);  // CPU 코어 수 또는 데이터 크기에 따라 조정
        return handler;
    }
 
    // Worker Step: Manager가 생성한 각 파티션에서 실행
    @Bean
    public Step workerStep() {
        return new StepBuilder("workerStep", jobRepository)
                .<Order, ProcessedOrder>chunk(100, transactionManager)
                .reader(partitionedOrderReader(null, null))  // @StepScope로 Late Binding
                .processor(orderProcessor())
                .writer(processedOrderWriter())
                .faultTolerant()
                .skipLimit(10)
                .skip(DataAccessException.class)
                .build();
    }
 
    @Bean
    @StepScope
    public JdbcPagingItemReader<Order> partitionedOrderReader(
            @Value("#{stepExecutionContext['minId']}") Long minId,
            @Value("#{stepExecutionContext['maxId']}") Long maxId) {
        // 위의 코드와 동일
        // ...
        return null; // 실제 구현 생략
    }
 
    @Bean
    public ItemProcessor<Order, ProcessedOrder> orderProcessor() {
        return order -> {
            // 주문 처리 로직
            ProcessedOrder processed = new ProcessedOrder();
            processed.setOrderId(order.getId());
            processed.setProcessedAt(LocalDateTime.now());
            return processed;
        };
    }
 
    @Bean
    public JdbcBatchItemWriter<ProcessedOrder> processedOrderWriter() {
        return new JdbcBatchItemWriterBuilder<ProcessedOrder>()
                .dataSource(dataSource)
                .sql("UPDATE orders SET status = 'PROCESSED', processed_at = :processedAt WHERE id = :orderId")
                .beanMapped()
                .build();
    }
}

gridSize vs 실제 파티션 수

gridSize는 Partitioner에게 “몇 개로 나눠라”라고 힌트를 주는 값이다. 실제 파티션 수는 Partitioner 구현에 따라 달라질 수 있다. 예를 들어, 데이터가 10건밖에 없는데 gridSize가 4라면 파티션이 4개보다 적게 생성될 수 있다.

// ColumnRangePartitioner에서 gridSize를 기준으로 파티션 생성
// 실제 파티션 수 = Math.min(gridSize, 실제 데이터 범위)

파티셔닝이 대용량 처리에 효과적인 이유

  1. 독립적인 StepExecution: 각 파티션이 별도 StepExecution을 가져 실패/재시작이 파티션 단위로 처리된다
  2. 완전한 재시작 지원: 실패한 파티션만 재처리 가능
  3. 데이터 격리: 각 Worker가 서로 다른 ID 범위를 처리하므로 동기화 불필요
  4. 선형적 성능 향상: 4 파티션 = 약 4배 빠른 처리 (I/O 한계까지)

4. Remote Chunking

개념

Manager가 데이터를 읽어 메시지 큐(RabbitMQ, Kafka)로 전송하고, 여러 Worker 인스턴스가 각 메시지를 받아 처리와 쓰기를 담당한다. 처리 로직이 극도로 무거울 때 사용한다.

Manager 서버                     Worker 서버들
┌──────────────┐    RabbitMQ    ┌──────────────┐
│ ItemReader   │ ──→ Queue ──→  │ ItemProcessor│
│ (데이터 읽기) │                 │ ItemWriter   │ x N
└──────────────┘                └──────────────┘

Spring Integration + RabbitMQ 기반 아키텍처

<!-- Maven 의존성 -->
<dependency>
    <groupId>org.springframework.batch</groupId>
    <artifactId>spring-batch-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-amqp</artifactId>
</dependency>
// Manager 측 설정
@Configuration
public class RemoteChunkingManagerConfig {
 
    @Bean
    public Step managerStep(
            RemoteChunkingManagerStepBuilderFactory managerStepBuilderFactory) {
 
        return managerStepBuilderFactory
                .<User, User>get("managerStep")
                .chunk(100)
                .reader(userReader())
                .outputChannel(requestsChannel())   // Worker로 청크 전송
                .inputChannel(repliesChannel())      // Worker 처리 결과 수신
                .build();
    }
 
    @Bean
    public DirectChannel requestsChannel() {
        return new DirectChannel();
    }
 
    @Bean
    public DirectChannel repliesChannel() {
        return new DirectChannel();
    }
 
    @Bean
    public AmqpOutboundEndpoint amqpOutboundEndpoint(AmqpTemplate amqpTemplate) {
        AmqpOutboundEndpoint endpoint = new AmqpOutboundEndpoint(amqpTemplate);
        endpoint.setExchangeName("batch.requests");
        endpoint.setOutputChannel(repliesChannel());
        return endpoint;
    }
}
 
// Worker 측 설정
@Configuration
public class RemoteChunkingWorkerConfig {
 
    @Bean
    public IntegrationFlow workerIntegrationFlow(
            RemoteChunkingWorkerBuilder<User, User> workerBuilder) {
 
        return workerBuilder
                .itemProcessor(userProcessor())
                .itemWriter(userWriter())
                .inputChannel(requestsChannel())
                .outputChannel(repliesChannel())
                .build();
    }
}

Remote Chunking vs Partitioning 선택 기준

기준PartitioningRemote Chunking
병목 지점데이터 읽기/쓰기데이터 처리(CPU 집약)
네트워크 통신없음 (로컬)있음 (메시지 큐)
인프라 복잡도낮음높음 (MQ 필요)
확장성수직 확장 (스레드)수평 확장 (Worker 서버 추가)
재시작완전 지원복잡
추천 상황대부분의 대용량 배치ML 추론, 이미지 처리 등 CPU 집약 작업

실무 권장: 대부분의 대용량 배치는 Partitioning으로 충분하다. Remote Chunking은 처리 로직이 단일 서버 CPU를 포화시킬 정도로 무거울 때만 고려한다.


실무 팁

스레드 수 결정 방법

// CPU 바운드 작업: 코어 수와 동일하게
int cpuCores = Runtime.getRuntime().availableProcessors();
 
// I/O 바운드 작업: 코어 수의 2~4배 (대기 시간이 많으므로)
int ioThreads = cpuCores * 2;
 
// DB 작업: 커넥션 풀 크기 이하로 설정
// HikariCP maxPoolSize = 10이라면, gridSize는 8 이하 권장

청크 크기와 병렬 처리 조합

// 총 10만 건, 4개 파티션, 청크 크기 100
// - 각 파티션: 2만 5천 건
// - 각 파티션당 250회 트랜잭션
// - 총 커밋: 1000회 (단일 스레드 대비 4배 빠름)
 
// 청크가 너무 크면: 메모리 사용량 증가, 실패 시 재처리 범위 커짐
// 청크가 너무 작으면: 트랜잭션 오버헤드 증가
// 권장: 100 ~ 1000 사이에서 벤치마크

파티셔닝 모니터링

// Manager Step이 생성한 파티션 목록 확인
@AfterJob
public void logPartitions(JobExecution jobExecution) {
    jobExecution.getStepExecutions().forEach(stepExecution -> {
        log.info("Step: {}, ReadCount: {}, WriteCount: {}, Status: {}",
                stepExecution.getStepName(),
                stepExecution.getReadCount(),
                stepExecution.getWriteCount(),
                stepExecution.getStatus());
    });
}