Python built-in module: queue

1. Introduction

Threads share data, so when several threads exchange data directly, the safety and consistency of that data cannot be guaranteed. The queue module addresses this problem: a queue handles the exchange of data between threads and keeps that data safe and consistent.

2. Queue types
  1. queue.Queue(maxsize=0): FIFO (first in, first out)
    import queue
    import random


    queue_data = [1, 2, 3, 4, 5, 6]
    random.shuffle(queue_data)
    print('Original data:', queue_data)

    # FIFO queue: items are retrieved in the same order they were put in.
    queue_test = queue.Queue()


    def queuePut(value):
        """Append ``value`` to the module-level FIFO queue ``queue_test``."""
        queue_test.put(value)


    for item in queue_data:
        queuePut(item)

    # One get() per item that was put, so this never blocks; the values
    # print in the same (shuffled) order as queue_data.
    for _ in queue_data:
        print(queue_test.get())
    
  2. queue.LifoQueue(maxsize=0): LIFO (last in, first out)
    import queue
    import random
    
    
    queue_data = list(range(1, 7))
    random.shuffle(queue_data)
    print('Original data:', queue_data)

    # LIFO queue: the most recently added item is retrieved first.
    queue_test = queue.LifoQueue()


    def queuePut(value):
        """Push ``value`` onto the module-level LIFO queue ``queue_test``."""
        queue_test.put(value)


    # Load every shuffled value into the queue.
    for item in queue_data:
        queuePut(item)

    # Drain the queue; values print in the reverse of the order shown above.
    while not queue_test.empty():
        print(queue_test.get())
    
    
  3. queue.PriorityQueue(maxsize=0): priority queue; the entry with the lowest value is retrieved first
    import queue
    import random
    
    
    queue_data = [*range(1, 7)]
    random.shuffle(queue_data)
    print('Original data:', queue_data)

    # Priority queue: get() always returns the smallest entry currently stored.
    queue_test = queue.PriorityQueue()


    def queuePut(value):
        """Insert ``value`` into the module-level priority queue ``queue_test``."""
        queue_test.put(value)


    # Insert the values in shuffled order ...
    for entry in queue_data:
        queuePut(entry)

    # ... and retrieve one per inserted value; they come back in ascending order.
    for _ in queue_data:
        print(queue_test.get())
    
  4. queue.SimpleQueue(): a simplified, unbounded FIFO queue. It takes no maxsize argument and has no task tracking (no task_done/join); it only provides qsize, empty, get, get_nowait, put and put_nowait
3. Queue methods
  1. queue.qsize() returns the size of the queue
  2. queue.empty() returns True if the queue is empty, otherwise False
  3. queue.full() returns True if the queue is full, otherwise False
  4. queue.get(block=True, timeout=None) removes and returns an item from the queue. block: if True (the default), wait until an item is available; if False, raise queue.Empty immediately when the queue is empty. timeout: when block is True, the maximum number of seconds to wait before raising queue.Empty (None waits indefinitely)
  5. queue.put(value, block=True, timeout=None) writes into the queue. value: the value written into the queue. block: if True (the default), wait for a free slot when the queue is full; if False, raise queue.Full immediately. timeout: when block is True, the maximum number of seconds to wait before raising queue.Full (None waits indefinitely)
  6. queue.get_nowait() is equivalent to queue.get(False)
  7. queue.put_nowait(value) is equivalent to queue.put(value, False)
  8. queue.join() blocks the calling thread until every item that was put into the queue has been retrieved and marked as processed with task_done(); the queue merely being empty is not enough
  9. queue.task_done() After completing a piece of work, the queue.task_done() function sends a signal to the queue that the task has completed
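  10. queue.full() depends on maxsize: the queue is full once it holds maxsize items; with maxsize <= 0 the queue is unbounded and full() never returns True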
4. Queue + thread, queue + thread pool

Case: Simple two-person dialogue

  1. Queue + thread: threads are created frequently (a new sender thread and receiver thread for every message), and as long as the main thread keeps running, new threads keep being started
    import queue
    import threading
    import time
    
    class ChatSend(threading.Thread):
        """Thread that reads one line from the user and publishes it on a queue."""

        def __init__(self, name, message_queue):
            super().__init__()
            self.name = name
            self.message_queue = message_queue

        def run(self):
            """Prompt for a message, enqueue it, then wait until it is marked done."""
            text = input(f'{self.name}:')
            sent_at = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
            payload = {
                'name': self.name,
                'message': text,
                'time': sent_at,
            }
            self.message_queue.put(payload)
            # Uses the queue's task tracking: returns only after task_done()
            # has been called for every item that was put.
            self.message_queue.join()
    
    class ChatReceive(threading.Thread):
        """Thread that takes one message off the queue and prints it.

        Args:
            name: Display name of the person receiving the message.
            message_queue: Queue of dicts with the keys 'name', 'message'
                and 'time', as read by run().
        """

        def __init__(self, name, message_queue):
            super().__init__()
            self.name = name
            self.message_queue = message_queue

        def run(self):
            """Block until one message arrives, print it, and mark it done."""
            message_info = self.message_queue.get()
            try:
                # Real '\n' escapes (not backslash line continuations) so the
                # blank lines around the message do not depend on how the
                # source file happens to be indented.
                print(f'\n{message_info["time"]}')
                print(f'{self.name} received {message_info["name"]} message: {message_info["message"]}\n')
            finally:
                # Always acknowledge the item; a queue.join() on this queue
                # would otherwise never return.
                self.message_queue.task_done()
    
    def chat(chat_person1, chat_person2):
        """Run an endless, alternating conversation between two people.

        Every turn starts a fresh ChatSend thread for the speaker and a fresh
        ChatReceive thread for the listener, both sharing one queue, and then
        waits for the receiver to finish before the roles are swapped.
        """
        message_queue = queue.Queue()

        print(f'----------{chat_person1} and {chat_person2} are talking----------')

        # (speaker, listener) for each turn; the two people alternate.
        turns = (
            (chat_person1, chat_person2),
            (chat_person2, chat_person1),
        )
        while True:
            for speaker, listener in turns:
                sender = ChatSend(speaker, message_queue)
                receiver = ChatReceive(listener, message_queue)

                sender.start()
                receiver.start()
                # Only the receiver is joined before the next turn begins.
                receiver.join()
    
    
    # Start the demo conversation; chat() loops in `while True`, so this call
    # keeps running until the process is interrupted.
    chat('Zhang San', 'Li Si')
    
  2. queue + thread + lock
    import queue
    import threading
    import time
    
    class ChatSend(threading.Thread):
        """Thread that reads one line from the user and hands it to a receiver.

        Args:
            name: Display name of the sender; also used as the input prompt.
            message_queue: Queue on which the message dict is placed.
            lock: Condition shared with the receiver. Despite the ``None``
                default, run() requires a condition object here because it
                acquires it and calls notify() on it.
        """

        def __init__(self, name, message_queue, lock=None):
            super().__init__()
            self.name = name
            self.message_queue = message_queue
            self.lock = lock

        def run(self):
            """Prompt for a message, enqueue it, and notify one waiting thread."""
            # Read the input *before* taking the condition: the user may take
            # arbitrarily long, and holding the lock meanwhile would keep the
            # receiver from ever reaching its wait().
            message = input(f'{self.name}:')
            # `with` guarantees the condition is released even if put() or
            # notify() raises.
            with self.lock:
                self.message_queue.put({
                    'name': self.name,
                    'message': message,
                    'time': time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()),
                })
                self.lock.notify()
            
    class ChatReceive(threading.Thread):
        """Thread that waits for one message on the queue and prints it.

        Args:
            name: Display name of the person receiving the message.
            message_queue: Queue of dicts with the keys 'name', 'message'
                and 'time', as read by run().
            lock: Condition shared with the sender. Despite the ``None``
                default, run() requires a condition object here because it
                acquires it and waits on it.
        """

        def __init__(self, name, message_queue, lock=None):
            super().__init__()
            self.name = name
            self.message_queue = message_queue
            self.lock = lock

        def run(self):
            """Wait until a message is available, then print it."""
            with self.lock:
                # Wait on a predicate instead of a bare wait(): if the sender
                # already queued its message and called notify() before this
                # thread got here, a bare wait() would miss that wakeup and
                # block forever. Looping also tolerates spurious wakeups.
                while self.message_queue.empty():
                    self.lock.wait()
                message_info = self.message_queue.get()
                # Real '\n' escapes (not backslash line continuations) so the
                # blank lines around the message do not depend on how the
                # source file happens to be indented.
                print(f'\n{message_info["time"]}')
                print(f'{self.name} received {message_info["name"]} message: {message_info["message"]}\n')
    
    
    def chat(chat_person1, chat_person2):
        """Run an endless, alternating conversation coordinated by a Condition.

        Each turn creates a ChatReceive thread for the listener and a ChatSend
        thread for the speaker, sharing one queue and one threading.Condition.
        The receiver is started first and is the only thread that is joined.
        """
        message_queue = queue.Queue()
        lock = threading.Condition()

        print(f'----------{chat_person1} and {chat_person2} are talking----------')

        speaker, listener = chat_person1, chat_person2
        while True:
            receiver = ChatReceive(listener, message_queue, lock)
            sender = ChatSend(speaker, message_queue, lock)

            # Receiver first, then sender, then wait for the receiver.
            receiver.start()
            sender.start()
            receiver.join()

            # Swap roles for the next turn.
            speaker, listener = listener, speaker
    
    
    # Start the demo conversation; chat() loops in `while True`, so this call
    # keeps running until the process is interrupted.
    chat('Zhang San', 'Li Si')
    
  3. Queue + thread pool: worker threads are reused, so threads are not created frequently
    import queue
    import threading
    import time
    from concurrent.futures import ThreadPoolExecutor,wait
    
    # Module-level condition object. NOTE(review): nothing in this snippet
    # references it; the classes below coordinate through the queue's
    # join()/task_done() instead.
    lock = threading. Condition()
    
    class ChatSend:
        """Task object that reads one line from the user and enqueues it."""

        def __init__(self, name, message_queue):
            self.send_person = name
            self.message_queue = message_queue

        def run(self):
            """Prompt for a message, enqueue it, then wait until it is marked done."""
            text = input(f'{self.send_person}:')
            sent_at = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
            payload = {
                'name': self.send_person,
                'message': text,
                'time': sent_at,
            }
            self.message_queue.put(payload)
            # Returns only after task_done() has been called for every item
            # that was put on the queue.
            self.message_queue.join()
    
    class ChatReceive:
        """Task object that takes one message off the queue and prints it.

        Args:
            name: Display name of the person receiving the message.
            message_queue: Queue of dicts with the keys 'name', 'message'
                and 'time', as read by run().
        """

        def __init__(self, name, message_queue):
            self.receive_person = name
            self.message_queue = message_queue

        def run(self):
            """Block until one message arrives, print it, and mark it done."""
            message_info = self.message_queue.get()
            try:
                # Real '\n' escapes (not backslash line continuations) so the
                # blank lines around the message do not depend on how the
                # source file happens to be indented.
                print(f'\n{message_info["time"]}')
                print(f'{message_info["name"]} says to {self.receive_person}: {message_info["message"]}\n')
            finally:
                # Always acknowledge the item; a queue.join() on this queue
                # would otherwise never return.
                self.message_queue.task_done()
    
    
    class Chat(ThreadPoolExecutor):
        """Two-worker thread pool that runs an alternating two-person chat.

        The pool owns one shared queue plus a sender/receiver pair for each
        direction of the conversation (send1/receive1 and send2/receive2).

        Args:
            person1: Display name of the person who speaks first.
            person2: Display name of the other person.
        """

        def __init__(self, person1, person2):
            super().__init__(max_workers=2)

            message_queue = queue.Queue()

            print(f'----------{person1} and {person2} are talking----------')

            self.send1 = ChatSend(person1, message_queue)
            self.receive1 = ChatReceive(person2, message_queue)

            self.send2 = ChatSend(person2, message_queue)
            self.receive2 = ChatReceive(person1, message_queue)

        def start(self):
            """Run the conversation forever, one exchange at a time.

            Each exchange submits the sender and receiver tasks to the pool
            and waits for both to finish before submitting the next pair.
            Without that wait the loop would spin and keep queueing tasks on
            the executor far faster than the two workers can run them, so the
            backlog (and memory use) would grow without bound.
            """
            exchanges = (
                (self.send1, self.receive1),
                (self.send2, self.receive2),
            )
            while True:
                for sender, receiver in exchanges:
                    pending = [self.submit(sender.run), self.submit(receiver.run)]
                    # Block until this exchange has completed.
                    wait(pending)
    
    
    # Build the two-person chat pool and start it; Chat.start() loops in
    # `while True`, so this keeps running until the process is interrupted.
    chat = Chat('Zhang San', 'Li Si')
    chat. start()