1. Introduction
Threads in the same process share data. When several threads read and write that data at the same time, its safety and consistency cannot be guaranteed. A queue solves this problem: it provides a thread-safe channel for exchanging data between threads, so the data stays safe and consistent.
2. Queue types
- queue.Queue(maxsize=0) FIFO
# FIFO demo: items leave a queue.Queue in the same order they were put in.
import queue  # required by queue.Queue() below; without it the script raises NameError
import random

queue_data = [1, 2, 3, 4, 5, 6]
random.shuffle(queue_data)
print('Original data:', queue_data)

queue_test = queue.Queue()


def queuePut(value):
    """Append *value* to the shared demo queue ``queue_test``."""
    queue_test.put(value)


for item in queue_data:
    queuePut(item)

# One get() per item that was put in; the output repeats the shuffled order.
for _ in queue_data:
    print(queue_test.get())
- queue.LifoQueue(maxsize=0) LIFO (last in, first out)
# LIFO demo: a LifoQueue returns the most recently added item first.
import queue
import random

queue_data = [1, 2, 3, 4, 5, 6]
random.shuffle(queue_data)
print('Original data:', queue_data)

queue_test = queue.LifoQueue()


def queuePut(value):
    """Push *value* onto the shared demo queue ``queue_test``."""
    queue_test.put(value)


for element in queue_data:
    queuePut(element)

# Pop exactly as many items as were pushed and print each one.
for _ in queue_data:
    print(queue_test.get())
- queue.PriorityQueue(maxsize=0) Priority queue: entries are retrieved in sorted order, so the entry with the lowest value comes out first
# PriorityQueue demo: whatever the insertion order, the smallest entry is
# retrieved first.
import queue
import random

queue_data = [1, 2, 3, 4, 5, 6]
random.shuffle(queue_data)
print('Original data:', queue_data)

queue_test = queue.PriorityQueue()


def queuePut(value):
    """Insert *value* into the shared demo queue ``queue_test``."""
    queue_test.put(value)


for entry in queue_data:
    queuePut(entry)

# Retrieve one entry per inserted value and print it.
for _ in queue_data:
    print(queue_test.get())
- queue.SimpleQueue() Simplified, unbounded FIFO queue without task tracking (no task_done/join); it takes no maxsize argument and only provides qsize, empty, get, put, get_nowait and put_nowait
3. Queue methods
- queue.qsize() returns the size of the queue
- queue.empty() returns True if the queue is empty, otherwise False
- queue.full() returns True if the queue is full, otherwise False
- queue.get(block=True, timeout=None) removes and returns an item from the queue. block: if True, the call waits while the queue is empty until an item is available; if False, it raises queue.Empty immediately. timeout: when block is True, the maximum number of seconds to wait before raising queue.Empty
- queue.put(value, block=True, timeout=None) writes into the queue. value: the value written into the queue. block: if True, the call waits while the queue is full until a free slot is available; if False, it raises queue.Full immediately. timeout: when block is True, the maximum number of seconds to wait before raising queue.Full
- queue.get_nowait() is equivalent to queue.get(False)
- queue.put_nowait(value) is equivalent to queue.put(value, False)
- queue.join() blocks the calling thread until all tasks in the queue are processed, i.e. until task_done() has been called once for every item that was put into the queue (an empty queue alone is not enough)
- queue.task_done() After completing a piece of work, the queue.task_done() function sends a signal to the queue that the task has completed
- queue.full() depends on maxsize: it returns True once the queue holds maxsize items; with maxsize <= 0 the queue is unbounded and never full
4. Queue + thread, queue + thread pool
Case: Simple two-person dialogue
- Queue + thread: a new pair of threads is created for every message, so threads are created frequently, and as long as the main thread keeps running, new threads keep being started
# Two-person chat built from a queue and short-lived threads: every message
# uses one sender thread and one receiver thread.
import queue
import threading
import time


class ChatSend(threading.Thread):
    """Thread that reads one message from the console and queues it.

    Args:
        name: Display name of the sender; also used as the thread name.
        message_queue: Queue shared with the matching ``ChatReceive``.
    """

    def __init__(self, name, message_queue):
        super().__init__()
        self.name = name
        self.message_queue = message_queue

    def run(self):
        """Use task tracking in the queue."""
        message = input(f'{self.name}:')
        self.message_queue.put({
            'name': self.name,
            'message': message,
            'time': time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()),
        })
        # Block until the receiver has called task_done() for this message.
        self.message_queue.join()


class ChatReceive(threading.Thread):
    """Thread that takes one message from the queue and prints it.

    Args:
        name: Display name of the receiver; also used as the thread name.
        message_queue: Queue shared with the matching ``ChatSend``.
    """

    def __init__(self, name, message_queue):
        super().__init__()
        self.name = name
        self.message_queue = message_queue

    def run(self):
        """Wait for one message, print it, then mark the task as done."""
        message_info = self.message_queue.get()
        try:
            print(f'\n{message_info["time"]}')
            print(f'{self.name} received {message_info["name"]} message: {message_info["message"]}\n')
        finally:
            # Always release the sender blocked in queue.join(), even if
            # printing the message failed.
            self.message_queue.task_done()


def _exchange(sender_name, receiver_name, message_queue):
    """Run one turn: sender_name types a message, receiver_name prints it."""
    sender = ChatSend(sender_name, message_queue)
    receiver = ChatReceive(receiver_name, message_queue)
    sender.start()
    receiver.start()
    receiver.join()


def chat(chat_person1, chat_person2):
    """Let two people take turns sending messages to each other, forever.

    Args:
        chat_person1: Name of the person who speaks first.
        chat_person2: Name of the person who answers.
    """
    message_queue = queue.Queue()
    print(f'----------{chat_person1} and {chat_person2} are talking----------')
    while True:
        _exchange(chat_person1, chat_person2, message_queue)
        _exchange(chat_person2, chat_person1, message_queue)


if __name__ == '__main__':
    # Guarded so that importing this module does not start the endless chat.
    chat('Zhang San', 'Li Si')
- queue + thread + lock
# Two-person chat built from a queue, short-lived threads and a shared
# threading.Condition that signals "a message has been queued".
import queue
import threading
import time


class ChatSend(threading.Thread):
    """Thread that reads one message from the console and queues it.

    Args:
        name: Display name of the sender; also used as the thread name.
        message_queue: Queue shared with the matching ``ChatReceive``.
        lock: ``threading.Condition`` shared with the receiver. The default
            of ``None`` is kept for compatibility, but ``run()`` needs a
            real condition object.
    """

    def __init__(self, name, message_queue, lock=None):
        super().__init__()
        self.name = name
        self.message_queue = message_queue
        self.lock = lock

    def run(self):
        """Read a message, queue it and notify the waiting receiver."""
        # Read the input before taking the lock so the condition is not
        # held while blocking on the console.
        message = input(f'{self.name}:')
        with self.lock:
            self.message_queue.put({
                'name': self.name,
                'message': message,
                'time': time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()),
            })
            self.lock.notify()


class ChatReceive(threading.Thread):
    """Thread that waits for one queued message and prints it.

    Args:
        name: Display name of the receiver; also used as the thread name.
        message_queue: Queue shared with the matching ``ChatSend``.
        lock: ``threading.Condition`` shared with the sender. The default
            of ``None`` is kept for compatibility, but ``run()`` needs a
            real condition object.
    """

    def __init__(self, name, message_queue, lock=None):
        super().__init__()
        self.name = name
        self.message_queue = message_queue
        self.lock = lock

    def run(self):
        """Wait until a message is available, then take and print it."""
        with self.lock:
            # wait_for() checks the predicate before sleeping, so a
            # notification sent before this thread started waiting is not
            # lost (a bare wait() would block forever in that case).
            self.lock.wait_for(lambda: not self.message_queue.empty())
            message_info = self.message_queue.get()
        print(f'\n{message_info["time"]}')
        print(f'{self.name} received {message_info["name"]} message: {message_info["message"]}\n')


def _exchange(sender_name, receiver_name, message_queue, lock):
    """Run one turn: sender_name types a message, receiver_name prints it."""
    receiver = ChatReceive(receiver_name, message_queue, lock)
    sender = ChatSend(sender_name, message_queue, lock)
    receiver.start()
    sender.start()
    receiver.join()


def chat(chat_person1, chat_person2):
    """Let two people take turns sending messages to each other, forever.

    Args:
        chat_person1: Name of the person who speaks first.
        chat_person2: Name of the person who answers.
    """
    message_queue = queue.Queue()
    lock = threading.Condition()
    print(f'----------{chat_person1} and {chat_person2} are talking----------')
    while True:
        _exchange(chat_person1, chat_person2, message_queue, lock)
        _exchange(chat_person2, chat_person1, message_queue, lock)


if __name__ == '__main__':
    # Guarded so that importing this module does not start the endless chat.
    chat('Zhang San', 'Li Si')
- Queue + thread pool: the worker threads are reused, so threads are not created frequently
# Two-person chat built from a queue and a thread pool: two reusable worker
# threads run the send and receive tasks.
import queue
import threading
import time
from concurrent.futures import ThreadPoolExecutor, wait

lock = threading.Condition()  # not used by this example; kept for compatibility


class ChatSend:
    """Task that reads one message from the console and queues it.

    Args:
        name: Display name of the sender.
        message_queue: Queue shared with the matching ``ChatReceive``.
    """

    def __init__(self, name, message_queue):
        self.send_person = name
        self.message_queue = message_queue

    def run(self):
        """Read a message, queue it and wait until it has been processed."""
        message = input(f'{self.send_person}:')
        self.message_queue.put({
            'name': self.send_person,
            'message': message,
            'time': time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()),
        })
        # Block until the receiver has called task_done() for this message.
        self.message_queue.join()


class ChatReceive:
    """Task that takes one message from the queue and prints it.

    Args:
        name: Display name of the receiver.
        message_queue: Queue shared with the matching ``ChatSend``.
    """

    def __init__(self, name, message_queue):
        self.receive_person = name
        self.message_queue = message_queue

    def run(self):
        """Wait for one message, print it, then mark the task as done."""
        message_info = self.message_queue.get()
        try:
            print(f'\n{message_info["time"]}')
            print(f'{message_info["name"]} says to {self.receive_person}: {message_info["message"]}\n')
        finally:
            # Always release the sender blocked in queue.join(), even if
            # printing the message failed.
            self.message_queue.task_done()


class Chat(ThreadPoolExecutor):
    """Thread pool with two workers that drives a two-person dialogue.

    Args:
        person1: Name of the person who speaks first.
        person2: Name of the person who answers.
    """

    def __init__(self, person1, person2):
        super().__init__(max_workers=2)
        message_queue = queue.Queue()
        print(f'----------{person1} and {person2} are talking----------')
        self.send1 = ChatSend(person1, message_queue)
        self.receive1 = ChatReceive(person2, message_queue)
        self.send2 = ChatSend(person2, message_queue)
        self.receive2 = ChatReceive(person1, message_queue)

    def start(self):
        """Run the dialogue forever, one send/receive pair at a time."""
        turns = (
            (self.send1, self.receive1),
            (self.send2, self.receive2),
        )
        while True:
            for sender, receiver in turns:
                pending = [self.submit(sender.run), self.submit(receiver.run)]
                # Wait for this turn to finish before queuing the next one;
                # submitting in a tight loop would grow the executor's work
                # queue without bound and keep the main thread spinning.
                wait(pending)


if __name__ == '__main__':
    # Guarded so that importing this module does not start the endless chat.
    chat = Chat('Zhang San', 'Li Si')
    chat.start()