Building a Batch Job Processing Application with Spring Batch

Batch processing is an important part of an enterprise technology ecosystem and architecture. It is used for uploading bulk data to a system periodically via scheduled job runs for business transaction processing. Another example is the integration of data between an application and another system inside or external to the enterprise.

Spring Batch is a lightweight, comprehensive batch framework designed to enable the development of robust batch applications vital for the daily operations of enterprise systems.

This blog will outline a few design practices for building a batch job processing application and illustrate how they are implemented using the Spring Batch framework APIs. The example used here is a typical extract/update application that reads data from multiple input files and writes the data to a database to be used by other applications in the enterprise.


Solution Design

I will cover the following design and implementation aspects:

  1. Input read and validation
  2. Parallel processing
  3. Job monitoring

1. Input read and validation

Spring Batch provides the ItemReader as a generic interface for reading data from different types of input. For CSV files, we can use the FlatFileItemReader class as shown in the example below:

private <T> FlatFileItemReader<T> readerForEntity(String readerName, InputStream is, String header, String[] cols, Class<T> targetType) {
	return new FlatFileItemReaderBuilder<T>()
		.skippedLinesCallback(line -> {
			// Verify the file header is what we expect
			if (!StringUtils.equals(line, header)) {
				throw new IllegalArgumentException(String.format("Unexpected header: [%s]", line));
			}
		})
		.name(readerName)
		.resource(new InputStreamResource(is))
		.linesToSkip(1)
		.delimited().delimiter("|")
		.names(cols)
		.targetType(targetType)
		.build();
}

The above method creates a FlatFileItemReader object for loading a particular CSV file into DTO objects of type targetType.
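
As an illustration of how such a reader could be wired into a job, the sketch below plugs it into a chunk-oriented step. The Customer DTO, the header and column names, the customers.csv file and the customerWriter bean are hypothetical placeholders, not part of the original example:

// Hypothetical Customer DTO for a pipe-delimited file with header "id|name|email"
public class Customer {
	private Long id;
	private String name;
	private String email;
	// getters and setters omitted for brevity
}

@Bean
public Step loadCustomersStep(StepBuilderFactory stepBuilderFactory, ItemWriter<Customer> customerWriter) throws IOException {
	String header = "id|name|email";               // expected header line (assumption)
	String[] cols = {"id", "name", "email"};       // column names mapped onto the DTO fields
	FlatFileItemReader<Customer> reader = readerForEntity(
			"customerReader", new FileInputStream("customers.csv"), header, cols, Customer.class);
	return stepBuilderFactory.get("loadCustomersStep")
			.<Customer, Customer>chunk(100)
			.reader(reader)
			.writer(customerWriter)
			.build();
}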

Validate input file header

At a minimum, the application needs to verify that an input file contains the type of data it expects. I would recommend that every file has a header line listing all the fields so that the application can validate it before processing. This can be implemented using the skippedLinesCallback method in combination with the linesToSkip method, as shown above.

Use S3 or similar for staging input files

Typically, an enterprise uses a local or remote file server as the staging area for storing the input CSV files, and Spring Batch provides out-of-the-box support for this use case. I would, however, recommend using a cloud storage solution such as AWS S3. This allows the application to leverage the availability and durability of cloud storage with little or no extra complexity or cost. To do this with Spring Batch, we need to implement our own InputStream for S3:

public class S3InputStream extends InputStream {

	private static final Logger logger = LoggerFactory.getLogger(S3InputStream.class);
	private static S3Client s3;

	private final String bucket;
	private final String file;
	private final byte[] buffer;
	private long lastByteOffset;

	private long offset = 0;
	private int next = 0;
	private int length = 0;

	static {
		Region region = Region.AP_SOUTHEAST_2;
		s3 = S3Client.builder().region(region).build();
	}

	public S3InputStream(final String bucket, final String file, final byte[] buffer) {
		this.bucket = bucket;
		this.file = file;
		this.buffer = buffer;

		// Determine the object size so we know when the last byte has been read
		ResponseInputStream<GetObjectResponse> object = s3.getObject(GetObjectRequest.builder().bucket(bucket).key(file).build());
		this.lastByteOffset = object.response().contentLength() - 1;
	}

	@Override
	public int read() throws IOException {
		if (next >= length) {
			fill();
			if (length <= 0) {
				return -1;
			}
			next = 0;
		}
		if (next >= length) {
			return -1;
		}
		// Mask to return an unsigned byte value (0-255) as required by InputStream
		return buffer[this.next++] & 0xFF;
	}

	// Fill the local buffer with the next range of bytes from the S3 object
	public void fill() throws IOException {
		if (offset >= lastByteOffset) {
			length = -1;
		} else {
			try (final InputStream inputStream = s3Object()) {
				length = 0;
				int b;
				while ((b = inputStream.read()) != -1) {
					buffer[length++] = (byte) b;
				}
				if (length > 0) {
					offset += length;
				}
			}
		}
	}

	// Fetch only the next byte range of the object using a ranged GET request
	private InputStream s3Object() {
		String len = Long.toString(offset + buffer.length - 1);
		String range = "bytes=" + offset + "-" + len;
		GetObjectRequest request = GetObjectRequest.builder().bucket(bucket).key(file).range(range).build();
		logger.info("Getting content from S3...");
		return s3.getObject(request);
	}
}
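
With this stream in place, the reader method from earlier can consume the file directly from S3. A minimal sketch; the bucket name, object key, buffer size and Customer type are the same illustrative assumptions used above:

// Hypothetical wiring: stream a pipe-delimited file from S3 into the FlatFileItemReader
InputStream s3Input = new S3InputStream("my-staging-bucket", "incoming/customers.csv", new byte[8192]);
FlatFileItemReader<Customer> reader = readerForEntity(
		"customerReader", s3Input, "id|name|email", new String[] {"id", "name", "email"}, Customer.class);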

2. Parallel Processing

Spring Batch supports several types and levels of parallel processing of input data through its APIs. These should be considered and leveraged when designing a batch processing solution to achieve the required performance and efficiency.

Multiple jobs

An enterprise typically has to upload different files at different intervals for processing, for example a daily job to load files for transaction processing and a monthly job for invoicing. We can implement this with Spring Batch by declaring the job beans as below:

@Bean("dailyJob")
public Job dailyJob() {
	return jobBuilderFactory.get("dailyJob")
		.incrementer(new RunIdIncrementer())
		.start(...)
		.end()
		.build();
}

@Bean("monthlyJob")
public Job monthlyJob() {
       return jobBuilderFactory.get("monthJob")
               .incrementer(new RunIdIncrementer())
               .start(...)
               .end()
               .build();
}

Then we can implement an individual scheduler method to run each job separately:

@Autowired
private JobLauncher jobLauncher;

@Autowired
@Qualifier("dailyJob")
private Job dailyJob;

@Scheduled(cron = "0 0 23 * * ?")
public void launchDailyJob() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException,
		JobInstanceAlreadyCompleteException, JobInstanceAlreadyExistsException, NoSuchJobException {
	jobLauncher.run(dailyJob,
		new JobParametersBuilder()
			.addLong("launchTime", System.currentTimeMillis()) // unique parameter so the same job can be launched again on each run
			// skip some lines here
			.toJobParameters());
}
// ... similarly for monthlyJob
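
Note that the @Scheduled methods only fire if scheduling is enabled somewhere in the application. A minimal sketch, assuming the scheduler class above is a Spring-managed bean:

@Configuration
@EnableScheduling
public class SchedulerConfig {
	// Enables processing of @Scheduled annotations; the class holding the
	// launchDailyJob()/launchMonthlyJob() methods is picked up by component scanning
}

If the application runs on Spring Boot, it is also worth setting spring.batch.job.enabled=false so that the jobs are launched only by these schedulers rather than automatically at startup.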

Split Flow

If the batch job application is going to import files into multiple, independent database tables, we should consider using the split flow feature provided by Spring Batch to load the files in steps running in parallel. The example below runs two flows (flowOne and flowTwo) in parallel using a task executor.

@Bean
public Flow flowOne() {
	return new FlowBuilder<SimpleFlow>("flow 1").start(flowOneStep).build();
}

@Bean
public Flow flowTwo() {
	return new FlowBuilder<SimpleFlow>("flow 2").start(flowTwoStep).build();
}

@Bean("spring_batch")
public TaskExecutor taskExecutor(){
    return new SimpleAsyncTaskExecutor("spring_batch");
}

@Bean
public Flow splitFlow() {
	return new FlowBuilder<SimpleFlow>("split-flow")
		.split(taskExecutor())
		.add(flowOne(), flowTwo())
		.build();
}
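
The split flow can then be attached to a job so that both flows start together when the job runs. A minimal sketch, where the job name parallelLoadJob is just an illustrative placeholder:

@Bean
public Job parallelLoadJob() {
	// Both flows run concurrently on the task executor; the job completes when both finish
	return jobBuilderFactory.get("parallelLoadJob")
		.incrementer(new RunIdIncrementer())
		.start(splitFlow())
		.end()
		.build();
}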

3. Job Monitoring

Monitoring the progress and completion status of batch jobs is important. Spring Batch supports this by providing listener interfaces for the various components of a job configuration.

JobExecutionListener

This interface provides methods to inject code before and after a job is executed. For example:

@Component
public class JobCompletionNotificationListener extends JobExecutionListenerSupport {

	private final Logger logger = LoggerFactory.getLogger(getClass());

	@Override
	public void afterJob(JobExecution jobExecution) {
		logger.info("Job {} finished. Start time = {}, End time = {}", jobExecution.getJobInstance().getJobName(), jobExecution.getStartTime(), jobExecution.getEndTime());
		logger.info("Job finished with status: {}", jobExecution.getStatus().toString());
		// ... do something else
	}
}

StepExecutionListener

Similar to the JobExecutionListener above, but for step executions:

@Component
public class StepExecutionListener extends StepExecutionListenerSupport {
	private Logger logger = LoggerFactory.getLogger(getClass());
	@Override
	public ExitStatus afterStep(StepExecution stepExecution) {
		logger.info("After step: {} [{}], status: {}", stepExecution.getStepName(), stepExecution.getId(), stepExecution.getStatus());
		return stepExecution.getExitStatus();
	}
}

ItemReadListener and ItemWriteListener

These two interfaces provide callback methods around the reading and writing of items. For example, the implementation below captures events when a read or write fails:

public class GenericEntityItemListener<T> extends ItemListenerSupport<T, T> {

	private final Logger logger = LoggerFactory.getLogger(getClass());

	@Override
	public void onReadError(Exception ex) {
		// Pass the exception as the last argument so the full stack trace is logged
		logger.error("onReadError", ex);
	}

	@Override
	public void onWriteError(Exception exception, List<? extends T> items) {
		logger.error("onWriteError: {}", exception.getMessage());
		items.forEach(p -> logger.info("Item {}", p));
	}
}

The above listeners should be used as part of a batch job processing solution to actively monitor the progress and status of the batch jobs; for example, to trigger an email alert when a job fails or when a read/write error occurs while processing an input file. It is also useful to log the events generated by the listeners for further analysis of job load and application performance.
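
For these listeners to take effect, they have to be registered on the job and step definitions. The sketch below is a variation of the earlier hypothetical loadCustomersStep that registers the item listener; the JobCompletionNotificationListener would be registered in the same way by calling .listener(...) on the job builder:

@Bean
public Step loadCustomersStep(StepBuilderFactory stepBuilderFactory,
		ItemReader<Customer> customerReader, ItemWriter<Customer> customerWriter) {
	GenericEntityItemListener<Customer> itemListener = new GenericEntityItemListener<>();
	return stepBuilderFactory.get("loadCustomersStep")
		.<Customer, Customer>chunk(100)
		.reader(customerReader)
		.writer(customerWriter)
		// Casts pick the intended listener(...) overloads, since the class implements both interfaces
		.listener((ItemReadListener<Customer>) itemListener)
		.listener((ItemWriteListener<Customer>) itemListener)
		.build();
}

The step-level StepExecutionListener implementation can be attached with an additional .listener(...) call in the same chain.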

Conclusions

This blog post outlines a few design and implementation aspects to consider when building a batch job processing application using Spring Batch. It is by no means comprehensive, but hopefully it is enough to get you started. More information can be found in the Spring Batch reference documentation.