JBatch: read from a database using JDBC


A common need when writing batches is to interact with a database. The two main ways to do so are JDBC and JPA.

In this post we’ll see how to use JDBC to get items from a database.

We’ll read items from a database in a simple chunk step, so we basically need to write an ItemReader.

The reader itself

To read from a database what do we need?

  • a JDBC connection. It can be obtained from a [driver, url, user, password] tuple or from a container DataSource (either injected or looked up by its JNDI name).
  • a SQL query to find records
  • a mapper converting records to objects (there are alternatives but this one is pretty simple and efficient)

Once we have this information, it is pretty simple to execute the query and, for each record, return the result of the ResultSet conversion.

However, take care of two main things here:

  • the query needs to handle pagination if you don’t want to run into memory issues
  • the ItemReader needs to cache converted records to avoid requerying the database for each item, which leads to performance issues pretty quickly
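
To make the pagination point more concrete, here is a minimal sketch of a helper that appends a paging clause to a base query. The `PagedQuery` class and its `page` method are hypothetical names, not part of JBatch or JDBC. The `OFFSET ... ROWS FETCH NEXT ... ROWS ONLY` clause is the SQL:2008 form that Derby accepts; other databases may need `LIMIT`/`OFFSET` instead. The base query is assumed to contain an `ORDER BY` so that pages are stable.

```java
final class PagedQuery {
    private PagedQuery() {
        // utility class, no instances
    }

    // Appends an SQL:2008 paging clause to baseQuery for the given zero-based page index.
    // Assumption: baseQuery already ends with an ORDER BY clause.
    static String page(final String baseQuery, final int pageIndex, final int pageSize) {
        if (pageIndex < 0 || pageSize <= 0) {
            throw new IllegalArgumentException("pageIndex must be >= 0 and pageSize must be > 0");
        }
        // long arithmetic avoids an int overflow for large page indexes
        final long offset = (long) pageIndex * pageSize;
        return baseQuery + " OFFSET " + offset + " ROWS FETCH NEXT " + pageSize + " ROWS ONLY";
    }
}
```

A reader could then keep the page index in a field and increment it each time its cache runs empty, as long as rows are not deleted or reordered between two pages.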

To convert the ResultSet to an Object we suppose we have the following interface:

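import java.sql.ResultSet;
import java.sql.SQLException;

public interface RecordMapper {
    // converts the current row of the ResultSet to an item
    Object map(ResultSet resultSet) throws SQLException;
}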

Here is a possible implementation:

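import javax.batch.api.BatchProperty;
import javax.batch.api.chunk.ItemReader;
import javax.batch.operations.BatchRuntimeException;
import javax.inject.Inject;
import javax.inject.Named;
import javax.naming.InitialContext;
import javax.sql.DataSource;
import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.LinkedList;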

@Named
public class JdbcReader implements ItemReader {
    @Inject
    @BatchProperty
    private String jndi;

    @Inject
    @BatchProperty
    protected String driver;

    @Inject
    @BatchProperty
    protected String url;

    @Inject
    @BatchProperty
    protected String user;

    @Inject
    @BatchProperty
    protected String password;

    @Inject
    @BatchProperty(name = "mapper")
    private String mapperStr;

    @Inject
    @BatchProperty
    private String query;

    private LinkedList<Object> items;
    private RecordMapper mapper;

    protected Connection connection() throws Exception {
        if (jndi != null) {
            return DataSource.class.cast(new InitialContext().lookup(jndi)).getConnection();
        }

        try {
            Class.forName(driver);
        } catch (final ClassNotFoundException e) {
            throw new BatchRuntimeException(e);
        }
        return DriverManager.getConnection(url, user, password);
    }

    @Override
    public void open(final Serializable checkpoint) throws Exception {
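        // load the mapper class by name and instantiate it through its no-arg constructor
        mapper = RecordMapper.class.cast(Thread.currentThread().getContextClassLoader().loadClass(mapperStr).getDeclaredConstructor().newInstance());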
        items = new LinkedList<Object>();
    }

    @Override
    public void close() throws Exception {
        // no-op
    }

    @Override
    public Object readItem() throws Exception {
        if (items.isEmpty()) {
            // close the connection once the cache is filled, otherwise each refill leaks one
            final Connection conn = connection();
            try {
                final PreparedStatement preparedStatement = conn.prepareStatement(query, ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_UPDATABLE);
                ResultSet resultSet = null;
                try {
                    resultSet = preparedStatement.executeQuery();
                    while (resultSet.next()) {
                        items.add(mapper.map(resultSet));
                    }
                    if (items.isEmpty()) {
                        return null;
                    }
                } finally {
                    if (resultSet != null) {
                        resultSet.close();
                    }
                    preparedStatement.close();
                }
            } finally {
                conn.close();
            }
        }
        return items.pop();
    }

    @Override
    public Serializable checkpointInfo() throws Exception {
        return null; // not needed if the datasource is transactional: the common case is to re-query the database, so the state here is the database itself
    }
}

The configuration of the JdbcReader is then pretty simple:

<reader ref="jdbcReader">
  <properties>
    <property name="mapper" value="org.superbiz.SimpleMapper" />
    <property name="query" value="select * from FOO where name like 't%'" />
    <property name="driver" value="org.apache.derby.jdbc.EmbeddedDriver" />
    <property name="url" value="jdbc:derby:memory:jdbcreader;create=true" />
    <property name="user" value="app" />
    <property name="password" value="app" />
  </properties>
</reader>

Note: because we prepared our query with the ResultSet.CONCUR_UPDATABLE option, we can update the selected rows in the mapper. For instance the following mapper is valid (but you probably don’t want to delete records row by row this way):

public class SimpleMapper implements RecordMapper {
    @Override
    public Object map(final ResultSet resultSet) throws SQLException {
        final String name = resultSet.getString("name");
        resultSet.deleteRow(); // using a bulk query to do so is more efficient
        return name;
    }
}
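
Since the comment in the mapper recommends a bulk query, here is a rough sketch of what that could look like with plain JDBC. The `BulkDelete` class and its `deleteByNamePrefix` method are hypothetical names; the `FOO` table and `name` column come from the reader configuration above. When to call it (for instance once the step has completed) is left open and depends on the job.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

final class BulkDelete {
    private BulkDelete() {
        // utility class, no instances
    }

    // Deletes every FOO row whose name starts with the given prefix in a single statement
    // and returns the number of deleted rows.
    // Assumption: prefix contains no SQL wildcard ('%' or '_'), they are not escaped here.
    static int deleteByNamePrefix(final Connection connection, final String prefix) throws SQLException {
        try (PreparedStatement statement = connection.prepareStatement("delete from FOO where name like ?")) {
            statement.setString(1, prefix + "%");
            return statement.executeUpdate();
        }
    }
}
```

Calling `BulkDelete.deleteByNamePrefix(connection, "t")` targets the same rows as the `like 't%'` query of the reader, in one round trip instead of one `deleteRow()` per record.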