5.1 Multi-thread and multi-process

Asynchronous crawler

1. High performance asynchronous crawler

Purpose: use asynchronous techniques to achieve high-performance data crawling in a crawler

2. Ways to implement an asynchronous crawler

2.1 Multithreading and multiprocessing

from threading import Thread

2.1.1 Process and thread

  • Process: a running program. Each time we execute a program, the operating system automatically prepares the necessary resources for it (for example: allocating memory and creating a thread that can be executed).
  • Thread: the execution flow within a program that can be scheduled directly by the CPU. It is the smallest unit of scheduling the operating system performs; it is contained in a process and is the actual unit of execution inside the process.

2.1.2 Advantages and disadvantages of multi-threading and multi-process

  1. Benefits: a separate thread or process can be opened for blocking operations, so those operations execute asynchronously
  2. Disadvantage: threads and processes cannot be opened without limit

2.1.3 The use of multithreading

Style 1: the form commonly used in crawlers

from threading import Thread

def fun(name):
    for i in range(50):
        print(f"{<!-- -->name},{<!-- -->i}")

if __name__ == '__main__':
    # without using multithreading
    print("Do not use multithreading:")
    fun("Chow Yun Fat")
    fun("Hu Ge")
    fun("Jay Chou")
    print("*"*100)
    # create thread
    print("Using multi-threaded execution:")
    t_1 = Thread(target=fun, args=("Chow Yun Fat",))
    # args must be a tuple, so a trailing "," is added inside the parentheses to make a one-element tuple
    t_2 = Thread(target=fun, args=("Hu Ge",))
    t_3 = Thread(target=fun, args=("Jay Chou",))

    t_1.start()
    t_2.start()
    t_3.start()

    # Besides the threads we created, the main thread is also running.
    print("I am the main thread " + "-" * 50)

Style 2: object-oriented

from threading import Thread
# Inherit from the Thread class
class MyThread(Thread):
    def __init__(self, name):
        super().__init__()
        self.name = name  # Thread exposes a writable name attribute

    def run(self):
        for i in range(50):
            print(f"{self.name},{i}")

if __name__ == '__main__':
    t1 = MyThread("Jay Chou")
    t2 = MyThread("Hu Ge")
    t3 = MyThread("Chow Yun Fat")

    t1.start()
    t2.start()
    t3.start()

2.1.4 The use of multiprocessing

from multiprocessing import Process


def func(name):
    for i in range(100):
        print(f"I am {<!-- -->name},{<!-- -->i}")

if __name__ == '__main__':
    p1 = Process(target=func, args=("Jay Chou",))
    p2 = Process(target=func, args=("Jet Li",))

    p1.start()
    p2.start()

The multiprocessing API mirrors the threading API: Process is used the same way as Thread, and the object-oriented style works too, as sketched below.
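For completeness, here is a minimal sketch of the object-oriented style from Style 2 applied to Process; this mirrors the MyThread example above and is not code from the original.

from multiprocessing import Process

# Sketch: subclassing Process, mirroring the MyThread example
class MyProcess(Process):
    def __init__(self, name):
        super().__init__()
        self.name = name  # Process also exposes a writable name attribute

    def run(self):
        for i in range(100):
            print(f"I am {self.name},{i}")

if __name__ == '__main__':
    p1 = MyProcess("Jay Chou")
    p2 = MyProcess("Jet Li")

    p1.start()
    p2.start()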

2.1.5 Choice of multi-thread and multi-process

  • Multi-threading: the tasks are relatively uniform and the code is similar
  • Multi-process: the tasks are relatively independent and rarely overlap. Example: an IP proxy pool (a process-level sketch follows this list)
    • Grab proxy IPs from various websites
    • Verify that each proxy IP is usable
    • Expose an external interface
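As a hedged sketch of that split, each proxy-pool task can run in its own process; the function names, placeholder bodies, and Queue wiring below are illustrative assumptions, not code from the original.

from multiprocessing import Process, Queue
import time

# Hypothetical task functions for the proxy pool above;
# the bodies are placeholders, only the process layout matters.
def grab_proxies(q):
    for ip in ["1.1.1.1:80", "2.2.2.2:8080"]:  # pretend these were scraped
        q.put(ip)
        time.sleep(1)

def verify_proxies(q_in, q_out):
    while True:
        ip = q_in.get()
        q_out.put(ip)  # a real version would test the proxy first

def serve_proxies(q):
    while True:
        print("usable proxy:", q.get())  # a real version would expose an API

if __name__ == '__main__':
    raw, ok = Queue(), Queue()
    Process(target=grab_proxies, args=(raw,), daemon=True).start()
    Process(target=verify_proxies, args=(raw, ok), daemon=True).start()
    Process(target=serve_proxies, args=(ok,), daemon=True).start()
    time.sleep(5)  # let the demo run briefly, then exit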

2.2 Thread pool and process pool

from concurrent.futures import ThreadPoolExecutor

2.2.1 Advantages and disadvantages of thread and process pools

  1. Benefits: it reduces how often the system creates and destroys threads or processes, lowering system overhead
  2. Disadvantage: the number of threads or processes in the pool has an upper limit

2.2.2 Use of thread pool

from concurrent.futures import ThreadPoolExecutor
import time

def func(name):
    for i in range(30):
        print(f"{<!-- -->name},{<!-- -->i}")

if __name__ == '__main__':
    with ThreadPoolExecutor(5) as t:
        t.submit(func,"Jay Chou")
        t.submit(func, "Hu Ge")
        t.submit(func, "Stephen Chow")

At this point the output looks almost the same as plain multithreading or multiprocessing; there is no visible difference. By modifying the main function, the thread pool can be put to better use, as follows:

if __name__ == '__main__':
    with ThreadPoolExecutor(5) as t:
        for i in range(10):
            t.submit(func, f"Jay Chou{i}")

With plain multithreading, every thread has to be defined by us, so when the number of threads is large the overhead on the system is significant. A thread pool works like a thread buffer: we set how many threads may exist at the same time, and the remaining tasks wait in the pool, which reduces system overhead.
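To make the buffering visible, here is a small sketch (reusing the func idea from above, with an assumed 0.5-second delay) that prints which pooled thread runs each task; with a pool of 5, only five distinct thread names appear even though ten tasks are submitted.

from concurrent.futures import ThreadPoolExecutor
import threading
import time

def func(name):
    time.sleep(0.5)
    # current_thread().name shows which pooled thread ran this task
    print(threading.current_thread().name, "->", name)

if __name__ == '__main__':
    with ThreadPoolExecutor(5) as t:
        for i in range(10):
            t.submit(func, f"Jay Chou{i}")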

But what happens when we try to get a return value out of the thread pool?

from concurrent.futures import ThreadPoolExecutor
import time

def func(name):
    time.sleep(2)
    return name


if __name__ == '__main__':
    with ThreadPoolExecutor(5) as t:
        t.submit(func,"Jay Chou")
        t.submit(func,"Hu Ge")
        t.submit(func,"Zhou Xingchi")

Executing this code, it runs successfully but prints nothing; we do not get the return value we wanted. The fix is to attach a callback:

from concurrent.futures import ThreadPoolExecutor
import time

def func(name,t):
    time.sleep(t)
    return name


def fn(res):
    print(res.result())

if __name__ == '__main__':
    with ThreadPoolExecutor(5) as t:
        t.submit(func,"Jay Chou",3).add_done_callback(fn)
        t.submit(func,"Hu Ge",1).add_done_callback(fn)
        t.submit(func,"Zhou Xingchi",4).add_done_callback(fn)

With the .add_done_callback(fn) method, the return value can be retrieved. The execution order is not fixed, so the order of the return values is not fixed either: whichever task finishes first returns first.

from concurrent.futures import ThreadPoolExecutor
import time

def func(name,t):
    time.sleep(t)
    print(f"I am {name}")
    return name


def fn(res):
    print(res.result())

if __name__ == '__main__':
    with ThreadPoolExecutor(5) as t:
        result = t.map(func, ["Jay Chou", "Hu Ge", "Stephen Chow"], [2, 1, 3])
        # result is a generator
        print(result)
        for r in result:
            print(r)
        # The results come back in the order the tasks were submitted:
        # Jay Chou, Hu Ge, Stephen Chow, not in completion order.
        # map returns results in the same order as submission.

Pay attention to the difference between .add_done_callback(fn) and .map(): the callback fires in completion order, while .map() yields results in submission order.
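A minimal side-by-side sketch of the difference (the task names and sleep times are arbitrary):

from concurrent.futures import ThreadPoolExecutor
import time

def func(name, t):
    time.sleep(t)
    return name

if __name__ == '__main__':
    with ThreadPoolExecutor(5) as t:
        # add_done_callback: "fast" prints first because it finishes first
        t.submit(func, "slow", 2).add_done_callback(lambda f: print("callback:", f.result()))
        t.submit(func, "fast", 1).add_done_callback(lambda f: print("callback:", f.result()))

    with ThreadPoolExecutor(5) as t:
        # map: "slow" is yielded first because it was submitted first
        for r in t.map(func, ["slow", "fast"], [2, 1]):
            print("map:", r)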

2.2.3 Thread pool case

Use a thread pool to crawl all chapters of the novel Journey to the West

import requests
import os
from lxml import etree
from concurrent.futures import ThreadPoolExecutor


def download(url):
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36"
    }
    resp = requests.get(url=url, headers=headers)
    et = etree.HTML(resp.text)
    title = et.xpath("/html/body/div[3]/div/div[1]/div[1]/div[1]/h1/text()")
    article = et.xpath("/html/body/div[3]/div/div[1]/div[1]/div[2]/p//text()")
    t = "".join(title)
    s = "".join(article).replace("\r\r", "\r")
    with open(f"Journey to the West/{t}.txt", "w", encoding="utf-8") as f:
        f.write(s)
    print(t, "over!!")

if __name__ == '__main__':
    if not os.path.exists("Journey to the West"):
        os.mkdir("Journey to the West")
    with ThreadPoolExecutor(5) as t:
        for i in range(1,102):
            num = 480 + i
            url =f"https://www.gushicimingju.com/novel/xiyouji/{<!-- -->num}.html"
            t. submit(download, url)
    print("all over!!!")

2.3 Single thread + asynchronous coroutines

  1. event_loop: the event loop is essentially an endless loop. We can register functions on it, and when the right conditions are met those functions are executed.
  2. coroutine: a coroutine object. We can register a coroutine object on the event loop, where it may be called. A method defined with the async keyword does not execute immediately when called; the call returns a coroutine object instead.
  3. task: a further wrapper around a coroutine object that also tracks the task's state.
  4. future: represents a task that will be executed, or has not yet been executed; in practice there is no essential difference from task.
  5. async: defines a coroutine.
  6. await: suspends execution at a blocking point and yields control back to the event loop (see the sketch after this list).
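A minimal sketch tying these pieces together with the standard asyncio module; the coroutine name fetch and the sleep times are illustrative assumptions.

import asyncio

# async defines a coroutine; calling fetch() returns a coroutine object,
# it does not run the body immediately
async def fetch(name, delay):
    # await suspends here and hands control back to the event loop
    await asyncio.sleep(delay)
    print(f"{name} done")
    return name

async def main():
    # wrap coroutine objects into tasks so the event loop can schedule them
    tasks = [asyncio.create_task(fetch("Jay Chou", 2)),
             asyncio.create_task(fetch("Hu Ge", 1))]
    # "Hu Ge" finishes first, but gather keeps the order the tasks were passed in
    results = await asyncio.gather(*tasks)
    print(results)

if __name__ == '__main__':
    # asyncio.run creates the event loop and runs main() on it
    asyncio.run(main())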

2.4 The use of multiprocessing with thread pools

Use multiprocessing combined with thread pools to crawl pictures

import requests
from lxml import etree
from urllib import parse
from multiprocessing import Process, Queue
from concurrent.futures import ThreadPoolExecutor
import os
# Process 1 crawls the url address of each picture

# Process 2 downloads the pictures from the addresses produced by process 1


def Get_src(url, q):
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36"
    }
    resp = requests.get(url=url, headers=headers)
    resp.encoding = "utf-8"
    et = etree.HTML(resp.text)
    href_lst = et.xpath('//div[@class = "item_list infinite_scroll"]//div[@class="item_b clearfix"]//a/@href')
    for href in href_lst:
        child_href = parse.urljoin(url, href)
        resp1 = requests.get(url=child_href, headers=headers)
        resp1.encoding = "utf-8"
        et2 = etree.HTML(resp1.text)
        src = et2.xpath('//div[@class="big-pic"]/a/img/@src')[0]
        q.put(src)
        print(src + " entered the queue")

def Get(q):
    with ThreadPoolExecutor(5) as t:
        for i in range(2, 10):
            url = f"https://www.umei.cc/weimeiitupian/keaitupian/index_{i}.htm"
            t.map(Get_src, [url], [q])
    # Put the sentinel only after all pages are done; putting it inside
    # Get_src would stop the consumer after the first finished page
    q.put("over")

def Downpic(url):
    resp = requests.get(url=url)
    name = url.split("/")[-1]
    with open("Youyou Gallery/" + name, "wb") as f:
        f.write(resp.content)
    print(name + " download complete")

def Download(q):
    with ThreadPoolExecutor(5) as t:
        while 1:
            url = q.get()
            if url != "over":
                t.submit(Downpic, url)
            else:
                break


if __name__ == '__main__':

    if not os.path.exists("Youyou Gallery"):
        os.mkdir("Youyou Gallery")  # must match the folder name used in Downpic
    q = Queue()
    p1 = Process(target=Get, args=(q,))
    p2 = Process(target=Download, args=(q,))
    p1.start()
    p2.start()
    # Wait for both processes to finish, otherwise "over!!!" prints immediately
    p1.join()
    p2.join()
    print("over!!!")

Note:

from urllib import parse
url = parse.urljoin(url1, url2)
# url1 = "https://www.umei.cc/weimeiitupian/keaitupian/", url2 = "/bizhitupian/xiaoqingxinbizhi/204461.htm"
# ==> url = "https://www.umei.cc/bizhitupian/xiaoqingxinbizhi/204461.htm"
# url1 = "https://www.umei.cc/weimeiitupian/keaitupian", url2 = "bizhitupian/xiaoqingxinbizhi/204461.htm"
# ==> url = "https://www.umei.cc/weimeiitupian/bizhitupian/xiaoqingxinbizhi/204461.htm"

parse.urljoin(url1, url2) from the urllib URL-handling module splices the two addresses: when url2 starts with a /, only the scheme and host of url1 are kept; when url2 does not start with a /, url2 replaces the last path segment of url1.
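A quick runnable check of both cases, using the URLs from the comments above:

from urllib.parse import urljoin

# url2 starts with "/": only the scheme and host of url1 survive
print(urljoin("https://www.umei.cc/weimeiitupian/keaitupian/",
              "/bizhitupian/xiaoqingxinbizhi/204461.htm"))
# https://www.umei.cc/bizhitupian/xiaoqingxinbizhi/204461.htm

# url2 is relative: it replaces the last path segment of url1
print(urljoin("https://www.umei.cc/weimeiitupian/keaitupian",
              "bizhitupian/xiaoqingxinbizhi/204461.htm"))
# https://www.umei.cc/weimeiitupian/bizhitupian/xiaoqingxinbizhi/204461.htm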

Processes are independent of each other. For process two to consume the addresses produced by process one, a third-party queue (Queue) is needed; it must be imported from multiprocessing, and q must be passed to each process as a parameter.

The queue must be created before the processes:

from multiprocessing import Process, Queue
if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=Get, args=(q,))