[Multithreading] Communication between Java threads

Article directory

  • background
  • 1. Lock and synchronization
  • 2. Waiting/Notification Mechanism
  • Three, semaphore – Volatile
    • Application scenarios of semaphores
  • 4. Pipeline input/output stream
    • Application Scenarios of Pipeline Communication
  • 5. Thread.join() method
  • 6. ThreadLocal class
    • Application scenarios of ThreadLocal
  • Summarize

Background

Reasonable use of Java multithreading can make better use of server resources. Generally speaking, a thread has its own private thread context inside and does not interfere with each other. But when we need multiple threads to cooperate with each other, we need to master the communication method of Java threads.

This article will introduce several communication principles between Java threads.

1. Lock and synchronization

In Java, the concept of locks is based on objects, so we often call it object locks. A lock can only be held by one thread at a time. In other words, if a lock is held by a thread, if other threads need to obtain the lock, they have to wait for the thread to release the lock.

Between threads, there is a concept of synchronization. In multithreading, there may be multiple threads trying to access a limited resource, and this must be prevented from happening.

Therefore, a synchronization mechanism is introduced: when a thread uses a resource, it is locked, so that other threads cannot access that resource until it is unlocked.

Thread synchronization means that threads are executed in a certain order.

Let’s first look at a lock-free program:

public class NoLock {<!-- -->

    static class ThreadA implements Runnable {<!-- -->
        @Override
        public void run() {<!-- -->
            for (int i = 0; i < 100; i ++ ) {<!-- -->
                System.out.println("Thread A " + i);
            }
        }
    }

    static class ThreadB implements Runnable {<!-- -->
        @Override
        public void run() {<!-- -->
            for (int i = 0; i < 100; i ++ ) {<!-- -->
                System.out.println("Thread B " + i);
            }
        }
    }

    public static void main(String[] args) {<!-- -->
        new Thread(new ThreadA()).start();
        new Thread(new ThreadB()).start();
    }
}

Execute this program, and you will see on the console that thread A and thread B work independently and output their own print values. Every run will have different results. The following is the result of a certain run on my computer.

Thread B 74
Thread B 75
Thread B 76
Thread B 77
Thread B 78
Thread B 79
Thread B 80
Thread A 3
Thread A 4
Thread A 5
Thread A 6
Thread A 7
Thread A 8
Thread A 9
...

Now there is a demand, and I want to wait for A to execute first, and then let B execute it. What should I do? The easiest way is to use an “object lock”.

public class ObjLock {<!-- -->

    private static Object lock = new Object();

    static class ThreadA implements Runnable {<!-- -->
        @Override
        public void run() {<!-- -->
            synchronized (lock) {<!-- -->
                for (int i = 0; i < 100; i ++ ) {<!-- -->
                    System.out.println("Thread A " + i);
                }
            }
        }
    }

    static class ThreadB implements Runnable {<!-- -->
        @Override
        public void run() {<!-- -->
            synchronized (lock) {<!-- -->
                for (int i = 0; i < 100; i ++ ) {<!-- -->
                    System.out.println("Thread B " + i);
                }
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {<!-- -->
        new Thread(new ThreadA()).start();
        Thread. sleep(10);
        new Thread(new ThreadB()).start();
    }
}

An object lock named lock is declared here. In the code blocks that need to be synchronized in ThreadA and ThreadB, we use the synchronized keyword to add the same object lock lock.

As we mentioned above, according to the relationship between threads and locks, only one thread holds a lock at the same time, then thread B will release the lock after thread A finishes executing, and then thread B can acquire the locklock.

Here, the sleep method is used in the main thread to sleep for 10 milliseconds to prevent thread B from getting the lock first. Because if you start at the same time, both thread A and thread B are in a ready state, and the operating system may let B run first. In this way, the content of B will be output first, and then the lock will be released automatically after B is executed, and thread A will execute again.

2. Wait/notify mechanism

In the above method based on “lock”, the thread needs to keep trying to acquire the lock, and if it fails, keep trying again. This can consume server resources. And the wait/notify mechanism is another way.

The waiting/notifying mechanism of Java multithreading is based on the wait() method and notify(), notifyAll()< of the Object class /code> method to achieve.

The notify() method will randomly wake up a waiting thread, and notifyAll() will wake up all waiting threads.

We mentioned earlier that a lock can only be held by one thread at a time. And if thread A now holds a lock lock and starts executing, it can use lock.wait() to put itself into a waiting state. At this time, the lock is released.

At this time, thread B obtains the lock of lock and starts to execute. It can use lock.notify() at a certain moment to notify the previously held lock The thread A that locks and enters the waiting state says, “You don’t have to wait for thread A, you can continue to execute.”

It should be noted that thread B does not release the lock lock at this time, unless thread B uses lock.wait() to release the lock at this time, or thread B releases itself after execution Lock, thread A can get the lock lock.

Use code to achieve it:

public class WaitNotify {<!-- -->

    private static Object lock = new Object();

    static class ThreadA implements Runnable {<!-- -->
        @Override
        public void run() {<!-- -->
            synchronized (lock) {<!-- -->
                for (int i = 0; i < 5; i ++ ) {<!-- -->
                    try {<!-- -->
                        System.out.println("ThreadA: " + i);
                        lock. notify();
                        lock. wait();
                    } catch (InterruptedException e) {<!-- -->
                        e.printStackTrace();
                    }
                }
                lock. notify();
            }
        }
    }

    static class ThreadB implements Runnable {<!-- -->
        @Override
        public void run() {<!-- -->
            synchronized (lock) {<!-- -->
                for (int i = 0; i < 5; i ++ ) {<!-- -->
                    try {<!-- -->
                        System.out.println("ThreadB: " + i);
                        lock. notify();
                        lock. wait();
                    } catch (InterruptedException e) {<!-- -->
                        e.printStackTrace();
                    }
                }
                lock. notify();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {<!-- -->
        new Thread(new ThreadA()).start();
        Thread. sleep(1000);
        new Thread(new ThreadB()).start();
    }
}

// output:

ThreadA: 0
ThreadB: 0
ThreadA: 1
ThreadB: 1
ThreadA: 2
ThreadB: 2
ThreadA: 3
ThreadB: 3
ThreadA: 4
ThreadB: 4

Thread A and thread B first print out what they need, then use the notify() method to wake up another waiting thread, and then use the wait() method to fall into the Wait for and release the lock lock.

It should be noted that the wait/notify mechanism uses the same object lock. If your two threads use different object locks, they cannot use the wait/notify mechanism to communicate.

3. Semaphore – Volatile

Semaphore (Semaphore): Sometimes called a semaphore, it is a facility used in a multi-threaded environment and can be used to ensure that two or more critical code segments are not called concurrently. Before entering a critical code section, a thread must acquire a semaphore; once the critical code section is complete, the thread must release the semaphore. Other threads that want to enter the critical code section must wait until the first thread releases the semaphore.

This article is not to introduce this class, but to introduce a self-implemented semaphore communication based on the volatile keyword.

The volatile keyword can ensure the visibility of memory. If a variable is declared with the volatile keyword and the value of the variable is changed in one thread, then other threads can immediately see the changed value.

For example, I have a requirement now, I want thread A to output 0, then thread B to output 1, then thread A to output 2…and so on. How should I achieve this?

public class Count {<!-- -->
    private static volatile int count = 0;

    static class ThreadA implements Runnable {<!-- -->
        @Override
        public void run() {<!-- -->
            while (count < 5) {<!-- -->
                if (count % 2 == 0) {<!-- -->
                    System.out.println("threadA: " + count);
                    synchronized (this) {<!-- -->
                        count + + ;
                    }
                }
            }
        }
    }

    static class ThreadB implements Runnable {<!-- -->
        @Override
        public void run() {<!-- -->
            while (count < 5) {<!-- -->
                if (count % 2 == 1) {<!-- -->
                    System.out.println("threadB: " + signal);
                    synchronized (this) {<!-- -->
                        count + + ;
                    }
                }
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {<!-- -->
        new Thread(new ThreadA()).start();
        Thread. sleep(1000);
        new Thread(new ThreadB()).start();
    }
}

// output:

threadA: 0
threadB: 1
threadA: 2
threadB: 3
threadA: 4

We can see that a volatile variable count is used to implement the “semaphore” model. It should be noted here that volatile variables need to be atomically operated, and count + + is not an atomic operation, use synchronized to it as needed “Lock”, or use atomic classes such as AtomicInteger.

Application scenario of semaphore

If in a parking lot, the parking space is our public resource, the thread is like a vehicle, and the gatekeeper acts as a “semaphore”.
Because in this scenario, multiple threads need to cooperate with each other, it is not so convenient for us to use simple “lock” and “wait for notification mechanism”. At this time, the semaphore can be used.

4. Pipeline input/output stream

Pipeline is a communication method based on “pipe flow”. JDK provides PipedWriter, PipedReader, PipedOutputStream, PipedInputStream.
Among them, the first two are based on characters, and the latter two are based on byte streams.
The following sample code uses character-based:

public class Pipe {<!-- -->
    static class ReaderThread implements Runnable {<!-- -->
        private PipedReader reader;

        public ReaderThread(PipedReader reader) {<!-- -->
            this. reader = reader;
        }

        @Override
        public void run() {<!-- -->
            System.out.println("this is reader");
            int receive = 0;
            try {<!-- -->
                while ((receive = reader. read()) != -1) {<!-- -->
                    System.out.print((char)receive);
                }
            } catch (IOException e) {<!-- -->
                e.printStackTrace();
            }
        }
    }

    static class WriterThread implements Runnable {<!-- -->

        private PipedWriter writer;

        public WriterThread(PipedWriter writer) {<!-- -->
            this.writer = writer;
        }

        @Override
        public void run() {<!-- -->
            System.out.println("this is a writer");
            int receive = 0;
            try {<!-- -->
                writer.write("test");
            } catch (IOException e) {<!-- -->
                e.printStackTrace();
            } finally {<!-- -->
                try {<!-- -->
                    writer. close();
                } catch (IOException e) {<!-- -->
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException {<!-- -->
        PipedWriter writer = new PipedWriter();
        PipedReader reader = new PipedReader();
        writer. connect(reader);

        new Thread(new ReaderThread(reader)).start();
        Thread. sleep(1000);
        new Thread(new WriterThread(writer)).start();
    }
}

// output:

this is reader
this is writer
test

We pass in the PipedWrite and PipedReader objects through the thread’s constructor. You can briefly analyze the execution flow of this sample code:

  1. The thread ReaderThread starts executing,
  2. The thread ReaderThread uses the pipeline reader.read() to enter “blocking”,
  3. The thread WriterThread starts executing,
  4. The thread WriterThread writes a string to the pipeline with writer.write(“test”),
  5. The thread WriterThread uses writer.close() to end the pipeline writing and completes the execution.
  6. The thread ReaderThread receives the string output by the pipeline and prints it,
  7. The thread ReaderThread finishes executing.

Application scenarios of pipeline communication

This is easy to understand. Using pipes is mostly related to I/O streams. When one of our threads needs to send a message (such as a string) or a file, etc. before another thread, we need to use pipeline communication.

5. Thread. join() method

The join() method is an instance method of the Thread class. Its function is to put the current thread into a “waiting” state, and continue to execute the current thread after the execution of the joined thread is completed.

Sometimes, the main thread creates and starts sub-threads. If a large number of time-consuming calculations are required in the sub-threads, the main thread will often end before the end of the sub-threads.

If the main thread wants to wait for the sub-thread to finish executing and obtain a certain data processed in the sub-thread, the join method must be used.
Sample code:

public class Join {<!-- -->
    static class ThreadA implements Runnable {<!-- -->

        @Override
        public void run() {<!-- -->
            try {<!-- -->
                System.out.println("I am a child thread, I will sleep for a second");
                Thread. sleep(1000);
                System.out.println("I am a child thread, I slept for a second");
            } catch (InterruptedException e) {<!-- -->
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {<!-- -->
        Thread thread = new Thread(new ThreadA());
        thread. start();
        thread. join();
        System.out.println("If you don't add the join method, I will be typed out first, and it will be different if you add it");
    }
}

6. ThreadLocal class

ThreadLocal is a local thread copy variable tool class. Inside is a weakly referenced Map to maintain.
Strictly speaking, the ThreadLocal class does not belong to the communication between multiple threads, but allows each thread to have its own “independent” variable, and the threads do not affect each other. It creates a copy for each thread, and each thread can access its own internal copy variable.
The most commonly used ThreadLocal class is the set method and get method. Sample code:

public class Profiler {<!-- -->

    private static final ThreadLocal<Long> TIME_THREADLOCAL = new ThreadLocal<Long>() {<!-- -->
        protected Long initialValue() {<!-- -->
            return System. currentTimeMillis();
        }
    };

    public static void begin() {<!-- -->
        TIME_THREADLOCAL.set(System.currentTimeMillis());
    }

    public static long end() {<!-- -->
        return System.currentTimeMillis() - TIME_THREADLOCAL.get();
    }

    public static void main(String[] args) throws InterruptedException {<!-- -->
        Profiler.begin();
        TimeUnit. SECONDS. sleep(1);
        System.out.println("Time-consuming: " + Profiler.end() + "mills");
    }
}

// output:

Time-consuming: 1001mills

Profiler can be reused in the time-consuming statistics function of the method, execute the begin() method before the method entry, and execute the end( ) method, the advantage is that the calls of the two methods do not need to be in one method and class, such as in AOP (aspect-oriented programming), you can execute begin() method, and execute the end() method at the method call entry point, so that the execution time of the method can still be obtained.

Application scenario of ThreadLocal

The most common ThreadLocal usage scenarios are used to solve database connections, Session management, etc. Database connection and Session management involve the initialization and closing of multiple complex objects. If some private variables are declared in each thread for operation, then this thread becomes less “lightweight” and needs to frequently create and close connections.

Summary

Inter-thread communication makes threads a whole, improves the interactivity between systems, and can effectively control and supervise thread tasks while improving CPU utilization.

syntaxbug.com © 2021 All Rights Reserved.