One trick to control the number of threads in Python multithreading

Good morning, good afternoon and good evening, everyone! Welcome to this article.


I was using a multi-threaded Python crawler to download pictures from an image website. After enabling multithreading, there was no limit on the number of threads.

In other words, downloading 1,000 pictures would open 1,000 child threads at once, all downloading at the same time.

Now I want to limit the number of threads, for example by downloading only 5 pictures at a time. When those are done, the next 5 are downloaded, and so on until everything is finished.

After some research, I found that Python's threading module provides the Semaphore and BoundedSemaphore classes. They limit how many threads can run a given block of code at the same time.
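
Before changing the crawler, here is a small self-contained sketch of what the semaphore actually limits. fake_download and the two counters are hypothetical helpers used only for illustration. The script creates 20 threads, but the semaphore lets at most 5 of them be inside the with sema: block at the same time.

```python
import threading
import time

MAX_CONCURRENT = 5                      # same limit as in the article
sema = threading.Semaphore(MAX_CONCURRENT)
counter_lock = threading.Lock()         # protects the two counters below
running = 0                             # threads currently inside the guarded block
peak = 0                                # highest value `running` ever reached

def fake_download(n):
    """Hypothetical stand-in for a real download: it only sleeps."""
    global running, peak
    with sema:                          # at most MAX_CONCURRENT threads get past here at once
        with counter_lock:
            running += 1
            peak = max(peak, running)
        time.sleep(0.01)                # simulate network I/O
        with counter_lock:
            running -= 1

threads = [threading.Thread(target=fake_download, args=(i,)) for i in range(20)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print('threads created:', len(threads))
print('peak concurrent downloads:', peak)
```

All 20 threads exist from the start. The semaphore caps how many of them run the guarded block at once, not how many threads are created. The exact peak depends on scheduling, but it can never exceed the limit.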

The official Python documentation gives the following example:

Semaphores are often used to guard resources with limited capacity, such as a database server.

In any situation where the size of the resource is fixed, you should use a bounded semaphore.

Before spawning any worker threads, your main thread will initialize the semaphore:

from threading import BoundedSemaphore

maxconnections = 5
#...
pool_sema = BoundedSemaphore(value=maxconnections)

Once spawned, worker threads call the semaphore's acquire and release methods when they need to connect to the server:

with pool_sema:
    conn = connectdb()
    try:
        # ... use connection ...
    finally:
        conn.close()
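
The documentation recommends BoundedSemaphore because it catches a programming error in which release() is called more often than acquire(). The following sketch compares the two classes. The names plain, bounded and available() are made up for this illustration.

```python
import threading

plain = threading.Semaphore(5)
bounded = threading.BoundedSemaphore(5)

plain.release()                 # bug: release without a matching acquire; the counter silently becomes 6

try:
    bounded.release()           # the same bug on a BoundedSemaphore
    bounded_error = None
except ValueError as exc:       # BoundedSemaphore refuses to go above its initial value
    bounded_error = exc

def available(sem):
    """Count how many permits can be taken without blocking (this takes them all)."""
    n = 0
    while sem.acquire(blocking=False):
        n += 1
    return n

plain_count = available(plain)
bounded_count = available(bounded)
print(plain_count, bounded_count, type(bounded_error).__name__)  # 6 5 ValueError
```

With a plain Semaphore, the extra release quietly allows a sixth concurrent connection. The bounded version reports the mistake immediately.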

Multi-threaded crawler before the change

First, here is the original code:

# -*- coding:utf-8 -*-
import requests
from requests.exceptions import RequestException
import os, time
import re
from lxml import etree
import threading

lock = threading.Lock()
def get_html(url):
    """
    Define a method to obtain the response content of a url page
    :param url: the url to access
    :return: response content
    """
    try:
        response = requests.get(url, timeout=10) # Inside try, so network errors are caught below
        if response.status_code == 200:
            return response.text
        return None
    except RequestException:
        print("Request failed")
        return None


def parse_html(html_text):
    """
    Define a method to parse the page content and extract the image URL
    :param html_text:
    :return: A collection of image URLs for a page
    """
    html = etree.HTML(html_text)

    if len(html) > 0:
        img_src = html.xpath("//img[@class='photothumb lazy']/@data-original") # Element extraction method
        # print(img_src)
        return img_src

    else:
        print("Failed to parse page elements")

def get_image_pages(url):
    """
    Get all page numbers of the queried image results
    :param url: query image url
    :return: total number of pages
    """

    html_text = get_html(url) # Get the search url response content
    # print(html_text)
    if html_text is not None:
        html = etree.HTML(html_text) # Generate XPath parsing object
        last_page = html.xpath("//div[@class='pages']//a[last()]/@href") # Extract the href link of the last page
        print(last_page)
        if last_page:
            max_page = re.compile(r'(\d+)', re.S).search(last_page[0]).group() # Use a regular expression to extract the page number from the link
            print(max_page)
            print(type(max_page))
            return int(max_page) # Convert the string page number to an integer and return
        else:
            print("No data yet")
            return None
    else:
        print("Query result failed")


def get_all_image_url(page_number):
    """
    Get the download url of all images
    :param page_number: crawl page number
    :return: A collection of all image URLs
    """

    base_url = 'https://imgbin.com/free-png/naruto/'
    image_urls = []

    x = 1 # Define an identifier used to number each image URL, increasing from 1
    for i in range(1, page_number):
        url = base_url + str(i) # Traverse the request url based on the page number
        try:
            html = get_html(url) # Parse the content of each page
            if html:
                data = parse_html(html) # Extract the image url in the page
                # print(data)
                # time.sleep(3)
                if data:
                    for j in data:
                        image_urls.append({
                            'name': x,
                            'value': j
                        })
                        x += 1 # Every time an image URL is extracted, the identifier x increases by 1
        except RequestException as f:
            print("Error encountered:", f)
            continue
    # print(image_urls)
    return image_urls

def get_image_content(url):
    """Request image url and return binary content"""
    # print("Downloading", url)
    try:
        r = requests.get(url, timeout=15)
        if r.status_code == 200:
            return r.content
        return None
    except RequestException:
        return None

def main(url, image_name):
    """
    Main function: implement the function of downloading pictures
    :param url: image url
    :param image_name: image name
    :return:
    """
    semaphore.acquire() # Acquire a permit; blocks while 5 other threads hold one
    print('Current sub-thread: {}'.format(threading.current_thread().name))
    save_path = os.path.dirname(os.path.abspath('.')) + '/pics/'
    try:
        file_path = '{0}/{1}.jpg'.format(save_path, image_name)
        if not os.path.exists(file_path): # Download the file only if it does not exist yet
            with open(file_path, 'wb') as f:
                f.write(get_image_content(url))
            print('File {} was saved successfully'.format(image_name))
        else:
            print("File {} already exists".format(image_name))

    except FileNotFoundError as f:
        print("An error was encountered while downloading file {}, the url is: {}".format(image_name, url))
        print("Error:", f)
        raise

    except TypeError as e: # get_image_content() returned None, so f.write(None) failed
        print("An error was encountered while downloading file {}, the url is: {}".format(image_name, url))
        print("Error:", e)

    finally:
        semaphore.release() # Always return the permit, even after an error; otherwise waiting threads block forever

class MyThread(threading.Thread):
    """Subclass Thread and override run() to create a new thread"""
    def __init__(self, func, args):
        """
        :param func: the function to call in run()
        :param args: tuple of arguments passed to func
        """
        threading.Thread.__init__(self)
        self.func = func
        self.args = args

    def run(self):
        print('Current sub-thread: {}'.format(threading.current_thread().name))
        # func is the main() function above, which takes 2 parameters (url, image_name);
        # args holds them as a tuple, so unpack it into the call
        self.func(*self.args)


if __name__ == '__main__':
    start = time.time()
    print('This is the main thread: {}'.format(threading.current_thread().name))

    urls = get_all_image_url(5) # Get the list of all image urls
    thread_list = [] # Define a list and add threads to it
    semaphore = threading.BoundedSemaphore(5) # or use the Semaphore method
    for t in urls:
        # print(i)

        m = MyThread(main, (t["value"], t["name"])) # Call the MyThread class and get an instance

        thread_list.append(m)

    for m in thread_list:

        m.start() # Call the start() method to start execution

    for m in thread_list:
        m.join() # join() makes the main thread wait until this child thread has finished


    end = time.time()
    print(end-start)
    # get_image_pages("https://imgbin.com/free-png/Naruto")
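
The download step above has one more weakness: main() opens the file before it knows whether the download worked. When get_image_content() returns None, f.write(None) raises TypeError. The empty .jpg that open() already created stays on disk, so the next run reports it as already existing and never retries it. Below is a minimal sketch of a download-then-write alternative. save_image and its fetch parameter are hypothetical; in the crawler, fetch would be get_image_content. The os.makedirs() call is added on the assumption that the pics folder may not exist yet.

```python
import os
import tempfile

def save_image(fetch, url, save_dir, image_name):
    """Download first, then write. `fetch` is a hypothetical callable returning bytes or None."""
    os.makedirs(save_dir, exist_ok=True)          # create the folder instead of failing with FileNotFoundError
    file_path = os.path.join(save_dir, '{}.jpg'.format(image_name))
    if os.path.exists(file_path):
        return 'exists'
    content = fetch(url)
    if content is None:
        return 'failed'                          # nothing was written, so a later run will retry
    with open(file_path, 'wb') as f:
        f.write(content)
    return 'saved'

# Usage with stand-in fetchers instead of real HTTP requests
with tempfile.TemporaryDirectory() as tmp:
    pics = os.path.join(tmp, 'pics')
    first = save_image(lambda u: None, 'https://example.com/1.jpg', pics, 1)
    left_empty_file = os.path.exists(os.path.join(pics, '1.jpg'))
    second = save_image(lambda u: b'fake jpeg bytes', 'https://example.com/1.jpg', pics, 1)
    third = save_image(lambda u: b'unused', 'https://example.com/1.jpg', pics, 1)

print(first, left_empty_file, second, third)  # failed False saved exists
```
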

Modify the code

1. In the __main__ block, define max_connections = 5 and pass it to threading.BoundedSemaphore to create a semaphore initialized to 5. Assign the result to the variable pool_sema:
if __name__ == '__main__':
    start = time.time()
    print('This is the main thread: {}'.format(threading.current_thread().name))

    urls = get_all_image_url(5) # Get the list of all image urls
    thread_list = [] # Define a list and add threads to it

    max_connections = 5 # Maximum number of threads that may download at the same time
    pool_sema = threading.BoundedSemaphore(max_connections) # or use the Semaphore method
    for t in urls:
        # print(i)

        m = MyThread(main, (t["value"], t["name"])) # Call the MyThread class and get an instance

        thread_list.append(m)

    for m in thread_list:

        m.start() # Call the start() method to start execution

    for m in thread_list:
        m.join() # join() makes the main thread wait until this child thread has finished


    end = time.time()
    print(end-start)
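
In the article's code, main() reads the semaphore from a global that only exists once the __main__ block has run. If you prefer to avoid that coupling, one option is to create the semaphore where the threads are created and pass it in as a parameter. Here is a minimal sketch under that assumption. download_image and run_downloads are hypothetical names, and the real download work is omitted. The sketch also uses threading.Thread(target=...) instead of a Thread subclass.

```python
import threading

def download_image(url, image_name, sema):
    """Hypothetical variant of main(): the semaphore is a parameter, not a global."""
    with sema:
        # ... request url and save the file as image_name (omitted in this sketch) ...
        return image_name

def run_downloads(urls, max_connections=5):
    """urls: list of {'name': ..., 'value': ...} dicts, the shape get_all_image_url() returns."""
    pool_sema = threading.BoundedSemaphore(max_connections)
    done = []
    done_lock = threading.Lock()

    def worker(url, name):
        result = download_image(url, name, pool_sema)
        with done_lock:
            done.append(result)

    threads = [threading.Thread(target=worker, args=(t['value'], t['name'])) for t in urls]
    for th in threads:
        th.start()
    for th in threads:
        th.join()
    return sorted(done)

fake_urls = [{'name': i, 'value': 'https://example.com/{}.png'.format(i)} for i in range(1, 11)]
print(run_downloads(fake_urls))  # [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
```
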
2. Modify the main() function
(1) Method 1: use a with statement, wrapping the body of main() in with pool_sema:

Using a lock, condition variable or semaphore in a with statement calls acquire() when the block is entered. It calls release() automatically when the block is left, even if an exception is raised inside it.
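
Here is a short check of that behaviour using a hypothetical failing_download() task. The with block raises an exception, yet the permit is still returned.

```python
import threading

sema = threading.BoundedSemaphore(1)

def failing_download():
    """Hypothetical task that fails while holding the semaphore."""
    with sema:                                   # acquire() on entry
        raise RuntimeError('download failed')    # release() still runs as the block exits

try:
    failing_download()
except RuntimeError:
    pass

# The permit was returned, so a non-blocking acquire succeeds
got_permit = sema.acquire(blocking=False)
print(got_permit)  # True
sema.release()
```

In other words, with pool_sema: behaves like calling pool_sema.acquire() followed by a try block whose finally clause calls pool_sema.release().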

def main(url, image_name):
    """
    Main function: implement the function of downloading pictures
    :param url: image url
    :param image_name: image name
    :return:
    """

    with pool_sema:
        print('Current sub-thread: {}'.format(threading.current_thread().name))
        save_path = os.path.dirname(os.path.abspath('.')) + '/pics/'
        try:
            file_path = '{0}/{1}.jpg'.format(save_path, image_name)
            if not os.path.exists(file_path): # Download the file only if it does not exist yet
                with open(file_path, 'wb') as f:
                    f.write(get_image_content(url))
                print('File {} was saved successfully'.format(image_name))
            else:
                print("File {} already exists".format(image_name))

        except FileNotFoundError as f:
            print("An error was encountered while downloading file {}, the url is: {}".format(image_name, url))
            print("Error:", f)
            raise

        except TypeError as e: # get_image_content() returned None, so f.write(None) failed
            print("An error was encountered while downloading file {}, the url is: {}".format(image_name, url))
            print("Error:", e)
(2) Method 2: Use acquire() and release() directly

Call pool_sema.acquire() at the start of main() and pool_sema.release() once the download is finished:

def main(url, image_name):
    """
    Main function: implement the function of downloading pictures
    :param url: image url
    :param image_name: image name
    :return:
    """
    pool_sema.acquire() # Acquire a permit; blocks while max_connections other threads hold one
    print('Current sub-thread: {}'.format(threading.current_thread().name))
    save_path = os.path.dirname(os.path.abspath('.')) + '/pics/'
    try:
        file_path = '{0}/{1}.jpg'.format(save_path, image_name)
        if not os.path.exists(file_path): # Download the file only if it does not exist yet
            with open(file_path, 'wb') as f:
                f.write(get_image_content(url))
            print('File {} was saved successfully'.format(image_name))
        else:
            print("File {} already exists".format(image_name))

    except FileNotFoundError as f:
        print("An error was encountered while downloading file {}, the url is: {}".format(image_name, url))
        print("Error:", f)
        raise

    except TypeError as e: # get_image_content() returned None, so f.write(None) failed
        print("An error was encountered while downloading file {}, the url is: {}".format(image_name, url))
        print("Error:", e)

    finally:
        pool_sema.release() # Always return the permit, even after an error
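
With explicit acquire() and release() calls, the position of release() matters. If an exception skips the release() call, that permit is never returned. Once all permits have leaked, every remaining thread blocks forever. Putting release() in a finally clause avoids this. The sketch below demonstrates both cases without any threads; the function names are hypothetical.

```python
import threading

pool_sema = threading.BoundedSemaphore(2)

def download_without_finally(fail):
    """Hypothetical task: release() is skipped when an exception is raised."""
    pool_sema.acquire()
    if fail:
        raise TypeError('simulated failure')
    pool_sema.release()

def download_with_finally(fail):
    """Hypothetical task: release() in finally runs on success and on failure."""
    pool_sema.acquire()
    try:
        if fail:
            raise TypeError('simulated failure')
    finally:
        pool_sema.release()

def free_permits(sem):
    """Count permits that can be taken right now, then give them back."""
    taken = 0
    while sem.acquire(blocking=False):
        taken += 1
    for _ in range(taken):
        sem.release()
    return taken

for _ in range(5):
    try:
        download_with_finally(fail=True)
    except TypeError:
        pass
after_finally = free_permits(pool_sema)   # no permit was lost

for _ in range(2):                         # a third call here would block forever
    try:
        download_without_finally(fail=True)
    except TypeError:
        pass
after_leak = free_permits(pool_sema)      # both permits leaked

print(after_finally, after_leak)          # 2 0
```
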

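Both methods have the same effect: at most 5 threads download at any moment. The downloads do not strictly proceed batch by batch, though. As soon as one thread finishes and releases the semaphore, one of the waiting threads starts its download. Also note that all the threads are still created and started up front. The semaphore only caps how many of them run the download at the same time.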
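
A small deterministic sketch makes this sliding-window behaviour visible. It uses a limit of 2 to keep things short, and the task names and events are hypothetical. A long task keeps one permit, while two short tasks get in one after another through the other permit without waiting for the long one.

```python
import threading

sema = threading.BoundedSemaphore(2)    # a limit of 2 keeps the example small
log = []
log_lock = threading.Lock()
long_has_permit = threading.Event()
finish_long = threading.Event()

def record(msg):
    with log_lock:
        log.append(msg)

def long_task():
    with sema:
        record('long: start')
        long_has_permit.set()
        finish_long.wait()              # hold the permit until the main thread says stop
        record('long: end')

def short_task(name):
    with sema:
        record('{}: start'.format(name))
        record('{}: end'.format(name))

t_long = threading.Thread(target=long_task)
t_long.start()
long_has_permit.wait()                  # the long task now holds one of the two permits

for name in ('a', 'b'):
    t = threading.Thread(target=short_task, args=(name,))
    t.start()
    t.join()                            # each short task uses the free permit and gives it back

finish_long.set()
t_long.join()
print(log)  # ['long: start', 'a: start', 'a: end', 'b: start', 'b: end', 'long: end']
```

If the semaphore worked in fixed batches, "b" would have to wait for "long" to finish. Instead, it starts as soon as "a" releases its permit.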

Epilogue

Okay, that's all for today's sharing!

If there is something you would like to see in the next article, leave a message in the comments and I will cover it when I see it.
