Václav is a programming enthusiast who's constantly seeking ways to make development more effective and enjoyable. He's particularly interested in server-side Java technologies, distributed systems, concurrency, agile methodologies, modern programming languages and DSLs. He works for JetBrains as a senior software developer and a technology evangelist. He is also a board member of the JetBrains Academy. On the side, he's leading the GPars project, an open-source concurrency library, and investigates the domains of neural networks, evolutionary programming and data mining. You can check out his blog or follow him on Twitter. Václav is a DZone Zone Leader and has posted 45 posts at DZone.

Concurrency in JSR-166y Meets Groovy: Process Collections in Parallel

How about parallel processing of elements stored in one of the java.util.* collections? Could we make the Groovy each(), collect() and such methods leverage multiple threads to speed them up? In fact, it is very easy.

I wrote briefly earlier about the new concurrency enhancements planned for Java 7 under JSR-166y. I'm still happily experimenting with the library, and here's how you could seamlessly employ ParallelArrays to speed up your Groovy collections. Thanks to Groovy's meta-programming capabilities, we will replace some of the original GDK methods on collections with parallel implementations.

It'll be a short journey: the target is only two steps away.

1. Create the JSR-166y thread pool and a helper method that will create Parallel Arrays for us

//Create a pool with a size close to the number of processor cores
final ForkJoinPool pool = new ForkJoinPool(2)

//Copy the collection into a ParallelArray backed by the given pool
private ParallelArray createParallelArray(pool, collection) {
    return ParallelArray.createFromCopy(
            collection.toArray(new Object[collection.size()]), pool)
}
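
If you would rather not hard-code the pool size, it can be derived from the machine at runtime. The following is only a minimal sketch: it assumes Java 7 or later, where ForkJoinPool is available in java.util.concurrent, and the names cores, sizedPool and total are made up for illustration.

```groovy
import java.util.concurrent.Callable
import java.util.concurrent.ForkJoinPool

// Assumption: Java 7+, where ForkJoinPool lives in java.util.concurrent
int cores = Runtime.runtime.availableProcessors()
def sizedPool = new ForkJoinPool(cores)

// Run a trivial task on the pool to show that it is usable
def total = sizedPool.submit({ (1..100).sum() } as Callable).get()
sizedPool.shutdown()
```

For tasks that mostly wait on I/O, such as downloading pages, a pool larger than the core count may work better; that is something to measure rather than assume.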

2. Enhance the required collection classes with parallel methods using meta-programming

We can either introduce new methods, something like eachAsync(), collectAsync(), etc., or directly replace the original GDK methods such as each() and collect(). I'll go the second way in the example and replace the GDK methods on the ArrayList class, although replacing them only on individual instances of the class would probably be more practical in real code.
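
For comparison, here is a rough sketch of the other route: new method names, added to a single list instance through its own metaclass. It assumes Java 8 or later and substitutes the JDK's parallel streams for ParallelArray, so it illustrates the meta-programming idea rather than the JSR-166y API. The names addParallelMethods, collectParallel and findAllParallel and the sample data are hypothetical.

```groovy
import java.util.stream.Collectors

// Hypothetical helper: adds parallel variants to ONE list instance only.
// Assumption: parallel streams (Java 8+) stand in for ParallelArray here.
def addParallelMethods(List list) {
    list.metaClass.collectParallel = { Closure cl ->
        // delegate is the list instance the method was called on
        delegate.parallelStream().map { cl(it) }.collect(Collectors.toList())
    }
    list.metaClass.findAllParallel = { Closure cl ->
        delegate.parallelStream().filter { cl(it) as boolean }.collect(Collectors.toList())
    }
    return list
}

def numbers = addParallelMethods([1, 2, 3, 4, 5, 6])
def doubled = numbers.collectParallel { it * 2 }
def evens = numbers.findAllParallel { it % 2 == 0 }
```

Because only the metaclass of that one instance is changed, every other ArrayList in the application keeps the sequential GDK behaviour, and the new names make the parallel calls visible at the call site.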

//Replace collect() on ArrayList with a parallel implementation
ArrayList.metaClass.collect = {Closure cl ->
    createParallelArray(pool, delegate).
            withMapping({cl(it)} as Op).all().asList()
}

//Replace findAll() on ArrayList with a parallel implementation
ArrayList.metaClass.findAll = {Closure cl ->
    createParallelArray(pool, delegate).
            withFilter({cl(it)} as Predicate).all().asList()
}

The magic happens now 

Although nothing needs to change in our code, the collections now use parallelism under the covers.

def sites=[

def groovySites = sites.findAll {new URL(it).text.toLowerCase().contains('groovy')}
println "These sites talk about Groovy today: ${groovySites}"

Or if you need to process complex images in some way:

//Use the new parallel functionality
List images = loadImages()
List reformattedImages = images.collect {processImage(it)}

Did you count how many times we had to use the Thread class and the synchronized block in the multi-threaded code we've just written?

From http://jroller.com/vaclav/

Published at DZone with permission of its author, Václav Pech.


Artur Biesiadowski replied on Fri, 2008/09/26 - 5:07am

What happens if an exception is thrown while processing the second URL? How easy would it be to add a custom exception handler that gathers the errors while the remaining tasks are processed normally, or that exits the program if there is an error?



Václav Pech replied on Fri, 2008/09/26 - 6:32am in response to: Artur Biesiadowski

Certainly you can handle exceptions in the closure directly:

def groovySites = sites.findAll {
    try {
        return new URL(it).text.toLowerCase().contains('groovy')
    } catch (Exception e) {
        println("Site doesn't exist $it")
        //Treat an unreachable site as a non-match
        return false
    }
}

Apart from that you have an option to register an exception handler with the thread pool or specify a custom thread factory.

pool.setUncaughtExceptionHandler({final Thread t, final Throwable e ->
println "Exception: $e"} as Thread.UncaughtExceptionHandler)
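
To gather the errors while the remaining elements are still processed, one option is to record each failure in a thread-safe collection from inside the closure. This is only a sketch under assumptions: findAllCollectingErrors is a hypothetical helper, Java 8 parallel streams stand in for ParallelArray, and parsing strings replaces the URL download so the example needs no network.

```groovy
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.stream.Collectors

// Hypothetical helper, not part of JSR-166y: filters in parallel and records
// failures instead of letting one bad element abort the whole run.
def findAllCollectingErrors(Collection items, Queue errors, Closure predicate) {
    items.parallelStream().filter { item ->
        try {
            return predicate(item) as boolean
        } catch (Exception e) {
            errors.add([item: item, error: e])   // remember the failure, keep going
            return false
        }
    }.collect(Collectors.toList())
}

// The queue must be thread-safe because several workers may report errors at once
def errors = new ConcurrentLinkedQueue()
def bigNumbers = findAllCollectingErrors(['1', 'x', '22'], errors) {
    Integer.parseInt(it) > 10
}
errors.each { println "Failed on ${it.item}: ${it.error}" }
```

Exiting the program on the first error instead would simply mean not catching the exception, or rethrowing it once the errors queue turns out to be non-empty.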


Ray Krueger replied on Fri, 2008/09/26 - 8:24am

Nicely done. Very cool concept indeed.
