Batch Processing with Spring Boot

Learn how to implement batch processing jobs in Spring Boot applications using Spring Batch

Batch Processing with Spring Boot

This guide covers implementing batch processing in Spring Boot applications using Spring Batch framework.

Video Tutorial

Learn more about Spring Boot batch processing in this comprehensive video tutorial:

Prerequisites

<dependencies>
    <!-- Spring Batch -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
    
    <!-- Database -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    
    <!-- For CSV Processing -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-csv</artifactId>
        <version>1.9.0</version>
    </dependency>
</dependencies>

Basic Batch Job

Configuration

@Configuration
@EnableBatchProcessing
public class BatchConfig {

    // Constructor injection is preferred over field @Autowired: dependencies
    // are explicit, final, and the class is testable without a Spring context.
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    public BatchConfig(JobBuilderFactory jobBuilderFactory,
                       StepBuilderFactory stepBuilderFactory) {
        this.jobBuilderFactory = jobBuilderFactory;
        this.stepBuilderFactory = stepBuilderFactory;
    }

    /**
     * Job that imports users from CSV into the database.
     * RunIdIncrementer gives each launch fresh parameters so the job
     * can be re-run (otherwise an identical JobInstance is refused).
     */
    @Bean
    public Job importUserJob(Step step1) {
        return jobBuilderFactory.get("importUserJob")
                .incrementer(new RunIdIncrementer())
                .listener(new JobCompletionNotificationListener())
                .flow(step1)
                .end()
                .build();
    }

    /**
     * Chunk-oriented step: reads, processes and writes users in chunks of 10.
     * The chunk size is also the transaction boundary — tune per workload.
     */
    @Bean
    public Step step1(ItemReader<User> reader,
                     ItemProcessor<User, User> processor,
                     ItemWriter<User> writer) {
        return stepBuilderFactory.get("step1")
                .<User, User>chunk(10)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }
}

Item Reader

// @Bean methods belong in a @Configuration class, not @Component: in a plain
// @Component ("lite mode") inter-bean method calls are not proxied by Spring.
@Configuration
public class CsvItemReader {

    /**
     * Reads User records from users.csv on the classpath.
     * Expected columns (in order): firstName, lastName, email.
     */
    @Bean
    public FlatFileItemReader<User> reader() {
        // Avoid the double-brace initialization anti-pattern: it creates an
        // anonymous subclass holding a reference to its enclosing scope.
        BeanWrapperFieldSetMapper<User> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
        fieldSetMapper.setTargetType(User.class);

        return new FlatFileItemReaderBuilder<User>()
                .name("userItemReader")
                .resource(new ClassPathResource("users.csv"))
                .delimited()
                .names("firstName", "lastName", "email")
                .fieldSetMapper(fieldSetMapper)
                .build();
    }
}

Item Processor

@Component
public class UserItemProcessor implements ItemProcessor<User, User> {

    // 'log' was referenced but never declared in the original snippet.
    private static final Logger log = LoggerFactory.getLogger(UserItemProcessor.class);

    /**
     * Transforms a user by upper-casing the first and last name.
     * Returning null from an ItemProcessor would filter the item out;
     * this processor always returns a transformed copy.
     */
    @Override
    public User process(User user) throws Exception {
        String firstName = user.getFirstName().toUpperCase();
        String lastName = user.getLastName().toUpperCase();

        User transformedUser = new User(firstName, lastName, user.getEmail());

        log.info("Converting ({}) into ({})", user, transformedUser);

        return transformedUser;
    }
}

Item Writer

@Component
public class DatabaseItemWriter implements ItemWriter<User> {

    // 'log' was referenced but never declared in the original snippet.
    private static final Logger log = LoggerFactory.getLogger(DatabaseItemWriter.class);

    private final UserRepository userRepository;

    // A final field requires constructor injection; without this constructor
    // the original snippet does not compile and the repository is never set.
    public DatabaseItemWriter(UserRepository userRepository) {
        this.userRepository = userRepository;
    }

    /**
     * Persists the chunk of users in a single saveAll call — one batched
     * database round-trip per chunk rather than one per item.
     */
    @Override
    public void write(List<? extends User> users) throws Exception {
        log.info("Saving users: {}", users);
        userRepository.saveAll(users);
    }
}

Advanced Batch Processing

Parallel Processing

@Configuration
public class ParallelBatchConfig {

    // The original referenced stepBuilderFactory without declaring it;
    // inject it through the constructor.
    private final StepBuilderFactory stepBuilderFactory;

    public ParallelBatchConfig(StepBuilderFactory stepBuilderFactory) {
        this.stepBuilderFactory = stepBuilderFactory;
    }

    /**
     * Thread pool used for multi-threaded step execution.
     * Spring initializes the executor automatically when it is a bean.
     */
    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(25);
        // Named threads make batch logs much easier to read.
        executor.setThreadNamePrefix("batch-");
        return executor;
    }

    /**
     * Multi-threaded chunk step: each chunk is processed on a pool thread.
     * NOTE(review): the reader must be thread-safe for this to be correct —
     * confirm before using with a stateful reader such as FlatFileItemReader.
     */
    @Bean
    public Step parallelStep(ItemReader<User> reader,
                           ItemProcessor<User, User> processor,
                           ItemWriter<User> writer,
                           TaskExecutor taskExecutor) {
        return stepBuilderFactory.get("parallelStep")
                .<User, User>chunk(10)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .taskExecutor(taskExecutor)
                .build();
    }
}

Multi-Step Job

@Configuration
public class MultiStepJobConfig {

    // The original referenced these factories without declaring them;
    // inject them through the constructor.
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    public MultiStepJobConfig(JobBuilderFactory jobBuilderFactory,
                              StepBuilderFactory stepBuilderFactory) {
        this.jobBuilderFactory = jobBuilderFactory;
        this.stepBuilderFactory = stepBuilderFactory;
    }

    /**
     * Sequential three-step job: validate, then process, then report.
     * If any step fails the following steps are not executed.
     */
    @Bean
    public Job multiStepJob(Step validateStep,
                           Step processStep,
                           Step reportStep) {
        return jobBuilderFactory.get("multiStepJob")
                .incrementer(new RunIdIncrementer())
                .start(validateStep)
                .next(processStep)
                .next(reportStep)
                .build();
    }

    /** Tasklet step running one-shot validation logic before processing. */
    @Bean
    public Step validateStep() {
        return stepBuilderFactory.get("validateStep")
                .tasklet((contribution, chunkContext) -> {
                    // Validation logic
                    return RepeatStatus.FINISHED;
                })
                .build();
    }

    /** Chunk-oriented step performing the main read/process/write work. */
    @Bean
    public Step processStep(ItemReader<User> reader,
                          ItemProcessor<User, User> processor,
                          ItemWriter<User> writer) {
        return stepBuilderFactory.get("processStep")
                .<User, User>chunk(10)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }

    /** Tasklet step producing a summary report after processing completes. */
    @Bean
    public Step reportStep() {
        return stepBuilderFactory.get("reportStep")
                .tasklet((contribution, chunkContext) -> {
                    // Generate report
                    return RepeatStatus.FINISHED;
                })
                .build();
    }
}

Error Handling

Skip Policy

@Component
public class CustomSkipPolicy implements SkipPolicy {

    /** Maximum number of data-integrity failures tolerated before the step fails. */
    private static final int MAX_SKIPS = 10;

    /**
     * Skips items whose failure was a database integrity violation, up to
     * MAX_SKIPS occurrences; every other exception fails the step immediately.
     */
    @Override
    public boolean shouldSkip(Throwable exception, int skipCount) {
        boolean recoverable = exception instanceof DataIntegrityViolationException;
        return recoverable && skipCount < MAX_SKIPS;
    }
}

@Configuration
public class ErrorHandlingConfig {

    // The original referenced stepBuilderFactory without declaring it.
    private final StepBuilderFactory stepBuilderFactory;

    public ErrorHandlingConfig(StepBuilderFactory stepBuilderFactory) {
        this.stepBuilderFactory = stepBuilderFactory;
    }

    /**
     * Fault-tolerant step: items failing under the injected SkipPolicy are
     * skipped instead of failing the whole step.
     */
    @Bean
    public Step stepWithErrorHandling(ItemReader<User> reader,
                                    ItemProcessor<User, User> processor,
                                    ItemWriter<User> writer,
                                    SkipPolicy skipPolicy) {
        return stepBuilderFactory.get("stepWithErrorHandling")
                .<User, User>chunk(10)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .faultTolerant()
                .skipPolicy(skipPolicy)
                // SkipListener is an interface, so 'new SkipListener()' in the
                // original does not compile. Register a concrete implementation
                // here via .listener(...) to log or audit skipped items.
                .build();
    }
}

Retry Policy

@Configuration
public class RetryConfig {

    // The original referenced stepBuilderFactory without declaring it.
    private final StepBuilderFactory stepBuilderFactory;

    public RetryConfig(StepBuilderFactory stepBuilderFactory) {
        this.stepBuilderFactory = stepBuilderFactory;
    }

    /**
     * Fault-tolerant step: transient remote-access failures are retried up to
     * 3 times per item before the failure is propagated.
     */
    @Bean
    public Step stepWithRetry(ItemReader<User> reader,
                            ItemProcessor<User, User> processor,
                            ItemWriter<User> writer) {
        return stepBuilderFactory.get("stepWithRetry")
                .<User, User>chunk(10)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .faultTolerant()
                .retry(RemoteAccessException.class)
                .retryLimit(3)
                // RetryListener is an interface, so 'new RetryListener()' in
                // the original does not compile. Register a concrete
                // implementation here via .listener(...) to observe retries.
                .build();
    }
}

Job Monitoring

Job Listener

@Component
public class JobMonitoringListener implements JobExecutionListener {

    // 'log' was referenced but never declared in the original snippet.
    private static final Logger log = LoggerFactory.getLogger(JobMonitoringListener.class);

    private final JobRepository jobRepository;
    private final MeterRegistry meterRegistry;

    // Final fields require constructor injection; without this constructor
    // the original snippet does not compile.
    public JobMonitoringListener(JobRepository jobRepository,
                                 MeterRegistry meterRegistry) {
        this.jobRepository = jobRepository;
        this.meterRegistry = meterRegistry;
    }

    /** Counts every job launch. */
    @Override
    public void beforeJob(JobExecution jobExecution) {
        log.info("Job started: {}", jobExecution.getJobInstance().getJobName());
        meterRegistry.counter("batch.job.starts").increment();
    }

    /** Counts completions vs. failures and records the job duration. */
    @Override
    public void afterJob(JobExecution jobExecution) {
        log.info("Job finished: {} with status: {}",
                jobExecution.getJobInstance().getJobName(),
                jobExecution.getStatus());

        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
            meterRegistry.counter("batch.job.completions").increment();
        } else {
            meterRegistry.counter("batch.job.failures").increment();
        }

        // Record metrics
        recordJobMetrics(jobExecution);
    }

    // Publishes the wall-clock duration of the job, tagged by name and status.
    private void recordJobMetrics(JobExecution jobExecution) {
        // Timestamps can be null for abandoned/stopped executions — guard
        // before arithmetic to avoid an NPE in the listener.
        if (jobExecution.getStartTime() == null || jobExecution.getEndTime() == null) {
            return;
        }
        Timer.builder("batch.job.duration")
                .tag("job", jobExecution.getJobInstance().getJobName())
                .tag("status", jobExecution.getStatus().toString())
                .register(meterRegistry)
                .record(jobExecution.getEndTime().getTime() - 
                        jobExecution.getStartTime().getTime(),
                        TimeUnit.MILLISECONDS);
    }
}

Job Scheduling

Scheduled Job Launcher

@Component
public class ScheduledJobLauncher {

    // 'log' was referenced but never declared in the original snippet.
    private static final Logger log = LoggerFactory.getLogger(ScheduledJobLauncher.class);

    private final JobLauncher jobLauncher;
    private final Job job;

    // Final fields require constructor injection; without this constructor
    // the original snippet does not compile.
    public ScheduledJobLauncher(JobLauncher jobLauncher, Job job) {
        this.jobLauncher = jobLauncher;
        this.job = job;
    }

    /**
     * Launches the job daily. The current date is added as a job parameter so
     * each run gets a distinct JobInstance (identical parameters would be
     * rejected as an already-completed instance).
     */
    @Scheduled(cron = "0 0 1 * * ?") // Run at 1 AM every day
    public void runJob() throws Exception {
        JobParameters params = new JobParametersBuilder()
                .addDate("date", new Date())
                .toJobParameters();

        try {
            JobExecution execution = jobLauncher.run(job, params);
            log.info("Job finished with status: {}", execution.getStatus());
        } catch (Exception e) {
            log.error("Job failed", e);
            throw e;
        }
    }
}

Best Practices

  1. Job Design

    • Use meaningful job names
    • Implement proper error handling
    • Configure appropriate chunk sizes
    • Use step transitions
  2. Performance

    • Use parallel processing
    • Optimize database operations
    • Configure proper batch sizes
    • Monitor resource usage
  3. Error Handling

    • Implement skip policies
    • Use retry mechanisms
    • Log failed items
    • Handle restarts
  4. Monitoring

    • Track job execution
    • Monitor step completion
    • Record processing times
    • Alert on failures

Common Patterns

  1. Extract-Transform-Load (ETL)
@Configuration
public class EtlJobConfig {

    // The original referenced these factories without declaring them;
    // inject them through the constructor.
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    public EtlJobConfig(JobBuilderFactory jobBuilderFactory,
                        StepBuilderFactory stepBuilderFactory) {
        this.jobBuilderFactory = jobBuilderFactory;
        this.stepBuilderFactory = stepBuilderFactory;
    }

    /** Three-phase ETL pipeline: extract, then transform, then load. */
    @Bean
    public Job etlJob(Step extractStep,
                     Step transformStep,
                     Step loadStep) {
        return jobBuilderFactory.get("etlJob")
                .start(extractStep)
                .next(transformStep)
                .next(loadStep)
                .build();
    }

    // NOTE(review): sourceReader()/extractProcessor()/rawDataWriter() and the
    // analogous methods below are placeholders assumed to be defined elsewhere
    // in the configuration — supply real implementations before use.

    /** Extract phase: pulls SourceData and stages it as RawData. */
    @Bean
    public Step extractStep() {
        return stepBuilderFactory.get("extract")
                .<SourceData, RawData>chunk(100)
                .reader(sourceReader())
                .processor(extractProcessor())
                .writer(rawDataWriter())
                .build();
    }

    /** Transform phase: converts staged RawData into TransformedData. */
    @Bean
    public Step transformStep() {
        return stepBuilderFactory.get("transform")
                .<RawData, TransformedData>chunk(100)
                .reader(rawDataReader())
                .processor(transformProcessor())
                .writer(transformedDataWriter())
                .build();
    }

    /** Load phase: writes TransformedData into the target store. */
    @Bean
    public Step loadStep() {
        return stepBuilderFactory.get("load")
                .<TransformedData, TargetData>chunk(100)
                .reader(transformedDataReader())
                .processor(loadProcessor())
                .writer(targetWriter())
                .build();
    }
}
  2. Data Migration
@Configuration
public class DataMigrationConfig {

    // The original referenced these factories without declaring them;
    // inject them through the constructor.
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    public DataMigrationConfig(JobBuilderFactory jobBuilderFactory,
                               StepBuilderFactory stepBuilderFactory) {
        this.jobBuilderFactory = jobBuilderFactory;
        this.stepBuilderFactory = stepBuilderFactory;
    }

    /** Single-step migration job; RunIdIncrementer permits repeated runs. */
    @Bean
    public Job migrationJob(Step migrationStep) {
        return jobBuilderFactory.get("migrationJob")
                .incrementer(new RunIdIncrementer())
                .start(migrationStep)
                .build();
    }

    /**
     * Chunk step copying SourceData to TargetData in chunks of 100.
     * Qualifiers disambiguate among multiple reader/processor/writer beans.
     */
    @Bean
    public Step migrationStep(
            @Qualifier("sourceReader") ItemReader<SourceData> reader,
            @Qualifier("migrationProcessor") ItemProcessor<SourceData, TargetData> processor,
            @Qualifier("targetWriter") ItemWriter<TargetData> writer) {
        return stepBuilderFactory.get("migrationStep")
                .<SourceData, TargetData>chunk(100)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }
}

Conclusion

Effective batch processing with Spring Boot requires:

  • Proper job configuration
  • Error handling
  • Performance optimization
  • Monitoring and metrics
  • Following best practices

For more Spring Boot topics, check out: