
Hazelcast 1.2: Distributed ExecutorService

09.17.2008

One of the coolest features introduced in Java 1.5 is the Executor framework, which lets you asynchronously execute tasks: logical units of work such as a database query, a complex calculation, or image rendering. A nice way to run such tasks is to execute them asynchronously and do other things in the meantime; when the result is ready, you collect it and move on. If a task takes longer than expected, you may consider canceling it. In the Java Executor framework, tasks are implemented as java.util.concurrent.Callable or java.lang.Runnable.

import com.hazelcast.core.Hazelcast;
import java.util.concurrent.Callable;
import java.io.Serializable;

public class Echo implements Callable<String>, Serializable {
    String input = null;

    public Echo() {
    }

    public Echo(String input) {
        this.input = input;
    }

    // Returns "<address of the executing member>:<input>".
    public String call() {
        return Hazelcast.getCluster().getLocalMember().toString() + ":" + input;
    }
}



The Echo callable above returns, from its call() method, the local member and the input passed in. Hazelcast.getCluster().getLocalMember() returns the local member, and its toString() returns the member's address (IP + port) as a String, so the result shows which member actually executed the code. Of course, call() can do and return anything you like.

ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<String> future = executorService.submit(new Echo("myinput"));
// while it is executing, do some useful stuff
// when ready, get the result of your execution
String result = future.get();
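
The snippet above leaves out executor shutdown and exception handling. Here is a minimal, self-contained sketch of the same submit-then-get pattern using only java.util.concurrent. LocalEcho and LocalEchoDemo are hypothetical names: LocalEcho stands in for Echo and reports the executing thread's name instead of a cluster member, so the example runs without Hazelcast on the classpath.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for Echo: prefixes the input with the executing thread's name.
class LocalEcho implements Callable<String> {
    private final String input;

    LocalEcho(String input) {
        this.input = input;
    }

    @Override
    public String call() {
        return Thread.currentThread().getName() + ":" + input;
    }
}

class LocalEchoDemo {
    // Submits the task, waits for its result, and always shuts the executor down.
    static String echo(String input) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = executorService.submit(new LocalEcho(input));
            // While the task runs, the caller is free to do other work here.
            return future.get(); // blocks until call() has returned
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            // call() threw; the original exception is available as the cause.
            throw new IllegalStateException(e.getCause());
        } finally {
            executorService.shutdown(); // otherwise the worker thread keeps the JVM alive
        }
    }

    public static void main(String[] args) {
        System.out.println(echo("myinput"));
    }
}
```

The shutdown() call in the finally block matters for a locally created executor: its worker thread is not a daemon thread, so the JVM would not exit on its own while the executor is still alive.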



Now it is time to execute the same code on the cluster. You can ask Hazelcast to execute your code (Runnable, Callable):

  • on a specific cluster member you choose.
  • on the member owning the key you choose.
  • on the member Hazelcast will pick.
  • on all or a subset of the cluster members.

 

import com.hazelcast.core.Member;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.MultiTask;
import com.hazelcast.core.DistributedTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.FutureTask;
import java.util.concurrent.Future;
import java.util.Collection;
import java.util.Set;

// Runs Echo on the given member.
public void echoOnTheMember(String input, Member member) throws Exception {
    FutureTask<String> task = new DistributedTask<String>(new Echo(input), member);
    ExecutorService executorService = Hazelcast.getExecutorService();
    executorService.execute(task);
    String echoResult = task.get();
}

// Runs Echo on the member that owns the given key.
public void echoOnTheMemberOwningTheKey(String input, Object key) throws Exception {
    FutureTask<String> task = new DistributedTask<String>(new Echo(input), key);
    ExecutorService executorService = Hazelcast.getExecutorService();
    executorService.execute(task);
    String echoResult = task.get();
}

// Runs Echo on a member picked by Hazelcast.
public void echoOnSomewhere(String input) throws Exception {
    ExecutorService executorService = Hazelcast.getExecutorService();
    Future<String> task = executorService.submit(new Echo(input));
    String echoResult = task.get();
}

// Runs Echo on every member in the given set and collects one result per member.
public void echoOnMembers(String input, Set<Member> members) throws Exception {
    MultiTask<String> task = new MultiTask<String>(new Echo(input), members);
    ExecutorService executorService = Hazelcast.getExecutorService();
    executorService.execute(task);
    Collection<String> results = task.get();
}

 

Simple enough? Note that you can obtain the set of cluster members by calling Hazelcast.getCluster().getMembers().

What if the code you execute in the cluster takes longer than is acceptable? If you cannot stop or cancel that task, it will keep eating your resources. The standard Java Executor framework addresses this problem by introducing the cancel() API and 'encouraging' us to code and design for cancellation, a widely ignored part of software development.

public class Fibonacci implements Callable<Long>, Serializable {
    int input = 0;

    public Fibonacci() {
    }

    public Fibonacci(int input) {
        this.input = input;
    }

    public Long call() {
        return calculate(input);
    }

    private long calculate(int n) {
        // give up as soon as the executing thread has been interrupted
        if (Thread.currentThread().isInterrupted()) return 0;
        if (n <= 1) return n;
        else return calculate(n - 1) + calculate(n - 2);
    }
}


The callable class above calculates the Fibonacci number for a given input. The calculate method checks whether the current thread has been interrupted, so that the code can respond to cancellation once execution has started.

The following fib() method submits the Fibonacci calculation task for the number 'n' and waits at most 3 seconds for the result. If the execution doesn't complete within 3 seconds, future.get() throws a TimeoutException; upon catching it, we cancel the execution with an interrupt to save some CPU cycles.

long fib(int n) throws Exception {
    ExecutorService es = Hazelcast.getExecutorService();
    Future<Long> future = es.submit(new Fibonacci(n));
    try {
        return future.get(3, TimeUnit.SECONDS);
    } catch (TimeoutException e) {
        // took too long: interrupt the running task
        future.cancel(true);
    }
    return -1;
}


fib(20) will probably take less than 3 seconds, but fib(50) will take way longer. (This is not an example of how to write a better Fibonacci calculation; it shows how to cancel a running execution that takes too long.)
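
To try the timeout-and-cancel pattern without a cluster, here is a rough local sketch. It assumes a plain single-threaded executor from java.util.concurrent in place of Hazelcast.getExecutorService(), uses a Java 8 lambda, and adds a timeout parameter so the effect is quick to observe. LocalFibDemo and the timeoutMillis parameter are hypothetical names introduced for this illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class LocalFibDemo {
    // Naive recursive Fibonacci that gives up as soon as its thread is interrupted.
    static long calculate(int n) {
        if (Thread.currentThread().isInterrupted()) return 0;
        if (n <= 1) return n;
        return calculate(n - 1) + calculate(n - 2);
    }

    // Returns fib(n), or -1 if the result is not ready within timeoutMillis.
    static long fib(int n, long timeoutMillis) {
        ExecutorService es = Executors.newSingleThreadExecutor();
        Callable<Long> task = () -> calculate(n);
        Future<Long> future = es.submit(task);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupts the worker; calculate() then unwinds quickly
            return -1;
        } catch (InterruptedException e) {
            future.cancel(true);
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            return -1;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            es.shutdown(); // no new tasks; the worker exits once the task has ended
        }
    }

    public static void main(String[] args) {
        System.out.println(fib(20, 3000)); // 6765 if it completes within the timeout
        System.out.println(fib(50, 200));  // expected to time out on typical hardware
    }
}
```

Because isInterrupted() does not clear the interrupt flag, every pending recursive call sees the interruption and returns immediately, so the worker thread stops shortly after cancel(true).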

future.cancel(false) can only cancel an execution before it starts running, whereas future.cancel(true) can interrupt a running execution, provided your code is able to handle the interruption. So if you want to be able to cancel a task that is already running, the task has to be designed to handle interruption.
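
The difference between the two cancel modes can be observed locally. The sketch below assumes a standard single-threaded executor rather than Hazelcast, uses Java 8 lambdas, and relies on latches so the outcome does not depend on timing. CancelModesDemo and its method names are hypothetical, introduced only for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class CancelModesDemo {
    // Cancels a task that is already running and reports whether its thread was interrupted.
    static boolean runningTaskInterrupted(boolean mayInterruptIfRunning) throws InterruptedException {
        ExecutorService es = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        Future<?> future = es.submit(() -> {
            started.countDown();
            try {
                release.await(); // simulates long-running work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag for the check below
            }
            sawInterrupt.set(Thread.currentThread().isInterrupted());
        });
        started.await();                      // the task is now running
        future.cancel(mayInterruptIfRunning);
        release.countDown();                  // lets an uninterrupted task finish
        es.shutdown();
        es.awaitTermination(10, TimeUnit.SECONDS);
        return sawInterrupt.get();
    }

    // Cancels a task with cancel(false) while it is still queued and reports whether it ever ran.
    static boolean queuedTaskRan() throws InterruptedException {
        ExecutorService es = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean ran = new AtomicBoolean(false);
        es.submit(() -> {                     // occupies the only worker thread
            started.countDown();
            release.await();
            return null;
        });
        started.await();
        Future<?> queued = es.submit(() -> ran.set(true)); // waits in the queue
        queued.cancel(false);
        release.countDown();
        es.shutdown();
        es.awaitTermination(10, TimeUnit.SECONDS);
        return ran.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runningTaskInterrupted(false)); // false
        System.out.println(runningTaskInterrupted(true));  // true
        System.out.println(queuedTaskRan());               // false
    }
}
```

Note that cancel(false) on a running task still marks the Future as cancelled; it just never signals the worker thread, so the task keeps running until it finishes on its own.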

If the calculate(int n) method didn't have the if (Thread.currentThread().isInterrupted()) line, you wouldn't be able to cancel the execution after it started.

As of the 1.2 release, Hazelcast is also a distributed ExecutorService. Hazelcast provides highly available and scalable distributed (partitioned) implementations of

  • java.util.Queue
  • java.util.Map
  • java.util.Set
  • java.util.List
  • java.util.concurrent.locks.Lock
  • java.util.concurrent.ExecutorService

It is super easy to work with Hazelcast. It is a single jar, requires no configuration, and is free. Download the zip and run the same run.bat script 5 times to have a 5-member cluster instantly. Simplicity guaranteed!

For more info, please visit http://www.hazelcast.com and http://jroller.com/talipozturk
Published at DZone with permission of its author, Talip Ozturk.
