Enterprise Integration Zone is brought to you in partnership with:

John D'Emic is a technologist, developer and author. He is currently a Solutions Architect at MuleSoft and a co-author of both editions of Mule in Action. John is a DZone MVB and is not an employee of DZone and has posted 10 posts at DZone. You can read more from them at their website. View Full User Profile

Implementing a Circuit Breaker to Define Your Own Integration Patterns

12.18.2012
| 1628 views |
  • submit to reddit
One of my favorite from Michael Nygard’s excellent Release It! is the .  A is an automatic switch that stops the flow of electricity in the event of a failure.  This sort of behavior is also useful when integrating with remote systems.

We might want to stop message delivery on an outbound-endpoint after a certain exception is thrown. A remote system under load or the target of a denial-of-service attach is a good example.  In this scenario it would be nice to automatically stop delivery  for a certain period of time to not exacerbate the situation.

Configuring the Circuit Breaker

We’ll use DevKit  to implement a circuit breaker and start off by using Maven to create a Mule Module for the circuit breaker processors.

mvn archetype:generate -DarchetypeGroupId=org.mule.tools.devkit -DarchetypeArtifactId=mule-devkit-archetype-generic -DarchetypeVersion=3.0.1 -DarchetypeRepository=http://repository.mulesoft.org/releases/ -DgroupId=com.acmesoft -DartifactId=circuit-breaker-module -Dversion=1.0-SNAPSHOT -DmuleVersion=3.2.0 -DmuleModuleName=CircuitBreaker -DmuleModulePackage=com.acmesoft.integration

Once that’s done  we  can define how the circuit breaker will be configured in a flow. Take a look at the following configuration:

<circuitbreaker:config breakerName="testingBreaker" tripThreshold="5" tripTimeout="300000"/>

<flow name="testFlow">
    <vm:inbound-endpoint path="vm.in"/>
    <circuitbreaker:filter/>
    <test:component throwException="true"/>
    <vm:outbound-endpoint path="vm.out"/>
    <default-exception-strategy>
        <circuitbreaker:trip tripOnException="org.mule.tck.exceptions.FunctionalTestException"/>
    </default-exception-strategy>
</flow>

At the top of the file we’re  specifying the name of the breaker,  how many “trips” until the breaker is opened and a timeout for the trip to be automatically closed.  We’ll use a contrived flow for testing purposes leveraging the test:component to throw an exception.

The  filter is configured before the component. If the circuit breaker is “open” then messages won’t be sent to the test:component.  The trip processor on is responsible for opening the breaker if enough FunctionalTestExceptions are thrown.  This configuration will stop messages from flowing to the the test:component for 5 minutes after 5 FunctionalTestException’s have been thrown.

Implementing the Circuit Breaker

Now that we know what our configuration will look like we can focus on the implementation.  The full module implementation is here, but we’ll consider it one piece at a time.  Its important to note that these implementations are all that’s required to supper the configuration above.  No custom namespace or Spring NamespaceHandler implementations are required.  We simply annotate our methods appropriately and Mule and mvn do the rest.

The trip() and filter() Methods

The trip() and filter() methods in CircuitBreakerModule.java provide the implementations for the message processors we just saw.

Let’s take a look at trip(), the method which opens the breaker, first:

 /**
     * Custom processor
     * <p/>
     * {@sample.xml ../../../doc/CircuitBreaker-connector.xml.sample circuitbreaker:trip}
     *
     * @param exceptionMessage The exception.
     * @param tripOnException  The exception type we should trip on.
     * @return Some string
     */
    @Processor
    public Object trip(String tripOnException, @Payload ExceptionMessage exceptionMessage) {

        if (exceptionMessage.getException().getCause().getClass().getCanonicalName().equals(tripOnException)) {
            incrementFailureCount();
            if (getFailureCount() == tripThreshold) {
                breakerTrippedOn = new Date();
            }
        }
        return exceptionMessage;
    }

This processor is called in the exception-strategy and will increment the failure count when the specified exception is thrown. If the failureCount is equal to the tripThreshold then we also set the breakerTrippedOn to the current time. getFailureCount() is retrieving its value from the ObjectStore .  We’ll look at how that’s implemented shortly. First let’s see how the filter is implemented:

/**
     * Custom processor
     * <p/>
     * {@sample.xml ../../../doc/CircuitBreaker-connector.xml.sample circuitbreaker:filter}
     *
     * @param payload The message payload
     * @return Some string
     */
    @Processor
    public Object filter(@Payload Object payload) {
        if (getFailureCount() < tripThreshold) {
            return payload;
        } else if (breakerTrippedOn != null && System.currentTimeMillis() - breakerTrippedOn.getTime() > tripTimeout) {
            breakerTrippedOn = null;
            resetFailureCount();
            return payload;
        } else {
            throw new CircuitOpenException();
        }
    }

The filter processor will throw a CircuitOpenException if the failureCount exceeds the tripThreshold.  If the tripTimeout has been exceeded then failureCount is reset and messages are allowed through.  Throwing a CircuitOpenException gives us flexibility in handling messages that have been blocked.  We could, for instance, place these messages on a retry queue to attempt delivery later after the breaker has been opened.

Storing the Circuit Breaker State in an ObjectStore

This implementation uses the default, persistent Mule ObjectStore to store  failureCount. The following convenience methods let the processors track this state.

Integer getFailureCount() {
        try {
            objectStoreMutex.acquire();
        } catch (InterruptedException e) {
            logger.error("Could not acquire mutex", e);
        }

        ObjectStore objectStore = objectStoreManager.getObjectStore(MuleProperties.OBJECT_STORE_DEFAULT_PERSISTENT_NAME);

        String key = String.format("%s.failureCount", breakerName);

        Integer failureCount = 0;
        try {
            if (objectStore.contains(key)) {
                failureCount = (Integer) objectStore.retrieve(key);
            }
        } catch (Exception e) {
            logger.error("Could not retrieve key from object-store: " + key, e);
        }

        objectStoreMutex.release();

        return failureCount;

    }

    void incrementFailureCount() {
        try {
            objectStoreMutex.acquire();
        } catch (InterruptedException e) {
            logger.error("Could not acquire mutex", e);
        }

        ObjectStore objectStore = objectStoreManager.getObjectStore(MuleProperties.OBJECT_STORE_DEFAULT_PERSISTENT_NAME);


        String key = String.format("%s.failureCount", breakerName);

        Integer failureCount = 0;
        try {
            if (objectStore.contains(key)) {
                failureCount = (Integer) objectStore.retrieve(key);
                objectStore.remove(key);
            }
            objectStore.store(key, failureCount + 1);
        } catch (Exception e) {
            logger.error("Could not retrieve key from object-store: " + key, e);
        }

        objectStoreMutex.release();
    }

    void resetFailureCount() {
        try {
            objectStoreMutex.acquire();
        } catch (InterruptedException e) {
            logger.error("Could not acquire mutex", e);
        }

        ObjectStore objectStore = objectStoreManager.getObjectStore(MuleProperties.OBJECT_STORE_DEFAULT_PERSISTENT_NAME);


        String key = String.format("%s.failureCount", breakerName);

        Integer failureCount = 0;
        try {
            if (objectStore.contains(key)) {
                failureCount = (Integer) objectStore.retrieve(key);
                objectStore.remove(key);
            }
            objectStore.store(key, 0);
        } catch (Exception e) {
            logger.error("Could not retrieve key from object-store: " + key, e);
        }

        objectStoreMutex.release();
    }

objectStoreMutex is a Semaphore with a single permit.  This is to avoid race conditions reading from and writing to the object-store during concurrent access to the circuit breaker.  Using the object-store gives us options to control the breaker, perhaps using JMX.  The fact that we’re grabbing the default persistent store also means our circuit breaker’s state will be persisted across Mule instance restarts.  If you’re app is running in a  Mule HA Cluster then the circuit-breaker’s state will be distributed across the clustered nodes.

This blog entry hopefully demonstrated that Mule makes it easy to implement complex, schema-aware message processors in your Mule apps.  This allows you to define your own integration patterns, perhaps particular to your organization, and have them peacefully co-exist with the rest of the Mule ecosystem.



Published at DZone with permission of John D'Emic, author and DZone MVB. (source)

(Note: Opinions expressed in this article and its replies are the opinions of their respective authors and not those of DZone, Inc.)