Asynchronous crawler
1. High performance asynchronous crawler
Purpose: use asynchrony to achieve high-performance data crawling
2. Ways to implement an asynchronous crawler
2.1 Multithreading, multiprocessing?
```python
from threading import Thread
```
2.1.1 Process and thread
- Process: a running program. Every time we execute a program, the operating system automatically prepares the necessary resources for it (e.g., allocates memory, creates a thread that can be executed)
- Thread: the execution path inside a program that the CPU can schedule directly; it is the smallest unit the operating system schedules for computation, is contained in a process, and is the actual unit of execution within the process
2.1.2 Advantages and disadvantages of multi-threading and multi-process
- Benefits: blocking operations can be moved into separate threads or processes, so they execute asynchronously
- Disadvantage: threads and processes cannot be opened without limit
2.1.3 The use of multithreading
Writing 1: function-based (commonly used in crawlers)
```python
from threading import Thread

def fun(name):
    for i in range(50):
        print(f"{name},{i}")

if __name__ == '__main__':
    # Without multithreading
    print("Do not use multithreading:")
    fun("Chow Yun Fat")
    fun("Hu Ge")
    fun("Jay Chou")
    print("*" * 100)

    # Create threads
    print("Using multi-threaded execution:")
    # args must be a tuple, so keep the trailing "," inside the parentheses
    t_1 = Thread(target=fun, args=("Chow Yun Fat",))
    t_2 = Thread(target=fun, args=("Hu Ge",))
    t_3 = Thread(target=fun, args=("Jay Chou",))
    t_1.start()
    t_2.start()
    t_3.start()
    # Besides the threads we defined, there is always a main thread
    print("I am the main thread " + "-" * 50)
```
Writing 2: object-oriented
```python
from threading import Thread

# Inherit from the Thread parent class
class MyThread(Thread):
    def __init__(self, name):
        super(MyThread, self).__init__()
        self.name = name

    def run(self):
        for i in range(50):
            print(f"{self.name},{i}")

if __name__ == '__main__':
    t1 = MyThread("Jay Chou")
    t2 = MyThread("Hu Ge")
    t3 = MyThread("Chow Yun Fat")
    t1.start()
    t2.start()
    t3.start()
```
2.1.4 Use of multi-process
```python
from multiprocessing import Process

def func(name):
    for i in range(100):
        print(f"I am {name},{i}")

if __name__ == '__main__':
    p1 = Process(target=func, args=("Jay Chou",))
    p2 = Process(target=func, args=("Jet Li",))
    p1.start()
    p2.start()
```
Multiprocessing is similar to multithreading.
2.1.5 Choice of multi-thread and multi-process
- Multi-threading: tasks are relatively uniform and the code is similar
- Multi-process: tasks are relatively independent and rarely overlap
- Example: an IP proxy pool
  - grab proxy IPs from various websites
  - verify that each proxy IP is available
  - provide an external interface
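The "verify that each proxy IP is available" step can be sketched as follows; the test URL, the timeout, and the `host:port` proxy format are assumptions for illustration, not from the original text:

```python
import requests

def check_proxy(proxy):
    """Return True if the proxy can reach a test page within the timeout.

    `proxy` is assumed to be a "host:port" string; httpbin.org is just an
    example target, any stable page works.
    """
    proxies = {"http": f"http://{proxy}", "https": f"http://{proxy}"}
    try:
        resp = requests.get("http://httpbin.org/ip", proxies=proxies, timeout=3)
        return resp.status_code == 200
    except requests.RequestException:
        # connection refused / timeout / bad proxy all count as unavailable
        return False
```

Proxies that pass the check go into the pool; the external interface then hands them out to the crawler.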
2.2 Thread pool, process pool
```python
from concurrent.futures import ThreadPoolExecutor
```
2.2.1 Advantages and disadvantages of thread and process pools
- Benefits: reduces how often the system creates and destroys threads or processes, lowering system overhead
- Disadvantages: the number of threads or processes in the pool has an upper limit
2.2.2 Use of thread pool
```python
from concurrent.futures import ThreadPoolExecutor

def func(name):
    for i in range(30):
        print(f"{name},{i}")

if __name__ == '__main__':
    with ThreadPoolExecutor(5) as t:
        t.submit(func, "Jay Chou")
        t.submit(func, "Hu Ge")
        t.submit(func, "Stephen Chow")
```
The output looks almost the same as plain multithreading, with no visible difference. The thread pool's advantage appears when we submit more tasks than the pool has workers; modify the main block as follows:
```python
if __name__ == '__main__':
    with ThreadPoolExecutor(5) as t:
        for i in range(10):
            t.submit(func, f"Jay Chou{i}")
```
With plain multithreading, every thread has to be defined by us, so when the number of threads is large the system overhead is high. A thread pool works like a thread buffer: we fix how many threads can exist at the same time, and the remaining tasks wait in the pool, which reduces system overhead.
But what happens when we want the return value of a task submitted to the thread pool?
```python
from concurrent.futures import ThreadPoolExecutor
import time

def func(name):
    time.sleep(2)
    return name

if __name__ == '__main__':
    with ThreadPoolExecutor(5) as t:
        t.submit(func, "Jay Chou")
        t.submit(func, "Hu Ge")
        t.submit(func, "Zhou Xingchi")
```
Running this code, it executes successfully but prints nothing; the desired return values are never retrieved.
```python
from concurrent.futures import ThreadPoolExecutor
import time

def func(name, t):
    time.sleep(t)
    return name

def fn(res):
    # res is the finished Future; .result() gets the return value
    print(res.result())

if __name__ == '__main__':
    with ThreadPoolExecutor(5) as t:
        t.submit(func, "Jay Chou", 3).add_done_callback(fn)
        t.submit(func, "Hu Ge", 1).add_done_callback(fn)
        t.submit(func, "Zhou Xingchi", 4).add_done_callback(fn)
```
Through the `.add_done_callback(fn)` method, the return value can be obtained as each task finishes. The execution order is uncertain, so the order of the return values is also uncertain: whichever task completes first returns first.
```python
from concurrent.futures import ThreadPoolExecutor
import time

def func(name, t):
    time.sleep(t)
    print(f"I am {name}")
    return name

if __name__ == '__main__':
    with ThreadPoolExecutor(5) as t:
        # result is a generator
        result = t.map(func, ["Jay Chou", "Hu Ge", "Zhou Xingchi"], [2, 1, 3])
        print(result)
        for r in result:
            print(r)
        # map yields return values in the order the tasks were packaged:
        # Jay Chou, Hu Ge, Zhou Xingchi -- not in completion order
```
Pay attention to the difference between `.add_done_callback(fn)` (returns in completion order) and `.map()` (returns in submission order).
2.2.3 Thread pool case
Use the thread pool to crawl all chapters of the novel Journey to the West
```python
import requests
import os
from lxml import etree
from concurrent.futures import ThreadPoolExecutor

def download(url):
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36"
    }
    resp = requests.get(url=url, headers=headers)
    et = etree.HTML(resp.text)
    title = et.xpath("/html/body/div[3]/div/div[1]/div[1]/div[1]/h1/text()")
    article = et.xpath("/html/body/div[3]/div/div[1]/div[1]/div[2]/p//text()")
    t = "".join(title)
    s = "".join(article).replace("\r\r", "\r")
    with open(f"Journey to the West/{t}.txt", "w", encoding="utf-8") as f:
        f.write(s)
    print(t, "over!!")

if __name__ == '__main__':
    if not os.path.exists("Journey to the West"):
        os.mkdir("Journey to the West")
    with ThreadPoolExecutor(5) as t:
        for i in range(1, 102):
            num = 480 + i
            url = f"https://www.gushicimingju.com/novel/xiyouji/{num}.html"
            t.submit(download, url)
    print("all over!!!")
```
2.3 Single thread + asynchronous coroutines
- event_loop: the event loop, essentially an infinite loop; we can register functions on it, and when their conditions are met the loop executes them.
- coroutine: a coroutine object, which can be registered on the event loop and called by it; a method defined with the async keyword is not executed when called, but instead returns a coroutine object.
- task: a further wrapper around a coroutine object that also tracks the task's state.
- future: represents a task that will run or has not yet run; essentially no different from a task.
- async: defines a coroutine.
- await: suspends the coroutine at a blocking operation and hands control back to the event loop.
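A minimal sketch putting the terms above together with `asyncio` (the names and delays are illustrative):

```python
import asyncio

async def fetch(name, delay):
    # await suspends this coroutine; the event loop can run others meanwhile
    # (use asyncio.sleep, not time.sleep, which would block the whole loop)
    await asyncio.sleep(delay)
    return name

async def main():
    # wrapping coroutine objects into tasks lets the event loop schedule them concurrently
    tasks = [asyncio.create_task(fetch("Jay Chou", 0.2)),
             asyncio.create_task(fetch("Hu Ge", 0.1))]
    # gather returns results in submission order, even though "Hu Ge" finishes first
    return await asyncio.gather(*tasks)

if __name__ == '__main__':
    # asyncio.run creates the event loop, runs main() on it, then closes the loop
    print(asyncio.run(main()))
```

Both coroutines sleep concurrently, so the total runtime is about 0.2s rather than 0.3s.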
2.4 The use of multi-process and thread pool
Use multi-process and thread pool to crawl pictures
```python
import requests
import os
from lxml import etree
from urllib import parse
from multiprocessing import Process, Queue
from concurrent.futures import ThreadPoolExecutor

# Process 1 crawls the url address of each picture
# Process 2 downloads pictures according to the addresses from process 1

def Get_src(url, q):
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36"
    }
    resp = requests.get(url=url, headers=headers)
    resp.encoding = "utf-8"
    et = etree.HTML(resp.text)
    href_lst = et.xpath('//div[@class = "item_list infinite_scroll"]//div[@class="item_b clearfix"]//a/@href')
    for href in href_lst:
        child_href = parse.urljoin(url, href)
        resp1 = requests.get(url=child_href, headers=headers)
        resp1.encoding = "utf-8"
        et2 = etree.HTML(resp1.text)
        src = et2.xpath('//div[@class="big-pic"]/a/img/@src')[0]
        q.put(src)
        print(src + " Enter Queue")
    q.put("over")

def Get(q):
    with ThreadPoolExecutor(5) as t:
        for i in range(2, 10):
            url = f"https://www.umei.cc/weimeiitupian/keaitupian/index_{i}.htm"
            t.map(Get_src, [url], [q])

def Downpic(url):
    resp = requests.get(url=url)
    resp.encoding = "utf-8"
    name = url.split("/")[-1]
    with open("Youyou Gallery/" + name, "wb") as f:
        f.write(resp.content)
    print(name + " download complete")

def Download(q):
    with ThreadPoolExecutor(5) as t:
        while 1:
            url = q.get()
            if url != "over":
                t.submit(Downpic, url)
            else:
                break

if __name__ == '__main__':
    if not os.path.exists("Youyou Gallery"):
        os.mkdir("Youyou Gallery")  # must match the directory name used in Downpic
    q = Queue()
    p1 = Process(target=Get, args=(q,))
    p2 = Process(target=Download, args=(q,))
    p1.start()
    p2.start()
    print("over!!!")
```
Note:
```python
from urllib import parse

url = parse.urljoin(url1, url2)

# url1 = "https://www.umei.cc/weimeiitupian/keaitupian/"
# url2 = "/bizhitupian/xiaoqingxinbizhi/204461.htm"
# ==> url = "https://www.umei.cc/bizhitupian/xiaoqingxinbizhi/204461.htm"

# url1 = "https://www.umei.cc/weimeiitupian/keaitupian"
# url2 = "bizhitupian/xiaoqingxinbizhi/204461.htm"
# ==> url = "https://www.umei.cc/weimeiitupian/bizhitupian/xiaoqingxinbizhi/204461.htm"
```
The url-processing module's `parse.urljoin(url1, url2)` splices the two addresses: when url2 begins with `/`, only the server part (scheme and host) of url1 is kept; when url2 does not begin with `/`, url2 replaces everything after the last `/` in url1's path.
Processes are independent of each other, so for process two to consume the addresses produced by process one, a shared queue `Queue` is needed. It must be imported from `multiprocessing` and passed to each process as the parameter `q`, and the queue must be created before the processes are.
```python
from multiprocessing import Process, Queue

if __name__ == '__main__':
    q = Queue()  # create the queue first
    p1 = Process(target=Get, args=(q,))
```