ZooKeeper + Kafka + ELK + Filebeat cluster setup for large-scale log collection and display

The overall flow: Filebeat collects the logs of the nginx server and ships them to the Kafka cluster, which acts as a buffer. Logstash then pulls the corresponding logs from Kafka, processes them, and writes them to the Elasticsearch cluster, where they are displayed in Kibana.

1. Cluster environment preparation

4c/8G/100G 10.10.200.33 Kafka + ZooKeeper + ES + Filebeat + ES-head
4c/8G/100G 10.10.200.34 Kafka + ZooKeeper + ES + Kibana
4c/8G/100G 10.10.200.35 Kafka + ZooKeeper + ES + Logstash

2. Build a zookeeper cluster

Prerequisites: on each of the three machines, set the time zone, turn off the firewall, install Java and configure its environment variables, and set the hostname.

[root@kf-zk-es-fb_es-head logs]# cat /etc/hosts
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
10.10.200.33 kf-zk-es-fb_es-head
10.10.200.34 kf-zk-es-kibana
10.10.200.35 kf-zk-es-logstash
[root@kf-zk-es-fb_es-head logs]# java -version
openjdk version "1.8.0_382"
OpenJDK Runtime Environment (build 1.8.0_382-b05)
OpenJDK 64-Bit Server VM (build 25.382-b05, mixed mode)
[root@kf-zk-es-fb_es-head logs]# cat /etc/profile
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.382.b05-1.el7_9.x86_64/
export JRE_HOME=$JAVA_HOME/jre
export CLASSPATH=.:$JAVA_HOME/lib:$JRE_HOME/lib:$CLASSPATH
export PATH=$PATH:$JRE_HOME/bin:$JAVA_HOME/bin

2.1 Install zookeeper (execute on all three machines)

[root@kf-zk-es-fb_es-head ~]# wget http://dlcdn.apache.org/zookeeper/zookeeper-3.8.3/apache-zookeeper-3.8.3-bin.tar.gz
[root@kf-zk-es-fb_es-head ~]# tar -zxvf apache-zookeeper-3.8.3-bin.tar.gz -C /usr/local/
[root@kf-zk-es-fb_es-head ~]# mv /usr/local/apache-zookeeper-3.8.3-bin/ /usr/local/zookeeper-3.8.3
[root@kf-zk-es-fb_es-head ~]# cp /usr/local/zookeeper-3.8.3/conf/zoo_sample.cfg /usr/local/zookeeper-3.8.3/conf/zoo.cfg
Modify the configuration file. In each node's server list, the local node's own IP is written as 0.0.0.0:
Node one:
[root@kf-zk-es-fb_es-head ~]# vim /usr/local/zookeeper-3.8.3/conf/zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper-3.8.3/data
dataLogDir=/usr/local/zookeeper-3.8.3/logs
clientPort=2181
server.1=0.0.0.0:2888:3888
server.2=10.10.200.34:2888:3888
server.3=10.10.200.35:2888:3888
Node two:
[root@kf-zk-es-kibana ~]# egrep -v '^#|^$' /usr/local/zookeeper-3.8.3/conf/zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper-3.8.3/data
dataLogDir=/usr/local/zookeeper-3.8.3/logs
clientPort=2181
server.1=10.10.200.33:2888:3888
server.2=0.0.0.0:2888:3888
server.3=10.10.200.35:2888:3888
Node three:
[root@kf-zk-es-logstash ~]# egrep -v '^#|^$' /usr/local/zookeeper-3.8.3/conf/zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper-3.8.3/data
dataLogDir=/usr/local/zookeeper-3.8.3/logs
clientPort=2181
server.1=10.10.200.33:2888:3888
server.2=10.10.200.34:2888:3888
server.3=0.0.0.0:2888:3888

The dataDir and dataLogDir directories must be created manually, otherwise an error is reported when the service starts. After creating the directories, create a myid file in dataDir:
Node one:
[root@kf-zk-es-fb_es-head data]# pwd
/usr/local/zookeeper-3.8.3/data
[root@kf-zk-es-fb_es-head data]# cat myid
1
Node two:
[root@kf-zk-es-kibana data]# cat myid
2
Node three:
[root@kf-zk-es-logstash data]# cat myid
3
The myid value must match the server.1/2/3 number assigned to that host in its zoo.cfg.
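
For reference, the directories and the myid file can be created in a short sequence like the following (run on each node, changing the echoed id to match its server.N entry):

mkdir -p /usr/local/zookeeper-3.8.3/data /usr/local/zookeeper-3.8.3/logs
echo 1 > /usr/local/zookeeper-3.8.3/data/myid #node one; write 2 and 3 on the other two nodes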

2.2 Configure the zookeeper startup script

vim /etc/init.d/zookeeper
#!/bin/bash
#chkconfig:2345 20 90
#description:Zookeeper Service Control Script
ZK_HOME='/usr/local/zookeeper-3.8.3'
case $1 in
start)
echo "---------- zookeeper start ----------------"
$ZK_HOME/bin/zkServer.sh start
;;
stop)
echo "---------- zookeeper stop ----------------"
$ZK_HOME/bin/zkServer.sh stop
;;
restart)
echo "---------- zookeeper restart ------------"
$ZK_HOME/bin/zkServer.sh restart
;;
status)
echo "---------- zookeeper status ----------------"
$ZK_HOME/bin/zkServer.sh status
;;
*)
echo "Usage: $0 {start|stop|restart|status}"
esac

Set up auto-start at power on:
chmod +x /etc/init.d/zookeeper
chkconfig --add zookeeper

2.3 Start zookeeper

Node one:
[root@kf-zk-es-fb_es-head data]# service zookeeper start
[root@kf-zk-es-fb_es-head data]# service zookeeper status
----------zookeeper status------------
/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-3.8.3/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower #Slave node

Node two:
[root@kf-zk-es-kibana data]# service zookeeper start
[root@kf-zk-es-kibana data]# service zookeeper status
----------zookeeper status------------
/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-3.8.3/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: leader #Master node

Node three:
[root@kf-zk-es-logstash data]# service zookeeper start
[root@kf-zk-es-logstash data]# service zookeeper status
----------zookeeper status------------
/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-3.8.3/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower #Slave node
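
Besides the init script, each node can be probed with ZooKeeper's four-letter srvr command, which is whitelisted by default in 3.5+ (this assumes nc is installed):

echo srvr | nc 127.0.0.1 2181 #the Mode line shows leader or follower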

3. Deploy Kafka

3.1 Install Kafka

All the following steps must be executed on all three nodes; you can download the package once and scp it to the others.
[root@kf-zk-es-fb_es-head ~]# wget http://archive.apache.org/dist/kafka/2.8.2/kafka_2.13-2.8.2.tgz
[root@kf-zk-es-fb_es-head ~]# tar xf kafka_2.13-2.8.2.tgz -C /usr/local/
[root@kf-zk-es-fb_es-head ~]# mv /usr/local/kafka_2.13-2.8.2 /usr/local/kafka
[root@kf-zk-es-fb_es-head ~]# cp /usr/local/kafka/config/server.properties /usr/local/kafka/config/server.properties_bak

Edit configuration file:
Node one:
[root@kf-zk-es-fb_es-head ~]# vim /usr/local/kafka/config/server.properties
broker.id=1
listeners=PLAINTEXT://10.10.200.33:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.10.200.33:2181,10.10.200.34:2181,10.10.200.35:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0

Node two:
[root@kf-zk-es-kibana ~]# egrep -v '^#|^$' /usr/local/kafka/config/server.properties
broker.id=2
listeners=PLAINTEXT://10.10.200.34:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.10.200.33:2181,10.10.200.34:2181,10.10.200.35:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0

Node three:
[root@kf-zk-es-logstash ~]# egrep -v '^#|^$' /usr/local/kafka/config/server.properties
broker.id=3
listeners=PLAINTEXT://10.10.200.35:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.10.200.33:2181,10.10.200.34:2181,10.10.200.35:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0

Configure environment variables:
[root@kf-zk-es-logstash ~]# tail -2 /etc/profile
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin
[root@kf-zk-es-logstash ~]# source /etc/profile

3.2 Start Kafka

Execute on all three nodes:
[root@kf-zk-es-logstash ~]# sh /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
[root@kf-zk-es-logstash ~]# netstat -lntup|grep 9092
tcp6 0 0 10.10.200.35:9092 :::* LISTEN 7474/java
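
As a quick sanity check, you can list the broker ids registered in ZooKeeper with the zkCli.sh tool that ships with ZooKeeper:

/usr/local/zookeeper-3.8.3/bin/zkCli.sh -server 127.0.0.1:2181 ls /brokers/ids #should print [1, 2, 3]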

3.3 Common Kafka commands

#View all topics in the current server
kafka-topics.sh --list --zookeeper 10.10.200.33:2181,10.10.200.34:2181,10.10.200.35:2181
#View details of a topic
kafka-topics.sh --describe --zookeeper 10.10.200.33:2181,10.10.200.34:2181,10.10.200.35:2181
#Produce messages (the producer talks to the brokers on port 9092, not to ZooKeeper)
kafka-console-producer.sh --broker-list 10.10.200.33:9092,10.10.200.34:9092,10.10.200.35:9092 --topic test
#Consume messages
kafka-console-consumer.sh --bootstrap-server 10.10.200.33:9092,10.10.200.34:9092,10.10.200.35:9092 --topic test --from-beginning
--from-beginning reads all existing data in the topic from the start
#Modify the number of partitions
kafka-topics.sh --zookeeper 10.10.200.33:2181,10.10.200.34:2181,10.10.200.35:2181 --alter --topic test --partitions 6
#Delete topic
kafka-topics.sh --delete --zookeeper 10.10.200.33:2181,10.10.200.34:2181,10.10.200.35:2181 --topic test
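
One more command that becomes useful once Logstash is consuming (section 7): it shows each partition's current offset and lag for a consumer group.

#Check a consumer group's offsets and lag
kafka-consumer-groups.sh --bootstrap-server 10.10.200.33:9092 --describe --group logstash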

3.4 Kafka command to create topic

[root@kf-zk-es-fb_es-head ~]# kafka-topics.sh --create --zookeeper 10.10.200.33:2181,10.10.200.34:2181,10.10.200.35:2181 --partitions 3 --replication-factor 1 --topic nginx_access
Created topic nginx_access.

--zookeeper: the ZooKeeper cluster address; multiple addresses are separated by commas (a single address is usually enough)
--replication-factor: the number of replicas for each partition; 1 means a single copy, 2 is recommended
--partitions: define the number of partitions
--topic: define topic name

[root@kf-zk-es-fb_es-head ~]# kafka-topics.sh --create --zookeeper 10.10.200.33:2181,10.10.200.34:2181,10.10.200.35:2181 --partitions 3 --replication-factor 1 --topic nginx_error

View topic information
[root@kf-zk-es-fb_es-head ~]# sh /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 10.10.200.33:2181
Topic: nginx_access PartitionCount: 3 ReplicationFactor: 1 Configs:
        Topic: nginx_access Partition: 0 Leader: 1 Replicas: 1 Isr: 1
        Topic: nginx_access Partition: 1 Leader: 2 Replicas: 2 Isr: 2
        Topic: nginx_access Partition: 2 Leader: 3 Replicas: 3 Isr: 3
Topic: nginx_error PartitionCount: 3 ReplicationFactor: 1 Configs:
        Topic: nginx_error Partition: 0 Leader: 2 Replicas: 2 Isr: 2
        Topic: nginx_error Partition: 1 Leader: 3 Replicas: 3 Isr: 3
        Topic: nginx_error Partition: 2 Leader: 1 Replicas: 1 Isr: 1


3.5 Test the Kafka topic

[root@kf-zk-es-fb_es-head ~]# kafka-console-producer.sh --broker-list 10.10.200.33:9092,10.10.200.34:9092,10.10.200.35:9092 --topic nginx_access
>1
>hello
>my
>name
>is
>world

[root@kf-zk-es-kibana ~]# kafka-console-consumer.sh --bootstrap-server 10.10.200.33:9092,10.10.200.34:9092,10.10.200.35:9092 --topic nginx_access --from-beginning
1
hello
my
name
is
world

3.6 Summary of Kafka issues

To restart the service, kill the existing process first. If an error occurs when re-creating a topic, delete /tmp/kafka-logs/meta.properties so the broker can start normally. In addition, the ZooKeeper connection string must be present in the configuration file: zookeeper.connect=10.10.200.33:2181,10.10.200.34:2181,10.10.200.35:2181
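
A possible restart sequence on one broker, following the notes above (delete meta.properties only if the broker refuses to start because of a broker id or cluster id mismatch):

/usr/local/kafka/bin/kafka-server-stop.sh #stop the broker (or kill the java process)
rm -f /tmp/kafka-logs/meta.properties #only when the errors described above appear
/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties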

4. Build Elasticsearch and configure it

Install Elasticsearch and modify its configuration file on all three hosts, then start the service.

Taking node one as an example (the other two machines are configured the same way, except node.name, which should be node2 and node3 respectively):
[root@kf-zk-es-fb_es-head ~]# wget http://dl.elasticsearch.cn/elasticsearch/elasticsearch-7.9.2-x86_64.rpm
[root@kf-zk-es-fb_es-head ~]# rpm -ivh elasticsearch-7.9.2-x86_64.rpm
[root@kf-zk-es-fb_es-head ~]# cp /etc/elasticsearch/elasticsearch.yml /etc/elasticsearch/elasticsearch.yml_bak
[root@kf-zk-es-fb_es-head elasticsearch]# egrep -v '^#|^$' /etc/elasticsearch/elasticsearch.yml
cluster.name: my-application
node.name: node1
path.data: /var/lib/elasticsearch
path.logs: /var/log/elasticsearch
network.host: 0.0.0.0
http.port: 9200
discovery.seed_hosts: ["10.10.200.33","10.10.200.34","10.10.200.35"]
cluster.initial_master_nodes: ["node1","node2","node3"]

5. Start Elasticsearch and set it to start automatically at boot

Execute on all three machines:
[root@kf-zk-es-fb_es-head ~]# systemctl start elasticsearch
[root@kf-zk-es-fb_es-head ~]# systemctl status elasticsearch
● elasticsearch.service - Elasticsearch
   Loaded: loaded (/usr/lib/systemd/system/elasticsearch.service; disabled; vendor preset: disabled)
   Active: active (running) since Fri 2023-11-10 09:27:14 CST; 8s ago
[root@kf-zk-es-fb_es-head ~]# systemctl enable elasticsearch
Created symlink from /etc/systemd/system/multi-user.target.wants/elasticsearch.service to /usr/lib/systemd/system/elasticsearch.service.

6. Page access
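
Besides the browser (for example the ES-head plugin installed on 10.10.200.33, which by default listens on port 9100), the cluster state can be verified from the command line:

curl -s 'http://10.10.200.33:9200/_cluster/health?pretty' #status should be green with 3 nodes
curl -s 'http://10.10.200.33:9200/_cat/nodes?v' #lists the nodes that joined the cluster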

7. Deploy logstash to consume kafka data and write it to ES

Download address: http://dl.elasticsearch.cn/logstash/logstash-7.9.2.rpm
[root@kf-zk-es-logstash ~]# rpm -vih logstash-7.9.2.rpm
[root@kf-zk-es-logstash ~]# vim /etc/logstash/conf.d/logstash.conf
input {
       kafka {
       codec => "json"
       topics => ["nginx_access","nginx_error"]
       bootstrap_servers => "10.10.200.33:9092,10.10.200.34:9092,10.10.200.35:9092"
       max_poll_interval_ms => "3000000"
       session_timeout_ms => "6000"
       heartbeat_interval_ms => "2000"
       auto_offset_reset => "latest"
       group_id => "logstash"
       type => "logs"
     }
}
output {
        elasticsearch {
        hosts => ["http://10.10.200.33:9200","http://10.10.200.34:9200","http://10.10.200.35:9200"]
        index => "%{[fields][log_topics]}-%{+YYYY-MM-dd}"
        }
}
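
Before relying on the Elasticsearch output, it can help to confirm that events actually arrive from Kafka. A minimal throwaway test (the file name /tmp/kafka-test.conf is arbitrary; the separate group_id keeps the real pipeline's offsets untouched):

cat > /tmp/kafka-test.conf <<'EOF'
input {
  kafka {
    codec => "json"
    topics => ["nginx_access"]
    bootstrap_servers => "10.10.200.33:9092,10.10.200.34:9092,10.10.200.35:9092"
    group_id => "logstash-test"
  }
}
output {
  stdout { codec => rubydebug } #print events to the console instead of ES
}
EOF
/usr/share/logstash/bin/logstash -f /tmp/kafka-test.conf #Ctrl-C to stop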

[root@kf-zk-es-logstash ~]# cd /usr/share/logstash/
[root@kf-zk-es-logstash logstash]# mkdir config
[root@kf-zk-es-logstash logstash]# cp /etc/logstash/pipelines.yml config/
[root@kf-zk-es-logstash logstash]# ln -s /usr/share/logstash/bin/logstash /usr/local/bin/
[root@kf-zk-es-logstash logstash]# logstash -t
If no errors are reported, the configuration is valid and logstash can be started.
[root@kf-zk-es-logstash logstash]# systemctl start logstash && systemctl enable logstash
Created symlink from /etc/systemd/system/multi-user.target.wants/logstash.service to /etc/systemd/system/logstash.service.
Check whether the log shows Kafka consumption activity:
[root@kf-zk-es-logstash logstash]# tail -f /var/log/logstash/logstash-plain.log
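
Logstash also exposes a monitoring API on port 9600 by default; it can confirm that the pipeline is running and counting events:

curl -s 'http://127.0.0.1:9600/_node/stats/pipelines?pretty'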

8. Deploy Filebeat

Download address: http://dl.elasticsearch.cn/filebeat/filebeat-7.9.2-x86_64.rpm

[root@kf-zk-es-fb_es-head ~]# rpm -vih filebeat-7.9.2-x86_64.rpm
[root@kf-zk-es-fb_es-head ~]# vim /etc/filebeat/filebeat.yml
Configure collection of log information
filebeat.inputs:
- type: log
  enabled: true
  paths:
    #Path of the log to collect
    - /usr/local/logs/access.log #This is the nginx access log path; nginx was installed on this machine in advance.
  fields:
    log_topics: nginx_access

- type: log
  enabled: true
  paths:
    #Path of the log to collect
    - /usr/local/logs/error.log #This is the nginx error log path.
  fields:
    log_topics: nginx_error

output.kafka: #Only enable the kafka output and comment out the elasticsearch output, otherwise an error will be reported
  #Kafka broker addresses
  hosts: ["10.10.200.33:9092","10.10.200.34:9092","10.10.200.35:9092"]
  #This topic must match the topics created in Kafka
  topic: '%{[fields][log_topics]}'

filebeat.config.modules:
  # Glob pattern for configuration loading
  path: ${path.config}/modules.d/*.yml

  # Set to true to enable config reloading
  reload.enabled: false

setup.template.settings:
  index.number_of_shards: 1

setup.kibana:

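Before starting the service, filebeat's built-in self-tests can validate the configuration and the connection to Kafka:

filebeat test config #validates /etc/filebeat/filebeat.yml
filebeat test output #checks connectivity to the configured Kafka brokers
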
Start filebeat
[root@kf-zk-es-fb_es-head ~]# systemctl start filebeat

9. Install kibana to display log information

Download address: http://dl.elasticsearch.cn/kibana/kibana-7.9.2-x86_64.rpm
[root@kf-zk-es-kibana ~]# rpm -ivh kibana-7.9.2-x86_64.rpm
[root@kf-zk-es-kibana ~]# vim /etc/kibana/kibana.yml
server.port: 5601
server.host: "0.0.0.0"
elasticsearch.hosts: ["http://10.10.200.33:9200","http://10.10.200.34:9200","http://10.10.200.35:9200"]
kibana.index: ".kibana"
Start service
[root@kf-zk-es-kibana ~]# systemctl start kibana && systemctl enable kibana
Created symlink from /etc/systemd/system/multi-user.target.wants/kibana.service to /etc/systemd/system/kibana.service.
[root@kf-zk-es-kibana ~]# journalctl -u kibana #Check whether startup is normal

After confirming that port 5601 is listening, visit the web page:

Click to create an index pattern. If you are prompted "You'll need to index some data into Elasticsearch before you can create an index pattern. Learn how", the reason is that there is no data or index yet. You can run the following command on one of the machines to create some test data:

[root@kf-zk-es-fb_es-head ~]# curl -H "Content-Type: application/json" -XPOST 'http://10.10.200.33:9200/ruizhi-log-2023-11-13/test-log' -d '{"code":200,"message":"test"}'
{"_index":"ruizhi-log-2023-11-13","_type":"test-log","_id":"mfRRx4sBE7vjRIscdoqf","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":0,"_primary_term":1}

After that, refresh the web page, as shown below:

Select the two nginx index patterns, and then simulate some access to nginx:

Refresh this page (you can also append some content to the log file from the command line).

In the output of the Kafka console consumer running in the background, you can see that the nginx logs have been shipped to Kafka:

The logs are also synchronized to Kibana, and the contents match exactly.
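
To generate traffic without a browser, a short loop against the local nginx works just as well (assuming nginx listens on port 80 on 10.10.200.33):

for i in $(seq 1 20); do curl -s http://10.10.200.33/ >/dev/null; done
tail -5 /usr/local/logs/access.log #the new entries should then flow through Kafka into ES and Kibana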