This program was generated by ChatGPT and is written for Python 3.8. Tested on Ubuntu 18 (Linux).
Note: the proxy is optional; you can remove it, as shown in the sketch below.
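If you do not have a local proxy running, the simplest change is to create each httpx client without the proxies argument. A minimal sketch of that pattern (the helper name fetch is just for illustration; the full script below keeps the proxy):

import httpx

# Minimal sketch: the same client setup as in the script below, but with no
# proxy. verify=False and the 10-second timeout are kept unchanged.
async def fetch(url):
    async with httpx.AsyncClient(verify=False) as client:
        response = await client.get(url, timeout=10)
        return response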
# Version: V1.2
#   Scroll through the list pages to collect more links
#   Find the last page by checking for the presence of <div class="article-list">
#   Improve HTTP timeout/retry/307-redirect handling, etc.
# Version: V1.1
#   Filter content and only download what is inside <div id="article_content">
# Version: V1.0
#   Basic functionality: parse URLs, download images, and convert HTML to Markdown

import asyncio
import os
import re

import httpx
from bs4 import BeautifulSoup
from html2text import html2text
from urllib.parse import urlparse, urljoin
from tenacity import retry, stop_after_attempt, wait_fixed

BASE_URL = "https://blog.csdn.net/hushui/"
FIRST_PAGE_URL = urljoin(BASE_URL, "article/list/1")
PROXY = "http://127.0.0.1:3128"
PATTERN = r"https://blog\.csdn\.net/hushui/article/details/\d+"


@retry(stop=stop_after_attempt(3), wait=wait_fixed(2))
async def download_image(url, directory):
    async with httpx.AsyncClient(verify=False, proxies=PROXY) as client:
        response = await client.get(url, timeout=10)
        if response.status_code == 200:
            filename = os.path.basename(urlparse(url).path)
            filepath = os.path.join(directory, filename)
            os.makedirs(os.path.dirname(filepath), exist_ok=True)  # Create parent directories
            with open(filepath, "wb") as f:
                f.write(response.content)


def sanitize_folder_name(name):
    # Trim the title at "_hushui" (the CSDN page-title suffix)
    trimmed_title = name.split("_hushui")[0]
    # Replace characters that are invalid in folder names with "-"
    invalid_chars = r'[\/":*?<>|]'
    sanitized_name = re.sub(invalid_chars, "-", trimmed_title)
    return sanitized_name.strip()


@retry(stop=stop_after_attempt(3), wait=wait_fixed(2))
async def process_url(url):
    async with httpx.AsyncClient(verify=False, proxies=PROXY) as client:
        response = await client.get(url, timeout=10)
        if response.status_code == 200:
            soup = BeautifulSoup(response.text, "html.parser")
            title = soup.title.string.strip()
            sanitized_title = sanitize_folder_name(title)
            url_path = urlparse(url).path          # Extract the URL path
            url_prefix = url_path.split("/")[-1]   # Last segment after the final "/"
            markdown_links = []
            image_directory = f"{url_prefix}_{sanitized_title}"
            os.makedirs(image_directory, exist_ok=True)

            # Download image files
            image_urls = []
            article_content = soup.find("div", id="article_content")
            if article_content:
                images = article_content.find_all("img")
                for image in images:
                    image_url = urljoin(url, image.get("src"))
                    parsed_image_url = urlparse(image_url)
                    image_url_without_params = (
                        parsed_image_url.scheme + "://" + parsed_image_url.netloc + parsed_image_url.path
                    )
                    image_urls.append(image_url_without_params)
                    await download_image(image_url, image_directory)

                # Filter content based on <div id="article_content">:
                # custom filtering logic for the HTML-to-Markdown conversion
                filtered_tags = ["script", "style"]  # Tags to be stripped
                for tag in article_content.find_all(filtered_tags):
                    tag.decompose()

                # Replace image URLs with local paths in Markdown
                for image_url in image_urls:
                    image_filename = os.path.basename(urlparse(image_url).path)
                    local_path = os.path.join(image_directory, image_filename)
                    markdown_links.append(f"![Image]({local_path})")

                # Custom filtering logic for the Markdown content;
                # modify this section to filter out specific content as needed
                html_content = article_content.encode_contents().decode()  # Contents inside <div id="article_content">
                markdown_text = html2text(html_content)
                filtered_text = markdown_text  # Placeholder for filtered Markdown text

                markdown_filename = os.path.join(image_directory, f"{url_prefix}_{sanitized_title}.md")
                # Create the parent directory if it doesn't exist
                os.makedirs(os.path.dirname(markdown_filename), exist_ok=True)
                with open(markdown_filename, "w", encoding="utf-8") as f:
                    f.write(filtered_text)

                print(f"Converted URL: {url} to Markdown: {markdown_filename}")
            else:
                print(f"No content found for URL: {url}")
        elif response.status_code == 307:
            # Handle the redirect
            redirect_url = response.headers.get("Location")
            if redirect_url:
                redirect_parsed = urlparse(redirect_url)
                if redirect_parsed.netloc:
                    # Absolute URL
                    absolute_url = redirect_url
                else:
                    # Relative URL, combine with the base URL
                    absolute_url = urljoin(url, redirect_url)
                print(f"Received a 307 Temporary Redirect. Following redirect to: {absolute_url}")
                await process_url(absolute_url)  # Make a new request to the redirect URL
            else:
                print("Received a 307 Temporary Redirect, but no redirect URL provided.")
        else:
            print(f"Failed to retrieve URL: {url} with status code: {response.status_code}")


async def scroll_page():
    page_number = 1
    current_page_url = FIRST_PAGE_URL
    while True:
        async with httpx.AsyncClient(verify=False, proxies=PROXY) as client:
            response = await client.get(current_page_url, timeout=10)
            if response.status_code == 200:
                soup = BeautifulSoup(response.text, "html.parser")
                article_list = soup.find("div", class_="article-list")
                if article_list:
                    links = article_list.find_all("a", href=re.compile(PATTERN))
                    tasks = [process_url(urljoin(BASE_URL, link["href"])) for link in links]
                    await asyncio.gather(*tasks)
                    page_number += 1
                    current_page_url = urljoin(BASE_URL, f"article/list/{page_number}")
                    print(f"Start page: {current_page_url}")
                else:
                    print(f"Reached the last page: {current_page_url}")
                    break
            else:
                print(f"Failed to retrieve URL: {current_page_url}")
                break  # Stop instead of retrying the same page forever


async def main():
    await scroll_page()


if __name__ == "__main__":
    asyncio.run(main())
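For reference, the folder-naming step can be checked in isolation. A minimal sketch (the example title is hypothetical, but follows the usual CSDN <title> pattern of "<post title>_hushui的博客-CSDN博客"):

import re

def sanitize_folder_name(name):
    # Same logic as in the script above: trim at "_hushui", then replace
    # characters that are invalid in folder names with "-".
    trimmed_title = name.split("_hushui")[0]
    return re.sub(r'[\/":*?<>|]', "-", trimmed_title).strip()

# Hypothetical title, shaped like a typical CSDN page <title>
print(sanitize_folder_name("C/C++ tips_hushui的博客-CSDN博客"))  # -> "C-C++ tips"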