Kafka distributed cluster installation: Ansible one-click installation

Introduction to Kafka

Apache Kafka is a distributed stream processing platform. What exactly does this mean?
A stream processing platform has the following three characteristics:

  1. It lets you publish and subscribe to streams of records, similar to a message queue or an enterprise messaging system.
  2. It stores streams of records in a fault-tolerant, durable way.
  3. It processes streams of records as they are produced.

What scenarios is Kafka suitable for? It is used in two broad categories of applications:

  1. Building real-time streaming data pipelines that reliably move data between systems or applications (the message-queue role).
  2. Building real-time streaming applications that transform or react to streams of data (the stream-processing role, e.g. Kafka Streams transforming data from one topic into another).

To understand how Kafka provides these capabilities, the rest of this article explores its characteristics in depth.
First, some concepts:

- Kafka runs as a cluster on one or more servers.
- Kafka stores streams of records in categories called topics.
- Each record consists of a key, a value and a timestamp.

Kafka has four core APIs:

- The Producer API allows an application to publish a stream of records to one or more Kafka topics.
- The Consumer API allows an application to subscribe to one or more topics and process the stream of records published to them.
- The Streams API allows an application to act as a stream processor, consuming input streams from one or more topics and producing output streams to one or more topics, effectively transforming the input streams into output streams.
- The Connector API allows building and running reusable producers or consumers that connect Kafka topics to existing applications or data systems. For example, a connector to a relational database might capture every change to a table.

Distributed
The log's partitions are distributed over the servers in the Kafka cluster, and each server handles data and requests for its share of the partitions. Each partition is replicated across a configurable number of servers for fault tolerance.
Each partition has one server acting as the "leader" and zero or more servers acting as "followers". The leader handles all read and write requests for the partition, while the followers passively replicate the leader's data. If the leader goes down, one of the followers automatically becomes the new leader. Each server acts as the leader for some of its partitions and as a follower for others, so load is balanced across the cluster.
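For a quick, hypothetical illustration of leader election (using the three-broker cluster and the nginxs topic built later in this article), you can stop the broker that currently leads a partition and describe the topic again; one of the followers in the ISR takes over as leader:

/usr/local/kafka_2.13-2.5.0/bin/kafka-topics.sh --bootstrap-server 192.168.121.153:9092 --describe --topic nginxs
# note the Leader of partition 0, e.g. broker 155, then stop that broker on its own host:
/usr/local/kafka_2.13-2.5.0/bin/kafka-server-stop.sh
# describe again from a surviving broker: a follower from the Isr is now the Leader
/usr/local/kafka_2.13-2.5.0/bin/kafka-topics.sh --bootstrap-server 192.168.121.153:9092 --describe --topic nginxs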

Producer
Producers publish data to topics of their choice. The producer is responsible for choosing which partition within the topic each record is assigned to. This can be done in a simple round-robin fashion to balance load, or according to some semantic partition function (for example, based on the key in the record). More on the use of partitions below.
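As a minimal sketch of key-based partitioning, the console producer shipped with Kafka can send each input line as key:value when parse.key is enabled, so records with the same key land in the same partition (the broker address and the nginxs topic are the ones used later in this article; the keys are made up):

# send keyed records; lines are typed as key:value, e.g. user1:hello
/usr/local/kafka_2.13-2.5.0/bin/kafka-console-producer.sh --broker-list 192.168.121.153:9092 --topic nginxs --property parse.key=true --property key.separator=: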

Consumer
Consumers label themselves with a consumer group name, and each record published to a topic is delivered to one consumer instance within each subscribing consumer group. Consumer instances can be in separate processes or on separate machines.

If all the consumer instances are in the same consumer group, the records are load-balanced over the consumer instances.

If the consumer instances are in different consumer groups, each record is broadcast to all of the consumer processes.
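A quick way to see this behavior is with the console consumer (broker and topic as used later in this article; the group names group-a and group-b are made up for the example): consumers started with the same --group split the topic's partitions between them, while a consumer in a different group receives every record as well:

# terminal 1 and terminal 2, same group: the partitions of nginxs are divided between them
/usr/local/kafka_2.13-2.5.0/bin/kafka-console-consumer.sh --bootstrap-server 192.168.121.153:9092 --topic nginxs --group group-a
# a different group: this consumer is also given every record (broadcast across groups)
/usr/local/kafka_2.13-2.5.0/bin/kafka-console-consumer.sh --bootstrap-server 192.168.121.153:9092 --topic nginxs --group group-b --from-beginning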

Some key terms in Kafka:

- Producer: the message producer, the side that generates and sends messages.

- Broker: a Kafka instance. Multiple brokers form a Kafka cluster; usually one Kafka instance is deployed per machine, so if one instance fails the others are not affected.

- Consumer: the message consumer, which pulls messages and consumes them. A topic can be consumed by several consumers, and several consumers form a Consumer Group; within a consumer group, a message can only be consumed by one consumer.

- Topic: the logical storage unit for messages on the server side. A topic usually contains several partitions.

- Partition: a partition of a topic. Partitions are distributed across the brokers to load-balance publishing and subscribing, and several partitions can be consumed by several consumers at the same time for high consumer throughput. A partition has multiple replicas (Replica); this is Kafka's design for reliability and availability, highlighted later.

- Message: a message, or log record, the data actually stored by the Kafka server. Each message consists of a key, a value and the message timestamp.

- Offset: the position of a message within a partition, maintained by Kafka itself. A consumer must also keep track of an offset while consuming, to record its position in the partition (see the example below).
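To see offsets in practice, kafka-consumer-groups.sh shows, for each partition a group consumes, the committed offset, the log end offset and the lag (group-a here is just the example group from the consumer section above):

/usr/local/kafka_2.13-2.5.0/bin/kafka-consumer-groups.sh --bootstrap-server 192.168.121.153:9092 --describe --group group-a
# output columns include TOPIC, PARTITION, CURRENT-OFFSET, LOG-END-OFFSET and LAG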

Ansible one-click installation and configuration of the Kafka cluster

First create Ansible's working directory; any directory will do.

mkdir /ansible/kafka/

Create directories and files as follows

[root@server151 ~]# tree /ansible/kafka/
/ansible/kafka/
├── all.yaml
├── hosts
└── install_kafka
    ├── defaults
    │   └── main.yml
    ├── tasks
    │   └── main.yml
    └── templates
        └── kafka.sh.j2

First configure the host inventory, changing these entries to match your own hosts.

[root@server151 ~]# cat /ansible/kafka/hosts
[kafka]
192.168.121.153 ip=153 IP=192.168.121.153
192.168.121.154 ip=154 IP=192.168.121.154
192.168.121.155 ip=155 IP=192.168.121.155
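Once key-based SSH access is in place (configured later in this article), you can, for example, confirm that the per-host variables ip and IP are picked up correctly with an ad-hoc debug call:

ansible -i /ansible/kafka/hosts kafka -m debug -a 'msg="{{ ip }} -> {{ IP }}"'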

Then write the playbook that applies the role.

[root@server151 ~]# cat /ansible/kafka/all.yaml
---
- name: kafka
  hosts: kafka
  roles:
    - install_kafka

Then write the default variables; again, adjust these to match your own hosts.

[root@server151 ~]# cat /ansible/kafka/install_kafka/defaults/main.yml
IP1: 192.168.121.153
IP2: 192.168.121.154
IP3: 192.168.121.155
ip1: 153
ip2: 154
ip3: 155

Then write the task to be executed

[root@server151 ~]# cat /ansible/kafka/install_kafka/tasks/main.yml
- name: template shell
  template:
    src: templates/kafka.sh.j2
    dest: /root/kafka.sh
- name: run kafka.sh
  shell: "bash /root/kafka.sh"
- name: source profile
  shell: "source /etc/profile"

Then write the script that will run on each host.
I pull the JDK and Kafka packages from another of my hosts; you can remove the download-and-install part or replace it with whatever installation method suits your machines.
Either way, the configuration-file changes further down do not need to be modified.

[root@server151 ~]# cat /ansible/kafka/install_kafka/templates/kafka.sh.j2
#!/bin/bash
wget http://192.168.121.133/jdk-8u321-linux-x64.tar.gz
tar -xf jdk-8u321-linux-x64.tar.gz -C /usr/local/
cat > /etc/profile.d/jdk.sh << EOF
JAVA_HOME=/usr/local/jdk1.8.0_321
JRE_HOME=\$JAVA_HOME/jre
CLASSPATH=.:\$JAVA_HOME/lib/dt.jar:\$JAVA_HOME/lib/tools.jar:\$JRE_HOME/lib
PATH=\$PATH:\$JAVA_HOME/bin:\$JRE_HOME/bin
export JAVA_HOME JRE_HOME CLASSPATH
EOF
source /etc/profile

wget http://192.168.121.133/kafka_2.13-2.5.0.tgz
tar -xf kafka_2.13-2.5.0.tgz -C /usr/local/
cat > /etc/profile.d/kafka.sh << EOF
PATH=/usr/local/kafka_2.13-2.5.0/bin/:\${PATH}
EOF
source /etc/profile
# Everything above installs the JDK and Kafka; below we modify the configuration files.
cat >> /usr/local/kafka_2.13-2.5.0/config/zookeeper.properties << EOF
tickTime=2000
initLimit=5
syncLimit=5
server.{{ip1}}={{IP1}}:2888:3888
server.{{ip2}}={{IP2}}:2888:3888
server.{{ip3}}={{IP3}}:2888:3888
EOF
mkdir /tmp/zookeeper
echo {{ip}} > /tmp/zookeeper/myid
sed -i 's/broker.id=0/broker.id={{ip}}/' /usr/local/kafka_2.13-2.5.0/config/server.properties
sed -i 's/zookeeper.connect=localhost:2181/zookeeper.connect={{IP}}:2181/' /usr/local/kafka_2.13-2.5.0/config/server.properties
echo "listeners=PLAINTEXT://{{IP}}:9092" >> /usr/local/kafka_2.13-2.5.0/config/server.properties

#Start zookeeper as daemon
/usr/local/kafka_2.13-2.5.0/bin/zookeeper-server-start.sh -daemon /usr/local/kafka_2.13-2.5.0/config/zookeeper.properties
sleep 30
#Start kafka in daemon mode
/usr/local/kafka_2.13-2.5.0/bin/kafka-server-start.sh -daemon /usr/local/kafka_2.13-2.5.0/config/server.properties
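For reference, on the first node (192.168.121.153, where ip=153) the template above renders roughly to the following, given the hosts file and defaults shown earlier:

# appended to /usr/local/kafka_2.13-2.5.0/config/zookeeper.properties
tickTime=2000
initLimit=5
syncLimit=5
server.153=192.168.121.153:2888:3888
server.154=192.168.121.154:2888:3888
server.155=192.168.121.155:2888:3888
# /tmp/zookeeper/myid contains 153
# server.properties ends up with broker.id=153, zookeeper.connect=192.168.121.153:2181
# and listeners=PLAINTEXT://192.168.121.153:9092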

That completes the configuration files. Next, configure SSH mutual trust so that Ansible can connect to the managed hosts with key-based authentication.

[root@server151 kafka]# ssh-keygen -t rsa
[root@server151 kafka]# ssh-copy-id 192.168.121.153
[root@server151 kafka]# ssh-copy-id 192.168.121.154
[root@server151 kafka]# ssh-copy-id 192.168.121.155
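A quick way to confirm that key-based access works for all three hosts is an ad-hoc Ansible ping; each host should answer with pong:

[root@server151 kafka]# ansible -i ./hosts kafka -m ping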

After SSH mutual trust is configured, you can run Ansible to install the Kafka cluster.
Remember to change into the /ansible/kafka directory before executing; otherwise you need to give the full paths of the files.

[root@server151 kafka]# ansible-playbook -i ./hosts all.yaml
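Once the playbook finishes, you can, for example, check across the three hosts that both the ZooKeeper and Kafka JVMs are running; jps (called by its full path, since /etc/profile is not sourced by Ansible's shell) should list QuorumPeerMain and Kafka on every node:

[root@server151 kafka]# ansible -i ./hosts kafka -m shell -a "/usr/local/jdk1.8.0_321/bin/jps"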

Then go to any of the three hosts and create a topic to test:

[root@server153 ~]# /usr/local/kafka_2.13-2.5.0/bin/kafka-topics.sh --bootstrap-server 192.168.121.153:9092,192.168.121.154:9092,192.168.121.155:9092 --create --topic nginxs --partitions 2 --replication-factor 3 --config cleanup.policy=delete --config retention.ms=36000000
Created topic nginxs.
[root@server153 ~]# /usr/local/kafka_2.13-2.5.0/bin/kafka-topics.sh --bootstrap-server 192.168.121.153:9092,192.168.121.154:9092,192.168.121.155:9092 --describe
Topic: nginxs PartitionCount: 2 ReplicationFactor: 3 Configs: cleanup.policy=delete,segment.bytes=1073741824,retention.ms=36000000
Topic: nginxs Partition: 0 Leader: 155 Replicas: 155,154,153 Isr: 155,154,153
Topic: nginxs Partition: 1 Leader: 154 Replicas: 154,153,155 Isr: 154,153,155

You can see the newly created topic and that every replica is in the ISR, so the cluster installation is complete. As a final check, you can produce and consume a few test messages, as shown below.
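As a final smoke test (any of the three brokers will do; the messages are whatever you type), produce a few records to nginxs and read them back from another broker:

# produce a few test messages, then press Ctrl-C to exit
/usr/local/kafka_2.13-2.5.0/bin/kafka-console-producer.sh --broker-list 192.168.121.153:9092 --topic nginxs
# consume them from a different broker to confirm the cluster is serving the topic
/usr/local/kafka_2.13-2.5.0/bin/kafka-console-consumer.sh --bootstrap-server 192.168.121.154:9092 --topic nginxs --from-beginning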