Multithreading case: the blocking queue

1. What is a blocking queue

1. A blocking queue is a special kind of queue that still follows the “first in, first out” principle.

2. A blocking queue is a thread-safe data structure with the following characteristics:

  • When the queue is full, trying to enqueue another element blocks until some other thread takes an element out of the queue.
  • When the queue is empty, trying to dequeue blocks until some other thread inserts an element into the queue (see the sketch below).
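
For example, a minimal sketch of this blocking behavior using the standard library's ArrayBlockingQueue; the capacity of 1 and the two-second sleep are chosen only to make the blocking easy to observe:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingBehaviorDemo {
    public static void main(String[] args) throws InterruptedException {
        // Capacity of 1, so the queue is full after a single put.
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);

        queue.put("first"); // succeeds immediately, the queue is now full

        Thread consumer = new Thread(() -> {
            try {
                Thread.sleep(2000); // simulate a slow consumer
                System.out.println("took: " + queue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        consumer.start();

        // This put blocks for about two seconds,
        // until the consumer thread takes "first" out of the queue.
        queue.put("second");
        System.out.println("second element enqueued");
    }
}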

2. Producer-consumer model

  • The producer-consumer pattern solves the strong coupling problem between producers and consumers through a container.
  • Producers and consumers do not communicate directly with each other, but communicate through blocking queues.

2.1 Examples

Take the task of making dumplings as an example: we need to knead the dough, roll out the wrappers, and wrap the fillings.

If three people are responsible for this task and there is only one rolling pin, there are two possible divisions of labor:

1. Each person works independently: roll out one wrapper, wrap one dumpling, then repeat. [inefficient]

2. One person is responsible for rolling out the wrappers, and the other two are responsible for wrapping. This is the producer-consumer model: the person rolling out wrappers is the producer, the people wrapping dumplings are the consumers, and the pile of rolled-out wrappers is the trading place. [efficient]

2.2 Advantages

2.2.1 Decoupling

  • Decoupling means “reducing the coupling between modules” (the stronger the connection between modules, the higher the coupling)

First, consider a distributed system with two servers, A and B:

  • When a user wants to recharge, A sends the request directly to B, so A and B are obviously tightly coupled.
  • If server B goes down, server A is heavily affected, and vice versa.
  • And if another server C needs to be added, the code of A requires major changes.

Introducing the producer-consumer model, with a blocking queue as the container, effectively solves the problems above:

  • Now A and B interact only through the blocking queue, which decouples them nicely.
  • If either A or B fails, the other is barely affected, because the two never interact directly.
  • And if a new server C needs to be added, server A does not need any modification at all; C simply takes elements from the same queue (see the sketch below).
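
Here is a minimal sketch of that decoupling, assuming A is a producer thread while B and C are consumer threads sharing one queue; the "server" names are just placeholders for illustration:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class DecouplingSketch {
    public static void main(String[] args) {
        BlockingQueue<String> requests = new LinkedBlockingQueue<>();

        // "Server A": only knows about the queue, not about who consumes from it.
        Thread serverA = new Thread(() -> {
            try {
                for (int i = 0; ; i++) {
                    requests.put("request-" + i);
                    Thread.sleep(500);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        // "Server B" and "server C" run the same consumer logic;
        // adding C requires no change to A at all.
        Runnable consumer = () -> {
            try {
                while (true) {
                    String req = requests.take();
                    System.out.println(Thread.currentThread().getName() + " handled " + req);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };

        serverA.start();
        new Thread(consumer, "server-B").start();
        new Thread(consumer, "server-C").start();
    }
}

Note that serverA only ever talks to the queue; starting the server-C thread did not require touching A's code at all.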

2.2.2 Peak shaving and valley filling

  • A server can only handle a limited number of requests at the same time, because each request consumes some hardware resources. Different servers have different configurations (different hardware resources) and handle different kinds of tasks (each request consumes a different amount of resources), so their limits also differ. (Just like our school's academic administration system, which collapses as soon as course selection starts.)
  • In a distributed system, it often happens that some servers bear much more pressure than others.

Consider the same kind of distributed system again:

  • Every time A receives a request, B has to process it immediately, so when requests surge, B may be the first to go down.

Now introduce a blocking queue between them:

  • When external requests suddenly surge and A receives more of them, A simply writes more data into the queue, while B can still process requests at its established rhythm without going down.
  • The blocking queue acts as a buffer, absorbing the pressure that B would otherwise have to endure (peak shaving); see the sketch below.
  • Of course, the peak is only temporary. When it subsides, A receives fewer requests, but B still processes requests at its established rhythm and is never left too idle (valley filling).
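
Here is a minimal sketch of that buffering effect; the burst size of 100 and the processing rate of roughly 10 requests per second are made-up numbers for illustration:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class PeakShavingSketch {
    public static void main(String[] args) {
        BlockingQueue<Integer> buffer = new LinkedBlockingQueue<>();

        // "Server A": suddenly receives a burst of 100 requests and just queues them.
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 100; i++) {
                    buffer.put(i);
                }
                System.out.println("burst absorbed, backlog: " + buffer.size());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        // "Server B": drains the queue at its own rhythm, about 10 requests per second.
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    Integer req = buffer.take();
                    System.out.println("processed " + req + ", backlog: " + buffer.size());
                    Thread.sleep(100);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        producer.start();
        consumer.start();
    }
}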

A typical real-world analogy is the Three Gorges Dam:

When the upstream water volume surges, the dam holds back the flood peak and ensures that water is released downstream at a relatively gentle rate (peak shaving).

When the upstream water volume drops sharply, the dam can open its gates and release water to keep the downstream supply steady (valley filling).

3. Message Queue

  • From the description above, we can see how important the producer-consumer model is. Although a blocking queue is just a data structure, that data structure can also be implemented as a standalone server program and deployed on its own host or host cluster. At that point, the blocking queue has evolved into a “message queue”.
  • A message queue adds the notion of “message type” on top of the blocking queue, and maintains first-in, first-out order within each type (a toy sketch of this idea follows).
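
The toy sketch below only illustrates this "per-type FIFO" idea in-process; a real message queue (e.g. RabbitMQ or Kafka) is a standalone server program with its own protocol and persistence, and the class and topic names here are made up:

import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// A toy in-process "message queue": one FIFO blocking queue per message type.
class TinyMessageQueue {
    private final Map<String, BlockingQueue<String>> topics = new ConcurrentHashMap<>();

    private BlockingQueue<String> topic(String type) {
        return topics.computeIfAbsent(type, k -> new LinkedBlockingQueue<>());
    }

    // Messages of the same type are delivered first in, first out.
    public void send(String type, String message) throws InterruptedException {
        topic(type).put(message);
    }

    public String receive(String type) throws InterruptedException {
        return topic(type).take();
    }

    public static void main(String[] args) throws InterruptedException {
        TinyMessageQueue mq = new TinyMessageQueue();
        mq.send("order", "order-1");
        mq.send("log", "log-1");
        mq.send("order", "order-2");
        // A consumer interested only in "order" messages sees them in FIFO order,
        // unaffected by messages of other types.
        System.out.println(mq.receive("order")); // order-1
        System.out.println(mq.receive("order")); // order-2
        System.out.println(mq.receive("log"));   // log-1
    }
}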

4. Use blocking queue

The Java standard library provides ready-made blocking queue implementations. Below is a simple producer-consumer model built on one of them:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Producer-consumer model
public class Demo2 {
    public static void main(String[] args) {
        // Blocking queue, used as the trading place
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();

        // Responsible for producing elements
        Thread t1 = new Thread(() -> {
            int count = 0;
            while (true) {
                try {
                    queue.put(count);
                    System.out.println("Produced element: " + count);

                    count++;

                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        // Responsible for consuming elements
        Thread t2 = new Thread(() -> {
            while (true) {
                try {
                    Integer n = queue.take();
                    System.out.println("Consumed element: " + n);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        t1.start();
        t2.start();
    }
}

When this program runs, the producer puts one element per second and the consumer takes each element almost immediately, so a pair of "Produced element: N" / "Consumed element: N" lines appears roughly once per second.

Notice:

  • A blocking queue implementation can be based on a linked list, a heap, or an array.
  • ArrayBlockingQueue is generally faster, but its capacity must be specified up front, so it suits situations where the maximum number of elements is known in advance (a fixed-size array avoids expensive resizing).
  • If the number of elements is hard to predict, LinkedBlockingQueue is more appropriate (see the sketch below).
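
For reference, a small sketch of how the two are typically constructed (ArrayBlockingQueue must be given its capacity up front, while a LinkedBlockingQueue created without a capacity argument is effectively unbounded):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueChoiceDemo {
    public static void main(String[] args) {
        // Array-based: the maximum number of elements must be known in advance.
        BlockingQueue<Integer> bounded = new ArrayBlockingQueue<>(1000);

        // Linked-list-based: grows node by node, no capacity needed up front.
        BlockingQueue<Integer> unbounded = new LinkedBlockingQueue<>();

        System.out.println(bounded.remainingCapacity());   // 1000
        System.out.println(unbounded.remainingCapacity()); // 2147483647 (Integer.MAX_VALUE)
    }
}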

5. Implement blocking queue

We will implement the blocking queue on top of an array-based circular queue.

The circular queue itself was covered in detail in the data structures column, so it is not explained again here; we only make small changes and additions to it.

5.1 Circular Queue

Here we use a separate variable that tracks the current number of elements to determine whether the queue is full or empty.

class MyBlockingQueue {
    // Use an array of type String to save elements. Assume only Strings are stored here.
    private String[] items = new String[1000];

    // Points to the head of the queue.
    private int head = 0;

    // Points to the position after the last element at the tail of the queue.
    // In general, the range of valid elements in the queue is [head, tail).
    // When head and tail are equal (coincide), the queue is empty.
    private int tail = 0;

    // Use size to track the number of elements.
    private int size = 0;

    // Enqueue
    public void put(String elem) throws InterruptedException {
        if (size >= items.length) {
            // The queue is full.
            return;
        }
        items[tail] = elem;
        tail++;
        if (tail >= items.length) {
            tail = 0;
        }
        size++;
    }

    // Dequeue
    public String take() throws InterruptedException {
        if (size == 0) {
            // The queue is empty; nothing can be dequeued for now.
            return null;
        }
        String elem = items[head];
        head++;
        if (head >= items.length) {
            head = 0;
        }
        size--;
        return elem;
    }
}

Note how the wrap-around is handled in the code above: when tail (or head) reaches the end of the array, the index is incremented and then reset to 0 with an explicit if check.

Previously we used the operation tail = (tail + 1) % items.length. Using the explicit reset instead has two benefits:

1. It makes the code easier to develop against (easier to read, understand, and modify).

2. It makes the code run faster (a compare-and-branch instead of a division).
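
For comparison, here are the two equivalent ways of wrapping the index, extracted into a standalone sketch (the class and method names are made up for illustration):

class IndexWrapDemo {
    private static final int LENGTH = 1000; // same as items.length above

    // Wrap-around with modulo: concise, but every call pays for a division.
    static int nextWithModulo(int index) {
        return (index + 1) % LENGTH;
    }

    // Wrap-around with an explicit reset: a simple compare-and-branch,
    // usually cheaper than division and easier to read and modify.
    static int nextWithReset(int index) {
        index++;
        if (index >= LENGTH) {
            index = 0;
        }
        return index;
    }

    public static void main(String[] args) {
        System.out.println(nextWithModulo(999)); // 0
        System.out.println(nextWithReset(999));  // 0
    }
}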

5.2 Transforming it into a blocking queue

5.2.1 Thread safety

5.2.1.1 Locking
class MyBlockingQueue {
    // Use an array of String type to save elements.
    private String[] items = new String[1000];
    private int head = 0; // points to the head of the queue
    private int tail = 0; // points to the position after the last element at the tail
    // The range of valid elements in the queue is [head, tail).
    // When head and tail are equal (coincide), the queue is empty.
    // Use size to track the number of elements.
    private int size = 0;

    // Enqueue
    public void put(String elem) {
        // Equivalent to putting synchronized directly on the method
        synchronized (this) {
            if (size >= items.length) {
                // The queue is full
                return;
            }

            items[tail] = elem;
            tail++;
            if (tail >= items.length) {
                tail = 0;
            }
            size++;
        }
    }

    // Dequeue
    public String take() {
        synchronized (this) {
            if (size == 0) {
                // The queue is empty
                return null;
            }
            String elem = items[head];
            head++;
            if (head >= items.length) {
                head = 0;
            }
            size--;
            return elem;
        }
    }
}
5.2.1.2 Memory Visibility
class MyBlockingQueue {
    // Use an array of String type to save elements.
    private String[] items = new String[1000];
    private volatile int head = 0; // points to the head of the queue
    private volatile int tail = 0; // points to the position after the last element at the tail
    // The range of valid elements in the queue is [head, tail).
    // When head and tail are equal (coincide), the queue is empty.
    // Use size to track the number of elements.
    private volatile int size = 0;

    // Enqueue
    public void put(String elem) {
        // Equivalent to putting synchronized directly on the method
        synchronized (this) {
            if (size >= items.length) {
                // The queue is full
                return;
            }

            items[tail] = elem;
            tail++;
            if (tail >= items.length) {
                tail = 0;
            }
            size++;
        }
    }

    // Dequeue
    public String take() {
        synchronized (this) {
            if (size == 0) {
                // The queue is empty
                return null;
            }
            String elem = items[head];
            head++;
            if (head >= items.length) {
                head = 0;
            }
            size--;
            return elem;
        }
    }
}

5.2.2 Implement blocking

a) When the queue is full, calling put again blocks.

b) When the queue is empty, calling take again blocks.

class MyBlockingQueue {
    // Use an array of String type to save elements.
    private String[] items = new String[1000];
    private volatile int head = 0; // points to the head of the queue
    private volatile int tail = 0; // points to the position after the last element at the tail
    // The range of valid elements in the queue is [head, tail).
    // When head and tail are equal (coincide), the queue is empty.
    // Use size to track the number of elements.
    private volatile int size = 0;

    // Dedicated lock object used for wait/notify.
    private Object locker = new Object();

    // Enqueue
    public void put(String elem) throws InterruptedException {
        // Equivalent to putting synchronized directly on the method
        synchronized (locker) {
            if (size >= items.length) {
                // The queue is full.
                // return;
                locker.wait();
            }
            items[tail] = elem;
            tail++;
            if (tail >= items.length) {
                tail = 0;
            }
            size++;
            // Wakes up a thread blocked because the queue was empty
            locker.notify();
        }
    }

    // Dequeue
    public String take() throws InterruptedException {
        synchronized (locker) {
            if (size == 0) {
                // The queue is empty; nothing can be dequeued for now.
                // return null;
                locker.wait();
            }
            String elem = items[head];
            head++;
            if (head >= items.length) {
                head = 0;
            }
            size--;
            // This notify wakes up a thread blocked because the queue was full
            locker.notify();
            return elem;
        }
    }
}

Let's think about a question. In the code above, wait is called when the condition holds (the queue is full or empty). When the wait is awakened, is the condition guaranteed to no longer hold? (For example, if the wait inside put is awakened, is the queue guaranteed not to be full anymore, or could it still be full?)

Of course not!

  • In the current code, the wait can also be awakened by an interrupt; in that case an InterruptedException is thrown straight out of the method and execution does not continue, so nothing bad happens.
  • But if the wait were wrapped in a try/catch inside the method, an interrupt would wake it up and send control into the catch block; after the catch finishes, the method would not end, execution would continue downward, and an element would be overwritten even though the queue is still full.

So how do we fix this?

  • After wait is awakened, check the condition again! If the queue is still full (or still empty), keep waiting; if not, continue executing.
  • By simply replacing the if with a while loop, we get exactly this behavior: after wait wakes up, the condition is confirmed once more.

The final complete code is as follows:

class MyBlockingQueue {
    // Use an array of type String to save elements. Assume only Strings are stored here.
    private String[] items = new String[1000];

    // Points to the head of the queue.
    private volatile int head = 0;

    // Points to the position after the last element at the tail of the queue.
    // In general, the range of valid elements in the queue is [head, tail).
    // When head and tail are equal (coincide), the queue is empty.
    private volatile int tail = 0;

    // Use size to track the number of elements.
    private volatile int size = 0;

    // Dedicated lock object used for wait/notify.
    private Object locker = new Object();

    // Enqueue
    public void put(String elem) throws InterruptedException {
        // Equivalent to putting synchronized directly on the method
        synchronized (locker) {
            while (size >= items.length) {
                // The queue is full.
                // return;
                locker.wait();
            }
            items[tail] = elem;
            tail++;
            if (tail >= items.length) {
                tail = 0;
            }
            size++;
            // Wakes up a thread blocked because the queue was empty
            locker.notify();
        }
    }

    // Dequeue
    public String take() throws InterruptedException {
        synchronized (locker) {
            while (size == 0) {
                // The queue is empty; nothing can be dequeued for now.
                // return null;
                locker.wait();
            }
            String elem = items[head];
            head++;
            if (head >= items.length) {
                head = 0;
            }
            size--;
            // This notify wakes up a thread blocked because the queue was full
            locker.notify();
            return elem;
        }
    }
}


public class Demo20 {
    public static void main(String[] args) throws InterruptedException {
        // Create two threads, representing the producer and the consumer
        MyBlockingQueue queue = new MyBlockingQueue();
        Thread t1 = new Thread(() -> {
            int count = 0;
            while (true) {
                try {
                    queue.put(count + "");
                    System.out.println("Produced element: " + count);
                    count++;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        Thread t2 = new Thread(() -> {
            while (true) {
                try {
                    String count = queue.take();
                    System.out.println("Consumed element: " + count);

                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        t1.start();
        t2.start();
    }
}

When this program runs, the producer quickly fills the queue up to its capacity of 1000 elements and then blocks; after that, each time the consumer (which sleeps for one second per element) takes an element, the producer is woken up and puts a new one in, so production and consumption settle into a rhythm of roughly one element per second.