DZone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
Please enter at least three characters to search
Refcards Trend Reports
Events Video Library
Refcards
Trend Reports

Events

View Events Video Library

Zones

Culture and Methodologies Agile Career Development Methodologies Team Management
Data Engineering AI/ML Big Data Data Databases IoT
Software Design and Architecture Cloud Architecture Containers Integration Microservices Performance Security
Coding Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks
Culture and Methodologies
Agile Career Development Methodologies Team Management
Data Engineering
AI/ML Big Data Data Databases IoT
Software Design and Architecture
Cloud Architecture Containers Integration Microservices Performance Security
Coding
Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance
Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks

The software you build is only as secure as the code that powers it. Learn how malicious code creeps into your software supply chain.

Apache Cassandra combines the benefits of major NoSQL databases to support data management needs not covered by traditional RDBMS vendors.

Generative AI has transformed nearly every industry. How can you leverage GenAI to improve your productivity and efficiency?

Modernize your data layer. Learn how to design cloud-native database architectures to meet the evolving demands of AI and GenAI workloads.

Related

  • Analysis of Failure Modes in Producer-Consumer Systems
  • Understanding Kafka and Event-Driven Architecture [Video Tutorials]
  • AWS Redshift Data Sharing: Unlocking the Power of Collaborative Analytics
  • Understanding the Integration of Embedded Systems in Consumer Electronics

Trending

  • Building Resilient Identity Systems: Lessons from Securing Billions of Authentication Requests
  • Mastering Fluent Bit: Installing and Configuring Fluent Bit on Kubernetes (Part 3)
  • Can You Run a MariaDB Cluster on a $150 Kubernetes Lab? I Gave It a Shot
  • Ethical AI in Agile
  1. DZone
  2. Data Engineering
  3. Data
  4. Producers and Consumers - Part 3 Poison Pills

Producers and Consumers - Part 3 Poison Pills

By 
Roger Hughes user avatar
Roger Hughes
·
Mar. 18, 13 · Interview
Likes (2)
Comment
Save
Tweet
Share
29.5K Views

Join the DZone community and get the full member experience.

Join For Free

A couple of weeks ago I wrote part 2 of a short series of blogs on the Producer Consumer pattern. This blog focused upon the need to close down my Teletype’s worker thread, fixing a bug in the original code from part 1 of the series.

The idea here is that the Teletype’s worker thread can be controlled by a command from the application’s main thread. This command tells the worker thread to shutdown thus allowing the app the gracefully shutdown as demonstrated by the code below:

@Override
  public void run() {

    while (run) {

      try {
        Message message = queue.take();
        printHead.print(message.toString());
        messageCount++;
      } catch (InterruptedException e) {
        printHead.print("Teletype closing down...");
      }
    }
    printHead.print("Teletype Off.");
  }

  public void destroy() {
    run = false;
    thread.interrupt();
  }

In this sample, the main thread calls the destroy() method, which sets the run variable to false and interrupts the worker’s blocking call to queue.take().

However, there’s a problem with this idea in certain circumstances. For example, will suddenly terminating the consumer’s worker thread cause problems in other parts of the system? Will there be data loss as important messages in the queue don’t get processed? If the answer to these questions is ‘yes’ then there’s another approach you can take: use a Poison Pill.

Poison Pill is a rather melodramatic name for simply placing a certain, known, data item on the queue and when the consumer reads this item it closes down. Obviously, the poison pill has to be the last item placed on the queue or else the consumer will shut down prematurely.

This idea is great in simple systems with only one producer and consumer as shown below:


...but takes a little more thought when there are multiple producers with a single consumer as in my football match updates scenario:


...and could fall apart completely in the case of multiple produces and consumers:


... as ensuring that each consumer receives a poison pill at the right time and all the data in the queue gets processed could be quite tricky.

In this blog I’m updating my Teletype code to shut itself down once the two MatcherReporters have sent all their data. The first thing to do is to decide on the message that will act as a poison pill. In the snippet below you can see that I’ve inserted a message that contains the text “END OF FILE” at the end of the match update stream.
<value>95:30 END OF FILE</value>
  <value>95:00 Final Score  Fulham 0 - 1 Man Utd</value>
  <value>94:59 Full time The referee signals the end of the game.</value> 
I’ve inserted one of these messages into each set of game data.

The next thing to do is to modify the Teletype code adding a check for the poison pill message:
public class Teletype implements Runnable {

  private static final String POISON_PILL_MESSAGE = "END OF FILE";

  private final BlockingQueue<Message> queue;

  private final PrintHead printHead;

  private final int matchesPlayed;

  private volatile boolean run = true;

  private int pillsRecieved;

  public Teletype(PrintHead printHead, BlockingQueue<Message> queue, int matchesPlayed) {
    this.queue = queue;
    this.printHead = printHead;
    this.matchesPlayed = matchesPlayed;
  }

  public void start() {

    Thread thread = new Thread(this, "Studio Teletype");
    thread.start();
    printHead.print("Teletype Online.");
  }

  @Override
  public void run() {

    while (run) {

      try {
        Message message = queue.take();
        handleMessage(message);
      } catch (InterruptedException e) {
        printHead.print("Teletype closing down...");
      }
    }
    printHead.print("Teletype Off.");
  }

  private void handleMessage(Message message) {
    if (allGamesAreOver(message.getMessageText())) {
      run = false;
    } else {
      printHead.print(message.toString());
    }
  }

  private boolean allGamesAreOver(String messageText) {

    if (POISON_PILL_MESSAGE.equals(messageText)) {
      pillsRecieved++;
    }

    return pillsRecieved == matchesPlayed ? true : false;
  }

  @VisibleForTesting
  boolean isRunning() {
    return run;
  }
}
One of the most significant changes here is the addition of the matchesPlayed instance variable. This variable tells the Teletype how many MatchReporters there are supplying it with data. Ultimately this breaks the Producer Consumer pattern in that the consumer now knows about the rest of the system; however, it’s necessary because we need to ensure that the Teletype shuts down at the end of all the data. In a single producer/consumer one to one system this isn’t necessary.

The other big change in the Teletype code is to the run() loop. Once a message has been retrieved from the queue it’s passed to the new handleMessage(...) method. The handleMessage(...) method checks whether or not all the games it’s receiving data from are over by calling allGamesAreOver(...), which checks the message text against the poison pill string. If the message test is the poison pill string then the pillsRecieved counter is updated. If the pillsRecieved equals the matchesPlayed variable then all the all the games are over and allGamesAreOver(...) returns true. This sets the run instance variable to false and the worker thread’s run() method exits.

So that’s about it, the melodramatic Poison Pill pattern in a nutshell, next time Murder in the Red Barn.



The code for this sample is available on GitHub.

consumer producer Data (computing)

Published at DZone with permission of Roger Hughes, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.

Related

  • Analysis of Failure Modes in Producer-Consumer Systems
  • Understanding Kafka and Event-Driven Architecture [Video Tutorials]
  • AWS Redshift Data Sharing: Unlocking the Power of Collaborative Analytics
  • Understanding the Integration of Embedded Systems in Consumer Electronics

Partner Resources

×

Comments
Oops! Something Went Wrong

The likes didn't load as expected. Please refresh the page and try again.

ABOUT US

  • About DZone
  • Support and feedback
  • Community research
  • Sitemap

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • Become a Contributor
  • Core Program
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 3343 Perimeter Hill Drive
  • Suite 100
  • Nashville, TN 37211
  • support@dzone.com

Let's be friends:

Likes
There are no likes...yet! 👀
Be the first to like this post!
It looks like you're not logged in.
Sign in to see who liked this post!