Python 3.8 spider for a CSDN blog: scrape posts and convert them to Markdown

This program was generated by ChatGPT for Python 3.8. Tested on Ubuntu 18.04 (Linux).

Note: the proxy is optional; you can remove it.
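For example, to run without the proxy, create the client without the proxies argument (a minimal sketch; only this argument differs from the functions below):

    async with httpx.AsyncClient(verify=False) as client:
        response = await client.get(url, timeout=10)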

# Version: V1.2
# Scroll through the article list pages to collect more links
# Find the last page by checking for the presence of <div class="article-list">
# Improve HTTP timeout, retry, and 307 redirect handling

# Version: V1.1
# Filter the content and only convert what is inside the <div id="article_content"> tag

# Version: V1.0
# Basic functionality of parsing URLs, downloading images, and converting HTML to Markdown

import asyncio
import os
import re
from urllib.parse import urlparse, urljoin

import httpx
from bs4 import BeautifulSoup
from html2text import html2text
from tenacity import retry, stop_after_attempt, wait_fixed

BASE_URL = "https://blog.csdn.net/hushui/"
FIRST_PAGE_URL = urljoin(BASE_URL, "article/list/1")
PROXY = "http://127.0.0.1:3128"
PATTERN = r"https://blog\.csdn\.net/hushui/article/details/\d+"

@retry(stop=stop_after_attempt(3), wait=wait_fixed(2))
async def download_image(url, directory):
    async with httpx.AsyncClient(verify=False, proxies=PROXY) as client:
        response = await client.get(url, timeout=10)
        if response.status_code == 200:
            filename = os.path.basename(urlparse(url).path)
            filepath = os.path.join(directory, filename)
            os.makedirs(os.path.dirname(filepath), exist_ok=True) # Create parent directories
            with open(filepath, "wb") as f:
                f.write(response.content)


def sanitize_folder_name(name):
    # Trim the title before "_hushui"
    trimmed_title = name.split("_hushui")[0]
    # Replace invalid characters with a safe character like "-"
    invalid_chars = r'[\/":*?<>|]'
    sanitized_name = re.sub(invalid_chars, "-", trimmed_title)
    return sanitized_name.strip()
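
# Example for sanitize_folder_name (the title string below is hypothetical; CSDN
# page titles for this blog typically carry a "_hushui..." suffix, which is what
# the split above trims):
#   sanitize_folder_name("Linux I/O notes: part 1_hushui的博客-CSDN博客")
#   returns "Linux I-O notes- part 1"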

@retry(stop=stop_after_attempt(3), wait=wait_fixed(2))
async def process_url(url):
    async with httpx.AsyncClient(verify=False, proxies=PROXY) as client:
        response = await client.get(url, timeout=10)
        if response.status_code == 200:
            soup = BeautifulSoup(response.text, "html.parser")
            title = soup.title.string.strip()
            sanitized_title = sanitize_folder_name(title)
            url_path = urlparse(url).path  # Extract the URL path
            url_prefix = url_path.split("/")[-1]  # Extract the last string after the last "/"
            image_directory = f"{url_prefix}_{sanitized_title}"
            os.makedirs(image_directory, exist_ok=True)

            # Download image files
            article_content = soup.find("div", id="article_content")
            if article_content:
                images = article_content.find_all("img")
                for image in images:
                    image_url = urljoin(url, image.get("src"))
                    # Drop query parameters (e.g. CSDN watermark arguments) when building the local filename
                    image_filename = os.path.basename(urlparse(image_url).path)
                    await download_image(image_url, image_directory)
                    # Point the <img> tag at the downloaded copy so the generated
                    # Markdown references the local file instead of the remote URL
                    image["src"] = image_filename

                # Filter content based on <div id="article_content">
                # Custom filtering logic for HTML to Markdown conversion
                filtered_tags = ["script", "style"]  # Specify the tags to be filtered
                for tag in article_content.find_all(filtered_tags):
                    tag.decompose()

                # Custom filtering logic for Markdown content
                # You can modify this section to filter out specific content based on your requirements
                html_content = article_content.encode_contents().decode() # Get the contents inside the <div id="article_content">
                markdown_text = html2text(html_content)
                filtered_text = markdown_text # Placeholder for filtered Markdown text

                markdown_filename = os.path.join(image_directory, f"{url_prefix}_{sanitized_title}.md")

                # Create the parent directory if it doesn't exist
                os.makedirs(os.path.dirname(markdown_filename), exist_ok=True)

                with open(markdown_filename, "w", encoding="utf-8") as f:
                    f.write(filtered_text)

                print(f"Converted URL: {url} to Markdown: {markdown_filename}")
            else:
                print(f"No content found for URL: {url}")
        elif response.status_code == 307:
            # Handle the redirect
            redirect_url = response.headers.get("Location")
            if redirect_url:
                redirect_parsed = urlparse(redirect_url)
                if redirect_parsed.netloc:
                    # Absolute URL
                    absolute_url = redirect_url
                else:
                    # Relative URL, combine with base URL
                    absolute_url = urljoin(url, redirect_url)
                print(f"Received a 307 Temporary Redirect. Following redirect to: {absolute_url}")
                await process_url(absolute_url)  # Make a new request to the redirect URL
            else:
                print("Received a 307 Temporary Redirect, but no redirect URL provided.")
        else:
            print(f"Failed to retrieve URL: {url} with status code: {response.status_code}")

 
async def scroll_page():
    page_number = 1
    current_page_url = FIRST_PAGE_URL
    while True:
        async with httpx.AsyncClient(verify=False, proxies=PROXY) as client:
            response = await client.get(current_page_url, timeout=10)
            if response.status_code == 200:
                soup = BeautifulSoup(response.text, "html.parser")
                article_list = soup.find("div", class_="article-list")
                if article_list:
                    links = article_list.find_all("a", href=re.compile(PATTERN))
                    tasks = [process_url(urljoin(BASE_URL, link["href"])) for link in links]
                    await asyncio.gather(*tasks)
                    page_number += 1
                    current_page_url = urljoin(BASE_URL, f"article/list/{page_number}")
                    print(f"Start page: {current_page_url}")
                else:
                    print(f"Reached the last page: {current_page_url}")
                    break
            else:
                print(f"Failed to retrieve URL: {current_page_url} (status code: {response.status_code})")
                break  # Stop instead of retrying the same page forever


async def main():
    await scroll_page()


if __name__ == "__main__":
    asyncio.run(main())
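
# Usage sketch (package names inferred from the imports above; the filename
# "csdn_spider.py" is just a placeholder for wherever you saved this script):
#   pip install httpx beautifulsoup4 html2text tenacity
#   python3 csdn_spider.py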