I’m a software engineer. I got my first computer when I was 7 and have loved programming ever since. I’ve been developing corporate web applications since 1999, mostly using Java based technologies, but I like to try and explore a little bit of everything interesting going around the web/mobile technology scene. I like hard rock, RPG video games (all flavors), science fiction and fantasy books, and almost all movies (unless they star Sandra Bullock or Hugh Grant). Ricardo is a DZone MVB and is not an employee of DZone and has posted 16 posts at DZone. You can read more from them at their website. View Full User Profile

Parallel Data Processing Strategies: (Almost) Always Use java.util.concurrent!

04.02.2012
| 6789 views |
  • submit to reddit

Version 5 of the Java platform introduced a high level concurrency API, located in the java.util.concurrent package.

It allows for a much elegant and intuitive multi-threaded programming. I know this is old news for some, but I have found that most programmers still rely on the Thread class and Runnable interface to solve most concurrent problems in Java, when almost all of them can be implemented in a much cleaner way using the new API.

In this post series I’ll provide several examples on the usage of java.util.concurrent classes to solve common challenges. Let’s start with a simple parallel solution for data processing.

Imagine you have a set of data elements and you need to perform some kind of processing over each one of them, you want to maximize the speed at which this processing is done, but, on the other hand you don’t want to hog every system resource available if the system is being used.

A good strategy would be to have a thread pool with a pre-defined number of maximum active threads that will process the data one item at a time as soon as the threads become available.

This strategy can be quickly implemented using a fixed thread pool executor service.

We can model our data processing task as a runnable:

package com.ricardozuasti;
 
public class DataProcessor implements Runnable {
    public DataProcessor(int data){
        this.data = data;
    }
 
    @Override
    public void run() {
        System.out.println("Processing data: " + data);
        // Data processing goes here
    }
 
    private int data;
}

And then use the utility Executors class to create a new thread pool service (with a given maximum active thread count) with the newFixedThreadPool(n) method. The returned service (which implements the ExecutorService interface) can be used by submitting new tasks to it using the execute() and submit() methods.

package com.ricardozuasti;
 
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
 
public class Concurrency1 {
 
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(5);
 
        for (int i = 0; i<100; i++){
            executor.execute(new DataProcessor(i));
        }
 
        System.out.println("Starting shutdown...");
        executor.shutdown();
 
        try {
            executor.awaitTermination(100, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            System.out.println("Interrupted...");
        }
 
        System.out.println("All done!");
 
    }
}

After submitting all our work units to the ExecutorService we can instruct it to shutdown, this will not block our current working thread, nor avoid any previously submitted tasks from running, but only prevent new tasks from being passed onto the ExecutorService.

To actually wait for all the tasks to be done, we can use the awaitTermination() method.

And that’s it… it looks nice and clean, doesn’t it? :)

Check out the Executors API to see other kinds of executor services you can build right out of the box (cached and scheduled for example).

 

 

Published at DZone with permission of Ricardo Zuasti, author and DZone MVB. (source)

(Note: Opinions expressed in this article and its replies are the opinions of their respective authors and not those of DZone, Inc.)