Python multi-threading principles and practice
Purpose:
(1) Understand how Python threads execute
(2) Master multi-thread programming and thread synchronization
(3) Understand the use of thread pools
1 Basic concepts of threads
1.1 What is a thread?
A thread is an execution unit within a process and is the entity that gets scheduled within that process.
Difference from process:
(1) Address space: each process has its own independent address space; a process has at least one thread, and all threads of a process share that process's address space;
(2) Resource ownership: The process is the unit of resource allocation and ownership. Threads in the same process share the resources of the process.
(3) Threads are the basic unit of processor scheduling, but processes are not.
(4) Both can be executed concurrently.
In short, a program has at least one process, and a process has at least one thread.
Threads are finer-grained than processes, which lets multi-threaded programs achieve a high degree of concurrency.
In addition, each process has its own independent memory during execution, whereas the threads of a process share memory, so exchanging data between threads is much cheaper than exchanging it between processes.
1.2 What is the relationship between threads and processes?
A process is one execution of an application on a processor; it is a dynamic concept. Threads are part of a process, and a running process contains one or more threads.
Multiple threads can share global variables, but multiple processes cannot. In multi-threading, the process numbers of all sub-threads are the same; in multi-processes, different sub-processes have different process numbers.
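The statement above can be checked with a small sketch. The names counter, seen_pids and worker are illustrative only: four threads update the same global objects, and each records the process ID it sees.

```python
import os
import threading

counter = {"hits": 0}      # global object shared by every thread
seen_pids = set()          # process IDs observed from inside the threads
lock = threading.Lock()    # protects the two shared objects

def worker():
    # Every thread updates the same global objects.
    with lock:
        counter["hits"] += 1
        seen_pids.add(os.getpid())

threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter["hits"])              # updates from all four threads are visible
print(seen_pids == {os.getpid()})   # all threads report the main process ID
```

With multiprocessing.Process instead of threads, each child would work on its own copy of counter and report a different process ID.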
A process is a running activity of a program with certain independent functions on a certain data set. A process is an independent unit for resource allocation and scheduling in the system.
A thread is an entity of a process and is the basic unit of CPU scheduling and dispatch. It is a basic unit that is smaller than a process and can run independently. The thread itself owns almost no system resources, only the few that are essential while it runs (such as a program counter, a set of registers and a stack), but it can share all resources owned by the process with other threads belonging to the same process.
A thread can create and destroy another thread; multiple threads in the same process can execute concurrently.
2 Python thread module
Python supports multi-threading mainly through two modules: thread (renamed _thread in Python 3) and threading. The thread module is relatively low-level; the threading module wraps it and is more convenient to use. However, because of the GIL, threads in Python (CPython) cannot fully utilize the CPU. If you want to fully utilize the computing power of a multi-core CPU, you need to use the multiprocessing module. It needs extra care on Windows, where each child process is started as a fresh interpreter that re-imports the main module, so the code that starts processes must be guarded by if __name__ == '__main__'.
2.1 How to create a thread
In Python 2.x, new threads could be created with the start_new_thread() function of the thread module. In Python 3.x that module has been renamed _thread, and the threading module should be used instead.
There are two ways to create a new thread through the threading module in Python 3.x. The first is threading.Thread(target=callable), that is, passing a callable (a function or method) to the Thread object. The second is to define a subclass of threading.Thread and override the run() method. In the second way, the only method that must be overridden is run().
(1) Create multi-threads through threading.Thread
    import threading
    import time

    def target():
        # Everything inside target() runs in thread t
        print("the current threading %s is running" % threading.current_thread().name)
        time.sleep(1)
        print("the current threading %s is ended" % threading.current_thread().name)

    # The module-level code below runs in the main thread
    print("the current threading %s is running" % threading.current_thread().name)
    t = threading.Thread(target=target)
    t.start()
    # join() blocks the current thread (here, the main thread) until thread t ends
    t.join()
    print("the current threading %s is ended" % threading.current_thread().name)
(2) Create multi-threads by inheriting threading.Thread and defining subclasses
Use the threading module to create a thread: inherit directly from threading.Thread, then override the __init__ method and the run method:
    import threading
    import time

    class myThread(threading.Thread):  # Inherit the parent class threading.Thread
        def __init__(self, threadID, name, counter):
            threading.Thread.__init__(self)
            self.threadID = threadID
            self.name = name
            self.counter = counter

        def run(self):
            # Put the code to be executed in run(); start() invokes it in the new thread
            print("Starting " + self.name)
            print_time(self.name, self.counter, 5)
            print("Exiting " + self.name)

    def print_time(threadName, delay, counter):
        while counter:
            time.sleep(delay)
            print("%s process at: %s" % (threadName, time.ctime(time.time())))
            counter -= 1

    # Create new threads
    thread1 = myThread(1, "Thread-1", 1)
    thread2 = myThread(2, "Thread-2", 2)

    # Start the threads
    thread1.start()
    thread2.start()

    # Wait for the threads to end
    thread1.join()
    thread2.join()
    print("Exiting Main Thread")
From the above case, we can see that the output of thread1 and thread2 is interleaved, with no guaranteed order. To make it orderly, thread synchronization is required.
3 Synchronization between threads
If multiple threads jointly modify a certain data, unpredictable results may occur. In order to ensure the correctness of the data, multiple threads need to be synchronized.
Simple thread synchronization can be achieved using the Lock and RLock objects of the threading module. Both have acquire() and release() methods. For data that only one thread may operate on at a time, place the operation between the acquire() and release() calls.
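As a minimal sketch of this pattern (balance, deposit and nested are hypothetical names chosen for illustration), the with statement calls acquire() on entry and release() on exit, even if the protected code raises. The second half illustrates the difference between the two lock types: an RLock may be acquired again by the thread that already holds it.

```python
import threading

balance = 0
balance_lock = threading.Lock()

def deposit(times):
    global balance
    for _ in range(times):
        with balance_lock:      # acquire() on entry, release() on exit
            balance += 1        # only one thread at a time runs this line

workers = [threading.Thread(target=deposit, args=(10000,)) for _ in range(4)]
for w in workers:
    w.start()
for w in workers:
    w.join()
print(balance)

rlock = threading.RLock()

def nested():
    with rlock:
        with rlock:             # a plain Lock would deadlock here
            return "ok"

print(nested())
```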
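It should be noted that Python (CPython) has a GIL (Global Interpreter Lock) mechanism: any thread must hold this global lock while it executes Python bytecode. In Python 2 the lock was released after every 100 bytecode instructions; since Python 3.2 a running thread is asked to release it after a switch interval (5 ms by default) so that other threads can run. The GIL does not make compound operations such as counter += 1 atomic, so explicit synchronization is still needed.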
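In current CPython 3 releases the GIL hand-off is time-based, and the interval can be inspected and changed through the sys module. The snippet below is only a sketch of those two calls; the value 0.01 is an arbitrary example, not a recommendation.

```python
import sys

original = sys.getswitchinterval()   # seconds between requested GIL hand-offs
print(original)

sys.setswitchinterval(0.01)          # ask for less frequent thread switches
changed = sys.getswitchinterval()
print(changed)

sys.setswitchinterval(original)      # restore the previous setting
```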
3.1 Thread synchronization problem
There are four common ways to achieve multi-thread synchronization: the lock mechanism, semaphores, condition variables and synchronized queues.
Below I mainly focus on two of them: the lock mechanism and the synchronized queue.
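The two mechanisms that are not covered in detail here can be sketched briefly. This is an illustrative example with hypothetical names (limited_task, consumer, producer), not part of the original cases: a Semaphore limits how many threads are inside a section at once, and a Condition lets one thread wait until another signals that a state has changed.

```python
import threading
import time

# Semaphore: at most 2 threads may be inside the guarded section at once.
slots = threading.Semaphore(2)
state_lock = threading.Lock()
active = 0
peak = 0

def limited_task():
    global active, peak
    with slots:
        with state_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)             # simulated work
        with state_lock:
            active -= 1

pool = [threading.Thread(target=limited_task) for _ in range(6)]
for t in pool:
    t.start()
for t in pool:
    t.join()
print(peak)                          # never exceeds the semaphore value

# Condition: the consumer sleeps until the producer signals that an item exists.
cond = threading.Condition()
items = []
received = []

def consumer():
    with cond:
        while not items:             # re-check the predicate after every wake-up
            cond.wait()
        received.append(items.pop())

def producer():
    with cond:
        items.append("job")
        cond.notify()

c = threading.Thread(target=consumer)
p = threading.Thread(target=producer)
c.start()
p.start()
c.join()
p.join()
print(received)
```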
(1) Lock mechanism
Use the Lock class of the threading module: call its acquire() method to lock and its release() method to unlock.
    import threading
    import time

    class myThread(threading.Thread):
        def __init__(self, threadID, name, counter):
            threading.Thread.__init__(self)
            self.threadID = threadID
            self.name = name
            self.counter = counter

        def run(self):
            print("Starting " + self.name)
            # Obtain the lock; acquire() returns True after successfully acquiring it.
            # If the optional timeout parameter is not given, it blocks until the lock is obtained.
            # Otherwise it returns False after the timeout.
            threadLock.acquire()
            print_time(self.name, self.counter, 5)
            # Release the lock
            threadLock.release()

    def print_time(threadName, delay, counter):
        while counter:
            time.sleep(delay)
            print("%s: %s" % (threadName, time.ctime(time.time())))
            counter -= 1

    threadLock = threading.Lock()
    threads = []

    # Create new threads
    thread1 = myThread(1, "Thread-1", 1)
    thread2 = myThread(2, "Thread-2", 2)

    # Start the new threads
    thread1.start()
    thread2.start()

    # Add the threads to the thread list
    threads.append(thread1)
    threads.append(thread2)

    # Wait for all threads to complete
    for t in threads:
        t.join()
    print("Exiting Main Thread")
(2) Thread synchronization queue queue
The module is named Queue in Python 2.x and queue in Python 3.x, so Python 3 code uses import queue.
Python’s queue module provides synchronized, thread-safe queue classes, including the FIFO (first in, first out) queue Queue, the LIFO (last in, first out) queue LifoQueue, and the priority queue PriorityQueue. These queues implement the necessary locking internally and can be used directly from multiple threads. Queues can be used to achieve synchronization between threads.
Commonly used methods of a queue object (written here as queue):
- queue.qsize() returns the approximate size of the queue
- queue.empty() returns True if the queue is empty, False otherwise
- queue.full() returns True if the queue is full, False otherwise; "full" is determined by the maxsize given when the queue was created
- queue.get(block=True, timeout=None) removes and returns an item; timeout is the maximum waiting time
- queue.get_nowait() is equivalent to queue.get(False)
- queue.put(item, block=True, timeout=None) writes an item to the queue; timeout is the maximum waiting time
- queue.put_nowait(item) is equivalent to queue.put(item, False)
- queue.task_done() signals to the queue that a previously fetched task has been completed
- queue.join() blocks until every item that was put into the queue has been fetched and marked with task_done()
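The methods listed above can be exercised in a few lines. The following is a minimal sketch with illustrative variable names (fifo, lifo, prio, drain); it shows the capacity check, the retrieval order of the three queue classes, and the task_done()/join() handshake.

```python
import queue
import threading

fifo = queue.Queue(maxsize=3)
for n in (1, 2, 3):
    fifo.put(n)
print(fifo.full(), fifo.qsize())

overflow = False
try:
    fifo.put_nowait(4)            # the queue is full, so this raises queue.Full
except queue.Full:
    overflow = True

lifo = queue.LifoQueue()
prio = queue.PriorityQueue()
for n in (3, 1, 2):
    lifo.put(n)
    prio.put(n)
lifo_order = [lifo.get() for _ in range(3)]   # last in, first out
prio_order = [prio.get() for _ in range(3)]   # smallest item first

results = []

def drain():
    while True:
        try:
            item = fifo.get_nowait()
        except queue.Empty:
            return
        results.append(item * 10)
        fifo.task_done()          # one call per item fetched with get()

t = threading.Thread(target=drain)
t.start()
fifo.join()                       # returns once task_done() was called for all items
t.join()
print(lifo_order, prio_order, results)
```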
Case 1:
    import queue
    import threading
    import time

    exitFlag = 0

    class myThread(threading.Thread):
        def __init__(self, threadID, name, q):
            threading.Thread.__init__(self)
            self.threadID = threadID
            self.name = name
            self.q = q

        def run(self):
            print("Starting " + self.name)
            process_data(self.name, self.q)
            print("Exiting " + self.name)

    def process_data(threadName, q):
        while not exitFlag:
            queueLock.acquire()
            if not q.empty():
                data = q.get()
                queueLock.release()
                print("%s processing %s" % (threadName, data))
            else:
                queueLock.release()
            time.sleep(1)

    threadList = ["Thread-1", "Thread-2", "Thread-3"]
    nameList = ["One", "Two", "Three", "Four", "Five"]
    queueLock = threading.Lock()
    workQueue = queue.Queue(10)
    threads = []
    threadID = 1

    # Create new threads
    for tName in threadList:
        thread = myThread(threadID, tName, workQueue)
        thread.start()
        threads.append(thread)
        threadID += 1

    # Fill the queue
    queueLock.acquire()
    for word in nameList:
        workQueue.put(word)
    queueLock.release()

    # Wait for the queue to be emptied (busy-waiting, kept simple for the demo)
    while not workQueue.empty():
        pass

    # Notify the threads that it is time to exit
    exitFlag = 1

    # Wait for all threads to complete
    for t in threads:
        t.join()
    print("Exiting Main Thread")
Case 2:
    import time
    import threading
    import queue

    class Worker(threading.Thread):
        def __init__(self, name, work_queue):
            threading.Thread.__init__(self, name=name)
            self.work_queue = work_queue
            self.start()  # Execute run()

        def run(self):
            # Loop so that the next task is picked up after each one finishes
            while True:
                try:
                    # Fetch one item without blocking; checking empty() first and then
                    # calling get() could block forever if another thread took the last item
                    foo = self.work_queue.get_nowait()
                except queue.Empty:
                    # Exit the thread if the queue is empty
                    break
                # Delay 1 s to simulate the work to be done
                time.sleep(1)
                print(self.name + " process " + str(foo))
                # Mark the task as completed
                self.work_queue.task_done()

    # Task queue (not named "queue", which would shadow the module)
    task_queue = queue.Queue()

    # Put 100 tasks into the queue
    for i in range(100):
        task_queue.put(i)

    # Start 10 threads
    for i in range(10):
        threadName = 'Thread' + str(i)
        Worker(threadName, task_queue)

    # Block until every task has been marked done
    task_queue.join()
4 Thread Pool
What is the problem with traditional multi-threading?
Traditional multi-threading solutions use the “instant creation, immediate destruction” strategy. Although the time to create a thread has been greatly shortened compared to creating a process, if the tasks submitted to the thread have a short execution time and are executed extremely frequently, the server will be in a state of constantly creating and destroying threads.
The lifetime of a thread can be divided into three parts: thread startup time, thread body running time and thread destruction time. In multi-threaded processing, if threads cannot be reused, every task has to go through all three phases of starting, running and destroying. This inevitably increases the system response time and reduces efficiency.
Is there an efficient solution? — Thread Pool
Basic principles of thread pool:
We put tasks into a queue and then start N threads. Each thread fetches a task from the queue, executes it, reports that it has finished, and then fetches the next task, until all tasks in the queue have been completed. When the queue is empty, the threads exit.
Use thread pool:
Since threads are created in advance and kept in the thread pool, and are not destroyed after processing the current task but are assigned the next one, repeated thread creation is avoided. This saves the overhead of creating and destroying threads and can bring better performance and system stability.
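The standard library already ships a pool built on this principle: concurrent.futures.ThreadPoolExecutor. The sketch below is an illustration rather than the hand-written pool this section develops; handle is a hypothetical task function. Twenty tasks are served by at most four reusable worker threads.

```python
from concurrent.futures import ThreadPoolExecutor
import threading
import time

def handle(task_id):
    time.sleep(0.05)                                   # simulated IO wait
    return task_id, threading.current_thread().name

# Threads are created lazily, up to max_workers, and are reused between tasks.
with ThreadPoolExecutor(max_workers=4) as pool:
    outcomes = list(pool.map(handle, range(20)))       # results keep input order

task_ids = [task_id for task_id, _ in outcomes]
thread_names = {name for _, name in outcomes}
print(len(task_ids), len(thread_names))
```

Leaving the with block waits for all submitted tasks and then shuts the pool down.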
How large should the thread pool be?
The number of server CPU cores is limited, so the number of threads that can truly run at the same time is limited. More threads are not always better: thread switching is expensive, and switching too frequently reduces performance.
During thread execution, the running time is divided into two parts:
- CPU calculation, which occupies the CPU
- Waiting that requires no CPU calculation and does not occupy the CPU: the thread waits for IO to return, for example in recv(), accept() or sleep(). Typical cases are accessing a cache, making RPC calls to downstream services, accessing a DB, and other operations that require network calls
So if the calculation time accounts for 50% and the waiting time is 50%, then in order to achieve the highest utilization rate, 2 threads can be opened:
Suppose a task takes 2 seconds in total. After the CPU has computed for 1 second, the thread needs 1 second to wait for IO, during which the CPU would be idle. The CPU can instead switch to a second thread and compute for 1 second; that thread then waits 1 second for its IO. At this point the CPU can switch back: the first thread has just finished its 1 second of IO waiting, so the CPU continues working, alternating between the two threads.
So if the calculation time accounts for 20% and the waiting time is 80%, then in order to achieve the highest utilization rate, 5 threads can be opened:
Imagine that a task takes 5 seconds to complete: 1 second of CPU time and 4 seconds of waiting. While one thread is waiting, 4 more threads can use the CPU in turn, which maximizes the overlap between CPU time and IO waiting time.
In general, the formula for the number of threads is:
On an N-core server, profile a single thread executing the business logic to find that the local computing time is x and the waiting time is y. The number of worker threads (the thread pool size) is then set to N*(x + y)/x, which maximizes CPU utilization.
Because of the GIL, only one Python thread executes bytecode at a time, so N=1 is used here.
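The formula N*(x + y)/x can be turned into a small helper. This is a sketch under stated assumptions: pool_size is a hypothetical name, and rounding up to a whole thread is a choice made here, not something the formula prescribes.

```python
import math

def pool_size(compute_time, wait_time, cores=1):
    """Return N * (x + y) / x, rounded up to a whole number of threads."""
    if compute_time <= 0:
        raise ValueError("compute_time must be positive")
    return max(1, math.ceil(cores * (compute_time + wait_time) / compute_time))

print(pool_size(1, 1))            # 50% compute, 50% waiting
print(pool_size(1, 4))            # 20% compute, 80% waiting
print(pool_size(1, 4, cores=4))   # same workload on a hypothetical 4-core runtime
```

The first two calls reproduce the 2-thread and 5-thread examples from the text.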
    import queue
    import threading
    import time

    # Thread pool management class
    class WorkManager(object):
        def __init__(self, work_num=1000, thread_num=2):
            self.work_queue = queue.Queue()      # Task queue
            self.threads = []                    # Thread pool
            self.__init_work_queue(work_num)     # Initialize the task queue and add tasks
            self.__init_thread_pool(thread_num)  # Initialize the thread pool and create threads

        def __init_thread_pool(self, thread_num):
            """Initialize thread pool"""
            for i in range(thread_num):
                # Create a worker thread (an object in the thread pool)
                self.threads.append(Work(self.work_queue))

        def __init_work_queue(self, jobs_num):
            """Initialize work queue"""
            for i in range(jobs_num):
                self.add_job(do_job, i)

        def add_job(self, func, *args):
            """Add a job to the queue"""
            # The Queue implements the synchronization mechanism internally
            self.work_queue.put((func, list(args)))

        def wait_allcomplete(self):
            """Wait for all threads to finish running"""
            for item in self.threads:
                if item.is_alive():  # isAlive() was removed in Python 3.9
                    item.join()

    class Work(threading.Thread):
        def __init__(self, work_queue):
            threading.Thread.__init__(self)
            self.work_queue = work_queue
            self.start()

        def run(self):
            # Loop until the queue is empty, then let the thread exit
            while True:
                try:
                    # Non-blocking get; the Queue handles synchronization internally
                    do, args = self.work_queue.get(block=False)
                except queue.Empty:
                    break
                do(args)
                self.work_queue.task_done()  # Notify the queue that the task is completed

    # The specific task to be done
    def do_job(args):
        time.sleep(0.1)  # Simulate processing time
        print(threading.current_thread())
        print(list(args))

    if __name__ == '__main__':
        start = time.time()
        work_manager = WorkManager(100, 10)  # or work_manager = WorkManager(10000, 20)
        work_manager.wait_allcomplete()
        end = time.time()
        print("cost all time: %s" % (end - start))
Processes are allocated resources by the operating system, threads are scheduled onto the CPU by the operating system, and coroutines are controlled by the user program.
5 Coroutine
Under the Python GIL, only one thread can be running at any moment, so for CPU-intensive programs the switching overhead between threads becomes a drag. Programs whose bottleneck is I/O are what coroutines are good at.
Coroutines in Python have gone through a long development process. It roughly went through the following three stages:
- Initial generator variant yield/send
- Introduce @asyncio.coroutine and yield from
- The async/await keywords, introduced in Python 3.5
(1) Start with yield
Let’s first look at a common piece of code for computing the Fibonacci sequence.
    def fibs(n):
        res = [0] * n
        index = 0
        a = 0
        b = 1
        while index < n:
            res[index] = b
            a, b = b, a + b
            index += 1
        return res

    for fib_res in fibs(20):
        print(fib_res)
If we only need the nth number of the Fibonacci sequence, or just want to generate the sequence on demand, the traditional method above consumes more memory than necessary because it builds the whole list first.
At this time, yield comes in handy.
    def fib(n):
        index = 0
        a = 0
        b = 1
        while index < n:
            yield b
            a, b = b, a + b
            index += 1

    for fib_res in fib(20):
        print(fib_res)
When a function contains a yield statement, Python automatically treats it as a generator function. Calling fib(20) does not execute the function body; it creates a generator object from the function body.
Here, yield preserves the execution state of the fib function, pauses the calculation and returns b. When fib(20) is used in a for…in loop, the loop calls next() on the generator object once per iteration, waking up the generator and running it to the next yield statement. When the function body finishes, a StopIteration exception is raised; the for loop catches it and exits.
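What the for loop does behind the scenes can be spelled out by hand. In this sketch the trace list is a hypothetical addition, used only to show that creating the generator object does not run the function body.

```python
trace = []

def fib(n):
    trace.append("body started")   # runs on the first next(), not on fib(n)
    a, b = 0, 1
    for _ in range(n):
        yield b
        a, b = b, a + b

gen = fib(3)
created_without_running = (trace == [])   # the body has not started yet

first = next(gen)     # runs the body up to the first yield
second = next(gen)    # resumes after the yield and runs to the next one
third = next(gen)

try:
    next(gen)         # the body returns, so StopIteration is raised
    exhausted = False
except StopIteration:
    exhausted = True

print(created_without_running, first, second, third, exhausted)
```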
(2) Send is coming
As you can see from the above program, currently only data flows from fib(20) to the outside for loop through yield; if data can be sent to fib(20), then coroutines can be implemented in Python.
For this purpose, Python generators have a send() method, and the yield expression has a value: the argument that was passed to send().
We use this feature to simulate the calculation of a slow Fibonacci sequence:
    import time
    import random

    def stupid_fib(n):
        index = 0
        a = 0
        b = 1
        while index < n:
            sleep_cnt = yield b
            print('let me think {0} secs'.format(sleep_cnt))
            time.sleep(sleep_cnt)
            a, b = b, a + b
            index += 1

    print('-' * 10 + 'test yield send' + '-' * 10)
    N = 20
    sfib = stupid_fib(N)
    # next() must be called first so that execution advances to the "yield b" position
    fib_res = next(sfib)
    while True:
        print(fib_res)
        try:
            fib_res = sfib.send(random.uniform(0, 0.5))
        except StopIteration:
            break
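The two-way data flow is easier to see in a smaller example without sleeping. The running_average generator below is a hypothetical illustration: each send() delivers a value into the generator, and the value of the send() call is whatever the generator yields next.

```python
def running_average():
    total = 0.0
    count = 0
    average = None
    while True:
        value = yield average    # hand out the current average, wait for a value
        total += value
        count += 1
        average = total / count

avg = running_average()
next(avg)                # prime the generator: run to the first yield
a1 = avg.send(10)
a2 = avg.send(20)
a3 = avg.send(30)
avg.close()              # stop the otherwise endless generator
print(a1, a2, a3)
```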
Python for concurrent programming
In the era of Python 2, high-performance network programming mainly used three libraries: Twisted, Tornado, and Gevent, but their asynchronous code was neither compatible with each other nor portable.
asyncio is a standard library introduced in Python version 3.4, with built-in support for asynchronous IO.
The programming model of asyncio is a message loop. We obtain a reference to an EventLoop from the asyncio module and then hand the coroutines that need to be executed to the EventLoop, thereby achieving asynchronous IO.
Python introduced the concept of coroutines in 3.4, but there they are still based on generator objects.
Python 3.5 added the two keywords async and await, which replace asyncio.coroutine and yield from respectively. The @asyncio.coroutine decorator was later deprecated in Python 3.8 and removed in Python 3.11.
Python 3.5 settled the syntax of coroutines. The following briefly introduces the use of asyncio. Besides asyncio, Tornado and Gevent also implement coroutines, and uvloop provides similar functionality.
(1) Coroutine definition
Use asyncio to implement Hello world. The code is as follows:
    import asyncio

    # Note: @asyncio.coroutine was deprecated in Python 3.8 and removed in 3.11,
    # so this generator-based form only runs on older interpreters.
    @asyncio.coroutine
    def hello():
        print("Hello world!")
        # Asynchronously call asyncio.sleep(1), which is itself a coroutine (not time.sleep)
        r = yield from asyncio.sleep(1)
        print("Hello again!")

    # Get the EventLoop
    loop = asyncio.get_event_loop()
    # Execute the coroutine
    loop.run_until_complete(hello())
    loop.close()
@asyncio.coroutine marks a generator as a coroutine, and then we hand this coroutine to the EventLoop for execution. hello() first prints Hello world!, and then the yield from syntax lets us conveniently call another generator. Since asyncio.sleep() is also a coroutine, the thread does not wait for asyncio.sleep(); it suspends hello() and goes on to run the next iteration of the message loop. When asyncio.sleep() returns, the thread gets the return value from yield from (here None) and then continues with the next statement.
Think of asyncio.sleep(1) as an IO operation that takes 1 second. During this period the main thread does not wait but runs other runnable coroutines in the EventLoop, so concurrent execution can be achieved.
Let’s try using Tasks to wrap two coroutines:
    import threading
    import asyncio

    # Generator-based coroutine: requires an interpreter older than Python 3.11
    @asyncio.coroutine
    def hello():
        print('Hello world! (%s)' % threading.current_thread())
        yield from asyncio.sleep(1)
        print('Hello again! (%s)' % threading.current_thread())

    loop = asyncio.get_event_loop()
    tasks = [hello(), hello()]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
Observe the execution process:
    Hello world! (<_MainThread(MainThread, started 140735195337472)>)
    Hello world! (<_MainThread(MainThread, started 140735195337472)>)
    (Pause for about 1 second)
    Hello again! (<_MainThread(MainThread, started 140735195337472)>)
    Hello again! (<_MainThread(MainThread, started 140735195337472)>)
It can be seen from the printed current thread name that the two coroutines are executed concurrently by the same thread.
If asyncio.sleep() is replaced by a real IO operation, multiple coroutines can be executed concurrently by one thread.
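The same experiment can be written with the async/await syntax and timed. This is a minimal sketch assuming Python 3.7 or later (for asyncio.run); fake_io is a hypothetical stand-in for a real IO call. Three waits of 0.3 seconds each overlap instead of adding up.

```python
import asyncio
import time

async def fake_io(name, delay):
    await asyncio.sleep(delay)     # stands in for a real IO operation
    return name

async def main():
    started = time.perf_counter()
    # gather() schedules the three coroutines concurrently on one thread
    names = await asyncio.gather(
        fake_io("a", 0.3), fake_io("b", 0.3), fake_io("c", 0.3)
    )
    return names, time.perf_counter() - started

names, elapsed = asyncio.run(main())
print(names, round(elapsed, 1))
```

Run one after another, the three waits would take about 0.9 seconds; here the total stays close to a single wait.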
asyncio case practice
We use asyncio’s asynchronous network connections to fetch the homepages of the sina, sohu and 163 websites:
async_wget.py
    import asyncio

    # Generator-based coroutine: requires an interpreter older than Python 3.11
    @asyncio.coroutine
    def wget(host):
        print('wget %s...' % host)
        connect = asyncio.open_connection(host, 80)  # Coroutine that opens host:80
        # Start connecting; on success a (reader, writer) pair is returned
        reader, writer = yield from connect
        header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host
        writer.write(header.encode('utf-8'))
        yield from writer.drain()
        while True:
            line = yield from reader.readline()
            # A bare CRLF ends the header; b'' means the connection was closed
            if line in (b'\r\n', b''):
                break
            print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
        # Ignore the body, close the socket
        writer.close()

    loop = asyncio.get_event_loop()
    tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
The result information is as follows:
    wget www.sohu.com...
    wget www.sina.com.cn...
    wget www.163.com...
    (wait for some time)
    (Print out the header of sohu)
    www.sohu.com header > HTTP/1.1 200 OK
    www.sohu.com header > Content-Type: text/html
    ...
    (Print out the header of sina)
    www.sina.com.cn header > HTTP/1.1 200 OK
    www.sina.com.cn header > Date: Wed, 20 May 2015 04:56:33 GMT
    ...
    (Print out the header of 163)
    www.163.com header > HTTP/1.0 302 Moved Temporarily
    www.163.com header > Server: Cdn Cache Server V2.0
    ...
It can be seen that the 3 connections are completed concurrently by one thread through coroutines.
(3) Use async/await
    import asyncio

    async def browser(host, port=80):
        # Connect to host
        reader, writer = await asyncio.open_connection(host, port)
        print(host, port, 'Connection successful!')

        # Request the / homepage (HTTP protocol).
        # The request header must end with a blank line (two CRLF sequences);
        # "Connection: close" makes the server close the socket so that read() returns.
        index_get = 'GET {} HTTP/1.1\r\nHost: {}\r\nConnection: close\r\n\r\n'.format('/', host)
        writer.write(index_get.encode())
        await writer.drain()  # Wait until the request has been written to the connection

        # Read the response header
        while True:
            line = await reader.readline()  # Wait for response data
            if line in (b'\r\n', b''):
                break
            print(host, '<header>', line)

        # Read the response body
        body = await reader.read()
        print(host, '<content>', body)
        writer.close()

    async def main():
        hosts = ['www.dushu.com', 'www.sina.com.cn', 'www.baidu.com']
        # Run the three coroutines concurrently (asyncio.run needs Python 3.7+)
        await asyncio.gather(*(browser(host) for host in hosts))

    if __name__ == '__main__':
        asyncio.run(main())
        print('---over---')
Summary
asyncio provides complete asynchronous IO support;
Asynchronous operations need to be completed inside a coroutine through yield from (or await in the async/await syntax);
Multiple coroutines can be wrapped into a set of Tasks and executed concurrently.
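The last point can be sketched with the async/await syntax, assuming Python 3.7 or later. The square coroutine is a hypothetical placeholder for real work: each coroutine is wrapped in a Task with asyncio.create_task(), and asyncio.wait() suspends until all of them have finished.

```python
import asyncio

async def square(n):
    await asyncio.sleep(0.01)      # placeholder for an asynchronous operation
    return n * n

async def main():
    # Wrapping a coroutine in a Task schedules it on the event loop immediately.
    tasks = [asyncio.create_task(square(n)) for n in range(5)]
    done, pending = await asyncio.wait(tasks)
    return sorted(task.result() for task in done), len(pending)

squares, pending_count = asyncio.run(main())
print(squares, pending_count)
```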