I am a software architect passionate about software integration, high scalability and concurrency challenges. Vlad is a DZone MVB and is not an employee of DZone and has posted 69 posts at DZone.

Lock Processing Logic by Customer

10.08.2013

In the application we are currently developing, one use case required synchronizing message processing by message provider (the customer generating those messages). The flow looks something like this:

[Figure: Customer process flow]

Messages may arrive in any order, since several customer jobs run in parallel, but we want messages belonging to the same customer to be processed one after the other (analogous to the Serializable database isolation level), while messages coming from different customers are still processed in parallel.

So, this is how I did it:

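import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class CustomerLockSerializedExecution<K> {

    // One lock per customer id. Entries are never removed, so the map grows
    // with the number of distinct customers seen.
    private final Map<K, Lock> lockMap = new HashMap<K, Lock>();

    // synchronized guards the check-then-put on the plain HashMap
    private synchronized Lock getLock(K customerId) {
        Lock lock = lockMap.get(customerId);
        if (lock == null) {
            lock = new ReentrantLock();
            lockMap.put(customerId, lock);
        }
        return lock;
    }

    /**
     * Lock on the customer and execute the specific logic
     *
     * @param customerId customer id
     * @param callable   custom logic callback
     */
    public <T> void lockExecution(K customerId, Callable<T> callable) {
        Lock lock = getLock(customerId);
        try {
            lock.lockInterruptibly();
            // Unlock only once the lock is actually held: if lockInterruptibly()
            // throws, calling unlock() would fail with IllegalMonitorStateException.
            try {
                callable.call();
            } finally {
                lock.unlock();
            }
        } catch (Exception e) {
            // CallableException is the project's own exception type (not shown here)
            throw new CallableException(e, callable);
        }
    }
}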

The unit test I wrote starts 10 threads, all using the same customerId, so they all compete to execute their logic, which consists of adding 3 consecutive numbers (starting from an initial index) to a common buffer.


This is the execution log:

INFO  [main]: CustomerLockSerializedExecutionTest - Scheduling thread index 0
INFO  [main]: CustomerLockSerializedExecutionTest - Scheduling thread index 10
INFO  [main]: CustomerLockSerializedExecutionTest - Scheduling thread index 20
INFO  [main]: CustomerLockSerializedExecutionTest - Scheduling thread index 30
INFO  [main]: CustomerLockSerializedExecutionTest - Scheduling thread index 40
INFO  [main]: CustomerLockSerializedExecutionTest - Scheduling thread index 50
INFO  [main]: CustomerLockSerializedExecutionTest - Scheduling thread index 60
INFO  [main]: CustomerLockSerializedExecutionTest - Scheduling thread index 70
INFO  [main]: CustomerLockSerializedExecutionTest - Scheduling thread index 80
INFO  [main]: CustomerLockSerializedExecutionTest - Scheduling thread index 90
INFO  [main]: CustomerLockSerializedExecutionTest - Waiting for threads to be done
INFO  [Thread-9]: CustomerLockSerializedExecutionTest - Running thread index 90
INFO  [Thread-9]: CustomerLockSerializedExecutionTest - Adding 90
INFO  [Thread-9]: CustomerLockSerializedExecutionTest - Adding 91
INFO  [Thread-9]: CustomerLockSerializedExecutionTest - Adding 92
INFO  [Thread-6]: CustomerLockSerializedExecutionTest - Running thread index 60
INFO  [Thread-6]: CustomerLockSerializedExecutionTest - Adding 60
INFO  [Thread-6]: CustomerLockSerializedExecutionTest - Adding 61
INFO  [Thread-6]: CustomerLockSerializedExecutionTest - Adding 62
INFO  [Thread-8]: CustomerLockSerializedExecutionTest - Running thread index 80
INFO  [Thread-8]: CustomerLockSerializedExecutionTest - Adding 80
INFO  [Thread-8]: CustomerLockSerializedExecutionTest - Adding 81
INFO  [Thread-8]: CustomerLockSerializedExecutionTest - Adding 82
INFO  [Thread-7]: CustomerLockSerializedExecutionTest - Running thread index 70
INFO  [Thread-7]: CustomerLockSerializedExecutionTest - Adding 70
INFO  [Thread-7]: CustomerLockSerializedExecutionTest - Adding 71
INFO  [Thread-7]: CustomerLockSerializedExecutionTest - Adding 72
INFO  [Thread-5]: CustomerLockSerializedExecutionTest - Running thread index 50
INFO  [Thread-5]: CustomerLockSerializedExecutionTest - Adding 50
INFO  [Thread-5]: CustomerLockSerializedExecutionTest - Adding 51
INFO  [Thread-5]: CustomerLockSerializedExecutionTest - Adding 52
INFO  [Thread-0]: CustomerLockSerializedExecutionTest - Running thread index 0
INFO  [Thread-0]: CustomerLockSerializedExecutionTest - Adding 0
INFO  [Thread-0]: CustomerLockSerializedExecutionTest - Adding 1
INFO  [Thread-0]: CustomerLockSerializedExecutionTest - Adding 2
INFO  [Thread-4]: CustomerLockSerializedExecutionTest - Running thread index 40
INFO  [Thread-4]: CustomerLockSerializedExecutionTest - Adding 40
INFO  [Thread-4]: CustomerLockSerializedExecutionTest - Adding 41
INFO  [Thread-4]: CustomerLockSerializedExecutionTest - Adding 42
INFO  [Thread-3]: CustomerLockSerializedExecutionTest - Running thread index 30
INFO  [Thread-3]: CustomerLockSerializedExecutionTest - Adding 30
INFO  [Thread-3]: CustomerLockSerializedExecutionTest - Adding 31
INFO  [Thread-3]: CustomerLockSerializedExecutionTest - Adding 32
INFO  [Thread-2]: CustomerLockSerializedExecutionTest - Running thread index 20
INFO  [Thread-2]: CustomerLockSerializedExecutionTest - Adding 20
INFO  [Thread-2]: CustomerLockSerializedExecutionTest - Adding 21
INFO  [Thread-2]: CustomerLockSerializedExecutionTest - Adding 22
INFO  [Thread-1]: CustomerLockSerializedExecutionTest - Running thread index 10
INFO  [Thread-1]: CustomerLockSerializedExecutionTest - Adding 10
INFO  [Thread-1]: CustomerLockSerializedExecutionTest - Adding 11
INFO  [Thread-1]: CustomerLockSerializedExecutionTest - Adding 12
INFO  [main]: CustomerLockSerializedExecutionTest - Threads are done

As you can see, the threads run in an arbitrary order even though they were all scheduled to run simultaneously, and the additions never overlap: every thread adds its three numbers without interleaving with any other thread.
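The test itself is not reproduced in the article, so the following is only a minimal sketch of how such a test could look. `PerCustomerExecutor`, `SameCustomerTestSketch`, `run` and `isSerialized` are hypothetical names, the executor is a reduced stand-in for the class above (it wraps failures in `IllegalStateException` instead of the project's `CallableException`), and lambdas assume Java 8 or later.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, reduced stand-in for the article's class so the sketch compiles on its own.
class PerCustomerExecutor<K> {
    private final Map<K, Lock> lockMap = new HashMap<>();

    private synchronized Lock getLock(K customerId) {
        Lock lock = lockMap.get(customerId);
        if (lock == null) {
            lock = new ReentrantLock();
            lockMap.put(customerId, lock);
        }
        return lock;
    }

    void lockExecution(K customerId, Callable<?> task) {
        Lock lock = getLock(customerId);
        try {
            lock.lockInterruptibly();
            try {
                task.call();
            } finally {
                lock.unlock();
            }
        } catch (Exception e) {
            throw new IllegalStateException(e); // assumption: replaces CallableException
        }
    }
}

class SameCustomerTestSketch {

    // Starts threadCount threads for one customer; thread i appends i*10, i*10+1, i*10+2.
    static List<Integer> run(int threadCount) {
        PerCustomerExecutor<String> executor = new PerCustomerExecutor<>();
        List<Integer> buffer = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch start = new CountDownLatch(1);
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < threadCount; i++) {
            final int base = i * 10;
            Thread t = new Thread(() -> {
                try {
                    start.await(); // release all threads together to maximise contention
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                executor.lockExecution("customer-1", () -> {
                    for (int n = base; n < base + 3; n++) {
                        buffer.add(n);
                        Thread.yield(); // invite interleaving; the lock should prevent it
                    }
                    return null;
                });
            });
            threads.add(t);
            t.start();
        }
        start.countDown();
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return buffer;
    }

    // True when the buffer is a sequence of uninterrupted triples (x, x+1, x+2), x a multiple of 10.
    static boolean isSerialized(List<Integer> buffer) {
        if (buffer.size() % 3 != 0) {
            return false;
        }
        for (int i = 0; i < buffer.size(); i += 3) {
            int first = buffer.get(i);
            if (first % 10 != 0 || buffer.get(i + 1) != first + 1 || buffer.get(i + 2) != first + 2) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Integer> buffer = run(10);
        System.out.println(buffer);
        System.out.println("serialized: " + isSerialized(buffer));
    }
}
```

The order of the triples in the buffer varies from run to run, which matches the log above; only the grouping inside each triple is guaranteed by the per-customer lock.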

Warning!

You should be aware of deadlocks: we hold a lock while executing caller-supplied logic through a non-private method, and that logic might acquire some other lock, too.

Fortunately, this is not a problem in our case, since our message pipeline goes from one end to the other, so there is only one way of entering this processing logic.

Anyway, when multiple locks are acquired (e.g. A, B and C), it’s mandatory to always acquire them in the same order: A – B – C / A – B / B – C / A – C.

Mixing opposite orders, such as A – B in one place and B – A in another, or A – B – C and C – B – A, is forbidden since it may end up in a deadlock.
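As an illustration of the ordering rule, here is a minimal sketch that always locks two resources in the same global order, whatever order the caller passes them in. The `Resource` class, its `rank` field and the `transfer` operation are hypothetical and not part of the original project; the sketch assumes every resource has a unique rank.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class OrderedLocking {

    // Hypothetical resource; rank is assumed unique and defines the global lock order.
    static final class Resource {
        final int rank;
        final Lock lock = new ReentrantLock();
        int value;

        Resource(int rank, int value) {
            this.rank = rank;
            this.value = value;
        }
    }

    // Moves amount between two resources, always locking the lower rank first.
    static void transfer(Resource from, Resource to, int amount) {
        Resource first = from.rank < to.rank ? from : to;
        Resource second = first == from ? to : from;
        first.lock.lock();
        try {
            second.lock.lock();
            try {
                from.value -= amount;
                to.value += amount;
            } finally {
                second.lock.unlock();
            }
        } finally {
            first.lock.unlock();
        }
    }

    // Two threads transfer in opposite directions; without ordering this could deadlock.
    static int[] runOpposingTransfers(int iterations) {
        Resource a = new Resource(1, 1000);
        Resource b = new Resource(2, 1000);
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < iterations; i++) {
                transfer(a, b, 1);
            }
        });
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < iterations; i++) {
                transfer(b, a, 2);
            }
        });
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] {a.value, b.value};
    }

    public static void main(String[] args) {
        int[] balances = runOpposingTransfers(500);
        System.out.println("a=" + balances[0] + ", b=" + balances[1]);
    }
}
```

Had `transfer` locked `from` first and `to` second, the two threads would acquire the locks as A – B and B – A respectively, which is exactly the forbidden combination.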

Also, I always try to avoid calling external APIs while holding a lock. Such calls can be slow (e.g. a long-running web service call), and since the lock is held for the whole duration, this hurts our processing scalability. External API calls may also acquire locks we are not aware of, increasing the chance of a deadlock if we happen to lock on the same objects as the external API.
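One way to follow this advice, sketched below under assumptions, is to perform the slow call before acquiring the lock and hold the lock only while updating shared state. `NarrowLockScope`, `process` and the `slowExternalCall` supplier are hypothetical names, and the approach only applies when the external call itself does not need to be serialized per customer.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

class NarrowLockScope {
    private final Lock lock = new ReentrantLock();
    private int total;

    // slowExternalCall is a hypothetical stand-in for e.g. a web service call.
    void process(Supplier<Integer> slowExternalCall) {
        // The potentially slow call runs with no lock held.
        int fetched = slowExternalCall.get();

        // The lock is held only for the short in-memory update.
        lock.lock();
        try {
            total += fetched;
        } finally {
            lock.unlock();
        }
    }

    int total() {
        lock.lock();
        try {
            return total;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        NarrowLockScope scope = new NarrowLockScope();
        scope.process(() -> 5);
        scope.process(() -> 7);
        System.out.println(scope.total());
    }
}
```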

Source code on GitHub


Published at DZone with permission of Vlad Mihalcea, author and DZone MVB. (source)