I have been writing code since 1980. In 1999 I learned Java, and after discovering IntelliJ the love was complete. XML and XSLT were a revelation in 2000; too bad the standards committees have done so much damage since. I contributed to the Flying Saucer open source XML and CSS renderer and published an open source XSLT generator, weffo. Currently employed by Google. Torbjörn is a DZone MVB and is not an employee of DZone and has posted 9 posts at DZone.

Using Java Concurrency Utilities

12.29.2008

The inspiration for this post comes from Jacob Hookom's blog, and I can only second the recommendations he gives. As always, though, I would caution you to test any such implementation properly, to verify both that it works and that it actually provides a benefit. There are lots of pitfalls, and concurrency is tricky even with the excellent utilities provided in Java.

To summarize the interesting problem: parallelize the execution of lengthy tasks in a web request without creating many threads for each request, while also ensuring that the thread pool is not starved by one request. The idea is to have a reasonably sized thread pool and to limit the number of tasks each request executes in parallel to a number small enough to allow the expected number of concurrent requests to share the pooled threads.
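To make the sizing idea concrete, here is a minimal sketch of one way to pick the per-request limit. The class and method names are hypothetical, and the even-split rule is only an assumption for illustration, not something the approach prescribes; real numbers should come from measuring your own load.

```java
class WorkerBudget {
    /**
     * Hypothetical heuristic: split the pool evenly among the number of
     * requests expected to run at the same time, but always allow at
     * least one worker per request.
     */
    static int workersPerRequest(int poolSize, int expectedConcurrentRequests) {
        if (poolSize < 1 || expectedConcurrentRequests < 1) {
            throw new IllegalArgumentException("both arguments must be positive");
        }
        return Math.max(1, poolSize / expectedConcurrentRequests);
    }
}
```

With a pool of 32 threads and 8 expected concurrent requests, this gives each request 4 workers.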

Essentially, limiting the number of tasks executing in parallel can be done in two ways: limit the number of tasks submitted at one time, or limit the number of workers that execute a set of tasks. Jacob takes the first approach; I will take the second, which seems to make it simpler to manage time-out issues.
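For comparison, the first approach could be sketched roughly as follows. This is not Jacob's code, only one possible illustration that uses a Semaphore to cap the number of tasks in flight; the names BoundedSubmitter, submitBounded and maxInFlight are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;

class BoundedSubmitter {
    /** Submits every task, but never has more than maxInFlight queued or running at once. */
    static <V> List<Future<V>> submitBounded(ExecutorService executor,
            List<Callable<V>> tasks, int maxInFlight) throws InterruptedException {
        final Semaphore permits = new Semaphore(maxInFlight);
        List<Future<V>> futures = new ArrayList<Future<V>>();
        for (final Callable<V> task : tasks) {
            permits.acquire(); // blocks the submitting thread until a slot frees up
            try {
                futures.add(executor.submit(new Callable<V>() {
                    public V call() throws Exception {
                        try {
                            return task.call();
                        } finally {
                            permits.release(); // free the slot even if the task fails
                        }
                    }
                }));
            } catch (RuntimeException e) {
                permits.release(); // submission was rejected, so the task will never run
                throw e;
            }
        }
        return futures;
    }
}
```

Note that a time-out here has to be spread over the blocking acquire() calls and the individual get() calls, which is part of why limiting the workers instead can be simpler.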

Here's some code:

    <V> Queue<Future<V>> submit(int numberOfWorkers, Queue<Callable<V>> tasks,
            long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        Queue<Future<V>> result = new ConcurrentLinkedQueue<Future<V>>();
        List<WorkerTask<V>> workers = new ArrayList<WorkerTask<V>>(numberOfWorkers);

        for (int i = 0; i < numberOfWorkers; i++)
        {
            workers.add(new WorkerTask<V>(result, tasks));
        }
        List<Future<Object>> deadWorkers = executor.invokeAll(workers, timeout, unit);

        for (Future<Object> obituary : deadWorkers)
        {
            if (obituary.isCancelled())
            {
                throw new TimeoutException();
            }
        }
        return result;
    }

And the code for a WorkerTask:

    private static class WorkerTask<V> implements Callable<Object>
    {
        private Queue<Callable<V>> tasks;
        private Queue<Future<V>> result;

        public WorkerTask(Queue<Future<V>> result, Queue<Callable<V>> tasks)
        {
            this.result = result;
            this.tasks = tasks;
        }

        public Object call()
        {
            for (Callable<V> task = tasks.poll(); task != null; task = tasks.poll())
            {
                FutureTask<V> future = new FutureTask<V>(task);
                future.run();
                if (Thread.interrupted())
                {
                    Thread.currentThread().interrupt();
                    // Restore interrupt.
                    break;
                }
                result.add(future);
            }
            return null;
        }
    }
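To try the two pieces together, here is a compact, self-contained sketch with a usage example. It makes a few assumptions that are not in the listings above: the executor is passed in as a parameter rather than held in a field, everything lives in a class I have called LimitedWorkers, and the squaring tasks in main are made up for the demonstration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class LimitedWorkers {
    static <V> Queue<Future<V>> submit(ExecutorService executor, int numberOfWorkers,
            Queue<Callable<V>> tasks, long timeout, TimeUnit unit)
            throws InterruptedException, TimeoutException {
        Queue<Future<V>> result = new ConcurrentLinkedQueue<Future<V>>();
        List<Callable<Object>> workers = new ArrayList<Callable<Object>>(numberOfWorkers);
        for (int i = 0; i < numberOfWorkers; i++) {
            workers.add(new WorkerTask<V>(result, tasks));
        }
        // invokeAll cancels any worker that has not finished when the time-out expires.
        for (Future<Object> obituary : executor.invokeAll(workers, timeout, unit)) {
            if (obituary.isCancelled()) {
                throw new TimeoutException();
            }
        }
        return result;
    }

    private static class WorkerTask<V> implements Callable<Object> {
        private final Queue<Callable<V>> tasks;
        private final Queue<Future<V>> result;

        WorkerTask(Queue<Future<V>> result, Queue<Callable<V>> tasks) {
            this.result = result;
            this.tasks = tasks;
        }

        public Object call() {
            for (Callable<V> task = tasks.poll(); task != null; task = tasks.poll()) {
                FutureTask<V> future = new FutureTask<V>(task);
                future.run(); // runs the task on this worker's thread
                if (Thread.interrupted()) {
                    Thread.currentThread().interrupt(); // restore interrupt
                    break;
                }
                result.add(future);
            }
            return null;
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            Queue<Callable<Integer>> tasks = new ConcurrentLinkedQueue<Callable<Integer>>();
            for (int i = 1; i <= 10; i++) {
                final int n = i;
                tasks.add(new Callable<Integer>() {
                    public Integer call() { return n * n; }
                });
            }
            int sum = 0;
            // Two workers share the ten tasks; the other pool threads stay free.
            for (Future<Integer> future : submit(executor, 2, tasks, 5, TimeUnit.SECONDS)) {
                sum += future.get();
            }
            System.out.println("Sum of squares: " + sum); // prints "Sum of squares: 385"
        } finally {
            executor.shutdown();
        }
    }
}
```

The futures in the returned queue have already completed, so get() returns immediately, but the order of the results depends on how the workers interleave.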

 

Note that it is important to use thread-safe collections for both tasks and result, since several workers access them at once. The method should really make sure itself that the tasks are in a thread-safe collection, but I'll ignore that for now. Note also the check in the call() method of WorkerTask for whether the thread has been interrupted. That check is vital for cancelling the worker when you don't want to wait for it any longer (i.e. on time-out). If possible, the submitted tasks should also handle interrupts. Finally, note the careful restoration of the interrupt status, so that the caller of the method may also be notified.
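As a rough illustration of those two points, the sketch below copies arbitrary tasks into a thread-safe queue and shows a task that cooperates with interruption. The class InterruptAwareTasks and both helper methods are hypothetical, and the counting loop merely stands in for real long-running work.

```java
import java.util.Collection;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;

class InterruptAwareTasks {
    /** Copies the tasks into a thread-safe queue that several workers can poll safely. */
    static <V> Queue<Callable<V>> threadSafe(Collection<? extends Callable<V>> tasks) {
        return new ConcurrentLinkedQueue<Callable<V>>(tasks);
    }

    /** A stand-in for lengthy work that gives up promptly when its thread is interrupted. */
    static Callable<Long> countTo(final long limit) {
        return new Callable<Long>() {
            public Long call() throws InterruptedException {
                long count = 0;
                while (count < limit) {
                    // isInterrupted() leaves the flag set, so a surrounding
                    // worker loop that checks the flag afterwards still sees it.
                    if (Thread.currentThread().isInterrupted()) {
                        throw new InterruptedException("cancelled after " + count + " steps");
                    }
                    count++;
                }
                return count;
            }
        };
    }
}
```

Leaving the flag set is a deliberate choice in this sketch: if the task cleared it, a worker that tests Thread.interrupted() after the task returns would not notice the cancellation and would carry on with the next task.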

From http://tobega.blogspot.com/

Published at DZone with permission of Torbjörn Gannholm, author and DZone MVB.


Comments

Ronald Miura replied on Mon, 2008/12/29 - 9:34am

The generics' angle brackets are messed up
