wait-notify and the protective pause (Guarded Suspension) pattern

wait-notify

A brief introduction to the principle

The Monitor structure works as follows:

  • Monitor: the monitor is the structure behind a synchronized lock. In Java, each object can be associated with a Monitor object, and its instance is stored in the heap.
  • Thread-2: the thread that is currently the Owner of the Monitor, that is, the thread holding the lock.
  • EntryList: holds the threads that are BLOCKED because they lost the competition for the lock.
  • WaitSet: holds the threads that acquired the lock earlier but called wait() because a condition they depend on was not yet met.

  • When the Owner thread finds that its condition is not met, it calls wait(), enters the WaitSet, and changes to the WAITING state.
  • BLOCKED and WAITING threads are both suspended and do not consume CPU time slices.
  • BLOCKED threads are woken up when the Owner thread releases the lock.
  • WAITING threads are woken up when the Owner thread calls notify or notifyAll. Being woken up does not mean acquiring the lock immediately: the thread still has to enter the EntryList and compete for the lock again.
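The two states can be observed with Thread.getState(). The following is a minimal sketch, not part of the original notes: the class and method names (MonitorStatesDemo, observeStates, awaitState) are made up for illustration, and the polling helper is only there to keep the demo deterministic.

```java
class MonitorStatesDemo {
    // Returns the states seen while the calling thread is the Owner of the lock.
    static String observeStates() throws InterruptedException {
        final Object lock = new Object();
        final boolean[] ready = {false};      // the condition, guarded by lock

        Thread waiter = new Thread(() -> {
            synchronized (lock) {
                while (!ready[0]) {
                    try {
                        lock.wait();          // releases the lock and enters the WaitSet
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }, "waiter");
        Thread contender = new Thread(() -> {
            synchronized (lock) { /* only needs to get in */ }
        }, "contender");

        waiter.start();
        awaitState(waiter, Thread.State.WAITING);

        String observed;
        synchronized (lock) {                 // the calling thread is now the Owner
            contender.start();                // cannot enter: it queues in the EntryList
            awaitState(contender, Thread.State.BLOCKED);
            observed = "waiter=" + waiter.getState()
                    + ", contender=" + contender.getState();
            ready[0] = true;
            lock.notifyAll();                 // the waiter wakes up but must re-acquire the lock
        }
        waiter.join();
        contender.join();
        return observed;
    }

    // Polls until the thread reaches the expected state (demo helper).
    static void awaitState(Thread t, Thread.State expected) throws InterruptedException {
        while (t.getState() != expected) {
            Thread.sleep(10);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(observeStates());
    }
}
```

The waiter only finishes after the calling thread leaves its synchronized block, which is the "woken up but still has to compete for the lock" step described above.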

Basic API

  • obj.wait() makes the thread that owns obj's monitor wait in the WaitSet until it is notified

  • obj.wait(long timeout) waits with a time limit: the wait ends after timeout milliseconds, or earlier if the thread is woken up

  • obj.notify() wakes up one of the threads waiting in obj's WaitSet; which one is chosen is arbitrary

  • obj.notifyAll() wakes up all threads waiting in obj's WaitSet

These methods are the means of cooperation between threads, and they are all defined on Object. A thread must hold the object's lock before it can call any of them.

In other words, only the Owner of the lock may call obj.wait(), obj.notify() or obj.notifyAll(); otherwise an IllegalMonitorStateException is thrown. notify wakes up one arbitrary waiting thread, and notifyAll wakes up all waiting threads, which then compete for the lock.

Note: a thread that calls wait is suspended, and a suspended thread has to be woken up by someone else. A BLOCKED thread competes for the lock on its own as soon as the lock is released, whereas a WAITING thread must first be woken up before it can compete for the lock.
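The rule that the lock must be held first can be checked directly. This is a small sketch with made-up names (WaitNeedsLockDemo and its two helper methods); it only relies on the documented behavior that Object.wait and Object.notify throw IllegalMonitorStateException when the caller does not own the monitor.

```java
class WaitNeedsLockDemo {
    // Returns true if wait() is rejected when the caller does not own the monitor.
    static boolean waitWithoutLockFails() {
        Object lock = new Object();
        try {
            lock.wait(10);            // not inside synchronized (lock)
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Returns true if notify() is rejected in the same situation.
    static boolean notifyWithoutLockFails() {
        try {
            new Object().notify();    // not inside a synchronized block either
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("wait without lock rejected:   " + waitWithoutLockFails());
        System.out.println("notify without lock rejected: " + notifyWithoutLockFails());
    }
}
```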

Compare sleep():

  • Different purpose: sleep() belongs to the Thread class and lets a thread pause its own execution for a period of time, giving other threads a chance to run; wait() belongs to the Object class and is used for communication between threads.

  • Different lock handling: a thread that calls sleep() does not release any object lock it holds. A thread that calls wait() gives up the object's lock and enters that object's WaitSet (if the lock were not released, no other thread could acquire it and perform the wake-up). In both cases the CPU is released.

  • Different usage scope: wait() may only be called inside a synchronized method or synchronized block (the lock must be acquired first), while sleep() can be called anywhere.
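The difference in lock handling can be made visible with a second thread that tries to take the same lock. The sketch below is illustrative only: the names (SleepVsWaitDemo, lockFreeWhilePaused) and the 1000 ms / 200 ms timings are assumptions chosen to leave a comfortable margin, so the result depends on timing rather than on a strict guarantee.

```java
import java.util.concurrent.CountDownLatch;

class SleepVsWaitDemo {
    // Returns true if a second thread could take the lock while the holder was paused.
    static boolean lockFreeWhilePaused(boolean useWait) throws InterruptedException {
        Object lock = new Object();
        CountDownLatch holderInside = new CountDownLatch(1);

        Thread holder = new Thread(() -> {
            synchronized (lock) {
                holderInside.countDown();
                try {
                    if (useWait) {
                        lock.wait(1000);      // gives up the lock while waiting
                    } else {
                        Thread.sleep(1000);   // keeps the lock while sleeping
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        Thread probe = new Thread(() -> {
            synchronized (lock) {
                lock.notifyAll();             // ends the holder's wait early; no effect on sleep
            }
        });

        holder.start();
        holderInside.await();                 // the holder now owns the lock
        probe.start();
        probe.join(200);                      // give the probe 200 ms to get in
        boolean acquired = !probe.isAlive();
        holder.join();
        probe.join();
        return acquired;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("lock free during wait():  " + lockFreeWhilePaused(true));
        System.out.println("lock free during sleep(): " + lockFreeWhilePaused(false));
    }
}
```

With wait() the probe gets in almost immediately; with sleep() it stays BLOCKED until the holder leaves its synchronized block.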

Protective pause (synchronous pattern)

Definition

This is the Guarded Suspension pattern. It is used when one thread has to wait for a result produced by another thread.

Main points

  • A result needs to be passed from one thread to another, so both threads are associated with the same GuardedObject.
  • If results flow continuously from one thread to another, use a message queue instead (see Producer/Consumer).
  • In the JDK, join and Future are implemented with this pattern.
  • Because one side has to wait for the other side's result, it is classified as a synchronous pattern.
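From the caller's side, join and Future both look like "block until the other thread has a result". A short sketch using only standard JDK classes; the class name WaitForResultDemo and the computed value are made up for illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class WaitForResultDemo {
    // Waits for a worker thread with join(), then reads what it produced.
    static int viaJoin() throws InterruptedException {
        int[] box = new int[1];
        Thread worker = new Thread(() -> box[0] = 6 * 7);
        worker.start();
        worker.join();              // blocks until the worker has terminated
        return box[0];
    }

    // Waits for a result through a FutureTask.
    static int viaFuture() throws InterruptedException, ExecutionException {
        FutureTask<Integer> task = new FutureTask<>(() -> 6 * 7);
        new Thread(task).start();
        return task.get();          // blocks until the result is available
    }

    public static void main(String[] args) throws Exception {
        System.out.println("join:   " + viaJoin());
        System.out.println("future: " + viaFuture());
    }
}
```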

Single-task version:

GuardedObject class:

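// log is provided by Lombok's @Slf4j, as in the People and Postman classes
@Slf4j(topic = "c.GuardedObject")
class GuardedObject {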
    // result object
    private Object response;
    // lock object
    private final Object lock = new Object();

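    /**
     * Gets the result. Built on wait(): after acquiring the lock, the caller waits
     * until a result arrives or the timeout expires.
     *
     * @param millis maximum time to wait, in milliseconds
     * @return the result, or null if the timeout expired first
     */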
    public Object get(long millis) {
        synchronized (lock) {
            // 1) Record the initial time
            long begin = System.currentTimeMillis();
            // 2) The time that has passed
            long timePassed = 0;
            // Keep looping until response is set or the maximum waiting time millis has elapsed
            while (response == null) {
                // 4) If millis is 1000 and a spurious wake-up happens at 400, 600 still remain to be waited
                long waitTime = millis - timePassed;
                log.debug("waitTime: {}", waitTime);
                // The maximum waiting time has been used up: exit the loop
                if (waitTime <= 0) {
                    log.debug("break...");
                    break;
                }
                try {
                    // The current thread enters the WaitSet and waits to be woken up
                    lock.wait(waitTime);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 3) After waking up (for example early, at 400), recompute the elapsed time
                // The elapsed time must be updated on every iteration because of spurious wake-ups:
                // if it were not subtracted from millis, every wake-up would start a full wait again
                // and the thread could end up waiting far longer than millis.
                timePassed = System.currentTimeMillis() - begin;
                log.debug("timePassed: {}, object is null {}",
                        timePassed, response == null);
            }
            return response;
        }
    }

    /**
     * Produces the result. Once the result is set, all threads waiting in the WaitSet are woken up.
     */
    public void complete(Object response) {
        synchronized (lock) {
            // The condition is now met: store the result and notify the waiting threads
            this.response = response;
            log.debug("notify...");
            lock.notifyAll();
        }
    }
}
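The remaining-time calculation is the part that is easiest to get wrong, so here is a compact variation that isolates it. It is a sketch under assumptions, not the class above: the name GuardedResult, the type parameter, the use of System.nanoTime() for a fixed deadline, and propagating InterruptedException to the caller are all choices made for this example.

```java
import java.util.concurrent.TimeUnit;

class GuardedResult<T> {
    private final Object lock = new Object();
    private T response;                       // guarded by lock

    // Waits up to timeoutMillis for a result; returns null on timeout.
    public T get(long timeoutMillis) throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        synchronized (lock) {
            while (response == null) {
                long remainingNanos = deadline - System.nanoTime();
                if (remainingNanos <= 0) {
                    break;                    // time is up
                }
                // Never pass 0 to wait(): wait(0) means "wait without a time limit"
                long remainingMillis = Math.max(1, TimeUnit.NANOSECONDS.toMillis(remainingNanos));
                lock.wait(remainingMillis);   // a spurious wake-up just re-enters the loop
            }
            return response;
        }
    }

    public void complete(T value) {
        synchronized (lock) {
            response = value;
            lock.notifyAll();
        }
    }
}

class GuardedResultDemo {
    public static void main(String[] args) throws InterruptedException {
        GuardedResult<String> guarded = new GuardedResult<>();
        new Thread(() -> guarded.complete("done")).start();
        System.out.println(guarded.get(2000));                     // the produced value
        System.out.println(new GuardedResult<String>().get(100));  // nobody completes it: null
    }
}
```

Because the deadline is computed once, each pass through the loop waits only for what is left, which is the same idea as subtracting timePassed from millis.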

Test:

public static void main(String[] args) {
    GuardedObject object = new GuardedObject();
    new Thread(() -> {
        // sleep(1) is a helper that is not shown in this section; it is assumed to pause for 1 second
        sleep(1);
        object.complete(Arrays.asList("a", "b", "c"));
    }).start();
    
    Object response = object.get(2500);
    if (response != null) {
        log.debug("get response: [{}] lines", ((List<String>) response).size());
    } else {
        log.debug("can't get response");
    }
}

Multi-task version:

Let's take receiving mail (the People class, the consumer side) and delivering mail (the Postman class, the producer side) as an example.

GuardedObject:

At first glance, the multi-task version of protective pause looks very similar to a message queue (MQ). The difference is that protective pause requires a one-to-one correspondence between consumer and producer, and a GuardedObject cannot be reused.

So we need to add a member variable to the GuardedObject class that uniquely identifies it. Everything else is the same as in the single-task version.

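// log is provided by Lombok's @Slf4j, as in the People and Postman classes
@Slf4j(topic = "c.GuardedObject")
class GuardedObject {
    // Unique id that identifies this GuardedObject
    private int id;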

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public GuardedObject(int id) {
        this.id = id;
    }

    //The result object to be returned
    private Object response;
    // lock object
    private final Object lock = new Object();

    // Get the result
    // millis: maximum time to wait, in milliseconds
    public Object get(long millis) {
        synchronized (lock) {
            // 1) Record the initial time
            long begin = System.currentTimeMillis();
            // 2) The time that has passed
            long timePassed = 0;
            // Keep looping as long as there is no result yet
            while (response == null) {
                // 4) If millis is 1000 and a spurious wake-up happens at 400, 600 still remain to be waited
                long waitTime = millis - timePassed;
                log.debug("waitTime: {}", waitTime);
                // The maximum waiting time has been used up: exit the loop
                if (waitTime <= 0) {
                    log.debug("break...");
                    break;
                }
                try {
                    lock.wait(waitTime); // The thread enters the WaitSet and waits
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 3) After waking up early (a real or a spurious wake-up), for example at 400, recompute the elapsed time
                // The elapsed time must be recorded on every iteration; otherwise a thread that keeps being
                // spuriously woken up would restart the full wait each time and the limit millis would be meaningless.
                timePassed = System.currentTimeMillis() - begin;
                log.debug("timePassed: {}, object is null {}", timePassed, response == null);
            }
            return response;
        }
    }

    // Produce the result
    public void complete(Object response) {
        synchronized (lock) {
            // The condition is now met: store the result and notify the waiting threads
            this.response = response;
            log.debug("notify...");
            lock.notifyAll(); // A message has arrived: wake up all threads waiting in the WaitSet
        }
    }
}

Mailboxes:

Mailboxes plays the role of a message center for the multi-task version: it manages all GuardedObjects.

class Mailboxes {
    // Hashtable is used as the implementation class to ensure thread safety
    private static Map<Integer, GuardedObject> boxes = new Hashtable<>();
    private static int id = 1;

    // Generate a unique id
    private static synchronized int generateId() {
        return id++;
    }

    // Remove and return the GuardedObject for this id, so that each one is used only once
    public static GuardedObject getGuardedObject(int id) {
        return boxes.remove(id);
    }

    public static GuardedObject createGuardedObject() {
        GuardedObject go = new GuardedObject(generateId());
        boxes.put(go.getId(), go);
        return go;
    }

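    // Get the ids of all messages currently in the mailbox.
    // A copy is returned: Postman threads remove entries while the caller is still iterating,
    // and iterating the live keySet() view at that moment could throw ConcurrentModificationException.
    public static Set<Integer> getIds() {
        synchronized (boxes) { // Hashtable and its keySet() view synchronize on the table itself
            return new HashSet<>(boxes.keySet());
        }
    }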
}
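The same bookkeeping can also be written with java.util.concurrent classes. The following is an alternative sketch, not the Mailboxes class above: BoxRegistry, its method names, and the generic value type are made up so that the example stays self-contained, and ConcurrentHashMap plus AtomicInteger are swapped in for Hashtable and the synchronized counter.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntFunction;

class BoxRegistry<V> {
    private final Map<Integer, V> boxes = new ConcurrentHashMap<>();
    private final AtomicInteger nextId = new AtomicInteger(1);

    // Creates a box under a fresh id; the factory receives that id.
    V create(IntFunction<V> factory) {
        int id = nextId.getAndIncrement();
        V box = factory.apply(id);
        boxes.put(id, box);
        return box;
    }

    // Removes and returns the box, so each box is handed out at most once.
    V take(int id) {
        return boxes.remove(id);
    }

    // Returns a snapshot of the ids, safe to iterate while boxes are being taken.
    Set<Integer> ids() {
        return new HashSet<>(boxes.keySet());
    }
}

class BoxRegistryDemo {
    public static void main(String[] args) {
        BoxRegistry<String> registry = new BoxRegistry<>();
        for (int i = 0; i < 3; i++) {
            registry.create(id -> "box-" + id);
        }
        for (int id : registry.ids()) {
            System.out.println(id + " -> " + registry.take(id));
        }
        System.out.println("left: " + registry.ids().size());
    }
}
```

Handing out a box removes it from the map, which keeps the one-to-one relationship between the waiting side and the producing side.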

People:

People acts as the consumer: it extends the Thread class and overrides the run method.

/**
 * Recipient (consumer)
 * Extends the Thread class and overrides the run method
 */
@Slf4j(topic = "c.People")
class People extends Thread{
    @Override
    public void run() {
        // Receive mail
        // 1) Request a GuardedObject from Mailboxes
        GuardedObject guardedObject = Mailboxes.createGuardedObject();
        log.debug("Start receiving mail id:{}", guardedObject.getId());
        // 2) The thread waits in the WaitSet for the letter. It competes for the lock again only after the mail arrives or the 5000 ms timeout expires.
        Object mail = guardedObject.get(5000);
        log.debug("Received letter id: {}, content: {}", guardedObject.getId(),mail);
    }
}

Postman:

/**
 * Sender (producer)
 * In the protective pause pattern, each People corresponds to exactly one Postman
 * Extends the Thread class and overrides the run method
 */
@Slf4j(topic = "c.Postman")
class Postman extends Thread{
    private int id;
    private String mail;

    // Constructor
    public Postman(Integer id, String s) {
        this.id = id;
        this.mail = s;
    }

    @Override
    public void run() {
        // Get the GuardedObject for the id of the item to deliver from Mailboxes
        GuardedObject guardedObject = Mailboxes.getGuardedObject(id);
        log.debug("Start sending message id:{}, content:{}", guardedObject.getId(), mail);
        // Produce the message. complete() stores it and then wakes up all threads waiting in the WaitSet.
        guardedObject.complete(mail);
    }
}

Test:

public class Demo {
    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 3; i++) {
            // Start three receiving threads; each one registers with Mailboxes (Mailboxes.createGuardedObject)
            new People().start();
        }
        // The main thread sleeps for a while so that all receiving threads can register first
        Thread.sleep(1000);
        // Iterate over the ids in Mailboxes and send a letter to each one
        for (Integer id : Mailboxes.getIds()) {
            new Postman(id, "Parcel No. " + id + " has arrived").start();
        }
    }

}