Hamlet D'Arcy has been writing software for over a decade, and has spent considerable time coding in C++, Java, and Groovy. He's passionate about learning new languages and different ways to think about problems, and recently he's been discovering the joys of both F# and Scheme. He's an active member of the Groovy Users of Minnesota and the Object Technology User Group, is a committer on the Groovy project, and is a contributor on a few open source projects (including JConch and the IDEA Groovy Plugin). He blogs regularly at http://hamletdarcy.blogspot.com and can be found on Twitter as HamletDRC (http://twitter.com/hamletdrc). Hamlet is a DZone MVB, is not an employee of DZone, and has posted 28 posts at DZone.

Java 5 + GPars: Throttling Action Processing


An interesting question came up on the GPars mailing list today: In a system that generates events, what is the best way to throttle back event processing to one event per second? I thought about an answer... then thought some more... and finally decided to write it all up in this blog post. The example uses Groovy and GPars, but it is easily adapted to a generic Java solution. Don't let the actors scare you (or the lack of semicolons, for that matter)!

The example is the classic "Sleeping Barber" problem (I hadn't heard of it either). Basically, there is a barbershop. The barber is asleep. Customers walk into the waiting room periodically, and the barber wakes up to give each of them a haircut. When he's done he returns to his slumber. It's a lesson in reaction: something is asleep, then awakens to do some work, then returns to sleep.

The GPars docs provide a decent actor-based solution to this problem: there is a waiting room and a barber, and both are actors. When the barber is free, a customer from the waiting room is moved into the barber's chair. The barber and waiting room communicate via actor messages. But what if our barber is a bit of a diva, and no matter how busy the shop gets, he wants to give one haircut every 15 minutes and never any more (otherwise he might get burnt out, you see)? That is the throttling problem: how do you make sure events are processed (a haircut is given) no more than x times in a given time period?

My solution: keep a work queue, and have a scheduled executor pull work off the queue at a specified interval. Java 5 gives you all the tools to do this without resorting to busy waiting, polling, or writing scheduling code. The classes you need to know about are ArrayBlockingQueue and ScheduledThreadPoolExecutor.

ArrayBlockingQueue is a bounded FIFO (First-In-First-Out) queue that supports blocking instead of busy waiting. When you take() an item from an ABQ, the call blocks until an item is available... no polling or sleeping to see if an item is ready. Just call take() and your code won't proceed until there is an element to return.
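To see the blocking behavior in isolation, here is a minimal plain-Java sketch. The class BlockingTakeDemo and its method are hypothetical names made up for illustration; only ArrayBlockingQueue, take(), and put() come from the JDK. A consumer thread calls take() on an empty queue and measures how long it sits there before a producer hands it an item.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical demo class; the name and method are illustrative only.
final class BlockingTakeDemo {

    /** Returns roughly how long (ms) a consumer sat in take() before an item arrived. */
    static long millisBlockedInTake(long delayMillis) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        AtomicLong waitedMillis = new AtomicLong();

        Thread consumer = new Thread(() -> {
            long start = System.nanoTime();
            try {
                queue.take(); // blocks here until an item is available; no polling loop
                waitedMillis.set(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            Thread.sleep(delayMillis); // the queue stays empty for a while
            queue.put("Jerry");        // this wakes the consumer
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return waitedMillis.get();
    }

    public static void main(String[] args) {
        System.out.println("take() blocked for about " + millisBlockedInTake(200) + " ms");
    }
}
```

The consumer never loops or sleeps on its own; the time it reports is simply how long the queue stayed empty.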

The ScheduledThreadPoolExecutor supports executing a Runnable at a fixed interval (a Callable can be scheduled too, but only for a single delayed run). If you're looking to execute the same task every 1 second then STPE is what you need... Timer is not formally deprecated, but for all intents and purposes STPE has replaced it.
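As a rough illustration of fixed-interval execution, the sketch below schedules a trivial task and waits until it has run a given number of times. FixedRateDemo and elapsedMillisFor are hypothetical names; the JDK calls are Executors.newScheduledThreadPool and scheduleAtFixedRate, and the short period is chosen only to keep the demo quick.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the name and method are illustrative only.
final class FixedRateDemo {

    /** Runs a task `runs` times, one run per period, and returns the elapsed milliseconds. */
    static long elapsedMillisFor(int runs, long periodMillis) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        CountDownLatch remaining = new CountDownLatch(runs);
        long start = System.nanoTime();
        try {
            // Initial delay 0: the first run starts right away, then one per period.
            scheduler.scheduleAtFixedRate(remaining::countDown, 0, periodMillis, TimeUnit.MILLISECONDS);
            remaining.await(); // wait until the task has run `runs` times
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow(); // stop the periodic task
        }
    }

    public static void main(String[] args) {
        System.out.println("4 runs took about " + elapsedMillisFor(4, 100) + " ms");
    }
}
```

With an initial delay of 0, four runs at a 100 ms period need about three full periods, so the elapsed time should land near 300 ms.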

So here's the barber that just won't stand to be over-worked... setting it all up we need customers, a barber, and a waiting room:

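import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

class Customer {
    String name
}

// waiting room can't hold more than 12!
def waitingRoom = new ArrayBlockingQueue(12)

// one thread in the pool: there is only one barber
def barber = Executors.newScheduledThreadPool(1)
barber.scheduleAtFixedRate({
    println 'Barber: Next customer please!'
    Customer customer = waitingRoom.take()   // blocks until a customer is waiting
    println "${customer.name} gets a haircut at ${new Date().format('H:m:s')}"
} as Runnable, 0, 15, TimeUnit.MINUTES)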

Customer is a simple bean; nothing interesting here. The waiting room is an ArrayBlockingQueue filled with customers that need a haircut. And the barber is an executor service with a scheduled task to give haircuts. The number of threads in the scheduled thread pool is 1 because there's only one barber. The barber takes customers from the waiting room and cuts their hair once every 15 minutes. The call to waitingRoom.take() is blocking... if there is a customer ready then he is serviced immediately, and if one is not, then the call blocks until someone is available. One thing to note... the waitingRoom has a capacity of 12... if a 13th customer is added then the calling code will either block until there is enough room or throw an exception. There is an API for either case: put() blocks, and add() throws an IllegalStateException.
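What happens to that 13th customer depends on which BlockingQueue method the caller uses. The sketch below fills a queue to capacity and then tries one more insert. FullWaitingRoomDemo and seatOneTooMany are hypothetical names; add(), offer(), and the timed offer() are standard BlockingQueue methods. put() is left out because it would block forever on a queue nobody drains; the timed offer() stands in for it.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the name and method are illustrative only.
final class FullWaitingRoomDemo {

    /** Fills a waiting room of the given capacity, then tries to seat one more customer. */
    static String seatOneTooMany(int capacity, String method) {
        BlockingQueue<String> waitingRoom = new ArrayBlockingQueue<>(capacity);
        for (int i = 0; i < capacity; i++) {
            waitingRoom.add("customer-" + i);
        }
        try {
            switch (method) {
                case "add":          // throws IllegalStateException when the queue is full
                    waitingRoom.add("late");
                    return "added";
                case "offer":        // returns false immediately when the queue is full
                    return waitingRoom.offer("late") ? "added" : "rejected";
                case "timed-offer":  // blocks like put(), but gives up after the timeout
                    return waitingRoom.offer("late", 50, TimeUnit.MILLISECONDS) ? "added" : "timed out";
                default:
                    throw new IllegalArgumentException("unknown method: " + method);
            }
        } catch (IllegalStateException e) {
            return "exception";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        for (String method : new String[] {"add", "offer", "timed-offer"}) {
            System.out.println(method + " -> " + seatOneTooMany(12, method));
        }
    }
}
```

Which one fits depends on whether the event source can afford to wait: blocking pushes back on producers, while add() and offer() let the caller decide what to do with the overflow.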

So how do customers get into the waiting room? That's where GPars actors come in. The barber shop is a "reactor" in GPars terminology. Messages can be sent to the barbershop ("Enter" the waiting room), and the reactor adds the customer to the waiting room. A maître d' of sorts. Here it is in action:

def barberShop = new PooledActorGroup().reactor { message ->
    switch (message) {
        case Enter:
            println "${message.customer.name} waits for a haircut..."
            // add() throws if the waiting room is full; put() would block instead
            waitingRoom.add(message.customer)
            break
    }
}

class Enter {
    Customer customer
}

barberShop << new Enter(customer: new Customer(name: 'Jerry'))
barberShop << new Enter(customer: new Customer(name: 'Phil'))
barberShop << new Enter(customer: new Customer(name: 'Bob'))
barberShop << new Enter(customer: new Customer(name: 'Ron'))

The barberShop is a reactor created from a PooledActorGroup, a GPars object, and using the "actor framework" just means passing a closure to the reactor() method of that group. The closure, or actor, responds to Enter messages by adding the customer to the waitingRoom. At the bottom you see the nice << syntax for posting messages to the actor.
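For readers without GPars, the same shop can be sketched in plain Java with the actor replaced by direct put() calls. Everything named here (ThrottledBarberShop, haircutTimes) is hypothetical, and the period is in milliseconds only to keep the demo short. One deliberate substitution: the sketch uses scheduleWithFixedDelay rather than a fixed rate, because with a fixed-rate schedule a run that was held up in take() can be followed by late runs that start closer together than the period. With a fixed delay, the next haircut cannot start until a full period after the previous one finished.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical plain-Java barbershop; class and method names are illustrative.
final class ThrottledBarberShop {

    /** Seats every customer up front and returns when (ms since start) each haircut began. */
    static List<Long> haircutTimes(List<String> customers, long periodMillis) {
        BlockingQueue<String> waitingRoom = new ArrayBlockingQueue<>(12);
        ScheduledExecutorService barber = Executors.newScheduledThreadPool(1); // one barber
        List<Long> times = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(customers.size());
        long start = System.nanoTime();

        // Assumption: fixed delay instead of fixed rate, so haircuts never bunch up.
        barber.scheduleWithFixedDelay(() -> {
            try {
                String name = waitingRoom.take(); // blocks until a customer is waiting
                long at = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
                times.add(at);
                System.out.println(name + " gets a haircut at +" + at + " ms");
                done.countDown();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // shutdownNow() interrupts a waiting barber
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);

        try {
            for (String customer : customers) {
                waitingRoom.put(customer); // blocks while 12 customers are already waiting
            }
            done.await(); // wait until everyone has had a haircut
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            barber.shutdownNow();
        }
        return new ArrayList<>(times);
    }
}
```

Calling ThrottledBarberShop.haircutTimes(Arrays.asList("Jerry", "Phil", "Bob"), 100) serves all three customers, with consecutive haircuts starting at least one period apart even though everyone arrived at once.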

So there you have it. There are many ways to do this, but I think the Java 5 Concurrency libraries are some of the best options. I'd be interested to hear other ideas too. Now go give those hippies some haircuts!

From http://hamletdarcy.blogspot.com

Published at DZone with permission of Hamlet D'Arcy, author and DZone MVB.

(Note: Opinions expressed in this article and its replies are the opinions of their respective authors and not those of DZone, Inc.)



Artur Biesiadowski replied on Mon, 2009/12/14 - 5:20am

From the title, I was hoping to see an article describing techniques for, say, managing 50000 events per second coming in on input when only 5000 per second can be sent out...

Dean Pehrsson-c... replied on Mon, 2009/12/14 - 8:54am

Nice example. What if my barber finishes in 20 mins (customer wanted a shave too, presumably)? If I understand the code, a new barber will have been working in parallel for 5 minutes?

Hamlet D'Arcy replied on Mon, 2009/12/14 - 10:31am

@ae589 No, a second barber will not start. The thread pool has only one thread. To add barbers, just make the thread pool size bigger. However, a take() request will queue up every 15 minutes, and if there are never any customers then the take() requests might build up and get too large.


@Artur Yes, this technique is good for something that happens once every 15 minutes, but not something once every 15 milliseconds! The overhead would be too much for small time increments.
