Coroutines, the GIL (global interpreter lock), mutex locks, thread pools, and the concurrent.futures module

A process is the smallest unit of resource allocation, and a thread is the smallest unit of CPU scheduling. Every process contains at least one thread.

Python’s support for concurrent programming

(1) Multi-threading: threading exploits the fact that CPU work and IO can overlap, so the CPU does not sit idle waiting for IO to complete.
(2) Multi-processing: multiprocessing uses multiple CPU cores to execute tasks truly in parallel.
(3) Asynchronous IO: asyncio overlaps CPU work and IO within a single thread to run functions asynchronously.
(4) Use Lock to serialize access to shared resources and prevent conflicting writes.
(5) Use Queue to pass data between threads/processes and to implement the producer-consumer model.
(6) Use thread pools/process pools to simplify submitting tasks, waiting for completion, and obtaining results.
(7) Use subprocess to start external programs and interact with their input and output (see the sketch after this list).
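As a minimal sketch of item (7), the snippet below starts an external program and reads its output; the command (python --version) is only an illustrative choice.

import subprocess

# Run an external program, capture its output, and check the exit code.
# "python --version" is just an example command.
result = subprocess.run(
    ['python', '--version'],
    capture_output=True,  # collect stdout/stderr instead of inheriting them
    text=True,            # decode bytes to str
)
print(result.returncode)           # 0 on success
print(result.stdout or result.stderr)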

There are three approaches to concurrent programming in Python:

multi-threading (Thread), multi-processing (Process), and coroutines (Coroutine).

Why introduce concurrent programming?

Scenario 1: a web crawler that took 1 hour to crawl sequentially dropped to 20 minutes with concurrent downloads.
Scenario 2: an app whose pages each took 3 seconds to open came down to 200 milliseconds per page with asynchronous concurrency.
Concurrency is introduced to make programs run faster.

Comparison of multi-threading, multi-processing, and coroutines

How do you choose the right technique for a given task?

GIL global interpreter lock

The GIL (Global Interpreter Lock) is a mechanism used in the CPython interpreter. Its main function is to prevent multiple threads from executing Python bytecode at the same time.

In Python, because of the GIL, when multiple threads run Python code, only one thread can occupy the CPU at any given moment; the other threads wait.

This mechanism helps ensure the stability and thread safety of the interpreter, but it also costs performance. For CPU-intensive Python applications, multi-threading therefore does not improve running speed; for IO-intensive applications, by contrast, multi-threading can effectively improve efficiency.
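A minimal sketch of that claim, assuming an arbitrary CPU-bound function (burn) and loop count: under CPython, running it in two threads takes roughly as long as running it twice sequentially, because the GIL serializes the bytecode.

import time
from threading import Thread

def burn():                      # pure CPU work, no IO
    n = 0
    for _ in range(10_000_000):
        n += 1

start = time.time()
burn(); burn()                   # sequential: two runs back to back
print('sequential:', time.time() - start)

start = time.time()
t1, t2 = Thread(target=burn), Thread(target=burn)
t1.start(); t2.start()
t1.join(); t2.join()             # two threads: still serialized by the GIL
print('threaded:  ', time.time() - start)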

GIL scheduling steps

In a multi-threaded environment, the Python interpreter executes as follows:

  1. Acquire the GIL;
  2. Switch to a thread and run it;
  3. Run until the switch interval expires (older CPython counted bytecode instructions) or the thread voluntarily yields control (e.g. by calling time.sleep(0)); the interval can be inspected as sketched below;
  4. Put the thread back into the sleeping state;
  5. Release the GIL;
  6. Repeat all the steps above.
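A minimal sketch of step 3's switch interval, using CPython's sys.getswitchinterval / sys.setswitchinterval knobs:

import sys

print(sys.getswitchinterval())  # 0.005: forced GIL switches every 5 ms by default
sys.setswitchinterval(0.01)     # coarser time slices, fewer forced switches
print(sys.getswitchinterval())  # 0.01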

When calling external code (such as a C/C++ extension function), the GIL stays held until that function returns (since no Python bytecode runs during this period, no thread switch happens). Extension authors can, however, release the GIL explicitly around blocking or compute-heavy sections.

Background on the GIL

The GIL ensures that only one thread executes at a time: every thread must acquire the GIL before it is allowed to run.

1. Python code runs on an interpreter; there must be an interpreter to execute or interpret it.
2. Types of Python interpreters: CPython, IPython, PyPy, Jython, IronPython.
3. The most commonly used interpreter today (about 95% of the market) is CPython.
4. The GIL exists in CPython.
5. Why allow only one thread to execute at a time? To avoid multiple threads competing for the same resources.
For example: suppose a garbage-collection thread starts reclaiming the variable a = 1 while another thread still wants to use a. If the collector has not finished reclaiming a when the other thread grabs it, the data is corrupted.
    Python's design avoids this by putting a lock directly on the interpreter: only one thread may execute at a time. Whichever thread wants to run must first acquire the GIL, and only when it releases the GIL can another thread acquire it and gain permission to execute.

Things to watch out for with the GIL

1. The reason Python has the GIL is precisely to stop multiple threads in the same process from actually executing interpreter code at the same time.

2. Because of this, Python is unusual in reaching for multiple processes; in other languages it is generally enough to open multiple threads.

3. When running multiple threads, the CPython interpreter cannot use multiple cores; only multiple processes can exploit multi-core hardware. Other languages do not have this problem.

4. On an 8-core CPU, making full use of all 8 cores takes at least 8 threads, all doing computation, so that CPU usage reaches 100%.

5. If there were no GIL, 8 threads started in one process could make full use of CPU resources and run the CPU at full capacity.

6. A lot of code and modules in the CPython interpreter are written around the GIL mechanism and cannot realistically be changed. So even with 8 cores, one process can use only 1 core at a time. The workaround is to start multiple processes; the threads started under each process can then be scheduled onto different CPUs.

7. With the CPython interpreter: use multi-threading for IO-intensive work and multi-processing for compute-intensive work.

IO-intensive: the CPU is switched away whenever a thread hits an IO operation. Suppose 8 threads are open and all 8 perform IO: the IO itself consumes no CPU, so over a period of time all 8 threads effectively make progress. Multi-threading is the better choice here.

Compute-intensive: consumes CPU. If 8 threads are opened, the thread holding the GIL occupies the CPU and the other 7 barely run at all. So instead we open 8 processes, each with one thread; the threads under the 8 processes are executed by 8 CPUs, which is far more efficient. A sketch of both cases follows below.
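A minimal sketch of this rule of thumb, assuming a 'download' task that simulates IO with time.sleep and a 'crunch' task that is pure computation; under CPython the thread pool helps the first and the process pool helps the second.

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def download(i):        # IO-bound: the thread releases the GIL while sleeping
    time.sleep(1)
    return i

def crunch(i):          # CPU-bound: holds the GIL while computing
    return sum(range(5_000_000))

if __name__ == '__main__':
    start = time.time()
    with ThreadPoolExecutor(8) as pool:       # good for IO-bound tasks
        list(pool.map(download, range(8)))
    print('IO-bound with threads:', time.time() - start)

    start = time.time()
    with ProcessPoolExecutor(8) as pool:      # good for CPU-bound tasks
        list(pool.map(crunch, range(8)))
    print('CPU-bound with processes:', time.time() - start)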

Mutex lock

The role of a mutex lock: under multi-threading, if multiple threads operate on the same piece of data at the same time, the data can become corrupted. A mutex lock prevents this.

from threading import Thread, Lock
import time

n = 10

def task(lock):
    lock.acquire()
    global n
    temp = n
    time.sleep(0.5)  # simulate work between the read and the write
    n = temp - 1
    lock.release()

# Locking trades time for safety: the critical section now runs serially.

if __name__ == '__main__':

    tt = []
    lock = Lock()
    for i in range(10):
        t = Thread(target=task, args=(lock,))
        t.start()
        tt.append(t)
    for j in tt:
        j.join()

    print("main", n)  # always 0: each decrement runs under the lock

GIL lock, mutex lock interview questions

Interview question: since we already have the GIL, why do we still need mutex locks? (in multi-threaded code)

For example: start 2 threads that each execute a = a + 1, with a initially 0.
1. The first thread reads a = 0 and begins computing a = a + 1; the intermediate result is 1.
2. Before the first thread writes the result 1 back to a, the GIL switches to the second thread, which also reads a = 0 and computes a = a + 1; the result is still 1 rather than 2.
3. Adding a mutex lock solves this corruption when multiple threads operate on the same data: the GIL only serializes bytecode execution, it does not make multi-step operations atomic. A sketch of the race follows below.
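A minimal sketch of the lost-update race described above; time.sleep(0) forces a thread switch between the read and the write, so without the lock the final counter falls short of the expected value. The names and counts are illustrative.

import time
from threading import Thread, Lock

counter = 0
lock = Lock()

def unsafe_add():
    global counter
    for _ in range(1000):
        tmp = counter
        time.sleep(0)        # yield: another thread may read the same value
        counter = tmp + 1

def safe_add():
    global counter
    for _ in range(1000):
        with lock:           # the read-modify-write is now atomic
            tmp = counter
            time.sleep(0)
            counter = tmp + 1

for target in (unsafe_add, safe_add):
    counter = 0
    threads = [Thread(target=target) for _ in range(4)]
    for t in threads: t.start()
    for t in threads: t.join()
    print(target.__name__, counter)  # unsafe_add: typically well below 4000; safe_add: 4000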

Thread queue (queue used in threads)

Why are queues still used in threads?

The data of multiple threads in the same process is shared, so why use queues within a single process at all?
Because a queue is a pipe plus a lock: using a queue keeps the data safe. (A producer-consumer sketch closes this section.)

Queues come in three flavors:
    1. First in, first out
    2. Last in, first out
    3. Priority queue

The process queue:

from multiprocessing import Queue

The thread queue:

import queue
queue.Queue()

# A drawback of queue.Queue: its implementation uses several locks and condition
# variables, which can cost some performance and memory efficiency.
import queue

q = queue.Queue()  # unbounded by default
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
 
## Last in, first out
import queue

# LIFO: last in, first out
q = queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())  # third
print(q.get())  # second
print(q.get())  # first

## Priority queue
import queue

q = queue.PriorityQueue()
# put() takes a tuple whose first element is the priority (usually a number,
# though any comparable values work). The smaller the value, the higher the priority.
q.put((20, 'a'))
q.put((10, 'b'))
q.put((30, 'c'))

print(q.get())
print(q.get())
print(q.get())
'''
Result (the smaller the number, the higher the priority; higher priority dequeues first):
(10, 'b')
(20, 'a')
(30, 'c')
'''
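The producer-consumer model mentioned above, as a minimal sketch: a bounded thread queue connects a producer and a consumer, with a sentinel value telling the consumer to stop. The item names and sizes are illustrative.

import queue
import time
from threading import Thread

q = queue.Queue(maxsize=3)     # bounded: put() blocks when the queue is full

def producer():
    for i in range(5):
        q.put(f'item {i}')     # blocks if the queue is full
        print('produced item', i)
    q.put(None)                # sentinel: tell the consumer to stop

def consumer():
    while True:
        item = q.get()
        if item is None:
            break
        time.sleep(0.1)        # pretend to process the item
        print('consumed', item)

t1, t2 = Thread(target=producer), Thread(target=consumer)
t1.start(); t2.start()
t1.join(); t2.join()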

The use of process pools and thread pools

Pool: a container type that holds multiple elements.

Process pool: define a pool in advance and add processes to it. From then on you only need to drop tasks into the pool, and some process in the pool will pick them up and execute them.

Thread pool: define a pool in advance and add threads to it. From then on you only need to drop tasks into the pool, and some thread in the pool will pick them up and execute them.

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


def task(n, m):
    return n + m

def task1():
    return {'username': 'kevin', 'password': 123}


def callback(res):
    print(res)           # <Future at 0x1ed5a5e5610 state=finished returned int>
    print(res.result())  # 3

def callback1(res):
    print(res)           # <Future at 0x1ed5a5e5610 state=finished returned dict>
    print(res.result())  # {'username': 'kevin', 'password': 123}
    print(res.result().get('username'))

if __name__ == '__main__':
    # 1. Open a process pool with 3 worker processes
    pool = ProcessPoolExecutor(3)
    # 2. Throw tasks into the pool; each callback fires when its Future completes
    pool.submit(task, m=1, n=2).add_done_callback(callback)
    pool.submit(task1).add_done_callback(callback1)
    pool.shutdown()  # roughly close() + join() of a multiprocessing pool
    print(123)

What are the benefits of process pools and thread pools?

(1) Reduced resource consumption: reusing already-created threads lowers the cost of thread creation and destruction.
(2) Faster response: when a task arrives it can execute immediately, without waiting for a thread to be created.
(3) Better manageability: threads are a scarce resource. Creating them without limit consumes system resources and reduces stability; a pool allows uniform allocation, tuning, and monitoring. (A small sketch of worker reuse follows below.)
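A minimal sketch of point (1): a pool of 2 workers handles 6 tasks, and printing the worker thread names shows the same 2 threads being reused rather than 6 being created. The task body is illustrative.

from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
import time

def work(i):
    time.sleep(0.1)
    return f'task {i} ran on {current_thread().name}'

with ThreadPoolExecutor(2) as pool:        # only 2 threads ever exist
    for line in pool.map(work, range(6)):  # 6 tasks share those 2 threads
        print(line)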

The concurrent.futures module (crawler example)

Module introduction

The concurrent.futures module provides a highly encapsulated asynchronous calling interface

ThreadPoolExecutor: Thread pool, providing asynchronous calls

ProcessPoolExecutor: process pool, providing asynchronous calls

Both implement the same interface, which is defined by the abstract Executor class.

Basic methods

submit(fn, *args, **kwargs): submit a task asynchronously

map(func, *iterables, timeout=None, chunksize=1): replaces a for loop of submit calls

shutdown(wait=True): equivalent to pool.close() + pool.join() on a multiprocessing pool

  • wait=True: wait until all tasks in the pool have finished and resources are reclaimed before continuing.
  • wait=False: return immediately, without waiting for the pool's tasks to finish.
  • Regardless of the wait value, the whole program will not exit until all tasks are completed.
  • submit and map must be called before shutdown.

result(timeout=None): get the result

add_done_callback(fn): attach a callback function

done(): check whether a task has finished

cancel(): cancel a task (only possible while it has not started running). A sketch using map() and a with block follows below.
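A minimal sketch of map() replacing a submit loop, assuming a trivial square task; the with block calls shutdown(wait=True) on exit.

from concurrent.futures import ThreadPoolExecutor
import time

def square(n):
    time.sleep(0.1)  # pretend this is real work
    return n * n

with ThreadPoolExecutor(4) as pool:   # __exit__ calls shutdown(wait=True)
    # map() submits one task per item and yields results in input order
    for value in pool.map(square, range(5)):
        print(value)                  # 0 1 4 9 16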

ThreadPoolExecutor thread pool

Commonly used functions

When a function is submitted to the thread pool, a Future object is created and returned automatically. The Future holds the execution state of the function (pending, running, completed, and so on), and when the function finishes, future.set_result is called internally to store its return value.

(1) When creating a thread pool you can pass max_workers to set the maximum number of threads. If not specified, a default based on the CPU count is used (min(32, os.cpu_count() + 4) since Python 3.8).

Even so, it is good practice to size the pool explicitly for your workload rather than relying on the default.

(2) A function is submitted to the pool via submit. Once submitted it can start running immediately on a pool thread, while the main thread continues. submit takes the function followed by the arguments to pass to it.

(3) The Future acts as a container holding the state of the submitted function.

(4) When the function finishes, its return value is stored on the Future. Once future.set_result has run, the function is done, and the outside world can call result() to fetch the return value.

from concurrent.futures import ThreadPoolExecutor
import time


def task(name, n):
    time.sleep(n)
    return f"{name} slept for {n} seconds"


executor = ThreadPoolExecutor()
future = executor.submit(task, "You in front of the screen", 3)

print(future)            # <Future at 0x7fbf701726d0 state=running>
print(future.running())  # True: the function is still running
print(future.done())     # False: the function has not finished yet

time.sleep(3)  # the main thread sleeps 3 seconds too; by now the task has finished

print(future)            # <Future at 0x7fbf701726d0 state=finished returned str>
print(future.running())  # False
print(future.done())     # True

print(future.result())   # You in front of the screen slept for 3 seconds

Multi-threaded crawling of web pages

import requests
from concurrent.futures import ThreadPoolExecutor

def get_page(url):
    res = requests.get(url)
    name = url.rsplit('/')[-1] + '.html'
    return {'name': name, 'text': res.content}  # res.content is bytes

def call_back(fut):
    print(fut.result()['name'])
    with open(fut.result()['name'], 'wb') as f:
        f.write(fut.result()['text'])


if __name__ == '__main__':
    pool = ThreadPoolExecutor(2)
    urls = ['http://www.baidu.com', 'http://www.cnblogs.com', 'http://www.taobao.com']
    for url in urls:
        pool.submit(get_page, url).add_done_callback(call_back)

Coroutine theory

Core understanding: the switching is programmer-level switching; we do it ourselves, not the operating system.

The essence of coroutines: squeeze the most out of the CPU by, in effect, tricking the computer into keeping the CPU busy at all times.

Coroutines: concurrency within a single thread, also called micro-threads or fibers (English: Coroutine). In one sentence: a coroutine is a lightweight user-mode thread, controlled and scheduled by the user program itself.

What needs to be emphasized is:

  1. Python's threads are kernel-level, i.e. scheduling is controlled by the operating system (for example, a thread that hits IO or runs too long is forced to give up the CPU so another thread can run)
  2. Coroutines are started within a single thread. When one hits IO, the switch is controlled at the application level (not by the operating system), which improves efficiency (!!! switching on non-IO operations does nothing for efficiency)

Compared with thread switching controlled by the operating system, the user controls coroutine switching within a single thread.

The advantages are as follows:

  1. Coroutine switching has lower overhead: it is program-level switching that the operating system never even perceives, and is therefore more lightweight
  2. Concurrency can be achieved within a single thread, making maximal use of the CPU

The disadvantages are as follows:

  1. A coroutine is in essence single-threaded and cannot use multiple cores. You can, however, have one program start multiple processes, each process start multiple threads, and each thread start coroutines
  2. Coroutines live in a single thread, so the moment a coroutine blocks, the whole thread is blocked.

Summary of coroutine characteristics:

  1. Concurrency is achieved within a single thread
  2. Modifying shared data requires no locking
  3. The user program itself keeps the context stacks of the multiple control flows
  4. Extra: when a coroutine hits an IO operation it switches to another coroutine automatically (detecting IO is something neither yield nor greenlet can do, which is why the gevent module, built on a select-style mechanism, is used). A toy yield-based switch is sketched below.
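A minimal sketch of user-level switching using plain generators, just to make "the user program controls the switch" concrete; it does not detect IO, it only alternates between two tasks at points we choose.

def task_a():
    for i in range(3):
        print('A step', i)
        yield            # hand control back to the scheduler

def task_b():
    for i in range(3):
        print('B step', i)
        yield

# A tiny round-robin "scheduler": we decide when each task runs.
tasks = [task_a(), task_b()]
while tasks:
    for t in tasks[:]:
        try:
            next(t)
        except StopIteration:
            tasks.remove(t)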

Coroutine greenlet module

1. Install the module

Installation: pip3 install greenlet

2. Greenlet implements state switching

from greenlet import greenlet

def eat(name):
    print('%s eat 1' % name)
    g2.switch('nick')  # jump to play()
    print('%s eat 2' % name)
    g2.switch()

def play(name):
    print('%s play 1' % name)
    g1.switch()        # jump back into eat()
    print('%s play 2' % name)

g1 = greenlet(eat)
g2 = greenlet(play)

g1.switch('nick')  # arguments are passed on the first switch only; later switches need none

Simple switching (with no IO and no repeated memory-allocating work to overlap) actually reduces the program's execution speed, as the comparison below shows.

3. Efficiency comparison

# Sequential execution
import time

def f1():
    res = 1
    for i in range(100000000):
        res += i

def f2():
    res = 1
    for i in range(100000000):
        res *= i

start = time.time()
f1()
f2()
stop = time.time()
print('run time is %s' % (stop - start))  # 10.985628366470337

# Switching on every iteration
from greenlet import greenlet
import time

def f1():
    res = 1
    for i in range(100000000):
        res += i
        g2.switch()

def f2():
    res = 1
    for i in range(100000000):
        res *= i
        g1.switch()

start = time.time()
g1 = greenlet(f1)
g2 = greenlet(f2)
g1.switch()
stop = time.time()
print('run time is %s' % (stop - start))  # 52.763017892837524

Greenlet only makes switching more convenient than raw generators. When a task hits IO during execution, it still blocks in place; greenlet does not solve the problem of switching automatically on IO to improve efficiency.

Real single-threaded task code usually mixes computation with blocking operations. Ideally, when task 1 hits a blocking operation, we would use that blocked time to execute task 2, and so on. That is exactly what the gevent module provides.

Gevent module of coroutine

1 monkey patch

1. The term was originally Guerrilla Patch, evoking an irregular, guerrilla force, which hints that such a patch is not part of the original code. In English, guerrilla sounds like gorilla, and the word later softened into monkey.

2. Another explanation is that because this technique messes with the original code (monkeying about with it), it came to be called a Monkey Patch.

1.1 What a monkey patch does (everything is an object)

It replaces things while the module is running, for example assigning one function object in place of another (replacing what a name executes):

class Monkey():
    def play(self):
        print('Monkey is playing')

class Dog():
    def play(self):
        print('The dog is playing')

m = Monkey()
m.play()             # Monkey is playing
m.play = Dog().play  # patch the method at runtime
m.play()             # The dog is playing

1.2 Application scenarios of monkey patching

Here is a practical example. Many codebases use import json, and later discover that ujson performs better. If changing import json to import ujson as json in every file is too expensive, or you just want to test whether ujson is a drop-in replacement, you can add this at the program entry point:

import json
import ujson

def monkey_patch_json():
    json.__name__ = 'ujson'
    json.dumps = ujson.dumps
    json.loads = ujson.loads

monkey_patch_json()
aa = json.dumps({'name': 'lqz', 'age': 19})
print(aa)

1.3 Gevent introduction

Gevent is a third-party library that makes concurrent synchronous or asynchronous programming easy. The main pattern used in gevent is the greenlet, a lightweight coroutine hooked into Python as a C extension module. Greenlets all run inside the main operating-system process, but they are scheduled cooperatively.

Usage

# Usage
g1 = gevent.spawn(func, 1, 2, 3, x=4, y=5)

creates a coroutine object g1. The first argument to spawn is the function, e.g. eat; it may be followed by any number of arguments, positional or keyword, all of which are passed through to that function.

g2 = gevent.spawn(func2)

g1.join()  # wait for g1 to finish

g2.join()  # wait for g2 to finish

# Or combine the two steps into one: gevent.joinall([g1, g2])

g1.value  # get the return value of func

Example 1 (automatically switch when encountering io)

import gevent
def eat(name):
    print('%s eat 1' %name)
    gevent.sleep(2)
    print('%s eat 2' %name)

def play(name):
    print('%s play 1' %name)
    gevent.sleep(1)
    print('%s play 2' %name)


g1 = gevent.spawn(eat, 'lqz')
g2 = gevent.spawn(play, name='lqz')
g1.join()
g2.join()
# Or: gevent.joinall([g1, g2])
print('main')

Example 2

'''
In the example above, gevent.sleep(2) simulates an IO block that gevent can recognize.

However, time.sleep(2) and other blocking calls are not recognized by gevent directly. You need the line below, applied before the modules being patched (time, socket, etc.) are used:

from gevent import monkey;monkey.patch_all()

Or simply remember: to use gevent, put from gevent import monkey;monkey.patch_all() at the top of the file.
'''
from gevent import monkey;monkey.patch_all()

import gevent
import time
def eat():
    print('eat food 1')
    time.sleep(2)
    print('eat food 2')

def play():
    print('play 1')
    time.sleep(1)
    print('play 2')

g1 = gevent.spawn(eat)
g2 = gevent.spawn(play)
gevent.joinall([g1, g2])
print('main')

# Inside eat and play we can call threading.current_thread().getName() to inspect g1
# and g2: the result is DummyThread-n, i.e. a fake thread. A sketch follows below.
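A minimal sketch of that check, assuming gevent is installed; each greenlet reports a DummyThread name because monkey-patching replaces the threading machinery.

from gevent import monkey; monkey.patch_all()

import gevent
import time
from threading import current_thread

def task(n):
    print(current_thread().name, 'start')  # prints DummyThread-N, a fake thread
    time.sleep(n)                          # patched: yields to other greenlets
    print(current_thread().name, 'end')

gevent.joinall([gevent.spawn(task, 1), gevent.spawn(task, 2)])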

Coroutines achieve high concurrency

Server:

from gevent import monkey; monkey.patch_all()
import gevent
from socket import socket
# from multiprocessing import Process
from threading import Thread


def talk(conn):
    while True:
        try:
            data = conn.recv(1024)
            if len(data) == 0: break
            print(data)
            conn.send(data.upper())
        except Exception as e:
            print(e)
            break  # stop on error instead of looping on a dead connection
    conn.close()


def server(ip, port):
    server = socket()
    server.bind((ip, port))
    server.listen(5)
    while True:
        conn, addr = server.accept()
        # t=Process(target=talk,args=(conn,))
        # t=Thread(target=talk,args=(conn,))
        #t.start()
        gevent.spawn(talk, conn)


if __name__ == '__main__':
    g1 = gevent.spawn(server, '127.0.0.1', 8080)
    g1.join()

Client:

import socket
from threading import current_thread, Thread


def socket_client():
    cli = socket.socket()
    cli.connect(('127.0.0.1', 8080))
    while True:
        ss = '%s say hello' % current_thread().getName()
        cli.send(ss.encode('utf-8'))
        data = cli.recv(1024)
        print(data)


for i in range(5000):
    t = Thread(target=socket_client)
    t.start()

END