Python coroutines, the GIL (Global Interpreter Lock), mutex locks, thread pools, and the concurrent.futures module

A process is the smallest unit of resource allocation; a thread is the smallest unit of CPU scheduling. Every process contains at least one thread.

Python’s support for concurrent programming

(1) Multi-threading: threading exploits the fact that CPU work and IO can proceed at the same time, so the CPU does not sit idle waiting for IO to complete.
(2) Multi-processing: multiprocessing uses the capabilities of multi-core CPUs to execute tasks truly in parallel.
(3) Asynchronous IO: asyncio interleaves CPU work and IO within a single thread, implementing asynchronous execution of functions (see the sketch after this list).
(4) Use Lock to protect resources against conflicting access.
(5) Use Queue for data communication between threads/processes and to implement the producer-consumer model.
(6) Use thread pools/process pools to simplify submitting tasks, waiting for completion, and obtaining results.
(7) Use subprocess to start external programs and interact with their input and output.
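As a minimal sketch of item (3), using only the standard library (the function names fetch and main are made up for illustration): two coroutines "sleep" concurrently inside one thread, so the total time is about one second rather than two.

import asyncio

async def fetch(name, delay):
    await asyncio.sleep(delay)   # simulated IO; the event loop runs other tasks meanwhile
    return f'{name} done'

async def main():
    # both coroutines wait concurrently, so this takes ~1s total, not 2s
    results = await asyncio.gather(fetch('a', 1), fetch('b', 1))
    print(results)

asyncio.run(main())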

There are three ways to do concurrent programming in Python:

multi-threading (Thread), multi-processing (Process), and coroutines (Coroutine).

Why introduce concurrent programming?

Scenario 1: a web crawler that took 1 hour to crawl sequentially was reduced to 20 minutes using concurrent downloads.
Scenario 2: an app that took 3 seconds to open a page before optimization was reduced to 200 milliseconds per page using asynchronous concurrency.
Concurrency is introduced to make programs run faster.

Comparison of multi-threading, multi-processing, and coroutines

How do you choose the right technique for a given task?

GIL global interpreter lock
        The GIL (Global Interpreter Lock) is a mechanism used in the CPython interpreter. Its main function is to prevent multiple threads from executing Python bytecode at the same time.

        Because of the GIL, when multiple threads run Python code, only one thread can occupy the CPU and execute Python bytecode at any given moment; the other threads wait.

        This mechanism helps ensure the stability and thread safety of Python code, but it also imposes a performance cost. For CPU-intensive Python applications, multi-threading therefore does not improve running speed; for IO-intensive applications, by contrast, multi-threading can effectively improve throughput.
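A minimal, hedged demonstration of that last point (the count function is illustrative and timings vary by machine): a CPU-bound function run in two threads takes about as long as running it twice sequentially, because the GIL lets only one thread execute bytecode at a time.

import time
from threading import Thread

def count(n=10_000_000):
    while n > 0:
        n -= 1

start = time.time()
count(); count()
print('sequential:', time.time() - start)

start = time.time()
t1 = Thread(target=count); t2 = Thread(target=count)
t1.start(); t2.start()
t1.join(); t2.join()
print('two threads:', time.time() - start)  # roughly the same as sequential (or worse)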

GIL steps

In a multi-threaded environment, the Python interpreter executes as follows:

Acquire the GIL;
Switch to a thread to run it;
Run until a certain number of bytecode instructions have executed, or until the thread voluntarily gives up control (for example by calling time.sleep(0));
Set the thread back to a sleeping state;
Release the GIL;
Repeat all of the above steps.
When external code is called (such as a C/C++ extension function), the GIL stays locked until that function returns (since no Python bytecode runs during this period, no thread switching happens). Extension authors can release the GIL proactively.
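Note that since CPython 3.2 the "certain number of bytecode instructions" is in fact a time slice, which can be inspected and tuned via sys (a small sketch; 0.005 s is the documented default):

import sys

print(sys.getswitchinterval())   # 0.005 seconds by default
sys.setswitchinterval(0.01)      # ask the interpreter to consider switching less often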

Background of the GIL global interpreter lock

The GIL ensures that only one thread executes at a time; every thread must acquire the GIL before it gains the right to run.

1. Python code runs on an interpreter; it is the interpreter that executes (interprets) it.
2. Types of Python interpreters: CPython, IPython, PyPy, Jython, IronPython.
3. The most commonly used interpreter on the market today (about 95%) is CPython.
4. The GIL global interpreter lock exists in CPython.
5. Why allow only one thread to execute at a time? The problem being avoided is multiple threads competing for the same resources.
    For example: suppose one thread is the garbage collector, about to reclaim the variable a = 1, while another thread is about to use that same variable a. If the second thread grabs a before the garbage-collection thread has finished reclaiming it, the state becomes inconsistent.
    Python's design avoids this by putting a lock directly on the interpreter. The lock allows only one thread to execute at a time: any thread that wants to run must first obtain the GIL, and only when that thread releases the GIL can another thread acquire it and gain the right to execute.

Issues to keep in mind about the GIL global interpreter lock

1. Python has the GIL precisely so that multiple threads in the same process do not actually execute Python bytecode at the same time.
 
2. Python is unusual in needing multiple processes for parallelism; in other languages it is generally enough to open multiple threads.
 
3. The CPython interpreter cannot take advantage of multiple cores when running multiple threads; only multiple processes can exploit multi-core parallelism. Other languages do not have this problem.
 
4. On an 8-core CPU, fully using all 8 cores requires at least 8 threads all doing computation; then CPU usage reaches 100%.
 
5. Without the GIL, starting 8 threads in one process could fully utilize CPU resources and run all cores at capacity.
 
6. A great deal of code and many modules in the CPython interpreter are written around the GIL mechanism and cannot be changed. So even on an 8-core machine, one process can effectively use only 1 core. The workaround is to start multiple processes: the threads started under each process can be scheduled onto different CPUs.
 
7. Rule of thumb for CPython: IO-intensive work uses multi-threading, compute-intensive work uses multi-processing (see the sketch after this list).
IO-intensive: the CPU is switched away whenever an IO operation is encountered. Suppose 8 threads are open and all 8 perform IO; IO does not consume CPU, so over a period of time all 8 threads make progress. Multi-threading is the better choice here.

Compute-intensive: consumes CPU. With 8 threads in one process, effectively only one thread occupies the CPU at a time and the others make little progress. Instead, open 8 processes with one thread each; the threads under the 8 processes are executed by 8 CPUs, which is far more efficient.
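A hedged sketch of this rule of thumb, using the pool classes introduced later in this article (the count function is illustrative and timings are machine-dependent): the same CPU-bound work finishes much faster on a process pool than on a thread pool.

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def count(n=10_000_000):
    while n > 0:
        n -= 1

if __name__ == '__main__':
    for Executor in (ThreadPoolExecutor, ProcessPoolExecutor):
        start = time.time()
        with Executor(max_workers=4) as pool:   # the with-block waits for all tasks
            for _ in range(4):
                pool.submit(count)
        print(Executor.__name__, time.time() - start)  # the process pool is roughly 4x faster on 4+ cores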

Mutex locks

The role of a mutex lock: under multi-threading, when the same piece of data is operated on at the same time, the data can become corrupted. A mutex lock prevents this from happening.

from threading import Thread, Lock
import time

n = 10

def task(lock):
    # Hold the lock across the whole read-modify-write so it is effectively atomic.
    # Locking trades speed for safety: the ten decrements now run one after another.
    lock.acquire()
    global n
    temp = n
    time.sleep(0.5)   # simulate work; without the lock, every thread would read n == 10
    n = temp - 1
    lock.release()

if __name__ == '__main__':
    tt = []
    lock = Lock()
    for i in range(10):
        t = Thread(target=task, args=(lock,))
        t.start()
        tt.append(t)
    for j in tt:
        j.join()

    print("main", n)   # 0 -- with the lock, each thread decrements in turn

GIL lock and mutex lock interview question

Interview question: since there is a GIL lock, why do we still need mutex locks (under multi-threading)?


       For example: start 2 threads that both execute a = a + 1, with a initially 0.
       1. The first thread arrives, reads a = 0, and begins computing a = a + 1; the intermediate result is 1.
       2. Before the first thread has assigned the result 1 back to a, the GIL may switch to the second thread, which also reads a = 0 and computes a = a + 1; the result is still 1, so one increment is lost.
       3. Adding a mutex lock around the read-modify-write solves this kind of data corruption when multiple threads operate on the same data.
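A minimal sketch of this answer (the function names and counts are made up): the unlocked version usually loses updates, while the locked version always ends at the expected total.

from threading import Thread, Lock

a = 0
lock = Lock()

def add_unsafe():
    global a
    for _ in range(100_000):
        tmp = a        # read
        a = tmp + 1    # write: another thread may have read the same tmp in between

def add_safe():
    global a
    for _ in range(100_000):
        with lock:     # the mutex makes the read-modify-write atomic
            a += 1

for worker in (add_unsafe, add_safe):
    a = 0
    threads = [Thread(target=worker) for _ in range(2)]
    for t in threads: t.start()
    for t in threads: t.join()
    print(worker.__name__, a)   # unsafe: usually < 200000; safe: exactly 200000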

Thread queues (queues used between threads)

Why are queues still used between threads?

Threads in the same process already share data, so why use a queue at all?
Because a queue is a pipe plus locks: using a queue keeps shared data safe without hand-written locking.


Thread queues come in three kinds:
    1. First in, first out (queue.Queue)
    2. Last in, first out (queue.LifoQueue)
    3. Priority queue (queue.PriorityQueue)
    
from multiprocessing import Queue   # the process queue; the classes below are thread queues

"""Thread queue: first in, first out"""
import queue

# A drawback of queue.Queue is that its implementation involves several locks and
# condition variables, which can cost some performance and memory efficiency.
q = queue.Queue()  # unbounded by default
q.put('first')
q.put('second')
q.put('third')
 
 
print(q.get())
print(q.get())
print(q.get())
 
## Last in, first out
import queue

# LIFO: last in, first out
q = queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')
 
print(q.get())
print(q.get())
print(q.get())
 
## Priority queue
import queue
 
q = queue.PriorityQueue()
# put takes a tuple: the first element is the priority (usually a number, though any
# mutually comparable values work). The smaller the value, the higher the priority.
q.put((20, 'a'))
q.put((10, 'b'))
q.put((30, 'c'))

print(q.get())
print(q.get())
print(q.get())
'''
Result (the smaller the number, the higher the priority; higher-priority items are dequeued first):
(10, 'b')
(20, 'a')
(30, 'c')
'''

Using process pools and thread pools

Pool: a container type that can hold multiple elements.

Process pool: define a pool in advance and put processes into it. From then on, you only need to drop tasks into the pool, and whichever process in the pool is free executes the task.

Thread pool: define a pool in advance and put threads into it. From then on, you only need to drop tasks into the pool, and whichever thread in the pool is free executes the task.
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def task(n, m):
    return n + m

def task1():
    return {'username': 'kevin', 'password': 123}

def callback(res):
    print(res)           # <Future at 0x1ed5a5e5610 state=finished returned int>
    print(res.result())  # 3

def callback1(res):
    print(res)           # <Future at 0x1ed5a5e5610 state=finished returned dict>
    print(res.result())  # {'username': 'kevin', 'password': 123}
    print(res.result().get('username'))

if __name__ == '__main__':
    """Open a process pool"""
    pool = ProcessPoolExecutor(3)  # define a process pool holding 3 processes

    # 2. Throw tasks into the pool
    pool.submit(task, m=1, n=2).add_done_callback(callback)
    pool.submit(task1).add_done_callback(callback1)
    pool.shutdown()  # join + close: wait for all tasks to finish, then release the pool
    print(123)

What are the benefits of process pools and thread pools?

(1) Lower resource consumption: reusing already-created threads reduces the cost of thread creation and destruction.
(2) Faster response: when a task arrives, it can be executed immediately without waiting for a thread to be created.
(3) Better manageability: threads are a scarce resource. Creating them without limit not only consumes system resources but also reduces system stability. A thread pool allows unified allocation, tuning, and monitoring.

The concurrent.futures module (with a crawler example)
Module introduction
The concurrent.futures module provides a highly encapsulated asynchronous calling interface

ThreadPoolExecutor: Thread pool, providing asynchronous calls

ProcessPoolExecutor: process pool, providing asynchronous calls

Both implement the same interface, which is defined by the abstract Executor class.

Basic methods

submit(fn, *args, **kwargs): Submit task asynchronously

map(func, *iterables, timeout=None, chunksize=1): replaces a for loop of submit calls

shutdown(wait=True): equivalent to the pool.close() + pool.join() of a process pool

wait=True: wait until all tasks in the pool have finished executing and resources are reclaimed before continuing.
wait=False: return immediately, without waiting for the tasks in the pool to finish.
Regardless of the wait value, the whole program still waits until all tasks have completed.
submit and map must be called before shutdown.
result(timeout=None): Get the result

add_done_callback(fn): callback function

done(): determine whether a task has completed

cancel(): Cancel a task
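A minimal sketch of map() replacing the submit-in-a-loop pattern (square is a made-up worker function):

from concurrent.futures import ThreadPoolExecutor

def square(x):
    return x * x

with ThreadPoolExecutor(max_workers=3) as pool:   # the with-block calls shutdown(wait=True)
    for result in pool.map(square, range(5)):     # results come back in input order
        print(result)                             # 0 1 4 9 16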

ThreadPoolExecutor thread pool

Commonly used functions

When a function is submitted to the thread pool, a Future object is automatically created and returned. This Future holds the function's execution state (pending, running, completed, and so on). When the function finishes, it also calls future.set_result to store its own return value.

(1) When creating a thread pool you can pass the max_workers parameter to cap the number of threads. If it is not specified, a default based on the machine's CPU count is used (min(32, os.cpu_count() + 4) in recent Python versions); it is still good practice to set the capacity explicitly rather than rely on the default.

(2) Functions are submitted to the pool with submit; once submitted they begin running right away on a pool thread, while the main thread continues executing. The arguments to submit are simply the function followed by that function's own arguments.

(3) A Future is like a container holding the execution state of the inner function.

(4) When the function finishes, its return value is set on the Future: once future.set_result has run, the function counts as finished, and the outside world can call result() to obtain the return value.


from concurrent.futures import ThreadPoolExecutor
import time
 
 
def task(name, n):
    time.sleep(n)
    return f"{name} slept for {n} seconds"
 
 
executor = ThreadPoolExecutor()
future = executor.submit(task, "You in front of the screen", 3)
 
print(future) # <Future at 0x7fbf701726d0 state=running>
print(future.running()) # Whether the function is running True
print(future.done()) # Whether the function has been executed False
 
time.sleep(3) # The main program also sleeps for 3 seconds. Obviously the function has been executed at this time.
 
print(future) # <Future at 0x7fbf701726d0 state=finished returned str> -- the return value is a str
print(future.running()) # False
print(future.done()) # True
 
print(future.result())
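As a follow-up sketch (not from the original text): concurrent.futures.as_completed yields futures in completion order rather than submission order. The sleep times below are arbitrary.

from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def task(name, n):
    time.sleep(n)
    return f"{name} slept for {n} seconds"

with ThreadPoolExecutor() as executor:
    futures = [executor.submit(task, 'slow', 2), executor.submit(task, 'fast', 1)]
    for fut in as_completed(futures):
        print(fut.result())   # 'fast ...' is printed before 'slow ...'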

Multi-threaded crawling of web pages

import requests
from concurrent.futures import ThreadPoolExecutor

def get_page(url):
    res = requests.get(url)
    name = url.rsplit('/')[-1] + '.html'   # e.g. 'www.baidu.com.html'
    return {'name': name, 'text': res.content}

def call_back(fut):
    print(fut.result()['name'])
    with open(fut.result()['name'], 'wb') as f:
        f.write(fut.result()['text'])


if __name__ == '__main__':
    pool = ThreadPoolExecutor(2)
    urls = ['http://www.baidu.com', 'http://www.cnblogs.com', 'http://www.taobao.com']
    for url in urls:
        pool.submit(get_page, url).add_done_callback(call_back)
    pool.shutdown()  # wait for all downloads and callbacks to finish

Coroutine theory

Core understanding:

Switching is done at the programmer level: we do it ourselves, not the operating system.

The essence of coroutines: squeeze the most out of the CPU by "tricking" the computer into thinking the CPU is always busy with useful work.

Coroutine: concurrency within a single thread, also called a micro-thread or fiber (English name: Coroutine). In one sentence: a coroutine is a lightweight user-mode thread, controlled and scheduled by the user program itself.

What needs to be emphasized:

Python's threads are kernel-level: scheduling is controlled by the operating system (for example, if a thread hits IO or runs too long, it is forced to give up the CPU and another thread is switched in).
When coroutines run in a single thread, switching on IO is controlled at the application level (not by the operating system), which improves efficiency (note: switching on non-IO operations does nothing for efficiency).
Compared with OS-controlled thread switching, the user controls coroutine switching within a single thread.

The advantages are as follows:

Coroutine switching has lower overhead: it is a program-level switch that the operating system cannot even perceive, so it is more lightweight.
Concurrency can be achieved within a single thread, maximizing CPU utilization.

The disadvantages are as follows:

Coroutines are in essence single-threaded and cannot use multiple cores. (One program can, however, start multiple processes, multiple threads in each process, and coroutines in each thread.)
Coroutines live in a single thread, so the moment a coroutine blocks, the whole thread is blocked.

To summarize the characteristics of coroutines:

Concurrency is achieved within a single thread only
Modifying shared data does not require locking
The user program keeps the context stacks of multiple control flows
Additionally: a coroutine automatically switches to other coroutines on IO (detecting IO is something neither yield nor greenlet can do by itself, which is why the gevent module, built on a select-style event mechanism, is used; a toy illustration of yield-based switching follows)
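Since the last point mentions yield, here is a toy sketch (the producer/consumer names are made up) of user-controlled switching with plain generators: each task voluntarily hands control back, and ordinary user code acts as the scheduler.

def producer():
    for i in range(3):
        print('produce', i)
        yield            # voluntarily give up control

def consumer():
    for i in range(3):
        print('consume', i)
        yield

p, c = producer(), consumer()
for _ in range(3):       # ordinary user code acts as the scheduler
    next(p)
    next(c)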
Greenlet module of coroutine

1. Install the module

Installation: pip3 install greenlet

2. Greenlet implements state switching

from greenlet import greenlet
 
def eat(name):
    print('%s eat 1' %name)
    g2.switch('nick')
    print('%s eat 2' %name)
    g2.switch()
def play(name):
    print('%s play 1' %name)
    g1.switch()
    print('%s play 2' %name)
 
g1=greenlet(eat)
g2=greenlet(play)
 
g1.switch('nick')  # arguments can be passed on the first switch; later switches do not need them

Simple switching alone (with no IO and no repeated memory-allocating work to overlap) actually reduces the program's execution speed.

3. Efficiency comparison

#Sequential execution
import time
def f1():
    res=1
    for i in range(100000000):
        res+=i
 
def f2():
    res=1
    for i in range(100000000):
        res*=i
 
start=time.time()
f1()
f2()
stop=time.time()
print('run time is %s' %(stop-start)) #10.985628366470337
 
#switch
from greenlet import greenlet
import time
def f1():
    res=1
    for i in range(100000000):
        res+=i
        g2.switch()
 
def f2():
    res=1
    for i in range(100000000):
        res*=i
        g1.switch()
 
start=time.time()
g1=greenlet(f1)
g2=greenlet(f2)
g1.switch()
stop=time.time()
print('run time is %s' %(stop-start)) # 52.763017892837524

Greenlet only provides a more convenient way to switch than generators do. When execution switches to a task and that task hits IO, it still blocks in place: greenlet does not solve the problem of automatically switching on IO to improve efficiency.

Suppose a single thread runs 20 tasks whose code mixes computation with blocking operations. We could use the time task 1 spends blocked to execute task 2, and so on, improving overall efficiency. This is exactly what the Gevent module does.

Gevent module of coroutine

1 Monkey patch
1. The term was originally Guerrilla Patch, evoking an irregular, guerrilla force, which shows that this part is not original code. In English, guerrilla is pronounced similarly to gorilla, and the word was later written as monkey.

2. Another explanation is that because this technique messes with the original code ("monkeying about" with it), it came to be called a Monkey Patch.

1.1 Function of monkey patch (everything is an object)

It allows replacement while a module is running, for example assigning one function object to another (replacing the function's original behavior):

class Monkey():
    def play(self):
        print('The monkey is playing')

class Dog():
    def play(self):
        print('The dog is playing')

m = Monkey()
m.play()
m.play = Dog().play   # replace the bound method at runtime
m.play()

1.2 Application scenarios of monkey patch

Here is a more practical example. Many people use import json, then discover that ujson performs better. If changing import json to import ujson as json in every file is too costly, or you want to test whether replacing json with ujson behaves as expected, just add this at the program's entry point:

import json
import ujson
 
def monkey_patch_json():
    json.__name__ = 'ujson'
    json.dumps = ujson.dumps
    json.loads = ujson.loads
monkey_patch_json()
aa=json.dumps({'name':'lqz','age':19})
print(aa)

1.3 Introduction to Gevent

Gevent is a third-party library that makes it easy to write concurrent synchronous or asynchronous programs. The main pattern used in gevent is the Greenlet, a lightweight coroutine connected to Python as a C extension module. Greenlets all run inside the operating-system process of the main program, but they are scheduled cooperatively.

usage

g1 = gevent.spawn(func, 1, 2, 3, x=4, y=5)  # create a coroutine object g1; the first
# argument to spawn is the function (e.g. eat), and any following positional or
# keyword arguments are passed on to that function
 
g2 = gevent.spawn(func2)
 
g1.join()  # wait for g1 to finish
 
g2.join()  # wait for g2 to finish
 
# Or combine the two joins in one step: gevent.joinall([g1, g2])
 
g1.value   # get the return value of func

Example 1 (automatic switching when encountering io)

import gevent
def eat(name):
    print('%s eat 1' %name)
    gevent.sleep(2)
    print('%s eat 2' %name)
 
def play(name):
    print('%s play 1' %name)
    gevent.sleep(1)
    print('%s play 2' %name)
 
 
g1=gevent.spawn(eat,'lqz')
g2=gevent.spawn(play,name='lqz')
g1.join()
g2.join()
#Or gevent.joinall([g1,g2])
print('main')

Example 2

'''
In the example above, gevent.sleep(2) simulates IO blocking that gevent can recognize.
time.sleep(2) and other blocking calls are not recognized by gevent directly; the line of code below patches them so they can be identified.
from gevent import monkey;monkey.patch_all() must come before whatever is being patched, e.g. before importing time or the socket module.
Or simply remember: to use gevent, put from gevent import monkey;monkey.patch_all() at the top of the file.
'''
from gevent import monkey;monkey.patch_all()
 
import gevent
import time
def eat():
    print('eat food 1')
    time.sleep(2)
    print('eat food 2')
 
def play():
    print('play 1')
    time.sleep(1)
    print('play 2')
 
g1=gevent.spawn(eat)
g2=gevent.spawn(play)
gevent.joinall([g1,g2])
print('main')
 
# We can use threading.current_thread().getName() to inspect g1 and g2; the result is DummyThread-n, i.e. a fake (green) thread

Coroutines achieve high concurrency

Server:

from gevent import monkey; monkey.patch_all()
import gevent
from socket import socket
# from multiprocessing import Process
from threading import Thread
 
 
def talk(conn):
    while True:
        try:
            data = conn.recv(1024)
            if len(data) == 0: break
            print(data)
            conn.send(data.upper())
        except Exception as e:
            print(e)
            break   # stop serving this connection on error
    conn.close()
 
 
def server(ip, port):
    server = socket()
    server.bind((ip, port))
    server.listen(5)
    while True:
        conn, addr = server.accept()
        # t=Process(target=talk,args=(conn,))
        # t=Thread(target=talk,args=(conn,))
        #t.start()
        gevent.spawn(talk, conn)
 
 
if __name__ == '__main__':
    g1 = gevent.spawn(server, '127.0.0.1', 8080)
    g1.join()

Client:

import socket
from threading import current_thread, Thread
 
 
def socket_client():
    cli = socket.socket()
    cli.connect(('127.0.0.1', 8080))
    while True:
        ss = '%s say hello' % current_thread().getName()
        cli.send(ss.encode('utf-8'))
        data = cli.recv(1024)
        print(data)
 
 
for i in range(5000):
    t = Thread(target=socket_client)
    t.start()