I'm a software developer with over 20 years development experience (10 years with Smalltalk, more than 10 years with Java) working mostly on client/server applications for banks, planning systems or manufacturing systems. My favorite areas in computer science are concurrent and/or distributed systems. I like to have a look at things such as Scala, JavaSpaces, Hadoop, actors, STM, and things alike.

Go-style Goroutines in Java and Scala Using HawtDispatch

06.23.2013

In Google Go, any function or method call, including a call to a closure, can be run asynchronously by prefixing it with the keyword go. According to the documentation "(...) they're called goroutines because the existing terms—threads, coroutines, processes, and so on—convey inaccurate connotations" (see reference). For that reason I also stick to the term goroutine. Goroutines are the recommended way to program concurrently in Go. They are lightweight, and you can easily create thousands of them. To make this efficient, goroutines are multiplexed onto multiple OS threads. Networking and concurrency are really what Go is about.

This post describes how to use HawtDispatch to achieve a very similar result in Java. HawtDispatch is a thread pooling and NIO event notification framework that does the thread multiplexing which in Go is built into the language. There is also a Scala version of HawtDispatch, so the approach described here for Java can be applied in the same way in Scala. The code shown in this post can be downloaded here from GitHub (it includes a Maven pom.xml to get HawtDispatch installed). Go provides channels as a means for goroutines to exchange information. We can model channels in Java with the BlockingQueues introduced in JDK5.
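As a rough sketch of that mapping, using only the JDK and no HawtDispatch: an unbuffered Go channel behaves much like a SynchronousQueue (every put waits for a matching take), while a buffered channel of capacity n behaves much like an ArrayBlockingQueue of capacity n. The names ChannelSketch and sendAndReceive are made up for this illustration and are not part of the downloadable code.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

public class ChannelSketch {

    // Sends one value through the queue from a second thread and receives it
    // on the calling thread, similar to `ch <- v` and `<-ch` in Go.
    static int sendAndReceive(final BlockingQueue<Integer> channel, final int value)
            throws InterruptedException {
        Thread sender = new Thread(() -> {
            try {
                channel.put(value); // blocks until a receiver or free slot exists
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        sender.start();
        int received = channel.take(); // blocks until a value is available
        sender.join();
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        // Roughly `make(chan int)`: a pure hand-off with no buffer.
        System.out.println(sendAndReceive(new SynchronousQueue<Integer>(), 42));
        // Roughly `make(chan int, 8)`: up to 8 values can wait in the buffer.
        System.out.println(sendAndReceive(new ArrayBlockingQueue<Integer>(8), 7));
    }
}
```

The LinkedBlockingQueue used later in this post is unbounded, so sending never blocks there; only the receiving side waits.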

Let's have a look at some Go code that makes use of goroutines and channels (the sample code is shamelessly stolen from this article, see the chapter named "Channels"):

ch := make(chan int)

go func() {
  result := 0
  for i := 0; i < 100000000; i++ {
    result = result + i
  }
  ch <- result
}()

/* Do something for a while */

sum := <-ch // This will block if the calculation is not done yet
fmt.Println("The sum is: ", sum)

Using JDK8 default methods, we can define something like a go keyword in our Java world. For that purpose I created a method named async (before JDK8 we would have to settle for slightly less elegant static methods):

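import org.fusesource.hawtdispatch.Dispatch;

public interface AsyncUtils {
    // Runs the given Runnable asynchronously on HawtDispatch's global queue.
    default void async(Runnable runnable) {
        Dispatch.getGlobalQueue().execute(runnable);
    }
}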

The async method executes Runnables on an arbitrary thread of a fixed-size thread pool. If you wanted to implement something like actors using HawtDispatch, you would use serial dispatch queues. Here is a simplistic actor implemented with HawtDispatch (queueing is serial because the DispatchQueue is created with Dispatch.createQueue()):

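import org.fusesource.hawtdispatch.Dispatch;
import org.fusesource.hawtdispatch.DispatchQueue;

public class HelloWorldActor {

    // Serial queue: tasks submitted to it run one at a time, in order.
    private final DispatchQueue queue = Dispatch.createQueue();

    public void sayHello() {
        queue.execute(()->{ System.out.println("hello world!"); });
    }

    public static void main(String[] args) {
        HelloWorldActor actor = new HelloWorldActor();
        actor.sayHello(); // asynchronously prints "hello world!"
    }
}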
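To illustrate what serial queueing buys you, here is a minimal sketch that uses a JDK single-thread executor as a stand-in for a serial dispatch queue (an assumption made so the example runs without HawtDispatch; it is not how HawtDispatch is implemented). SerialCounter and its methods are hypothetical names. Because all tasks run one at a time on the queue, the counter field needs no lock even though several threads call increment concurrently.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SerialCounter {

    // JDK stand-in for a serial dispatch queue: one worker thread, FIFO order.
    private final ExecutorService queue = Executors.newSingleThreadExecutor();

    // Only ever touched by tasks running on the serial queue.
    private int count = 0;

    public void increment() {
        queue.execute(() -> count++);
    }

    // Reads the counter on the queue itself, after all earlier tasks have run,
    // and then shuts the queue down.
    public int drainAndGet() throws InterruptedException, ExecutionException {
        try {
            return queue.submit(() -> count).get();
        } finally {
            queue.shutdown();
        }
    }

    // Hypothetical helper: several caller threads increment one counter.
    static int countFromThreads(int threads, final int incrementsPerThread)
            throws InterruptedException, ExecutionException {
        final SerialCounter counter = new SerialCounter();
        Thread[] callers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            callers[t] = new Thread(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    counter.increment();
                }
            });
            callers[t].start();
        }
        for (Thread caller : callers) {
            caller.join(); // all increments are queued once the callers finish
        }
        return counter.drainAndGet();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(countFromThreads(4, 1000)); // prints 4000
    }
}
```

With a concurrent queue such as the global queue behind async, the same unsynchronized count++ could lose updates.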

To be precise, HelloWorldActor is more of an active object than an actor, because functions are scheduled rather than messages. This little sample is meant to show that you can do much more with HawtDispatch than just run methods asynchronously. Now it is time to implement the Go sample in Java with what we have built so far. Here we go:

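import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.fusesource.hawtdispatch.internal.DispatcherConfig;
import org.junit.After;
import org.junit.Test;

public class GoroutineTest implements AsyncUtils {

    @Test
    public void sumAsync() throws InterruptedException
    {
        BlockingQueue<Long> channel = new LinkedBlockingQueue<>();

        async(()->
        {
            // long, not int: the sum (4999999950000000) overflows a 32-bit int
            long result = 0;
            for(int i = 0; i < 100000000; i++) {
                result = result + i;
            }
            channel.add(result);
        });

        /* Do something for a while */

        long sum = channel.take(); // blocks if the calculation is not done yet
        System.out.println("The sum is: " + sum);
    }

    @After
    public void tearDown() throws InterruptedException {
        DispatcherConfig.getDefaultDispatcher().shutdown();
    }
}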

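The approach presented here would also work before JDK8 (with anonymous classes and static methods in place of lambdas and default methods), since JDK8 is not a requirement for HawtDispatch. I just preferred JDK8 lambdas and default methods (formerly called defender methods) to make the sample code more compact.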
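A sketch of what such a pre-JDK8 variant might look like: a static async method and an anonymous Runnable instead of a default method and a lambda. To keep it runnable on a bare JDK, a plain fixed-size thread pool stands in for HawtDispatch's global queue; that substitution, and the names AsyncPreJdk8 and sumAsync, are assumptions made for this example only.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;

public class AsyncPreJdk8 {

    // Stand-in for HawtDispatch's global queue (assumption for this sketch).
    // Daemon threads are used so the pool does not keep the JVM alive.
    private static final ExecutorService POOL = Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors(),
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r);
                    t.setDaemon(true);
                    return t;
                }
            });

    // Static counterpart of the default method; callers can static-import it.
    public static void async(Runnable runnable) {
        POOL.execute(runnable);
    }

    // Sums 0..n-1 on a pool thread and hands the result back through a queue.
    static long sumAsync(final int n) throws InterruptedException {
        final BlockingQueue<Long> channel = new LinkedBlockingQueue<Long>();
        async(new Runnable() {
            @Override
            public void run() {
                long result = 0;
                for (int i = 0; i < n; i++) {
                    result = result + i;
                }
                channel.add(result);
            }
        });
        return channel.take(); // blocks until the calculation is done
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("The sum is: " + sumAsync(100000000));
    }
}
```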


Published at DZone with permission of its author, Oliver Plohmann.
