Performance Zone is brought to you in partnership with:

Lives in the UK. Likes blogging, cycling and eating lemon drizzle cake. Roger is a DZone MVB and is not an employee of DZone and has posted 143 posts at DZone. You can read more from them at their website. View Full User Profile

Producers and Consumers - Part 2 - Interrupting Worker Threads

02.26.2013
| 3404 views |
  • submit to reddit

This blog isn’t really about Producers and Consumers, I covered all that in my last blog. If you’ve not seen it then to recap it demonstrates the Producer Consumer pattern using the scenario of commentators reporting on football (soccer) games by putting updates on a queue. These updates are then read back in the TV studio by a Teletype and displayed on the viewer’s TV screen.



At the end of the blog I said “although the Teletype code works it contains a couple of neat little flaws in that there's no way of shutting it down and that it's not particularly testable”. This blog is about fixing those two problems.

The Teletype uses a simple worker thread to read messages from the queue and to display them on the screen. When creating a worker thread, it’s quite usual to include a while loop in the run() method and do the thread’s work within that loop. If you create an infinite while(true) loop then you have the additional problems of closing the thread down (breaking out of the infinite loop) and writing something that’ll test the infinite loop. So, the first thing to do is to never use infinite loops as I did in my broken Teletype code...
  @Override
  public void run() {

    while (true) {

      try {
        Message message = queue.take();
        printHead.print(message.toString());
      } catch (InterruptedException e) {
        // TODO add some real error handling here
        printHead.print("Teletype error - try switching it off and on.");
      }
    }

  }
The aim here is to find a way of closing the Teletype down and that means creating a way of busting out of the infinite while(true) loop and an easy way of doing that is exchanging while(true) for while(run) as demonstrated below:
public class Teletype implements Runnable {

  private final BlockingQueue<Message> queue;

  private final PrintHead printHead;

  private volatile boolean run = true;

  private Thread thread;

  private volatile int messageCount;

  public Teletype(PrintHead printHead, BlockingQueue<Message> queue) {
    this.queue = queue;
    this.printHead = printHead;
  }

  public void start() {

    thread = new Thread(this, "Studio Teletype");
    thread.start();
    printHead.print("Teletype Online.");
  }

  @Override
  public void run() {

    while (run) {

      try {
        Message message = queue.take();
        printHead.print(message.toString());
        messageCount++;
      } catch (InterruptedException e) {
        printHead.print("Teletype closing down...");
      }
    }
    printHead.print("Teletype Off.");
  }

  public void destroy() {
    run = false;
    thread.interrupt();
  }

  public int getMessageCount() {
    return messageCount;
  }
}

If you take a look at this code, you’ll see that it has new run instance variable that's used in the while(run) code. When start() is called and the worker thread executes, then the run() method takes a message from the queue and prints it in the usual way before looking for another message on the queue. The next big difference in the code is the addition of the new destroy() method. This method simply flips the run variable to false and then interrupts the worker thread. This interrupt() call causes the blocking call to queue.take() to throw an InterruptedException and return without a message. The exception is caught n the catch block and execution goes back to the while(run). As the run variable is now false the run() method exits.

The next step is to make the code more testable. Obviously, the best way of achieving this to to make it testable in the first place using Test Driven Development, rather then retro fitting testability; however, if retro fitting is what’s needed, then retro-fitting is what we do.

The final change to the Teletype code is the messageCount functionality. Every time it picks a message from the queue, the count is incremented. This is a useful statistic that can be accessed by getMessageCount() and is used by the following unit test code.
public class TeletypeTest {

  private BlockingQueue<Message> queue;
  private Teletype instance;
  private PrintHead printhead;

  @Before
  public void setUp() throws Exception {

    printhead = mock(PrintHead.class);
    queue = new LinkedBlockingQueue<Message>();
    instance = new Teletype(printhead, queue);
  }

  @Test
  public void testTeletype_with_two_messages_in_queue() throws InterruptedException {

    int numMessages = initializeQueueWithMessages();

    instance.start();

    synchWithTestInstanceThread(numMessages);

    instance.destroy();

    // assert that we didn't time out.
    assertEquals(numMessages, instance.getMessageCount());
    verify(printhead, times(5)).print(anyString());
  }

  private int initializeQueueWithMessages() {
    List<Message> messages = getTestMessages();
    queue.addAll(messages);
    int numMessages = messages.size();
    return numMessages;
  }

  private List<Message> getTestMessages() {

    List<Message> messages = new ArrayList<Message>();
    Message message = new Message("name", 1L, "String messageText", "String matchTime");
    messages.add(message);
    message = new Message("name", 2L, "String messageText", "String matchTime");
    messages.add(message);

    return messages;
  }

  private void synchWithTestInstanceThread(int numMessages) throws InterruptedException {

    // Synchronize on the number of messages
    // This will wait for 1/2 a second at most and then timeout
    for (int i = 0; (i < 5) && (instance.getMessageCount() < numMessages); i++) {

      Thread.sleep(100);
    }
  }
}
The unit test code for the Teletype first calls initializeQueueWithMessages(...) to add couple of messages to the queue before calling the Teletype’s start() method. The start() method kicks off the worker thread whose run() method will read and display any messages it finds on the queue. Meanwhile, the main thread waits for the worker thread to complete its tasks by calling synchWithTestInstanceThread(..). Once this method returns the Teletype is shutdown via a call to destroy(). All that’s left to do is to check that synchWithTestInstanceThread(..) didn’t timeout with a call to assert that the expected number of messages were read from the queue using:
    assertEquals(numMessages, instance.getMessageCount());
...and to verify that the messages were displayed on the screen using Mockito’s
    verify(printhead, times(5)).print(anyString())

In this case I’ve added the messageCount statistic solely for the purpose of making the Teletype class easier to test. In talking to various colleagues I do sense that to some the idea of adding code to a class in order to make it testable is a fairly contentious issue. I have no problem with this as one of the big ideas about Spring and dependency injection is that you can write code that is easily testable and if adding a little extra code to a class makes it testable then so be it. I’d rather have a class that’s a little heavier that is fully tested and works than a super lightweight class that fails. Besides, if this was a real world situation and I was putting a Teletype live on a machine somewhere then I’d want to include a whole bunch of useful statistics so that I and/or the Ops guys could check its status at any time.
The final clump of code to consider is the main(...) method as shown below.
 public static void main(String[] args) throws InterruptedException {

    System.out.println("Producer Consumer Demo Code...");
    ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("context2.xml");

    // Wait until all matches are over.
    Thread.sleep(98000);

    ctx.close();
    System.out.println("Games Over");
  }
This has been slightly modified in for the original version in my last blog. In this case after the Spring context is loaded the main thread waits for the games to end. Once they’re over the context is closed ensuring that any default callback methods are called. In this case the important default callback method is the destroy() method on the Teletype class, which, as shown above, turns the Teletype off.

Finally the main thrust of this blog has been about adapting the Teletype class giving it the ability to close down gracefully when asked by setting the run flag to false and interrupting the worker thread. The problem with this is that the termination of the worker thread depends upon an interaction with the main thread; something that can be undesirable. There is, however, another way of terminating the Teletype known as Poison Pill pattern, but more on that next time.

The code for this sample is available on GitHub.

 

Published at DZone with permission of Roger Hughes, author and DZone MVB. (source)

(Note: Opinions expressed in this article and its replies are the opinions of their respective authors and not those of DZone, Inc.)