Java 5 + GPars: Throttling Action Processing
An interesting question came up on the GPars
mailing list today: In a system that generates events, what is the best
way to throttle back event processing to one event per second? I
thought about an answer... then thought some more... and finally
decided to write it all up in this blog post. The example uses Groovy
and GPars, but it is easily adapted to a generic Java solution. Don't
let the actors scare you! (or the lack of semi-colons, for that matter).
The example is the classic "Sleeping Barber"
problem (I hadn't heard of it either). Basically, there is a
barbershop. The barber is asleep. Customers walk into the waiting room
periodically, and the barber wakes up to give each of them a haircut.
When he's done he returns to his slumber. It's a lesson in reaction:
something is asleep, then awakens to do some work, then returns to
sleep.
The GPars docs provide a decent actor-based solution
to this problem: there is a waiting room and a barber, and both are
actors. When the barber is free, a customer from the waiting room is
moved into the barber's chair. The barber and waiting room communicate
via actor messages. But what if our barber is a bit of a diva, and no
matter how busy the shop gets, he wants to give one haircut every 15
minutes and never any more (otherwise he might get burnt out, you see)?
That is the throttling problem: how do you make sure events are
processed (a haircut is given) no more than x times in a given time
period?
My solution: keep a work queue, and have a
scheduled executor pull work off the queue at a specified interval.
Java 5 gives you all the tools to do this without resorting to busy
waiting, polling, or writing scheduling code. The classes you need to
know about are ArrayBlockingQueue and ScheduledThreadPoolExecutor.
ArrayBlockingQueue
is a bounded FIFO (First-In-First-Out) queue that supports blocking
instead of busy waiting. When you take() an item from an ABQ, the call
blocks until an item is available... no polling or sleeping to see if an
item is ready. Just call take() and your code won't proceed until there
is an element to return.
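To make the blocking behaviour of take() concrete, here is a minimal Java sketch (written with Java 8 lambdas for brevity; the class and method names are my own, purely for illustration). A producer thread adds one item after a delay, and the main thread simply sits in take() until it arrives.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class: shows that take() blocks until an item is available.
class BlockingTakeSketch {

    /** Starts a producer that adds one item after delayMillis; returns how long take() waited, in ms. */
    static long millisBlockedInTake(long delayMillis) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);

        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(delayMillis);
                queue.put("customer");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        long start = System.nanoTime();
        try {
            String item = queue.take();   // blocks here; no polling loop needed
            long waited = (System.nanoTime() - start) / 1_000_000;
            producer.join();
            System.out.println("Got " + item + " after about " + waited + " ms");
            return waited;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        millisBlockedInTake(200);
    }
}
```

The consumer never checks whether the queue is empty; the wait is handled entirely inside take().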
The ScheduledThreadPoolExecutor supports running a Runnable repeatedly at
a fixed rate or with a fixed delay, and running a Runnable or Callable
once after a delay. If you're looking to execute the same task every 1
second then STPE is what you need... Timer is not formally deprecated,
but for all intents and purposes STPE has replaced it.
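As a rough illustration of fixed-rate scheduling, the Java sketch below runs a task a few times at a short period and reports how long that took. The class name, method name, and the use of a CountDownLatch to stop the demo are my own choices for illustration, not part of the barber example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: a task fired at a fixed rate by a one-thread scheduler.
class FixedRateSketch {

    /** Runs a task every periodMillis until it has fired `ticks` times; returns elapsed ms. */
    static long runTicks(int ticks, long periodMillis) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        CountDownLatch done = new CountDownLatch(ticks);
        AtomicInteger count = new AtomicInteger();
        long start = System.nanoTime();
        try {
            // initial delay 0, then one run per period
            scheduler.scheduleAtFixedRate(() -> {
                System.out.println("tick " + count.incrementAndGet());
                done.countDown();
            }, 0, periodMillis, TimeUnit.MILLISECONDS);
            done.await();   // wait until the task has run `ticks` times
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();   // stop the periodic task
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        System.out.println("3 ticks took about " + runTicks(3, 100) + " ms");
    }
}
```

With an initial delay of 0, three ticks span roughly two periods, since the first one fires right away.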
So here's the barber that just
won't stand to be over-worked... setting it all up we need customers, a
barber, and a waiting room:
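import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

class Customer {
    String name
}

// waiting room can't hold more than 12!
def waitingRoom = new ArrayBlockingQueue<Customer>(12)

// one thread in the pool: one barber
def barber = Executors.newScheduledThreadPool(1)
barber.scheduleAtFixedRate({
    println 'Barber: Next customer please!'
    // blocks until a customer is waiting
    Customer customer = waitingRoom.take()
    println "${customer.name} gets a haircut at ${new Date().format('H:m:s')}"
} as Runnable, 0, 15, TimeUnit.MINUTES)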
Customer
is a simple bean; nothing interesting here. The waiting room is an
ArrayBlockingQueue filled with customers that need a haircut. And the
barber is an executor service with a scheduled task to give haircuts.
The number of threads in the scheduled thread pool is 1 because there's
only one barber. The barber takes customers from the waiting room and
cuts their hair once every 15 minutes. The call to waitingRoom.take()
is blocking... if there is a customer ready then he is serviced
immediately, and if not, then the call blocks until someone is
available. One thing to note... the waitingRoom has a capacity of 12... if
a 13th customer is added then the calling code will either block until
there is room or throw an exception, depending on the method you call:
put() blocks, add() throws an IllegalStateException, and offer() simply
returns false.
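Here is a small Java sketch of what happens when one customer too many shows up at a full queue. The class and method names are hypothetical; the queue methods themselves (add, offer, and the timed offer) are the standard BlockingQueue API. I use the timed offer() rather than put() so the demo cannot hang.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: the different ways of adding to a full bounded queue.
class FullQueueSketch {

    /** Fills a queue of the given capacity, then tries three ways to add one more. */
    static String[] tryOverfill(int capacity) {
        BlockingQueue<String> room = new ArrayBlockingQueue<>(capacity);
        for (int i = 1; i <= capacity; i++) {
            room.add("customer " + i);
        }
        String[] results = new String[3];

        // add() throws IllegalStateException when the queue is full
        try {
            room.add("one too many");
            results[0] = "added";
        } catch (IllegalStateException e) {
            results[0] = "exception";
        }

        // offer() returns false immediately instead of throwing
        results[1] = room.offer("one too many") ? "added" : "rejected";

        // timed offer() waits a bounded time for space; put() would wait indefinitely
        try {
            boolean ok = room.offer("one too many", 100, TimeUnit.MILLISECONDS);
            results[2] = ok ? "added" : "timed out";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            results[2] = "interrupted";
        }
        return results;
    }

    public static void main(String[] args) {
        // prints "exception, rejected, timed out"
        System.out.println(String.join(", ", tryOverfill(12)));
    }
}
```

Which one you want depends on whether the caller should wait for a seat, be told to come back later, or fail loudly.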
So how do customers get into the waiting room?
That's where GPars actors come in. The barber shop is a "reactor" in
GPars terminology. Messages can be sent to the barbershop ("Enter" the
waiting room), and the reactor adds the customer to the waiting room. A
maître d' of sorts. Here it is in action:
def barberShop = new PooledActorGroup().reactor { message ->
    switch (message) {
        case Enter:
            println "${message.customer.name} waits for a haircut..."
            waitingRoom.add(message.customer)
            break
    }
}

class Enter {
    Customer customer
}

barberShop << new Enter(customer: new Customer(name: 'Jerry'))
barberShop << new Enter(customer: new Customer(name: 'Phil'))
barberShop << new Enter(customer: new Customer(name: 'Bob'))
barberShop << new Enter(customer: new Customer(name: 'Ron'))
The
barberShop is a reactor created by a PooledActorGroup, a GPars object,
and using the "actor framework" here just means passing a closure to the
reactor() method of that group. The closure acts as the actor and
responds to Enter messages by adding the customer to the waitingRoom. At
the bottom you see the nice << syntax for posting messages to the actor.
So there
you have it. There are many ways to do this, but I think the Java 5
Concurrency libraries are some of the best options. I'd be interested
to hear other ideas too. Now go give those hippies some haircuts!
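As one of those other ways, here is a plain Java sketch of the same throttle with no actors. It is a variant, not the code above: it swaps take() for the non-blocking poll(), on the assumption that you would rather skip a tick when nobody is waiting. The reason is that with scheduleAtFixedRate a task that blocks for longer than its period makes later runs start late, and they may then run in quick succession, which is exactly what a throttle should avoid. The class name, method name, and millisecond period are made up for the demo.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: a queue drained at most once per period by a scheduled task.
class ThrottledBarberSketch {

    /** Serves the queued customers at most one per period; returns each service time in ms. */
    static List<Long> serveAll(List<String> customers, long periodMillis) {
        BlockingQueue<String> waitingRoom = new ArrayBlockingQueue<>(12);
        waitingRoom.addAll(customers);   // throws IllegalStateException beyond 12 customers
        List<Long> servedAt = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(customers.size());
        ScheduledExecutorService barber = Executors.newScheduledThreadPool(1);
        long start = System.nanoTime();
        try {
            barber.scheduleAtFixedRate(() -> {
                String customer = waitingRoom.poll();   // non-blocking: null if nobody is waiting
                if (customer != null) {
                    long at = (System.nanoTime() - start) / 1_000_000;
                    servedAt.add(at);
                    System.out.println(customer + " gets a haircut at +" + at + " ms");
                    done.countDown();
                }
            }, 0, periodMillis, TimeUnit.MILLISECONDS);
            done.await();   // demo only: wait until everyone has been served
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            barber.shutdownNow();
        }
        return new ArrayList<>(servedAt);
    }

    public static void main(String[] args) {
        List<String> customers = new ArrayList<>();
        Collections.addAll(customers, "Jerry", "Phil", "Bob", "Ron");
        serveAll(customers, 100);
    }
}
```

The trade-off is that a customer who walks into an empty shop waits for the next tick instead of being served immediately, which may or may not suit your diva.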
(Note: Opinions expressed in this article and its replies are the opinions of their respective authors and not those of DZone, Inc.)




Comments
Artur Biesiadowski replied on Mon, 2009/12/14 - 5:20am
Dean Pehrsson-c... replied on Mon, 2009/12/14 - 8:54am
Hamlet D'Arcy replied on Mon, 2009/12/14 - 10:31am
@ae589 No, a second barber will not start. The thread pool has only one thread. To add barbers, just make the thread pool bigger. However, while take() is blocked the fixed-rate schedule keeps falling behind, and if there are no customers for a long time the missed haircuts can run back to back once customers arrive.
@Artur Yes, this technique is good for something that happens once every 15 minutes, but not something once every 15 milliseconds! The overhead would be too much for small time increments.