Eren Avşaroğulları is a highly motivated open source developer and enthusiast of Java and related open source technologies. He has over 8 years of professional software design and development experience in domains such as telecommunications, finance, and control & automation. His areas of interest are Web Frameworks, Distributed and Parallel Computing, High-Performance Systems, High Availability and Scalability. His current focus is large-scale, highly available enterprise applications and GSM network platform and service design and development. He writes about open source technologies. He holds a B.Sc. degree in Electrical & Electronics Engineering and an M.Sc. degree in Control & Automation Engineering. Eren is a DZone MVB and is not an employee of DZone.

How to Distribute Spring Beans Using EntryProcessor and PortableObject Features

06.12.2012

This article shows how to distribute Spring beans by using the EntryProcessor and Portable Object Format (POF) features of Oracle Coherence.

Coherence supports a lock-free programming model through the EntryProcessor API. An entry processor is executed on the member that owns the entry, which reduces network access, and Coherence applies an implicit low-level lock to the entry while it is being processed. This implicit locking is different from the explicit lock(key) provided by the ConcurrentMap API.
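The idea of handing the update logic to the entry, instead of locking, reading, modifying and writing from the caller, can be illustrated with plain JDK classes. The following is only an analogy and not Coherence code: it uses ConcurrentHashMap.compute from Java 8 (newer than the JDK 1.6 used in this article), and the class name, the "counter" key and the thread counts are assumptions made for the example.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ImplicitEntryLockSketch {

    /**
     * Increments one map entry from several threads without an explicit lock.
     * compute() applies the update function atomically for the given key, which
     * stands in for the implicit entry-level lock of an entry processor.
     */
    public static int incrementConcurrently(int threadCount, int incrementsPerThread) {
        ConcurrentMap<String, Integer> cache = new ConcurrentHashMap<String, Integer>();
        cache.put("counter", 0);

        Thread[] workers = new Thread[threadCount];
        for (int t = 0; t < threadCount; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    // The update logic is handed to the map; no get/modify/put in the caller.
                    cache.compute("counter", (key, value) -> value + 1);
                }
            });
            workers[t].start();
        }

        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for workers", e);
        }
        return cache.get("counter");
    }

    public static void main(String[] args) {
        // No increment is lost, although the caller never takes a lock.
        System.out.println("Final counter: " + incrementConcurrently(4, 1000));
    }
}
```

With a plain get followed by put, concurrent callers could overwrite each other's updates unless they locked the key explicitly; here the atomicity is provided by the map itself.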

Besides entry processors, Coherence offers other transaction options: explicit locking, the Transaction Framework API and the Coherence Resource Adapter. For detailed information about these options, please see the References section. For an implementation of Coherence explicit locking, see the article Distributed Data Management in Oracle Coherence.

Portable Object Format (POF) is a platform-independent serialization format. It allows equivalent Java, .NET and C++ objects to be encoded into an identical sequence of bytes. POF is recommended for performance, since it serializes and deserializes faster than standard Java serialization. According to the Coherence Reference document, for a simple test class with a String, a long and three ints, (de)serialization was seven times faster than with standard Java serialization.
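One reason an indexed binary format can be cheaper than standard Java serialization is that it does not write class and field metadata into every stream. The sketch below is a rough illustration only: it does not use POF, the hand-written index-plus-value layout is an assumption made for the example, and it compares encoded size rather than the speed figure quoted above. The Sample class mirrors the "String, a long and three ints" shape with made-up values.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationSizeSketch {

    /** Hypothetical sample type: a String, a long and three ints. */
    public static class Sample implements Serializable {
        private static final long serialVersionUID = 1L;
        String name = "coherence";
        long timestamp = 1338636105000L;
        int a = 1;
        int b = 2;
        int c = 3;
    }

    /** Size of the object when written with standard Java serialization. */
    public static int javaSerializedSize(Sample sample) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(bytes);
            out.writeObject(sample);
            out.close();
            return bytes.size();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Size of the object when each property is written as "index byte + value". */
    public static int indexedSize(Sample sample) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeByte(0); out.writeUTF(sample.name);
            out.writeByte(1); out.writeLong(sample.timestamp);
            out.writeByte(2); out.writeInt(sample.a);
            out.writeByte(3); out.writeInt(sample.b);
            out.writeByte(4); out.writeInt(sample.c);
            out.close();
            return bytes.size();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Sample sample = new Sample();
        System.out.println("Java serialization bytes: " + javaSerializedSize(sample));
        System.out.println("Indexed layout bytes    : " + indexedSize(sample));
    }
}
```

The real POF encoding differs from this layout, so the numbers printed here should not be read as POF measurements.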

Coherence offers several cache types: Distributed (or Partitioned), Replicated, Optimistic, Near, Local and Remote. A distributed cache is defined as a collection of data that is distributed (or partitioned) across any number of cluster nodes such that exactly one node in the cluster is responsible for each piece of data in the cache, and the responsibility is distributed (or load-balanced) among the cluster nodes. This article uses the distributed cache type. The other cache types are outside its scope; please see the References section or the Coherence Reference document. Their configurations are very similar to the distributed cache configuration.
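The "exactly one node is responsible for each piece of data" property can be sketched with a simple key-to-partition-to-member mapping. This is a simplified model and not Coherence's actual partition assignment strategy: the hash-modulo rule, the partition count and the member names (borrowed from the OTV1 and OTV2 roles used later in this article) are assumptions for illustration.

```java
import java.util.Arrays;
import java.util.List;

public class PartitionOwnerSketch {

    // Illustrative partition count; the real value is a cluster configuration detail.
    static final int PARTITION_COUNT = 257;

    /** Maps a key to a partition in the range [0, PARTITION_COUNT). */
    public static int partitionOf(Object key) {
        return Math.floorMod(key.hashCode(), PARTITION_COUNT);
    }

    /** Maps a key to exactly one owning member by way of its partition. */
    public static String ownerOf(Object key, List<String> members) {
        return members.get(partitionOf(key) % members.size());
    }

    public static void main(String[] args) {
        List<String> members = Arrays.asList("OTV1", "OTV2");
        for (String key : Arrays.asList("1", "2", "3", "4")) {
            System.out.println("key " + key + " -> partition " + partitionOf(key)
                    + " -> owner " + ownerOf(key, members));
        }
    }
}
```

Because the mapping is a pure function of the key, every member computes the same owner for a given key, which is what lets a request be routed directly to the responsible node.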

To compare the two implementations (EntryProcessor with Portable Object Format (POF) versus explicit locking with standard Java serialization), see the article How to distribute Spring Beans by using Coherence, which covers the explicit locking and standard Java serialization approach.

In this article, a new cluster named OTV is created and a Spring bean is distributed through a cache named user-cache between two members of the cluster.

Let us look at an implementation of AbstractProcessor, which implements the EntryProcessor interface, together with the PortableObject interface, to distribute Spring beans between JVMs in a cluster.

Used Technologies :

JDK 1.6.0_31
Spring 3.1.1
Coherence 3.7.0
SolarisOS 5.10
Maven 3.0.2

STEP 1 : CREATE MAVEN PROJECT

A Maven project is created. (It can be created by using Maven or an IDE plug-in.)

STEP 2 : COHERENCE PACKAGE

Coherence is downloaded via Coherence Package

STEP 3 : LIBRARIES

First, the Spring dependencies are added to Maven's pom.xml. Please note that the Coherence library is installed into the local Maven repository and its dependency declaration is added to pom.xml as follows. If Maven is not used, the coherence.jar file can be added to the classpath instead.

<properties>
    <spring.version>3.1.1.RELEASE</spring.version>
</properties>
 
<dependencies>
 
    <!-- Spring 3 dependencies -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-core</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>${spring.version}</version>
    </dependency>
 
    <!-- Coherence library(from local repository) -->
    <dependency>
        <groupId>com.tangosol</groupId>
        <artifactId>coherence</artifactId>
        <version>3.7.0</version>
    </dependency>
 
    <!-- Log4j library -->
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.16</version>
    </dependency>
 
</dependencies>

The following Maven plugin can be used to create a runnable JAR.

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>1.3.1</version>
 
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <transformer
                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass>com.otv.exe.Application</mainClass>
                    </transformer>
                    <transformer
                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                        <resource>META-INF/spring.handlers</resource>
                    </transformer>
                    <transformer
                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                        <resource>META-INF/spring.schemas</resource>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>

STEP 4 : CREATE otv-pof-config.xml

otv-pof-config.xml lists the classes that use the Portable Object Format (POF) for serialization. In this example, the User, UpdateUserProcessor and DeleteUserProcessor classes implement the com.tangosol.io.pof.PortableObject interface.

The -Dtangosol.pof.config argument can be used to define the otv-pof-config.xml path in the startup script.

<?xml version="1.0"?>
<!DOCTYPE pof-config SYSTEM "pof-config.dtd">
<pof-config>
    <user-type-list>
        <!-- coherence POF user types -->
        <include>coherence-pof-config.xml</include>
        <!-- The definition of classes which use Portable Object Format -->
        <user-type>
            <type-id>1001</type-id>
            <class-name>com.otv.user.User</class-name>
        </user-type>
        <user-type>
            <type-id>1002</type-id>
            <class-name>com.otv.user.processor.UpdateUserProcessor</class-name>
        </user-type>
        <user-type>
            <type-id>1003</type-id>
            <class-name>com.otv.user.processor.DeleteUserProcessor</class-name>
        </user-type>
    </user-type-list>
    <allow-interfaces>true</allow-interfaces>
    <allow-subclasses>true</allow-subclasses>
</pof-config>

STEP 5 : CREATE otv-coherence-cache-config.xml

otv-coherence-cache-config.xml contains the caching-schemes (distributed or replicated) and caching-scheme-mapping configuration. The cache configuration created here should be added to coherence-cache-config.xml.

<?xml version="1.0"?>
 
<cache-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
              xmlns="http://xmlns.oracle.com/coherence/coherence-cache-config"
              xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-cache-config
                     coherence-cache-config.xsd">
 
    <caching-scheme-mapping>
        <cache-mapping>
            <cache-name>user-cache</cache-name>
            <scheme-name>UserDistributedCacheWithPof</scheme-name>
        </cache-mapping>
    </caching-scheme-mapping>
 
    <caching-schemes>
 
        <distributed-scheme>
            <scheme-name>UserDistributedCacheWithPof</scheme-name>
            <service-name>UserDistributedCacheWithPof</service-name>
 
            <serializer>
                <instance>
                    <class-name>com.tangosol.io.pof.SafeConfigurablePofContext</class-name>
                    <init-params>
                        <init-param>
                            <param-type>String</param-type>
                            <param-value>
                                <!-- pof-config.xml path should be set-->
                                otv-pof-config.xml
                            </param-value>
                        </init-param>
                    </init-params>
                </instance>
            </serializer>
            <backing-map-scheme>
                <local-scheme />
            </backing-map-scheme>
            <autostart>true</autostart>
        </distributed-scheme>
    </caching-schemes>
 
</cache-config>

STEP 6 : CREATE tangosol-coherence-override.xml

tangosol-coherence-override.xml covers the cluster, member-identity and configurable-cache-factory configuration. The following configuration file is for the first member of the cluster. The -Dtangosol.coherence.override argument can be used to define the tangosol-coherence-override.xml path in the startup script.

tangosol-coherence-override.xml for first member of the cluster :

<?xml version='1.0'?>
 
<coherence xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns="http://xmlns.oracle.com/coherence/coherence-operational-config"
   xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-operational-config coherence-operational-config.xsd">
 
   <cluster-config>
 
      <member-identity>
         <cluster-name>OTV</cluster-name>
         <!-- Name of the first member of the cluster -->
         <role-name>OTV1</role-name>
      </member-identity>
 
      <unicast-listener>
          <well-known-addresses>
            <socket-address id="1">
              <!-- IP Address of the first member of the cluster -->
              <address>x.x.x.x</address>
              <port>8089</port>
            </socket-address>
            <socket-address id="2">
              <!-- IP Address of the second member of the cluster -->
              <address>y.y.y.y</address>
              <port>8089</port>
            </socket-address>
          </well-known-addresses>
 
          <!-- Name of the first member of the cluster -->
          <machine-id>OTV1</machine-id>
          <!-- IP Address of the first member of the cluster -->
          <address>x.x.x.x</address>
          <port>8089</port>
          <port-auto-adjust>true</port-auto-adjust>
      </unicast-listener>
 
   </cluster-config>
 
   <configurable-cache-factory-config>
      <init-params>
         <init-param>
            <param-type>java.lang.String</param-type>
            <param-value system-property="tangosol.coherence.cacheconfig">
              <!-- coherence-cache-config.xml path should be set-->
              otv-coherence-cache-config.xml
            </param-value>
         </init-param>
      </init-params>
   </configurable-cache-factory-config>
 
</coherence>

tangosol-coherence-override.xml for second member of the cluster :

<?xml version='1.0'?>
 
<coherence xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns="http://xmlns.oracle.com/coherence/coherence-operational-config"
   xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-operational-config coherence-operational-config.xsd">
 
   <cluster-config>
 
      <member-identity>
         <cluster-name>OTV</cluster-name>
         <!-- Name of the second member of the cluster -->
         <role-name>OTV2</role-name>
      </member-identity>
 
      <unicast-listener>     
 
          <well-known-addresses>
            <socket-address id="1">
              <!-- IP Address of the first member of the cluster -->
              <address>x.x.x.x</address>
              <port>8089</port>
            </socket-address>
            <socket-address id="2">
              <!-- IP Address of the second member of the cluster -->
              <address>y.y.y.y</address>
              <port>8089</port>
            </socket-address>
          </well-known-addresses>
 
          <!-- Name of the second member of the cluster -->
          <machine-id>OTV2</machine-id>
          <!-- IP Address of the second member of the cluster -->
          <address>y.y.y.y</address>
          <port>8089</port>
          <port-auto-adjust>true</port-auto-adjust>
 
      </unicast-listener>
 
   </cluster-config>
 
   <configurable-cache-factory-config>
      <init-params>
         <init-param>
            <param-type>java.lang.String</param-type>
            <param-value system-property="tangosol.coherence.cacheconfig">
              <!-- coherence-cache-config.xml path should be set-->
              otv-coherence-cache-config.xml</param-value>
         </init-param>
      </init-params>
   </configurable-cache-factory-config>
 
</coherence>
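The -Dtangosol.coherence.override and -Dtangosol.pof.config arguments are ordinary JVM system properties, so they can also be set from code before Coherence is first used. The sketch below only sets and prints the properties; the class name is hypothetical, the file names are the ones used in this article, and it is an assumption that Coherence reads these properties during its first initialization, so configure() would have to run before any cache is requested.

```java
public class CoherenceStartupProperties {

    /**
     * Sets the two system properties used in this article.
     * Equivalent startup script form:
     *   java -Dtangosol.coherence.override=tangosol-coherence-override.xml \
     *        -Dtangosol.pof.config=otv-pof-config.xml \
     *        -jar OTV_Spring_Coherence-0.0.1-SNAPSHOT.jar
     */
    public static void configure(String overridePath, String pofConfigPath) {
        System.setProperty("tangosol.coherence.override", overridePath);
        System.setProperty("tangosol.pof.config", pofConfigPath);
    }

    public static void main(String[] args) {
        configure("tangosol-coherence-override.xml", "otv-pof-config.xml");
        System.out.println("override   = " + System.getProperty("tangosol.coherence.override"));
        System.out.println("pof config = " + System.getProperty("tangosol.pof.config"));
    }
}
```

Passing the values on the command line keeps the per-member differences out of the JAR, which may be preferable when each member has its own override file.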

STEP 7 : CREATE applicationContext.xml

applicationContext.xml is created.

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">
 
    <!-- Beans Declaration -->
    <bean id="User" class="com.otv.user.User" scope="prototype" />
    <bean id="UserCacheService" class="com.otv.user.cache.srv.UserCacheService" />
    <bean id="CacheUpdaterTask" class="com.otv.cache.updater.task.CacheUpdaterTask">
        <property name="userCacheService" ref="UserCacheService" />
    </bean>
</beans>

STEP 8 : CREATE SystemConstants CLASS

SystemConstants Class is created. This class covers all system constants.

package com.otv.common;
 
/**
 * System Constants
 *
 * @author  onlinetechvision.com
 * @since   2 Jun 2012
 * @version 1.0.0
 *
 */
public class SystemConstants {
 
    public static final String APPLICATION_CONTEXT_FILE_NAME = "applicationContext.xml";
 
    //Named Cache Definition...
    public static final String USER_CACHE = "user-cache";
 
    //Bean Names...
    public static final String BEAN_NAME_CACHE_UPDATER_TASK = "CacheUpdaterTask";
    public static final String BEAN_NAME_USER = "User";
 
}

STEP 9 : CREATE User BEAN

A new User Spring bean is created. This bean will be distributed between two nodes in the OTV cluster. It implements PortableObject for serialization. The PortableObject interface declares two methods, readExternal and writeExternal, and only the properties that are read and written in them are serialized. In this example, all the properties (id, name and surname of User) are serialized.

package com.otv.user;
 
import java.io.IOException;
 
import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofWriter;
import com.tangosol.io.pof.PortableObject;
 
/**
 * User Bean
 *
 * @author  onlinetechvision.com
 * @since   2 Jun 2012
 * @version 1.0.0
 *
 */
public class User implements PortableObject {
 
    private String id;
    private String name;
    private String surname;
 
    /**
     * Gets User Id
     *
     * @return String id
     */
    public String getId() {
        return id;
    }
 
    /**
     * Sets User Id
     *
     * @param String id
     */
    public void setId(String id) {
        this.id = id;
    }
 
    /**
     * Gets User Name
     *
     * @return String name
     */
    public String getName() {
        return name;
    }
 
    /**
     * Sets User Name
     *
     * @param String name
     */
    public void setName(String name) {
        this.name = name;
    }
 
    /**
     * Gets User Surname
     *
     * @return String surname
     */
    public String getSurname() {
        return surname;
    }
 
    /**
     * Sets User Surname
     *
     * @param String surname
     */
    public void setSurname(String surname) {
        this.surname = surname;
    }
 
    @Override
    public String toString() {
        StringBuilder strBuilder = new StringBuilder();
        strBuilder.append("Id : ").append(id);
        strBuilder.append(", Name : ").append(name);
        strBuilder.append(", Surname : ").append(surname);
        return strBuilder.toString();
    }
 
    /**
     * Restore the contents of a user type instance by reading its state
     * using the specified PofReader object.
     *
     * @param PofReader in
     */
    public void readExternal(PofReader in) throws IOException {
        this.id = in.readString(0);
        this.name = in.readString(1);
        this.surname = in.readString(2);
    }
 
    /**
     * Save the contents of a POF user type instance by writing its state
     * using the specified PofWriter object.
     *
     * @param PofWriter out
     */
    public void writeExternal(PofWriter out) throws IOException {
        out.writeString(0, id);
        out.writeString(1, name);
        out.writeString(2, surname);
    }
}
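The pairing of readExternal and writeExternal relies on both methods using the same property indexes, and a property that is not written is simply not restored. The sketch below illustrates this with a hypothetical in-memory IndexedStream standing in for PofWriter and PofReader; it is not the Coherence API, and the Person class with its extra sessionToken field is invented for the example.

```java
import java.util.HashMap;
import java.util.Map;

public class IndexedRoundTripSketch {

    /** Hypothetical stand-in for a POF stream: property index -> value. */
    public static class IndexedStream {
        private final Map<Integer, Object> slots = new HashMap<Integer, Object>();

        public void writeString(int index, String value) {
            slots.put(index, value);
        }

        public String readString(int index) {
            return (String) slots.get(index);
        }
    }

    /** Hypothetical bean with one property that is deliberately not serialized. */
    public static class Person {
        public String id;
        public String name;
        public String surname;
        public String sessionToken; // never written, so never restored

        public void writeExternal(IndexedStream out) {
            out.writeString(0, id);
            out.writeString(1, name);
            out.writeString(2, surname);
        }

        public void readExternal(IndexedStream in) {
            // Indexes must match the ones used in writeExternal.
            this.id = in.readString(0);
            this.name = in.readString(1);
            this.surname = in.readString(2);
        }
    }

    /** Writes the object to a stream and restores a fresh copy from it. */
    public static Person roundTrip(Person original) {
        IndexedStream stream = new IndexedStream();
        original.writeExternal(stream);
        Person copy = new Person();
        copy.readExternal(stream);
        return copy;
    }

    public static void main(String[] args) {
        Person person = new Person();
        person.id = "1";
        person.name = "Bruce";
        person.surname = "Willis";
        person.sessionToken = "local-only";

        Person copy = roundTrip(person);
        System.out.println(copy.id + " " + copy.name + " " + copy.surname
                + ", sessionToken = " + copy.sessionToken);
    }
}
```

Swapping two indexes in only one of the methods would silently exchange the values of the two properties, which is why the index assignments are usually kept side by side.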

STEP 10 : CREATE IUserCacheService INTERFACE

A new IUserCacheService Interface is created to perform cache operations.

package com.otv.user.cache.srv;
 
import com.otv.user.User;
import com.otv.user.processor.DeleteUserProcessor;
import com.otv.user.processor.UpdateUserProcessor;
import com.tangosol.net.NamedCache;
 
/**
 * User Cache Service Interface
 *
 * @author  onlinetechvision.com
 * @since   2 Jun 2012
 * @version 1.0.0
 *
 */
public interface IUserCacheService {
 
    /**
     * Gets Distributed User Cache
     *
     * @return NamedCache User Cache
     */
    NamedCache getUserCache();
 
    /**
     * Adds user to cache
     *
     * @param User user
     */
    void addUser(User user);
 
    /**
     * Updates user on the cache
     *
     * @param String userId
     * @param UpdateUserProcessor processor
     *
     */
    void updateUser(String userId, UpdateUserProcessor processor);
 
    /**
     * Deletes user from the cache
     *
     * @param String userId
     * @param DeleteUserProcessor processor
     *
     */
    void deleteUser(String userId, DeleteUserProcessor processor);
 
}

STEP 11 : CREATE UserCacheService CLASS

UserCacheService Class is created by implementing IUserCacheService Interface.

package com.otv.user.cache.srv;
 
import com.otv.cache.listener.UserMapListener;
import com.otv.common.SystemConstants;
import com.otv.user.User;
import com.otv.user.processor.DeleteUserProcessor;
import com.otv.user.processor.UpdateUserProcessor;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
 
/**
 * User Cache Service
 *
 * @author  onlinetechvision.com
 * @since   2 Jun 2012
 * @version 1.0.0
 *
 */
public class UserCacheService implements IUserCacheService {
 
    private NamedCache userCache = null;   
 
    public UserCacheService() {
        setUserCache(CacheFactory.getCache(SystemConstants.USER_CACHE));
        //UserMap Listener is registered to listen user-cache operations
        getUserCache().addMapListener(new UserMapListener());
    }  
 
    /**
     * Adds user to cache
     *
     * @param User user
     */
    public void addUser(User user) {
        getUserCache().put(user.getId(), user);
    }
 
    /**
     * Deletes user from the cache
     *
     * @param String userId
     * @param DeleteUserProcessor processor
     *
     */
    public void deleteUser(String userId, DeleteUserProcessor processor) {
        getUserCache().invoke(userId, processor);
    }
 
    /**
     * Updates user on the cache
     *
     * @param String userId
     * @param UpdateUserProcessor processor
     *
     */
    public void updateUser(String userId, UpdateUserProcessor processor) {
        getUserCache().invoke(userId, processor);
    }
 
    /**
     * Gets Distributed User Cache
     *
     * @return NamedCache User Cache
     */
    public NamedCache getUserCache() {
        return userCache;
    }
 
    /**
     * Sets User Cache
     *
     * @param NamedCache userCache
     */
    public void setUserCache(NamedCache userCache) {
        this.userCache = userCache;
    }
}

STEP 12 : CREATE UserMapListener CLASS

A new UserMapListener class is created. This listener receives distributed user-cache events.

package com.otv.cache.listener;
 
import org.apache.log4j.Logger;
 
import com.tangosol.util.MapEvent;
import com.tangosol.util.MapListener;
 
/**
 * User Map Listener
 *
 * @author  onlinetechvision.com
 * @since   2 Jun 2012
 * @version 1.0.0
 *
 */
public class UserMapListener implements MapListener {
 
    private static Logger logger = Logger.getLogger(UserMapListener.class);
 
    /**
     * This method is invoked when an entry is deleted from the cache...
     *
     * @param MapEvent me
     */
    public void entryDeleted(MapEvent me) {
         logger.debug("Deleted Key = " + me.getKey() + ", Value = " + me.getOldValue());
    }
 
    /**
     * This method is invoked when an entry is inserted to the cache...
     *
     * @param MapEvent me
     */
    public void entryInserted(MapEvent me) {
        logger.debug("Inserted Key = " + me.getKey() + ", Value = " + me.getNewValue());
    }
 
    /**
     * This method is invoked when an entry is updated on the cache...
     *
     * @param MapEvent me
     */
    public void entryUpdated(MapEvent me) {
        logger.debug("Updated Key = " + me.getKey() + ", New_Value = " + me.getNewValue() + ", Old Value = " + me.getOldValue());
    }
}

STEP 13 : CREATE UpdateUserProcessor CLASS

AbstractProcessor is an abstract class in the com.tangosol.util.processor package. It implements the EntryProcessor interface.

The UpdateUserProcessor class is created to process user update operations on the cache. When UpdateUserProcessor is invoked for a key, the member that owns the key is first located in the cluster. UpdateUserProcessor is then executed on that member, and the value of the key (the User object) is updated there. This reduces network traffic.

package com.otv.user.processor;
 
import java.io.IOException;
 
import org.apache.log4j.Logger;
 
import com.otv.user.User;
import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofWriter;
import com.tangosol.io.pof.PortableObject;
import com.tangosol.util.InvocableMap.Entry;
import com.tangosol.util.processor.AbstractProcessor;
 
/**
 * Update User Processor
 *
 * @author  onlinetechvision.com
 * @since   2 Jun 2012
 * @version 1.0.0
 *
 */
public class UpdateUserProcessor extends AbstractProcessor implements PortableObject {
 
    private static Logger logger = Logger.getLogger(UpdateUserProcessor.class);
    private User newUser;
 
    /**
     * This empty constructor is added for Portable Object Format(POF).
     *
     */
    public UpdateUserProcessor() {
 
    }
 
    public UpdateUserProcessor(User newUser) {
        this.newUser = newUser;
    }
 
    /**
     * Processes a Map.Entry object.
     *
     * @param Entry entry
     * @return Object newUser
     */
    public Object process(Entry entry) {
        Object newValue = null;
        try {
            newValue = getNewUser();
            entry.setValue(newValue);
        } catch (Exception e) {
            logger.error("Error occurred when entry was being processed!", e);
        }
 
        return newValue;
    }
 
    /**
     * Gets new user
     *
     * @return User newUser
     */
    public User getNewUser() {
        return newUser;
    }
 
    /**
     * Sets new user
     *
     * @param User newUser
     */
    public void setNewUser(User newUser) {
        this.newUser = newUser;
    }
 
    /**
     * Restore the contents of a user type instance by reading its state
     * using the specified PofReader object.
     *
     * @param PofReader in
     */
    public void readExternal(PofReader in) throws IOException {
        setNewUser((User) in.readObject(0));
    }
 
    /**
     * Save the contents of a POF user type instance by writing its state
     * using the specified PofWriter object.
     *
     * @param PofWriter out
     */
    public void writeExternal(PofWriter out) throws IOException {
        out.writeObject(0, getNewUser());
    }
}
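The network saving of executing the processor on the owning member can be made concrete by counting requests. The sketch below is a toy model, not Coherence code: OwnerMember is a hypothetical class that counts every call as one network request, and the upper-casing update is an arbitrary example. It uses java.util.function.UnaryOperator from Java 8.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

public class InvokeVersusGetPutSketch {

    /** Hypothetical remote member that owns the entries and counts incoming requests. */
    public static class OwnerMember {
        private final Map<String, String> entries = new HashMap<String, String>();
        private int requests = 0;

        public String get(String key) {
            requests++;
            return entries.get(key);
        }

        public void put(String key, String value) {
            requests++;
            entries.put(key, value);
        }

        /** The processor runs where the entry lives, so one request is enough. */
        public String invoke(String key, UnaryOperator<String> processor) {
            requests++;
            String updated = processor.apply(entries.get(key));
            entries.put(key, updated);
            return updated;
        }

        public int getRequests() {
            return requests;
        }
    }

    /** Read the value to the caller, change it there, and write it back. */
    public static int requestsForGetThenPut() {
        OwnerMember owner = new OwnerMember();
        owner.entries.put("1", "Bruce Willis");
        String current = owner.get("1");
        owner.put("1", current.toUpperCase());
        return owner.getRequests();
    }

    /** Send the update logic to the owner instead of moving the value. */
    public static int requestsForInvoke() {
        OwnerMember owner = new OwnerMember();
        owner.entries.put("1", "Bruce Willis");
        owner.invoke("1", value -> value.toUpperCase());
        return owner.getRequests();
    }

    public static void main(String[] args) {
        System.out.println("get + put requests: " + requestsForGetThenPut());
        System.out.println("invoke requests   : " + requestsForInvoke());
    }
}
```

In this model an explicitly locked update would add a lock and an unlock request on top of the get and put, which widens the gap further.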

STEP 14 : CREATE DeleteUserProcessor CLASS

The DeleteUserProcessor class is created to process user deletion operations on the cache. When DeleteUserProcessor is invoked for a key, the member that owns the key is first located in the cluster. DeleteUserProcessor is then executed on that member. This reduces network traffic.

package com.otv.user.processor;
 
import java.io.IOException;
 
import org.apache.log4j.Logger;
 
import com.otv.user.User;
import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofWriter;
import com.tangosol.io.pof.PortableObject;
import com.tangosol.util.InvocableMap.Entry;
import com.tangosol.util.processor.AbstractProcessor;
 
/**
 * Delete User Processor
 *
 * @author  onlinetechvision.com
 * @since   2 Jun 2012
 * @version 1.0.0
 *
 */
public class DeleteUserProcessor extends AbstractProcessor implements PortableObject {
 
    private static Logger logger = Logger.getLogger(DeleteUserProcessor.class);
 
    /**
     * Processes a Map.Entry object.
     *
     * @param Entry entry
     * @return Object user
     */
    public Object process(Entry entry) {
        User user = null;
        try {
            user = (User) entry.getValue();
            entry.remove(true);
        } catch (Exception e) {
            logger.error("Error occurred when entry was being processed!", e);
        }
 
        return user;
    }
 
    /**
     * Restore the contents of a user type instance by reading its state
     * using the specified PofReader object.
     *
     * @param PofReader in
     */
    public void readExternal(PofReader in) throws IOException {
 
    }
 
    /**
     * Save the contents of a POF user type instance by writing its state
     * using the specified PofWriter object.
     *
     * @param PofWriter out
     */
    public void writeExternal(PofWriter out) throws IOException {
 
    }
}

STEP 15 : CREATE CacheUpdaterTask CLASS

CacheUpdaterTask Class is created to perform cache operations(add, update and delete) and monitor cache content.

package com.otv.cache.updater.task;
 
import java.util.Collection;
 
import org.apache.log4j.Logger;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
 
import com.otv.common.SystemConstants;
import com.otv.user.User;
import com.otv.user.cache.srv.IUserCacheService;
import com.otv.user.processor.DeleteUserProcessor;
import com.otv.user.processor.UpdateUserProcessor;
 
/**
 * Cache Updater Task
 *
 * @author  onlinetechvision.com
 * @since   2 Jun 2012
 * @version 1.0.0
 *
 */
public class CacheUpdaterTask implements BeanFactoryAware, Runnable {
 
    private static Logger log = Logger.getLogger(CacheUpdaterTask.class);
    private IUserCacheService userCacheService;
    private BeanFactory beanFactory;
 
    public void run() {
        try {
            while(true) {
                /**
                 * Before the project is built for the first member,
                 * this code block should be used instead of
                 * method processRequestsOnSecondMemberOfCluster.
                 */
                processRequestsOnFirstMemberOfCluster();
 
                /**
                 * Before the project is built for the second member,
                 * this code block should be used instead of
                 * method processRequestsOnFirstMemberOfCluster.
                 */
//              processRequestsOnSecondMemberOfCluster();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
 
    /**
      * Processes the cache requests on the first member of cluster...
      *
      * @throws InterruptedException
      */
    private void processRequestsOnFirstMemberOfCluster() throws InterruptedException {
        //Entry is added to cache...
        getUserCacheService().addUser(getUser("1", "Bruce", "Willis"));
 
        //Cache Entries are being printed...
        printCacheEntries();
 
        Thread.sleep(10000);
 
        User newUser = getUser("1", "Client", "Eastwood");
        //Existent Entry is updated on the cache...
        getUserCacheService().updateUser(newUser.getId(), new UpdateUserProcessor(newUser));
 
        //Cache Entries are being printed...
        printCacheEntries();
 
        Thread.sleep(10000);
 
        //Entry is deleted from cache...
        getUserCacheService().deleteUser(newUser.getId(), new DeleteUserProcessor());
 
        //Cache Entries are being printed...
        printCacheEntries();
 
        Thread.sleep(10000);
    }
 
    /**
      * Processes the cache requests on the second member of cluster...
      *
      * @throws InterruptedException
      */
    private void processRequestsOnSecondMemberOfCluster() throws InterruptedException {
        //Entry is added to cache...
        getUserCacheService().addUser(getUser("2", "Nathalie", "Portman"));
 
        Thread.sleep(15000);
 
        User newUser = getUser("2", "Sharon", "Stone");
        //Existent Entry is updated on the cache...
        getUserCacheService().updateUser(newUser.getId(), new UpdateUserProcessor(newUser));
 
        User newUser2 = getUser("1", "Maria", "Sharapova");
        //Existent Entry is updated on the cache...
        getUserCacheService().updateUser(newUser2.getId(), new UpdateUserProcessor(newUser2));
 
        Thread.sleep(15000);
 
        //Entry is deleted from cache...
        getUserCacheService().deleteUser(newUser.getId(), new DeleteUserProcessor());
 
        Thread.sleep(15000);
    }
 
    /**
     * Prints cache entries
     *
     */
    private void printCacheEntries() {
        Collection<User> userCollection = (Collection<User>)getUserCacheService().getUserCache().values();
        for(User user : userCollection) {
            log.debug("Cache Content : "+user);
        }
    }
 
    /**
     * Gets new user instance
     *
     * @param String user id
     * @param String user name
     * @param String user surname
     * @return User user
     */
    private User getUser(String id, String name, String surname) {
        User user = getNewUserInstance();
        user.setId(id);
        user.setName(name);
        user.setSurname(surname);
 
        return user;
    }
 
    /**
     * Gets user cache service...
     *
     * @return IUserCacheService userCacheService
     */
    public IUserCacheService getUserCacheService() {
        return userCacheService;
    }
 
    /**
     * Sets user cache service...
     *
     * @param IUserCacheService userCacheService
     */
    public void setUserCacheService(IUserCacheService userCacheService) {
        this.userCacheService = userCacheService;
    }
 
    /**
     * Gets a new instance of User Bean
     *
     * @return User
     */
    public User getNewUserInstance() {
        return  (User) getBeanFactory().getBean(SystemConstants.BEAN_NAME_USER);
    }
 
    /**
     * Gets bean factory
     *
     * @return BeanFactory
     */
    public BeanFactory getBeanFactory() {
        return beanFactory;
    }
 
    /**
     * Sets bean factory
     *
     * @param BeanFactory beanFactory
     * @throws BeansException
     */
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.beanFactory = beanFactory;
    }
}

STEP 16 : CREATE Application CLASS

The Application class is the entry point of the application: it loads the Spring application context and starts CacheUpdaterTask on a new thread.

package com.otv.exe;
 
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
 
import com.otv.cache.updater.task.CacheUpdaterTask;
import com.otv.common.SystemConstants;
 
/**
 * Application Class
 *
 * @author  onlinetechvision.com
 * @since   2 Jun 2012
 * @version 1.0.0
 *
 */
public class Application {
 
    /**
     * Starts the application
     *
     * @param args command-line arguments
     *
     */
    public static void main(String[] args) {
        ApplicationContext context = new ClassPathXmlApplicationContext(SystemConstants.APPLICATION_CONTEXT_FILE_NAME);
 
        CacheUpdaterTask cacheUpdaterTask = (CacheUpdaterTask) context.getBean(SystemConstants.BEAN_NAME_CACHE_UPDATER_TASK);
        Thread cacheUpdater = new Thread(cacheUpdaterTask);
        cacheUpdater.start();
    }
}
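
The main method above starts the task thread and returns; the JVM keeps running while that non-daemon thread is alive. As a minimal sketch of the same start-a-thread pattern without Spring or Coherence, the example below uses a hypothetical CountingTask in place of CacheUpdaterTask and additionally waits for the thread with join(). The class names TaskRunnerSketch and CountingTask and the method runAndWait are assumptions made for illustration and are not part of the project.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class TaskRunnerSketch {

    // Hypothetical stand-in for CacheUpdaterTask: it only counts the steps it has run.
    public static class CountingTask implements Runnable {
        private final AtomicInteger steps = new AtomicInteger();

        @Override
        public void run() {
            for (int i = 0; i < 3; i++) {
                steps.incrementAndGet();
            }
        }

        public int getSteps() {
            return steps.get();
        }
    }

    // Starts the task on its own thread, as Application.main does, then waits for it.
    public static int runAndWait(CountingTask task) throws InterruptedException {
        Thread worker = new Thread(task, "cache-updater");
        worker.start();
        worker.join(); // blocks until run() returns
        return task.getSteps();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Steps completed: " + runAndWait(new CountingTask()));
    }
}
```

Waiting with join() is optional here; it is only useful when the caller needs to do something after the task has finished, such as closing the application context.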

STEP 17 : BUILD PROJECT

After the OTV_Spring_Coherence_With_Processor_and_POF project is built, OTV_Spring_Coherence-0.0.1-SNAPSHOT.jar will be created.
Please note that the members of the cluster have different Coherence configurations, so the project should be built separately for each member.

STEP 18 : RUN PROJECT ON FIRST MEMBER OF THE CLUSTER

After the OTV_Spring_Coherence-0.0.1-SNAPSHOT.jar file is run on the members of the cluster, the following output logs will be shown on the first member's console:

--After a new cluster is created and the first member joins the cluster, a new entry is added to the cache.
02.06.2012 14:21:45 DEBUG (UserMapListener.java:33) - Inserted Key = 1, Value = Id : 1, Name : Bruce, Surname : Willis
02.06.2012 14:21:45 DEBUG (CacheUpdaterTask.java:116) - Cache Content : Id : 1, Name : Bruce, Surname : Willis
.......
--After the second member joins the cluster, a new entry is added to the cache.
02.06.2012 14:21:45 DEBUG (UserMapListener.java:33) - Inserted Key = 2, Value = Id : 2, Name : Nathalie, Surname : Portman
.......
--Cache operations continue on both the first and second members of the cluster:
02.06.2012 14:21:55 DEBUG (UserMapListener.java:42) - Updated Key = 1, New_Value = Id : 1, Name : Client, Surname : Eastwood,
                                                                       Old Value = Id : 1, Name : Bruce, Surname : Willis
 
02.06.2012 14:21:55 DEBUG (CacheUpdaterTask.java:116) - Cache Content : Id : 2, Name : Nathalie, Surname : Portman
02.06.2012 14:21:55 DEBUG (CacheUpdaterTask.java:116) - Cache Content : Id : 1, Name : Client, Surname : Eastwood
 
02.06.2012 14:22:00 DEBUG (UserMapListener.java:42) - Updated Key = 2, New_Value = Id : 2, Name : Sharon, Surname : Stone,
                                                                       Old Value = Id : 2, Name : Nathalie, Surname : Portman
 
02.06.2012 14:22:00 DEBUG (UserMapListener.java:42) - Updated Key = 1, New_Value = Id : 1, Name : Maria, Surname : Sharapova,
                                                                       Old Value = Id : 1, Name : Client, Surname : Eastwood
 
02.06.2012 14:22:05 DEBUG (UserMapListener.java:24) - Deleted Key = 1, Value = Id : 1, Name : Maria, Surname : Sharapova
02.06.2012 14:22:05 DEBUG (CacheUpdaterTask.java:116) - Cache Content : Id : 2, Name : Sharon, Surname : Stone
02.06.2012 14:22:15 DEBUG (UserMapListener.java:24) - Deleted Key = 2, Value = Id : 2, Name : Sharon, Surname : Stone
02.06.2012 14:22:15 DEBUG (UserMapListener.java:33) - Inserted Key = 1, Value = Id : 1, Name : Bruce, Surname : Willis
02.06.2012 14:22:15 DEBUG (CacheUpdaterTask.java:116) - Cache Content : Id : 1, Name : Bruce, Surname : Willis
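
The insert, update and delete events in the log above can be approximated with a plain-Java sketch. It does not use the Coherence API: a ConcurrentHashMap stands in for the distributed cache, its compute method stands in for invoking an entry processor on a single key, and a simple event list stands in for UserMapListener. The names EntryUpdateSketch, invoke and getEvents are assumptions made for illustration only, and the user is reduced to a String for brevity.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.UnaryOperator;

public class EntryUpdateSketch {

    // Hypothetical stand-in for the user cache; NOT the Coherence NamedCache API.
    private final ConcurrentHashMap<String, String> cache = new ConcurrentHashMap<>();

    // Recorded event messages, in the spirit of the UserMapListener log lines.
    private final List<String> events = new CopyOnWriteArrayList<>();

    // Applies the processor to one entry atomically; a null result removes the entry.
    public void invoke(String key, UnaryOperator<String> processor) {
        cache.compute(key, (k, oldValue) -> {
            String newValue = processor.apply(oldValue);
            if (oldValue == null && newValue != null) {
                events.add("Inserted Key = " + k + ", Value = " + newValue);
            } else if (oldValue != null && newValue != null) {
                events.add("Updated Key = " + k + ", New_Value = " + newValue
                        + ", Old Value = " + oldValue);
            } else if (oldValue != null) {
                events.add("Deleted Key = " + k + ", Value = " + oldValue);
            }
            return newValue;
        });
    }

    public List<String> getEvents() {
        return events;
    }

    public int size() {
        return cache.size();
    }

    public static void main(String[] args) {
        EntryUpdateSketch sketch = new EntryUpdateSketch();
        sketch.invoke("1", old -> "Bruce Willis");     // insert
        sketch.invoke("1", old -> "Maria Sharapova");  // update
        sketch.invoke("1", old -> null);               // delete
        sketch.getEvents().forEach(System.out::println);
    }
}
```

As with an entry processor, the caller never takes an explicit lock: the map applies the function to the entry atomically. Unlike Coherence, this sketch runs in a single JVM, so it shows only the per-entry update semantics and not the distribution across cluster members.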

STEP 19 : DOWNLOAD

OTV_Spring_Coherence_With_Processor_and_POF

REFERENCES :

Performing Transactions in Coherence
Using Portable Object Format in Coherence
Spring Framework Reference 3.x

Published at DZone with permission of Eren Avşaroğulları, author and DZone MVB. (source)

(Note: Opinions expressed in this article and its replies are the opinions of their respective authors and not those of DZone, Inc.)