Manik Surtani is a core R&D engineer at JBoss and project lead on JBoss Cache. He has a background in artificial intelligence and neural networks, a field he left behind when he moved from academic circles to the commercial world. Since then, he's been working with Java-related technologies, first for a startup, focusing on knowledge management and information exchange. He later worked for a large London-based consultancy as a tech lead focused on e-commerce applications on large J2EE and peer-to-peer technology. Manik is a strong proponent of open source development methodologies, ethos, and collaborative processes, and often speaks at Java User Groups around the world. Manik is a DZone MVB and is not an employee of DZone and has posted 38 posts at DZone. You can read more from them at their website. View Full User Profile

Fine-grained Replication in Infinispan

07.02.2012
| 2924 views |
  • submit to reddit


Sometimes we have a large object, possibly with lots of attributes or holding some binary data, and we would like to tell Infinispan to replicate only certain part of the object across the cluster. Typically, we wanna replicate only that part which we've just updated. This is where DeltaAware and Delta interfaces come to play. By providing implementations of these interfaces we can define fine-grained replication. When we put some effort into such such an enhancements, we would also like to speed up object marshalling and unmarshalling. Therefore, we're going to define our own externalizers - to avoid slow default Java serialization.

The following code snippets are gathered in a complete example at https://github.com/mgencur/infinispan-examples/tree/master/partial-state-transfer This project contains a readme file with instructions on how to build and run the example. It is based on clustered-cache quickstart in Infinispan.

Implementing DeltaAware interface
So let's look at our main object. For the purpose of this exercise, I defined a Bicycle class that consists of many components like frame, fork, rearShock, etc. This object is stored in a cache as a value under certain (not important) key. It might happen in our scenario that we update only certain components of the bike and in such case we want to replicate just those component changes.

Important methods here are (description taken from javadocs):

commit() - Indicates that all deltas collected to date has been extracted (via a
                 call to delta()) and can be discarded. Often used as an optimization if
                 the delta isn't really needed, but the cleaning and resetting of       
                 internal state is desirable.

delta() - Extracts changes made to implementations, in an efficient format that
             can easily and cheaply be serialized and deserialized.  This method will
             only be called once for each changeset as it is assumed that any
             implementation's internal changelog is wiped and reset after generating
             and submitting the delta to the caller.
         
We also need to define setters and getters for our members. Setter methods are, among other things, responsible for registering changes to the changelog that will be later used to reconstruct the object's state. The externalizer for this class is only needed when cache stores are used. For the sake of simplicity, I don't mention it here.
public class Bicycle implements DeltaAware, Cloneable {

    //bicycle attributes
    String frame, fork, rearShock, crank, bottomBracket, shifters, cogSet, chain,
           frontDerailleur, rearDerailleur, rims, hubs, tires, pedals, brake, handlebar, stem;

    private BicycleDelta delta;

    @Override
    public void commit() {
        delta = null;
    }

    @Override
    public Delta delta() {
        Delta toReturn = getDelta();
        delta = null; // reset
        return toReturn;
    }
    
    ...
    
    public void setFrame(String frame) {
        getDelta().registerComponentChange("frame", frame);
        this.frame = frame;
    }
    
    public String getFrame() {
        return frame;
    }
    
    ...
}


Implementing Delta interface
Actual object that will be replicated across the cluster is the implementation of Delta interface. Let's look at the class. First, we need a field that will hold the changes - changeLog. Second, we need to define a merge() method. This method must be implemented so that Infinispan knows how to merge an existing object with incoming changes. The parameter of this method represents an object that is already stored in a cache, incoming changes will be applied to this object. We're using a reflection here to apply the changes to the actual object but it is not necessary. We could easily call setter methods. The advantage of using reflection is that we can set those fields in a loop.

Another piece is a registerComponentChange() method. This is called by an object of the Bicycle class - to record changes to that object. The name of this method is not important.

Defining our own externalizer
Alright, so what remains is the externalizer definition for the Delta implementation. We implement AdvancedExternalizer interface and say that only changeLog object should be marshalled and unmarshalled when transfering data over the wire. A complete (almost) implementation of Delta interface is the following.
public class BicycleDelta implements Delta {

    private HashMap<String, String> changeLog = new HashMap<String, String>();
    
    @Override
    public DeltaAware merge(DeltaAware d) {
        Bicycle other;
        if (d != null && (d instanceof Bicycle)) {
            other = (Bicycle) d;
        }
        else {
            other = new Bicycle();
        }
        Class<?> hugeClass = other.getClass();
        try {
            if (changeLog != null) {
                for (Entry<String, String> o : changeLog.entrySet()) {
                    Field x = hugeClass.getDeclaredField(o.getKey());
                    if (!x.isAccessible()) x.setAccessible(true);
                    x.set(other, o.getValue());
                }
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return other;
    }
    
    public void registerComponentChange(String compName, String compDescription) {
        changeLog.put(compName, compDescription);
    }
    
    ...
    
    public static class Externalizer implement AdvancedExternalizer<BicycleDelta> {

        @Override
        public Set<Class<? extends BicycleDelta>> getTypeClasses() {
            return Util.<Class<? extends BicycleDelta>>asSet(BicycleDelta.class);
        }

        @Override
        public void writeObject(ObjectOutput output, BicycleDelta object) throws IOException {
            output.writeObject(object.changeLog);
        }

        @Override
        public BicycleDelta readObject(ObjectInput input) throws IOException, ClassNotFoundException {
            BicycleDelta delta = new BicycleDelta();
            delta.changeLog = (HashMap<String, String>) input.readObject();
            return delta;
        }
        
        @Override
        public Integer getId() {
           return 33; //put some random value here to identify the externalizer (must not be in reserved value ranges)
        }
    }
}


Tell Infinispan about the extra externalizer
We also need to configure Infinispan to use our special externalizer to marshall/unmarshall our objects. We can do it e.g. programatically by calling .addAdvancedExternalizer() on the serialization configuration builder.
new DefaultCacheManager(
GlobalConfigurationBuilder.defaultClusteredBuilder()
.serialization()
.addAdvancedExternalizer(new BicycleDelta.Externalizer())
.build(),
new ConfigurationBuilder()
.clustering()
.cacheMode(CacheMode.REPL_SYNC)
.transaction()
.transactionMode(TransactionMode.TRANSACTIONAL)
.autoCommit(true)
.lockingMode(LockingMode.OPTIMISTIC)
.transactionManagerLookup(new JBossStandaloneJTAManagerLookup())
.locking()
.isolationLevel(IsolationLevel.READ_COMMITTED)
.build()
);

You can see we're also configuring transactions here. This is not necessary, though. We just aim to provide a richer example, removing transactional behavior is trully easy.

And here comes the "usage" part. Enclose cache calls by a transaction, retrieve a bicycle object from the cache, do some changes and commit them. 
try {
tm.begin();

//retrieve the bicycle from the cache, we suppose the bicycle already exists and
//only trying to apply some changes to it, only these changes will be replicated
Bicycle toChange = cache.get(BIKE1);

System.out.println("Updating components: frame, fork");
toChange.setFrame("New Frame");
toChange.setFork("New Fork");

//store the bicycle back to the cache
cache.put(BIKE1, toChange);

tm.commit();
} catch (Exception e) {
try {
if (tm != null) {
tm.rollback();
}
} catch (Exception ex) {
}
}

That's it. What is eventually transferred over the wire is just the changeLog object. The actual bicycle object is reconstructed from incomming updates.

If all of this seem to be too complex to you, I have good news. Infinispan provides one implementation of DeltaAware interface whish is called AtomicHashMap (package org.infinispan.atomic). If this map is used as a value in key/value pairs stored in the cache, only puts/gets/removes performed to this map during a transaction are replicated to other nodes. Classes like Bicycle and BicycleDelta are not need then. Even registering the externalizer for AtomicHashMap is not needed, this is done automatically during registration of internal externalizers. However, one might want a class emulating a real-world object, not just a map. That's the case when your own implementations of DeltaAware and Delta interfaces are the only way. 
  
Published at DZone with permission of Manik Surtani, author and DZone MVB. (source)

(Note: Opinions expressed in this article and its replies are the opinions of their respective authors and not those of DZone, Inc.)