Middleware (3) – Kafka (2)

Kafka

    • 6. Efficient reading and writing & the role of Zookeeper
      • 6.1 Efficient reading and writing of Kafka
      • 6.2 The role of Zookeeper in Kafka
    • 7. Transactions
      • 7.1 Producer transactions
      • 7.2 Consumer transactions
    • 8. Producer API flow
    • 9. Calling Kafka from Python
      • 9.1 Installing the plugin
      • 9.2 Producer and Consumer
      • 9.3 Advanced consumer operations
        • 1. Initialization parameters
        • 2. Manual commit
        • 3. Checking the Kafka message backlog
      • 9.4 Topic operations
        • 1. Get all topics
        • 2. Create and delete topics
        • 3. Get topic configuration information
        • 4. Get consumer group information
    • 10. Kafka monitoring with Eagle
    • 11. Common interview questions

6. Efficient reading and writing & the role of Zookeeper

6.1 Efficient reading and writing of Kafka

Sequential disk writes
Kafka's producer writes data into log files by appending to the end of the file, i.e. sequentially. According to the official documentation, the same disk can sustain about 600M/s when writing sequentially but only about 200K/s when writing randomly, which is due to the mechanical structure of the disk. Sequential writing is fast because it saves a large amount of head-seek time.

Zero Copy Technology

Kafka’s zero-copy technology

NIC: Network Interface Controller

  • Regular read

    A regular read involves the following steps:

    • The operating system reads data from the disk file into the page cache in kernel space
    • The application reads the data from kernel space into a user-space buffer
    • The application writes the data back into kernel space, into the socket buffer
    • The operating system copies the data from the socket buffer to the NIC, from where it is sent to the consumer over the network
  • Zero-copy technology

    With zero copy, the data in the disk file is copied to the page cache only once and is then sent directly from the page cache to the network (the same page cache can be reused when sending to different subscribers), which avoids the duplicate copy operations.

With 10 consumers, the traditional approach performs 4 * 10 = 40 data copies, while zero copy needs only 1 + 10 = 11: one copy from disk to the page cache, plus one read of the page cache for each of the 10 consumers.
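A quick sketch of that arithmetic for an arbitrary number of consumers:

#coding:utf-8
def copy_counts(consumers):
    # traditional path: 4 copies (disk -> page cache -> user space -> socket buffer -> NIC) per consumer
    traditional = 4 * consumers
    # zero copy: 1 copy from disk to the page cache, then one page-cache -> NIC transfer per consumer
    zero_copy = 1 + consumers
    return traditional, zero_copy

print(copy_counts(10))  # (40, 11), matching the example above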

6.2 The role of Zookeeper in Kafka

One broker in the Kafka cluster is elected as the Controller. It is responsible for managing brokers going online and offline, assigning partition replicas for all topics, and leader election. The Controller relies on Zookeeper for this management work.

7. Transactions

Kafka has supported transactions since version 0.11. Transactions guarantee that Kafka produces and consumes across partitions and sessions on the basis of Exactly Once semantics: either everything succeeds or everything fails.

7.1 Producer transactions

To implement transactions across partitions and sessions, a globally unique Transaction ID is introduced, and the PID (which can be understood as the Producer ID) obtained by the Producer is bound to this Transaction ID. When the Producer restarts, it can recover its original PID through the ongoing Transaction ID.

To manage transactions, Kafka introduces a new component, the Transaction Coordinator. The Producer interacts with the Transaction Coordinator to obtain the task status associated with its Transaction ID. The Transaction Coordinator also writes transaction information into an internal topic, so even if the whole service restarts, the saved transaction state allows in-progress transactions to be recovered and continued.
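kafka-python, the client used later in this article, does not expose the transactional producer API, so the following is only a rough sketch using the separate confluent-kafka client (assumed to be installed; the broker address and topic follow the earlier examples):

#coding:utf-8
# Illustrative sketch only: assumes the confluent-kafka package, not kafka-python.
from confluent_kafka import Producer

producer = Producer({
    'bootstrap.servers': '192.168.71.251:9092',
    'transactional.id': 'demo-transactional-id'  # the globally unique Transaction ID
})

producer.init_transactions()       # register the transactional.id and obtain a PID
producer.begin_transaction()
try:
    for i in range(3):
        producer.produce('demo1', key='count_num', value=str(i))
    producer.commit_transaction()  # all three messages become visible atomically
except Exception:
    producer.abort_transaction()   # none of the messages are exposed to consumers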

7.2 Consumer transactions

For the Consumer, the transaction guarantee is weaker than for the Producer; in particular, it cannot be guaranteed that committed messages are consumed exactly. This is because the Consumer can access any message via its offset, different segment files have different life cycles, and messages belonging to the same transaction may already have been deleted after a restart.

8. Producer API flow

Kafka's Producer sends messages asynchronously. Two threads are involved in sending a message: the main thread and the Sender thread, plus a thread-shared variable, the RecordAccumulator. The main thread puts messages into the RecordAccumulator, and the Sender thread continuously pulls messages from the RecordAccumulator and sends them to the Kafka broker.
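A rough illustration of this asynchronous flow with kafka-python (broker address and topic follow the earlier examples): send() only hands the record to the accumulator and immediately returns a future, while the Sender thread delivers it in the background; flush() blocks until the accumulator has been drained.

#coding:utf-8
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['192.168.71.251:9092'])

def on_success(record_metadata):
    # invoked once the broker has acknowledged the batch that contained this record
    print("delivered to {}-{} @ offset {}".format(
        record_metadata.topic, record_metadata.partition, record_metadata.offset))

# send() only appends the record to the RecordAccumulator and returns a future immediately;
# the Sender thread picks the record up and ships it in the background
future = producer.send('demo1', b'hello')
future.add_callback(on_success)

producer.flush()  # block until the Sender thread has drained the accumulator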

9. Calling Kafka from Python

We use the kafka (kafka-python) package here; other packages that can operate Kafka include pykafka, etc.

9.1 Installing the plugin

Plugin download address: https://pypi.org/project/kafka/#files

pip install kafka-1.3.5-py2.py3-none-any.whl
pip install kafka_python-2.0.2-py2.py3-none-any.whl

The author uses Python 2.7.

9.2 Producer and Consumer

A simple demo of a producer and a consumer:

#coding:utf-8
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
import traceback
import json

def producer_demo():
    # Assume the produced message is a key-value pair (it does not have to be), serialized as JSON
    producer = KafkaProducer(
        bootstrap_servers=['192.168.71.251:9092'],
        key_serializer=lambda k: json.dumps(k).encode(),
        value_serializer=lambda v: json.dumps(v).encode())
    # send three messages
    for i in range(0, 3):
        future = producer.send(
            'demo1',
            key='count_num',  # messages with the same key go to the same partition
            value=str(i),
            partition=1)  # send the message to partition 1
        print("send {}".format(str(i)))
        try:
            future.get(timeout=10)  # check whether the send succeeded
        except KafkaError:  # a failed send raises a KafkaError
            print(traceback.format_exc())


def consumer_demo():
    consumer = KafkaConsumer(
        'demo1',
        bootstrap_servers='192.168.71.251:9092',
        group_id='test'
    )
    for message in consumer:
        print("receive, key: {}, value: {}".format(
            json.loads(message.key.decode()),
            json.loads(message.value.decode())
            )
        )

if __name__ == '__main__':
    producer_demo()
    # consumer_demo()

We suggest using two terminals, or two Jupyter notebook pages, to verify this.

Execute the consumer first:

consumer_demo()

Then execute the producer:

producer_demo()

You will see the following output:

>>> producer_demo()
send 0
send 1
send 2


>>> consumer_demo()
receive, key: count_num, value: 0
receive, key: count_num, value: 1
receive, key: count_num, value: 2

9.3 Advanced consumer operations

1. Initialization parameters

Some important parameters when a KafkaConsumer is initialized (a combined initialization sketch follows this list):

  • topic
    The name of the topic; the consumer listens to and reads messages from this topic.

  • group_id
    High concurrency requires multiple consumers working together, and consumption progress is managed per group_id. For example, if consumer A and consumer B use the same group_id at initialization, then once a message has been consumed by A it is marked in Kafka and will not be consumed again by B (provided A commits correctly after consuming).

  • key_deserializer, value_deserializer
    The counterparts of the serializer parameters on the producer side; messages are deserialized automatically.

  • auto_offset_reset
    When a consumer starts, there may already be unconsumed messages accumulated in the queue. Sometimes the requirement is to start reading from the earliest unconsumed position (set this parameter to earliest); sometimes the requirement is to read only data produced from now on, with previously produced data no longer consumed (set this parameter to latest).

    earliest: automatically reset to the earliest offset.
    latest: automatically reset to the latest offset.
    none: if no previous offset is found for the consumer group, throw an exception to the consumer.

  • enable_auto_commit, auto_commit_interval_ms
    Whether to commit automatically. After the current consumer has consumed data, a commit is required before the consumption information is sent back to the message queue's control center. With enable_auto_commit set to True, the consumer commits automatically, and the interval between two commits is auto_commit_interval_ms.
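As a quick reference, here is a minimal sketch that combines the parameters above (broker address, topic, and group name follow the earlier examples; the other values are arbitrary assumptions):

#coding:utf-8
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'demo1',                                   # topic
    bootstrap_servers='192.168.71.251:9092',
    group_id='test',                           # consumption progress is tracked per group
    key_deserializer=lambda k: json.loads(k.decode()),
    value_deserializer=lambda v: json.loads(v.decode()),
    auto_offset_reset='earliest',              # with no committed offset, start from the oldest message
    enable_auto_commit=True,
    auto_commit_interval_ms=5000)              # auto-commit every 5 s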

2. Manual commit

def consumer_demo():
    consumer = KafkaConsumer(
        'demo1',
        bootstrap_servers='192.168.71.251:9092',
        group_id='test',
        enable_auto_commit=False,
        # auto_commit_interval_ms=5000
    )
    for message in consumer:
        print("receive, key: {}, value: {}".format(
            json.loads(message.key.decode()),
            json.loads(message.value.decode())
            )
        )
        consumer.commit()

3. Checking the Kafka message backlog

In a production environment, the consumer's consumption rate must be kept higher than the producer's production rate, so it is necessary to monitor whether the backlog remaining in Kafka is growing or shrinking. The following code can be used to observe the number of messages remaining in the queue.

#coding:utf-8
from kafka import TopicPartition
from kafka import KafkaProducer, KafkaConsumer


topic = 'demo1'
# consumer = KafkaConsumer(topic, **kwargs)
consumer = KafkaConsumer(
        'demo1',
        bootstrap_servers='192.168.71.251:9092',
        group_id='test',
        enable_auto_commit=False,
        # auto_commit_interval_ms=5000
    )
partitions = [TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)]
print(partitions)
print("start to cal offset:")

# total
toff = consumer.end_offsets(partitions)
toff = [(key.partition, toff[key]) for key in toff.keys()]
toff.sort()
print("total offset: {}".format(str(toff)))

# current
coff = [(x.partition, consumer.committed(x)) for x in partitions]
coff.sort()
print("current offset: {}".format(str(coff)))

# cal sum and left
toff_sum = sum([x[1] for x in toff])
cur_sum = sum([x[1] for x in coff if x[1] is not None])
left_sum = toff_sum - cur_sum
print("kafka left: {}".format(left_sum))
------------------------------------------------------------------------------------------------
[TopicPartition(topic='demo1', partition=0), TopicPartition(topic='demo1', partition=1)]
start to cal offset:
total offset: [(0, 5), (1, 49)]
current offset: [(0, 5), (1, 49)]
kafka left: 0

9.4 Topic operations

1. Get all topics

#coding:utf-8
from kafka import KafkaConsumer, TopicPartition


servers = ['192.168.71.251:9092']

# Get the topic list
def retrieve_topics():
    consumer = KafkaConsumer(bootstrap_servers=servers)
    print(consumer.topics())

# Get the partition list of a topic
def retrieve_partitions(topic):
    consumer = KafkaConsumer(bootstrap_servers=servers)
    print(consumer.partitions_for_topic(topic))

# Get the current offset of the partition for the consumer group
def retrieve_partition_offset():
    consumer = KafkaConsumer(bootstrap_servers=servers,
                             group_id='kafka-group-id')
    tp = TopicPartition('kafka-topic', 0)
    consumer.assign([tp])
    print("starting offset is ", consumer.position(tp))

retrieve_topics()
retrieve_partitions('demo1')
retrieve_partition_offset()
---------------------------------------------------------------------------
set([u'demo', u'kafka_demo', u'demo1'])
set([0, 1])
('starting offset is ', 0)

2. Create and delete topics

#coding:utf-8
from kafka import KafkaConsumer, TopicPartition


servers = ['192.168.71.251:9092']

from kafka import KafkaAdminClient
from kafka.admin import NewTopic
from kafka.errors import TopicAlreadyExistsError

admin = KafkaAdminClient(bootstrap_servers=servers)

# Get the topic list
def retrieve_topics():
    consumer = KafkaConsumer(bootstrap_servers=servers)
    print(consumer.topics())

# Get the partition list of a topic
def retrieve_partitions(topic):
    consumer = KafkaConsumer(bootstrap_servers=servers)
    print(consumer.partitions_for_topic(topic))


# Create a topic
def create_topic():
    try:
        new_topic = NewTopic("create-topic", 6, 1)  # num_partitions: 6, replication_factor: 1
        admin.create_topics([new_topic])
        print("create 'create-topic'")
    except TopicAlreadyExistsError as e:
        print(e.message)

# Delete a topic
def delete_topic():
    admin.delete_topics(["create-topic"])
    print("delete 'create-topic'")

if __name__ == '__main__':
    create_topic()
    retrieve_topics()
    retrieve_partitions("create-topic")
    delete_topic()
    retrieve_topics()
-------------------------------------------------------------------------------------------------
create 'create-topic'
set([u'demo', u'kafka_demo', u'demo1', u'create-topic', u'kafka-topic'])
set([0, 1, 2, 3, 4, 5])
delete 'create-topic'
set([u'demo', u'kafka_demo', u'demo1', u'kafka-topic'])

3. Get topic configuration information

#coding:utf-8


from kafka import KafkaAdminClient
from kafka.admin import ConfigResource, ConfigResourceType

servers = ['192.168.71.251:9092']

admin = KafkaAdminClient(bootstrap_servers=servers)

# Get topic configuration information
def get_topic_config():
    resource_config = ConfigResource(ConfigResourceType.TOPIC, "demo1")
    config_entries = admin.describe_configs([resource_config])
    print(config_entries)

get_topic_config()
---------------------------------------------------------------------------------------
[DescribeConfigsResponse_v2(throttle_time_ms=0, resources=[(error_code=0, error_message=u'', resource_type=2, resource_name=u'demo1', config_entries=[(config_names=u'compression.type', config_value=u'producer ', read_only=False, config_source=5, is_sensitive=False, config_synonyms=[]), (config_names=u'leader.replication.throttled.replicas', config_value=u'', read_only=False, config_source=5, is_sensitive= False, config_synonyms=[]), (config_names=u'message.downconversion.enable', config_value=u'true', read_only=False, config_source=5, is_sensitive=False, config_synonyms=[]), (config_names=u' min.insync.replicas', config_value=u'1', read_only=False, config_source=5, is_sensitive=False, config_synonyms=[]), (config_names=u'segment.jitter.ms', config_value=u'0' , read_only=False, config_source=5, is_sensitive=False, config_synonyms=[]), (config_names=u'cleanup.policy', config_value=u'delete', read_only=False, config_source=5, is_sensitive=False, config_synonyms= []), (config_names=u'flush.ms', config_value=u'9223372036854775807', read_only=False, config_source=5, is_sensitive=False, config_synonyms=[]), (config_names=u'follower.replication.throttled. replicas', config_value=u'', read_only=False, config_source=5, is_sensitive=False, config_synonyms=[]), (config_names=u'segment.bytes', config_value=u'1073741824', read_only=False, config_source= 5, is_sensitive=False, config_synonyms=[]), (config_names=u'retention.ms', config_value=u'604800000', read_only=False, config_source=5, is_sensitive=False, config_synonyms=[]), (config_names= u'flush.messages', config_value=u'9223372036854775807', read_only=False, config_source=5, is_sensitive=False, config_synonyms=[]), (config_names=u'message.format.version', config_value=u'3.0- IV1', read_only=False, config_source=5, is_sensitive=False, config_synonyms=[]), (config_names=u'file.delete.delay.ms', config_value=u'60000', read_only=False, config_source=5, is_sensitive=False, config_synonyms=[]), (config_names=u'max.compaction.lag.ms', config_value=u'9223372036854775807', read_only=False, config_source=5, is_sensitive=False, config_synonyms=[]), ( config_names=u'max.message.bytes', config_value=u'1048588', read_only=False, config_source=5, is_sensitive=False, config_synonyms=[]), (config_names=u'min.compaction.lag.ms', config_value=u'0', read_only=False, config_source=5, is_sensitive=False, config_synonyms=[]), (config_names=u'message.timestamp.type', config_value=u'CreateTime', read_only=False, config_source= 5, is_sensitive=False, config_synonyms=[]), (config_names=u'preallocate', config_value=u'false', read_only=False, config_source=5, is_sensitive=False, config_synonyms=[]), (config_names=u' min.cleanable.dirty.ratio', config_value=u'0.5', read_only=False, config_source=5, is_sensitive=False, config_synonyms=[]), (config_names=u'index.interval.bytes', config_value=u' 4096', read_only=False, config_source=5, is_sensitive=False, config_synonyms=[]), (config_names=u'unclean.leader.election.enable', config_value=u'false', read_only=False, config_source=5, is_sensitive=False, config_synonyms=[]), (config_names=u'retention.bytes', config_value=u'-1', read_only=False, config_source=5, is_sensitive=False, config_synonyms=[]), (config_names=u 'delete.retention.ms', config_value=u'86400000', read_only=False, config_source=5, is_sensitive=False, config_synonyms=[]), (config_names=u'segment.ms', config_value=u'604800000', read_only=False, config_source=5, is_sensitive=False, config_synonyms=[]), (config_names=u'message.timestamp.difference.max.ms', 
config_value=u'9223372036854775807', read_only=False, config_source=5, is_sensitive =False, config_synonyms=[]), (config_names=u'segment.index.bytes', config_value=u'10485760', read_only=False, config_source=5, is_sensitive=False, config_synonyms=[])])])]

4. Get consumer group information

#coding:utf-8
from kafka import KafkaAdminClient
from kafka.admin import ConfigResource, ConfigResourceType

servers = ['192.168.71.251:9092']

admin = KafkaAdminClient(bootstrap_servers=servers)

# Get consumer group information
def get_consumer_group():
    # Show all consumer groups
    print(admin.list_consumer_groups())

    # Show the offsets of the consumer group
    print(admin.list_consumer_group_offsets("test"))

get_consumer_group()
-------------------------------------------------------------------
[(u'test-consumer-group', u'consumer'), (u'test', u'consumer')]
{TopicPartition(topic=u'demo1', partition=0): OffsetAndMetadata(offset=5, metadata=u''), TopicPartition(topic=u'demo1', partition=1): OffsetAndMetadata(offset=49, metadata=u'')}

10. Kafka monitoring with Eagle

Eagle

Eagle is open-source visualization and management software that lets you query, visualize, alert on, and explore metrics. In short, Eagle gives you tools to turn Kafka cluster data into beautiful graphs and visualizations.

It is essentially a web application running on Tomcat.

For installation details, refer to the article on installing the Kafka monitoring tool Eagle (a high-speed download link is attached there).

11. Common interview questions

What do ISR (InSyncRepli), OSR (OutSyncRepli), and AR (AllRepli) stand for in Kafka?

ISR: the set of followers whose lag behind the leader is less than 10 s
OSR: followers whose lag behind the leader is greater than 10 s
AR: all replicas of a partition

What do HW, LEO, etc. in Kafka represent?

HW (High Watermark): determined by the smallest LEO among the replicas of the same partition
LEO (Log End Offset): the largest offset of each replica

How does Kafka reflect the sequence of messages?

Within each partition every message has an offset, so messages are ordered inside a single partition; global ordering across partitions cannot be guaranteed.

Do you understand the partitioners, serializers, and interceptors in Kafka? What is the order of processing between them?

The partitioner (Partitioner) handles partitioning, i.e. which partition a message is sent to.
The serializer (Serializer) serializes and deserializes data.
The interceptor (Interceptor) performs pre-processing before a message is sent and wrap-up work afterwards.
The processing order is: interceptor => serializer => partitioner.

What does the overall structure of the Kafka producer client look like? How many threads are used for processing? What are they?

Two threads are used: the main thread and the Sender thread. The main thread passes data through the interceptor, serializer, and partitioner into the shared variable RecordAccumulator, and the Sender thread pulls data from the shared variable and sends it to the Kafka broker.
A batch is sent once it reaches batch.size; if that size is not reached, the data is sent after waiting at most linger.ms.
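In kafka-python the corresponding constructor arguments are batch_size and linger_ms; a minimal sketch (the values are arbitrary assumptions):

#coding:utf-8
from kafka import KafkaProducer

# batch_size / linger_ms mirror the batch.size / linger.ms settings described above
producer = KafkaProducer(
    bootstrap_servers=['192.168.71.251:9092'],
    batch_size=32768,   # ship a batch once it holds 32 KB of records...
    linger_ms=10)       # ...or after at most 10 ms of waiting, whichever comes first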

"If the number of consumers in a consumer group exceeds the number of partitions of the topic, some consumers will not be able to consume data." Is this statement correct?

The statement is correct. Consumers in excess of the number of partitions will not receive data, because a partition's messages can only be consumed by one consumer within a consumer group.

When a consumer commits its consumption position, does it commit the offset of the latest message consumed, or offset + 1?

offset + 1. Message offsets start from 0, and the committed consumer position is the offset of the next message to be consumed, i.e. the offset of the latest consumed message plus 1.
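A minimal sketch of making this explicit with kafka-python 2.0.2 (the manual-commit setup follows the earlier examples); note that the committed value is message.offset + 1:

#coding:utf-8
from kafka import KafkaConsumer, TopicPartition
from kafka.structs import OffsetAndMetadata

consumer = KafkaConsumer(
    'demo1',
    bootstrap_servers='192.168.71.251:9092',
    group_id='test',
    enable_auto_commit=False)

for message in consumer:
    tp = TopicPartition(message.topic, message.partition)
    # commit the position of the *next* message to consume: offset + 1
    consumer.commit({tp: OffsetAndMetadata(message.offset + 1, "")})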

What situations will cause repeated consumption?

Consuming first and committing the offset afterwards: if the machine crashes after the data has been consumed but before the offset is committed, the data will be consumed again.

Which scenarios will cause messages to be missed during consumption?

Committing the offset first and crashing before the data is consumed will cause missed consumption.

When you use kafka-topics.sh to create (delete) a topic, what logic will be executed behind Kafka?

A new topic node is created under the /brokers/topics node in Zookeeper, e.g. /brokers/topics/first
This triggers the Controller's listener
The Kafka Controller is responsible for creating the topic and updating the metadata cache

Can the number of partitions of topic be increased? How can it be increased? If not, why?

Yes, the number of partitions can be increased, for example with kafka-topics.sh --alter; see the sketch below for doing the same through the admin API.
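A minimal sketch of increasing the partition count through kafka-python's admin client (growing demo1 to 4 partitions is an arbitrary assumption):

#coding:utf-8
from kafka import KafkaAdminClient
from kafka.admin import NewPartitions

admin = KafkaAdminClient(bootstrap_servers=['192.168.71.251:9092'])
# grow 'demo1' to a total of 4 partitions (the count can only go up, never down)
admin.create_partitions({'demo1': NewPartitions(total_count=4)})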

Can the number of topic partitions be reduced? How can it be reduced? If not, why?

No, it cannot be reduced; after reducing the partition count, the data already in the removed partitions would be hard to handle.

Does Kafka have internal topics? If so, which ones, and what are they used for?

Yes: __consumer_offsets. Since version 0.9 it is mainly used to store the offsets committed by consumers.

What is the purpose of Kafka partitioning?

For the Kafka cluster, partitioning provides load balancing; for consumers, partitioning improves concurrency and read efficiency.

Briefly describe Kafka’s log directory structure?

Each partition corresponds to a folder named like topic-0, topic-1, ..., and each folder contains .index and .log files.

If I specify an offset, how does the Kafka Controller find the corresponding message?

The offset is the sequence number of a message. First, binary search locates which .index file the offset falls into; then a lookup in that .index file finds the nearest indexed entry and its physical position; finally, seeking from that position in the .log file locates the message.

Talk about the role of Kafka Controller?

One broker in the Kafka cluster is elected as the Controller. It is responsible for managing brokers going online and offline, assigning partition replicas for all topics, and leader election. The Controller relies on Zookeeper for this management work.

Where are elections required in Kafka? What are the election strategies in these places?

A leader needs to be elected for each partition, chosen from the ISR; and a Controller needs to be elected among the brokers, on a first-come-first-served basis.

What is a failed replica? What countermeasures are there?

A failed replica is a follower that lags behind the leader by more than 10 s. Such followers are kicked out of the ISR and rejoin it once they catch up to within 10 s of the leader.

What aspects of Kafka's design give it such high performance?

Kafka is distributed by nature
The log files are split into segments, and the segments are indexed
A single node uses sequential reads and writes: data is only appended to files, which avoids disk-seek overhead and is much faster than random writes
Zero-copy technology is used: there is no switch to user space, reads and writes are completed in kernel space, and fewer data copies are made