Java 8 streams and JTA chunking?

Java 8 streams are very expressive – even if RxJava is even more so 😉 – but the EE integration is sometimes not that advanced. For a batch import of data it would be handy to use the Stream API, since we iterate over the data to persist it, but how do we ensure we can use chunking – i.e. get a commit interval so we don't commit for each record?

The solution needs a small trick but is actually quite nice.

Build our import stream

For the purpose of this blog post we'll simplify a real-world flow/import and model it as:

  streamRecordsFromFile()
    .flatMap(block -> block.getRecords().stream())
    .map(xmlRecord -> convertToJpa(xmlRecord));

This part is pretty abstracted since it is not the main topic of the post, but what I want to show is:

  • you can stream the input (it is actually quite common for files, so you just need to wire it to the Java Stream API)
  • you can use the expressiveness of the Stream API to map/flatMap/filter records easily, which makes Stream a nice choice for slow or simple batches not requiring a batch runtime
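For the first point, here is a minimal sketch of wiring a file to the Stream API; the streamRecordsFromFile name and the one-record-per-line layout are assumptions for illustration:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

final class RecordSource {
    // Lazily streams the file line by line, one (assumed) record per line,
    // skipping blank lines before any mapping happens.
    static Stream<String> streamRecordsFromFile(final Path file) throws IOException {
        return Files.lines(file)
                    .filter(line -> !line.isEmpty());
    }
}
```

Note that Files.lines keeps the file open, so in real code the returned stream should be closed (try-with-resources) once the pipeline completes.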

Stream and JTA

In the previous part we mainly created our business logic, but we are missing the sink/leaf of the stream, so our code actually does nothing. We can easily add a forEach persisting our records:

  streamRecordsFromFile()
    .flatMap(block -> block.getRecords().stream())
    .map(xmlRecord -> convertToJpa(xmlRecord))
    .forEach(record -> dao.save(record));

The dao would just do an EntityManager#persist in a transaction.

This code works, but it uses one transaction per record, which can be quite inefficient.

To convert it to chunks (i.e. group the persist calls) we'll just convert our save call to a Runnable:

  streamRecordsFromFile()
    .flatMap(block -> block.getRecords().stream())
    .map(xmlRecord -> convertToJpa(xmlRecord))
    .map(r -> (Runnable) () -> dao.save(r));

With this code we are streaming Runnables which persist the records.
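A quick way to convince yourself that nothing runs during the mapping itself, sketched with assumed stand-ins (a counter instead of the dao):

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class DeferredDemo {
    // Maps records to Runnables, then runs them; 'persisted' stands in for the dao.
    static int runAll(final Stream<String> records, final AtomicInteger persisted) {
        final List<Runnable> tasks = records
            .map(r -> (Runnable) () -> persisted.incrementAndGet()) // nothing persisted yet
            .collect(Collectors.toList());                          // still nothing persisted
        tasks.forEach(Runnable::run);                               // the work happens here
        return tasks.size();
    }
}
```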

Now we’ll assume we have a TransactionProvider giving us the ability to execute some Runnables in a transaction. In EE 7 it can be as easy as:

public class TransactionProvider {
  @Transactional(Transactional.TxType.REQUIRES_NEW) // javax.transaction.Transactional (JTA 1.2): one transaction per execute() call
  public void execute(final Collection<Runnable> tasks) {
    tasks.forEach(Runnable::run);
  }
}

So we just need to group the Runnables by our chunk size/commit interval (say 10 for this post) and call all the individual persist Runnables in the provider's execute method:

Collection<Runnable> trail = streamRecordsFromFile()
  .flatMap(block -> block.getRecords().stream())
  .map(xmlRecord -> convertToJpa(xmlRecord))
  // chunking handling starts there
  .map(r -> (Runnable) () -> dao.save(r))
  .collect(
    ArrayList::new,
    (list, runnable) -> {
      list.add(runnable);
      if ((list.size() % 10) == 0) { // commit interval reached: execute the chunk
        provider.execute(list);
        list.clear();
      }
    },
    Collection::addAll);

if (!trail.isEmpty()) { // execute the remaining tasks (the trail)
  provider.execute(trail);
}
  • we use the collect() leaf of the stream to group the tasks
  • since the stream is consumed element by element, we can check the group size in the collect accumulator and execute the chunk (the execute invocation) there
  • the number of tasks may not be divisible by the commit interval, so we need to execute the remaining tasks after the end of the stream (the trail handling)
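The three points above can be exercised without any EE container; in this sketch the transaction provider is replaced by a stub that only records the chunk sizes (an assumption for the demo):

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Stream;

final class ChunkingDemo {
    // Returns the size of each "transaction" (chunk) the stub provider received.
    static List<Integer> chunkSizes(final Stream<Runnable> tasks, final int interval) {
        final List<Integer> sizes = new ArrayList<>(); // stub: records sizes instead of committing
        final Collection<Runnable> trail = tasks.collect(
            ArrayList::new,
            (list, task) -> {
                list.add(task);
                if (list.size() % interval == 0) { // commit interval reached
                    sizes.add(list.size());        // stand-in for provider.execute(list)
                    list.clear();
                }
            },
            Collection::addAll);
        if (!trail.isEmpty()) {                    // leftover tasks after the last full chunk
            sizes.add(trail.size());
        }
        return sizes;
    }
}
```

With 23 tasks and an interval of 10 this yields two full chunks of 10 plus a trail of 3.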

Note: using a higher-level API like RxJava would give something even more fluent, but it adds a dependency which is quite big for such a simple need.

Oops, it failed: don’t forget the error handling

This code is a very simplified version. For instance, if a transaction fails it will stop the processing. That’s why the transaction provider will most of the time be wrapped in a method returning a boolean representing the transaction status (committed or rolled back). This allows the processing to continue even if one chunk fails, and gives very fine-grained reporting at the end.
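A sketch of such a wrapper, under the assumption that a rolled-back transaction surfaces as a RuntimeException; the interface here is a hypothetical stand-in for the injected EE bean:

```java
import java.util.Collection;

final class SafeExecutor {
    // Hypothetical stand-in for the injected EE TransactionProvider bean.
    interface TransactionProvider {
        void execute(Collection<Runnable> tasks);
    }

    // Returns true if the chunk committed, false if the transaction rolled back,
    // so the caller can keep processing the remaining chunks and report failures.
    static boolean tryExecute(final TransactionProvider provider, final Collection<Runnable> chunk) {
        try {
            provider.execute(chunk);
            return true;
        } catch (final RuntimeException rolledBack) {
            return false; // collect 'rolledBack' here for the end-of-import report
        }
    }
}
```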

More about that in next post!

