Monitoring program status and collecting logs with Python, sending messages via Kafka
This article uses Python to monitor a program's running status and collect the error logs it generates. The collected log data is then sent to Kafka so that other services can pull and process it.
1. Prepare a demo program to generate logs
The example code is as follows:
import os
import time

log_file_path = "C:/Users/15224/Desktop/log_file.log"

with open(log_file_path, 'w') as file:
    file.write("Start Writing.\n")

# Get the current process ID and print it
print(f"Current process ID: {os.getpid()}")
print("----------------------------------")

ERROR_MES = '''Log [http-nio-8180-exec-43] ERROR c.r.f.w.e.GlobalExceptionHandler - [handleRuntimeException,96] - Requesting address '/captchaImage', an unknown exception occurred.
java.lang.NullPointerException: null
at sun.awt.FontConfiguration.getVersion(FontConfiguration.java:1264)
at sun.awt.FontConfiguration.readFontConfigFile(FontConfiguration.java:219)
at sun.awt.FontConfiguration.init(FontConfiguration.java:107)END'''

def simulated_program():
    for i in range(100):
        print(f"iteration {i}")
        with open(log_file_path, 'a') as file:
            if i % 2 == 0:
                file.write(f"Log INFO {i}\n")
            else:
                file.write(ERROR_MES + f"{i}\n")
        time.sleep(1)

simulated_program()
This code simulates the log data generated while a program runs normally: it alternately writes log entries of type ERROR and INFO to a custom log file. The collector will later read this specified log file from the folder.
Run the program and check whether logs are generated in the corresponding folder:
Log entries starting with the "Log" prefix are generated successfully.
2. Implement the log collection program
The example code is as follows:
from confluent_kafka import Producer
import psutil
import time
import json
import os

key_msg = "ERROR"   # Which logs to collect: here, lines containing ERROR
log_prefix = "Log"  # Log prefix. Each log entry starts with "Log", e.g.: Log [http-nio-8180-exec-43] ERROR c.r.f.w.e.GlobalExceptionHandler...

# Monitor the application by name or by process ID -- choose one of the two
process_name = "process_name"
process_id = 8888

log_file_path = "C:/Users/15224/Desktop/log_file.log"  # Replace this value with the path where the log is stored
last_position = 0  # Record the position where the log was last read
log_list = []
log_map = {}

# Kafka server configuration; create the Kafka producer
bootstrap_servers = '127.0.0.1:9092'
topic = 'log-topic'
producer = Producer({'bootstrap.servers': bootstrap_servers})

def delivery_report(err, msg):
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')

def send_map_to_kafka(data):
    # Convert the dictionary to JSON
    data_str = json.dumps(data)
    # Send the data to the Kafka topic
    producer.produce(topic, data_str, callback=delivery_report)
    # Flush the buffer and trigger the delivery callback
    producer.flush()

# Determine the status based on the application name
def check_process_status(process_name):
    for proc in psutil.process_iter(['pid', 'name']):
        if process_name.lower() in proc.info['name'].lower():
            return True
    return False

while True:
    if check_process_status(process_name):
        print(f"{process_name} is running.")
    else:
        print(f"{process_name} is not running.")

    # To check by process ID instead, comment out the check above and uncomment this:
    # if psutil.pid_exists(process_id):
    #     print(f"Process with PID {process_id} is running.")
    # else:
    #     print(f"Process with PID {process_id} is not running.")

    if os.path.exists(log_file_path):
        with open(log_file_path, 'rb') as file:
            file.seek(last_position)
            # Read the new content from the last position to the end of the file
            new_data = file.read().decode('utf-8', errors='ignore')
            new_lines = new_data.splitlines()  # Split the read content into lines
            capture = False
            for line in new_lines:
                if key_msg in line:
                    log_list.append(line)
                    capture = True
                elif line.startswith(log_prefix) and capture:
                    capture = False
                    continue
                elif capture:
                    log_list.append(line)
            if log_list:  # There is ERROR information in log_list
                log_map = {"lastLog": log_list}
                # Send the data to Kafka
                send_map_to_kafka(log_map)
                print("LatestLogMap:", log_map)
                print("---------------------------------------------\n")
                log_list = []  # Clear the list
            last_position = file.tell()  # Update the file pointer to the end of the file
    else:
        print(f"Log file {log_file_path} does not exist.")
    time.sleep(5)  # Run once every 5 seconds
This code monitors the running status of the target program by its name and reads the specified log file, picking up only the content appended since the previous read, so each pass sees just the latest entries. The collected ERROR entries are packed into a map and sent to a specific Kafka topic.
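The "only the latest entries" behavior comes from remembering the file offset between scans with seek() and tell(). Below is a self-contained sketch of just that part, using a temporary file in place of the real log path (the function name read_new_lines is illustrative, not from the collector above):

```python
import os
import tempfile

# Sketch: incremental reading with seek()/tell(), as the collector does.
# A temporary file stands in for the real log file.
log_path = os.path.join(tempfile.mkdtemp(), "log_file.log")
last_position = 0

def read_new_lines(path):
    """Return only the lines appended since the previous call."""
    global last_position
    with open(path, 'rb') as f:
        f.seek(last_position)  # jump past what was already read
        data = f.read().decode('utf-8', errors='ignore')
        last_position = f.tell()  # remember the new end of the file
    return data.splitlines()

with open(log_path, 'a') as f:
    f.write("Log INFO 0\n")
print(read_new_lines(log_path))   # first scan sees the first line

with open(log_path, 'a') as f:
    f.write("Log INFO 1\n")
print(read_new_lines(log_path))   # second scan sees only the new line
```

Because the offset is only advanced after a successful read, a scan that finds nothing new simply returns an empty list on the next iteration.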
Note: if you monitor status by program name, make sure the program name is unique.
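The multi-line ERROR capture in the collector's reading loop can also be pulled out as a pure function, which makes it easy to test in isolation. A sketch with the same rules as above (the function name extract_error_blocks is my own):

```python
# Sketch: the collector's multi-line ERROR capture as a pure function.
# A line containing key_msg starts a capture; a later line that begins with
# the log prefix (and has no ERROR) ends it; lines in between, such as a
# stack trace, belong to the current ERROR entry.
def extract_error_blocks(lines, key_msg="ERROR", log_prefix="Log"):
    captured = []
    capture = False
    for line in lines:
        if key_msg in line:
            captured.append(line)   # an ERROR line starts (or continues) a capture
            capture = True
        elif line.startswith(log_prefix) and capture:
            capture = False         # a new non-ERROR entry ends the capture
        elif capture:
            captured.append(line)   # continuation line (e.g. stack trace)
    return captured

lines = [
    "Log INFO 0",
    "Log [http-nio] ERROR GlobalExceptionHandler - boom",
    "java.lang.NullPointerException: null",
    "at sun.awt.FontConfiguration.getVersion(...)",
    "Log INFO 2",
]
print(extract_error_blocks(lines))
```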
Before running the program, you need to start the Kafka service and create the corresponding topic
1) Start Kafka:
Go to your Kafka installation directory, enter bin, then windows, and open a cmd window there.
Kafka depends heavily on ZooKeeper, so start ZooKeeper first:
zookeeper-server-start.bat ../../config/zookeeper.properties
Open another cmd window in the windows directory to start Kafka:
kafka-server-start.bat ../../config/server.properties
Kafka starts successfully.
2) Create a Kafka topic
Likewise, open a command-line window in the windows directory and enter:
kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic log-topic
where log-topic is your customized topic name
3) Create a Kafka consumer and subscribe to the topic
Open a command line window in the windows directory and enter:
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic log-topic
4) Start the program
The program runs successfully, collects logs containing ERROR from the log files in the specified folder, and sends them to the Kafka topic.
The consumer also successfully received the message
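Since the collector serializes the map with json.dumps, a consumer-side service only needs json.loads to recover it. A minimal sketch of that round trip (the payload below is a hand-made stand-in for a real message value):

```python
import json

# Simulated message value, in the same shape send_map_to_kafka produces:
# a map whose "lastLog" key holds the captured ERROR lines.
payload = json.dumps({
    "lastLog": [
        "Log [http-nio] ERROR GlobalExceptionHandler - boom",
        "java.lang.NullPointerException: null",
    ]
})

# What a consumer would do with each received message value
log_map = json.loads(payload)
for entry in log_map["lastLog"]:
    print(entry)
```

In a real consumer the payload would come from the message value delivered by Kafka rather than being built locally.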
At this point, we have used Python to monitor the status of a specific program, collect its error logs, and send them to a Kafka topic.