What are some ways to implement the producer-consumer pattern in concurrent programming?

Java all-round learning + interview guide: https://javaxiaobear.cn

We mainly learn how to use wait/notify/Condition/BlockingQueue to implement the producer-consumer pattern.

Producer-consumer model

Let’s first take a look at what the producer-consumer pattern is. The producer-consumer pattern is a very common design pattern in programming and is widely used in scenarios such as decoupling and message queues. In the real world, we call the party that produces goods a producer, and the party that consumes goods is called a consumer. Sometimes producers produce very fast, but consumers cannot keep up with their consumption speed. This is commonly known as “overcapacity” , or when multiple producers correspond to multiple consumers, everyone may be in a hurry. How can we make everyone cooperate better? At this time, an intermediary is needed between the producer and the consumer for scheduling, so the producer-consumer model was born.

Using the producer-consumer model usually requires adding a blocking queue as a media between the two. With the media, it is equivalent to a buffer, which balances the capabilities of the two. The overall design is as shown in the figure. The top is Blocking queue, 1 on the right is the producer thread, the producer stores the data in the blocking queue after producing the data, 2 on the left is the consumer thread, the consumer obtains the data in the blocking queue. The 3 and 4 in the middle respectively represent the process of communication between producers and consumers, because blocking may occur regardless of whether the blocking queue is full or empty. After blocking, the blocked thread needs to be awakened at the appropriate time.

So when does the blocking thread need to be awakened? There are two situations. The first situation is that when the consumer sees that the blocking queue is empty, it starts to wait. At this time, once the producer puts data into the queue, it will notify all consumers and wake up the blocked consumer thread. Another situation is that if the producer finds that the queue is full, it will also be blocked. Once the consumer obtains the data, it is equivalent to an empty position in the queue. At this time, the consumer will notify all blocking producers to produce. , this is a brief introduction to the producer-consumer model.

How to use BlockingQueue to implement the producer-consumer model

Let’s look at how to use wait/notify/Condition/BlockingQueue to implement the producer-consumer model. Let’s start with the simplest BlockingQueue:

public static void main(String[] args) {<!-- -->
 
  BlockingQueue<Object> queue = new ArrayBlockingQueue<>(10);
 
 Runnable producer = () -> {<!-- -->
    while (true) {<!-- -->
          queue.put(new Object());
  }
   };
 
new Thread(producer).start();
new Thread(producer).start();
 
Runnable consumer = () -> {<!-- -->
      while (true) {<!-- -->
           queue.take();
}
   };
new Thread(consumer).start();
new Thread(consumer).start();
}

As shown in the code, first, a BlockingQueue of ArrayBlockingQueue type is created, named queue and its capacity is set to 10; secondly, a simple producer is created, and queue.put() in the while(true) loop body is responsible Add data to the queue; then, create two producer threads and start them; the consumer is also very simple. queue.take() in the while(true) loop body is responsible for consuming data, and two consumer threads are created and started at the same time. In order to keep the code concise and highlight the design ideas, the try/catch detection is omitted in the code, and we will not worry about some grammatical details. The above is the code that uses BlockingQueue to implement the producer-consumer pattern. Although the code is very simple, ArrayBlockingQueue has actually done a lot of work behind the scenes, such as blocking the producer thread when the queue is full, waking up the producer thread when the queue is free, etc.

How to use Condition to implement the producer-consumer pattern

BlockingQueue’s implementation of the producer-consumer model seems simple, but there are hidden secrets behind it. On the basis of mastering this method, we still need to master more complex implementation methods. Let’s next look at how to use Condition to implement the producer-consumer pattern based on mastering BlockingQueue. The implementation principles behind them are very similar, which is equivalent to implementing a simple version of BlockingQueue ourselves:

public class MyBlockingQueueForCondition {<!-- -->
 
   private Queue queue;
   private int max = 16;
   private ReentrantLock lock = new ReentrantLock();
   private Condition notEmpty = lock.newCondition();
   private Condition notFull = lock.newCondition();
 
 
   public MyBlockingQueueForCondition(int size) {<!-- -->
       this.max = size;
       queue = new LinkedList();
   }
 
   public void put(Object o) throws InterruptedException {<!-- -->
       lock.lock();
       try {<!-- -->
           while (queue.size() == max) {<!-- -->
               notFull.await();
           }
           queue.add(o);
           notEmpty.signalAll();
       } finally {<!-- -->
           lock.unlock();
       }
   }
 
   public Object take() throws InterruptedException {<!-- -->
       lock.lock();
       try {<!-- -->
           while (queue.size() == 0) {<!-- -->
               notEmpty.await();
           }
           Object item = queue.remove();
           notFull.signalAll();
           return item;
       } finally {<!-- -->
           lock.unlock();
       }
   }
}

As shown in the code, firstly, a queue variable queue is defined and the maximum capacity is set to 16; secondly, a Lock lock of ReentrantLock type is defined, and two Conditions are created based on the Lock lock, one is notEmpty and the other is notFull represents the conditions that the queue is not empty and not full respectively; finally, the two core methods put and take are declared.

Because the producer-consumer model usually faces multi-threaded scenarios, certain synchronization measures are required to ensure thread safety, so in the put method, the Lock is locked first, and then, in the while condition, it is checked whether the queue is full. If it is full, call await() of notFull to block the producer thread and release the Lock. If it is not full, put data into the queue and use notEmpty.signalAll() to notify all waiting consumers and wake them up. Finally, use the lock.unlock() method to unlock in finally. It is a basic principle to put the unlock method in finally. Otherwise, the lock may not be released.

Let’s look at the take method. The take method actually corresponds to the put method. It also checks whether the queue is empty through while. If it is empty, the consumer starts waiting. If it is not empty, it gets the data from the queue and notifies the producer. If there is a free space in the queue, it will be unlocked in finally.

It should be noted here that we use while( queue.size() == 0 ) in the take() method to check the queue status, but cannot use if( queue.size() == 0 ). why? Let’s think about this situation. Because producers and consumers are often multi-threaded, we assume that there are two consumers. When the first consumer thread obtains data, it finds that the queue is empty and enters the waiting state; because the first consumer thread The thread will release the Lock while waiting, so the second consumer can enter and execute if(queue.size() == 0) and find that the queue is empty, so the second thread also enters waiting; at this time, If the producer produces a piece of data, it will wake up two consumer threads, and only one of the two threads can get the lock and perform the queue.remove operation. The other thread is stuck in waking up because it does not get the lock. place, and after the first thread completes the operation, it will be unlocked through unlock in finally. At this time, the second thread can get the lock released by the first thread, continue to perform the operation, and also call queue. remove operation, however at this time the queue is already empty, so a NoSuchElementException exception will be thrown, which does not conform to our logic. If you use while to check, after the first consumer is awakened to obtain the lock and remove the data, the second thread will still perform a while check before executing remove, and find that queue.size() == 0 is still satisfied at this time. If the condition is met, the await method will continue to be executed to avoid the situation where the obtained data is null or an exception is thrown.

How to use wait/notify to implement the producer-consumer pattern

Finally, let’s take a look at how to use wait/notify to implement the producer-consumer pattern. In fact, the implementation principle is very similar to Condition. They are brothers:

class MyBlockingQueue {<!-- -->
 
   private int maxSize;
   private LinkedList<Object> storage;
 
   public MyBlockingQueue(int size) {<!-- -->
       this.maxSize = size;
       storage = new LinkedList<>();
   }
 
   public synchronized void put() throws InterruptedException {<!-- -->
       while (storage.size() == maxSize) {<!-- -->
           wait();
       }
       storage.add(new Object());
       notifyAll();
   }
 
   public synchronized void take() throws InterruptedException {<!-- -->
       while (storage.size() == 0) {<!-- -->
           wait();
       }
       System.out.println(storage.remove());
       notifyAll();
   }
}

As shown in the code, the most important parts are still the take and put methods. Let’s look at the put method first. The put method is protected by synchronized. The while checks whether the queue is full. If not, it puts data into it and wakes it up through notifyAll(). other threads. Similarly, the take method is also modified by synchronized, while checks whether the queue is empty, and if not, gets the data and wakes up other threads. The producer-consumer code implemented using this MyBlockingQueue is as follows:

/**
* Description: The wait form implements the producer-consumer model
*/
public class WaitStyle {<!-- -->
 
   public static void main(String[] args) {<!-- -->
       MyBlockingQueue myBlockingQueue = new MyBlockingQueue(10);
       Producer producer = new Producer(myBlockingQueue);
       Consumer consumer = new Consumer(myBlockingQueue);
       new Thread(producer).start();
       new Thread(consumer).start();
   }
}
 
class Producer implements Runnable {<!-- -->
 
   private MyBlockingQueue storage;
 
   public Producer(MyBlockingQueue storage) {<!-- -->
       this.storage = storage;
   }
 
   @Override
   public void run() {<!-- -->
       for (int i = 0; i < 100; i + + ) {<!-- -->
           try {<!-- -->
               storage.put();
           } catch (InterruptedException e) {<!-- -->
               e.printStackTrace();
           }
       }
   }
}
 
class Consumer implements Runnable {<!-- -->
 
   private MyBlockingQueue storage;
 
   public Consumer(MyBlockingQueue storage) {<!-- -->
       this.storage = storage;
   }
 
   @Override
   public void run() {<!-- -->
       for (int i = 0; i < 100; i + + ) {<!-- -->
           try {<!-- -->
               storage.take();
           } catch (InterruptedException e) {<!-- -->
               e.printStackTrace();
           }
       }
   }
}

The above is an explanation of the three implementations of the producer-consumer model. Among them, the first BlockingQueue model is relatively simple to implement, but the implementation principles behind it are reflected in the second and third implementation methods. This implementation method is essentially that we implement some core logic of BlockingQueue for use by producers and consumers.