ItemReader 기본 개념

public interface ItemReader<T> {
    T read() throws Exception, UnexpectedInputException,
                    ParseException, NonTransientResourceException;
}

read()null을 반환하면 입력이 끝났다는 신호다. Spring Batch는 null을 받으면 해당 Step의 읽기를 종료하고 남은 아이템으로 마지막 청크를 처리한다.

ItemStreamReader — 재시작 지원

public interface ItemStreamReader<T> extends ItemStream, ItemReader<T> {}
 
public interface ItemStream {
    void open(ExecutionContext executionContext);   // 스트림 열기, 상태 복원
    void update(ExecutionContext executionContext); // 청크마다 현재 상태 저장
    void close();                                  // 스트림 닫기
}
  • open(): 배치 시작(또는 재시작) 시 호출된다. ExecutionContext에 저장된 오프셋을 읽어 중단 지점부터 재개한다.
  • update(): 청크가 커밋될 때마다 호출된다. 현재 위치를 ExecutionContext에 저장한다.
  • close(): Step 종료 시 호출된다. 파일 스트림이나 DB 커넥션을 닫는다.

Thread-safety 주의사항

Spring Batch의 기본 제공 ItemStreamReader 구현체(FlatFileItemReader, JdbcCursorItemReader 등)는 대부분 Thread-safe하지 않다. Multi-threaded Step에서 사용하려면 SynchronizedItemStreamReader로 래핑해야 한다.

@Bean
public SynchronizedItemStreamReader<Order> synchronizedOrderReader() {
    return new SynchronizedItemStreamReaderBuilder<Order>()
        .delegate(orderFlatFileReader()) // Thread-safe하지 않은 원본 Reader
        .build();
}

FlatFileItemReader

CSV 파일 (DelimitedLineTokenizer)

// 처리할 CSV 파일 예시
// orderId,customerId,amount,orderDate
// 1001,CUST001,50000,2026-03-27
// 1002,CUST002,30000,2026-03-27
 
@Bean
@StepScope
public FlatFileItemReader<Order> orderCsvReader(
        @Value("#{jobParameters['filePath']}") String filePath) {
 
    return new FlatFileItemReaderBuilder<Order>()
        .name("orderCsvReader")
        .resource(new FileSystemResource(filePath))
        .linesToSkip(1)         // 헤더 행 건너뜀
        .encoding("UTF-8")
        .strict(true)           // 파일 없으면 예외 발생 (false면 무시)
        .delimited()
            .delimiter(",")
            .names("orderId", "customerId", "amount", "orderDate")
        .targetType(Order.class)
        .build();
}

@StepScope를 선언하면 Step이 실행되는 시점에 Bean이 생성되므로 #{jobParameters['filePath']}와 같은 SpEL 표현식으로 JobParameters를 주입받을 수 있다.

고정 길이 파일 (FixedLengthTokenizer)

// 처리할 고정 길이 파일 예시
// 1001CUST00150000000000002026-03-27
// 필드: orderId(4), customerId(7), amount(11), orderDate(10)
 
@Bean
public FlatFileItemReader<Order> fixedLengthOrderReader() {
    return new FlatFileItemReaderBuilder<Order>()
        .name("fixedLengthOrderReader")
        .resource(new ClassPathResource("data/orders.dat"))
        .fixedLength()
            .columns(
                new Range(1, 4),   // orderId
                new Range(5, 11),  // customerId
                new Range(12, 22), // amount
                new Range(23, 32)  // orderDate
            )
            .names("orderId", "customerId", "amount", "orderDate")
        .targetType(Order.class)
        .build();
}

DefaultLineMapper + BeanWrapperFieldSetMapper (수동 조립)

타입 변환이나 커스텀 매핑이 필요할 때 직접 조립한다.

@Bean
public FlatFileItemReader<Order> customMappedReader() {
    DefaultLineMapper<Order> lineMapper = new DefaultLineMapper<>();
 
    DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
    tokenizer.setNames("orderId", "customerId", "amount", "orderDate");
    tokenizer.setDelimiter(",");
 
    BeanWrapperFieldSetMapper<Order> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
    fieldSetMapper.setTargetType(Order.class);
    // 날짜 타입 변환 커스터마이징
    fieldSetMapper.setConversionService(conversionService());
 
    lineMapper.setLineTokenizer(tokenizer);
    lineMapper.setFieldSetMapper(fieldSetMapper);
 
    FlatFileItemReader<Order> reader = new FlatFileItemReader<>();
    reader.setName("customMappedReader");
    reader.setResource(new ClassPathResource("data/orders.csv"));
    reader.setLinesToSkip(1);
    reader.setEncoding("UTF-8");
    reader.setLineMapper(lineMapper);
    return reader;
}

JsonItemReader

// JSON 파일 예시
// [
//   {"orderId": 1001, "customerId": "CUST001", "amount": 50000},
//   {"orderId": 1002, "customerId": "CUST002", "amount": 30000}
// ]
 
@Bean
public JsonItemReader<Order> orderJsonReader() {
    return new JsonItemReaderBuilder<Order>()
        .name("orderJsonReader")
        .jsonObjectReader(new JacksonJsonObjectReader<>(Order.class))
        .resource(new ClassPathResource("data/orders.json"))
        .build();
}

MultiResourceItemReader

여러 CSV 파일을 순서대로 처리한다. 파일이 동적으로 생성되는 환경에서 유용하다.

@Bean
@StepScope
public MultiResourceItemReader<Order> multiFileOrderReader(
        @Value("#{jobParameters['filePattern']}") String filePattern) throws IOException {
 
    // 패턴에 맞는 모든 리소스 조회
    Resource[] resources = ResourcePatternUtils
        .getResourcePatternResolver(new DefaultResourceLoader())
        .getResources(filePattern);
 
    // 파일명 기준 정렬 (처리 순서 보장)
    Arrays.sort(resources, Comparator.comparing(r -> r.getFilename()));
 
    return new MultiResourceItemReaderBuilder<Order>()
        .name("multiFileOrderReader")
        .resources(resources)
        .delegate(orderCsvReader(null)) // 단일 파일 처리 Reader 위임
        .build();
}

JdbcCursorItemReader

DB 커서를 열어 한 행씩 읽는 방식이다. 단일 커넥션을 유지하면서 데이터를 스트리밍으로 가져온다.

@Bean
@StepScope
public JdbcCursorItemReader<Order> orderCursorReader(
        @Value("#{jobParameters['targetDate']}") LocalDate targetDate) {
 
    return new JdbcCursorItemReaderBuilder<Order>()
        .name("orderCursorReader")
        .dataSource(dataSource)
        .sql("""
            SELECT order_id, customer_id, amount, order_date, status
            FROM orders
            WHERE order_date = ?
              AND status = 'PENDING'
            ORDER BY order_id
            """)
        .preparedStatementSetter(ps -> ps.setDate(1, Date.valueOf(targetDate)))
        .rowMapper(new BeanPropertyRowMapper<>(Order.class))
        .fetchSize(1000)  // 한 번의 네트워크 왕복으로 가져올 행 수
        .build();
}

fetchSize 튜닝: DB 드라이버가 한 번에 서버에서 가져오는 행 수를 설정한다. 기본값이 작으면 행마다 네트워크 왕복이 발생해 성능이 크게 저하된다. 일반적으로 chunk size와 같거나 2~5배 정도로 설정한다.

단일 커넥션 장기 점유 주의: 커서 방식은 처리가 완료될 때까지 DB 커넥션을 점유한다. 처리 시간이 길거나 커넥션 풀이 작은 환경에서는 커넥션 고갈이 발생할 수 있다. 이 경우 JdbcPagingItemReader를 사용한다.


JdbcPagingItemReader

페이지 단위로 쿼리를 반복 실행하는 방식이다. 청크마다 커넥션을 반환하므로 커넥션 점유 문제가 없다.

SqlPagingQueryProviderFactoryBean 사용

@Bean
public JdbcPagingItemReader<Order> orderPagingReader() throws Exception {
 
    SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
    queryProvider.setDataSource(dataSource);
    queryProvider.setSelectClause("SELECT order_id, customer_id, amount, order_date, status");
    queryProvider.setFromClause("FROM orders");
    queryProvider.setWhereClause("WHERE status = 'PENDING'");
    queryProvider.setSortKeys(Map.of("order_id", Order.ASCENDING)); // 정렬 키 필수
 
    return new JdbcPagingItemReaderBuilder<Order>()
        .name("orderPagingReader")
        .dataSource(dataSource)
        .queryProvider(queryProvider.getObject())
        .parameterValues(Map.of("status", "PENDING"))
        .pageSize(500)
        .rowMapper(new BeanPropertyRowMapper<>(Order.class))
        .build();
}

sortKeys가 필수인 이유: 페이징 쿼리는 OFFSETLIMIT(또는 DB별 동등 문법)을 사용한다. 정렬 기준이 없으면 페이지가 넘어갈 때 이전 페이지 데이터가 다시 등장하거나 누락될 수 있다. 반드시 고유한 컬럼(보통 PK)을 정렬 기준으로 설정해야 한다.

MySqlPagingQueryProvider 직접 구현

@Bean
public JdbcPagingItemReader<Order> mysqlOrderReader() throws Exception {
 
    MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
    queryProvider.setSelectClause("order_id, customer_id, amount, order_date");
    queryProvider.setFromClause("orders o INNER JOIN customers c ON o.customer_id = c.id");
    queryProvider.setWhereClause("o.status = :status AND o.order_date = :targetDate");
    queryProvider.setSortKeys(Map.of(
        "o.order_date", Order.ASCENDING,
        "o.order_id", Order.ASCENDING
    ));
 
    return new JdbcPagingItemReaderBuilder<Order>()
        .name("mysqlOrderReader")
        .dataSource(dataSource)
        .queryProvider(queryProvider)
        .parameterValues(Map.of(
            "status", "PENDING",
            "targetDate", LocalDate.of(2026, 3, 27)
        ))
        .pageSize(500)
        .rowMapper(new BeanPropertyRowMapper<>(Order.class))
        .build();
}

JpaCursorItemReader / JpaPagingItemReader

JPA를 사용하는 환경에서 엔티티를 직접 읽을 수 있다.

// JpaCursorItemReader (Spring Batch 4.3+ 추가)
@Bean
@StepScope
public JpaCursorItemReader<Order> jpaOrderCursorReader(
        EntityManagerFactory entityManagerFactory,
        @Value("#{jobParameters['targetDate']}") LocalDate targetDate) {
 
    return new JpaCursorItemReaderBuilder<Order>()
        .name("jpaOrderCursorReader")
        .entityManagerFactory(entityManagerFactory)
        .queryString("SELECT o FROM Order o WHERE o.orderDate = :targetDate AND o.status = 'PENDING' ORDER BY o.id")
        .parameterValues(Map.of("targetDate", targetDate))
        .build();
}
 
// JpaPagingItemReader
@Bean
@StepScope
public JpaPagingItemReader<Order> jpaOrderPagingReader(
        EntityManagerFactory entityManagerFactory) {
 
    return new JpaPagingItemReaderBuilder<Order>()
        .name("jpaOrderPagingReader")
        .entityManagerFactory(entityManagerFactory)
        .queryString("SELECT o FROM Order o WHERE o.status = 'PENDING' ORDER BY o.id")
        .pageSize(500)
        .build();
}

커서 vs 페이징 선택 기준

기준JdbcCursorItemReaderJdbcPagingItemReader
커넥션 점유 시간처리 완료까지 1개 점유청크마다 반환
Thread-safetyThread-safe 아님Thread-safe (병렬 가능)
대용량 안전성커넥션 타임아웃 위험상대적으로 안전
재시작 지원ExecutionContext로 커서 위치 복원페이지 번호 저장 후 복원
성능빠름 (단일 쿼리)약간 느림 (반복 쿼리)
적합한 상황단일 스레드, 빠른 처리멀티스레드, 안정성 우선

커스텀 ItemStreamReader 구현 — API 기반 Reader

외부 API를 호출해 데이터를 가져오는 Reader다. 페이지네이션을 지원하며, 재시작 시 마지막 처리한 페이지부터 재개한다.

@Component
@StepScope
public class ExternalApiOrderReader implements ItemStreamReader<Order> {
 
    private static final String CURRENT_PAGE_KEY = "external.api.current.page";
 
    private final ExternalOrderApiClient apiClient;
 
    private int currentPage = 0;
    private int totalPages = Integer.MAX_VALUE;
    private Queue<Order> itemBuffer = new LinkedList<>();
 
    public ExternalApiOrderReader(ExternalOrderApiClient apiClient) {
        this.apiClient = apiClient;
    }
 
    @Override
    public void open(ExecutionContext executionContext) {
        // 재시작 시 이전 페이지 번호 복원
        if (executionContext.containsKey(CURRENT_PAGE_KEY)) {
            currentPage = executionContext.getInt(CURRENT_PAGE_KEY);
            log.info("API Reader 재시작: 페이지 {}부터 재개", currentPage);
        }
    }
 
    @Override
    public void update(ExecutionContext executionContext) {
        // 청크 커밋마다 현재 페이지 번호 저장
        executionContext.putInt(CURRENT_PAGE_KEY, currentPage);
    }
 
    @Override
    public void close() {
        itemBuffer.clear();
    }
 
    @Override
    public Order read() throws Exception {
        // 버퍼에 아이템이 있으면 반환
        if (!itemBuffer.isEmpty()) {
            return itemBuffer.poll();
        }
 
        // 모든 페이지 처리 완료
        if (currentPage >= totalPages) {
            return null; // 입력 종료
        }
 
        // 다음 페이지 API 호출
        PageResponse<Order> response = apiClient.fetchOrders(currentPage, 100);
        totalPages = response.getTotalPages();
        itemBuffer.addAll(response.getContent());
        currentPage++;
 
        return itemBuffer.isEmpty() ? null : itemBuffer.poll();
    }
}
@Bean
public Step apiOrderStep() {
    return new StepBuilder("apiOrderStep", jobRepository)
        .<Order, OrderDto>chunk(100, transactionManager)
        .reader(externalApiOrderReader)
        .processor(orderProcessor())
        .writer(orderWriter())
        .build();
}

실무 팁: JdbcCursorItemReaderJdbcPagingItemReader는 용도가 명확히 다르다. 단일 스레드 배치에서 빠른 처리가 필요하다면 Cursor, 멀티스레드 병렬 처리나 커넥션 안정성이 중요하다면 Paging을 선택한다. JPA 환경이더라도 대용량 배치에서는 JpaPagingItemReader보다 JdbcPagingItemReader가 성능이 좋은 경우가 많다. JPA의 1차 캐시, 지연 로딩, 변경 감지 등이 배치 처리에는 오히려 오버헤드가 될 수 있기 때문이다.