Eren Avşaroğulları is highly motivated open source developer and enthusiast on Java and related open source technologies. He has over 8 years of professional software design and development experiences in different domains such as telecommunication, finance and control & automation. His areas of interest are Web Frameworks, Distributed and Paralel Computing, High-Performance Systems, High Availability and Scalability. His current focus is Large-Scaled & High Available Enterprise Applications and GSM Network Platform & Service Design and Developments. He writes on the open source technologies. He hold a B.Sc. degree in Electrical & Electronics Engineering and a M.Sc. degree in Control & Automation Engineering. Eren is a DZone MVB and is not an employee of DZone and has posted 9 posts at DZone. You can read more from them at their website. View Full User Profile

Hazelcast Distributed Execution with Spring

12.11.2012
| 10514 views |
  • submit to reddit

The ExecutorService feature had come with Java 5 and is under the java.util.concurrent package. It extends the Executor interface and provides a thread pool functionality to execute asynchronous short tasks. Java Executor Service Types is suggested to look over basic ExecutorService implementation.

Also ThreadPoolExecutor is a very useful implementation of ExecutorService ınterface. It extends AbstractExecutorService providing default implementations of ExecutorService execution methods. It provides improved performance when executing large numbers of asynchronous tasks and maintains basic statistics, such as the number of completed tasks. How to develop and monitor Thread Pool Services by using Spring is also suggested to investigate how to develop and monitor Thread Pool Services.

So far, we have just talked Undistributed Executor Service implementation. Let us also investigate Distributed Executor Service.

Hazelcast Distributed Executor Service feature is a distributed implementation of java.util.concurrent.ExecutorService. It allows to execute business logic in cluster. There are four alternative ways to realize it :

1) The logic can be executed on a specific cluster member which is chosen.
2) The logic can be executed on the member owning the key which is chosen.
3) The logic can be executed on the member Hazelcast will pick.
4) The logic can be executed on all or subset of the cluster members.

This article shows how to develop Distributed Executor Service via Hazelcast and Spring.

Used Technologies :

JDK 1.7.0_09
Spring 3.1.3
Hazelcast 2.4
Maven 3.0.4

STEP 1 : CREATE MAVEN PROJECT

A maven project is created as below. (It can be created by using Maven or IDE Plug-in).

STEP 2 : LIBRARIES

Firstly, Spring dependencies are added to Maven’ s pom.xml

<properties>
    <spring.version>3.1.3.RELEASE</spring.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
 
<dependencies>
    <!-- Spring 3 dependencies -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-core</artifactId>
        <version>${spring.version}</version>
    </dependency>
 
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>${spring.version}</version>
    </dependency>
 
    <!-- Hazelcast library -->
    <dependency>
        <groupId>com.hazelcast</groupId>
        <artifactId>hazelcast-all</artifactId>
        <version>2.4</version>
    </dependency>
 
    <!-- Log4j library -->
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.16</version>
    </dependency>
</dependencies>

maven-compiler-plugin(Maven Plugin) is used to compile the project with JDK 1.7

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <version>3.0</version>
    <configuration>
      <source>1.7</source>
      <target>1.7</target>
    </configuration>
</plugin>

maven-shade-plugin(Maven Plugin) can be used to create runnable-jar

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>2.0</version>
 
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <transformer
                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass>com.onlinetechvision.exe.Application</mainClass>
                    </transformer>
                    <transformer
                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                        <resource>META-INF/spring.handlers</resource>
                    </transformer>
                    <transformer
                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                        <resource>META-INF/spring.schemas</resource>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>


STEP 3 : CREATE Customer BEAN

A new Customer bean is created. This bean will be distributed between two node in OTV cluster. In the following sample, all defined properties(id, name and surname)’ types are String and standart java.io.Serializable interface has been implemented for serializing. If custom or third-party object types are used, com.hazelcast.nio.DataSerializable interface can be implemented for better serialization performance.

package com.onlinetechvision.customer;
 
import java.io.Serializable;
 
/**
 * Customer Bean.
 *
 * @author onlinetechvision.com
 * @since 27 Nov 2012
 * @version 1.0.0
 *
 */
public class Customer implements Serializable {
 
    private static final long serialVersionUID = 1856862670651243395L;
 
    private String id;
    private String name;
    private String surname;
 
    public String getId() {
        return id;
    }
 
    public void setId(String id) {
        this.id = id;
    }
 
    public String getName() {
        return name;
    }
 
    public void setName(String name) {
        this.name = name;
    }
 
    public String getSurname() {
        return surname;
    }
 
    public void setSurname(String surname) {
        this.surname = surname;
    }
 
    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((id == null) ? 0 : id.hashCode());
        result = prime * result + ((name == null) ? 0 : name.hashCode());
        result = prime * result + ((surname == null) ? 0 : surname.hashCode());
        return result;
    }
 
    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        Customer other = (Customer) obj;
        if (id == null) {
            if (other.id != null)
                return false;
        } else if (!id.equals(other.id))
            return false;
        if (name == null) {
            if (other.name != null)
                return false;
        } else if (!name.equals(other.name))
            return false;
        if (surname == null) {
            if (other.surname != null)
                return false;
        } else if (!surname.equals(other.surname))
            return false;
        return true;
    }
 
    @Override
    public String toString() {
        return "Customer [id=" + id + ", name=" + name + ", surname=" + surname + "]";
    }
 
}

STEP 4 : CREATE ICacheService INTERFACE

A new ICacheService Interface is created for service layer to expose cache functionality.

package com.onlinetechvision.cache.srv;
 
import com.hazelcast.core.IMap;
import com.onlinetechvision.customer.Customer;
 
/**
 * A new ICacheService Interface is created for service layer to expose cache functionality.
 *
 * @author onlinetechvision.com
 * @since 27 Nov 2012
 * @version 1.0.0
 *
 */
public interface ICacheService {
 
    /**
     * Adds Customer entries to cache
     *
     * @param String key
     * @param Customer customer
     *
     */
    void addToCache(String key, Customer customer);
 
    /**
     * Deletes Customer entries from cache
     *
     * @param String key
     *
     */
    void deleteFromCache(String key);
 
    /**
     * Gets Customer cache
     *
     * @return IMap Coherence named cache
     */
    IMap<String, Customer> getCache();
}

STEP 5 : CREATE CacheService IMPLEMENTATION

CacheService is implementation of ICacheService Interface.

package com.onlinetechvision.cache.srv;
 
import com.hazelcast.core.IMap;
import com.onlinetechvision.customer.Customer;
import com.onlinetechvision.test.listener.CustomerEntryListener;
 
/**
 * CacheService Class is implementation of ICacheService Interface.
 *
 * @author onlinetechvision.com
 * @since 27 Nov 2012
 * @version 1.0.0
 *
 */
public class CacheService implements ICacheService {
 
    private IMap<String, Customer> customerMap;
 
    /**
     * Constructor of CacheService
     *
     * @param IMap customerMap
     *
     */
    @SuppressWarnings("unchecked")
    public CacheService(IMap<String, Customer> customerMap) {
        setCustomerMap(customerMap);
        getCustomerMap().addEntryListener(new CustomerEntryListener(), true);
    }
 
    /**
     * Adds Customer entries to cache
     *
     * @param String key
     * @param Customer customer
     *
     */
    @Override
    public void addToCache(String key, Customer customer) {
        getCustomerMap().put(key, customer);
    }
 
    /**
     * Deletes Customer entries from cache
     *
     * @param String key
     *
     */
    @Override
    public void deleteFromCache(String key) {
        getCustomerMap().remove(key);
    }
 
    /**
     * Gets Customer cache
     *
     * @return IMap Coherence named cache
     */
    @Override
    public IMap<String, Customer> getCache() {
        return getCustomerMap();
    }
 
    public IMap<String, Customer> getCustomerMap() {
        return customerMap;
    }
 
    public void setCustomerMap(IMap<String, Customer> customerMap) {
        this.customerMap = customerMap;
    }
 
}

STEP 6 : CREATE IDistributedExecutorService INTERFACE

A new IDistributedExecutorService Interface is created for service layer to expose distributed execution functionality.

package com.onlinetechvision.executor.srv;
 
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
 
import com.hazelcast.core.Member;
 
/**
 * A new IDistributedExecutorService Interface is created for service layer to expose distributed execution functionality.
 *
 * @author onlinetechvision.com
 * @since 27 Nov 2012
 * @version 1.0.0
 *
 */
public interface IDistributedExecutorService {
 
    /**
     * Executes the callable object on stated member
     *
     * @param Callable callable
     * @param Member member
     * @throws InterruptedException
     * @throws ExecutionException
     *
     */
    String executeOnStatedMember(Callable<String> callable, Member member) throws InterruptedException, ExecutionException;
 
    /**
     * Executes the callable object on member owning the key
     *
     * @param Callable callable
     * @param Object key
     * @throws InterruptedException
     * @throws ExecutionException
     *
     */
    String executeOnTheMemberOwningTheKey(Callable<String> callable, Object key) throws InterruptedException, ExecutionException;
 
    /**
     * Executes the callable object on any member
     *
     * @param Callable callable
     * @throws InterruptedException
     * @throws ExecutionException
     *
     */
    String executeOnAnyMember(Callable<String> callable) throws InterruptedException, ExecutionException;
 
    /**
     * Executes the callable object on all members
     *
     * @param Callable callable
     * @param Set all members
     * @throws InterruptedException
     * @throws ExecutionException
     *
     */
    Collection<String> executeOnMembers(Callable<String> callable, Set<Member> members) throws InterruptedException, ExecutionException;
}

STEP 7 : CREATE DistributedExecutorService IMPLEMENTATION

DistributedExecutorService is implementation of IDistributedExecutorService Interface.

package com.onlinetechvision.executor.srv;
 
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
 
import org.apache.log4j.Logger;
 
import com.hazelcast.core.DistributedTask;
import com.hazelcast.core.Member;
import com.hazelcast.core.MultiTask;
 
/**
 * DistributedExecutorService Class is implementation of IDistributedExecutorService Interface.
 *
 * @author onlinetechvision.com
 * @since 27 Nov 2012
 * @version 1.0.0
 *
 */
public class DistributedExecutorService implements IDistributedExecutorService {
 
    private static final Logger logger = Logger.getLogger(DistributedExecutorService.class);
 
    private ExecutorService hazelcastDistributedExecutorService;
 
    /**
     * Executes the callable object on stated member
     *
     * @param Callable callable
     * @param Member member
     * @throws InterruptedException
     * @throws ExecutionException
     *
     */
    @SuppressWarnings("unchecked")
    public String executeOnStatedMember(Callable<String> callable, Member member) throws InterruptedException, ExecutionException {
        logger.debug("Method executeOnStatedMember is called...");
        ExecutorService executorService = getHazelcastDistributedExecutorService();
        FutureTask<String> task = (FutureTask<String>) executorService.submit( new DistributedTask<String>(callable, member));
        String result = task.get();
        logger.debug("Result of method executeOnStatedMember is : " + result);
        return result;
    }
 
    /**
     * Executes the callable object on member owning the key
     *
     * @param Callable callable
     * @param Object key
     * @throws InterruptedException
     * @throws ExecutionException
     *
     */
    @SuppressWarnings("unchecked")
    public String executeOnTheMemberOwningTheKey(Callable<String> callable, Object key) throws InterruptedException, ExecutionException {
        logger.debug("Method executeOnTheMemberOwningTheKey is called...");
        ExecutorService executorService = getHazelcastDistributedExecutorService();
        FutureTask<String> task = (FutureTask<String>) executorService.submit(new DistributedTask<String>(callable, key));
        String result = task.get();
        logger.debug("Result of method executeOnTheMemberOwningTheKey is : " + result);
        return result;
    }
 
    /**
     * Executes the callable object on any member
     *
     * @param Callable callable
     * @throws InterruptedException
     * @throws ExecutionException
     *
     */
    public String executeOnAnyMember(Callable<String> callable) throws InterruptedException, ExecutionException {
        logger.debug("Method executeOnAnyMember is called...");
        ExecutorService executorService = getHazelcastDistributedExecutorService();
        Future<String> task = executorService.submit(callable);
        String result = task.get();
        logger.debug("Result of method executeOnAnyMember is : " + result);
        return result;
    }
 
    /**
     * Executes the callable object on all members
     *
     * @param Callable callable
     * @param Set all members
     * @throws InterruptedException
     * @throws ExecutionException
     *
     */
    public Collection<String> executeOnMembers(Callable<String> callable, Set<Member> members) throws ExecutionException, InterruptedException {
        logger.debug("Method executeOnMembers is called...");
        MultiTask<String> task = new MultiTask<String>(callable, members);
        ExecutorService executorService = getHazelcastDistributedExecutorService();
        executorService.execute(task);
        Collection<String> results = task.get();
        logger.debug("Result of method executeOnMembers is : " + results.toString());
        return results;
    }
 
    public ExecutorService getHazelcastDistributedExecutorService() {
        return hazelcastDistributedExecutorService;
    }
 
    public void setHazelcastDistributedExecutorService(ExecutorService hazelcastDistributedExecutorService) {
        this.hazelcastDistributedExecutorService = hazelcastDistributedExecutorService;
    }
 
}

STEP 8 : CREATE TestCallable CLASS

TestCallable Class shows business logic to be executed.

TestCallable task for first member of the cluster :

package com.onlinetechvision.task;
 
import java.io.Serializable;
import java.util.concurrent.Callable;
 
/**
 * TestCallable Class shows business logic to be executed.
 *
 * @author onlinetechvision.com
 * @since 27 Nov 2012
 * @version 1.0.0
 *
 */
public class TestCallable implements Callable<String>, Serializable{
 
    private static final long serialVersionUID = -1839169907337151877L;
 
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return String computed result
     * @throws Exception if unable to compute a result
     */
    public String call() throws Exception {
        return "First Member' s TestCallable Task is called...";
    }
 
}

TestCallable task for second member of the cluster :

package com.onlinetechvision.task;
 
import java.io.Serializable;
import java.util.concurrent.Callable;
 
/**
 * TestCallable Class shows business logic to be executed.
 *
 * @author onlinetechvision.com
 * @since 27 Nov 2012
 * @version 1.0.0
 *
 */
public class TestCallable implements Callable<String>, Serializable{
 
    private static final long serialVersionUID = -1839169907337151877L;
 
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return String computed result
     * @throws Exception if unable to compute a result
     */
    public String call() throws Exception {
        return "Second Member' s TestCallable Task is called...";
    }
 
}


STEP 9 : CREATE AnotherAvailableMemberNotFoundException CLASS

AnotherAvailableMemberNotFoundException is thrown when another available member is not found. To avoid this exception, first node should be started before the second node.

package com.onlinetechvision.exception;
 
/**
 * AnotherAvailableMemberNotFoundException is thrown when another available member is not found.
 * To avoid this exception, first node should be started before the second node.
 *
 * @author onlinetechvision.com
 * @since 27 Nov 2012
 * @version 1.0.0
 *
 */
public class AnotherAvailableMemberNotFoundException extends Exception {
 
    private static final long serialVersionUID = -3954360266393077645L;
 
    /**
     * Constructor of AnotherAvailableMemberNotFoundException
     *
     * @param  String Exception message
     *
     */
    public AnotherAvailableMemberNotFoundException(String message) {
        super(message);
    }
 
}

STEP 10 : CREATE CustomerEntryListener CLASS

CustomerEntryListener Class listens entry changes on named cache object.

package com.onlinetechvision.test.listener;
 
import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.EntryListener;
 
/**
 * CustomerEntryListener Class listens entry changes on named cache object.
 *
 * @author onlinetechvision.com
 * @since 27 Nov 2012
 * @version 1.0.0
 *
 */
@SuppressWarnings("rawtypes")
public class CustomerEntryListener implements EntryListener {
 
    /**
     * Invoked when an entry is added.
     *
     * @param EntryEvent
     *
     */
    public void entryAdded(EntryEvent ee) {
        System.out.println("EntryAdded... Member : " + ee.getMember() + ", Key : "+ee.getKey()+", OldValue : "+ee.getOldValue()+", NewValue : "+ee.getValue());
    }
 
    /**
     * Invoked when an entry is removed.
     *
     * @param EntryEvent
     *
     */
    public void entryRemoved(EntryEvent ee) {
        System.out.println("EntryRemoved... Member : " + ee.getMember() + ", Key : "+ee.getKey()+", OldValue : "+ee.getOldValue()+", NewValue : "+ee.getValue());
    }
 
    /**
     * Invoked when an entry is evicted.
     *
     * @param EntryEvent
     *
     */
    public void entryEvicted(EntryEvent ee) {
 
    }  
 
    /**
     * Invoked when an entry is updated.
     *
     * @param EntryEvent
     *
     */
    public void entryUpdated(EntryEvent ee) {
 
    }
 
}

STEP 11 : CREATE Starter CLASS

Starter Class loads Customers to cache and executes distributed tasks.

Starter Class of first member of the cluster :

package com.onlinetechvision.exe;
 
import com.onlinetechvision.cache.srv.ICacheService;
import com.onlinetechvision.customer.Customer;
 
/**
 * Starter Class loads Customers to cache and executes distributed tasks.
 *
 * @author onlinetechvision.com
 * @since 27 Nov 2012
 * @version 1.0.0
 *
 */
public class Starter {
 
    private ICacheService cacheService;
 
    /**
     * Loads cache and executes the tasks
     *
     */
    public void start() {
        loadCacheForFirstMember();
    }
 
    /**
     * Loads Customers to cache
     *
     */
    public void loadCacheForFirstMember() {
        Customer firstCustomer = new Customer();
        firstCustomer.setId("1");
        firstCustomer.setName("Jodie");
        firstCustomer.setSurname("Foster");
 
        Customer secondCustomer = new Customer();
        secondCustomer.setId("2");
        secondCustomer.setName("Kate");
        secondCustomer.setSurname("Winslet");
 
        getCacheService().addToCache(firstCustomer.getId(), firstCustomer);
        getCacheService().addToCache(secondCustomer.getId(), secondCustomer);
    }
 
    public ICacheService getCacheService() {
        return cacheService;
    }
 
    public void setCacheService(ICacheService cacheService) {
        this.cacheService = cacheService;
    }
 
}


Starter Class of second member of the cluster :

package com.onlinetechvision.exe;
 
import java.util.Set;
import java.util.concurrent.ExecutionException;
 
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.Member;
import com.onlinetechvision.cache.srv.ICacheService;
import com.onlinetechvision.customer.Customer;
import com.onlinetechvision.exception.AnotherAvailableMemberNotFoundException;
import com.onlinetechvision.executor.srv.IDistributedExecutorService;
import com.onlinetechvision.task.TestCallable;
 
/**
 * Starter Class loads Customers to cache and executes distributed tasks.
 *
 * @author onlinetechvision.com
 * @since 27 Nov 2012
 * @version 1.0.0
 *
 */
public class Starter {
 
    private String hazelcastInstanceName;
    private Hazelcast hazelcast;
    private IDistributedExecutorService distributedExecutorService;
    private ICacheService cacheService;
 
    /**
     * Loads cache and executes the tasks
     *
     */
    public void start() {
        loadCache();
        executeTasks();
    }
 
    /**
     * Loads Customers to cache
     *
     */
    public void loadCache() {
        Customer firstCustomer = new Customer();
        firstCustomer.setId("3");
        firstCustomer.setName("Bruce");
        firstCustomer.setSurname("Willis");
 
        Customer secondCustomer = new Customer();
        secondCustomer.setId("4");
        secondCustomer.setName("Colin");
        secondCustomer.setSurname("Farrell");
 
        getCacheService().addToCache(firstCustomer.getId(), firstCustomer);
        getCacheService().addToCache(secondCustomer.getId(), secondCustomer);
    }
 
    /**
     * Executes Tasks
     *
     */
    public void executeTasks() {
        try {
            getDistributedExecutorService().executeOnStatedMember(new TestCallable(), getAnotherMember());
            getDistributedExecutorService().executeOnTheMemberOwningTheKey(new TestCallable(), "3");
            getDistributedExecutorService().executeOnAnyMember(new TestCallable());
            getDistributedExecutorService().executeOnMembers(new TestCallable(), getAllMembers());
        } catch (InterruptedException | ExecutionException | AnotherAvailableMemberNotFoundException e) {
            e.printStackTrace();
        }
    }
 
    /**
     * Gets cluster members
     *
     * @return Set<Member> Set of Cluster Members
     *
     */
    private Set<Member> getAllMembers() {
        Set<Member> members = getHazelcastLocalInstance().getCluster().getMembers();
 
        return members;
    }
 
    /**
     * Gets an another member of cluster
     *
     * @return Member Another Member of Cluster
     * @throws AnotherAvailableMemberNotFoundException An Another Available Member can not found exception
     */
    private Member getAnotherMember() throws AnotherAvailableMemberNotFoundException {
        Set<Member> members = getAllMembers();
        for(Member member : members) {
            if(!member.localMember()) {
                return member;
            }
        }
 
        throw new AnotherAvailableMemberNotFoundException("No Other Available Member on the cluster. Please be aware that all members are active on the cluster");
    }
 
    /**
     * Gets Hazelcast local instance
     *
     * @return HazelcastInstance Hazelcast local instance
     */
    @SuppressWarnings("static-access")
    private HazelcastInstance getHazelcastLocalInstance() {
        HazelcastInstance instance = getHazelcast().getHazelcastInstanceByName(getHazelcastInstanceName());
        return instance;
    }
 
    public String getHazelcastInstanceName() {
        return hazelcastInstanceName;
    }
 
    public void setHazelcastInstanceName(String hazelcastInstanceName) {
        this.hazelcastInstanceName = hazelcastInstanceName;
    }
 
    public Hazelcast getHazelcast() {
        return hazelcast;
    }
 
    public void setHazelcast(Hazelcast hazelcast) {
        this.hazelcast = hazelcast;
    }
 
    public IDistributedExecutorService getDistributedExecutorService() {
        return distributedExecutorService;
    }
 
    public void setDistributedExecutorService(IDistributedExecutorService distributedExecutorService) {
        this.distributedExecutorService = distributedExecutorService;
    }
 
    public ICacheService getCacheService() {
        return cacheService;
    }
 
    public void setCacheService(ICacheService cacheService) {
        this.cacheService = cacheService;
    }
 
}


STEP 12 : CREATE hazelcast-config.properties FILE

hazelcast-config.properties file shows the properties of cluster members.

First member properties :

hz.instance.name = OTVInstance1
 
hz.group.name = dev
hz.group.password = dev
 
hz.management.center.enabled = true
hz.management.center.url = http://localhost:8080/mancenter
 
hz.network.port = 5701
hz.network.port.auto.increment = false
 
hz.tcp.ip.enabled = true
 
hz.members = 192.168.1.32
 
hz.executor.service.core.pool.size = 2
hz.executor.service.max.pool.size = 30
hz.executor.service.keep.alive.seconds = 30
 
hz.map.backup.count=2
hz.map.max.size=0
hz.map.eviction.percentage=30
hz.map.read.backup.data=true
hz.map.cache.value=true
hz.map.eviction.policy=NONE
hz.map.merge.policy=hz.ADD_NEW_ENTRY

Second member properties :

hz.instance.name = OTVInstance2
 
hz.group.name = dev
hz.group.password = dev
 
hz.management.center.enabled = true
hz.management.center.url = http://localhost:8080/mancenter
 
hz.network.port = 5702
hz.network.port.auto.increment = false
 
hz.tcp.ip.enabled = true
 
hz.members = 192.168.1.32
 
hz.executor.service.core.pool.size = 2
hz.executor.service.max.pool.size = 30
hz.executor.service.keep.alive.seconds = 30
 
hz.map.backup.count=2
hz.map.max.size=0
hz.map.eviction.percentage=30
hz.map.read.backup.data=true
hz.map.cache.value=true
hz.map.eviction.policy=NONE
hz.map.merge.policy=hz.ADD_NEW_ENTRY


STEP 13 : CREATE applicationContext-hazelcast.xml

Spring Hazelcast Configuration file, applicationContext-hazelcast.xml, is created and Hazelcast Distributed Executor Service and Hazelcast Instance are configured.

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:hz="http://www.hazelcast.com/schema/spring"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
 
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
 
http://www.hazelcast.com/schema/spring
 
http://www.hazelcast.com/schema/spring/hazelcast-spring-2.4.xsd">
 
    <hz:map id="customerMap" name="customerMap" instance-ref="instance"/>
 
    <!-- Hazelcast Distributed Executor Service definition -->
    <hz:executorService id="hazelcastDistributedExecutorService" instance-ref="instance" name="hazelcastDistributedExecutorService" />
 
    <!-- Hazelcast Instance configuration -->
    <hz:hazelcast id="instance">
        <hz:config>
 
            <!-- Hazelcast Instance Name -->
            <hz:instance-name>${hz.instance.name}</hz:instance-name>
 
        <!-- Hazelcast Group Name and Password -->
        <hz:group name="${hz.group.name}" password="${hz.group.password}"/>
 
                <!-- Hazelcast Management Center URL -->
            <hz:management-center  enabled="${hz.management.center.enabled}" url="${hz.management.center.url}"/>
 
            <!-- Hazelcast Tcp based network configuration -->
            <hz:network port="${hz.network.port}" port-auto-increment="${hz.network.port.auto.increment}">
                <hz:join>
                    <hz:tcp-ip enabled="${hz.tcp.ip.enabled}">
                        <hz:members>${hz.members}</hz:members>
                    </hz:tcp-ip>
                </hz:join>
            </hz:network>
 
            <!-- Hazelcast Distributed Executor Service configuration -->
            <hz:executor-service name="executorService"
                                 core-pool-size="${hz.executor.service.core.pool.size}"
                                 max-pool-size="${hz.executor.service.max.pool.size}"
                                 keep-alive-seconds="${hz.executor.service.keep.alive.seconds}"/>
 
            <!-- Hazelcast Distributed Map configuration -->
            <hz:map name="map"
                backup-count="${hz.map.backup.count}"
                max-size="${hz.map.max.size}"
                eviction-percentage="${hz.map.eviction.percentage}"
                read-backup-data="${hz.map.read.backup.data}"
                cache-value="${hz.map.cache.value}"
                eviction-policy="${hz.map.eviction.policy}"
                merge-policy="${hz.map.merge.policy}"  />
 
        </hz:config>
 
    </hz:hazelcast>  
 
</beans>

STEP 14 : CREATE applicationContext.xml

Spring Configuration file, applicationContext.xml, is created.

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:hz="http://www.hazelcast.com/schema/spring"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
 
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">
 
    <import resource="classpath:applicationContext-hazelcast.xml" />
 
    <!-- Beans Declaration -->
    <bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <list>
                <value>classpath:/hazelcast-config.properties</value>
            </list>
        </property>
    </bean>
 
    <bean id="cacheService" class="com.onlinetechvision.cache.srv.CacheService">
        <constructor-arg ref="customerMap"/>
    </bean>
 
    <bean id="distributedExecutorService" class="com.onlinetechvision.executor.srv.DistributedExecutorService">
        <property name="hazelcastDistributedExecutorService" ref="hazelcastDistributedExecutorService" />
    </bean>
 
    <bean id="hazelcast" class="com.hazelcast.core.Hazelcast"/>
 
    <bean id="starter" class="com.onlinetechvision.exe.Starter">
        <property name="hazelcastInstanceName" value="${hz.instance.name}" />
        <property name="hazelcast" ref="hazelcast" />
        <property name="distributedExecutorService" ref="distributedExecutorService" />
        <property name="cacheService" ref="cacheService" />
    </bean>
</beans>

STEP 15 : CREATE Application CLASS

Application Class is created to run the application.

ackage com.onlinetechvision.exe;
 
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
 
/**
 * Application class starts the application
 *
 * @author onlinetechvision.com
 * @since 27 Nov 2012
 * @version 1.0.0
 *
 */
public class Application {
 
    /**
     * Starts the application
     *
     * @param  String[] args
     *
     */
    public static void main(String[] args) {
        ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");
        Starter starter = (Starter) context.getBean("starter");
        starter.start();
    }
 
}

STEP 16 : BUILD PROJECT

After OTV_Spring_Hazelcast_DistributedExecution Project is built, OTV_Spring_Hazelcast_DistributedExecution-0.0.1-SNAPSHOT.jar will be created.
Important Note : The Members of the cluster have got different configuration for Coherence so the project should be built separately for each member.

STEP 17 : INTEGRATION with HAZELCAST MANAGEMENT CENTER

Hazelcast Management Center enables to monitor and manage nodes in the cluster.

Entity and backup counts which are owned by customerMap, can be seen via Map Memory Data Table. We have distributed 4 entries via customerMap as shown below :

Sample keys and values can be seen via Map Browser :

Added First Entry :

Added Third Entry :

hazelcastDistributedExecutorService details can be seen via Executors tab. We have executed 3 task on first member and 2 tasks on second member as shown below :

STEP 18 : RUN PROJECT BY STARTING THE CLUSTER’ s MEMBER

After created OTV_Spring_Hazelcast_DistributedExecution-0.0.1-SNAPSHOT.jar file is run at the cluster’ s members, the following console output logs will be shown :

First member console output :

Kas 25, 2012 4:07:20 PM com.hazelcast.impl.AddressPicker
INFO: Interfaces is disabled, trying to pick one address from TCP-IP config addresses: [x.y.z.t]
Kas 25, 2012 4:07:20 PM com.hazelcast.impl.AddressPicker
INFO: Prefer IPv4 stack is true.
Kas 25, 2012 4:07:20 PM com.hazelcast.impl.AddressPicker
INFO: Picked Address[x.y.z.t]:5701, using socket ServerSocket[addr=/0:0:0:0:0:0:0:0,localport=5701], bind any local is true
Kas 25, 2012 4:07:21 PM com.hazelcast.system
INFO: [x.y.z.t]:5701 [dev] Hazelcast Community Edition 2.4 (20121017) starting at Address[x.y.z.t]:5701
Kas 25, 2012 4:07:21 PM com.hazelcast.system
INFO: [x.y.z.t]:5701 [dev] Copyright (C) 2008-2012 Hazelcast.com
Kas 25, 2012 4:07:21 PM com.hazelcast.impl.LifecycleServiceImpl
INFO: [x.y.z.t]:5701 [dev] Address[x.y.z.t]:5701 is STARTING
Kas 25, 2012 4:07:24 PM com.hazelcast.impl.TcpIpJoiner
INFO: [x.y.z.t]:5701 [dev]
--A new cluster is created and First Member joins the cluster.
Members [1] {
    Member [x.y.z.t]:5701 this
}
 
Kas 25, 2012 4:07:24 PM com.hazelcast.impl.MulticastJoiner
INFO: [x.y.z.t]:5701 [dev]
 
Members [1] {
    Member [x.y.z.t]:5701 this
}
 
...
-- First member adds two new entries to the cache...
EntryAdded... Member : Member [x.y.z.t]:5701 this, Key : 1, OldValue : null, NewValue : Customer [id=1, name=Jodie, surname=Foster]
EntryAdded... Member : Member [x.y.z.t]:5701 this, Key : 2, OldValue : null, NewValue : Customer [id=2, name=Kate, surname=Winslet]
 
...
--Second Member joins the cluster.
Members [2] {
    Member [x.y.z.t]:5701 this
    Member [x.y.z.t]:5702
}
 
...
-- Second member adds two new entries to the cache...
EntryAdded... Member : Member [x.y.z.t]:5702, Key : 4, OldValue : null, NewValue : Customer [id=4, name=Colin, surname=Farrell]
EntryAdded... Member : Member [x.y.z.t]:5702, Key : 3, OldValue : null, NewValue : Customer [id=3, name=Bruce, surname=Willis]


Second member console output :

Kas 25, 2012 4:07:48 PM com.hazelcast.impl.AddressPicker
INFO: Interfaces is disabled, trying to pick one address from TCP-IP config addresses: [x.y.z.t]
Kas 25, 2012 4:07:48 PM com.hazelcast.impl.AddressPicker
INFO: Prefer IPv4 stack is true.
Kas 25, 2012 4:07:48 PM com.hazelcast.impl.AddressPicker
INFO: Picked Address[x.y.z.t]:5702, using socket ServerSocket[addr=/0:0:0:0:0:0:0:0,localport=5702], bind any local is true
Kas 25, 2012 4:07:49 PM com.hazelcast.system
INFO: [x.y.z.t]:5702 [dev] Hazelcast Community Edition 2.4 (20121017) starting at Address[x.y.z.t]:5702
Kas 25, 2012 4:07:49 PM com.hazelcast.system
INFO: [x.y.z.t]:5702 [dev] Copyright (C) 2008-2012 Hazelcast.com
Kas 25, 2012 4:07:49 PM com.hazelcast.impl.LifecycleServiceImpl
INFO: [x.y.z.t]:5702 [dev] Address[x.y.z.t]:5702 is STARTING
Kas 25, 2012 4:07:49 PM com.hazelcast.impl.Node
INFO: [x.y.z.t]:5702 [dev] ** setting master address to Address[x.y.z.t]:5701
Kas 25, 2012 4:07:49 PM com.hazelcast.impl.MulticastJoiner
INFO: [x.y.z.t]:5702 [dev] Connecting to master node: Address[x.y.z.t]:5701
Kas 25, 2012 4:07:49 PM com.hazelcast.nio.ConnectionManager
INFO: [x.y.z.t]:5702 [dev] 55715 accepted socket connection from /x.y.z.t:5701
Kas 25, 2012 4:07:55 PM com.hazelcast.cluster.ClusterManager
INFO: [x.y.z.t]:5702 [dev]
--Second Member joins the cluster.
Members [2] {
    Member [x.y.z.t]:5701
    Member [x.y.z.t]:5702 this
}
 
Kas 25, 2012 4:07:56 PM com.hazelcast.impl.LifecycleServiceImpl
INFO: [x.y.z.t]:5702 [dev] Address[x.y.z.t]:5702 is STARTED
-- Second member adds two new entries to the cache...
EntryAdded... Member : Member [x.y.z.t]:5702 this, Key : 3, OldValue : null, NewValue : Customer [id=3, name=Bruce, surname=Willis]
EntryAdded... Member : Member [x.y.z.t]:5702 this, Key : 4, OldValue : null, NewValue : Customer [id=4, name=Colin, surname=Farrell]
 
25.11.2012 16:07:56 DEBUG (DistributedExecutorService.java:42) - Method executeOnStatedMember is called...
25.11.2012 16:07:56 DEBUG (DistributedExecutorService.java:46) - Result of method executeOnStatedMember is : First Member' s TestCallable Task is called...
 
25.11.2012 16:07:56 DEBUG (DistributedExecutorService.java:61) - Method executeOnTheMemberOwningTheKey is called...
25.11.2012 16:07:56 DEBUG (DistributedExecutorService.java:65) - Result of method executeOnTheMemberOwningTheKey is : First Member' s TestCallable Task is called...
 
25.11.2012 16:07:56 DEBUG (DistributedExecutorService.java:78) - Method executeOnAnyMember is called...
25.11.2012 16:07:57 DEBUG (DistributedExecutorService.java:82) - Result of method executeOnAnyMember is : Second Member' s TestCallable Task is called...
 
25.11.2012 16:07:57 DEBUG (DistributedExecutorService.java:96) - Method executeOnMembers is called...
25.11.2012 16:07:57 DEBUG (DistributedExecutorService.java:101) - Result of method executeOnMembers is : [First Member' s TestCallable Task is called..., Second Member' s TestCallable Task is called...]


STEP 19 : DOWNLOAD

https://github.com/erenavsarogullari/OTV_Spring_Hazelcast_DistributedExecution

REFERENCES :

Java ExecutorService Interface
Hazelcast Distributed Executor Service












Published at DZone with permission of Eren Avşaroğulları, author and DZone MVB. (source)

(Note: Opinions expressed in this article and its replies are the opinions of their respective authors and not those of DZone, Inc.)