CDI: replace the configuration by a register pattern


CDI doesn’t really have a configuration file. Of course, beans.xml is used to activate a few features such as interceptors, but you can’t register a bean in it, can’t add a qualifier to a bean, etc.

When you write a CDI library, the question of configuration hits you pretty quickly. Let’s see how to solve it with a fairly simple pattern that makes users’ lives much nicer.

The starting point of this reflection is to check how, as a transversal library, you can propose a standard configuration. The answer is pretty easy: you can’t :D. Configuration depends on too many things to be made generic enough to match all users. Some companies will use a deployment tool to push static files in a particular format. Others will use a dynamic database. And finally, in some cases, you just need to put it in the webapp. So, to summarize, a generic configuration solution needs to be able to:

  • read any file
  • read any format
  • read any resource
  • read any database with any structure

This is such a big task that some frameworks, like Apache Tamaya, are built just to do it. You can of course use such a framework to read the configuration, but you are then back to the format/structure issue: you still need to impose one on your users.

The conclusion is that, as a library writer, it is better to let users register the needed entries in the library than to read entries in the user’s format or to impose a format that may not fit their usage.

Registration as a pattern: thread pool example

Note: I’ll use the CDI 1.1 API in this post, but by replacing the calls that are new in 1.1 we could make it work on CDI 1.0 if needed.

To illustrate this, we’ll create a small library that lets you inject named thread pools.

The API usage would be something like:

public class MyService {
    @Inject
    @ThreadPool("computation")
    private ExecutorService executorService;

    public Report doReport() {
         // use executorService
    }
}

The identified configuration for a thread pool is composed of:

  • core: the pool core size (size even when no task is executed)
  • max: the pool max size
  • keep alive time: the time after which a thread can be released if not used
  • thread factory: if threads need some customization (like wrapping runnables to start @RequestScoped automatically for instance ;))
  • rejection handler: what happens when a task can’t be submitted
  • shutdown time: the duration to wait for already submitted tasks to complete before giving up when shutting down
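As a rough illustration, most of these settings map one-to-one onto the java.util.concurrent.ThreadPoolExecutor constructor (the shutdown time is not a constructor argument, it only matters when the pool is stopped). The sketch below is plain JDK code and not part of the library; the class name PoolSettingsDemo and the values used are assumptions made for the example. It also shows a detail worth keeping in mind: with an unbounded work queue the pool never grows past its core size, so max only matters when the queue is bounded.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, only here to show how the settings are used.
final class PoolSettingsDemo {
    private PoolSettingsDemo() {
        // no instance needed
    }

    /** Creates a pool with core=1 and max=2, submits three blocking tasks and returns the thread count. */
    static int threadsStartedWith(final BlockingQueue<Runnable> workQueue) {
        final ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2,                                  // core, max
                30, TimeUnit.SECONDS,                  // keep alive time
                workQueue,                             // where tasks wait when core threads are busy
                Executors.defaultThreadFactory(),      // thread factory
                new ThreadPoolExecutor.AbortPolicy()); // rejection handler
        final CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < 3; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until we measured the pool
                    } catch (final InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getPoolSize();
        } finally {
            release.countDown(); // let the tasks finish
            pool.shutdown();     // queued tasks still run, then threads exit
        }
    }

    public static void main(final String[] args) {
        System.out.println(threadsStartedWith(new LinkedBlockingQueue<>()));  // prints 1
        System.out.println(threadsStartedWith(new ArrayBlockingQueue<>(1))); // prints 2
    }
}
```

With the unbounded queue the second and third tasks simply wait in the queue; with a queue of capacity 1 the third task forces the pool to start a second thread.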

Now that we have the API and the needed configuration, we just need:

  • A startup event to register user pool configurations (we can support a default pool if needed, or not, to avoid leaking a pool between usages)
  • A customizable model used during registration to let the user change the settings
  • A “runtime” model: the one lazily accessed to create the pools on demand (of course you can create the pools eagerly if you prefer)
  • A producer so that the pool can be injected

The API

The API is mainly just the ThreadPool qualifier:

import javax.enterprise.util.Nonbinding;
import javax.inject.Qualifier;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

@Qualifier
@Target({METHOD, FIELD, PARAMETER})
@Retention(RUNTIME)
public @interface ThreadPool {
    /**
     * @return the name of the pool to use.
     */
    @Nonbinding
    String value();
}

Nothing special to note there, except that it is a qualifier, so it does not conflict if the application already produces pools elsewhere. The value is marked @Nonbinding so that a single producer can serve every pool name.

Startup/Shutdown event

For a full CDI application you can use the events fired when the application scope is initialized and destroyed:

@ApplicationScoped
public class ThreadPoolManager {
    private final AtomicBoolean running = new AtomicBoolean();

    void init(@Observes @Initialized(ApplicationScoped.class) final Object start) {
        // TODO: register
        running.set(true);
    }

    void destroy(@Observes @Destroyed(ApplicationScoped.class) final Object end) {
        running.set(false);
        // TODO: destroy
    }
}

The idea is to have a model manager which is used at runtime to find the pool model and create the pool if it doesn’t already exist. One interesting trick is to keep a state flag (running in the previous snippet) so that you can fail if the application asks for a pool once the manager is destroyed.

Runtime model

Our model will be exactly that of a ThreadPoolExecutor, plus the shutdown time:

public class ThreadPoolModel {
    private final int core;
    private final int max;
    private final long keepAliveTime;
    private final TimeUnit keepAliveTimeUnit;
    private final BlockingQueue<Runnable> workQueue;
    private final ThreadFactory threadFactory;
    private final RejectedExecutionHandler handler;
    private final long shutdownTime;
    private final TimeUnit shutdownTimeUnit;

    public ThreadPoolModel(final int core, final int max,
                            final long keepAliveTime, final TimeUnit keepAliveTimeUnit,
                            final BlockingQueue<Runnable> workQueue,
                            final ThreadFactory threadFactory,
                            final RejectedExecutionHandler handler,
                            final long shutdownTime, final TimeUnit shutdownTimeUnit) {
        this.core = core;
        this.max = max;
        this.keepAliveTime = keepAliveTime;
        this.keepAliveTimeUnit = keepAliveTimeUnit == null ? MILLISECONDS : keepAliveTimeUnit;
        this.shutdownTime = shutdownTime;
        this.shutdownTimeUnit = shutdownTimeUnit == null ? MILLISECONDS : shutdownTimeUnit;
        this.workQueue = workQueue == null ? new LinkedBlockingDeque<>() : workQueue;
        this.threadFactory = threadFactory == null ? Executors.defaultThreadFactory() : threadFactory;
        this.handler = handler == null ? new ThreadPoolExecutor.AbortPolicy() : handler;
    }

    public ThreadPoolExecutor create() {
        return new ThreadPoolExecutor(core, max, keepAliveTime, keepAliveTimeUnit, workQueue, threadFactory, handler);
    }

    public void destroy(final ThreadPoolExecutor executor) {
        executor.shutdown();
        if (shutdownTime > 0) {
            try {
                executor.awaitTermination(shutdownTime, shutdownTimeUnit);
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag instead of clearing it
            }
        }
    }
}

We can clearly see a producer pattern here: the configuration is passed through to the ThreadPoolExecutor constructor, and we just add some wait time at shutdown when destroying the pool.
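To see what this wait time buys you, here is a small JDK-only sketch of the same shutdown sequence. The class GracefulShutdownDemo and its method are hypothetical names for this example, and the shutdownNow() call on timeout is my own addition (the model above simply stops waiting).

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class showing shutdown() followed by a bounded wait.
final class GracefulShutdownDemo {
    private GracefulShutdownDemo() {
        // no instance needed
    }

    /** Submits {@code tasks} short jobs, shuts the pool down, waits up to {@code graceMs} and returns how many jobs ran. */
    static int completedAfterShutdown(final int tasks, final long graceMs) {
        final ExecutorService pool = Executors.newFixedThreadPool(2);
        final AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(done::incrementAndGet);
        }
        pool.shutdown(); // no new task is accepted, already submitted ones still run
        try {
            if (!pool.awaitTermination(graceMs, TimeUnit.MILLISECONDS)) {
                pool.shutdownNow(); // assumption: give up on remaining tasks once the grace period is over
            }
        } catch (final InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
        }
        return done.get();
    }

    public static void main(final String[] args) {
        System.out.println(completedAfterShutdown(5, 5000)); // prints 5
    }
}
```

Without the wait, the container could move on with its own shutdown while submitted tasks are still running.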

Wiring it to CDI

We have an event at the right moment and a class holding the configuration which is able to create/destroy pools, so we just need to be able to register the configuration, produce a pool and destroy the pools with the application.

Registration

There are multiple solutions, but a simple one is to fire an event whose payload is the manager itself, add a register method to the manager, and return a builder wrapping the model:

import javax.annotation.PreDestroy;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.context.Initialized;
import javax.enterprise.event.Event;
import javax.enterprise.event.Observes;
import javax.enterprise.inject.Produces;
import javax.enterprise.inject.spi.InjectionPoint;
import javax.inject.Inject;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

@ApplicationScoped
public class ThreadPoolManager {
    private final AtomicBoolean running = new AtomicBoolean();
    private final Map<String, ThreadPoolModel> models = new HashMap<>();
    private final ConcurrentMap<String, ThreadPoolExecutor> pools = new ConcurrentHashMap<>();

    @Inject
    private Event<ThreadPoolManager> registrationEvent;

    void init(@Observes @Initialized(ApplicationScoped.class) final Object start) {
        registrationEvent.fire(this);
        running.set(true);
    }

    @PreDestroy
    void destroy() {
        running.set(false);
        pools.forEach((n, p) -> models.get(n).destroy(p));
    }

    /**
     * @param name name of the pool (only mandatory configuration).
     * @return a pool builder to customize defaults of the pool. Remember to call add() to ensure it is registered.
     */
    public ThreadPoolBuilder register(final String name) {
        return new ThreadPoolBuilder(this, name);
    }

    public static class ThreadPoolBuilder {
        private final String name;
        private final ThreadPoolManager registration;

        private int core = 3;
        private int max = 10;
        private long shutdownTime = 0;
        private TimeUnit shutdownTimeUnit = MILLISECONDS;
        private long keepAliveTime = 0;
        private TimeUnit keepAliveTimeUnit = MILLISECONDS;
        private BlockingQueue<Runnable> workQueue;
        private ThreadFactory threadFactory;
        private RejectedExecutionHandler handler;

        private ThreadPoolBuilder(final ThreadPoolManager registration, final String name) {
            this.registration = registration;
            this.name = name;
        }

        public ThreadPoolBuilder withCore(final int core) {
            this.core = core;
            return this;
        }

        public ThreadPoolBuilder withMax(final int max) {
            this.max = max;
            return this;
        }

        public ThreadPoolBuilder withKeepAliveTime(final long keepAliveTime, final TimeUnit unit) {
            this.keepAliveTime = keepAliveTime;
            this.keepAliveTimeUnit = unit;
            return this;
        }

        public ThreadPoolBuilder withShutdownTime(final long time, final TimeUnit unit) {
            this.shutdownTime = time;
            this.shutdownTimeUnit = unit;
            return this;
        }

        public ThreadPoolBuilder withWorkQueue(final BlockingQueue<Runnable> workQueue) {
            this.workQueue = workQueue;
            return this;
        }

        public ThreadPoolBuilder withThreadFactory(final ThreadFactory threadFactory) {
            this.threadFactory = threadFactory;
            return this;
        }

        public ThreadPoolBuilder withRejectedExecutionHandler(final RejectedExecutionHandler handler) {
            this.handler = handler;
            return this;
        }

        public ThreadPoolManager add() {
            this.registration.models.put(name, new ThreadPoolModel(core, max, keepAliveTime, keepAliveTimeUnit, workQueue, threadFactory, handler, shutdownTime, shutdownTimeUnit));
            return registration;
        }
    }
}

You can see:

  • we created a builder mirroring our model, with a fluent API between the manager and the configuration
  • we store the models registered through the builder
  • we ensure the builders can be chained to offer a nice API to people who do not read their configuration from a file
  • we store created pools in a concurrent map (runtime storage) so they are not recreated: they are almost application scoped, in the sense that there is one instance per name per application
  • we destroy pools when the bean is destroyed (note: this is a simplification, because pools injected in other application scoped beans can still be used afterwards, but this is out of scope for this post. Tip: use an extension and the BeforeShutdown event, or just proxy pools and make them synchronous after the manager destruction.)
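The “proxy pools and make them synchronous” tip could look like the following sketch. It is only one possible reading of the tip: FallbackExecutor and markDestroyed() are hypothetical names, and the wrapper only covers Executor#execute, not the whole ExecutorService contract.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical wrapper: delegates to the pool while it is alive, runs tasks inline afterwards.
final class FallbackExecutor implements Executor {
    private final Executor delegate;
    private final AtomicBoolean destroyed = new AtomicBoolean();

    FallbackExecutor(final Executor delegate) {
        this.delegate = delegate;
    }

    /** Meant to be called by the manager once the underlying pool has been shut down. */
    void markDestroyed() {
        destroyed.set(true);
    }

    @Override
    public void execute(final Runnable task) {
        if (destroyed.get()) {
            task.run(); // pool is gone: run synchronously in the caller thread
            return;
        }
        try {
            delegate.execute(task);
        } catch (final RejectedExecutionException e) {
            // the pool was shut down between the check and the submission
            // (only detected when the pool's rejection handler throws)
            task.run();
        }
    }
}
```

The manager would hand out such wrappers instead of the raw pools and call markDestroyed() on each of them in its destroy method.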

@Produces

Thanks to the previous backbone, we can simply get or create the pool in a producer carrying the @ThreadPool qualifier:

@Produces
@ThreadPool("")
public ThreadPoolExecutor getOrCreatePool(final InjectionPoint ip) {
  if (!running.get()) {
    throw new IllegalStateException("Pool not available");
  }
  return pools.computeIfAbsent(ip.getAnnotated().getAnnotation(ThreadPool.class).value(), name ->
      ofNullable(models.get(name)).map(ThreadPoolModel::create).orElseThrow(() -> new IllegalArgumentException("No pool '" + name + "' defined.")));
}
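If you want to check the lookup semantics without starting a container, the same computeIfAbsent/ofNullable combination can be isolated in plain Java. This is just a sketch: LazyRegistry is a hypothetical class, and a Supplier stands in for ThreadPoolModel::create.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

import static java.util.Optional.ofNullable;

// Hypothetical, CDI-free version of the get-or-create logic used by the producer.
final class LazyRegistry<T> {
    private final Map<String, Supplier<T>> models;
    private final ConcurrentMap<String, T> instances = new ConcurrentHashMap<>();

    LazyRegistry(final Map<String, Supplier<T>> models) {
        this.models = models;
    }

    /** Returns the instance registered under the name, creating it on first access. */
    T getOrCreate(final String name) {
        return instances.computeIfAbsent(name, key ->
                ofNullable(models.get(key))
                        .map(Supplier::get)
                        .orElseThrow(() -> new IllegalArgumentException("No pool '" + key + "' defined.")));
    }

    /** Number of instances created so far. */
    int created() {
        return instances.size();
    }
}
```

Two lookups with the same name return the same instance, and an unknown name fails without leaving anything in the map, since computeIfAbsent records no mapping when the function throws.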

Put it all together

To have working code, here is a complete version of the manager (everything but the @ThreadPool annotation):

import javax.annotation.PreDestroy;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.context.Initialized;
import javax.enterprise.event.Event;
import javax.enterprise.event.Observes;
import javax.enterprise.inject.Produces;
import javax.enterprise.inject.spi.InjectionPoint;
import javax.inject.Inject;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static java.util.Optional.ofNullable;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

@ApplicationScoped
public class ThreadPoolManager {
    private final AtomicBoolean running = new AtomicBoolean();
    private final Map<String, ThreadPoolModel> models = new HashMap<>();
    private final ConcurrentMap<String, ThreadPoolExecutor> pools = new ConcurrentHashMap<>();

    @Inject
    private Event<ThreadPoolManager> registrationEvent;

    void init(@Observes @Initialized(ApplicationScoped.class) final Object start) {
        registrationEvent.fire(this); // note: this doesn't prevent supporting some default file configuration if desired
        running.set(true);
    }

    @PreDestroy
    void destroy() {
        running.set(false);
        pools.forEach((n, p) -> models.get(n).destroy(p));
    }

    @Produces
    @ThreadPool("")
    public ThreadPoolExecutor getOrCreatePool(final InjectionPoint ip) {
        if (!running.get()) {
            throw new IllegalStateException("Pool not available");
        }
        return pools.computeIfAbsent(ip.getAnnotated().getAnnotation(ThreadPool.class).value(), name ->
            ofNullable(models.get(name)).map(ThreadPoolModel::create).orElseThrow(() -> new IllegalArgumentException("No pool '" + name + "' defined.")));
    }

    /**
     * @param name name of the pool (only mandatory configuration).
     * @return a pool builder to customize defaults of the pool. Remember to call add() to ensure it is registered.
     */
    public ThreadPoolBuilder register(final String name) {
        return new ThreadPoolBuilder(this, name);
    }

    public static class ThreadPoolBuilder {
        private final String name;
        private final ThreadPoolManager registration;

        private int core = 3;
        private int max = 10;
        private long shutdownTime = 0;
        private TimeUnit shutdownTimeUnit = MILLISECONDS;
        private long keepAliveTime = 0;
        private TimeUnit keepAliveTimeUnit = MILLISECONDS;
        private BlockingQueue<Runnable> workQueue;
        private ThreadFactory threadFactory;
        private RejectedExecutionHandler handler;

        private ThreadPoolBuilder(final ThreadPoolManager registration, final String name) {
            this.registration = registration;
            this.name = name;
        }

        public ThreadPoolBuilder withCore(final int core) {
            this.core = core;
            return this;
        }

        public ThreadPoolBuilder withMax(final int max) {
            this.max = max;
            return this;
        }

        public ThreadPoolBuilder withKeepAliveTime(final long keepAliveTime, final TimeUnit unit) {
            this.keepAliveTime = keepAliveTime;
            this.keepAliveTimeUnit = unit;
            return this;
        }

        public ThreadPoolBuilder withShutdownTime(final long time, final TimeUnit unit) {
            this.shutdownTime = time;
            this.shutdownTimeUnit = unit;
            return this;
        }

        public ThreadPoolBuilder withWorkQueue(final BlockingQueue<Runnable> workQueue) {
            this.workQueue = workQueue;
            return this;
        }

        public ThreadPoolBuilder withThreadFactory(final ThreadFactory threadFactory) {
            this.threadFactory = threadFactory;
            return this;
        }

        public ThreadPoolBuilder withRejectedExecutionHandler(final RejectedExecutionHandler handler) {
            this.handler = handler;
            return this;
        }

        public ThreadPoolManager add() {
            this.registration.models.put(name, new ThreadPoolModel(core, max, keepAliveTime, keepAliveTimeUnit, workQueue, threadFactory, handler, shutdownTime, shutdownTimeUnit));
            return registration;
        }
    }

    private static class ThreadPoolModel {
        private final int core;
        private final int max;
        private final long keepAliveTime;
        private final TimeUnit keepAliveTimeUnit;
        private final BlockingQueue<Runnable> workQueue;
        private final ThreadFactory threadFactory;
        private final RejectedExecutionHandler handler;
        private final long shutdownTime;
        private final TimeUnit shutdownTimeUnit;

        private ThreadPoolModel(final int core, final int max,
                                final long keepAliveTime, final TimeUnit keepAliveTimeUnit,
                                final BlockingQueue<Runnable> workQueue,
                                final ThreadFactory threadFactory,
                                final RejectedExecutionHandler handler,
                                final long shutdownTime, final TimeUnit shutdownTimeUnit) {
            this.core = core;
            this.max = max;
            this.keepAliveTime = keepAliveTime;
            this.keepAliveTimeUnit = keepAliveTimeUnit == null ? MILLISECONDS : keepAliveTimeUnit;
            this.shutdownTime = shutdownTime;
            this.shutdownTimeUnit = shutdownTimeUnit == null ? MILLISECONDS : shutdownTimeUnit;
            this.workQueue = workQueue == null ? new LinkedBlockingDeque<>() : workQueue;
            this.threadFactory = threadFactory == null ? Executors.defaultThreadFactory() : threadFactory;
            this.handler = handler == null ? new ThreadPoolExecutor.AbortPolicy() : handler;
        }

        private ThreadPoolExecutor create() {
            return new ThreadPoolExecutor(core, max, keepAliveTime, keepAliveTimeUnit, workQueue, threadFactory, handler);
        }

        private void destroy(final ThreadPoolExecutor executor) {
            executor.shutdown();
            if (shutdownTime > 0) {
                try {
                    executor.awaitTermination(shutdownTime, shutdownTimeUnit);
                } catch (final InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the interrupt flag instead of clearing it
                }
            }
        }
    }
}

Registration sample

Now everything is set up. To create a pool, you just need to register it through an observer:

@Dependent
public static class Registration {
    public void register(@Observes final ThreadPoolManager mgr) {
        // or read any config you desire
        mgr.register("test").withMax(4).withCore(2).add();
    }
}
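The “or read any config you desire” comment is where each user plugs in their own source. As one possible example, here is a JDK-only sketch which translates a java.util.Properties object into registrations. The key layout (pool.<name>.core, pool.<name>.max), the PropertiesPools class and the PoolRegistrar callback are all assumptions made for this example; the defaults simply mirror the ones of the builder.

```java
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper turning "pool.<name>.<setting>" keys into registrations.
final class PropertiesPools {
    private PropertiesPools() {
        // no instance needed
    }

    /** Stands in for mgr.register(name).withCore(core).withMax(max).add(). */
    interface PoolRegistrar {
        void register(String name, int core, int max);
    }

    /** Finds every pool name in the configuration and pushes its settings to the registrar. */
    static void registerAll(final Properties config, final PoolRegistrar registrar) {
        final Set<String> names = new TreeSet<>(); // sorted to get a stable registration order
        for (final String key : config.stringPropertyNames()) {
            final String[] parts = key.split("\\.");
            if (parts.length == 3 && "pool".equals(parts[0])) {
                names.add(parts[1]);
            }
        }
        for (final String name : names) {
            // same defaults as the builder: core=3, max=10
            final int core = Integer.parseInt(config.getProperty("pool." + name + ".core", "3"));
            final int max = Integer.parseInt(config.getProperty("pool." + name + ".max", "10"));
            registrar.register(name, core, max);
        }
    }
}
```

In the observer, the registrar could then be something like (name, core, max) -> mgr.register(name).withCore(core).withMax(max).add(), with the Properties loaded from wherever the application keeps them.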

Conclusion

Once you have a programmable configuration, users can rely on whatever configuration source matches their company or use case and register what they need in your library.

If you step back, you will see this is a more common pattern than you might think: metrics (which is anything but CDI 😉) uses it, for instance.

The other nice thing is that, indirectly, you almost use a reactive pattern, since the user pushes the configuration to you instead of letting you pull it.

The important part of this post is to keep in mind that controlling the boundaries of your users is a task you are pretty sure to get wrong, or to exclude people with. Avoiding it means providing users with an API so they can integrate with you nicely. Configuration is one of these boundary things you can’t control, or even guess properly, from a library, so it is wiser to acknowledge it up front than to just assume the configuration has to live somewhere and that where does not matter much :).
