CyclicBarrier thread synchronization

About the author: CSDN content partner and technical expert, started from scratch to build APP with tens of millions of daily users, and led the team to earn more than ten million in a single day.
Focus on sharing original series of articles in various fields, specializing in java backend, mobile development, commercialization, artificial intelligence, etc. I hope you will support me a lot.

Directory

  • 1. Introduction
  • 2. Overview
      • The difference between CyclicBarrier and CountDownLatch
  • 3. Use
  • 4. Principle
  • 5. Recommended reading

1. Introduction

We continue our review of Java fundamentals, revisiting the old to learn the new.

This article touches on the following topics:
AQS (AbstractQueuedSynchronizer)
CAS (Compare And Swap)
Lock concepts and volatile
ReentrantLock

2. Overview

1. What it is
CyclicBarrier is a synchronization tool provided by the JDK. It lets a group of threads wait for each other until all of them have reached a common point (the barrier), and then releases them all to continue.
Features:
The barrier can be reused after the waiting threads have been released.

2. Implementation principle
CyclicBarrier is built on ReentrantLock (an exclusive lock that is itself based on AQS, AbstractQueuedSynchronizer) and a Condition obtained from that lock.
A thread that calls await() blocks until the counter reaches 0, and then continues.
Internally, parties records the total number of threads and count does the counting. Initially, count = parties. Each call to await() decrements count while holding the lock. When count reaches 0, the waiting threads are woken and count is reset to parties. This reset is what makes the barrier reusable.
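
The counting and reset described above can be sketched as follows. This is a minimal, hypothetical illustration and not the JDK source: the class name SimpleBarrier, the generation field, and the demo method are assumptions made for the example, and timeout, interruption, and broken-barrier handling are left out on purpose.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified barrier: no timeout, interruption, or broken-state handling.
class SimpleBarrier {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition trip = lock.newCondition();
    private final int parties;  // total number of threads per round
    private int count;          // threads still expected in the current round
    private long generation;    // incremented every time the barrier trips

    SimpleBarrier(int parties) {
        this.parties = parties;
        this.count = parties;
    }

    // Blocks until `parties` threads have called await(); returns the arrival index.
    int await() {
        lock.lock();
        try {
            long arrivedIn = generation;
            int index = --count;             // only modified while holding the lock
            if (index == 0) {
                count = parties;             // reset: this is what makes the barrier reusable
                generation++;
                trip.signalAll();            // wake every thread waiting in this round
                return 0;
            }
            while (arrivedIn == generation) {
                trip.awaitUninterruptibly(); // releases the lock while waiting
            }
            return index;
        } finally {
            lock.unlock();
        }
    }

    // Runs `rounds` rounds with `parties` threads; returns how many times the barrier tripped.
    static int demo(int parties, int rounds) {
        SimpleBarrier barrier = new SimpleBarrier(parties);
        AtomicInteger trips = new AtomicInteger();
        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                for (int r = 0; r < rounds; r++) {
                    if (barrier.await() == 0) {
                        trips.incrementAndGet(); // exactly one thread per round sees index 0
                    }
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return trips.get();
    }

    public static void main(String[] args) {
        System.out.println("trips = " + demo(3, 4)); // 3 threads, 4 rounds
    }
}
```

The generation counter lets a woken thread tell "my round has finished" apart from a spurious wakeup, which is the same role the Generation object plays in the real class.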

3. Usage scenarios
A group of threads must all reach a certain point before any of them continues.
e.g.:
Several threads each compute a partial result, and the results are merged once all of them have finished.
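
The merge scenario above might look like the sketch below. The class PartialSumDemo and the method parallelSum are hypothetical names chosen for this example; the merge is done in the barrier action, which runs once in the last thread to arrive.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicLong;

class PartialSumDemo {
    // Splits data into `workers` slices, sums each slice in its own thread,
    // and merges the partial sums in the barrier action.
    static long parallelSum(int[] data, int workers) {
        long[] partial = new long[workers];
        AtomicLong total = new AtomicLong();
        // The barrier action runs once, in the last thread to arrive.
        CyclicBarrier barrier = new CyclicBarrier(workers, () -> {
            long sum = 0;
            for (long p : partial) {
                sum += p;
            }
            total.set(sum);
        });
        int chunk = (data.length + workers - 1) / workers; // slice size, rounded up
        Thread[] threads = new Thread[workers];
        for (int w = 0; w < workers; w++) {
            final int id = w;
            threads[w] = new Thread(() -> {
                int from = Math.min(id * chunk, data.length);
                int to = Math.min(from + chunk, data.length);
                long sum = 0;
                for (int i = from; i < to; i++) {
                    sum += data[i];
                }
                partial[id] = sum;
                try {
                    barrier.await(); // wait until every slice has been summed
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    throw new IllegalStateException(e);
                }
            });
            threads[w].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return total.get();
    }

    public static void main(String[] args) {
        int[] data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
        System.out.println(parallelSum(data, 3));
    }
}
```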

4. Advantages and Disadvantages

  • Advantages: The code is elegant, there is no need to manipulate the thread pool directly, and it fits well when the thread pool is shared as a bean.
  • Disadvantages: The number of threads must be known in advance, and performance is a bit worse. Exception handling is also needed in each thread's code block: if a thread fails before it calls await() and the failure is not handled, the threads already waiting at the barrier stay blocked forever.

Pay attention to exception handling. The number of await() calls must match num (the number of parties), otherwise the waiting threads remain blocked.
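
One way to avoid blocking forever is the timed await(long, TimeUnit) overload. The sketch below assumes a two-party barrier whose second party never arrives; BarrierTimeoutDemo and awaitAlone are hypothetical names used only for this illustration.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BarrierTimeoutDemo {
    // Waits at a 2-party barrier that the second party never reaches.
    static String awaitAlone(long timeoutMillis) {
        CyclicBarrier barrier = new CyclicBarrier(2);
        try {
            barrier.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return "passed";
        } catch (TimeoutException e) {
            // A timed-out wait also marks the barrier as broken.
            return "timeout, broken=" + barrier.isBroken();
        } catch (BrokenBarrierException e) {
            return "broken";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitAlone(100));
    }
}
```

After a timeout the barrier stays broken until reset() is called, so any later await() on it fails fast with BrokenBarrierException instead of hanging.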

The difference between CyclicBarrier and CountDownLatch

CyclicBarrier and CountDownLatch are both multi-thread synchronization tools, but their working mechanisms and uses are slightly different:
CyclicBarrier is more suitable for scenarios where a group of threads must repeatedly wait for each other, while CountDownLatch is more suitable when one or more threads wait for a fixed number of operations to complete.

  • Construction and counting mechanisms
    CountDownLatch requires an initial count value when it is constructed. Each call to countDown() decrements the counter by 1, and when the counter reaches 0, the threads blocked in await() are released. CyclicBarrier also takes a count at construction (parties), but there is no separate countDown() call: each thread both counts itself and blocks by calling await(), and no thread is released until all parties have reached the barrier.

  • Thread status
    With CountDownLatch, only the threads that call await() block; the threads that call countDown() continue immediately. With CyclicBarrier, every participating thread blocks at the barrier until the last one arrives.

  • Reusability
    CyclicBarrier is reusable: once all threads have passed the barrier it resets automatically and can be used for another round, while CountDownLatch is one-time use.

  • Number of waiting threads
    Both counts are fixed at construction. With CountDownLatch, any number of threads can wait in await() for the count to reach 0; with CyclicBarrier, exactly parties threads wait for each other.

With CountDownLatch, one thread (or several) waits for other threads to finish before it continues.
With CyclicBarrier, each thread waits for all the others to reach the barrier before any of them continues.
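
The contrast can be made concrete with a small side-by-side sketch. LatchVsBarrierDemo and its two methods are hypothetical names for this example: latchStyle has one waiter and several non-blocking workers, while barrierStyle has every worker wait at the barrier in each of several rounds.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class LatchVsBarrierDemo {
    // The caller blocks until `workers` threads have each called countDown().
    static long latchStyle(int workers) {
        CountDownLatch done = new CountDownLatch(workers);
        for (int i = 0; i < workers; i++) {
            new Thread(done::countDown).start(); // workers never block on the latch
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return done.getCount(); // a latch cannot be re-armed once it reaches 0
    }

    // Every worker blocks at the barrier in each round; the barrier re-arms itself.
    static int barrierStyle(int workers, int rounds) {
        AtomicInteger trips = new AtomicInteger();
        CyclicBarrier barrier = new CyclicBarrier(workers, trips::incrementAndGet);
        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        barrier.await(); // same barrier instance, reused every round
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return trips.get(); // number of times the barrier tripped
    }

    public static void main(String[] args) {
        System.out.println("latch count after await: " + latchStyle(3));
        System.out.println("barrier trips: " + barrierStyle(3, 2));
    }
}
```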

3. Use

Taken literally, the class name means “cyclic barrier”: a barrier that can be used over and over.
Its function is to make every thread wait until all of them have arrived before any of them continues to the next step.
In real life, we often need to wait for everyone to arrive before starting an activity. For example, at a family meal you wait until everyone is seated before picking up your chopsticks; on a trip you wait until everyone is present before setting off; in a competition you wait until all the athletes are on the field.

CyclicBarrier is also called a loopback barrier. Because it resets itself once the waiting threads are released, the same instance can coordinate several rounds in a row, as the test2Cyclic method in the example below does.

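import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CyclicBarrierTest {
    // Number of parties that must call await() before the barrier trips
    private static int num = 3;
    // The barrier action runs once per trip, in the last thread to arrive
    private static CyclicBarrier cyclicBarrier = new CyclicBarrier(num, () -> {
        System.out.println("---------Everyone is ready, the number of awaits has been reached ----------");
    });
    private static ExecutorService executorService = Executors.newFixedThreadPool(num);

    public static void main(String[] args) throws Exception {

        test2Cyclic();
        // test2Cyclic1();
        executorService.shutdown();
    }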

    // Single round: each task reaches the barrier once
    public static void test2Cyclic1() {
        executorService.submit(() -> {
            System.out.println("A is going to the toilet");
            try {
                Thread.sleep(4000);
                System.out.println("A is finished");
                cyclicBarrier.await();
                System.out.println("The meeting ended, A exited");
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        executorService.submit(() -> {
            System.out.println("B is going to the toilet");
            try {
                Thread.sleep(2000);
                System.out.println("B is finished");
                cyclicBarrier.await();
                System.out.println("The meeting ended, B exited");
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        executorService.submit(() -> {
            System.out.println("C is going to the toilet");
            try {
                Thread.sleep(3000);
                System.out.println("C is finished");
                cyclicBarrier.await();
                System.out.println("The meeting ended, C exited");
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }

    // Several rounds: the same barrier is reused three times by each task
    public static void test2Cyclic() {
        executorService.submit(() -> {
            System.out.println("A is going to the toilet");
            try {
                Thread.sleep(4000);
                System.out.println("A is finished");
                cyclicBarrier.await();
                System.out.println("The meeting is over, A exits and starts coding");
                cyclicBarrier.await();
                System.out.println("A coding is over, go home from work");
                cyclicBarrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        executorService.submit(() -> {
            System.out.println("B is going to the toilet");
            try {
                Thread.sleep(2000);
                System.out.println("B is finished");
                cyclicBarrier.await();
                System.out.println("The meeting is over, B exits and starts fishing");
                cyclicBarrier.await();
                System.out.println("B fishing is over, go home from work");
                cyclicBarrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        executorService.submit(() -> {
            System.out.println("C is going to the toilet");
            try {
                Thread.sleep(3000);
                System.out.println("C is finished");
                cyclicBarrier.await();
                System.out.println("The meeting is over, C exits and starts fishing");
                cyclicBarrier.await();
                System.out.println("C fishing is over, go home from work");
                cyclicBarrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        // The pool is shut down by main() after this method returns
    }
}

4. Principle

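CyclicBarrier does not subclass AQS itself; it relies on ReentrantLock, whose internal synchronizer is an AQS subclass. The tasks of an AQS subclass are:

  1. Maintain the shared state variable through CAS operations.
  2. Override the method that acquires the resource.
  3. Override the method that releases the resource.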
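
The three tasks above can be illustrated with a minimal non-reentrant mutex built on AbstractQueuedSynchronizer. This is a sketch for illustration only, not the code of ReentrantLock; the names SimpleMutex, Sync, and demo are hypothetical.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical non-reentrant mutex: state 0 = unlocked, state 1 = locked.
class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        // Task 2: acquire the resource. Task 1: the state changes via CAS.
        @Override
        protected boolean tryAcquire(int ignored) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false; // AQS queues and parks the calling thread
        }

        // Task 3: release the resource.
        @Override
        protected boolean tryRelease(int ignored) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true; // AQS wakes the next queued thread
        }
    }

    private final Sync sync = new Sync();

    void lock() {
        sync.acquire(1);
    }

    void unlock() {
        sync.release(1);
    }

    // Increments a plain int from several threads under the mutex.
    static int demo(int threads, int incrementsPerThread) {
        SimpleMutex mutex = new SimpleMutex();
        int[] counter = new int[1];
        Thread[] pool = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            pool[i] = new Thread(() -> {
                for (int n = 0; n < incrementsPerThread; n++) {
                    mutex.lock();
                    try {
                        counter[0]++;
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            pool[i].start();
        }
        for (Thread t : pool) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(demo(4, 1000));
    }
}
```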

Let’s look at the code together

// 1. Create a CyclicBarrier and set the counter value. parties is the number of threads that must call await() before the barrier trips.
// The second constructor takes a Runnable parameter: the task that the last arriving thread runs before the waiting threads are released.
public CyclicBarrier(int parties)
public CyclicBarrier(int parties, Runnable barrierAction)

// 2. When a thread calls await(), it acquires the exclusive lock, decrements count, enters the condition queue, and releases the lock while it waits.
// Once the first thread is waiting and has released the lock, the other threads compete for the lock and go through the same steps.
// The return value is the arrival index of the calling thread: parties - 1 for the first to arrive, 0 for the last.
public int await() throws InterruptedException, BrokenBarrierException

// Wait for at most the given amount of time
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException

//synchronization lock
    private final ReentrantLock lock = new ReentrantLock();

    //Conditional queue
    private final Condition trip = lock.newCondition();

CyclicBarrier uses ReentrantLock and Condition to block threads
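
A short, hedged usage sketch of the public API: await() returns the arrival index of the calling thread, which counts down from parties - 1 to 0. ArrivalIndexDemo and arrivalIndexes are hypothetical names for this illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

class ArrivalIndexDemo {
    // Collects the value returned by await() in each of `parties` threads, sorted ascending.
    static List<Integer> arrivalIndexes(int parties) {
        CyclicBarrier barrier = new CyclicBarrier(parties);
        List<Integer> indexes = Collections.synchronizedList(new ArrayList<>());
        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                try {
                    indexes.add(barrier.await()); // parties - 1 for the first arrival, 0 for the last
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        List<Integer> sorted = new ArrayList<>(indexes);
        Collections.sort(sorted);
        return sorted;
    }

    public static void main(String[] args) {
        System.out.println(arrivalIndexes(3));
    }
}
```

Which thread receives which index depends on scheduling, but each index from 0 to parties - 1 is handed out exactly once per round.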

private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //Current generation
            final Generation g = generation;
            //Check the status of the current generation. g.broken is true if the barrier of this generation has been broken, false otherwise.
            if (g.broken)
                throw new BrokenBarrierException();
            //Check whether the current thread has been interrupted
            if (Thread.interrupted()) {
                //If the current thread has been interrupted, call breakBarrier()
                //The code for this method is generation.broken = true; count = parties; trip.signalAll();
                //It does only 3 things: mark the barrier of the current generation as broken, reset the counter value, and wake up all blocked threads
                breakBarrier();
                //Finally throw an interrupt exception
                throw new InterruptedException();
            }
            //Decrement the counter value by 1
            int index = --count;
            if (index == 0) {
                //The counter has reached 0: this is the last thread to arrive
                boolean ranAction = false;
                try {
                    //The barrier action is executed first. Note that it is executed by the last thread to reach the barrier.
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    //Start the next generation. The code for this method is trip.signalAll(); count = parties; generation = new Generation();
                    //It wakes up all blocked threads, resets the counter value, and instantiates the next generation
                    nextGeneration();
                    return 0;
                } finally {
                    //If the barrier action did not complete successfully, mark the barrier of the current generation as broken, reset the counter value, and wake up all blocked threads
                    if (!ranAction)
                        breakBarrier();
                }
            }

            //The current thread blocks until one of three things happens: all parties arrive at the barrier, the current thread is interrupted, or the wait times out
            //Infinite loop
            for (;;) {
                try {
                    if (!timed)
                        //If it is not a timed wait, call await() of the condition queue to block.
                        trip.await();
                    else if (nanos > 0L)
                        //If it is a timed wait, call awaitNanos() of the condition queue to wait.
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    //The current thread was interrupted while waiting
                    if (g == generation && !g.broken) {
                        //After being interrupted, it is still in the current generation, and the barrier of the current generation has not been broken.
                        //In this situation the last thread has not reached the barrier yet; the current thread arrived early and was interrupted while waiting.
                        //Break the barrier of the current generation and wake up all blocked threads
                        breakBarrier();
                        throw ie;
                    } else {
                        //The generation has already changed or the barrier is already broken, so the wait is about to end anyway: just restore the interrupt flag
                        Thread.currentThread().interrupt();
                    }
                }
                //The thread has been woken up at this point and needs to determine why.

                //If the barrier of the current generation was broken because another thread was interrupted or timed out, throw an exception.
                if (g.broken)
                    throw new BrokenBarrierException();

                //If the generation changed normally, return the index value
                if (g != generation)
                    return index;

                //If it is a timed wait and the time has expired, break the barrier, wake up all blocked threads, and finally throw an exception
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

  1. When a thread calls await(), it acquires the exclusive lock, decrements count, enters the condition queue, and releases the lock while it waits.
  2. Once the first thread is waiting and has released the lock, the other threads compete for the lock and go through the same steps as in 1.
  3. When count finally reaches 0, the last thread runs the task passed to the CyclicBarrier constructor. After that, all waiting threads are woken and continue.
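
The failure path in dowait() can be observed from the outside. In the sketch below, which uses the hypothetical names BrokenBarrierDemo and run, the barrier action throws: the last thread to arrive sees that exception, while the thread already waiting is woken with BrokenBarrierException.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

class BrokenBarrierDemo {
    static String run() {
        // Barrier action that always fails; it runs in the last thread to arrive.
        CyclicBarrier barrier = new CyclicBarrier(2, () -> {
            throw new IllegalStateException("action failed");
        });
        String[] waiterResult = new String[1];
        Thread waiter = new Thread(() -> {
            try {
                barrier.await();
                waiterResult[0] = "passed";
            } catch (BrokenBarrierException e) {
                waiterResult[0] = "BrokenBarrierException";
            } catch (InterruptedException e) {
                waiterResult[0] = "InterruptedException";
            }
        });
        waiter.start();
        // Make sure the waiter arrives first, so the current thread is the last one.
        while (barrier.getNumberWaiting() < 1) {
            Thread.yield();
        }
        String last;
        try {
            barrier.await();
            last = "passed";
        } catch (IllegalStateException e) {
            last = "action exception"; // propagated from the barrier action
        } catch (BrokenBarrierException | InterruptedException e) {
            last = e.getClass().getSimpleName();
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "last=" + last + ", waiter=" + waiterResult[0] + ", broken=" + barrier.isBroken();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

This corresponds to the finally block around command.run(): because ranAction is still false, breakBarrier() is called and every waiting thread is woken with the broken flag set.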

5. Recommended reading

Java column

SQL Column

Data Structures and Algorithms

Android learning column