Java multi-threaded programming-inter-thread collaboration wait/notify

Foreword:

This article is based on my personal understanding of Chapter 5 of “Java Multi-Threaded Programming Practical Guide – Core Chapter”. The source code is excerpted from the author’s source code, and my own understanding will be added to the source code. The reading notes are currently being updated by the author as follows: “Java Multi-Threaded Programming Practical Guide – Core Chapter”, “How Tomcat Works”, and then “spring source code” interpretation.

Waiting and notification: wait/notify

In single-threaded programming, if the program needs to operate a target action when certain conditions are met, an if statement is needed. When handling this situation in multi-threaded programming, the protection condition may only be temporary, and other threads may update it later. The protection condition is designed with shared variables to make it true, so the current thread can be suspended and waked up to continue the operation until the protection condition is true. The pseudocode is as follows:

//Atomic operation
atomic {

   while (protection condition is not established) {
        Pause the current thread;
   }

   doAction();
}

If written in Java code form:

synchronized(someObject) {
    while (protection condition is not established) {
        someObject.wait();
   }
   doAction();
}

My personal understanding here is that one thing to note is that the above operations need to be atomic.

Think about it from another angle, what would happen if the above operation is not atomic (that is, comment out the synchronize line), and break down the steps:

1. When the code runs to someObject.wait(), the current thread will be suspended and the someObject internal lock held will be released. The thread life cycle will enter the waiting state. At this time, the someObject.wait() statement will not return until Other threads call someObject.notifiy().

2. When other threads call someObject.notify() and update the protection conditions, notify will wake up the waiting thread, someObject.wai() will apply for the internal lock of someObject, hold the internal lock, and the statement will return.

3.while re-judges the protection condition, but because the operation is not atomic, that is, during the second to third steps, other threads may change the protection condition, making the while condition not true again, so wait is entered again. statement.

4. The same applies to doAction. It must be ensured that the protection condition is established before executing the target action. Otherwise, other threads may update the sharing just before executing doAction, causing the protection condition to become invalid again.

Therefore, the above while statement judgment, as well as doAction and wait calls need to be placed in the critical section guided by the same object lock.

Use Object.notify() to notify, as shown in the following pseudo code:

synchronized(someObject){
    //Update the shared variables designed by the protection conditions of the waiting thread
    updateShareState();

    //Wake up other threads
    someObject.notify();
}

It includes updating shared variables and waking up other threads. Since a thread can only execute the object’s notify if it holds the internal lock of an object, this is why the first step of the wait statement just mentioned will release the object’s internal lock, otherwise notify cannot be called. For details, please refer to the pseudocode of the internal implementation of wait below.

Another thing to note about notify is that it should be placed as close to the end of the critical section as possible, that is, the sentence in the above code before the curly brace near the end. This is because when notify is called, the waiting thread will be awakened, but notify itself The internal lock will not be released, so if it is not close, the waiting thread may not be able to get the internal lock and be suspended again.

Wait internal implementation pseudocode

Public void wait() {

    //The execution thread must hold the internal lock corresponding to the current object
    if(!Thread.holdsLock(this)){
        Throws new IllegalMonitorStateException();
    }

    if(the current object is not in the waiting set){
    //Add the current thread to the current object waiting set

       addToWaitSet(Thread.currentThread());
    }

    atomic{//Atomic operation starts
        //Release the internal lock of the current object

        releaseLock(this);
        //Pause the current thread
        block(Thread.currentThread());

    }//Atomic operation ends

    //Apply again for the internal lock of the current object
    acquireLock(this);
    //Remove the current thread from the wait and wait of the current object
    removeFromWaitSet(Thread.currentThread());
    return;//return
}

Practical cases

The book includes a practical case of wait/notify. In fact, I personally feel that this practical case is not very good.

The case is that a distributed system has an alarm system, which reports the alarm information and sends it to the alarm server through a network connection.

AlarmAgent maintains two working threads internally: one working thread is responsible for establishing a network connection with the alarm server, which is the network connection thread, and the other working thread is responsible for regularly checking the network connection between the alarm agent and the alarm server, which is the heartbeat thread.

public class CaseRunner5_1 {
  final static AlarmAgent alarmAgent;
  static {
    alarmAgent = AlarmAgent.getInstance();
    alarmAgent.init();
  }

  public static void main(String[] args) throws InterruptedException {

    alarmAgent.sendAlarm("Database offline!");
    Tools.randomPause(12000);
    alarmAgent.sendAlarm("XXX service unreachable!");
  }
}
import java.util.Random;


public class AlarmAgent {
    //Save the only instance of this class
    private final static AlarmAgent INSTANCE = new AlarmAgent();
    // Whether to connect to the alarm server
    private boolean connectedToServer = false;
    //Heartbeat thread, used to detect whether the network connection between the alarm agent and the alarm server is normal
    private final HeartbeartThread heartbeatThread = new HeartbeartThread();

    private AlarmAgent() {
        // do nothing
    }

    public static AlarmAgent getInstance() {
        return INSTANCE;
    }

    public void init() {
        connectToServer();
        heartbeatThread.setDaemon(true);
        heartbeatThread.start();
    }

    private void connectToServer() {
        //Create and start a network connection thread, and establish a connection with the alarm server in this thread
        new Thread() {
            @Override
            public void run() {
                doConnect();
            }
        }.start();
    }

    private void doConnect() {
        // Simulate the actual operation time
        Tools.randomPause(100);
        synchronized (this) {
            connectedToServer = true;
            // The connection has been established, notification to wake up the alarm sending thread
            notify();
        }
    }

    public void sendAlarm(String message) throws InterruptedException {
        synchronized (this) {
            // Make the current thread wait until the connection between the alarm agent and the alarm server is established or restored
            while (!connectedToServer) {

                Debug.info("Alarm agent was not connected to server.");

                wait();
            }

            // Actually report the alarm message to the alarm server
            doSendAlarm(message);
        }
    }

    private void doSendAlarm(String message) {
        // ...
        Debug.info("Alarm sent:%s", message);
    }

    //Heartbeat thread
    class HeartbeartThread extends Thread {
        @Override
        public void run() {
            try {
                // Leave a certain amount of time for the network connection thread to establish a connection with the alarm server
                Thread.sleep(1000);
                while (true) {
                    if (checkConnection()) {
                        connectedToServer = true;
                    } else {
                        connectedToServer = false;
                        Debug.info("Alarm agent was disconnected from server.");

                        // Detect connection interruption, re-establish connection
                        connectToServer();
                    }
                    Thread.sleep(2000);
                }
            } catch (InterruptedException e) {
                // do nothing;
            }
        }

        // Detect the network connection with the alarm server
        private boolean checkConnection() {
            boolean isConnected = true;
            final Random random = new Random();

            // Simulate random network disconnection
            int rand = random.nextInt(1000);
            if (rand <= 500) {
                isConnected = false;
            }
            return isConnected;
        }
    }
}
import java.io.PrintStream;
import java.text.SimpleDateFormat;
import java.util.Date;

public class Debug {
    private static ThreadLocal<SimpleDateFormat> sdfWrapper = new ThreadLocal<SimpleDateFormat>() {
        @Override
        protected SimpleDateFormat initialValue() {
            return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        }

    };

    enum Label {
        INFO("INFO"),
        ERR("ERROR");
        String name;

        Label(String name) {
            this.name = name;
        }

        public String getName() {
            return name;
        }
    }

    // public static void info(String message) {
    // printf(Label.INFO, "%s", message);
    // }

    public static void info(String format, Object... args) {
        printf(Label.INFO, format, args);
    }

    public static void info(boolean message) {
        info("%s", message);
    }

    public static void info(int message) {
        info("%d", message);
    }

    public static void error(String message, Object... args) {
        printf(Label.ERR, message, args);
    }

    public static void printf(Label label, String format, Object... args) {
        SimpleDateFormat sdf = sdfWrapper.get();
        @SuppressWarnings("resource")
        final PrintStream ps = label == Label.INFO ? System.out : System.err;
        ps.printf('[' + sdf.format(new Date()) + "][" + label.getName()
                 + "]["
                 + Thread.currentThread().getName() + "]:" + format + " %n", args);
    }
}
import sun.misc.Unsafe;

import java.io.*;
import java.lang.reflect.Field;
import java.math.BigInteger;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Random;
import java.util.logging.Level;
import java.util.logging.Logger;

public final class Tools {
    private static final Random rnd = new Random();
    private static final Logger LOGGER = Logger.getAnonymousLogger();

    public static void startAndWaitTerminated(Thread... threads)
            throws InterruptedException {
        if (null == threads) {
            throw new IllegalArgumentException("threads is null!");
        }
        for (Thread t : threads) {
            t.start();
        }
        for (Thread t : threads) {
            t.join();
        }
    }

    public static void startThread(Thread... threads) {
        if (null == threads) {
            throw new IllegalArgumentException("threads is null!");
        }
        for (Thread t : threads) {
            t.start();
        }
    }

    public static void startAndWaitTerminated(Iterable<Thread> threads)
            throws InterruptedException {
        if (null == threads) {
            throw new IllegalArgumentException("threads is null!");
        }
        for (Thread t : threads) {
            t.start();
        }
        for (Thread t : threads) {
            t.join();
        }
    }

    public static void randomPause(int maxPauseTime) {
        int sleepTime = rnd.nextInt(maxPauseTime);
        try {
            Thread.sleep(sleepTime);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void randomPause(int maxPauseTime, int minPauseTime) {
        int sleepTime = maxPauseTime == minPauseTime ? minPauseTime : rnd
                .nextInt(maxPauseTime - minPauseTime) + minPauseTime;
        try {
            Thread.sleep(sleepTime);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static Unsafe getUnsafe() {
        try {
            Field f = Unsafe.class.getDeclaredField("theUnsafe");
            ((Field) f).setAccessible(true);
            return (Unsafe) f.get(null);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    public static void silentClose(Closeable... closeable) {
        if (null == closeable) {
            return;
        }
        for (Closeable c : closeable) {
            if (null == c) {
                continue;
            }
            try {
                c.close();
            } catch (Exception ignored) {
            }
        }
    }

    public static void split(String str, String[] result, char delimeter) {
        int partsCount = result.length;
        int posOfDelimeter;
        int fromIndex = 0;
        String recordField;
        int i = 0;
        while (i < partsCount) {
            posOfDelimeter = str.indexOf(delimeter, fromIndex);
            if (-1 == posOfDelimeter) {
                recordField = str.substring(fromIndex);
                result[i] = recordField;
                break;
            }
            recordField = str.substring(fromIndex, posOfDelimeter);
            result[i] = recordField;
            i + + ;
            fromIndex = posOfDelimeter + 1;
        }
    }

    public static void log(String message) {
        LOGGER.log(Level.INFO, message);
    }

    public static String md5sum(final InputStream in) throws NoSuchAlgorithmException, IOException {
        MessageDigest md = MessageDigest.getInstance("MD5");
        byte[] buf = new byte[1024];
        try (DigestInputStream dis = new DigestInputStream(in, md)) {
            while (-1 != dis.read(buf))
                ;
        }
        byte[] digest = md.digest();
        BigInteger bigInt = new BigInteger(1, digest);
        String checkSum = bigInt.toString(16);

        while (checkSum.length() < 32) {
            checkSum = "0" + checkSum;
        }
        return checkSum;
    }

    public static String md5sum(final File file) throws NoSuchAlgorithmException, IOException {
        return md5sum(new BufferedInputStream(new FileInputStream(file)));
    }

    public static String md5sum(String str) throws NoSuchAlgorithmException, IOException {
        ByteArrayInputStream in = new ByteArrayInputStream(str.getBytes("UTF-8"));
        return md5sum(in);
    }

    public static void delayedAction(String prompt, Runnable action, int delay/* seconds */) {
        Debug.info("%s in %d seconds.", prompt, delay);
        try {
            Thread.sleep(delay * 1000);
        } catch (InterruptedException ignored) {
        }
        action.run();
    }

    public static Object newInstanceOf(String className) throws InstantiationException,
            IllegalAccessException, ClassNotFoundException {
        return Class.forName(className).newInstance();
    }

}

References

“Java Multi-Threaded Programming Practical Guide-Core”