JUC concurrent programming (2)


Continued from the previous note: https://blog.csdn.net/weixin_44780078/article/details/130694996

5. Java Memory Model

JMM (the Java Memory Model) defines an abstract model of main memory and per-thread working memory; underneath, these correspond to CPU registers, caches, hardware memory, CPU instruction optimizations, and so on.

JMM is reflected in the following aspects:

  • Atomicity: guarantees that instructions are not affected by thread context switches.
  • Visibility: guarantees that instructions are not affected by CPU caches.
  • Ordering: guarantees that instructions are not affected by CPU instruction reordering and parallel optimization.

Context switching in multithreading: a context switch happens when the CPU suspends the currently executing thread, saves its context (program counter, register contents, stack pointer, etc.), and then restores the context of another thread and executes it.

1 Visibility

Let’s look at a case first: a loop that cannot be exited.

    private static boolean run = true;

    public static void main(String[] args) {
        Thread t1 = new Thread(() -> {
            while (run) {
                //...
            }
        }, "Thread t1");
        t1.start();
        sleep(1000);
        log.info("Thread t1 stopped");
        run = false; // after setting run to false, the loop should reasonably exit
    }

Result: after run is changed to false, thread t1 does not stop as expected but keeps running.

Result analysis:

  1. Initially, thread t1 reads the value of run from main memory into its working memory.
  2. Because t1 reads run very frequently, the JIT compiler caches the value of run in t1's working memory, reducing accesses to main memory and improving efficiency.
  3. One second later, the main thread modifies run and synchronizes the new value to main memory, but t1 keeps reading run from the cache in its own working memory, so it always sees the old value.

    tips: The root cause of this phenomenon is that the main thread's modification of the variable in main memory is not visible to the other thread (t1).

Solution: mark the variable volatile. This forces thread t1 to read it from main memory every time, guaranteeing visibility of the data between threads. (There is a performance cost: t1 previously read from its working-memory cache and now reads from main memory on every iteration.)

    private volatile static boolean run = true;

    public static void main(String[] args) {
        Thread t1 = new Thread(() -> {
            while (run) {
                //...
            }
        }, "Thread t1");
        t1.start();
        sleep(1000);
        log.info("Thread t1 stopped");
        run = false;
    }

Result: resolved.

tips: The volatile keyword can modify instance fields and static fields. It prevents a thread from reading the variable's value from its own working cache and forces it to fetch the value from main memory; threads operating on a volatile variable operate directly on main memory.

2 Visibility vs Atomicity

The previous example demonstrates visibility: a modification of a volatile variable by one thread is guaranteed to be visible to other threads. It does not guarantee atomicity, so volatile alone is only suitable for the case of one writing thread and multiple reading threads.

A synchronized block guarantees both the atomicity of the code block and the visibility of the variables accessed inside it. The drawback is that synchronized is a heavyweight operation with relatively lower performance.

    private static boolean run = true; // no volatile needed here: synchronized itself provides visibility
    final static Object lock = new Object();

    public static void main(String[] args) {
        Thread t1 = new Thread(() -> {
            while (true) {
                synchronized (lock) {
                    if (!run) {
                        break;
                    }
                }
                //...
            }
        }, "Thread t1");
        t1.start();
        sleep(1000);
        log.info("Thread t1 stopped");
        synchronized (lock) {
            run = false;
        }
    }

3 volatile principles

The underlying implementation of volatile is the memory barrier (Memory Barrier): a write barrier is added after each write instruction to a volatile variable, and a read barrier is added before each read instruction of a volatile variable.

  1. The write barrier guarantees that changes to shared variables made before the barrier are synchronized to main memory.
  2. The read barrier guarantees that reads of shared variables after the barrier load the latest data from main memory.

The Case for Visibility: The Singleton Pattern (Lazy)

Analyze the following code: when thread t1 calls getInstance(), instance == null, so it enters the if block and creates a Singleton object. If thread t2 calls getInstance() before new Singleton() has completed, instance is still null, so t2 creates another Singleton object, which defeats the purpose of the singleton pattern.

public class Singleton {
    private Singleton() {}

    private static Singleton instance = null;

    public static Singleton getInstance() {
        if (instance == null) {
            instance = new Singleton();
        }
        return instance;
    }
}

So add synchronized to improve the code:

public class Singleton {
    private Singleton() {}

    private static Singleton instance = null;

    public static synchronized Singleton getInstance() {
        if (instance == null) {
            instance = new Singleton();
        }
        return instance;
    }
}

Adding the synchronized lock solves the problem. But on closer thought, this is not the best fix: locking is only needed while the object is first being created. Once the object exists, instance is no longer null and can be returned directly; later threads do not need to acquire the lock at all, so this version unnecessarily reduces efficiency.

So modify the code again with two checks (double-checked locking). When thread t1 accesses for the first time, instance is obviously null, so t1 acquires the lock and creates the object. If thread t2 arrives while the object is being created, instance is still null, so t2 blocks waiting for the lock; after t1 finishes creating the object, t2 enters the inner if, finds instance non-null, and exits. All later threads see a non-null instance on the first check and return directly, so the lock is only ever taken during initialization.

public class Singleton {
    private Singleton() {}

    private volatile static Singleton instance = null;

    public static Singleton getInstance() {
        if (instance == null) {
            synchronized (Singleton.class) {
                if (instance == null) {
                    instance = new Singleton();
                }
            }
        }
        return instance;
    }
}

6. Lock-free shared model

Thread safety can be achieved with locks such as synchronized, but it can also be implemented in a lock-free way, for example with AtomicInteger. Since AtomicInteger does not use locks, how does it work?

AtomicInteger has a method compareAndSet (compare-and-set, also called compare-and-swap, CAS for short) whose operation is atomic. On the x86 architecture it compiles down to the lock cmpxchg instruction, which guarantees the atomicity of the compare-and-swap on both single-core and multi-core CPUs.
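To make the retry semantics concrete, here is a minimal sketch of the CAS retry loop that methods such as incrementAndGet effectively perform (the CasCounter class is our own illustration, not JDK code):

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CasCounter {
    private final AtomicInteger value = new AtomicInteger(0);

    // Lock-free increment: read the current value, then try to swap in
    // current + 1; if another thread changed the value in between, the
    // CAS fails and we simply retry with the fresh value.
    public int increment() {
        while (true) {
            int current = value.get();
            int next = current + 1;
            if (value.compareAndSet(current, next)) {
                return next;
            }
        }
    }

    public int get() {
        return value.get();
    }
}
```

If another thread wins the race, compareAndSet returns false and the loop simply reads the fresh value and tries again; no thread ever blocks.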

1 CAS and volatile

When reading a shared variable, volatile modification is needed to guarantee its visibility. As noted above, volatile can modify instance fields and static fields; it prevents a thread from reading the value from its own working cache and forces it to fetch the value from main memory, and operations on a volatile variable act directly on main memory. In other words, one thread's modification of a volatile variable is visible to other threads.

Note: volatile only guarantees the visibility of shared variables, letting other threads see the latest value; it cannot solve the problem of instruction interleaving (it cannot guarantee atomicity).
CAS relies on volatile to read the latest value of the shared variable so that the compare-and-swap has its intended effect.

2 CAS lock-free efficiency

In the lock-free case, even when a CAS fails and retries, the thread keeps running at full speed without stopping; with synchronized, a thread that fails to acquire the lock undergoes a context switch and enters the blocked state. Therefore lock-free CAS can be more efficient than locking.

3 CAS Features

Combining CAS and volatile can achieve lock-free concurrency, which is suitable for situations with a small number of threads and multi-core CPUs.

  • CAS embodies the idea of optimistic locking: it does not fear other threads modifying the shared variable; if the variable does get changed, the operation simply retries.
  • synchronized embodies the idea of pessimistic locking: it locks up front to prevent other threads from modifying the shared variable; while the lock is held no other thread can modify it, and only when the lock is released does the next thread get its turn.
  • CAS embodies lock-free, non-blocking concurrency, so it can be more efficient.

4 Atomic Integers

  • AtomicInteger
  • AtomicBoolean
  • AtomicLong

The suffix after Atomic clearly shows the wrapped type. Here is an AtomicInteger example:

AtomicInteger a = new AtomicInteger(1);
System.out.println(a.getAndIncrement()); // prints 1: returns the old value first, then adds 1, so a is now 2
System.out.println(a.incrementAndGet()); // prints 3: adds 1 first, then returns the new value
System.out.println(a.getAndAdd(5));      // prints 3: returns the old value first, then adds 5, so a is now 8
System.out.println(a.addAndGet(5));      // prints 13: adds 5 first, then returns the new value

System.out.println(a.getAndUpdate(value -> value * 10)); // prints 13: returns the old value first, then multiplies by 10, so a is now 130
System.out.println(a.updateAndGet(value -> value * 10)); // prints 1300: multiplies 130 by 10 first, then returns the new value

Why reference atomic types?

The Atomic family mainly guarantees atomicity in a multi-threaded environment and is lighter-weight than synchronized. The most commonly used is AtomicInteger, which wraps int operations, while AtomicReference wraps references to ordinary objects.

AtomicReference example:
First define a User class

@Data
@AllArgsConstructor
public class User {
    private String name;
    private Integer age;
}

Initialize with AtomicReference and assign:

    public static void main(String[] args) {
        User user1 = new User("Zhang San", 23);
        User user2 = new User("Li Si", 25);
        User user3 = new User("Wang Wu", 20);
        // initialize to user1
        AtomicReference<User> atomicReference = new AtomicReference<>();
        atomicReference.set(user1);
        System.out.println(atomicReference.get()); // prints: User(name=Zhang San, age=23)
    }

Use the compareAndSet method:

    public static void main(String[] args) {
        User user1 = new User("Zhang San", 23);
        User user2 = new User("Li Si", 25);
        User user3 = new User("Wang Wu", 20);

        // initialize to user1
        AtomicReference<User> atomicReference = new AtomicReference<>();
        atomicReference.set(user1);

        // try to swap user1 for user2
        atomicReference.compareAndSet(user1, user2);
        System.out.println(atomicReference.get()); // prints: User(name=Li Si, age=25)

        // try to swap user1 for user3
        atomicReference.compareAndSet(user1, user3);
        System.out.println(atomicReference.get()); // prints: User(name=Li Si, age=25)
    }

compareAndSet method explanation:

compareAndSet(V expect, V update)

This method works as follows: if the value currently held by atomicReference == expect, assign update to atomicReference; otherwise do nothing. In the case above, atomicReference is initialized to user1, so when atomicReference.compareAndSet(user1, user2) executes, expect matches and user2 is stored. When atomicReference.compareAndSet(user1, user3) executes, the current value is user2, which is not equal to expect, so nothing happens and the assignment of user3 fails.
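One caveat worth illustrating with our own example (not part of the case above): AtomicReference.compareAndSet compares the expected value with the current one using reference identity (==), not equals, so an object that is merely equal in content will not match:

```java
import java.util.concurrent.atomic.AtomicReference;

public class CasIdentityDemo {
    public static void main(String[] args) {
        AtomicReference<String> ref = new AtomicReference<>(new String("A"));

        // equals-equal but a different object: the CAS fails
        boolean byValue = ref.compareAndSet(new String("A"), "C");
        System.out.println(byValue);    // prints false

        // the exact reference currently stored: the CAS succeeds
        boolean byIdentity = ref.compareAndSet(ref.get(), "C");
        System.out.println(byIdentity); // prints true
    }
}
```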

5 Atomic References – ABA Problem

If we want to change the string A to C, we can run it directly like this:

    static AtomicReference<String> ref = new AtomicReference<>("A");

    public static void main(String[] args) {
        log.debug("main start...");
        String prev = ref.get();
        sleep(1000);
        log.debug("change A->C: {}", ref.compareAndSet(prev, "C"));
    }

But we modify the code to do the following:

    /**
     * Initial value: "A"
     */
    static AtomicReference<String> ref = new AtomicReference<>("A");

    public static void main(String[] args) {
        log.debug("main start...");
        String prev = ref.get();
        other();
        sleep(1000);
        /**
         * Expected value: prev
         * New value: "C"
         *
         * The CAS checks whether the current value still equals prev; if so it swaps in "C",
         * otherwise it does nothing.
         */
        log.debug("change A->C: {}", ref.compareAndSet(prev, "C"));
    }

    public static void other() {
        new Thread(() -> {
            log.debug("change A->B {}", ref.compareAndSet(ref.get(), "B"));
        }, "Thread t1").start();
        sleep(500);
        new Thread(() -> {
            log.debug("change B->A {}", ref.compareAndSet(ref.get(), "A"));
        }, "Thread t2").start();
    }

The change from A to C still succeeds: the other threads merely went in a circle, first changing A to B and then B back to A, so when the main thread compares A with A the swap to C succeeds even though the value was modified in between.

What if the main thread needs to detect, before changing A to C, whether A has been modified by other threads in the meantime, and do nothing if it has? How can that be achieved?

A new class is introduced here: AtomicStampedReference.

AtomicStampedReference: makes your CAS fail as long as any other thread has operated on the shared variable. Comparing only the value is not enough; a version number (stamp) is added, and every operation on the shared variable increments it by 1.

    /**
     * Initial value: "A"
     * Initial version number: 0; any other number works, the convention is up to you
     */
    static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 0);

    public static void main(String[] args) {
        String prev = ref.getReference();
        int stamp = ref.getStamp();
        log.debug("The version number is: {}", stamp);
        other();
        sleep(1000);
        log.debug("other() finished, version number: {}", ref.getStamp());
        /**
         * Expected value: prev
         * New value: "C"
         * Expected version number: stamp
         *
         * The CAS succeeds only if prev equals the current reference AND stamp equals
         * the current version number; on success the version number becomes stamp + 1.
         */
        log.debug("change A->C: {}", ref.compareAndSet(prev, "C", stamp, stamp + 1));
    }

    public static void other() {
        new Thread(() -> {
            int stamp = ref.getStamp();
            log.debug("change A->B {}", ref.compareAndSet(ref.getReference(), "B", stamp, stamp + 1));
        }, "Thread t1").start();
        sleep(500);
        new Thread(() -> {
            int stamp = ref.getStamp();
            log.debug("change B->A {}", ref.compareAndSet(ref.getReference(), "A", stamp, stamp + 1));
        }, "Thread t2").start();
    }


This time the main thread's modification fails, which meets the original requirement.

Another class worth introducing here: AtomicMarkableReference.

AtomicMarkableReference: compared with AtomicStampedReference, it records only a boolean mark instead of a version number. If the mark starts as true and any other thread has operated on the variable, the mark is flipped to false, so there is no need to track version numbers.

    /**
     * Initial value: "A"
     * Initial mark: true; it could also start as false, the convention is up to you
     */
    static AtomicMarkableReference<String> ref = new AtomicMarkableReference<>("A", true);

    public static void main(String[] args) {
        String prev = ref.getReference();
        other();
        sleep(1000);
        /**
         * Expected value: prev
         * New value: "C"
         * Expected mark: true
         * New mark on success: false
         *
         * The CAS succeeds only if prev equals the current reference AND the mark is
         * still true; on success the mark is changed to false.
         */
        log.debug("change A->C: {}", ref.compareAndSet(prev, "C", true, false));
    }

    public static void other() {
        new Thread(() -> {
            log.debug("change A->B {}", ref.compareAndSet(ref.getReference(), "B", true, false));
        }, "Thread t1").start();
        sleep(500);
        new Thread(() -> {
            log.debug("change B->A {}", ref.compareAndSet(ref.getReference(), "A", true, false));
        }, "Thread t2").start();
    }

7. Immutable Objects

We know that String is immutable:

public final class String
    implements java.io.Serializable, Comparable<String>, CharSequence {
    /** The value is used for character storage. */
    private final char value[];

    /** Cache the hash code for the string */
    private int hash;

Notice that the class itself and the character-storage field value are declared final:

  • A field declared final is read-only and cannot be reassigned after construction;
  • A class declared final cannot be subclassed, so its methods cannot be overridden, which prevents subclasses from inadvertently destroying the immutability;
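Following the same recipe as String, a minimal immutable class could look like this (the Point class is our own illustration):

```java
// A minimal immutable value class in the same style as String:
// the class is final (no subclass can override its behavior) and every
// field is private and final (state can never change after construction).
public final class Point {
    private final int x;
    private final int y;

    public Point(int x, int y) {
        this.x = x;
        this.y = y;
    }

    public int getX() { return x; }
    public int getY() { return y; }

    // "Modification" returns a new object instead of mutating this one,
    // just like String.concat or String.replace do.
    public Point translate(int dx, int dy) {
        return new Point(x + dx, y + dy);
    }
}
```

Any "modification" produces a new object, so instances can be freely shared between threads without synchronization.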

8. Concurrency Tools

1 thread pool

Why use a thread pool?

  • 1. Threads are a kind of system resource: every created thread occupies memory (stack space must be allocated). Under high concurrency, if a burst of tasks arrives and every task creates its own thread, too many resources are consumed and an out-of-memory error may even occur;
  • 2. More threads is not always better. The number of CPUs in a machine is limited, so creating too many threads leaves most of them blocked because they cannot be scheduled, and the excessive context switching between threads also severely hurts performance.

Thread pool: create a batch of threads and reuse them. This avoids creating too many threads and avoids the CPU context switching that too many threads would cause.

Custom thread pool:
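A deliberately minimal sketch of the idea (all class and method names here are our own; real pools such as ThreadPoolExecutor add emergency threads, timeouts, rejection policies, and state management on top): a fixed set of worker threads repeatedly take tasks from a shared queue and run them.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class MiniThreadPool {
    private final Deque<Runnable> queue = new ArrayDeque<>();
    private final Object lock = new Object();
    private volatile boolean shutdown = false;

    public MiniThreadPool(int workers) {
        // Create the fixed set of worker threads up front; they are reused
        // for every task instead of creating one thread per task.
        for (int i = 0; i < workers; i++) {
            new Thread(this::workerLoop, "worker-" + i).start();
        }
    }

    public void execute(Runnable task) {
        synchronized (lock) {
            queue.addLast(task);
            lock.notify(); // wake one idle worker
        }
    }

    public void shutdown() {
        synchronized (lock) {
            shutdown = true;
            lock.notifyAll(); // let all workers drain the queue and exit
        }
    }

    private void workerLoop() {
        while (true) {
            Runnable task;
            synchronized (lock) {
                while (queue.isEmpty() && !shutdown) {
                    try {
                        lock.wait(); // idle until a task arrives or shutdown
                    } catch (InterruptedException e) {
                        return;
                    }
                }
                if (queue.isEmpty()) {
                    return; // shutdown requested and no work left
                }
                task = queue.pollFirst();
            }
            task.run(); // run outside the lock so workers don't serialize
        }
    }
}
```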

2 Thread Pool Implementation in JDK – ThreadPoolExecutor

Thread pool status:

ThreadPoolExecutor uses the high 3 bits of int to represent the thread pool status, and the low 29 bits to represent the number of threads.

| State | High 3 bits | Accepts new tasks | Processes queued tasks | Description |
| --- | --- | --- | --- | --- |
| RUNNING | 111 | Y | Y | |
| SHUTDOWN | 000 | N | Y | Accepts no new tasks, but finishes the remaining tasks in the blocking queue |
| STOP | 001 | N | N | Interrupts the tasks being executed and abandons the queued tasks |
| TIDYING | 010 | - | - | All tasks have finished and the active thread count is 0; about to enter the terminal state |
| TERMINATED | 011 | - | - | Terminal state |

Compared numerically (the high bit is the sign bit, so RUNNING is negative): TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING.
This information is stored in a single atomic variable, ctl. The purpose is to combine the thread pool state and the thread count into one value, so both can be updated with a single CAS operation.

// c is the old value; ctlOf returns the new value
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)));

// rs is the high 3 bits (thread pool state), wc is the low 29 bits (thread count); ctlOf merges them
private static int ctlOf(int rs, int wc) {
    return rs | wc;
}
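To see the bit layout in action, here is a small demo built from the same constants (mirroring the JDK source in spirit; exact field names vary slightly between JDK versions, and the CtlDemo class is ours):

```java
public class CtlDemo {
    private static final int COUNT_BITS = Integer.SIZE - 3;      // 29
    private static final int COUNT_MASK = (1 << COUNT_BITS) - 1; // low 29 bits

    private static final int RUNNING  = -1 << COUNT_BITS; // 111 in the high 3 bits
    private static final int SHUTDOWN =  0 << COUNT_BITS; // 000
    private static final int STOP     =  1 << COUNT_BITS; // 001

    private static int ctlOf(int rs, int wc) { return rs | wc; }
    private static int runStateOf(int c)     { return c & ~COUNT_MASK; }
    private static int workerCountOf(int c)  { return c & COUNT_MASK; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5); // a running pool with 5 worker threads
        System.out.println(runStateOf(c) == RUNNING); // prints true
        System.out.println(workerCountOf(c));         // prints 5
        // RUNNING has the sign bit set, so numerically RUNNING < SHUTDOWN < STOP
        System.out.println(RUNNING < SHUTDOWN && SHUTDOWN < STOP); // prints true
    }
}
```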


Construction method:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
  • corePoolSize: the number of core threads (the maximum number of reserved threads);
  • maximumPoolSize: maximum number of threads;
  • keepAliveTime: survival time for emergency threads;
  • unit: time unit for emergency threads;
  • workQueue: blocking queue;
  • threadFactory: thread factory, which can give newly created threads good names;
  • handler: rejection policy;
  • At the beginning there are no threads in the pool. When a task is submitted, the pool creates a new thread to execute it. Once the thread count reaches corePoolSize and no thread is idle, newly added tasks go into the workQueue until a thread becomes free. If the queue is bounded and tasks exceed its capacity, up to maximumPoolSize - corePoolSize emergency threads are created.
  • If the thread count has reached maximumPoolSize and new tasks still arrive, the rejection policy is executed. The JDK provides 4 implementations, and other well-known frameworks provide their own:
    • AbortPolicy: throw a RejectedExecutionException to the caller (the default policy);
    • CallerRunsPolicy: let the caller run the task itself;
    • DiscardPolicy: silently discard this task;
    • DiscardOldestPolicy: discard the oldest task in the queue and enqueue this task instead;
    • Dubbo's implementation logs and dumps the thread stacks before throwing the RejectedExecutionException, making problems easier to locate;
    • Netty's implementation creates a new thread to run the task;
    • ActiveMQ's implementation tries to put the task into the queue with a timeout wait (60s), similar to a custom rejection policy;
    • PinPoint's implementation uses a chain of rejection policies and tries each policy in the chain one by one.
  • When the peak has passed, emergency threads beyond corePoolSize that have had no work for some time are terminated to save resources; this idle time is controlled by keepAliveTime and unit.
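Tying the constructor parameters together, a usage sketch (the pool sizes, queue capacity, and thread name are arbitrary choices for illustration):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolDemo {
    public static void main(String[] args) throws Exception {
        // 2 core threads, up to 4 threads total, emergency threads live 60s,
        // a bounded queue of 10 tasks, named threads, and the default
        // AbortPolicy rejection strategy.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4,
                60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(10),
                r -> new Thread(r, "demo-worker"),
                new ThreadPoolExecutor.AbortPolicy());

        Future<Integer> result = pool.submit(() -> 1 + 1);
        System.out.println(result.get()); // prints 2

        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```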

Fixed-size thread pool (newFixedThreadPool):

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

Features:

  • The number of core threads equals the maximum number of threads (no emergency threads are created), so no keep-alive timeout is needed;
  • The blocking queue is unbounded and can hold any number of tasks;

Updated daily...