Chunking is the minimum you can expect from a batch framework. The idea is simply to do the work for a list of items, commit it for that list, then continue with the next list, and so on. You can see it as something more general than a commit interval (the interval can be a number of items, a timeout, etc.).
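To make the "interval" idea concrete, here is a small sketch of a policy that closes a chunk either when enough items have been read or when too much time has elapsed. JSR 352 exposes similar settings through the `item-count` and `time-limit` (in seconds) attributes of `<chunk>`, but the `CommitPolicy` class below is a hypothetical illustration, not part of the JBatch API, and it uses milliseconds for simplicity:

```java
// Hypothetical helper (not a JBatch API): decides whether the current chunk should be
// committed, either because enough items were read or because its time budget is spent.
final class CommitPolicy {
    private final int itemCount;         // commit after this many items
    private final long timeLimitMillis;  // 0 means "no time limit"

    CommitPolicy(final int itemCount, final long timeLimitMillis) {
        this.itemCount = itemCount;
        this.timeLimitMillis = timeLimitMillis;
    }

    boolean shouldCommit(final int itemsInChunk, final long elapsedMillis) {
        if (itemsInChunk >= itemCount) {
            return true; // item-count boundary reached
        }
        return timeLimitMillis > 0 && elapsedMillis >= timeLimitMillis; // time boundary reached
    }

    public static void main(final String[] args) {
        final CommitPolicy policy = new CommitPolicy(3, 1000);
        System.out.println(policy.shouldCommit(3, 10));   // true: item count reached
        System.out.println(policy.shouldCommit(1, 1500)); // true: time limit exceeded
        System.out.println(policy.shouldCommit(2, 10));   // false: keep reading
    }
}
```

Whichever boundary is hit first ends the chunk; with a time limit of 0 only the item count matters.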
So how do you use chunking with JBatch?
First, a chunk is built from three entities: a reader, a processor and a writer. The reader reads the data one item at a time. The processor is optional, but if present, each item is sent to it and it can transform the item. Finally, the writer takes a list of processed items (a chunk) and writes it somewhere.
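Conceptually, the runtime drives these three entities in a loop. The following plain-Java sketch models that loop; `ChunkLoopSketch` and its functional-interface parameters are assumptions made for illustration, not JBatch types. It reads until the reader returns null, drops items the processor maps to null, and hands the writer one list per `itemCount` items read. In a real runtime each writer call would be followed by a transaction commit:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical, single-threaded model of a chunk step (not a JBatch class):
// the writer call marks the point where a real runtime would commit the transaction.
final class ChunkLoopSketch {

    static <I, O> void runChunkStep(final Supplier<I> reader,       // returns null when the input is exhausted
                                    final Function<I, O> processor, // returns null to filter an item out
                                    final Consumer<List<O>> writer, // receives one chunk at a time
                                    final int itemCount) {          // items read per chunk
        List<O> chunk = new ArrayList<>();
        int readInChunk = 0;
        I item;
        while ((item = reader.get()) != null) {
            readInChunk++;
            final O processed = processor.apply(item);
            if (processed != null) {
                chunk.add(processed);
            }
            if (readInChunk == itemCount) { // chunk boundary: write (then commit) and start a new chunk
                flush(chunk, writer);
                chunk = new ArrayList<>();
                readInChunk = 0;
            }
        }
        flush(chunk, writer); // last, possibly smaller, chunk
    }

    private static <O> void flush(final List<O> chunk, final Consumer<List<O>> writer) {
        if (!chunk.isEmpty()) { // this sketch simply skips empty chunks
            writer.accept(chunk);
        }
    }

    public static void main(final String[] args) {
        final Iterator<String> lines = Arrays.asList("a", "b", "c", "d", "e").iterator();
        final List<List<String>> written = new ArrayList<>();
        final Supplier<String> reader = () -> lines.hasNext() ? lines.next() : null;
        final Function<String, String> processor = String::toUpperCase;
        final Consumer<List<String>> writer = written::add;
        runChunkStep(reader, processor, writer, 2);
        System.out.println(written); // [[A, B], [C, D], [E]]
    }
}
```

With five items and a chunk size of 2, the writer is called three times and the last chunk is simply smaller.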
How do you do that with JBatch?
First, the reader needs to implement ItemReader (javax.batch.api.chunk.ItemReader). Here is a simple implementation that reads a file line by line:
```java
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.Serializable;

import javax.batch.api.BatchProperty;
import javax.batch.api.chunk.ItemReader;
import javax.inject.Inject;

public class FlatFileItemReader implements ItemReader {
    @Inject
    @BatchProperty
    private String input; // path of the file to read, set from the job XML

    private BufferedReader reader = null;

    @Override
    public void open(final Serializable checkpoint) throws Exception {
        if (input == null) {
            throw new RuntimeException("Can't find any input");
        }
        final File file = new File(input);
        if (!file.exists()) {
            throw new RuntimeException("'" + input + "' doesn't exist");
        }
        reader = new BufferedReader(new FileReader(file));
    }

    @Override
    public void close() throws Exception {
        if (reader != null) {
            reader.close();
        }
    }

    @Override
    public Object readItem() throws Exception {
        return reader.readLine(); // returns null at the end of the file, which ends the step
    }

    @Override
    public Serializable checkpointInfo() throws Exception {
        return null; // no checkpoint: restart is not handled
    }
}
```
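The reader above returns null from `checkpointInfo()`, so a restarted job would re-read the file from the beginning. A common approach is to use the number of lines already read as the checkpoint: the runtime asks for it when a chunk is committed and passes the last value back to `open()` on restart (null on a fresh start). Here is a sketch of that idea, written against `java.io.Reader` without the javax.batch interfaces so it compiles on its own; `CheckpointingLineReader` is a hypothetical name:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.Serializable;
import java.io.StringReader;

// Hypothetical restart-aware variant of the reader: the checkpoint is the number of lines read.
// It mirrors the ItemReader methods but does not implement the interface, to stay standalone.
final class CheckpointingLineReader {
    private BufferedReader reader;
    private long linesRead;

    // checkpoint is null on a fresh start, or a value previously returned by checkpointInfo()
    void open(final Reader source, final Serializable checkpoint) throws IOException {
        reader = new BufferedReader(source);
        linesRead = 0;
        final long toSkip = checkpoint == null ? 0 : (Long) checkpoint;
        while (linesRead < toSkip && reader.readLine() != null) {
            linesRead++; // skip lines already handled by a previous execution
        }
    }

    Object readItem() throws IOException {
        final String line = reader.readLine();
        if (line != null) {
            linesRead++;
        }
        return line; // null means "no more items"
    }

    Serializable checkpointInfo() {
        return linesRead; // boxed to Long, which is Serializable
    }

    void close() throws IOException {
        if (reader != null) {
            reader.close();
        }
    }

    public static void main(final String[] args) throws IOException {
        final CheckpointingLineReader r = new CheckpointingLineReader();
        r.open(new StringReader("l1\nl2\nl3"), 2L); // pretend a previous run checkpointed after 2 lines
        System.out.println(r.readItem()); // l3
        System.out.println(r.readItem()); // null
        r.close();
    }
}
```

Skipping lines is the simplest strategy for a text file; a reader over a database would more likely checkpoint a key and resume from it.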
This reader is deliberately simple: a more complete implementation would handle comment lines and maybe use a mapper to convert each line to an object. Here we'll use the processor for that conversion (we assume a Person class exists, where a person has a name and an age):
```java
import javax.batch.api.chunk.ItemProcessor;

public class StringToPersonProcessor implements ItemProcessor {
    // the input is a string containing <name>,<age>
    @Override
    public Object processItem(final Object item) throws Exception {
        final String[] line = String.class.cast(item).split(",");
        return new Person(line[0], Integer.parseInt(line[1]));
    }
}
```
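As mentioned above, a more complete version would also deal with comment lines. A processor can do that by returning null, which JBatch treats as "filter this item out", so the item never reaches the writer. The sketch below keeps the parsing logic outside of `ItemProcessor` so it runs standalone; the minimal `Person` class, the `PersonLineParser` name and the `#` comment convention are assumptions:

```java
// Hypothetical parsing logic for a processor like StringToPersonProcessor, extended so that
// blank lines and lines starting with '#' are treated as comments and filtered out.
final class PersonLineParser {

    // minimal stand-in for the Person class assumed by the article
    static final class Person {
        final String name;
        final int age;

        Person(final String name, final int age) {
            this.name = name;
            this.age = age;
        }

        @Override
        public String toString() {
            return "Person{name=" + name + ", age=" + age + "}";
        }
    }

    // returns null for comment/blank lines: a JBatch processor returning null filters the item
    static Person parse(final String line) {
        final String trimmed = line.trim();
        if (trimmed.isEmpty() || trimmed.startsWith("#")) {
            return null;
        }
        final String[] parts = trimmed.split(",");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Expected <name>,<age> but got '" + line + "'");
        }
        return new Person(parts[0].trim(), Integer.parseInt(parts[1].trim()));
    }

    public static void main(final String[] args) {
        System.out.println(parse("Alice, 42")); // Person{name=Alice, age=42}
        System.out.println(parse("# comment")); // null
    }
}
```

Inside `processItem`, the body would then simply be `return parse(String.class.cast(item));`.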
Finally, we want to log every person. To do so, we'll simply write an ItemWriter using JUL (java.util.logging) as the logging API:
```java
import java.io.Serializable;
import java.util.List;
import java.util.logging.Logger;

import javax.batch.api.chunk.ItemWriter;

public class LogWriter implements ItemWriter {
    private static final Logger LOGGER = Logger.getLogger(LogWriter.class.getName());

    @Override
    public void open(final Serializable checkpoint) throws Exception {
        // no-op
    }

    @Override
    public void close() throws Exception {
        // no-op
    }

    @Override
    public void writeItems(final List<Object> items) throws Exception {
        // level and logger name could be configurable
        for (final Object o : items) {
            LOGGER.info(o.toString());
        }
    }

    @Override
    public Serializable checkpointInfo() throws Exception {
        return null; // doesn't handle restart
    }
}
```
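The comment in `writeItems` suggests making the logger name and level configurable. Here is a possible sketch, assuming both come in as plain strings. In a real job they would typically be `@Inject @BatchProperty String` fields filled from the job XML; here they are constructor parameters (and `ConfigurableLogWriter` is a hypothetical name) so the class compiles without JBatch on the classpath:

```java
import java.util.Arrays;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical variant of LogWriter: logger name and level are configuration values
// instead of constants. It mirrors ItemWriter.writeItems but does not implement the interface.
final class ConfigurableLogWriter {
    private final Logger logger;
    private final Level level;

    ConfigurableLogWriter(final String loggerName, final String levelName) {
        this.logger = Logger.getLogger(loggerName);
        this.level = Level.parse(levelName); // accepts names such as "INFO" or "FINE", or numeric values
    }

    void writeItems(final List<Object> items) {
        for (final Object o : items) {
            logger.log(level, String.valueOf(o)); // String.valueOf also copes with null items
        }
    }

    public static void main(final String[] args) {
        new ConfigurableLogWriter("demo.people", "WARNING")
                .writeItems(Arrays.<Object>asList("Alice,42", "Bob,7"));
    }
}
```

Note that a level below the logger's effective level (INFO by default) would be silently dropped unless the logging configuration is adjusted too.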
Once we have our implementations, we need to wire them together in a job XML file. It has to be under META-INF/batch-jobs; we'll call it demo.xml:
```xml
<job id="demo" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
  <step id="step1">
    <chunk>
      <reader ref="sample.FlatFileItemReader">
        <properties>
          <property name="input" value="#{jobParameters['input']}"/>
        </properties>
      </reader>
      <processor ref="sample.StringToPersonProcessor" />
      <writer ref="sample.LogWriter" />
    </chunk>
  </step>
</job>
```
Note how we use jobParameters to get the name of the file to process dynamically, at launch time.
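To give an idea of what happens to that `value` attribute, here is a deliberately simplified model of the substitution: each `#{jobParameters['name']}` expression is replaced by the matching entry of the `Properties` passed when starting the job. The real runtime supports more (other property sources such as job or system properties, default values), and both the `JobParameterSubstitution` name and the choice to resolve a missing parameter to an empty string are assumptions of this sketch:

```java
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical, simplified model of #{jobParameters['...']} substitution in a job XML attribute.
// Only the jobParameters form is handled; unknown parameters resolve to "" in this sketch.
final class JobParameterSubstitution {
    private static final Pattern EXPRESSION = Pattern.compile("#\\{jobParameters\\['([^']*)'\\]\\}");

    static String resolve(final String attributeValue, final Properties jobParameters) {
        final Matcher matcher = EXPRESSION.matcher(attributeValue);
        final StringBuffer resolved = new StringBuffer(); // StringBuffer keeps this Java 8 compatible
        while (matcher.find()) {
            final String value = jobParameters.getProperty(matcher.group(1), "");
            // quoteReplacement so '$' or '\' in a path are not interpreted as group references
            matcher.appendReplacement(resolved, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(resolved);
        return resolved.toString();
    }

    public static void main(final String[] args) {
        final Properties params = new Properties();
        params.setProperty("input", "/input/jbatch/demo/people.txt");
        System.out.println(resolve("#{jobParameters['input']}", params)); // /input/jbatch/demo/people.txt
    }
}
```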
Then, to run it, just use the JobOperator API:
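```java
import java.util.Properties;

import javax.batch.operations.JobOperator;
import javax.batch.runtime.BatchRuntime;

// ...
final Properties jobParams = new Properties();
jobParams.setProperty("input", "/input/jbatch/demo/people.txt");

final JobOperator jobOperator = BatchRuntime.getJobOperator();
// first param is the name of the XML file without the extension; the result is the execution id
final long id = jobOperator.start("demo", jobParams);
```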
Note: start() is asynchronous; to wait for the end of the job you can use this kind of code:
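```java
import java.util.Arrays;
import java.util.Collection;

import javax.batch.operations.JobOperator;
import javax.batch.runtime.BatchStatus;

public static void waitForEnd(final JobOperator jobOperator, final long id) {
    // terminal statuses: once one is reached, the execution is over
    final Collection<BatchStatus> endStatuses = Arrays.asList(
            BatchStatus.COMPLETED, BatchStatus.FAILED, BatchStatus.STOPPED, BatchStatus.ABANDONED);
    do {
        try {
            Thread.sleep(100);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interruption visible to the caller
            return;
        }
    } while (!endStatuses.contains(jobOperator.getJobExecution(id).getBatchStatus()));
}

// ...
waitForEnd(jobOperator, id);
// passing jobOperator is normally optional (BatchRuntime.getJobOperator() could be called again)
// but some implementations can need the same instance
```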
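The loop above polls for as long as the job has not reached one of those statuses, which can be forever if it hangs. A variant is to bound the wait. The generic `pollUntil` helper below is a hypothetical sketch (plain Java, no JBatch types) that polls a value until a predicate accepts it and throws a `TimeoutException` once the deadline has passed:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical helper: poll `probe` every `pollMillis` until `done` accepts its value,
// giving up after `timeoutMillis` instead of waiting forever.
final class Polling {

    static <T> T pollUntil(final Supplier<T> probe, final Predicate<T> done,
                           final long pollMillis, final long timeoutMillis)
            throws InterruptedException, TimeoutException {
        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (true) {
            final T value = probe.get();
            if (done.test(value)) {
                return value;
            }
            if (System.nanoTime() - deadline >= 0) { // overflow-safe comparison of nanoTime values
                throw new TimeoutException("Gave up waiting, last value: " + value);
            }
            Thread.sleep(pollMillis);
        }
    }

    public static void main(final String[] args) throws Exception {
        final int[] calls = {0};
        final Integer result = pollUntil(() -> ++calls[0], n -> n >= 3, 10, 1_000);
        System.out.println(result); // 3
    }
}
```

With JBatch it could be called as `pollUntil(() -> jobOperator.getJobExecution(id).getBatchStatus(), endStatuses::contains, 100, TimeUnit.MINUTES.toMillis(5))`, where `endStatuses` is the collection of terminal statuses used in `waitForEnd`.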
Where can I download the example’s code?
I didn’t keep it, but most of what is done here can be seen in the Apache BatchEE tests: https://git-wip-us.apache.org/repos/asf?p=incubator-batchee.git;a=tree;f=extensions;h=625a224778a920dc05077c5db0cd5f737f012e54;hb=a0ac7912dee64563df7aa96e14e924f2d6fb3b2a