B.Tech in Computer Science and Engineering, interested in Distributed Systems. Currently working as a Software Development Engineer at Amazon. Swaranga is a DZone MVB and is not an employee of DZone and has posted 4 posts at DZone.

Solving a Producer-Consumer Problem in Java

05.14.2012
The producer-consumer problem is one of the most frequently encountered problems in multi-threaded programming. While it is not as challenging as some other concurrency problems, an incorrect implementation can make a mess of your application: produced items are left unconsumed, the first few items are skipped, or the outcome depends on whether production began before or after the consumers started polling. To make matters worse, you might notice the anomalies long after they have occurred and, like almost all multi-threaded bugs, they are hard to reproduce and debug.


So in this post I thought I would attempt to solve this problem in Java with the help of Java's awesome java.util.concurrent package and its classes.

First of all, let us see the characteristics of the producer-consumer problem:

  •     Producer(s) produce items.
  •     Consumer(s) consume the items produced by the producer(s).
  •     Producer(s) finish production and let the consumers know that they are done.


Note that in this producer-consumer problem the producer runs on a different thread than the consumers. This setup makes sense in two cases:

    The steps to consume an item are independent of the other items.
    The time to process the items is larger than the time to produce them.

The term "larger" in the second point is used a bit loosely. Consider the case where the producer reads a line from a file and the "consumption and processing" step just logs the line in a special format back to a file; using a producer-consumer solution there would be over-engineering. However, if for each of those lines the "consumption and processing" step is to make an HTTP GET/POST request to a web server and then dump the result somewhere, we should opt for a producer-consumer solution. In this case I am assuming that all the data needed for the GET/POST is available in the line (item) itself and that we do not depend on the previous or next lines.

So let us first take a look at the characteristics of the producer-consumer solution that I have posted below:

  •     There can be multiple producers.
  •     There will be multiple consumers.
  •     Once the production of new items is done, the producer(s) will let the consumers know so that the consumers exit after the last item is consumed and processed.


It is interesting to note that to solve this problem at a generic level we can address only the consumer side and not the producer side. This is because items might be produced at any time, and there is very little that we can do in a generic way to control their production. We can, however, control the consumer's behaviour while accepting items from producer(s). Having laid out the rules, let us take a look at the consumer contract:

    package com.maximus.consumer;  
      
    public interface Consumer  
    {  
     public boolean consume(Item j);  
       
     public void finishConsumption();  
    }

Here the consumer can be shared between multiple producers of similar items; by similar items I mean producers that produce objects of type "Item". The definition of Item is as follows:

    package com.maximus.consumer;  
      
    public interface Item  
    {  
     public void process();  
    }

Now we take a look at an implementation of the Consumer interface:

    package com.maximus.consumer;  
      
    import java.util.LinkedList;  
    import java.util.List;  
    import java.util.concurrent.BlockingQueue;  
    import java.util.concurrent.ExecutorService;  
    import java.util.concurrent.Executors;  
    import java.util.concurrent.LinkedBlockingQueue;  
      
    public class ConsumerImpl implements Consumer  
    {  
     private BlockingQueue< Item > itemQueue =   
      new LinkedBlockingQueue< Item >();  
       
     private ExecutorService executorService =   
      Executors.newCachedThreadPool();  
       
     private List< ItemProcessor > jobList =   
      new LinkedList< ItemProcessor >();  
       
     private volatile boolean shutdownCalled = false;  
        
     public ConsumerImpl(int poolSize)  
     {  
      for(int i = 0; i < poolSize; i++)  
      {  
       ItemProcessor jobThread =   
        new ItemProcessor(itemQueue);  
         
       jobList.add(jobThread);  
       executorService.submit(jobThread);  
      }  
     }  
       
     public boolean consume(Item j)  
     {  
      if(!shutdownCalled)  
      {  
       try  
       {  
        itemQueue.put(j);  
       }  
       catch(InterruptedException ie)  
       {  
        Thread.currentThread().interrupt();  
        return false;  
       }  
       return true;  
      }  
      else  
      {  
       return false;  
      }  
     }  
       
     public void finishConsumption()  
     {  
      // Reject any further consume() calls before stopping the workers.  
      shutdownCalled = true;  
        
      for(ItemProcessor j : jobList)  
      {  
       j.cancelExecution();  
      }  
        
      executorService.shutdown();  
     }  
    }  
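
Note that finishConsumption() returns as soon as shutdown() has been requested; it does not wait for the workers to exit. If a caller needs to block until everything has been processed, one option is ExecutorService.awaitTermination(). The following is only a minimal sketch of that idea, not part of the classes in this post: AwaitWorkersDemo, runAndWait and the 10 second timeout are names and values I have assumed for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class AwaitWorkersDemo
{
    // Hypothetical helper: starts `tasks` short jobs, then blocks until all of them finish.
    static int runAndWait(int tasks) throws InterruptedException
    {
        ExecutorService pool = Executors.newCachedThreadPool();
        AtomicInteger finished = new AtomicInteger();

        for (int i = 0; i < tasks; i++)
        {
            pool.execute(() ->
            {
                try
                {
                    Thread.sleep(50); // stand-in for real processing work
                }
                catch (InterruptedException ie)
                {
                    Thread.currentThread().interrupt();
                    return;
                }
                finished.incrementAndGet();
            });
        }

        // shutdown() stops new submissions but lets already submitted tasks complete.
        pool.shutdown();

        // awaitTermination() blocks until the tasks are done or the (assumed) timeout expires.
        if (!pool.awaitTermination(10, TimeUnit.SECONDS))
        {
            throw new IllegalStateException("workers did not finish in time");
        }
        return finished.get();
    }

    public static void main(String[] args) throws InterruptedException
    {
        System.out.println("Finished tasks: " + runAndWait(5));
    }
}
```

Whether blocking like this belongs inside finishConsumption() or in a separate method is a design choice; keeping it separate lets producers signal the end of production without stalling.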

Now the only point of interest is the ItemProcessor that the consumer internally uses to process the incoming items. ItemProcessor is coded as follows:

    package com.maximus.consumer;  
      
    import java.util.concurrent.BlockingQueue;  
    import java.util.concurrent.TimeUnit;  
      
    public class ItemProcessor implements Runnable  
    {  
     private BlockingQueue< Item> jobQueue;  
       
     private volatile boolean keepProcessing;  
        
     public ItemProcessor(BlockingQueue< Item > queue)  
     {  
      jobQueue = queue;  
      keepProcessing = true;  
     }  
       
     public void run()  
     {  
      while(keepProcessing || !jobQueue.isEmpty())  
      {  
       try  
       {  
        Item j = jobQueue.poll(10, TimeUnit.SECONDS);  
          
        if(j != null)  
        {  
         j.process();  
        }  
       }  
       catch(InterruptedException ie)  
       {  
        Thread.currentThread().interrupt();  
        return;  
       }  
      }  
     }  
       
     public void cancelExecution()  
     {  
      this.keepProcessing = false;  
     }  
    }   


The only challenge above is the condition in the while loop. The loop is written this way so that consumption continues even after the producer(s) have finished production and have notified the consumer. It ensures that all the queued items are consumed before the threads exit, which matters when producers run faster than consumers.
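
To see the effect of that loop condition in isolation, here is a small sketch. It is not the ItemProcessor class itself: DrainOnShutdownDemo and processedAfterCancel are hypothetical names, the items are plain integers, and the poll timeout is shortened to 100 milliseconds so that the demo finishes quickly. The worker is cancelled before it even starts, which is the worst case for leftover items.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class DrainOnShutdownDemo
{
    // Hypothetical helper: queues `items` entries, cancels the worker immediately,
    // and returns how many entries the worker still processed.
    static int processedAfterCancel(int items) throws InterruptedException
    {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        AtomicBoolean keepProcessing = new AtomicBoolean(true);
        AtomicInteger processed = new AtomicInteger();

        for (int i = 0; i < items; i++)
        {
            queue.put(i);
        }

        Thread worker = new Thread(() ->
        {
            // Same shape as the condition in ItemProcessor.run():
            // keep going while not cancelled OR while items remain.
            while (keepProcessing.get() || !queue.isEmpty())
            {
                try
                {
                    Integer item = queue.poll(100, TimeUnit.MILLISECONDS);
                    if (item != null)
                    {
                        processed.incrementAndGet();
                    }
                }
                catch (InterruptedException ie)
                {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });

        keepProcessing.set(false); // cancel first...
        worker.start();            // ...then let the worker run
        worker.join();
        return processed.get();
    }

    public static void main(String[] args) throws InterruptedException
    {
        // prints "Processed after cancel: 1000"
        System.out.println("Processed after cancel: " + processedAfterCancel(1000));
    }
}
```

With only `keepProcessing` in the condition, the worker in this sketch would exit without processing anything; the extra `!queue.isEmpty()` check is what lets it drain the backlog.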

The above consumer is thread-safe and can be shared by multiple producers, so each producer may concurrently call consumer.consume() without worrying about synchronization and other multi-threading caveats. Producers just need to submit an implementation of the Item interface whose process() method contains the logic of how the consumption will be done.
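
The reason concurrent calls are safe is that consume() simply delegates to BlockingQueue.put(), and the queue does its own locking. The sketch below illustrates just that part, under my own assumptions: SharedQueueDemo and enqueueFromProducers are hypothetical names, strings stand in for Item objects, and a bare LinkedBlockingQueue stands in for the consumer.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class SharedQueueDemo
{
    // Hypothetical helper: several producer threads put items into one shared queue
    // at the same time; returns how many items ended up in the queue.
    static int enqueueFromProducers(int producers, int itemsPerProducer)
        throws InterruptedException
    {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Thread[] threads = new Thread[producers];

        for (int p = 0; p < producers; p++)
        {
            final int id = p;
            threads[p] = new Thread(() ->
            {
                for (int i = 0; i < itemsPerProducer; i++)
                {
                    try
                    {
                        // No explicit synchronization: the queue handles it internally.
                        queue.put("producer-" + id + "-item-" + i);
                    }
                    catch (InterruptedException ie)
                    {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            });
            threads[p].start();
        }

        for (Thread t : threads)
        {
            t.join();
        }
        return queue.size();
    }

    public static void main(String[] args) throws InterruptedException
    {
        // prints "Items queued: 4000"
        System.out.println("Items queued: " + enqueueFromProducers(4, 1000));
    }
}
```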

As a bonus for reading the post I put forward a test program that demonstrates how to use the above classes:

    package com.maximus.consumer;  
      
    import java.io.BufferedReader;  
    import java.io.File;  
    import java.io.FileInputStream;  
    import java.io.InputStreamReader;  
      
    public class Test  
    {  
     public static void main(String[] args) throws Exception  
     {  
      Consumer consumer = new ConsumerImpl(10);  
        
      BufferedReader br =   
       new BufferedReader(  
       new InputStreamReader(  
       new FileInputStream(  
       new File(args[0]))));  
        
      String line = "";  
        
      while((line = br.readLine()) != null)  
      {  
       System.out.println(  
        "Producer producing: " + line);  
       consumer.consume(new PrintJob(line));  
      }  
        
      // Production is over: release the file and notify the consumer.  
      br.close();  
      consumer.finishConsumption();  
     }  
    }  
      
    class PrintJob implements Item  
    {  
     private String line;  
       
     public PrintJob(String s)  
     {  
      line = s;  
     }  
       
     public void process()  
     {  
      System.out.println(  
       Thread.currentThread().getName() +   
       " consuming :" + line);  
     }  
    }  


The above consumer can be tweaked in a host of different ways to make it more flexible. We can define what the consumer will do when production is done. It may be tweaked to allow batch processing but I leave that to the user. Feel free to use it and twist it in whatever way you want.
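
As one possible starting point for the batch processing tweak, a worker could pull several items at once with BlockingQueue.drainTo() instead of polling one item at a time. This is only a sketch under my own assumptions: BatchDrainDemo, nextBatch, maxBatch and timeoutMillis are hypothetical names that do not appear in the classes above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class BatchDrainDemo
{
    // Hypothetical helper: waits up to timeoutMillis for a first item, then takes
    // whatever else is already queued, up to maxBatch items in total.
    static <T> List<T> nextBatch(BlockingQueue<T> queue, int maxBatch, long timeoutMillis)
        throws InterruptedException
    {
        List<T> batch = new ArrayList<>();
        T first = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);

        if (first != null)
        {
            batch.add(first);
            // drainTo() never blocks; it only moves items that are available right now.
            queue.drainTo(batch, maxBatch - 1);
        }
        return batch; // empty if nothing arrived within the timeout
    }

    public static void main(String[] args) throws InterruptedException
    {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 5; i++)
        {
            queue.put(i);
        }

        System.out.println(nextBatch(queue, 2, 100)); // prints [0, 1]
        System.out.println(nextBatch(queue, 2, 100)); // prints [2, 3]
        System.out.println(nextBatch(queue, 2, 100)); // prints [4]
    }
}
```

A worker loop built on this would process each returned list in one go, which helps when the per-item cost is dominated by a fixed overhead such as opening a connection.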

Happy coding!

Published at DZone with permission of Swaranga Sarma, author and DZone MVB. (source)


Comments

Vitalii Tymchyshyn replied on Mon, 2012/05/14 - 11:19am

I don't get what you gain by implementing an ExecutorService on top of an ExecutorService. Here is your example without this new layer:
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Test {
    public static void main(String[] args) throws Exception {
        ExecutorService consumer = Executors.newFixedThreadPool(10);

        BufferedReader br =
                new BufferedReader(
                new InputStreamReader(
                new FileInputStream(
                new File(args[0]))));

        String line = "";

        while ((line = br.readLine()) != null) {
            System.out.println(
                    "Producer producing: " + line);
            consumer.submit(new PrintJob(line));
        }

        consumer.shutdown();
        consumer.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
    }
}

class PrintJob implements Runnable {
    private String line;

    public PrintJob(String s) {
        line = s;
    }

    @Override
    public void run() {
        System.out.println(
                Thread.currentThread().getName() +
                        " consuming :" + line);
    }
}
