Introduction to Kafka
Apache Kafka is a distributed stream processing platform. What exactly does that mean?
A stream processing platform has the following three characteristics:
- It lets you publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.
- It stores streams of records durably, with good fault tolerance.
- It lets you process streams of records as they are generated.
What scenarios is Kafka suitable for?
It can be used in two broad categories of applications:
- Building real-time streaming data pipelines that reliably move data between systems or applications (comparable to a message queue).
- Building real-time streaming applications that transform or react to streams of data (this is stream processing, e.g. transforming data from topic to topic with Kafka Streams).
To understand how Kafka achieves the functions above, the rest of this article explores Kafka’s characteristics in depth.
First some concepts:
- Kafka runs as a cluster on one or more servers.
- Kafka organizes the streams of records it stores into topics.
- Each record consists of a key, a value, and a timestamp.
Kafka has four core APIs:
- The Producer API allows an application to publish a stream of records to one or more Kafka topics.
- The Consumer API allows an application to subscribe to one or more topics and process the stream of records published to them.
- The Streams API allows an application to act as a stream processor: it consumes input streams from one or more topics and produces an output stream to one or more topics, effectively transforming input streams into output streams.
- The Connector API allows building and running reusable producers or consumers that connect Kafka topics to existing applications or data systems. For example, a connector to a relational database might capture every change to a table.
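The Producer and Consumer APIs are also exposed through the console tools that ship with Kafka, which is the quickest way to see publish/subscribe in action. A minimal sketch, assuming a broker listening on localhost:9092 and a topic named test (both hypothetical here; Kafka 2.5 or newer for the producer’s --bootstrap-server flag):

```shell
# Producer side: publish one record to the "test" topic
echo "hello kafka" | kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test

# Consumer side: subscribe and read the topic from the beginning
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
```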
Distributed
The partitions of the log are distributed across the servers in the Kafka cluster, and each server handles data and requests for its share of the partitions. Each partition is replicated across a configurable number of servers for fault tolerance.
Each partition has one server acting as the “leader” and zero or more servers acting as “followers”. The leader handles all read and write requests for the partition, while the followers only passively replicate the leader’s data. When the leader goes down, one of the followers automatically becomes the new leader. Each server acts as the leader for some of its partitions and as a follower for others, so load is balanced across the cluster.
Producer
Producers publish data to the topics of their choice. The producer is responsible for choosing which partition of the topic each record is assigned to. This can be done in a simple round-robin fashion to balance load, or according to some semantic partitioning function (for example, based on the key in the record). More on the use of partitions below.
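Semantic partitioning by key can be tried with the console producer, which can send keyed records; records with the same key always land in the same partition. A sketch, assuming a broker at localhost:9092 and a topic named test (both hypothetical):

```shell
# Send key:value pairs; the key is hashed to choose the partition,
# so all records with key "user1" go to the same partition
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test \
  --property parse.key=true \
  --property key.separator=:
# then type lines such as:
#   user1:first event
#   user1:second event
```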
Consumer
Consumers are identified by a consumer group name, and each record published to a topic is assigned to a consumer instance in the subscribed consumer group. Consumer instances can be distributed among multiple processes or on multiple machines.
If all consumer instances are in the same consumer group, the message records are load-balanced across the consumer instances.
If all consumer instances are in different consumer groups, each message record will be broadcast to all consumer processes.
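Both behaviors can be observed with the console consumer simply by choosing the group name. A sketch, assuming a broker at localhost:9092 and a topic named test (hypothetical names):

```shell
# Two consumers in the SAME group: partitions are split between them (load balancing)
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group g1   # terminal 1
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group g1   # terminal 2

# A consumer in a DIFFERENT group receives every record again (broadcast across groups)
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group g2
```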
Some key Kafka terms:
- Producer: the producer, the end that creates and sends messages.
- Broker: a Kafka instance. Multiple brokers form a Kafka cluster. Usually one Kafka instance is deployed per machine, so if one instance fails, the other instances are not affected.
- Consumer: the consumer, which pulls messages and consumes them. A topic can be consumed by several consumers; several consumers form a Consumer Group, and a message can be consumed by only one consumer within the group.
- Topic: the logical storage unit for messages on the server side. A topic usually contains several partitions.
- Partition: a partition of a topic. Partitions are distributed across the brokers to load-balance publishing and subscribing, and several partitions can be consumed by several consumers at the same time, giving high consumer throughput. A partition has multiple replicas (Replica); this is central to Kafka’s design for reliability and availability, and will be highlighted later.
- Message: a message, or log record, the data actually stored by the Kafka server. Each message consists of a key, a value, and a timestamp.
- Offset: the position of a message within its partition, maintained by Kafka itself. A consumer also saves an offset while consuming to track the position of the messages it has consumed.
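The offsets a consumer group has committed can be inspected with the kafka-consumer-groups.sh tool that ships with Kafka. A sketch, assuming a broker at localhost:9092 and a group named g1 (hypothetical):

```shell
# Show, per partition: the committed offset (CURRENT-OFFSET),
# the log end offset (LOG-END-OFFSET), and the lag between them
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group g1
```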
Ansible one-click installation of a Kafka cluster
First create Ansible’s working directory; any directory will do.

```shell
mkdir -p /ansible/kafka/
```
Create directories and files as follows
```shell
[root@server151 ~]# tree /ansible/kafka/
/ansible/kafka/
├── all.yaml
├── hosts
└── install_kafka
    ├── defaults
    │   └── main.yml
    ├── tasks
    │   └── main.yml
    └── templates
        └── kafka.sh.j2
```
First configure the host inventory, changing the hosts to match your own environment.
```shell
[root@server151 ~]# cat /ansible/kafka/hosts
[kafka]
192.168.121.153 ip=153 IP=192.168.121.153
192.168.121.154 ip=154 IP=192.168.121.154
192.168.121.155 ip=155 IP=192.168.121.155
```
Then write a playbook that runs the role:

```shell
[root@server151 ~]# cat /ansible/kafka/all.yaml
---
- name: kafka
  hosts: kafka
  roles:
    - install_kafka
```
Then write the role’s default variables; again, modify these according to your hosts.

```shell
[root@server151 ~]# cat /ansible/kafka/install_kafka/defaults/main.yml
IP1: 192.168.121.153
IP2: 192.168.121.154
IP3: 192.168.121.155
ip1: 153
ip2: 154
ip3: 155
```
Then write the tasks to be executed:

```shell
[root@server151 ~]# cat /ansible/kafka/install_kafka/tasks/main.yml
- name: template shell
  template:
    src: templates/kafka.sh.j2
    dest: /root/kafka.sh
- name: run kafka.sh
  shell: "bash /root/kafka.sh"
- name: source profile
  shell: "source /etc/profile"
```
Then write the script that runs on each host. I pull the JDK and Kafka tarballs from another of my hosts, so you can remove the installation section or change it to a method that suits your environment; either way, the configuration-file section does not need to change.
```shell
[root@server151 ~]# cat /ansible/kafka/install_kafka/templates/kafka.sh.j2
#!/bin/bash
# Install the JDK
wget http://192.168.121.133/jdk-8u321-linux-x64.tar.gz
tar -xf jdk-8u321-linux-x64.tar.gz -C /usr/local/
cat > /etc/profile.d/jdk.sh << EOF
JAVA_HOME=/usr/local/jdk1.8.0_321
JRE_HOME=\$JAVA_HOME/jre
CLASSPATH=.:\$JAVA_HOME/lib/dt.jar:\$JAVA_HOME/lib/tools.jar:\$JRE_HOME/lib
PATH=\$PATH:\$JAVA_HOME/bin:\$JRE_HOME/bin
export JAVA_HOME JRE_HOME CLASSPATH
EOF
source /etc/profile
# Install Kafka
wget http://192.168.121.133/kafka_2.13-2.5.0.tgz
tar -xf kafka_2.13-2.5.0.tgz -C /usr/local/
cat > /etc/profile.d/kafka.sh << EOF
PATH=/usr/local/kafka_2.13-2.5.0/bin/:\$PATH
EOF
source /etc/profile
# The steps above install the JDK and Kafka; what follows modifies the configuration files.
cat >> /usr/local/kafka_2.13-2.5.0/config/zookeeper.properties << EOF
tickTime=2000
initLimit=5
syncLimit=5
server.{{ip1}}={{IP1}}:2888:3888
server.{{ip2}}={{IP2}}:2888:3888
server.{{ip3}}={{IP3}}:2888:3888
EOF
mkdir -p /tmp/zookeeper
echo {{ip}} > /tmp/zookeeper/myid
sed -i 's/broker.id=0/broker.id={{ip}}/' /usr/local/kafka_2.13-2.5.0/config/server.properties
sed -i 's/zookeeper.connect=localhost:2181/zookeeper.connect={{IP}}:2181/' /usr/local/kafka_2.13-2.5.0/config/server.properties
echo "listeners=PLAINTEXT://{{IP}}:9092" >> /usr/local/kafka_2.13-2.5.0/config/server.properties
# Start zookeeper as a daemon
/usr/local/kafka_2.13-2.5.0/bin/zookeeper-server-start.sh -daemon /usr/local/kafka_2.13-2.5.0/config/zookeeper.properties
sleep 30
# Start kafka in daemon mode
/usr/local/kafka_2.13-2.5.0/bin/kafka-server-start.sh -daemon /usr/local/kafka_2.13-2.5.0/config/server.properties
```
That completes the configuration files. Next, configure SSH mutual trust so that Ansible can connect to the managed hosts with keys.
```shell
[root@server151 kafka]# ssh-keygen -t rsa
[root@server151 kafka]# ssh-copy-id 192.168.121.153
[root@server151 kafka]# ssh-copy-id 192.168.121.154
[root@server151 kafka]# ssh-copy-id 192.168.121.155
```
After SSH mutual trust is configured, you can run Ansible to install the Kafka cluster. Remember to enter the /ansible/kafka directory before executing; otherwise you need to specify the full paths of the files.
```shell
[root@server151 kafka]# ansible-playbook -i ./hosts all.yaml
```
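It can be worth verifying connectivity before the playbook run and the result afterwards with ad-hoc Ansible commands. A sketch; the second command assumes the ss utility is available on the target hosts:

```shell
# Before the run: check that Ansible can reach every host in the inventory
ansible -i ./hosts kafka -m ping

# After the run: confirm each broker is listening on port 9092
ansible -i ./hosts kafka -m shell -a "ss -tlnp | grep 9092"
```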
Then go to any of the three hosts and create a topic to test:
```shell
[root@server153 ~]# /usr/local/kafka_2.13-2.5.0/bin/kafka-topics.sh --bootstrap-server 192.168.121.153:9092,192.168.121.154:9092,192.168.121.155:9092 --create --topic nginxs --partitions 2 --replication-factor 3 --config cleanup.policy=delete --config retention.ms=36000000
Created topic nginxs.
[root@server153 ~]# /usr/local/kafka_2.13-2.5.0/bin/kafka-topics.sh --bootstrap-server 192.168.121.153:9092,192.168.121.154:9092,192.168.121.155:9092 --describe
Topic: nginxs   PartitionCount: 2   ReplicationFactor: 3   Configs: cleanup.policy=delete,segment.bytes=1073741824,retention.ms=36000000
    Topic: nginxs   Partition: 0   Leader: 155   Replicas: 155,154,153   Isr: 155,154,153
    Topic: nginxs   Partition: 1   Leader: 154   Replicas: 154,153,155   Isr: 154,153,155
```
You can see the created topic, and every replica appears in the Isr list, so the cluster installation is complete.
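As a final smoke test, you can publish to the new topic from one terminal and read it back from another; the consumer should print the message the producer sent:

```shell
# Terminal 1: produce a test message to the nginxs topic
echo "test message" | /usr/local/kafka_2.13-2.5.0/bin/kafka-console-producer.sh \
  --bootstrap-server 192.168.121.153:9092 --topic nginxs

# Terminal 2: consume it from the beginning
/usr/local/kafka_2.13-2.5.0/bin/kafka-console-consumer.sh \
  --bootstrap-server 192.168.121.154:9092 --topic nginxs --from-beginning
```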