Python implements log collection, monitors program status, and sends the data with Kafka

This article uses Python to monitor the running status of a program and to collect the error logs it generates. The collected log data is then sent to Kafka so that other services can pull and process it.

1. Prepare a demo program to generate logs

The example code is as follows:

import os
import time

log_file_path = "C:/Users/15224/Desktop/log_file.log"
with open(log_file_path, 'w') as file:
    file.write("Start Writing.\\
")

# Get the current process ID and print it
print(f"Current process ID: {os.getpid()}")
print("----------------------------------")

ERROR_MES = '''Log [http-nio-8180-exec-43] ERROR c.r.f.w.e.GlobalExceptionHandler - [handleRuntimeException,96] - Requesting address '/captchaImage', an unknown exception occurred.
java.lang.NullPointerException: null
at sun.awt.FontConfiguration.getVersion(FontConfiguration.java:1264)
at sun.awt.FontConfiguration.readFontConfigFile(FontConfiguration.java:219)
at sun.awt.FontConfiguration.init(FontConfiguration.java:107)END'''


def simulated_program():
    for i in range(100):
        print(f"iteration {i}")
        with open(log_file_path, 'a') as file:
            if i % 2 == 0:
                file.write(f"Log INFO {i}\\
")
            else:
                file.write(ERROR_MES + f"{i}\n")
        time.sleep(1)


simulated_program()

This code simulates the log data a program generates during normal execution: it alternately writes ERROR and INFO entries to a log file at a custom path. The collector will later read the specified log file from this location.

Run the program and check whether logs are generated in the corresponding folder:

The log file is generated successfully, and every entry starts with the Log prefix.

2. Implement the log collection program

The example code is as follows:

from confluent_kafka import Producer
import psutil
import time
import json
import os

key_msg = "ERROR" # Which logs need to be collected? Here are the logs containing ERROR.
log_prefix = "Log" # Log prefix. Each log tested starts with Log. Example: Log [http-nio-8180-exec-43] ERROR c.r.f.w.e.GlobalExceptionHandler...

# Decide to monitor the application based on application name or process ID, choose one of the two
process_name = "process_name"
process_id = 8888

log_file_path = "C:/Users/15224/Desktop/log_file.log" # Replace this value with the path to store the log
last_position = 0  # Record the position where the log was last read
log_list = []
log_map = {}

# Kafka server configuration and create Kafka producer
bootstrap_servers = '127.0.0.1:9092'
topic = 'log-topic'
producer = Producer({'bootstrap.servers': bootstrap_servers})


def delivery_report(err, msg):
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')


def send_map_to_kafka(data):
    # Convert dictionary to JSON format
    data_str = json.dumps(data)
    # Send data to Kafka topic
    producer.produce(topic, data_str, callback=delivery_report)
    # Flush buffer and trigger message delivery callback
    producer.flush()


# Determine the status based on the application name
def check_process_status(process_name):
    for proc in psutil.process_iter(['pid', 'name']):
        if process_name.lower() in proc.info['name'].lower():
            return True
    return False


while True:
    if check_process_status(process_name):
        print(f"{process_name} is running.")
    else:
        print(f"{process_name} is not running.")

    # To judge by process ID instead, comment out the check above and uncomment this:
    # if psutil.pid_exists(process_id):
    #     print(f"Process with PID {process_id} is running.")
    # else:
    #     print(f"Process with PID {process_id} is not running.")

    if os.path.exists(log_file_path):
        with open(log_file_path, 'rb') as file:
            file.seek(last_position)
            new_data = file.read().decode('utf-8', errors='ignore') # Read the new content from the last position to the end of the file
            new_lines = new_data.splitlines() # Split the read content into lines
            # print(new_lines)
            capture = False
            for line in new_lines:
                if key_msg in line:  # Line starts an ERROR entry
                    log_list.append(line)
                    capture = True
                elif line.startswith(log_prefix) and capture:  # A new normal entry ends the capture
                    capture = False
                elif capture:  # Continuation line of a multi-line ERROR entry
                    log_list.append(line)
            if log_list: # If there is ERROR information in log_list
                log_map = {"lastLog": log_list}
                # Send data to Kafka
                send_map_to_kafka(log_map)
                print("LatestLogMap:", log_map)
                print("---------------------------------------------\ ")
                log_list = [] # Clear the list
            last_position = file.tell() # Update the position of the file pointer to the end of the file

    else:
        print(f"Log file {log_file_path} does not exist.")

    time.sleep(5) # Execute once every 5 seconds

This code monitors the running status of the target program by name and reads the specified log file incrementally, so each pass only picks up the newest entries. The collected ERROR entries are wrapped in a map (dictionary) and sent to a specific Kafka topic.

Note: if you monitor status by program name, make sure the name is unique; otherwise any process with a matching name will be reported as running.

Before running the program, you need to start the Kafka service and create the corresponding topic

1) Start Kafka:

Go to your Kafka installation directory, open bin/windows, and start a cmd window there.

Kafka depends on ZooKeeper, so start ZooKeeper first:

zookeeper-server-start.bat ../../config/zookeeper.properties

Then open another cmd window in the windows directory and start Kafka:

kafka-server-start.bat ../../config/server.properties

Kafka starts successfully.

2) Create a Kafka topic

Open another command-line window in the windows directory and enter:

kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic log-topic

where log-topic is your custom topic name.

3) Create a Kafka consumer and subscribe to the topic

Open a command line window in the windows directory and enter:

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic log-topic

4) Start the program

The program runs successfully, collects logs containing ERROR from the log files in the specified folder, and sends them to the Kafka topic.

The consumer also successfully receives the messages.

At this point, we have used Python to monitor the status of a specific program, collect its error logs, and send them to the Kafka topic.