Angel has posted 1 posts at DZone. View Full User Profile

Java 7 Fork/Join example implementing Object Pool pattern for mathematical procedures

12.19.2011
| 2764 views |
  • submit to reddit

Fork/Join (JDK 1.7)

 

New in the Java SE 7 release, the fork/join framework is an implementation of the ExecutorService interface that helps you take advantage of multiple processors. It is designed for work that can be broken into smaller pieces recursively. The goal is to use all the available processing power to make your application wicked fast.

 Source:

Oracle Fork/Join section (click here)

 

This post try to solve what must be a typically problem with the fork/join implementations, or any other concurrent tasks that has to split an create new tasks on the fly.

 

From a very short experiment implementing the oracle example, it was found that splitting mathematical tasks that run fast on a process, produce delays on the final time of calculations, resulting on decreasing performance instead of improve it.

 

It was not very hard to see that the overhead time was produced by the cost (on time), that generateds the creation of new objects on the fly.

 

On the following example it was included on the calculation process, a object poll class, from the apache commons pool classes to provide objects to the split tasks.

 

Additionally to that the pool was populated with object that performs calculations with a common one dimension array, allowing to all the objects borrows from the pool share the same reference to it.

 

Also the design classes were make flexible enough to serve several purposes. This mean that the object pooled from the pool does his calculations through a instance of a class that encapsulates the behavior of the method that actually does the math

 

public class ForkJoinArrayCalculator<T> extends RecursiveAction {
    private static final int TOKEN = -1;
    /**
     * A generic type up to: Double, Float, Integer arrays
     */
    private static final long serialVersionUID = 1L;
    protected T[] array;
    protected int start;
    protected int length;
    protected Procedure<T> procedure;
    private int thresHold = TOKEN;
    private ObjectPool pool;

//Generic constructor to be use by the ObjectPoolableFactory (see below)
    public ForkJoinArrayCalculator() {
    }
//set to the object, to do, when the object its about to be used
    public void set(Procedure<T> procedure, ObjectPool pool, int thresHold, T[] array, int start,
            int length) {
        this.array = array;
        this.start = start;
        this.length = length;
        this.procedure = procedure;
        this.pool = pool;
        this.thresHold = thresHold;
    }
//unset to the object when it's returned back to the pool (we don't want any references there)
    public void unset() {
        this.procedure = null;
        this.array = null;
        this.pool = null;
    }
//the implementation
    @Override
    protected void compute() {
        if (thresHold == TOKEN)
            throw new IllegalStateException("set the threshold");
        if (pool == null)
            throw new IllegalStateException("set the object pool");

        if (length <= thresHold) {
            straightCalc();
            return;
        }

        int half = (length) / 2;
        ForkJoinArrayCalculator<T> p1 = getObject(start, half);
        ForkJoinArrayCalculator<T> p2 = getObject(start + half, length - half);
        invokeAll(p1, p2);
        try {
            pool.returnObject(p1);
            pool.returnObject(p2);
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
//Method that get an object form the pool
    @SuppressWarnings("unchecked")
    private ForkJoinArrayCalculator<T> getObject(int start, int length) {
        ForkJoinArrayCalculator<T> obj;
        try {
            obj = (ForkJoinArrayCalculator<T>) pool.borrowObject();
            obj.set(procedure, pool, thresHold, array, start, length);
            return obj;
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }

    }
//The method that actually does the calculation
    private void straightCalc() {
        for (int i = start; i < length; i++)
            procedure.calculate(array, i);
    }

}

 

This class it's the implementation of the Recursive Action used by the ForkJoinPool


//Factory to create new objects to populate the pool

public class CalculatorFactory extends BasePoolableObjectFactory {
//Creates new objects
    @SuppressWarnings("rawtypes")
    @Override
    public Object makeObject() throws Exception {
        return new ForkJoinArrayCalculator();
    }
//Method call to return a object to the pool
    @SuppressWarnings("rawtypes")
    @Override
    public void passivateObject(Object obj) throws Exception {
        ((ForkJoinArrayCalculator) obj).unset();
    }

}

 

 

 

This class it's use to the Apache ObjectPool implementation to create new objects

 


public interface Procedure<T> {
    public void calculate(T[] array, int pos);
}

//Procedure implementation to pow a value on the array

public class SquareProcedure implements Procedure<Double> {
//calculate that receives a reference to an array, an the position to work with
    @Override
    public void calculate(Double[] array, int pos) {
        array[pos] = Math.pow(array[pos], 2);
    }
}

 

The mathematical procedures, notice that the calculate method receives a reference to an array, an the position that should modify. In a more complex implementation you it is possible to define instance constants, arrays, or other values, always taking in notice that it would be visited concurrently from several threads

 

public class SquareTest {

    /**
     * @param args
     */
    public static void main(String[] args) {
        int tam = 5000; //array size
        Double[] array = new Double[tam];

        for (int i = 0; i < array.length; i++)
            array[i] = (double) i;//population (only necesary if  the implementation

                                  //of the procedure requires to do)

        int thresHold = 2500;//Threshold to split the task

        SquareProcedure procedure = new SquareProcedure();//Procedure imp

                //ObjectPool
        StackObjectPool pool = new StackObjectPool(new CalculatorFactory());
              //populating the pool

        for (int i = 0; i < 20; i++)
            try {
                pool.addObject();
            } catch (Exception e) {
                e.printStackTrace();
            }
        //RecursiveAction implementation
        ForkJoinArrayCalculator<Double> doubleCalculator = new ForkJoinArrayCalculator<>();

        //setting the inital task
        doubleCalculator.set(procedure, pool, thresHold, array, 0, tam);
      
        ForkJoinPool forkPool = new ForkJoinPool();
        long startTime = System.currentTimeMillis();
        forkPool.invoke(doubleCalculator);//The invokation
        long endTime = System.currentTimeMillis();

        System.out.println("It took " + (endTime - startTime) + " ms.");

    }
}

 

The actual test

Published at DZone with permission of its author, Angel Lacret.

(Note: Opinions expressed in this article and its replies are the opinions of their respective authors and not those of DZone, Inc.)