JBatch: a basic chunking sample


Chunking is the minimum you can expect from a batch framework. The idea is simply to process a list of items, commit the work done for that list, then continue with the next list, and so on. You can see it as something more general than a plain commit interval (the interval can be a number of items, a timeout, etc.).
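To make the "number of items or timeout" idea concrete, here is a minimal sketch of a checkpoint policy that ends a chunk after a given number of items or once a time limit has elapsed, whichever comes first. The ItemOrTimePolicy class and its methods are hypothetical and this is plain Java, not JBatch code. JSR-352 exposes comparable settings through the item-count and time-limit attributes of the chunk element.

```java
import java.util.function.LongSupplier;

// Hypothetical checkpoint policy: a chunk ends after maxItems items or once
// timeLimitMillis have elapsed since the chunk began, whichever comes first.
public final class ItemOrTimePolicy {
    private final int maxItems;
    private final long timeLimitMillis;
    private final LongSupplier clockMillis; // injectable clock, e.g. System::currentTimeMillis
    private int itemsInChunk;
    private long chunkStart;

    public ItemOrTimePolicy(final int maxItems, final long timeLimitMillis, final LongSupplier clockMillis) {
        this.maxItems = maxItems;
        this.timeLimitMillis = timeLimitMillis;
        this.clockMillis = clockMillis;
        beginChunk();
    }

    // Call when a new chunk starts, i.e. right after each commit.
    public void beginChunk() {
        itemsInChunk = 0;
        chunkStart = clockMillis.getAsLong();
    }

    // Call once per item read; returns true when the current chunk should be committed.
    public boolean itemRead() {
        itemsInChunk++;
        return itemsInChunk >= maxItems
                || clockMillis.getAsLong() - chunkStart >= timeLimitMillis;
    }
}
```

With System::currentTimeMillis as the clock, new ItemOrTimePolicy(100, 5000, System::currentTimeMillis) would commit every 100 items, or earlier when reading is slow.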

So how do you use chunking with JBatch?

First, chunking is composed of three entities: a reader, a processor and a writer. The reader reads the data one item at a time. The processor is optional, but if present each item is sent to it and it can transform the item. Finally, the writer takes a list of processed items (a chunk) and writes it somewhere.
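The read/process/write cycle can be pictured as a plain Java loop. This is a simplified, single-threaded model written for illustration only (the ChunkLoopSketch class and runChunks signature are hypothetical): a real JBatch runtime also manages transactions, checkpoints and retries, and calls the actual ItemReader, ItemProcessor and ItemWriter interfaces.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Illustrative model of a chunk step, not the JBatch runtime.
public class ChunkLoopSketch {

    // Reads every item from 'reader', transforms it with 'processor' and hands the
    // results to 'writer' in lists of at most 'chunkSize' items.
    // Returns the number of chunks written.
    public static <I, O> int runChunks(final Iterator<I> reader,
                                       final Function<I, O> processor,
                                       final Consumer<List<O>> writer,
                                       final int chunkSize) {
        int chunks = 0;
        final List<O> buffer = new ArrayList<>(chunkSize);
        while (reader.hasNext()) {
            buffer.add(processor.apply(reader.next()));
            if (buffer.size() == chunkSize) {
                writer.accept(new ArrayList<>(buffer)); // a real runtime would commit here
                buffer.clear();
                chunks++;
            }
        }
        if (!buffer.isEmpty()) { // the last chunk may be smaller
            writer.accept(new ArrayList<>(buffer));
            chunks++;
        }
        return chunks;
    }

    public static void main(final String[] args) {
        final List<List<String>> written = new ArrayList<>();
        final Function<String, String> upper = String::toUpperCase;
        final Consumer<List<String>> collect = written::add;
        final int chunks = runChunks(List.of("a", "b", "c", "d", "e").iterator(), upper, collect, 2);
        System.out.println(chunks + " chunks: " + written); // prints "3 chunks: [[A, B], [C, D], [E]]"
    }
}
```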

Now, how do we write each of them with JBatch?

First, the reader needs to implement ItemReader. Here is a simple implementation that reads a file line by line:

package sample;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.Serializable;
import javax.batch.api.BatchProperty;
import javax.batch.api.chunk.ItemReader;
import javax.inject.Inject;

public class FlatFileItemReader implements ItemReader {
    @Inject
    @BatchProperty
    private String input; // set from the "input" property of the job XML

    private BufferedReader reader = null;

    @Override
    public void open(final Serializable checkpoint) throws Exception {
        if (input == null) {
            throw new RuntimeException("Can't find any input");
        }
        final File file = new File(input);
        if (!file.exists()) {
            throw new RuntimeException("'" + input + "' doesn't exist");
        }
        reader = new BufferedReader(new FileReader(file));
    }

    @Override
    public void close() throws Exception {
        if (reader != null) {
            reader.close();
        }
    }

    @Override
    public Object readItem() throws Exception {
        return reader.readLine(); // null at end of file tells the runtime there are no more items
    }

    @Override
    public Serializable checkpointInfo() throws Exception {
        return null; // no checkpoint: this reader doesn't handle restart
    }
}
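Because its checkpointInfo() provides no checkpoint, this reader starts again from the first line when a job is restarted. As a sketch of one way to make it restartable, the hypothetical LineCheckpointReader below counts the lines it has returned, exposes that count as the checkpoint and skips that many lines when reopened. It assumes the input file is not modified between executions and, to stay runnable without a batch runtime, it does not implement ItemReader itself; a real FlatFileItemReader could delegate its open(), readItem() and checkpointInfo() to logic like this.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical helper holding the logic a restartable FlatFileItemReader could delegate to.
// It has no javax.batch dependency so it can be exercised on its own.
public class LineCheckpointReader implements AutoCloseable {
    private final BufferedReader reader;
    private long linesRead; // lines consumed so far, including skipped ones

    // 'checkpoint' is what checkpointInfo() returned at the last checkpoint, or null on a first run.
    public LineCheckpointReader(final Reader source, final Serializable checkpoint) {
        this.reader = new BufferedReader(source);
        final long toSkip = checkpoint == null ? 0L : (Long) checkpoint;
        while (linesRead < toSkip && readLine() != null) {
            linesRead++; // skip lines already processed by a previous execution
        }
    }

    // Same contract as ItemReader.readItem(): null means there is nothing left to read.
    public String readItem() {
        final String line = readLine();
        if (line != null) {
            linesRead++;
        }
        return line;
    }

    // The count is boxed to a Long, which is Serializable and can be stored by the runtime.
    public Serializable checkpointInfo() {
        return linesRead;
    }

    @Override
    public void close() {
        try {
            reader.close();
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private String readLine() {
        try {
            return reader.readLine();
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In JSR-352 the runtime calls checkpointInfo() at each checkpoint and passes the last stored value back to open() when the job is restarted, which is what the constructor parameter stands in for here.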

This reader is deliberately minimal: a more complete implementation would handle comment lines and maybe use a mapper to convert each line to an object. Here we'll use the processor for the conversion (we assume a Person class exists, where a person has a name and an age):

package sample;

import javax.batch.api.chunk.ItemProcessor;

public class StringToPersonProcessor implements ItemProcessor {
    @Override // the input is a string containing <name>,<age>
    public Object processItem(final Object item) throws Exception {
        final String[] line = String.class.cast(item).split(",");
        return new Person(line[0].trim(), Integer.parseInt(line[1].trim()));
    }
}
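The processor relies on a Person class. A minimal version might look like the sketch below, assumed to live in the same sample package; the getters, the toString() format and the parse() helper are all hypothetical. parse() also shows how the comment-line handling mentioned above could be moved into the processor: in JBatch, an ItemProcessor that returns null filters the item out, so the writer never receives it.

```java
// Hypothetical Person class assumed by StringToPersonProcessor.
public class Person {
    private final String name;
    private final int age;

    public Person(final String name, final int age) {
        this.name = name;
        this.age = age;
    }

    public String getName() {
        return name;
    }

    public int getAge() {
        return age;
    }

    // LogWriter logs o.toString(), so this is what ends up in the log.
    @Override
    public String toString() {
        return "Person{name=" + name + ", age=" + age + "}";
    }

    // Hypothetical defensive parser: returns null for blank lines and '#' comments
    // (a processor returning null filters the item out), trims the fields and rejects
    // malformed lines with a clear message instead of an ArrayIndexOutOfBoundsException.
    public static Person parse(final String line) {
        final String trimmed = line.trim();
        if (trimmed.isEmpty() || trimmed.startsWith("#")) {
            return null;
        }
        final String[] fields = trimmed.split(",");
        if (fields.length != 2) {
            throw new IllegalArgumentException("Expected <name>,<age> but got: " + line);
        }
        return new Person(fields[0].trim(), Integer.parseInt(fields[1].trim()));
    }
}
```

With such a helper, processItem could simply return Person.parse(String.class.cast(item)).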

Finally, we want to log every person. To do so, we'll simply write an ItemWriter using JUL (java.util.logging) as the logging API:

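package sample;

import java.io.Serializable;
import java.util.List;
import java.util.logging.Logger;
import javax.batch.api.chunk.ItemWriter;

public class LogWriter implements ItemWriter {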
    private static final Logger LOGGER = Logger.getLogger(LogWriter.class.getName());

    @Override
    public void open(Serializable checkpoint) throws Exception {
        // no-op
    }

    @Override
    public void close() throws Exception {
        // no-op
    }

    @Override
    public void writeItems(final List<Object> items) throws Exception {
        // level and logger name could be configurable
        for (final Object o : items) {
            LOGGER.info(o.toString());
        }
    }

    @Override
    public Serializable checkpointInfo() throws Exception {
        return null; // doesn't handle restart
    }
}

Once we have our implementations, we need to wire them together in a job XML file. It has to be under META-INF/batch-jobs. We'll call it demo.xml:

<job id="demo" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
  <step id="step1">
    <chunk>
      <reader ref="sample.FlatFileItemReader">
        <properties>
          <property name="input" value="#{jobParameters['input']}"/>
        </properties>
      </reader>
      <processor ref="sample.StringToPersonProcessor" />
      <writer ref="sample.LogWriter" />
    </chunk>
  </step>
</job>

Note: see how we use jobParameters to get the name of the file to process dynamically.

Then, to launch it, just use the JobOperator API:

import java.util.Properties;
import javax.batch.operations.JobOperator;
import javax.batch.runtime.BatchRuntime;

final Properties jobParams = new Properties();
jobParams.setProperty("input", "/input/jbatch/demo/people.txt");

final JobOperator jobOperator = BatchRuntime.getJobOperator();
long id = jobOperator.start("demo", jobParams); // first param is the name of the job XML file without the .xml extension

Note: start() is asynchronous; to wait for the end of the job you can use this kind of code:

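import java.util.EnumSet;
import java.util.Set;
import javax.batch.operations.JobOperator;
import javax.batch.runtime.BatchStatus;

public static void waitForEnd(final JobOperator jobOperator, final long id) {
    // STOPPED and ABANDONED are end statuses too; without them a stopped job would make this loop forever
    final Set<BatchStatus> endStatuses = EnumSet.of(
            BatchStatus.COMPLETED, BatchStatus.FAILED, BatchStatus.STOPPED, BatchStatus.ABANDONED);
    do {
        try {
            Thread.sleep(100);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag for the caller
            return;
        }
    } while (!endStatuses.contains(jobOperator.getJobExecution(id).getBatchStatus()));
}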

// ...
waitForEnd(jobOperator, id); // note: the jobOperator parameter is normally optional, but some implementations may need it
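waitForEnd has no upper bound, so a job that never reaches an end status keeps the caller waiting forever. If you prefer a bounded wait, here is a sketch of a generic polling helper with a timeout; the Polling class and awaitAny method are hypothetical, and the helper is written against a plain Supplier so it does not depend on any JBatch implementation.

```java
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical generic helper: polls a status until it is terminal or a timeout expires.
public final class Polling {
    private Polling() {
    }

    // Returns the first status found in 'terminal', or Optional.empty() if 'timeoutMillis'
    // elapses (or the thread is interrupted) before that happens.
    public static <S> Optional<S> awaitAny(final Supplier<S> status, final Set<S> terminal,
                                           final long intervalMillis, final long timeoutMillis) {
        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (true) {
            final S current = status.get();
            if (terminal.contains(current)) {
                return Optional.of(current);
            }
            if (System.nanoTime() - deadline >= 0) {
                return Optional.empty(); // timed out
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
                return Optional.empty();
            }
        }
    }
}
```

With JBatch it could be called as Polling.awaitAny(() -> jobOperator.getJobExecution(id).getBatchStatus(), EnumSet.of(BatchStatus.COMPLETED, BatchStatus.FAILED, BatchStatus.STOPPED, BatchStatus.ABANDONED), 100, 60_000), which gives up after one minute instead of waiting indefinitely.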