The principle and implementation of Python multithreading

Python multi-threading principles and practice

Purpose:

(1) Understand how Python threads execute

(2) Master multi-thread programming and thread synchronization

(3) Understand the use of thread pools

1 Basic concepts of threads
1.1 What is a thread?

A thread is an execution unit within a process and the entity that is scheduled within that process.

Difference from process:
(1) Address space: a thread is an execution unit within a process, and a process has at least one thread. Threads share the address space of their process, whereas each process has its own independent address space;
(2) Resource ownership: The process is the unit of resource allocation and ownership. Threads in the same process share the resources of the process.
(3) Threads are the basic unit of processor scheduling, but processes are not.
(4) Both can be executed concurrently.

In short, a program has at least one process, and a process has at least one thread.

Threads are finer-grained than processes, which gives multi-threaded programs a high degree of concurrency.
In addition, each process has its own independent memory during execution, whereas multiple threads share memory, which can greatly improve a program's running efficiency.

1.2 What is the relationship between threads and processes?

A process is one execution of an application on a processor; it is a dynamic concept. Threads are part of a process, and a process contains one or more running threads.

Multiple threads can share global variables, but multiple processes cannot. In multi-threading, all threads report the same process ID; in multi-processing, different child processes have different process IDs.

A process is a running activity of a program with certain independent functions on a certain data set. A process is an independent unit for resource allocation and scheduling in the system.
A thread is an entity within a process and the basic unit of CPU scheduling and dispatch; it is smaller than a process and can run independently. A thread owns almost no system resources of its own, only the few it needs to run (such as a program counter, a set of registers, and a stack), but it shares all the resources owned by the process with the other threads of the same process.
A thread can create and destroy another thread; multiple threads in the same process can execute concurrently.
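To see the claims above that threads share global variables and report the same process ID, here is a minimal sketch using only the standard library: several threads append to one module-level list and record os.getpid(). The names shared_items, pids, and record are illustrative, not from the original.

```python
import os
import threading

shared_items = []          # one global list, visible to every thread
pids = []                  # process ID observed by each thread
lock = threading.Lock()    # protects the two lists while appending

def record(n):
    with lock:
        shared_items.append(n)
        pids.append(os.getpid())

threads = [threading.Thread(target=record, args=(i,)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(shared_items))          # every thread wrote into the same list
print(set(pids) == {os.getpid()})    # all threads belong to the main process
```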

2 Python thread module

Python mainly supports multi-threading through two modules: thread (renamed _thread in Python 3) and threading. The thread module is relatively low-level, while threading wraps threads in a more convenient interface. However, because of the GIL, CPython threads cannot make full use of multiple CPU cores for CPU-bound work. To fully use the computing power of a multi-core CPU, you need the multiprocessing module (which has some pitfalls on Windows, such as requiring the if __name__ == '__main__': guard).

2.1 How to create a thread

In Python 2.x, new threads could be created with the start_new_thread() function of the low-level thread module. In Python 3.x that module was renamed _thread, and the threading module is the recommended interface.

There are two ways to create a new thread with the threading module in Python 3.x: one is threading.Thread(target=callable), that is, passing a callable (a function or other callable object) to the Thread object; the other is to subclass threading.Thread and override the run() method. In the second approach, run() is the only method that must be overridden.

(1) Create multi-threads through threading.Thread

import threading
import time

def target():
    print("the current threading %s is running" % threading.current_thread().name)
    time.sleep(1)
    print("the current threading %s is ended" % threading.current_thread().name)

print("the current threading %s is running" % threading.current_thread().name)
# Create thread t; it will run target()
t = threading.Thread(target=target)
t.start()
# join() blocks the current thread (here, the main thread) until thread t ends,
# so the main thread does not finish before t does
t.join()
print("the current threading %s is ended" % threading.current_thread().name)

(2) Create multi-threads by inheriting threading.Thread and defining subclasses

To create a thread this way with the threading module, subclass threading.Thread directly and override the __init__ and run methods:

import threading
import time

class myThread(threading.Thread): # Inherit the parent class threading.Thread
   def __init__(self, threadID, name, counter):
      threading.Thread.__init__(self)
      self.threadID = threadID
      self.name = name
      self.counter = counter

   def run(self): # Put the code to execute in run(); the new thread calls run() after start() is called
      print("Starting " + self.name)
      print_time(self.name, self.counter, 5)
      print("Exiting " + self.name)


def print_time(threadName, delay, counter):
   while counter:
      time.sleep(delay)
      print("%s process at: %s" % (threadName, time.ctime(time.time())))
      counter -= 1


#Create new thread
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)

# Start thread
thread1.start()
thread2.start()

# Wait for the thread to end
thread1.join()
thread2.join()

print("Exiting Main Thread")

In the example above, the output of thread1 and thread2 is interleaved in no guaranteed order. To control the order, thread synchronization is required.

3 Synchronization between threads

If multiple threads modify the same data, the results may be unpredictable. To keep the data correct, the threads must be synchronized.

Simple thread synchronization can be achieved with the threading module's Lock and RLock objects. Both have acquire() and release() methods. Operations on data that only one thread at a time may touch should be placed between acquire() and release().
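A minimal sketch of this pattern, using the with statement (which calls acquire() on entry and release() on exit, even if an exception is raised) to protect a shared counter. It also shows the practical difference between the two classes: an RLock can be acquired again by the thread that already holds it, whereas a second acquire() on a plain Lock from the same thread would block forever. The function and variable names are illustrative.

```python
import threading

counter = 0
counter_lock = threading.Lock()

def add_many(times):
    global counter
    for _ in range(times):
        with counter_lock:      # acquire() ... release()
            counter += 1        # read-modify-write, so it must be protected

threads = [threading.Thread(target=add_many, args=(10_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # 40000

# RLock: the thread holding it may acquire it again (a plain Lock would deadlock)
rlock = threading.RLock()

def outer():
    with rlock:
        return inner()

def inner():
    with rlock:                 # second acquisition by the same thread
        return "re-entered"

print(outer())  # re-entered
```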

Note that Python (CPython) has a GIL (Global Interpreter Lock): a thread must hold this global lock before it can execute Python bytecode. In Python 2, the interpreter released the lock every 100 bytecode instructions so that other threads could run; since Python 3.2, the running thread is instead asked to release it after a fixed switch interval (5 ms by default).
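The switch interval can be inspected and changed with sys.getswitchinterval() and sys.setswitchinterval(). A short sketch; the 0.001 value is an arbitrary illustration, and the original setting is restored afterwards:

```python
import sys

# Seconds after which the running thread is asked to release the GIL
print(sys.getswitchinterval())   # 0.005 by default

old = sys.getswitchinterval()
sys.setswitchinterval(0.001)     # request more frequent thread switches
print(sys.getswitchinterval())
sys.setswitchinterval(old)       # restore the previous value
```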

3.1 Thread synchronization problem

Common ways to synchronize threads include:

Locks, semaphores, condition variables, and synchronized queues.

The following focuses on two of these mechanisms: locks and synchronized queues.
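Semaphores and condition variables are not covered in detail in this article; the following is a brief sketch of each using threading.Semaphore and threading.Condition. All names are illustrative. The semaphore limits how many threads may be inside a block at once; the condition lets a consumer sleep until a producer signals that data is ready.

```python
import threading
import time

# Semaphore: at most 2 threads may be inside the guarded block at once
slots = threading.Semaphore(2)
state_lock = threading.Lock()
active = 0
max_active = 0

def limited_task():
    global active, max_active
    with slots:                     # blocks while 2 threads already hold a slot
        with state_lock:
            active += 1
            max_active = max(max_active, active)
        time.sleep(0.05)            # simulate work
        with state_lock:
            active -= 1

workers = [threading.Thread(target=limited_task) for _ in range(6)]
for w in workers:
    w.start()
for w in workers:
    w.join()
print(max_active)                   # never more than 2

# Condition: a consumer waits until a producer signals that data is ready
cond = threading.Condition()
items = []

def consumer(out):
    with cond:
        # wait_for() releases the lock while waiting and re-checks the predicate
        cond.wait_for(lambda: len(items) > 0)
        out.append(items.pop(0))

received = []
c = threading.Thread(target=consumer, args=(received,))
c.start()
with cond:
    items.append("data")
    cond.notify()                   # wake one waiting thread
c.join()
print(received)                     # ['data']
```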

(1) Lock mechanism

The threading module's Lock class provides acquire() to lock and release() to unlock:

import threading
import time
class myThread(threading.Thread):
   def __init__(self, threadID, name, counter):
      threading.Thread.__init__(self)
      self.threadID = threadID
      self.name = name
      self.counter = counter
   def run(self):
      print("Starting " + self.name)
      # Obtain the lock and return True after successfully acquiring the lock.
      # If the optional timeout parameter is not filled in, it will block until the lock is obtained.
      # Otherwise it will return False after timeout
      threadLock.acquire()
      print_time(self.name, self.counter, 5)
      # Release lock
      threadLock.release()
def print_time(threadName, delay, counter):
   while counter:
      time.sleep(delay)
      print("%s: %s" % (threadName, time.ctime(time.time())))
      counter -= 1

threadLock = threading.Lock()
threads = []
#Create new thread
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)
# Start a new thread
thread1.start()
thread2.start()
#Add thread to thread list
threads.append(thread1)
threads.append(thread2)
# Wait for all threads to complete
for t in threads:
   t.join()

print("Exiting Main Thread")
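The comments in run() above describe the timeout behavior of acquire(). A minimal sketch that makes it observable: a helper thread holds the lock while the main thread calls acquire(timeout=0.1). The Event objects only sequence the two threads deterministically; their names are illustrative.

```python
import threading

lock = threading.Lock()
holder_ready = threading.Event()
release_now = threading.Event()

def holder():
    with lock:
        holder_ready.set()    # tell the main thread the lock is now held
        release_now.wait()    # keep holding it until told to let go

t = threading.Thread(target=holder)
t.start()
holder_ready.wait()

got_it = lock.acquire(timeout=0.1)        # lock is busy: gives up after 0.1 s
print(got_it)                             # False

release_now.set()
t.join()

got_it_later = lock.acquire(timeout=0.1)  # lock is free now
print(got_it_later)                       # True
lock.release()
```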

(2) Synchronized queue (queue module)

The module is named Queue in Python 2.x and queue in Python 3.x.

Use it with import queue.

Python's queue module provides synchronized, thread-safe queue classes: the FIFO (first in, first out) Queue, the LIFO (last in, first out) LifoQueue, and the priority queue PriorityQueue. These queues implement all the required locking internally, so they can be used directly from multiple threads, and they can be used to synchronize threads.

Commonly used methods of the queue classes:

  • Queue.qsize() returns the approximate size of the queue
  • Queue.empty() returns True if the queue is empty, otherwise False
  • Queue.full() returns True if the queue is full (its size has reached maxsize), otherwise False
  • Queue.get([block[, timeout]]) removes and returns an item; block and timeout control whether and how long to wait (queue.Empty is raised if no item is available)
  • Queue.get_nowait() is equivalent to Queue.get(False)
  • Queue.put(item[, block[, timeout]]) adds an item; block and timeout control whether and how long to wait when the queue is full (queue.Full is raised if there is no room)
  • Queue.put_nowait(item) is equivalent to Queue.put(item, False)
  • Queue.task_done() is called after finishing a task taken from the queue, to signal that the task is complete
  • Queue.join() blocks until every item put into the queue has been retrieved and marked done with task_done()
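A short, single-threaded sketch of the semantics listed above (blocking with a timeout, the queue.Empty and queue.Full exceptions, and the task_done()/join() bookkeeping), plus the ordering of LifoQueue and PriorityQueue. The values are arbitrary.

```python
import queue

q = queue.Queue(maxsize=2)
q.put("a")
q.put("b")
print(q.full(), q.qsize())    # True 2

try:
    q.put_nowait("c")         # same as q.put("c", False): no room left
except queue.Full:
    print("queue is full")

print(q.get())                # a  (FIFO order)
q.task_done()                 # one task_done() per item taken
print(q.get())                # b
q.task_done()

try:
    q.get(timeout=0.1)        # waits up to 0.1 s, then raises queue.Empty
except queue.Empty:
    print("queue is empty")

q.join()                      # returns at once: every item has had task_done()
print("all tasks done")

lifo = queue.LifoQueue()
for n in (1, 2, 3):
    lifo.put(n)
print([lifo.get() for _ in range(3)])   # [3, 2, 1]

pq = queue.PriorityQueue()
for item in [(2, "medium"), (1, "high"), (3, "low")]:
    pq.put(item)                        # lowest priority number comes out first
print([pq.get()[1] for _ in range(3)])  # ['high', 'medium', 'low']
```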

Case 1:

import queue
import threading
import time

exitFlag = 0

class myThread(threading.Thread):
   def __init__(self, threadID, name, q):
      threading.Thread.__init__(self)
      self.threadID = threadID
      self.name = name
      self.q = q

   def run(self):
      print("Starting " + self.name)
      process_data(self.name, self.q)
      print("Exiting " + self.name)

def process_data(threadName, q):
   while not exitFlag:
      queueLock.acquire()
      if not q.empty():  # Use the q argument rather than the global workQueue
         data = q.get()
         queueLock.release()
         print("%s processing %s" % (threadName, data))
      else:
         queueLock.release()
      time.sleep(1)

threadList = ["Thread-1", "Thread-2", "Thread-3"]
nameList = ["One", "Two", "Three", "Four", "Five"]
queueLock = threading.Lock()
workQueue = queue.Queue(10)
threads = []
threadID = 1

#Create new thread
for tName in threadList:
   thread = myThread(threadID, tName, workQueue)
   thread.start()
   threads.append(thread)
   threadID += 1

# Fill the queue
queueLock.acquire()
for word in nameList:
   workQueue.put(word)
queueLock.release()

# Wait for the queue to be cleared
while not workQueue.empty():
   time.sleep(0.1)  # Poll instead of spinning the CPU with "pass"

# Notify the thread that it is time to exit
exitFlag = 1

# Wait for all threads to complete
for t in threads:
   t.join()
print("Exiting Main Thread")
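Case 1 relies on a shared exitFlag plus a polling loop in the main thread. A common alternative, sketched here under the assumption that each worker should stop when it receives a sentinel value (None, chosen for illustration), lets the blocking get() do the waiting. Because Queue already locks internally, no separate queueLock is needed; the worker function and results list are illustrative.

```python
import queue
import threading

def worker(name, q, results, results_lock):
    while True:
        data = q.get()              # blocks until an item is available
        if data is None:            # sentinel: no more work for this thread
            q.task_done()
            break
        with results_lock:
            results.append((name, data))
        q.task_done()

work_queue = queue.Queue(10)
results = []
results_lock = threading.Lock()
thread_names = ["Thread-1", "Thread-2", "Thread-3"]
threads = [threading.Thread(target=worker, args=(n, work_queue, results, results_lock))
           for n in thread_names]
for t in threads:
    t.start()

for word in ["One", "Two", "Three", "Four", "Five"]:
    work_queue.put(word)
for _ in threads:
    work_queue.put(None)            # one sentinel per worker

work_queue.join()                   # wait until every item, sentinels included, is done
for t in threads:
    t.join()
print(sorted(word for _, word in results))
```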

Case 2:

import time
import threading
import queue

class Worker(threading.Thread):
    def __init__(self, name, task_queue):
        threading.Thread.__init__(self, name=name)
        self.task_queue = task_queue
        self.start()  # start() makes the new thread execute run()

    def run(self):
        # Loop so that the worker keeps taking the next task
        while True:
            try:
                # Take a task without blocking; checking empty() and then calling
                # get() separately could race with the other workers and hang
                foo = self.task_queue.get_nowait()
            except queue.Empty:
                break  # The queue is empty: exit the thread
            # Sleep 1 s to simulate the actual work
            time.sleep(1)
            print(self.name + " process " + str(foo))
            # Mark the task as completed
            self.task_queue.task_done()


# Task queue
task_queue = queue.Queue()
# Add 100 tasks to the queue
for i in range(100):
    task_queue.put(i)
# Start 10 threads
for i in range(10):
    Worker('Thread' + str(i), task_queue)
# Block until every task has been marked done
task_queue.join()
4 Thread Pool

What is the problem with traditional multi-threading?

Traditional multi-threading solutions use an “instant creation, immediate destruction” strategy. Although creating a thread is much cheaper than creating a process, if the tasks submitted to threads run only briefly and arrive very frequently, the server spends its time constantly creating and destroying threads.

A thread's lifetime can be divided into three parts: startup time, body running time, and destruction time. In multi-threaded processing, if threads cannot be reused, every task has to go through all three phases: startup, running, and destruction. This inevitably increases system response time and reduces efficiency.

Is there an efficient solution? — Thread Pool

Basic principles of thread pool:

We put the tasks into a queue and start N threads. Each thread takes a task from the queue, executes it, reports that it has finished, and then takes the next task, until every task in the queue has been completed. Once the queue is empty, the threads exit.

Use thread pool:
Because threads are created in advance and placed in the pool, and after finishing a task they are given the next one instead of being destroyed, repeated thread creation is avoided. This saves the overhead of creating and destroying threads and brings better performance and system stability.

How much should the thread pool be set to?

A server has a limited number of CPU cores, so the number of threads that can truly run in parallel is limited. More threads are not always better: thread switching is expensive, and switching too frequently reduces performance.

While a thread runs, its time splits into two parts:

  • CPU computation, which occupies the CPU
  • Waiting for IO to return, which does not need or occupy the CPU, such as recv(), accept(), and sleep(); concrete examples include accessing a cache, making RPC calls to downstream services, accessing a DB, and other operations that require network calls

So if computation takes 50% of the time and waiting takes 50%, 2 threads give the highest utilization:
Suppose a task takes 2 seconds: 1 second of CPU computation followed by 1 second of waiting for IO. While the first thread waits for IO, the CPU would be idle, so it switches to a second thread and computes for 1 second. When that thread starts its own 1-second IO wait, the first thread's IO has just finished, so the CPU switches back to it. The two threads keep alternating in this way.

If computation takes 20% of the time and waiting takes 80%, 5 threads give the highest utilization:
Imagine a 5-second task in which the CPU works for 1 second and then waits 4 seconds for IO. During those 4 seconds, 4 more threads can each use the CPU for 1 second, so CPU work fully overlaps the IO waits.

Abstractly, the formula for the number of threads is:
On an N-core server, if profiling a single thread running the business logic shows local computation time x and waiting time y, then setting the number of worker threads (the thread pool size) to N*(x + y)/x maximizes CPU utilization.
Because of the GIL, Python bytecode in CPython effectively runs on only 1 core at a time, so N=1 is used here.
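Plugging the two scenarios above into the formula, as a minimal sketch. The helper name pool_size and the choice to round to the nearest whole thread are assumptions for illustration; measured x and y are only estimates, so the result is a starting point to tune rather than an exact answer.

```python
def pool_size(cores, compute_time, wait_time):
    """Thread count from N * (x + y) / x, rounded to a whole number of threads."""
    return round(cores * (compute_time + wait_time) / compute_time)

print(pool_size(1, 1, 1))   # 50% compute, 50% wait -> 2
print(pool_size(1, 1, 4))   # 20% compute, 80% wait -> 5
print(pool_size(4, 1, 4))   # same workload on 4 cores -> 20
```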

import queue
import threading
import time

# Thread pool management class
class WorkManager(object):
   def __init__(self, work_num=1000, thread_num=2):
      self.work_queue = queue.Queue()  # Task queue
      self.threads = []  # Thread pool
      self.__init_work_queue(work_num)  # Initialize the task queue and add tasks
      self.__init_thread_pool(thread_num)  # Initialize the thread pool and create threads

   def __init_thread_pool(self, thread_num):
      """Initialize the thread pool."""
      for i in range(thread_num):
         # Create a worker thread (an object in the thread pool)
         self.threads.append(Work(self.work_queue))

   def __init_work_queue(self, jobs_num):
      """Initialize the work queue."""
      for i in range(jobs_num):
         self.add_job(do_job, i)

   def add_job(self, func, *args):
      """Add a job to the queue; Queue implements the locking internally."""
      self.work_queue.put((func, list(args)))

   def wait_allcomplete(self):
      """Wait for all threads to finish running."""
      for item in self.threads:
         if item.is_alive():  # isAlive() was removed in Python 3.9
            item.join()


class Work(threading.Thread):
   def __init__(self, work_queue):
      threading.Thread.__init__(self)
      self.work_queue = work_queue
      self.start()

   def run(self):
      # Keep taking tasks; exit the thread once the queue is empty
      while True:
         try:
            # Non-blocking get; Queue implements the locking internally
            do, args = self.work_queue.get(block=False)
         except queue.Empty:
            break
         do(args)
         self.work_queue.task_done()  # Notify the queue that the task is completed

# The actual task to perform
def do_job(args):
   time.sleep(0.1)  # Simulate processing time
   print(threading.current_thread())
   print(list(args))


if __name__ == '__main__':
   start = time.time()
   work_manager = WorkManager(100, 10)  # or work_manager = WorkManager(10000, 20)
   work_manager.wait_allcomplete()
   end = time.time()
   print("cost all time: %s" % (end - start))
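For comparison, the standard library's concurrent.futures.ThreadPoolExecutor (Python 3.2+) packages the same queue-plus-worker-threads pattern. This sketch is an alternative to, not a rewrite of, the WorkManager class above; do_job here is a simplified stand-in that returns a value instead of printing.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

def do_job(i):
    time.sleep(0.01)                 # simulate processing time
    return (i, threading.current_thread().name)

# Leaving the with-block waits for all submitted jobs and shuts the pool down
with ThreadPoolExecutor(max_workers=10) as pool:
    results = list(pool.map(do_job, range(100)))   # results keep input order

print(len(results))                                # 100
print(len({name for _, name in results}) <= 10)    # at most 10 worker threads reused
```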

The operating system allocates resources to processes, threads are scheduled onto the CPU by the operating system, and coroutines are controlled by the user (the program itself).

5 Coroutine

Under the GIL, only one thread executes Python bytecode at any moment, so for CPU-bound programs the switching overhead between threads becomes a drag. Programs whose bottleneck is I/O are where coroutines excel.

Coroutines in Python have gone through a long development process. It roughly went through the following three stages:

  1. The early generator-based form: yield/send
  2. The introduction of @asyncio.coroutine and yield from (asyncio, Python 3.4)
  3. The async/await keywords introduced in Python 3.5

(1) Start with yield

Let's first look at a common piece of code for calculating the Fibonacci sequence.

def fibs(n):
   res = [0] * n
   index = 0
   a = 0
   b = 1
   while index < n:
      res[index] = b
      a, b = b, a + b
      index += 1
   return res


for fib_res in fibs(20):
   print(fib_res)

If we only need the nth number of the Fibonacci sequence, or just want to produce the numbers one at a time, the traditional approach above wastes memory, because it builds the whole list first.

This is where yield comes in handy.

def fib(n):
   index = 0
   a = 0
   b = 1
   while index < n:
      yield b
      a, b = b, a + b
      index += 1

for fib_res in fib(20):
   print(fib_res)

When a function contains a yield statement, Python automatically treats it as a generator function. Calling fib(20) does not run the function body; it only creates a generator object from it.

Here, yield preserves the state of the fib function: it pauses the computation and returns b. When the generator is used in a for…in loop, the loop calls next() on that same generator object on each iteration, resuming it until the next yield, until a StopIteration exception is raised. The for loop catches this exception and exits.
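To make the for-loop mechanics visible, the sketch below drives the fib generator from this section by hand with next(), using a small n so that the end of the sequence is reached.

```python
def fib(n):
    index = 0
    a, b = 0, 1
    while index < n:
        yield b
        a, b = b, a + b
        index += 1

gen = fib(3)             # nothing in the body has run yet
print(next(gen))         # 1  (runs up to the first yield)
print(next(gen))         # 1
print(next(gen))         # 2
try:
    next(gen)            # the body finishes, so the generator raises StopIteration
except StopIteration:
    print("exhausted")   # a for loop catches this and stops
```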

(2) Enter send

As the program above shows, data currently flows only out of fib(20), through yield, to the for loop outside; if data could also be sent into fib(20), coroutines could be implemented in Python.

For this, Python generators have a send() method, and a yield expression evaluates to the value passed in by send().

We use this feature to simulate the calculation of a slow Fibonacci sequence:

import time
import random

def stupid_fib(n):
   index = 0
   a = 0
   b = 1
   while index < n:
      sleep_cnt = yield b
      print('let me think {0} secs'.format(sleep_cnt))
      time.sleep(sleep_cnt)
      a, b = b, a + b
      index += 1


print('-' * 10 + 'test yield send' + '-' * 10)
N = 20
sfib = stupid_fib(N)
fib_res = next(sfib) #The next() function must be executed for the first time to let the program control to the yield b position
while True:
   print(fib_res)
   try:
      fib_res = sfib.send(random.uniform(0, 0.5))
   except StopIteration:
      break
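The same send() protocol in a smaller, deterministic form: a hypothetical running_average generator receives numbers through send() and yields the average so far. As in the example above, the generator must first be advanced to its first yield with next() (send(None) is equivalent) before it can receive a value.

```python
def running_average():
    total = 0.0
    count = 0
    average = None
    while True:
        value = yield average    # hand out the current average, wait for the next value
        total += value
        count += 1
        average = total / count

avg = running_average()
next(avg)                  # prime the generator: run it to the first yield
print(avg.send(10))        # 10.0
print(avg.send(20))        # 15.0
print(avg.send(30))        # 20.0
avg.close()                # raises GeneratorExit inside the generator to stop it
```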
Python for concurrent programming

In the Python 2 era, high-performance network programming mainly relied on three libraries, Twisted, Tornado, and Gevent, but their asynchronous code was neither compatible with one another nor portable.

asyncio is a standard-library module introduced in Python 3.4, with built-in support for asynchronous IO.

The programming model of asyncio is an event loop. We obtain a reference to an EventLoop from the asyncio module and then hand the coroutines to be executed to that EventLoop, which achieves asynchronous IO.

In Python 3.4, asyncio coroutines were still based on generator objects.

Python 3.5 added the two keywords async and await, which replace @asyncio.coroutine and yield from respectively.

Python 3.5 thus settled the coroutine syntax. The following briefly introduces the use of asyncio. Besides asyncio, tornado and gevent also implement coroutines, and uvloop provides similar functionality.

(1) Coroutine definition

Use asyncio to implement Hello world. The code is as follows:

import asyncio

@asyncio.coroutine
def hello():
    print("Hello world!")
    # Asynchronously call the coroutine function asyncio.sleep(1):
    r = yield from asyncio.sleep(1)  # asyncio.sleep() is a coroutine, not the blocking time.sleep()
    print("Hello again!")

# Get EventLoop (event looper):
loop = asyncio.get_event_loop()
# Execute coroutine
loop.run_until_complete(hello())
loop.close()

@asyncio.coroutine marks a generator as a coroutine, and we then hand this coroutine to the EventLoop for execution. hello() first prints Hello world!, and then the yield from syntax lets us conveniently call another generator. Because asyncio.sleep() is also a coroutine, the thread does not block waiting for it; instead, hello() is suspended and the event loop runs its next scheduled task. When asyncio.sleep() completes, the thread gets its return value from yield from (None here) and continues with the next statement. Note that @asyncio.coroutine was deprecated in Python 3.8 and removed in Python 3.11, so on current Python versions this style must be written with async def and await instead.

Think of asyncio.sleep(1) as an IO operation that takes 1 second. During that time the main thread does not wait; it runs other coroutines that are ready in the EventLoop, which is how concurrent execution is achieved.
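The same idea written with the async def/await syntax introduced in Python 3.5, plus asyncio.run() (Python 3.7+), which creates and closes the event loop for you. The events list and the names "A" and "B" are illustrative additions that record the order of execution: both coroutines start before either finishes, and the whole run takes about 1 second rather than 2.

```python
import asyncio

events = []

async def hello(name):
    events.append("start " + name)
    await asyncio.sleep(1)          # suspends this coroutine; the loop runs others
    events.append("end " + name)

async def main():
    # gather() wraps both coroutines in Tasks and runs them concurrently
    await asyncio.gather(hello("A"), hello("B"))

asyncio.run(main())                 # creates, runs, and closes the event loop
print(events)
```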

Let's try wrapping two coroutines in Tasks:

import threading
import asyncio

@asyncio.coroutine
def hello():
    print('Hello world! (%s)' % threading.current_thread())
    yield from asyncio.sleep(1)
    print('Hello again! (%s)' % threading.current_thread())

loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

Observe the execution process:

Hello world! (<_MainThread(MainThread, started 140735195337472)>)
Hello world! (<_MainThread(MainThread, started 140735195337472)>)
(Pause for about 1 second)
Hello again! (<_MainThread(MainThread, started 140735195337472)>)
Hello again! (<_MainThread(MainThread, started 140735195337472)>)

It can be seen from the printed current thread name that the two coroutines are executed concurrently by the same thread.
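A sketch that checks the same-thread claim programmatically with the async/await syntax: asyncio.create_task() (Python 3.7+) wraps each coroutine in a Task on the running loop, and each coroutine returns threading.get_ident() so the thread that ran it can be compared. The hello(n) signature and the 0.1 s delay are illustrative.

```python
import asyncio
import threading

async def hello(n):
    await asyncio.sleep(0.1)
    # Return a result plus the identity of the OS thread that ran this coroutine
    return n, threading.get_ident()

async def main():
    # create_task() schedules each coroutine as a Task on the running loop
    tasks = [asyncio.create_task(hello(n)) for n in range(3)]
    return [await t for t in tasks]

results = asyncio.run(main())
print([n for n, _ in results])                    # [0, 1, 2]
print(len({ident for _, ident in results}) == 1)  # True: one thread ran them all
print(results[0][1] == threading.get_ident())     # True: the calling (main) thread
```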

If asyncio.sleep() is replaced by a real IO operation, multiple coroutines can still be executed concurrently by one thread.

(2) asyncio case practice

We use asyncio's asynchronous network connections to fetch the homepages of the sina, sohu, and 163 websites:

async_wget.py

import asyncio

@asyncio.coroutine
def wget(host):
    print('wget %s...' % host)
    connect = asyncio.open_connection(host, 80)  # Coroutine that opens a connection to host:80
    reader, writer = yield from connect  # Wait for the connection; on success, the reader and writer objects are returned
    header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host
    writer.write(header.encode('utf-8'))
    yield from writer.drain()
    while True:
        line = yield from reader.readline()
        if line == b'\r\n' or not line:  # An empty line (or EOF) ends the headers
            break
        print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
    # Ignore the body, close the socket
    writer.close()

loop = asyncio.get_event_loop()
tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

The result information is as follows:

wget www.sohu.com...
wget www.sina.com.cn...
wget www.163.com...
(wait for some time)
(Print out the header of sohu)
www.sohu.com header > HTTP/1.1 200 OK
www.sohu.com header > Content-Type: text/html
...
(Print out the header of sina)
www.sina.com.cn header > HTTP/1.1 200 OK
www.sina.com.cn header > Date: Wed, 20 May 2015 04:56:33 GMT
...
(Print out the header of 163)
www.163.com header > HTTP/1.0 302 Moved Temporarily
www.163.com header > Server: Cdn Cache Server V2.0
...

As you can see, the 3 connections are handled concurrently by one thread through coroutines.

(3) Use async/await

import asyncio

async def browser(host, port=80):
    # Connect to the host
    reader, writer = await asyncio.open_connection(host, port)
    print(host, port, 'Connection successful!')

    # Request the "/" homepage (HTTP protocol).
    # The request headers must end with an empty line (\r\n\r\n);
    # "Connection: close" makes the server close the socket so that read() below reaches EOF
    index_get = 'GET {} HTTP/1.1\r\nHost: {}\r\nConnection: close\r\n\r\n'.format('/', host)
    writer.write(index_get.encode())

    await writer.drain()  # Wait until the request has been written to the connection

    # Read the response headers
    while True:
        line = await reader.readline()  # Wait to read one line of the response
        if line == b'\r\n' or not line:  # An empty line (or EOF) ends the headers
            break

        print(host, '<header>', line)

    # Read the response body (until the server closes the connection)
    body = await reader.read()
    print(host, '<content>', body)

    writer.close()
    await writer.wait_closed()


async def main():
    hosts = ['www.dushu.com', 'www.sina.com.cn', 'www.baidu.com']
    # Run the three requests concurrently in one event loop
    await asyncio.gather(*(browser(host) for host in hosts))


if __name__ == '__main__':
    asyncio.run(main())  # Creates, runs, and closes the event loop (Python 3.7+)

    print('---over---')
Summary

asyncio provides complete asynchronous IO support;

Asynchronous operations must be performed inside a coroutine with yield from (or with await in an async def coroutine);

Multiple coroutines can be encapsulated into a set of Tasks and executed concurrently.