Graduation project: distributed real-time log analysis and intrusion detection system based on Flume + Spark + Flask


Introduction

LogVision is a log analysis solution that integrates web log aggregation, distribution, real-time analysis, intrusion detection, data storage and visualization. Apache Flume is used for aggregation, Apache Kafka is used for distribution, Spark Streaming is used for real-time processing, Spark MLlib is used for intrusion detection, HDFS and Redis are used for data storage, and Flask, SocketIO, Echarts, and Bootstrap are used for visualization.

All instructions in this article assume a single-machine pseudo-distributed environment. Adjust the configuration as needed for a fully distributed deployment.

Project structure

  • flask: Flask web backend
  • spark: implementation of log analysis and intrusion detection
  • flume: Flume configuration files
  • log_gen: simulated log generator
  • datasets: test log datasets
  • images: images used in the README

Dependencies and versions

  • Required for compilation and the web side:
    • Java 8, Scala 2.11.12, Python 3.8 (see requirements.txt for package dependencies), sbt 1.3.8
  • Required in the computing environment:
    • Java 8, Apache Flume 1.9.0, Kafka 2.4, Spark 2.4.5, ZooKeeper 3.5.7, Hadoop 2.9.2, Redis 5.0.8

Instructions for use

Before starting, replace the IP addresses in the source code and configuration files with your own. This affects the Flume configuration file, the Spark main program, and the Flask web backend.
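For the Flask backend, one way to avoid editing the hard-coded address 10.0.0.222 is to read it from environment variables and fall back to the original values. This is a minimal sketch; the variable names LOGV_KAFKA_SERVERS, LOGV_REDIS_HOST and LOGV_REDIS_PORT and the helper load_config are hypothetical and not part of the project:

```python
import os

# Hypothetical fallback: the original app.py hard-codes this address.
DEFAULT_HOST = "10.0.0.222"


def load_config(env=None):
    """Return connection settings from (hypothetical) environment variables,
    falling back to the values hard-coded in the original app.py."""
    env = os.environ if env is None else env
    return {
        "kafka_bootstrap_servers": env.get("LOGV_KAFKA_SERVERS", DEFAULT_HOST + ":9092"),
        "redis_host": env.get("LOGV_REDIS_HOST", DEFAULT_HOST),
        "redis_port": int(env.get("LOGV_REDIS_PORT", "6379")),
    }


if __name__ == "__main__":
    print(load_config())
```

In app.py the result could then feed `redis.ConnectionPool(host=cfg["redis_host"], port=cfg["redis_port"], decode_responses=True)`. The Flume configuration and the Spark main program would still need to be edited separately.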

Compile Spark application

After installing Java 8 and Scala 2.11, initialize sbt in the spark directory:

sbt

Exit the sbt shell, then compile and package the Spark project with sbt-assembly:

sbt assembly

Then rename the generated JAR file to logvision.jar.

Environment preparation

You need a pseudo-distributed environment (the test environment was CentOS 7) with all of the components listed above installed, configured, and running in the specified versions.
Start a Flume agent using standalone.conf from the flume directory.
Copy learning-datasets from the datasets folder to the following path:

/home/logv/learning-datasets

Copy access_log from the datasets folder to the following path:

/home/logv/access_log
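The fields the dashboard displays later (host, URL, request method, protocol, status code, bytes, timestamp) suggest that access_log uses the Apache Common Log Format. That is an assumption; the project's actual parsing lives in the Spark code. A minimal Python sketch of parsing one such line (parse_clf is a hypothetical helper):

```python
import re
from datetime import datetime

# Common Log Format: host ident authuser [date] "method url protocol" status bytes
CLF_PATTERN = re.compile(
    r'(?P<host>\S+) \S+ \S+ \[(?P<time>[^\]]+)\] '
    r'"(?P<req_method>\S+) (?P<url>\S+) (?P<protocol>[^"]+)" '
    r'(?P<status_code>\d{3}) (?P<bytes>\d+|-)'
)


def parse_clf(line):
    """Parse one Common Log Format line into a dict, or return None if it does not match."""
    m = CLF_PATTERN.match(line)
    if m is None:
        return None
    record = m.groupdict()
    # "-" means no response body was sent; count it as 0 bytes.
    record["bytes"] = 0 if record["bytes"] == "-" else int(record["bytes"])
    # Convert the bracketed date to epoch milliseconds, the unit the Flask code
    # divides by 1000 before formatting timestamps.
    ts = datetime.strptime(record.pop("time"), "%d/%b/%Y:%H:%M:%S %z")
    record["timestamp"] = int(ts.timestamp() * 1000)
    return record
```

For example, `parse_clf('127.0.0.1 - - [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326')` yields host 127.0.0.1, method GET, URL /apache_pb.gif, status "200" and 2326 bytes.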

Intrusion detection model training and testing

Submit the JAR to Spark to train and test the intrusion detection model:

spark-submit --class learning logvision.jar

You will see the following results:

The first two tables show the intrusion detection results for the normal and abnormal test sets, respectively. The four tables after them can be used to judge recognition accuracy. As shown, all 250 normal test records were classified as normal, a recognition rate of 100%. Of the 250 abnormal test records, 240 were classified as abnormal and 10 as normal, a recognition rate of 96%.
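These figures can be checked with a few lines of Python. The 96% figure is the recognition rate (recall) on the abnormal set alone. Pooling both test sets gives an overall accuracy of 490 / 500 = 98%. The variable names below are illustrative:

```python
# Counts reported above for the two 250-record test sets.
normal_total, normal_as_normal = 250, 250
abnormal_total, abnormal_as_abnormal = 250, 240

normal_rate = normal_as_normal / normal_total          # share of normal records recognized
abnormal_rate = abnormal_as_abnormal / abnormal_total  # share of attacks recognized (recall)
overall_accuracy = (normal_as_normal + abnormal_as_abnormal) / (normal_total + abnormal_total)

print(f"normal: {normal_rate:.0%}, abnormal: {abnormal_rate:.0%}, overall: {overall_accuracy:.0%}")
# prints "normal: 100%, abnormal: 96%, overall: 98%"
```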

Start the visualization backend

Run the following command in the flask directory to install the dependencies:

pip3 install -r requirements.txt

Start the Flask web backend:

python3 app.py

Start real-time log generator

The real-time log generator in log_gen simulates a live log. It appends blocks of lines from a sample log to a target log file, using the parameters passed on the command line (number of lines per write and interval between writes). This gives the subsequent real-time processing a continuously growing log to consume.

java log_gen [log source] [target file] [number of lines appended each time] [time interval (seconds)]

Copy log_gen.java to the environment, then compile and run it. The following appends 5 lines from /home/logv/access_log to /home/logSrc every 2 seconds:

javac log_gen.java
java log_gen /home/logv/access_log /home/logSrc 5 2
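The behavior described above can be approximated in Python as follows. This is a hypothetical re-implementation: replay_log is not part of the project. It also assumes the Java tool stops once the sample log is exhausted, which the original does not state.

```python
import itertools
import sys
import time


def replay_log(source, target, lines_per_write, interval_s, max_batches=None):
    """Append `source` to `target` in blocks of `lines_per_write` lines,
    sleeping `interval_s` seconds after each block.

    Stops when the source is exhausted (or after `max_batches` blocks) and
    returns the number of lines written.
    """
    written = 0
    batches = 0
    with open(source, "r", encoding="utf-8", errors="replace") as src:
        while max_batches is None or batches < max_batches:
            block = list(itertools.islice(src, lines_per_write))
            if not block:
                break  # sample log exhausted
            # Reopen in append mode per block; closing flushes the lines to the
            # OS so a process tailing the target file sees them promptly.
            with open(target, "a", encoding="utf-8") as dst:
                dst.writelines(block)
            written += len(block)
            batches += 1
            time.sleep(interval_s)
    return written


if __name__ == "__main__" and len(sys.argv) == 5:
    # Same argument order as the Java tool: source target lines interval
    _, src_path, dst_path, n_lines, seconds = sys.argv
    replay_log(src_path, dst_path, int(n_lines), float(seconds))
```

If saved as a hypothetical replay_log.py, `python3 replay_log.py /home/logv/access_log /home/logSrc 5 2` would mirror the Java command above.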

Start analysis task

Submit the JAR to Spark to run the real-time analysis task:

spark-submit --class streaming logvision.jar

View visualization results

At this point, all back-end components are configured. Open port 5000 of the web host in a browser to view the real-time log analysis visualizations:
[Figure: Welcome screen]

Part of the source code (Flask web backend):

# coding=utf-8
import ast
import time
from kafka import KafkaConsumer
import redis
import requests

from threading import Lock, Thread
from flask import Flask, render_template, session, request
from flask_socketio import SocketIO, emit

async_mode = None
app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app, async_mode=async_mode)

thread=None
thread_lock = Lock()

# Configuration (replace these addresses with your own)
time_interval = 1
kafka_bootstrap_servers = "10.0.0.222:9092"
redis_con_pool = redis.ConnectionPool(host='10.0.0.222', port=6379, decode_responses=True)


# Page routes and the Socket.IO namespaces that push data to each page
# System time
@socketio.on('connect', namespace='/sys_time')
def sys_time():
    def loop():
        while True:
            socketio.sleep(time_interval)
            current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
            socketio.emit('sys_time',
                          {'data': current_time},
                          namespace='/sys_time')

    socketio.start_background_task(target=loop)


# Welcome page
@app.route('/')
@app.route('/welcome')
def welcome():
    return render_template('index.html', async_mode=socketio.async_mode)


# Real-time log stream
@socketio.on('connect', namespace='/log_stream')
def log_stream():
    def loop():
        socketio.sleep(time_interval)
        consumer = KafkaConsumer("raw_log", bootstrap_servers=kafka_bootstrap_servers)
        cache = ""
        for msg in consumer:
            cache += bytes.decode(msg.value) + "\n"
            # 24 newline-terminated lines split into 25 parts: emit in batches of 24 lines
            if len(cache.split("\n")) == 25:
                socketio.emit('log_stream',
                              {'data': cache},
                              namespace='/log_stream')
                cache = ""

    socketio.start_background_task(target=loop)


# Real-time log analysis page
@app.route('/analysis')
def analysis():
    return render_template('analysis.html', async_mode=socketio.async_mode)


# Real-time counter
@socketio.on('connect', namespace='/count_board')
def count_board():
    def loop():
        while True:
            socketio.sleep(time_interval)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            res = redis_con.zrange("statcode", 0, 40, withscores=True)

            # Total number of requests (number of log lines)
            host_count = redis_con.zscore("line", "count")

            # Number of successful requests (status codes 200 to 207)
            normal = ["200", "201", "202", "203", "204", "205", "206", "207"]
            success_count = 0
            for i in res:
                if i[0] in normal:
                    success_count += int(i[1])

            # Number of other requests (all other status codes)
            other_count = 0
            for i in res:
                other_count += int(i[1])
            other_count -= success_count

            # Number of visitors (number of different IPs)
            visitor_count = redis_con.zcard("host")

            # Number of resources (number of different URLs)
            url_count = redis_con.zcard("url")

            # Traffic size (sum of bytes, MB); zscore returns None if the member does not exist yet
            traffic_sum = int(redis_con.zscore("traffic", "sum") or 0)

            # Log size (MB)
            log_size = int(redis_con.zscore("size", "sum") or 0)

            socketio.emit('count_board',
                          {'host_count': host_count,
                           'success_count': success_count,
                           'other_count': other_count,
                           'visitor_count': visitor_count,
                           'url_count': url_count,
                           'traffic_sum': traffic_sum,
                           'log_size': log_size},
                          namespace='/count_board')

    socketio.start_background_task(target=loop)


# Real-time popular locations
@socketio.on('connect', namespace='/hot_geo')
def hot_geo():
    def loop():
        while True:
            socketio.sleep(2)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            res = redis_con.zrevrange("host", 0, 50, withscores=True)
            data = []

            for i in res:
                # Call the interface to obtain geographical coordinates
                req = requests.get("http://api.map.baidu.com/location/ip",
                                   {'ak': '0jKbOcwqK7dGZiYIhSai5rsxTnQZ4UQt',
                                    'ip': i[0],
                                    'coor': 'bd09ll'})
                # Parse the JSON response instead of calling eval() on untrusted text
                body = req.json()

                # Only display domestic (China) locations
                if body['status'] == 0:
                    coor_x = body['content']['point']['x']
                    coor_y = body['content']['point']['y']

                    data.append({"name": i[0], "value": [coor_x, coor_y, i[1]]})

            socketio.emit('hot_geo',
                          {'data': data},
                          namespace='/hot_geo')

    socketio.start_background_task(target=loop)


# Real-time ranking of popular resources
@socketio.on('connect', namespace='/hot_url')
def hot_url():
    def loop():
        while True:
            socketio.sleep(time_interval)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            res = redis_con.zrevrange("url", 0, 9, withscores=True)
            data = []
            no = 1

            for i in res:
                data.append({"no": no, "url": i[0], "count": i[1]})
                no += 1

            socketio.emit('hot_url',
                          {'data': data},
                          namespace='/hot_url')

    socketio.start_background_task(target=loop)


# Real-time popular IP ranking
@socketio.on('connect', namespace='/hot_ip')
def hot_ip():
    def loop():
        while True:
            socketio.sleep(time_interval)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            res = redis_con.zrevrange("host", 0, 13, withscores=True)
            data = []
            no = 1

            for i in res:
                # Call the interface to obtain geographical coordinates
                req = requests.get("http://api.map.baidu.com/location/ip",
                                   {'ak': '0jKbOcwqK7dGZiYIhSai5rsxTnQZ4UQt',
                                    'ip': i[0],
                                    'coor': 'bd09ll'})
                body = req.json()

                # Only display domestic (China) locations
                if body['status'] == 0:
                    address = body['content']['address']

                    data.append({"no": no, "ip": i[0], "address": address, "count": i[1]})
                    no += 1

            socketio.emit('hot_ip',
                          {'data': data},
                          namespace='/hot_ip')

    socketio.start_background_task(target=loop)


# Real-time status code ratio
@socketio.on('connect', namespace='/status_code_pie')
def status_code_pie():
    def loop():
        while True:
            socketio.sleep(time_interval)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            res = redis_con.zrevrange("statcode", 0, 100, withscores=True)
            data = []
            legend = []

            for i in res:
                if i[0] != 'foo':
                    data.append({"value": i[1], "name": i[0]})
                    legend.append(i[0])

            socketio.emit('status_code_pie',
                          {'legend': legend, 'data': data},
                          namespace='/status_code_pie')

    socketio.start_background_task(target=loop)


# Real-time request method ratio
@socketio.on('connect', namespace='/req_method_pie')
def req_method_pie():
    def loop():
        while True:
            socketio.sleep(time_interval)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            res = redis_con.zrevrange("reqmt", 0, 100, withscores=True)
            data = []
            legend = []

            for i in res:
                if i[0] != 'foo':
                    data.append({"value": i[1], "name": i[0]})
                    legend.append(i[0])

            socketio.emit('req_method_pie',
                          {'legend': legend, 'data': data},
                          namespace='/req_method_pie')

    socketio.start_background_task(target=loop)


# Real-time request count (in chronological order)
@socketio.on('connect', namespace='/req_count_timeline')
def req_count_timeline():
    def loop():
        while True:
            socketio.sleep(time_interval)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            res = dict(redis_con.zrange("datetime", 0, 10000000, withscores=True))
            data = []
            date = []

            # Sort by time
            for i in sorted(res):
                datetime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(i) / 1000))
                data.append(res[i])
                date.append(datetime)

            socketio.emit('req_count_timeline',
                          {"data": data, "date": date},
                          namespace='/req_count_timeline')

    socketio.start_background_task(target=loop)


# Ranking of IPs by number of requests
@socketio.on('connect', namespace='/ip_ranking')
def ip_ranking():
    def loop():
        while True:
            socketio.sleep(time_interval)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            res = redis_con.zrevrange("host", 0, 50, withscores=True)
            ip = []
            count = []

            for i in res:
                ip.append(i[0])
                count.append(i[1])

            socketio.emit('ip_ranking',
                          {"ip": ip, "count": count},
                          namespace='/ip_ranking')

    socketio.start_background_task(target=loop)


@app.route('/id')
def id():
    return render_template("id.html", async_mode=socketio.async_mode)


# Abnormal request count
@socketio.on('connect', namespace='/bad_count')
def bad_count():
    def loop():
        while True:
            socketio.sleep(time_interval)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            # zscore returns None if the member does not exist yet
            res = int(redis_con.zscore("bad", "bad") or 0)

            socketio.emit('bad_count',
                          {"data": res},
                          namespace='/bad_count')

    socketio.start_background_task(target=loop)


# Normal request count
@socketio.on('connect', namespace='/good_count')
def good_count():
    def loop():
        while True:
            socketio.sleep(time_interval)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            # zscore returns None if the member does not exist yet
            res = int(redis_con.zscore("good", "good") or 0)

            socketio.emit('good_count',
                          {"data": res},
                          namespace='/good_count')

    socketio.start_background_task(target=loop)


# Geotags of normal requests
@socketio.on('connect', namespace='/good_geo')
def good_geo():
    def loop():
        while True:
            socketio.sleep(time_interval)
            consumer = KafkaConsumer("good_result", bootstrap_servers=kafka_bootstrap_servers)
            data = []

            for msg in consumer:
                result = ast.literal_eval(bytes.decode(msg.value))
                for record in result:
                    if record['host'] != "foo":
                        # Call the interface to obtain geographical coordinates
                        req = requests.get("http://api.map.baidu.com/location/ip",
                                           {'ak': '0jKbOcwqK7dGZiYIhSai5rsxTnQZ4UQt',
                                            'ip': record['host'],
                                            'coor': 'bd09ll'})
                        body = req.json()
                        # Only display domestic (China) locations
                        if body['status'] == 0:
                            coor_x = body['content']['point']['x']
                            coor_y = body['content']['point']['y']
                            datetime = time.strftime("%Y-%m-%d %H:%M:%S",
                                                     time.localtime(int(record['timestamp']) / 1000))

                            data.append({"name": record['host'],
                                         "value": [coor_x, coor_y,
                                                   record['url'],
                                                   datetime,
                                                   record['req_method'],
                                                   record['protocol'],
                                                   record['status_code']]})
                            socketio.emit('good_geo',
                                          {"data": data},
                                          namespace='/good_geo')

    socketio.start_background_task(target=loop)


# Geotags of abnormal requests
@socketio.on('connect', namespace='/bad_geo')
def bad_geo():
    def loop():
        while True:
            socketio.sleep(time_interval)
            consumer = KafkaConsumer("bad_result", bootstrap_servers=kafka_bootstrap_servers)
            data = []

            for msg in consumer:
                result = ast.literal_eval(bytes.decode(msg.value))
                for record in result:
                    if record['host'] != "foo":
                        # Call the interface to obtain geographical coordinates
                        req = requests.get("http://api.map.baidu.com/location/ip",
                                           {'ak': '0jKbOcwqK7dGZiYIhSai5rsxTnQZ4UQt',
                                            'ip': record['host'],
                                            'coor': 'bd09ll'})
                        body = req.json()
                        # Only display domestic (China) locations
                        if body['status'] == 0:
                            coor_x = body['content']['point']['x']
                            coor_y = body['content']['point']['y']
                            datetime = time.strftime("%Y-%m-%d %H:%M:%S",
                                                     time.localtime(int(record['timestamp']) / 1000))

                            data.append({"name": record['host'],
                                         "value": [coor_x, coor_y,
                                                   record['url'],
                                                   datetime,
                                                   record['req_method'],
                                                   record['protocol'],
                                                   record['status_code']]})
                            socketio.emit('bad_geo',
                                          {"data": data},
                                          namespace='/bad_geo')

    socketio.start_background_task(target=loop)


# Real-time intrusion classification count (in chronological order)
@socketio.on('connect', namespace='/url_cate_count_timeline')
def url_cate_count_timeline():
    def loop():
        while True:
            socketio.sleep(time_interval)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            good_res = dict(redis_con.zrange("goodts", 0, 10000000, withscores=True))
            bad_res = dict(redis_con.zrange("badts", 0, 10000000, withscores=True))

            # Find the union of the timestamps of normal and abnormal results and sort them. Regenerate the corresponding normal and abnormal counts
            date = []
            date_ts = []
            good_date = []
            bad_date = []

            good_data = []
            bad_data = []
            # Find the union and sort
            for i in good_res:
                good_date.append(i)
            for j in bad_res:
                bad_date.append(j)
            for k in sorted(list(set(good_date) | set(bad_date))):
                date_ts.append(k)

            # Generate the corresponding count
            for t in date_ts:
                if t in good_res:
                    good_data.append(good_res[t])
                else:
                    good_data.append(0)
                if t in bad_res:
                    bad_data.append(bad_res[t])
                else:
                    bad_data.append(0)
            # Convert timestamp to string
            for ts in date_ts:
                date.append(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(ts) / 1000)))

            socketio.emit('url_cate_count_timeline',
                          {"date": date, "good_data": good_data, "bad_data": bad_data},
                          namespace='/url_cate_count_timeline')

    socketio.start_background_task(target=loop)


# Real-time abnormal request overview
@socketio.on('connect', namespace='/bad_detail')
def bad_detail():
    def loop():
        while True:
            socketio.sleep(time_interval)
            consumer = KafkaConsumer("bad_result", bootstrap_servers=kafka_bootstrap_servers)
            data = []

            for msg in consumer:
                result = ast.literal_eval(bytes.decode(msg.value))
                for record in result:
                    if record['host'] != "foo":
                        # Call the interface to obtain geographical coordinates
                        req = requests.get("http://api.map.baidu.com/location/ip",
                                           {'ak': '0jKbOcwqK7dGZiYIhSai5rsxTnQZ4UQt',
                                            'ip': record['host'],
                                            'coor': 'bd09ll'})
                        body = req.json()
                        # Only display domestic (China) locations
                        if body['status'] == 0:
                            address = body['content']['address']

                            datetime = time.strftime("%Y-%m-%d %H:%M:%S",
                                                     time.localtime(int(record['timestamp']) / 1000))

                            data.append({"host": record['host'], "address": address, "url": record['url'],
                                         "datetime": datetime, "req_method": record['req_method'],
                                         "protocol": record['protocol'], "status_code": record['status_code'],
                                         "pred": record['prediction'], 'prob': record['probability']['values']})

                            socketio.emit('bad_detail',
                                          {"data": data},
                                          namespace='/bad_detail')
    socketio.start_background_task(target=loop)


# Real-time normal request overview
@socketio.on('connect', namespace='/good_detail')
def good_detail():
    def loop():
        while True:
            socketio.sleep(time_interval)
            consumer = KafkaConsumer("good_result", bootstrap_servers=kafka_bootstrap_servers)
            data = []

            for msg in consumer:
                result = ast.literal_eval(bytes.decode(msg.value))
                for record in result:
                    if record['host'] != "foo":
                        # Call the interface to obtain geographical coordinates
                        req = requests.get("http://api.map.baidu.com/location/ip",
                                           {'ak': '0jKbOcwqK7dGZiYIhSai5rsxTnQZ4UQt',
                                            'ip': record['host'],
                                            'coor': 'bd09ll'})
                        body = req.json()
                        # Only display domestic (China) locations
                        if body['status'] == 0:
                            address = body['content']['address']

                            datetime = time.strftime("%Y-%m-%d %H:%M:%S",
                                                     time.localtime(int(record['timestamp']) / 1000))

                            data.append({"host": record['host'], "address": address, "url": record['url'],
                                         "datetime": datetime, "req_method": record['req_method'],
                                         "protocol": record['protocol'], "status_code": record['status_code'],
                                         "pred": record['prediction'], 'prob': record['probability']['values']})

                            socketio.emit('good_detail',
                                          {"data": data},
                                          namespace='/good_detail')
    socketio.start_background_task(target=loop)


@app.route('/about')
def about():
    return render_template("about.html", async_mode=socketio.async_mode)


if __name__ == '__main__':
    socketio.run(app, host="0.0.0.0", port=5000, debug=True)
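Each `connect` handler above calls `socketio.start_background_task(target=loop)`, so every new client connection (another browser tab, or a reconnect) starts one more polling loop. Because `socketio.emit` called outside a request broadcasts to all clients in the namespace, clients then receive duplicate updates. The module-level `thread` and `thread_lock` variables look intended to guard against this, but they are never used. Below is a minimal stdlib sketch of such a guard. `BackgroundStarter` and `ensure_started` are hypothetical names and are not part of the project or of Flask-SocketIO:

```python
import threading


class BackgroundStarter:
    """Start at most one background task per key, e.g. per Socket.IO namespace."""

    def __init__(self, start):
        # `start` is any callable accepting target=..., for example
        # socketio.start_background_task in app.py.
        self._start = start
        self._lock = threading.Lock()
        self._tasks = {}

    def ensure_started(self, key, target):
        """Start `target` the first time `key` is seen; later calls are no-ops.

        Returns the handle produced by the first start for this key.
        """
        with self._lock:
            if key not in self._tasks:
                self._tasks[key] = self._start(target=target)
            return self._tasks[key]
```

In app.py you could create `starter = BackgroundStarter(socketio.start_background_task)` once, after `socketio` is defined. Then replace each `socketio.start_background_task(target=loop)` with a call such as `starter.ensure_started('/sys_time', loop)`, passing each handler's own namespace. One trade-off: a loop that dies with an exception is not restarted, so a sturdier version would also check whether the stored task is still alive.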

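The `url_cate_count_timeline` handler aligns two Redis sorted sets (`goodts`, `badts`) on the union of their timestamps and fills missing points with 0. The same logic can be written as a small pure function that is easy to test without Redis. `align_counts` is a hypothetical helper name. Its inputs are the dicts produced by `dict(redis_con.zrange(..., withscores=True))`:

```python
import time


def align_counts(good_res, bad_res):
    """Align two {timestamp_ms: count} dicts on the union of their keys.

    Returns (dates, good_data, bad_data): missing counts are filled with 0 and
    dates are formatted in local time, as in the original handler.
    """
    # Redis members are strings; sort them numerically.
    date_ts = sorted(set(good_res) | set(bad_res), key=int)
    good_data = [good_res.get(t, 0) for t in date_ts]
    bad_data = [bad_res.get(t, 0) for t in date_ts]
    dates = [time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(t) / 1000))
             for t in date_ts]
    return dates, good_data, bad_data
```

`key=int` sorts the member strings numerically. The original `sorted(...)` compares them as strings. That gives the same order only while every timestamp has the same number of digits, which holds for current millisecond timestamps.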