Batch Processing with Spring Boot
Learn how to implement batch processing jobs in Spring Boot applications using Spring Batch
This guide covers implementing batch processing in Spring Boot applications using the Spring Batch framework.
Prerequisites
Add the following dependencies to your pom.xml. Spring Batch persists job and step metadata in a relational database, so the application also needs a configured DataSource:
<dependencies>
<!-- Spring Batch -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<!-- Database -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<!-- For CSV Processing -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-csv</artifactId>
<version>1.9.0</version>
</dependency>
</dependencies>
Basic Batch Job
Configuration
@Configuration
@EnableBatchProcessing
public class BatchConfig {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Job importUserJob(Step step1) {
return jobBuilderFactory.get("importUserJob")
.incrementer(new RunIdIncrementer())
.listener(new JobCompletionNotificationListener()) // custom JobExecutionListener (implementation not shown)
.flow(step1)
.end()
.build();
}
@Bean
public Step step1(ItemReader<User> reader,
ItemProcessor<User, User> processor,
ItemWriter<User> writer) {
return stepBuilderFactory.get("step1")
.<User, User>chunk(10)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
}
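The reader, processor, and writer beans below assume a simple User domain class. A minimal sketch (the JPA mapping and field set are assumptions derived from the CSV columns; adjust to your actual model):
@Entity
public class User {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private String firstName;
    private String lastName;
    private String email;

    public User() {} // required by JPA and BeanWrapperFieldSetMapper

    public User(String firstName, String lastName, String email) {
        this.firstName = firstName;
        this.lastName = lastName;
        this.email = email;
    }

    // getters and setters omitted for brevity; BeanWrapperFieldSetMapper
    // needs them to populate the fields read from the CSV file
}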
Item Reader
@Configuration
public class CsvItemReader {
@Bean
public FlatFileItemReader<User> reader() {
return new FlatFileItemReaderBuilder<User>()
.name("userItemReader")
.resource(new ClassPathResource("users.csv"))
.delimited()
.names("firstName", "lastName", "email")
.fieldSetMapper(new BeanWrapperFieldSetMapper<User>() {{
setTargetType(User.class);
}})
.build();
}
}
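For reference, users.csv on the classpath holds one record per line in the column order declared above. Because linesToSkip is not configured, the file should contain no header row. Sample data (hypothetical):
john,doe,john.doe@example.com
jane,smith,jane.smith@example.com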
Item Processor
@Component
public class UserItemProcessor implements ItemProcessor<User, User> {
    private static final Logger log = LoggerFactory.getLogger(UserItemProcessor.class);
@Override
public User process(User user) throws Exception {
// Transform the user data
String firstName = user.getFirstName().toUpperCase();
String lastName = user.getLastName().toUpperCase();
User transformedUser = new User(firstName, lastName, user.getEmail());
log.info("Converting ({}) into ({})", user, transformedUser);
return transformedUser;
}
}
Item Writer
@Component
public class DatabaseItemWriter implements ItemWriter<User> {
    private static final Logger log = LoggerFactory.getLogger(DatabaseItemWriter.class);
    private final UserRepository userRepository;

    public DatabaseItemWriter(UserRepository userRepository) {
        this.userRepository = userRepository;
    }
@Override
public void write(List<? extends User> users) throws Exception {
log.info("Saving users: {}", users);
userRepository.saveAll(users);
}
}
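UserRepository here is assumed to be a standard Spring Data JPA repository:
public interface UserRepository extends JpaRepository<User, Long> {
}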
Advanced Batch Processing
Parallel Processing
@Configuration
public class ParallelBatchConfig {
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(25);
return executor;
}
@Bean
public Step parallelStep(ItemReader<User> reader,
ItemProcessor<User, User> processor,
ItemWriter<User> writer,
TaskExecutor taskExecutor) {
return stepBuilderFactory.get("parallelStep")
.<User, User>chunk(10)
.reader(reader)
.processor(processor)
.writer(writer)
.taskExecutor(taskExecutor)
.build();
}
}
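One caveat: with a TaskExecutor, chunks are processed concurrently, and FlatFileItemReader is not thread-safe. A common remedy, sketched here, is to wrap the reader in Spring Batch's SynchronizedItemStreamReader and hand the wrapper to the step instead:
@Bean
public SynchronizedItemStreamReader<User> synchronizedReader(FlatFileItemReader<User> delegate) {
    // Serializes read() calls so multiple chunk-processing threads
    // can safely share a single file-backed reader
    SynchronizedItemStreamReader<User> reader = new SynchronizedItemStreamReader<>();
    reader.setDelegate(delegate);
    return reader;
}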
Multi-Step Job
@Configuration
public class MultiStepJobConfig {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
@Bean
public Job multiStepJob(Step validateStep,
Step processStep,
Step reportStep) {
return jobBuilderFactory.get("multiStepJob")
.incrementer(new RunIdIncrementer())
.start(validateStep)
.next(processStep)
.next(reportStep)
.build();
}
@Bean
public Step validateStep() {
return stepBuilderFactory.get("validateStep")
.tasklet((contribution, chunkContext) -> {
// Validation logic
return RepeatStatus.FINISHED;
})
.build();
}
@Bean
public Step processStep(ItemReader<User> reader,
ItemProcessor<User, User> processor,
ItemWriter<User> writer) {
return stepBuilderFactory.get("processStep")
.<User, User>chunk(10)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
@Bean
public Step reportStep() {
return stepBuilderFactory.get("reportStep")
.tasklet((contribution, chunkContext) -> {
// Generate report
return RepeatStatus.FINISHED;
})
.build();
}
}
Error Handling
Skip Policy
@Component
public class CustomSkipPolicy implements SkipPolicy {
@Override
public boolean shouldSkip(Throwable exception, int skipCount) {
if (exception instanceof DataIntegrityViolationException) {
return skipCount < 10;
}
return false;
}
}
@Configuration
public class ErrorHandlingConfig {
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
@Bean
public Step stepWithErrorHandling(ItemReader<User> reader,
ItemProcessor<User, User> processor,
ItemWriter<User> writer,
SkipPolicy skipPolicy) {
return stepBuilderFactory.get("stepWithErrorHandling")
.<User, User>chunk(10)
.reader(reader)
.processor(processor)
.writer(writer)
.faultTolerant()
.skipPolicy(skipPolicy)
.listener(new CustomSkipListener()) // SkipListener is an interface; a custom implementation is sketched below
.build();
}
}
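SkipListener is an interface, so the listener registered above must be your own implementation. A minimal sketch (CustomSkipListener is an illustrative name) that logs each skipped item:
public class CustomSkipListener implements SkipListener<User, User> {
    private static final Logger log = LoggerFactory.getLogger(CustomSkipListener.class);

    @Override
    public void onSkipInRead(Throwable t) {
        log.warn("Item skipped during read", t);
    }

    @Override
    public void onSkipInProcess(User item, Throwable t) {
        log.warn("Item skipped during processing: {}", item, t);
    }

    @Override
    public void onSkipInWrite(User item, Throwable t) {
        log.warn("Item skipped during write: {}", item, t);
    }
}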
Retry Policy
@Configuration
public class RetryConfig {
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
@Bean
public Step stepWithRetry(ItemReader<User> reader,
ItemProcessor<User, User> processor,
ItemWriter<User> writer) {
return stepBuilderFactory.get("stepWithRetry")
.<User, User>chunk(10)
.reader(reader)
.processor(processor)
.writer(writer)
.faultTolerant()
.retry(RemoteAccessException.class)
.retryLimit(3)
.listener(new CustomRetryListener()) // RetryListener is an interface; a custom implementation is sketched below
.build();
}
}
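RetryListener is likewise an interface (from Spring Retry). A sketch of a logging implementation, extending RetryListenerSupport so only the relevant callback needs overriding (CustomRetryListener is an illustrative name):
public class CustomRetryListener extends RetryListenerSupport {
    private static final Logger log = LoggerFactory.getLogger(CustomRetryListener.class);

    @Override
    public <T, E extends Throwable> void onError(RetryContext context,
                                                 RetryCallback<T, E> callback,
                                                 Throwable throwable) {
        // Invoked after each failed attempt, before the next retry (if any)
        log.warn("Retry attempt {} failed", context.getRetryCount(), throwable);
    }
}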
Job Monitoring
Job Listener
@Component
public class JobMonitoringListener implements JobExecutionListener {
    private static final Logger log = LoggerFactory.getLogger(JobMonitoringListener.class);
    private final MeterRegistry meterRegistry;

    public JobMonitoringListener(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }
@Override
public void beforeJob(JobExecution jobExecution) {
log.info("Job started: {}", jobExecution.getJobInstance().getJobName());
meterRegistry.counter("batch.job.starts").increment();
}
@Override
public void afterJob(JobExecution jobExecution) {
log.info("Job finished: {} with status: {}",
jobExecution.getJobInstance().getJobName(),
jobExecution.getStatus());
if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
meterRegistry.counter("batch.job.completions").increment();
} else {
meterRegistry.counter("batch.job.failures").increment();
}
// Record metrics
recordJobMetrics(jobExecution);
}
private void recordJobMetrics(JobExecution jobExecution) {
Timer.builder("batch.job.duration")
.tag("job", jobExecution.getJobInstance().getJobName())
.tag("status", jobExecution.getStatus().toString())
.register(meterRegistry)
.record(jobExecution.getEndTime().getTime() -
jobExecution.getStartTime().getTime(),
TimeUnit.MILLISECONDS);
}
}
Job Scheduling
Scheduled Job Launcher
@Component
public class ScheduledJobLauncher {
    private static final Logger log = LoggerFactory.getLogger(ScheduledJobLauncher.class);
    private final JobLauncher jobLauncher;
    private final Job job;

    public ScheduledJobLauncher(JobLauncher jobLauncher, Job job) {
        this.jobLauncher = jobLauncher;
        this.job = job;
    }
@Scheduled(cron = "0 0 1 * * ?") // Run at 1 AM every day
public void runJob() throws Exception {
JobParameters params = new JobParametersBuilder()
.addDate("date", new Date())
.toJobParameters();
try {
JobExecution execution = jobLauncher.run(job, params);
log.info("Job finished with status: {}", execution.getStatus());
} catch (Exception e) {
log.error("Job failed", e);
throw e;
}
}
}
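Two details are easy to miss here: @Scheduled only fires when scheduling is enabled (for example via @EnableScheduling on a configuration class), and Spring Boot by default also launches every Job bean once at application startup. If jobs should run only on the schedule, disable the startup launch in application.properties:
# Prevent Spring Boot from running all jobs automatically at startup
spring.batch.job.enabled=false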
Best Practices
- Job Design
  - Use meaningful job names
  - Implement proper error handling
  - Configure appropriate chunk sizes
  - Use step transitions (see the sketch after this list)
- Performance
  - Use parallel processing
  - Optimize database operations
  - Configure proper batch sizes
  - Monitor resource usage
- Error Handling
  - Implement skip policies
  - Use retry mechanisms
  - Log failed items
  - Handle restarts
- Monitoring
  - Track job execution
  - Monitor step completion
  - Record processing times
  - Alert on failures
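The step transitions mentioned under Job Design route execution based on a step's exit status. A minimal conditional-flow sketch (step and job names are illustrative; assumes a configuration class like those above):
@Bean
public Job conditionalJob(Step processStep, Step successStep, Step failureStep) {
    return jobBuilderFactory.get("conditionalJob")
            .start(processStep)
                .on("FAILED").to(failureStep)  // route failures to a recovery step
            .from(processStep)
                .on("*").to(successStep)       // any other exit status continues normally
            .end()
            .build();
}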
Common Patterns
- Extract-Transform-Load (ETL)
@Configuration
public class EtlJobConfig {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    // sourceReader(), extractProcessor(), rawDataWriter(), and the other
    // reader/processor/writer methods below are application-specific beans (not shown)
@Bean
public Job etlJob(Step extractStep,
Step transformStep,
Step loadStep) {
return jobBuilderFactory.get("etlJob")
.start(extractStep)
.next(transformStep)
.next(loadStep)
.build();
}
@Bean
public Step extractStep() {
return stepBuilderFactory.get("extract")
.<SourceData, RawData>chunk(100)
.reader(sourceReader())
.processor(extractProcessor())
.writer(rawDataWriter())
.build();
}
@Bean
public Step transformStep() {
return stepBuilderFactory.get("transform")
.<RawData, TransformedData>chunk(100)
.reader(rawDataReader())
.processor(transformProcessor())
.writer(transformedDataWriter())
.build();
}
@Bean
public Step loadStep() {
return stepBuilderFactory.get("load")
.<TransformedData, TargetData>chunk(100)
.reader(transformedDataReader())
.processor(loadProcessor())
.writer(targetWriter())
.build();
}
}
- Data Migration
@Configuration
public class DataMigrationConfig {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
@Bean
public Job migrationJob(Step migrationStep) {
return jobBuilderFactory.get("migrationJob")
.incrementer(new RunIdIncrementer())
.start(migrationStep)
.build();
}
@Bean
public Step migrationStep(
@Qualifier("sourceReader") ItemReader<SourceData> reader,
@Qualifier("migrationProcessor") ItemProcessor<SourceData, TargetData> processor,
@Qualifier("targetWriter") ItemWriter<TargetData> writer) {
return stepBuilderFactory.get("migrationStep")
.<SourceData, TargetData>chunk(100)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
}
Conclusion
Effective batch processing with Spring Boot requires:
- Proper job configuration
- Error handling
- Performance optimization
- Monitoring and metrics
- Following best practices