A common need when writing batches is to interact with a database. The two main ways to do so are JDBC and JPA.
In this post we’ll see how to use JDBC to get items from a database.
We’ll read items from a database in a simple chunk step, so all we need to write is an ItemReader.
The reader itself
What do we need to read from a database?
- a JDBC connection. It can be obtained from a [driver, url, user, password] tuple or from a container DataSource (either injected or looked up by its JNDI name).
- a SQL query to find records
- a mapper converting records to objects (there are alternatives, but this one is pretty simple and efficient)
Once we have this information, it is pretty simple to execute the query and, for each record, return the result of the ResultSet conversion.
However, take care of two main things here:
- the query needs to handle pagination if you don’t want to run into memory issues
- the ItemReader needs to cache converted records to avoid re-querying the database for each item, which leads to performance issues pretty quickly
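The two points above can be sketched independently of JDBC. This is only a minimal illustration: PagedBuffer and PageFetcher are hypothetical names, not part of JSR-352, and the fetcher is assumed to run the paginated query (for instance using the offset/limit clause of your database) and to wrap any SQLException itself.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Hypothetical helper: loads records one page at a time and serves them from a local cache.
final class PagedBuffer<T> {
    // Hypothetical callback: returns at most 'limit' records starting at 'offset'.
    interface PageFetcher<T> {
        List<T> fetch(int offset, int limit);
    }

    private final PageFetcher<T> fetcher;
    private final int pageSize;
    private final Deque<T> cache = new ArrayDeque<T>();
    private int offset;        // number of records fetched so far
    private boolean exhausted; // true once a page came back shorter than pageSize

    PagedBuffer(final PageFetcher<T> fetcher, final int pageSize) {
        this.fetcher = fetcher;
        this.pageSize = pageSize;
    }

    // Returns the next record, or null when there is nothing left to read.
    T next() {
        if (cache.isEmpty() && !exhausted) {
            // Only hit the data source when the cache is empty, one page at a time.
            final List<T> page = fetcher.fetch(offset, pageSize);
            offset += page.size();
            exhausted = page.size() < pageSize;
            cache.addAll(page);
        }
        return cache.poll();
    }
}
```

An ItemReader could delegate readItem() to next(): only one page is held in memory at a time, and the database is queried once per page instead of once per item.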
To convert the ResultSet to an Object, we assume the following interface:

    import java.sql.ResultSet;
    import java.sql.SQLException;

    public interface RecordMapper {
        Object map(ResultSet resultSet) throws SQLException;
    }

Here is a possible implementation:

    import javax.batch.api.BatchProperty;
    import javax.batch.api.chunk.ItemReader;
    import javax.batch.operations.BatchRuntimeException;
    import javax.inject.Inject;
    import javax.inject.Named;
    import javax.naming.InitialContext;
    import javax.sql.DataSource;
    import java.io.Serializable;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.util.LinkedList;

    @Named
    public class JdbcReader implements ItemReader {
        @Inject
        @BatchProperty
        private String jndi;

        @Inject
        @BatchProperty
        protected String driver;

        @Inject
        @BatchProperty
        protected String url;

        @Inject
        @BatchProperty
        protected String user;

        @Inject
        @BatchProperty
        protected String password;

        @Inject
        @BatchProperty(name = "mapper")
        private String mapperStr;

        @Inject
        @BatchProperty
        private String query;

        private LinkedList<Object> items;
        private RecordMapper mapper;

        // Uses the container DataSource when a JNDI name is configured, the driver settings otherwise.
        protected Connection connection() throws Exception {
            if (jndi != null) {
                return DataSource.class.cast(new InitialContext().lookup(jndi)).getConnection();
            }

            try {
                Class.forName(driver);
            } catch (final ClassNotFoundException e) {
                throw new BatchRuntimeException(e);
            }
            return DriverManager.getConnection(url, user, password);
        }

        @Override
        public void open(final Serializable checkpoint) throws Exception {
            // Instantiate the mapper from its class name using the context class loader.
            mapper = RecordMapper.class.cast(Thread.currentThread().getContextClassLoader().loadClass(mapperStr).newInstance());
            items = new LinkedList<Object>();
        }

        @Override
        public void close() throws Exception {
            // no-op
        }

        @Override
        public Object readItem() throws Exception {
            if (items.isEmpty()) {
                // The query runs again each time the cache is empty, so processed rows must stop
                // matching it (deleted, flagged...), otherwise the reader never returns null.
                final Connection conn = connection();
                try {
                    final PreparedStatement preparedStatement = conn.prepareStatement(query, ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_UPDATABLE);
                    ResultSet resultSet = null;
                    try {
                        resultSet = preparedStatement.executeQuery();
                        while (resultSet.next()) {
                            items.add(mapper.map(resultSet));
                        }
                        if (items.isEmpty()) {
                            return null; // nothing left to read: end of the step
                        }
                    } finally {
                        if (resultSet != null) {
                            resultSet.close();
                        }
                        preparedStatement.close();
                    }
                } finally {
                    conn.close();
                }
            }
            return items.pop();
        }

        @Override
        public Serializable checkpointInfo() throws Exception {
            // Not needed if the datasource is transactional: the common case is to re-query
            // the database, the state here being the database itself.
            return null;
        }
    }

The configuration of the JdbcReader is then pretty simple:

    <reader ref="jdbcReader">
      <properties>
        <property name="mapper" value="org.superbiz.SimpleMapper" />
        <property name="query" value="select * from FOO where name like 't%'" />
        <property name="driver" value="org.apache.derby.jdbc.EmbeddedDriver" />
        <property name="url" value="jdbc:derby:memory:jdbcreader;create=true" />
        <property name="user" value="app" />
        <property name="password" value="app" />
      </properties>
    </reader>

Note: because we prepared our query with the ResultSet.CONCUR_UPDATABLE option, we can update the selected rows in the mapper. For instance, the following mapper is valid (but you probably don’t want to delete records row by row this way):

    import java.sql.ResultSet;
    import java.sql.SQLException;

    public class SimpleMapper implements RecordMapper {
        @Override
        public Object map(final ResultSet resultSet) throws SQLException {
            final String name = resultSet.getString("name");
            resultSet.deleteRow(); // a bulk query would be more efficient
            return name;
        }
    }
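
As an illustration of the bulk alternative mentioned in the mapper, here is a minimal sketch that removes all matching rows with a single statement. BulkCleanup and deleteByNamePrefix are hypothetical names; the FOO table and the "name like 't%'" filter are taken from the sample configuration, and where this would be called from (a later step, a listener...) is left open.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Hypothetical helper: deletes the rows selected by the reader in one statement.
final class BulkCleanup {
    private BulkCleanup() {
        // utility class
    }

    // Deletes every FOO row whose name starts with the given prefix and returns the number of deleted rows.
    static int deleteByNamePrefix(final Connection connection, final String prefix) throws SQLException {
        try (PreparedStatement statement = connection.prepareStatement("delete from FOO where name like ?")) {
            statement.setString(1, prefix + "%");
            return statement.executeUpdate();
        }
    }
}
```

Calling BulkCleanup.deleteByNamePrefix(connection, "t") issues one delete for the whole selection instead of one deleteRow() per record.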