Distributed real-time log analysis and intrusion detection system based on Flume + Spark + Flask
Introduction
LogVision is a log analysis solution that integrates web log aggregation, distribution, real-time analysis, intrusion detection, data storage and visualization. Apache Flume is used for aggregation, Apache Kafka is used for distribution, Spark Streaming is used for real-time processing, Spark MLlib is used for intrusion detection, HDFS and Redis are used for data storage, and Flask, SocketIO, Echarts, and Bootstrap are used for visualization.
The instructions below all assume a single-machine pseudo-distributed environment. Adjust the configuration as needed for a fully distributed deployment.
Project structure
- flask: Flask web backend
- spark: implementation of log analysis and intrusion detection
- flume: Flume configuration file
- log_gen: Simulation log generator
- datasets: test log data set
- images: pictures of README
Dependencies and versions
- Required for compilation and the web side:
  - Java 8, Scala 2.11.12, Python 3.8 (see requirements.txt for package dependencies), sbt 1.3.8
- Required in the computing environment:
  - Java 8, Apache Flume 1.9.0, Kafka 2.4, Spark 2.4.5, ZooKeeper 3.5.7, Hadoop 2.9.2, Redis 5.0.8
Instructions for use
Before starting, replace the IP addresses in the source code and configuration files with your own. This affects the Flume configuration file, the Spark main program, and the Flask web backend.
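In the Flask backend the address is hardcoded in the kafka_bootstrap_servers and Redis host settings. A minimal sketch of one way to avoid editing the source for each deployment is to read these values from environment variables. The variable names LOGV_HOST, LOGV_KAFKA, LOGV_REDIS_HOST and LOGV_REDIS_PORT are hypothetical and not part of the project; the Flume configuration and the Spark program would still need their addresses changed separately.

```python
import os

# Default taken from the hardcoded address in the Flask backend.
DEFAULT_HOST = "10.0.0.222"


def load_config(env=None):
    """Build Kafka/Redis connection settings from (hypothetical) environment variables."""
    env = os.environ if env is None else env
    host = env.get("LOGV_HOST", DEFAULT_HOST)
    return {
        # Kafka listens on 9092 and Redis on 6379 in the original code
        "kafka_bootstrap_servers": env.get("LOGV_KAFKA", f"{host}:9092"),
        "redis_host": env.get("LOGV_REDIS_HOST", host),
        "redis_port": int(env.get("LOGV_REDIS_PORT", "6379")),
    }
```

With LOGV_HOST=192.168.1.5 exported, load_config() would yield "192.168.1.5:9092" for Kafka and "192.168.1.5" for Redis, which could then be passed to KafkaConsumer and redis.ConnectionPool.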
Compile Spark application
After installing Java 8 and Scala 2.11, initialize sbt in the spark directory:
sbt
Exit the sbt shell and use sbt-assembly to compile and package the Spark project:
sbt assembly
Then rename the generated jar package to logvision.jar.
Environment preparation
You need a pseudo-distributed environment (the test environment is CentOS 7) in which all of the components listed above, at the corresponding versions, are configured and running.
Use standalone.conf in the flume directory to start a Flume agent.
Copy learning-datasets from the datasets folder to the following path:
/home/logv/learning-datasets
Copy access_log from the datasets folder to the following path:
/home/logv/access_log
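The log parsing itself happens in the Spark (Scala) program, which is not shown here. As a rough illustration only, the sketch below assumes access_log is in the Apache Common Log Format and turns one line into a record with the field names the Flask backend later reads (host, timestamp, req_method, url, protocol, status_code); the size field name and the parse_line function are hypothetical.

```python
import re
from datetime import datetime

# Assumed format (Apache Common Log Format), e.g.
# 127.0.0.1 - - [10/Oct/2000:13:55:36 -0700] "GET /index.html HTTP/1.0" 200 2326
CLF_PATTERN = re.compile(
    r'(?P<host>\S+) \S+ \S+ \[(?P<time>[^\]]+)\] '
    r'"(?P<req_method>\S+) (?P<url>\S+) (?P<protocol>[^"]+)" '
    r'(?P<status_code>\d{3}) (?P<size>\d+|-)'
)


def parse_line(line):
    """Parse one access-log line into a dict, or return None if it does not match."""
    m = CLF_PATTERN.match(line)
    if m is None:
        return None
    record = m.groupdict()
    ts = datetime.strptime(record.pop("time"), "%d/%b/%Y:%H:%M:%S %z")
    # Milliseconds since the epoch, the unit the Flask backend divides by 1000
    record["timestamp"] = int(ts.timestamp() * 1000)
    # A "-" size means no body was sent
    record["size"] = 0 if record["size"] == "-" else int(record["size"])
    return record
```

Lines that do not match the pattern are returned as None so the caller can drop them instead of failing the whole batch.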
Intrusion detection model training and testing
Submit the jar package to the Spark cluster to train and test the intrusion detection model:
spark-submit --class learning logvision.jar
You will see the following results:
The first two tables show the intrusion detection results for the normal and abnormal test sets, respectively. The remaining four tables can be used to judge the recognition accuracy. As shown in the figure, all 250 normal test records were classified as normal, a recognition rate of 100%; of the 250 abnormal test records, 240 were classified as abnormal and 10 as normal, a recognition rate of 96%.
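The percentages above follow directly from the reported counts. The small sketch below (the function name detection_rates is illustrative) reproduces them and also derives the overall accuracy across both test sets, (250 + 240) / 500 = 98%, and the share of abnormal records missed, 10 / 250 = 4%.

```python
def detection_rates(normal_total, normal_correct, abnormal_total, abnormal_correct):
    """Per-class recognition rates plus overall accuracy for a two-class test."""
    return {
        # Share of normal records classified as normal
        "normal_rate": normal_correct / normal_total,
        # Share of abnormal records classified as abnormal
        "abnormal_rate": abnormal_correct / abnormal_total,
        # Correct predictions over all test records
        "overall_accuracy": (normal_correct + abnormal_correct) / (normal_total + abnormal_total),
        # Abnormal records that slipped through as normal
        "missed_abnormal_rate": (abnormal_total - abnormal_correct) / abnormal_total,
    }


rates = detection_rates(250, 250, 250, 240)
```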
Start the visualization backend
Run the following command in the flask directory to install the dependencies:
pip3 install -r requirements.txt
Start the Flask web backend:
python3 app.py
Start real-time log generator
The real-time log generator in log_gen appends blocks of lines from a sample log to a target log, controlled by its arguments (number of lines written each time and the interval between writes). This simulates a log being written in real time, which the downstream pipeline then processes.
java log_gen [log source] [target file] [number of lines appended each time] [time interval (seconds)]
Copy it to the environment, then compile and run it. The following appends 5 lines from /home/logv/access_log to /home/logSrc every 2 seconds:
javac log_gen.java
java log_gen /home/logv/access_log /home/logSrc 5 2
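For readers without a Java toolchain at hand, here is a minimal Python sketch of the same idea. It is not a port of log_gen.java: the wrap-around to the start of the source once it is exhausted is an assumption, and the rounds parameter is a hypothetical addition that limits the number of writes (None runs forever).

```python
import time


def append_blocks(source, target, lines_per_write, interval, rounds=None):
    """Append lines_per_write lines of source to target every interval seconds.

    Assumption: when the source runs out, reading wraps around to its first line.
    """
    with open(source, encoding="utf-8") as f:
        # Ensure every line ends with a newline so appended blocks never merge
        lines = [ln if ln.endswith("\n") else ln + "\n" for ln in f]
    if not lines:
        raise ValueError("source log is empty")
    pos = 0
    done = 0
    while rounds is None or done < rounds:
        block = [lines[(pos + k) % len(lines)] for k in range(lines_per_write)]
        pos = (pos + lines_per_write) % len(lines)
        # Open in append mode and close each time, so the block is flushed before sleeping
        with open(target, "a", encoding="utf-8") as out:
            out.writelines(block)
        done += 1
        time.sleep(interval)
```

The equivalent of the command above would be append_blocks("/home/logv/access_log", "/home/logSrc", 5, 2).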
Start analysis task
Submit the jar package to the Spark cluster to run the real-time analysis task:
spark-submit --class streaming logvision.jar
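The Flask backend reads its rankings from Redis sorted sets named host, url, statcode and reqmt. The Scala streaming job that fills them is not shown, so the sketch below is only an assumption about the counting step: each micro-batch of parsed records is tallied per dimension, and the batch totals are added to running totals, which is what the Redis ZINCRBY command does per member and why ZREVRANGE can return the top entries. The function names are hypothetical.

```python
from collections import Counter

# Redis sorted-set name -> record field, as read by the Flask backend
DIMENSIONS = {"host": "host", "url": "url", "statcode": "status_code", "reqmt": "req_method"}


def aggregate_batch(records):
    """Count one micro-batch of parsed records for each dashboard dimension."""
    counts = {key: Counter() for key in DIMENSIONS}
    for r in records:
        for key, field in DIMENSIONS.items():
            counts[key][r[field]] += 1
    return counts


def merge_into(totals, batch):
    """Add a batch's counts to running totals (like ZINCRBY per member)."""
    for key, counter in batch.items():
        totals.setdefault(key, Counter()).update(counter)
    return totals
```

Keeping per-batch counts separate from the running totals mirrors the streaming model: each batch is small, and only the increments need to reach the store.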
View visualization results
At this point, all back-end components are configured. Open port 5000 of the web host in a browser to view the real-time log analysis results:
Welcome Screen:
Part of the source code (the Flask web backend):
# coding=utf-8
import ast
import time
from threading import Lock

import redis
import requests
from flask import Flask, render_template
from flask_socketio import SocketIO
from kafka import KafkaConsumer

async_mode = None
app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app, async_mode=async_mode)
thread = None
thread_lock = Lock()

# Configuration
time_interval = 1
kafka_bootstrap_servers = "10.0.0.222:9092"
redis_con_pool = redis.ConnectionPool(host='10.0.0.222', port=6379, decode_responses=True)
baidu_ak = '0jKbOcwqK7dGZiYIhSai5rsxTnQZ4UQt'


def locate_ip(ip):
    # Call the Baidu IP location API; parse the JSON reply instead of eval()-ing it
    req = requests.get("http://api.map.baidu.com/location/ip",
                       {'ak': baidu_ak, 'ip': ip, 'coor': 'bd09ll'})
    return req.json()


def fmt_ms(ts):
    # Convert a millisecond timestamp to a local time string
    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(ts) / 1000))


# Page routes and the Socket.IO namespace behind each page

# System time
@socketio.on('connect', namespace='/sys_time')
def sys_time():
    def loop():
        while True:
            socketio.sleep(time_interval)
            current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
            socketio.emit('sys_time', {'data': current_time}, namespace='/sys_time')
    socketio.start_background_task(target=loop)


# Welcome page
@app.route('/')
@app.route('/welcome')
def welcome():
    return render_template('index.html', async_mode=socketio.async_mode)


# Real-time log stream: push raw log lines from Kafka in blocks
@socketio.on('connect', namespace='/log_stream')
def log_stream():
    def loop():
        socketio.sleep(time_interval)
        consumer = KafkaConsumer("raw_log", bootstrap_servers=kafka_bootstrap_servers)
        cache = ""
        for msg in consumer:
            cache += bytes.decode(msg.value) + "\n"
            if len(cache.split("\n")) == 25:
                socketio.emit('log_stream', {'data': cache}, namespace='/log_stream')
                cache = ""
    socketio.start_background_task(target=loop)


# Real-time log analysis page
@app.route('/analysis')
def analysis():
    return render_template('analysis.html', async_mode=socketio.async_mode)


# Real-time counters
@socketio.on('connect', namespace='/count_board')
def count_board():
    def loop():
        while True:
            socketio.sleep(time_interval)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            res = redis_con.zrange("statcode", 0, 40, withscores=True)
            # Total number of requests (number of log lines)
            host_count = redis_con.zscore("line", "count")
            # Number of successful requests (2xx status codes)
            normal = ["200", "201", "202", "203", "204", "205", "206", "207"]
            success_count = 0
            for i in res:
                if i[0] in normal:
                    success_count += int(i[1])
            # Number of other requests (all other status codes)
            other_count = 0
            for i in res:
                other_count += int(i[1])
            other_count -= success_count
            # Number of visitors (distinct IPs)
            visitor_count = redis_con.zcard("host")
            # Number of resources (distinct URLs)
            url_count = redis_con.zcard("url")
            # Traffic size (sum of bytes, MB)
            traffic_sum = int(redis_con.zscore("traffic", "sum"))
            # Log size (MB)
            log_size = int(redis_con.zscore("size", "sum"))
            socketio.emit('count_board', {'host_count': host_count, 'success_count': success_count,
                                          'other_count': other_count, 'visitor_count': visitor_count,
                                          'url_count': url_count, 'traffic_sum': traffic_sum,
                                          'log_size': log_size}, namespace='/count_board')
    socketio.start_background_task(target=loop)


# Real-time popular locations
@socketio.on('connect', namespace='/hot_geo')
def hot_geo():
    def loop():
        while True:
            socketio.sleep(2)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            res = redis_con.zrevrange("host", 0, 50, withscores=True)
            data = []
            for i in res:
                body = locate_ip(i[0])
                # Only domestic (China) locations are displayed
                if body['status'] == 0:
                    coor_x = body['content']['point']['x']
                    coor_y = body['content']['point']['y']
                    data.append({"name": i[0], "value": [coor_x, coor_y, i[1]]})
            socketio.emit('hot_geo', {'data': data}, namespace='/hot_geo')
    socketio.start_background_task(target=loop)


# Real-time ranking of popular resources
@socketio.on('connect', namespace='/hot_url')
def hot_url():
    def loop():
        while True:
            socketio.sleep(time_interval)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            res = redis_con.zrevrange("url", 0, 9, withscores=True)
            data = []
            no = 1
            for i in res:
                data.append({"no": no, "url": i[0], "count": i[1]})
                no += 1
            socketio.emit('hot_url', {'data': data}, namespace='/hot_url')
    socketio.start_background_task(target=loop)


# Real-time popular IP ranking
@socketio.on('connect', namespace='/hot_ip')
def hot_ip():
    def loop():
        while True:
            socketio.sleep(time_interval)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            res = redis_con.zrevrange("host", 0, 13, withscores=True)
            data = []
            no = 1
            for i in res:
                body = locate_ip(i[0])
                # Only domestic (China) locations are displayed
                if body['status'] == 0:
                    address = body['content']['address']
                    data.append({"no": no, "ip": i[0], "address": address, "count": i[1]})
                    no += 1
            socketio.emit('hot_ip', {'data': data}, namespace='/hot_ip')
    socketio.start_background_task(target=loop)


# Real-time status code ratio
@socketio.on('connect', namespace='/status_code_pie')
def status_code_pie():
    def loop():
        while True:
            socketio.sleep(time_interval)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            res = redis_con.zrevrange("statcode", 0, 100, withscores=True)
            data = []
            legend = []
            for i in res:
                if i[0] != 'foo':
                    data.append({"value": i[1], "name": i[0]})
                    legend.append(i[0])
            socketio.emit('status_code_pie', {'legend': legend, 'data': data}, namespace='/status_code_pie')
    socketio.start_background_task(target=loop)


# Real-time request method ratio
@socketio.on('connect', namespace='/req_method_pie')
def req_method_pie():
    def loop():
        while True:
            socketio.sleep(time_interval)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            res = redis_con.zrevrange("reqmt", 0, 100, withscores=True)
            data = []
            legend = []
            for i in res:
                if i[0] != 'foo':
                    data.append({"value": i[1], "name": i[0]})
                    legend.append(i[0])
            socketio.emit('req_method_pie', {'legend': legend, 'data': data}, namespace='/req_method_pie')
    socketio.start_background_task(target=loop)


# Real-time request count (in chronological order)
@socketio.on('connect', namespace='/req_count_timeline')
def req_count_timeline():
    def loop():
        while True:
            socketio.sleep(time_interval)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            res = dict(redis_con.zrange("datetime", 0, 10000000, withscores=True))
            data = []
            date = []
            # Sort numerically by timestamp
            for i in sorted(res, key=int):
                data.append(res[i])
                date.append(fmt_ms(i))
            socketio.emit('req_count_timeline', {"data": data, "date": date}, namespace='/req_count_timeline')
    socketio.start_background_task(target=loop)


# IPs sorted by number of requests
@socketio.on('connect', namespace='/ip_ranking')
def ip_ranking():
    def loop():
        while True:
            socketio.sleep(time_interval)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            res = redis_con.zrevrange("host", 0, 50, withscores=True)
            ip = []
            count = []
            for i in res:
                ip.append(i[0])
                count.append(i[1])
            socketio.emit('ip_ranking', {"ip": ip, "count": count}, namespace='/ip_ranking')
    socketio.start_background_task(target=loop)


# Intrusion detection page
@app.route('/id')
def id():
    return render_template("id.html", async_mode=socketio.async_mode)


# Abnormal request count
@socketio.on('connect', namespace='/bad_count')
def bad_count():
    def loop():
        while True:
            socketio.sleep(time_interval)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            res = int(redis_con.zscore("bad", "bad"))
            socketio.emit('bad_count', {"data": res}, namespace='/bad_count')
    socketio.start_background_task(target=loop)


# Normal request count (was also named bad_count, shadowing the handler above)
@socketio.on('connect', namespace='/good_count')
def good_count():
    def loop():
        while True:
            socketio.sleep(time_interval)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            res = int(redis_con.zscore("good", "good"))
            socketio.emit('good_count', {"data": res}, namespace='/good_count')
    socketio.start_background_task(target=loop)


# Geotags of normal requests
@socketio.on('connect', namespace='/good_geo')
def good_geo():
    def loop():
        while True:
            socketio.sleep(time_interval)
            consumer = KafkaConsumer("good_result", bootstrap_servers=kafka_bootstrap_servers)
            data = []
            for msg in consumer:
                result = ast.literal_eval(bytes.decode(msg.value))
                for record in result:
                    if record['host'] != "foo":
                        body = locate_ip(record['host'])
                        # Only domestic (China) locations are displayed
                        if body['status'] == 0:
                            coor_x = body['content']['point']['x']
                            coor_y = body['content']['point']['y']
                            data.append({"name": record['host'],
                                         "value": [coor_x, coor_y, record['url'], fmt_ms(record['timestamp']),
                                                   record['req_method'], record['protocol'],
                                                   record['status_code']]})
                socketio.emit('good_geo', {"data": data}, namespace='/good_geo')
    socketio.start_background_task(target=loop)


# Geotags of abnormal requests
@socketio.on('connect', namespace='/bad_geo')
def bad_geo():
    def loop():
        while True:
            socketio.sleep(time_interval)
            consumer = KafkaConsumer("bad_result", bootstrap_servers=kafka_bootstrap_servers)
            data = []
            for msg in consumer:
                result = ast.literal_eval(bytes.decode(msg.value))
                for record in result:
                    if record['host'] != "foo":
                        body = locate_ip(record['host'])
                        # Only domestic (China) locations are displayed
                        if body['status'] == 0:
                            coor_x = body['content']['point']['x']
                            coor_y = body['content']['point']['y']
                            data.append({"name": record['host'],
                                         "value": [coor_x, coor_y, record['url'], fmt_ms(record['timestamp']),
                                                   record['req_method'], record['protocol'],
                                                   record['status_code']]})
                socketio.emit('bad_geo', {"data": data}, namespace='/bad_geo')
    socketio.start_background_task(target=loop)


# Real-time intrusion classification count (in chronological order)
@socketio.on('connect', namespace='/url_cate_count_timeline')
def url_cate_count_timeline():
    def loop():
        while True:
            socketio.sleep(time_interval)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            good_res = dict(redis_con.zrange("goodts", 0, 10000000, withscores=True))
            bad_res = dict(redis_con.zrange("badts", 0, 10000000, withscores=True))
            # Sorted union of normal and abnormal timestamps; regenerate both
            # count series on it, using 0 where one side has no entry
            date_ts = sorted(set(good_res) | set(bad_res), key=int)
            good_data = [good_res.get(t, 0) for t in date_ts]
            bad_data = [bad_res.get(t, 0) for t in date_ts]
            date = [fmt_ms(ts) for ts in date_ts]
            socketio.emit('url_cate_count_timeline', {"date": date, "good_data": good_data, "bad_data": bad_data},
                          namespace='/url_cate_count_timeline')
    socketio.start_background_task(target=loop)


# Overview of real-time abnormal requests
@socketio.on('connect', namespace='/bad_detail')
def bad_detail():
    def loop():
        while True:
            socketio.sleep(time_interval)
            consumer = KafkaConsumer("bad_result", bootstrap_servers=kafka_bootstrap_servers)
            data = []
            for msg in consumer:
                result = ast.literal_eval(bytes.decode(msg.value))
                for record in result:
                    if record['host'] != "foo":
                        body = locate_ip(record['host'])
                        # Only domestic (China) locations are displayed
                        if body['status'] == 0:
                            data.append({"host": record['host'], "address": body['content']['address'],
                                         "url": record['url'], "datetime": fmt_ms(record['timestamp']),
                                         "req_method": record['req_method'], "protocol": record['protocol'],
                                         "status_code": record['status_code'], "pred": record['prediction'],
                                         'prob': record['probability']['values']})
                socketio.emit('bad_detail', {"data": data}, namespace='/bad_detail')
    socketio.start_background_task(target=loop)


# Overview of real-time normal requests
@socketio.on('connect', namespace='/good_detail')
def good_detail():
    def loop():
        while True:
            socketio.sleep(time_interval)
            consumer = KafkaConsumer("good_result", bootstrap_servers=kafka_bootstrap_servers)
            data = []
            for msg in consumer:
                result = ast.literal_eval(bytes.decode(msg.value))
                for record in result:
                    if record['host'] != "foo":
                        body = locate_ip(record['host'])
                        # Only domestic (China) locations are displayed
                        if body['status'] == 0:
                            data.append({"host": record['host'], "address": body['content']['address'],
                                         "url": record['url'], "datetime": fmt_ms(record['timestamp']),
                                         "req_method": record['req_method'], "protocol": record['protocol'],
                                         "status_code": record['status_code'], "pred": record['prediction'],
                                         'prob': record['probability']['values']})
                socketio.emit('good_detail', {"data": data}, namespace='/good_detail')
    socketio.start_background_task(target=loop)


@app.route('/about')
def about():
    return render_template("about.html", async_mode=socketio.async_mode)


if __name__ == '__main__':
    socketio.run(app, host="0.0.0.0", port=5000, debug=True)
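Several handlers above query the Baidu IP location API for the same IPs on every tick (every 1 to 2 seconds), which adds latency and burns API quota. A minimal sketch of a cached lookup follows, assuming the response shape the code above already relies on (status, content.point.x/y, content.address). make_locator is a hypothetical helper; it uses only the standard library (urllib instead of requests) and accepts an injectable fetch function so it can be tested offline.

```python
import json
from functools import lru_cache
from urllib.parse import urlencode
from urllib.request import urlopen

BAIDU_IP_API = "http://api.map.baidu.com/location/ip"


def make_locator(ak, fetch=None, maxsize=4096):
    """Return a cached ip -> (x, y, address) lookup; None if the API cannot locate the IP."""
    def default_fetch(ip):
        query = urlencode({"ak": ak, "ip": ip, "coor": "bd09ll"})
        with urlopen(f"{BAIDU_IP_API}?{query}", timeout=5) as resp:
            return json.loads(resp.read().decode("utf-8"))

    fetch = fetch or default_fetch

    @lru_cache(maxsize=maxsize)
    def locate(ip):
        body = fetch(ip)
        # Non-zero status: the API could not locate this IP (e.g. outside China)
        if body.get("status") != 0:
            return None
        point = body["content"]["point"]
        return point["x"], point["y"], body["content"]["address"]

    return locate
```

Because lru_cache also remembers None results, an IP that could not be located is not retried until it is evicted; that is usually acceptable for a dashboard, since IP locations rarely change.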