B.Tech in Computer Science and Engineering, interested in Distributed Systems. Currently working as a Software Development Engineer at Amazon. Swaranga is a DZone MVB and is not an employee of DZone.

A Generic and Concurrent Object Pool

03.29.2012
In this post we will take a look at how to create an object pool in Java. In recent years the performance of the JVM has improved so much that pooling objects for better performance has become almost redundant for most types of objects. In essence, creating an object is no longer considered as expensive as it once was.
However, some kinds of objects are still costly to create. Threads, database connections and the like are not lightweight objects and are noticeably more expensive to create. Most applications need several objects of this kind, so it would be great to have an easy way to create and maintain a pool of them, so that objects can be dynamically used and reused without the client code being bothered about their life cycle.
Before actually writing the code for an object pool, let us first identify the main requirements that any object pool must satisfy.
  • The pool must let clients use an object if any is available.
  • It must reuse the objects once they are returned to the pool by a client.
  • If required, it must be able to create more objects to satisfy growing demands of the client.
  • It must provide a proper shutdown mechanism, such that on shutdown no memory leaks occur.
Needless to say, the above points will form the basis of the interface that we will expose to our clients.
So our interface declaration will be as follows:
package com.test.pool;


/**
 * Represents a cached pool of objects.
 * 
 * @author Swaranga
 *
 * @param < T > the type of object to pool.
 */
public interface Pool< T >
{
 /**
  * Returns an instance from the pool. 
  * The call may be a blocking one or a non-blocking one 
  * and that is determined by the internal implementation.
  * 
  * If the call is a blocking call, 
  * the call returns immediately with a valid object 
  * if available, else the thread is made to wait 
  * until an object becomes available.
  * In case of a blocking call, 
  * it is advised that clients react 
  * to {@link InterruptedException} which might be thrown
  * when the thread waits for an object to become available.
  * 
  * If the call is a non-blocking one, 
  * the call returns immediately irrespective of 
  * whether an object is available or not.
  * If any object is available the call returns it 
  * else the call returns <code>null</code>.
  * 
  * The validity of the objects is determined using the
  * {@link Validator} interface, such that 
  * an object <code>o</code> is valid if 
  * <code>Validator.isValid(o) == true</code>.
  * 
  * @return T one of the pooled objects.
  */
 T get();
 
 /**
  * Releases the object and puts it back to the pool.
  * 
  * The mechanism of putting the object back to the pool is
  * generally asynchronous, 
  * however future implementations might differ.
  * 
  * @param t the object to return to the pool
  */
  
 void release(T t);
 
 /**
  * Shuts down the pool. In essence this call will not 
  * accept any more requests 
  * and will release all resources.
  * Releasing resources is done 
  * via the <code>invalidate()</code>
  * method of the {@link Validator} interface.
  */
 
 void shutdown();
}

The above interface is intentionally made very simple and generic to support any type of object. It provides methods to get an object from the pool and to return it. It also provides a shutdown mechanism to dispose of the objects.
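To make the intended calling pattern concrete, here is a minimal sketch of a client borrowing and returning an object. The `SimplePool`, `StackPool` and `BorrowExample` names are hypothetical stand-ins invented for this illustration (they are not part of the article's package), and the pool shown is deliberately single-threaded and non-blocking.

```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;

// Stand-in for the article's Pool interface (shutdown() omitted for brevity).
interface SimplePool<T> {
    T get();
    void release(T t);
}

// Hypothetical single-threaded pool, used only to show the calling pattern.
final class StackPool<T> implements SimplePool<T> {
    private final Deque<T> idle = new ArrayDeque<T>();

    StackPool(Collection<T> initial) {
        idle.addAll(initial);
    }

    public T get() {
        return idle.poll(); // null when nothing is idle (non-blocking)
    }

    public void release(T t) {
        idle.push(t);
    }

    int idleCount() {
        return idle.size();
    }
}

final class BorrowExample {
    // Borrows a buffer, uses it, and always hands it back.
    static int useOnce(SimplePool<StringBuilder> pool) {
        StringBuilder sb = pool.get();
        if (sb == null) {
            return -1; // nothing available right now
        }
        try {
            sb.setLength(0);
            sb.append("hello");
            return sb.length();
        } finally {
            pool.release(sb); // return the object even if the work above throws
        }
    }
}
```

Wrapping the work in try/finally is the important part: an object that is never released is lost to the pool for good.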
Now we try to create an implementation of the above interface. But before doing that, it is important to note that an ideal release() method will first check whether the object returned by the client is still reusable. If it is, the method returns it to the pool; otherwise the object has to be discarded. We want every implementation of the Pool interface to follow this rule. So before creating a concrete implementation, we create an abstract implementation that imposes this restriction on subsequent implementations. Our abstract implementation will be called, surprise, AbstractPool, and its definition will be as follows:
package com.test.pool;

/**
 * Represents an abstract pool, that defines the procedure
 * of returning an object to the pool.
 * 
 * @author Swaranga
 *
 * @param < T > the type of pooled objects.
 */
abstract class AbstractPool < T > implements Pool < T >
{
 /**
  * Returns the object to the pool. 
  * The method first validates that the object is
  * re-usable and then returns it to the pool.
  * 
  * If the object validation fails, 
  * some implementations
  * will try to create a new one 
  * and put it into the pool; however 
  * this behaviour is subject to change 
  * from implementation to implementation
  * 
  */
 @Override
 public final void release(T t)
 {
  if(isValid(t))
  {
   returnToPool(t);
  }
  else
  {
   handleInvalidReturn(t);
  }
 }
 
 protected abstract void handleInvalidReturn(T t);
 
 protected abstract void returnToPool(T t);
 
 protected abstract boolean isValid(T t);
}
In the above class, we have made it mandatory for object pools to validate an object before returning it to the pool. To customize the behaviour of their pools, implementations are free to choose how they implement the three abstract methods. They decide, using their own logic, how to check whether an object is valid for reuse [the isValid() method], what to do if the object returned by a client is not valid [the handleInvalidReturn() method], and how to actually return a valid object to the pool [the returnToPool() method].
With the above set of classes we are almost ready for a concrete implementation. But the catch is that, since these classes are designed to support generic object pools, a generic implementation will not know how to validate an object [since the objects will be generic :-)]. Hence we need something else to help us with this.
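The routing performed by release() can be seen in a small, self-contained sketch. `ReleaseTemplate` mirrors the logic of AbstractPool shown above, while `StringPoolSketch` and its rule that only non-empty strings count as valid are assumptions made up purely for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in that mirrors the release() logic of the article's AbstractPool.
abstract class ReleaseTemplate<T> {
    public final void release(T t) {
        if (isValid(t)) {
            returnToPool(t);
        } else {
            handleInvalidReturn(t);
        }
    }

    protected abstract boolean isValid(T t);

    protected abstract void returnToPool(T t);

    protected abstract void handleInvalidReturn(T t);
}

// Hypothetical subclass: non-empty strings are "valid",
// anything else is counted and dropped.
final class StringPoolSketch extends ReleaseTemplate<String> {
    final List<String> idle = new ArrayList<String>();
    int discarded = 0;

    @Override
    protected boolean isValid(String s) {
        return s != null && !s.isEmpty();
    }

    @Override
    protected void returnToPool(String s) {
        idle.add(s);
    }

    @Override
    protected void handleInvalidReturn(String s) {
        discarded++;
    }
}
```

Because release() is final, a subclass cannot skip the validity check; it can only decide what "valid" means and what happens on each branch.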
What we actually need is a common way to validate an object so that the concrete Pool implementations will not have to bother about the type of objects being validated. So we introduce a new interface, Validator, that defines methods to validate an object. Our definition of the Validator interface will be as follows:
package com.test.pool;

 /**
  * Represents the functionality to 
  * validate an object of the pool
  * and to subsequently perform cleanup activities.
  * 
  * @author Swaranga
  *
  * @param < T > the type of objects to validate and cleanup.
  */
 public interface Validator < T >
 {
  /**
   * Checks whether the object is valid.
   * 
   * @param t the object to check.
   * 
   * @return <code>true</code> 
   * if the object is valid else <code>false</code>.
   */
  public boolean isValid(T t);
  
  /**
   * Performs any cleanup activities 
   * before discarding the object.
   * For example before discarding 
   * database connection objects,
   * the pool will want to close the connections. 
   * This is done via the 
   * <code>invalidate()</code> method.
   * 
   * @param t the object to cleanup
   */
  
  public void invalidate(T t);
 }
The above interface defines a method to check whether an object is valid and a method to invalidate an object. The invalidate method should be used when we want to discard an object and release any memory or other resources held by that instance. Note that this interface has little significance by itself and makes sense only in the context of an object pool, so we define it inside the top-level Pool interface. This is analogous to the Map and Map.Entry interfaces in the Java Collections Framework. Hence our Pool interface becomes as follows:
package com.test.pool;


/**
 * Represents a cached pool of objects.
 * 
 * @author Swaranga
 *
 * @param < T > the type of object to pool.
 */
public interface Pool< T >
{
 /**
  * Returns an instance from the pool. 
  * The call may be a blocking one or a non-blocking one 
  * and that is determined by the internal implementation.
  * 
  * If the call is a blocking call, 
  * the call returns immediately with a valid object 
  * if available, else the thread is made to wait 
  * until an object becomes available.
  * In case of a blocking call, 
  * it is advised that clients react 
  * to {@link InterruptedException} which might be thrown
  * when the thread waits for an object to become available.
  * 
  * If the call is a non-blocking one, 
  * the call returns immediately irrespective of 
  * whether an object is available or not.
  * If any object is available the call returns it 
  * else the call returns <code>null</code>.
  * 
  * The validity of the objects is determined using the
  * {@link Validator} interface, such that 
  * an object <code>o</code> is valid if 
  * <code>Validator.isValid(o) == true</code>.
  * 
  * @return T one of the pooled objects.
  */
 T get();
 
 /**
  * Releases the object and puts it back to the pool.
  * 
  * The mechanism of putting the object back to the pool is
  * generally asynchronous, 
  * however future implementations might differ.
  * 
  * @param t the object to return to the pool
  */
  
 void release(T t);
 
 /**
  * Shuts down the pool. In essence this call will not 
  * accept any more requests 
  * and will release all resources.
  * Releasing resources is done 
  * via the <code>invalidate()</code>
  * method of the {@link Validator} interface.
  */
 
 void shutdown();

 /**
  * Represents the functionality to 
  * validate an object of the pool
  * and to subsequently perform cleanup activities.
  * 
  * @author Swaranga
  *
  * @param < T > the type of objects to validate and cleanup.
  */
 public static interface Validator < T >
 {
  /**
   * Checks whether the object is valid.
   * 
   * @param t the object to check.
   * 
   * @return <code>true</code> 
   * if the object is valid else <code>false</code>.
   */
  public boolean isValid(T t);
  
  /**
   * Performs any cleanup activities 
   * before discarding the object.
   * For example before discarding 
   * database connection objects,
   * the pool will want to close the connections. 
   * This is done via the 
   * <code>invalidate()</code> method.
   * 
   * @param t the object to cleanup
   */
  
  public void invalidate(T t);
 }
}
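As an illustration of the Validator contract, here is a small sketch of a validator for pooled StringBuilder buffers. The `Validator` interface below is a stand-in for Pool.Validator so that the block compiles on its own; `BufferValidator` and its 1024-character cap are hypothetical choices made only for this example.

```java
// Stand-in for the article's Pool.Validator interface.
interface Validator<T> {
    boolean isValid(T t);

    void invalidate(T t);
}

// Hypothetical validator: a buffer is reusable as long as it has not
// grown beyond an (arbitrarily chosen) capacity limit.
final class BufferValidator implements Validator<StringBuilder> {
    static final int MAX_CAPACITY = 1024;

    @Override
    public boolean isValid(StringBuilder sb) {
        return sb != null && sb.capacity() <= MAX_CAPACITY;
    }

    @Override
    public void invalidate(StringBuilder sb) {
        // Cleanup before the object is discarded: drop the contents
        // and ask the builder to shrink its backing storage.
        sb.setLength(0);
        sb.trimToSize();
    }
}
```

The pool itself never needs to know what a StringBuilder is; it only calls isValid() on return and invalidate() when discarding.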

We are almost ready for a concrete implementation. But before that we need one final weapon, which is actually the most important weapon of an object pool: the ability to create new objects. Since our object pools will be generic, they must know how to create new objects to populate themselves. This functionality must not depend on the type of the object pool either; it must be a common way to create new objects. We do this with an interface called ObjectFactory that defines just one method, which says how to create a new object. Our ObjectFactory interface is as follows:
package com.test.pool;

/**
 * Represents the mechanism to create 
 * new objects to be used in an object pool.
 * 
 * @author Swaranga
 *
 * @param < T > the type of object to create. 
 */
public interface ObjectFactory < T >
{
 /**
  * Returns a new instance of an object of type T.
  * 
  * @return T a new instance of the object of type T
  */
 public abstract T createNew();
}
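Because ObjectFactory declares a single method, on Java 8 or later (which is newer than this post) it can also be supplied as a lambda or a constructor reference. The sketch below assumes such a runtime; the `ObjectFactory` declaration is repeated only to keep the block self-contained, and `FactoryExamples` is a hypothetical helper class.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Stand-in for the article's ObjectFactory interface.
interface ObjectFactory<T> {
    T createNew();
}

final class FactoryExamples {
    // A constructor reference: every call creates a fresh StringBuilder.
    static ObjectFactory<StringBuilder> buffers() {
        return StringBuilder::new;
    }

    // A lambda that numbers the objects it creates, e.g. "worker-1", "worker-2".
    static ObjectFactory<String> named(final String prefix) {
        final AtomicInteger counter = new AtomicInteger();
        return () -> prefix + "-" + counter.incrementAndGet();
    }
}
```

On older runtimes the same factories would be written as anonymous inner classes; the pool code does not change either way.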
We are finally done with our helper classes, and now we will create a concrete implementation of the Pool interface. Since we want a pool that can be used in concurrent applications, we will create a blocking pool that blocks the client if no objects are available in the pool. The blocking call waits indefinitely until an object becomes available. This kind of implementation calls for another method that blocks only for a given time-out period: if an object becomes available before the time-out, that object is returned; otherwise, instead of waiting forever, the call returns null once the time-out expires. This implementation is analogous to the LinkedBlockingQueue implementation in the Java Concurrency API, so before implementing the actual class we expose another interface, BlockingPool, which is analogous to the BlockingQueue interface of the Java Concurrency API.

Hence the BlockingPool interface declaration is as follows:
package com.test.pool;

import java.util.concurrent.TimeUnit;

/**
 * Represents a pool of objects that makes the 
 * requesting threads wait if no object is available.
 * 
 * @author Swaranga
 *
 * @param < T > the type of objects to pool.
 */
public interface BlockingPool < T > extends Pool < T >
{
 /**
  * Returns an instance of type T from the pool.
  * 
  * The call is a blocking call, 
  * and client threads are made to wait
  * indefinitely until an object is available. 
  * The call implements a fairness algorithm 
  * that ensures that a FCFS service is implemented.
  * 
  * Clients are advised to react to InterruptedException. 
  * If the thread is interrupted while waiting 
  * for an object to become available,
  * the current implementation 
  * sets the interrupted state of the thread 
  * to <code>true</code> and returns null. 
  * However this is subject to change 
  * from implementation to implementation.
  * 
  * @return T an instance of the Object 
  * of type T from the pool.
  */
 T get();
 
 /**
  * Returns an instance of type T from the pool, 
  * waiting up to the
  * specified wait time if necessary 
  * for an object to become available.
  * 
  * The call is a blocking call, 
  * and client threads are made to wait
  * for time until an object is available 
  * or until the timeout occurs. 
  * The call implements a fairness algorithm 
  * that ensures that a FCFS service is implemented.
  * 
  * Clients are advised to react to InterruptedException. 
  * If the thread is interrupted while waiting 
  * for an object to become available,
  * the current implementation 
  * sets the interrupted state of the thread 
  * to <code>true</code> and returns null. 
  * However this is subject to change 
  * from implementation to implementation.
  *  
  * 
  * @param time amount of time to wait before giving up, 
  *   in units of <tt>unit</tt>
  * @param unit a <tt>TimeUnit</tt> determining 
  *   how to interpret the
  *        <tt>time</tt> parameter
  *        
  * @return T an instance of the Object 
  * of type T from the pool.
  *        
  * @throws InterruptedException 
  * if interrupted while waiting
  */
 
 T get(long time, TimeUnit unit) throws InterruptedException;
}
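The timed variant maps directly onto BlockingQueue.poll(long, TimeUnit), which returns null if nothing arrives before the time-out. The following is only a sketch of that behaviour; `TimedGetSketch` and `timedGet` are hypothetical names, and swallowing the interruption while restoring the flag is the policy described in the Javadoc above, not the only reasonable one.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

final class TimedGetSketch {
    // Waits up to the given time for an idle object. Returns null on
    // time-out or if the waiting thread is interrupted.
    static <T> T timedGet(BlockingQueue<T> idle, long time, TimeUnit unit) {
        try {
            return idle.poll(time, unit);
        } catch (InterruptedException ie) {
            // Restore the interrupt flag so the caller can still react to it.
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

A caller therefore has to treat null as "no object within the allotted time" and decide whether to retry, fail, or fall back.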
  And our BoundedBlockingPool implementation will be as follows: 
package com.test.pool;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public final class BoundedBlockingPool < T > 
 extends AbstractPool < T >
 implements BlockingPool < T >
{
 private int size;
 
 private BlockingQueue < T > objects;
 
 private Validator < T > validator;
 private ObjectFactory < T > objectFactory;
 
 private ExecutorService executor = 
  Executors.newCachedThreadPool();
  
 private volatile boolean shutdownCalled;
 
 public BoundedBlockingPool(
   int size, 
   Validator < T > validator, 
   ObjectFactory < T > objectFactory)
 {
  super();
  
  this.objectFactory = objectFactory;
  this.size = size;
  this.validator = validator;
  
  objects = new LinkedBlockingQueue < T >(size);
  
  initializeObjects();
  
  shutdownCalled = false;
 }
 
 public T get(long timeOut, TimeUnit unit)
 {
  if(!shutdownCalled)
  {
   T t = null;
   
   try
   {
    t = objects.poll(timeOut, unit);
    
    return t;
   }
   catch(InterruptedException ie)
   {
    Thread.currentThread().interrupt();
   }
   
   return t;
  }
  
  throw new IllegalStateException(
   "Object pool is already shutdown");
 }
 
 public T get()
 {
  if(!shutdownCalled)
  {
   T t = null;
   
   try
   {
    t = objects.take();
   }
   catch(InterruptedException ie)
   {
    Thread.currentThread().interrupt();
   }
   
   return t;
  }
  
  throw new IllegalStateException(
   "Object pool is already shutdown");
 }
 
 public void shutdown()
 {
  shutdownCalled = true;
  
  executor.shutdownNow();
  
  clearResources();
 }
 
 private void clearResources()
 {
  for(T t : objects)
  {
   validator.invalidate(t);
  }
 }
 
 @Override
 protected void returnToPool(T t)
 {
  if(validator.isValid(t))
  {
   executor.submit(new ObjectReturner(objects, t));
  }
 }
 
 @Override
 protected void handleInvalidReturn(T t)
 {
  
 }
 
 @Override
 protected boolean isValid(T t)
 {
  return validator.isValid(t);
 }
 
 private void initializeObjects()
 {
  for(int i = 0; i < size; i++)
  {
   objects.add(objectFactory.createNew());
  }
 }
 
 private class ObjectReturner < E > 
            implements Callable < Void >
 {
  private BlockingQueue < E > queue;
  private E e;
  
  public ObjectReturner(BlockingQueue < E > queue, E e)
  {
   this.queue = queue;
   this.e = e;
  }
  
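  public Void call()
  {
   try
   {
    queue.put(e);
   }
   catch(InterruptedException ie)
   {
    // Restore the interrupt flag and give up. Retrying in a loop
    // would spin forever, because put() throws again immediately
    // while the flag is set (for example after shutdownNow()).
    Thread.currentThread().interrupt();
   }
   
   return null;
  }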
}

The above is a very basic object pool backed internally by a LinkedBlockingQueue. The only method of interest is the returnToPool() method. Since the internal storage is a blocking queue, putting the returned element directly into the LinkedBlockingQueue might block the client if the queue is full. But we do not want a client of an object pool to block just for a mundane task like returning an object to the pool. So we have made the actual task of inserting the object into the LinkedBlockingQueue an asynchronous task and submit it to an Executor instance, so that the client thread can return immediately.
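For comparison, a different technique from the one used above is to return objects with BlockingQueue.offer(), which never blocks: it returns false immediately when the queue is full. The sketch below shows only that alternative; `NonBlockingReturnSketch` is a hypothetical class, and what to do with a rejected object (here simply reported to the caller) is an assumption left open.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class NonBlockingReturnSketch<T> {
    private final BlockingQueue<T> idle;

    NonBlockingReturnSketch(int capacity) {
        idle = new LinkedBlockingQueue<T>(capacity);
    }

    // offer() inserts the object if there is room and returns true;
    // if the queue is full it returns false without waiting.
    boolean returnToPool(T t) {
        return idle.offer(t);
    }

    int idleCount() {
        return idle.size();
    }
}
```

This avoids the extra thread pool, at the cost of having to handle the case where an object cannot be put back.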

Now we will use the above object pool in our code. We will use it to pool some database connection objects. Hence we will need a Validator to validate our database connection objects.

Our JDBCConnectionValidator will look like this:

package com.test;

import java.sql.Connection;
import java.sql.SQLException;

import com.test.pool.Pool.Validator;

public final class JDBCConnectionValidator 
    implements Validator < Connection >
{
 public boolean isValid(Connection con)
 { 
  if(con == null)
  {
   return false;
  }
  
  try
  {
   return !con.isClosed();
  }
  catch(SQLException se)
  {
   return false;
  }
 }
 
 public void invalidate(Connection con)
 {
  try
  {
   con.close();
  }
  catch(SQLException se)
  {
   
  }
 }
}

And our JDBCConnectionFactory, which will enable the object pool to create new objects, will be as follows:

package com.test;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

import com.test.pool.ObjectFactory;

public class JDBCConnectionFactory 
 implements ObjectFactory < Connection >
{
 private String connectionURL;
 private String userName;
 private String password;
  
 public JDBCConnectionFactory(
  String driver, 
  String connectionURL, 
  String userName, 
  String password)
 {
  super();
  
  try
  {
   Class.forName(driver);
  }
  catch(ClassNotFoundException ce)
  {
   throw new IllegalArgumentException(
    "Unable to find driver in classpath", ce);
  }
  
  this.connectionURL = connectionURL;
  this.userName = userName;
  this.password = password;
 }
 
 public Connection createNew()
 { 
  try
  {
   return 
       DriverManager.getConnection(
    connectionURL, 
    userName, 
    password);
  }
  catch(SQLException se)
  {
   throw new IllegalArgumentException(
    "Unable to create new connection", se);
  }
 }
}

Now we create a JDBC object pool using the above Validator and ObjectFactory:

package com.test;
import java.sql.Connection;

import com.test.pool.Pool;
import com.test.pool.BoundedBlockingPool;


public class Main
{
 public static void main(String[] args)
    {
  Pool < Connection > pool = 
   new BoundedBlockingPool < Connection > (
    10, 
    new JDBCConnectionValidator(),
    new JDBCConnectionFactory("", "", "", "")
    );
  
  //do whatever you like
    }
}

As a bonus for reading the entire post, I will provide another implementation of the Pool interface that is essentially a non-blocking object pool. The only difference between this implementation and the previous one is that it does not block the client if an element is unavailable; it returns null instead. Here it goes:

package com.test.pool;

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Semaphore;

public class BoundedPool < T > 
 extends AbstractPool < T >
{
 private int size;
 
 private Queue < T > objects;
 
 private Validator < T > validator;
 private ObjectFactory < T > objectFactory;
 
 private Semaphore permits;
  
 private volatile boolean shutdownCalled;
 
 public BoundedPool(
  int size, 
  Validator < T > validator, 
  ObjectFactory < T > objectFactory)
 {
  super();
  
  this.objectFactory = objectFactory;
  this.size = size;
  this.validator = validator;
  
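  // A concurrent queue is used because get() and release() may be
  // called from several threads at once; LinkedList is not thread-safe.
  objects = new java.util.concurrent.ConcurrentLinkedQueue < T >();
  
  // One permit per pooled object. Without this the semaphore is null
  // and get() fails with a NullPointerException.
  permits = new Semaphore(size);
  
  initializeObjects();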
  
  shutdownCalled = false;
 }
 
 
 @Override
 public T get()
 {
  T t = null;
  
  if(!shutdownCalled)
  {
   if(permits.tryAcquire())
   {
    t = objects.poll();
   }
  }
  else
  {
   throw new IllegalStateException(
    "Object pool already shutdown");
  }
  
  return t;
 }

 @Override
 public void shutdown()
 {
  shutdownCalled = true;
  
  clearResources();
 }
 
 private void clearResources()
 {
  for(T t : objects)
  {
   validator.invalidate(t);
  }
 }

 @Override
 protected void returnToPool(T t)
 {
  boolean added = objects.add(t);
  
  if(added)
  {
   permits.release();
  }
 }
 
 @Override
 protected void handleInvalidReturn(T t)
 {
  
 }

 @Override
 protected boolean isValid(T t)
 {
  return validator.isValid(t);
 }
 
 private void initializeObjects()
 {
  for(int i = 0; i < size; i++)
  {
   objects.add(objectFactory.createNew());
  }
 }
}
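The non-blocking behaviour can be exercised with a stripped-down sketch. `TryAcquireSketch` is a hypothetical, simplified class (no validation, no shutdown) that keeps only the semaphore-plus-queue idea, and it assumes a ConcurrentLinkedQueue as the backing store so that concurrent callers are safe.

```java
import java.util.Collection;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;

final class TryAcquireSketch<T> {
    private final Queue<T> idle = new ConcurrentLinkedQueue<T>();
    private final Semaphore permits;

    TryAcquireSketch(Collection<T> initial) {
        idle.addAll(initial);
        permits = new Semaphore(initial.size()); // one permit per idle object
    }

    // Never waits: returns an idle object, or null if none is available.
    T get() {
        return permits.tryAcquire() ? idle.poll() : null;
    }

    // The object is queued first and the permit released afterwards, so a
    // thread that wins a permit always finds an object waiting for it.
    void release(T t) {
        if (idle.add(t)) {
            permits.release();
        }
    }
}
```

Clients of such a pool must be prepared for null and should not assume that a call to get() will eventually succeed on its own.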

Considering we are now two implementations strong, it is better to let users create our pools via a factory with meaningful method names. Here is the factory:

package com.test.pool;

import com.test.pool.Pool.Validator;

/**
 * Factory and utility methods for 
 * {@link Pool} and {@link BlockingPool} classes 
 * defined in this package. 
 * This class supports the following kinds of methods:
 *
 * <ul>
 *   <li> Method that creates and returns a default non-blocking 
 *        implementation of the {@link Pool} interface.
 *   </li>
 *   
 *   <li> Method that creates and returns a 
 *        default implementation of 
 *        the {@link BlockingPool} interface.
 *   </li>
 * </ul>
 *
 * @author Swaranga
 */

public final class PoolFactory
{
 private PoolFactory()
 {
  
 }
 
 /**
  * Creates and returns a new object pool,
  * that is an implementation of the {@link BlockingPool}, 
  * whose size is limited by
  * the <tt> size </tt> parameter.
  * 
  * @param size the number of objects in the pool.
  * @param factory the factory to create new objects.
  * @param validator the validator to 
  * validate the re-usability of returned objects.
  * 
  * @return a blocking object pool
  * bounded by <tt> size </tt>
  */
 public static < T > Pool < T > 
  newBoundedBlockingPool(
      int size, 
      ObjectFactory < T > factory, 
      Validator < T > validator)
 {
  return new BoundedBlockingPool < T > (
                                    size, 
                                    validator,
                                    factory);
 }
 
 /**
  * Creates and returns a new object pool,
  * that is an implementation of the {@link Pool} 
  * whose size is limited 
  * by the <tt> size </tt> parameter.
  * 
  * @param size the number of objects in the pool.
  * @param factory the factory to create new objects.
  * @param validator the validator to validate 
  * the re-usability of returned objects.
  * 
  * @return an object pool bounded by <tt> size </tt>
  */
 
 public static < T > Pool < T > newBoundedNonBlockingPool(
  int size, 
  ObjectFactory < T > factory, 
  Validator < T > validator)
 {
  return new BoundedPool < T >(size, validator, factory);
 }
}

Thus our clients now can create object pools in a more readable manner:

package com.test;
import java.sql.Connection;

import com.test.pool.Pool;
import com.test.pool.PoolFactory;


public class Main
{
 public static void main(String[] args)
    {
  Pool < Connection > pool = 
   PoolFactory.newBoundedBlockingPool(
    10, 
    new JDBCConnectionFactory("", "", "", ""), 
    new JDBCConnectionValidator());
  
  //do whatever you like
    }
}

And so ends our long post. This one was long overdue. Feel free to use it, change it, add more implementations.

Published at DZone with permission of Swaranga Sarma, author and DZone MVB. (source)


Comments

matt inger replied on Fri, 2012/03/30 - 7:26am

This is a great example showing how to implement a pool.  However, for most people, there's no need to re-invent the wheel, and I would recommend the commons-pool package from apache. 

It combines several of your classes into a single PoolableObjectFactory<T> interface.  It has all the methods for object lifecycle management: construction, validation, passivation, activation and destruction.

 

PoolableObjectFactory<MyObject> myFactory = ...;
GenericObjectPool<MyObject> pool = new GenericObjectPool<MyObject>(myFactory, maxActive, GenericObjectPool.WHEN_EXHAUSTED_BLOCK, 10000L);

 

There's a lot of options on the pool class for things like time to live, max # of idle instances, etc....

As I said, no use re-inventing the wheel, but kudos for taking the time for a well thought out example.

 

Swaranga Sarma replied on Sun, 2012/05/13 - 10:10am in response to: matt inger

Thanks for pointing that out. I was not familiar with the Apache Commons Pooling library. They certainly have implemented a much more robust and flexible framework for object pooling. However it never hurts to learn to implement something yourself. :-)
