Integrating Kafka with Spring Boot and Other Application Frameworks, and Kafka's Message-Driven Models

Kafka is an indispensable component of modern distributed systems that process large-scale real-time data streams. Integrating Kafka with application frameworks such as Spring Boot greatly simplifies application development and operation. Below we look at how to integrate Kafka with Spring Boot and at the message-driven models Kafka supports.

1. Integration of Kafka and Spring Boot

1. Add dependencies

First, you need to add the Kafka dependency in the pom.xml file of the Spring Boot project. Here is a basic dependency configuration example:

<dependencies>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.7.4</version> <!-- Please select the version according to the actual situation -->
    </dependency>
    ...
</dependencies>

2. Configure Kafka properties

Add the Kafka-related configuration in the application.properties or application.yml file, as in the following example:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group-id
spring.kafka.consumer.auto-offset-reset=earliest
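
If you prefer application.yml, the equivalent configuration is:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: my-group-id
      auto-offset-reset: earliest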

3. Create Kafka producer or consumer

Spring Boot's concise API makes it easy to create a Kafka producer or consumer. Here is a simple Kafka consumer example:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {
    @KafkaListener(topics = "my-topic")
    public void consume(String message) {
        System.out.println("Consumed message: " + message);
    }
}

In the example above, the @KafkaListener annotation creates a Kafka consumer that listens to the specified topic (my-topic) and processes each received message.

2. Message-driven models

Kafka supports the following message-driven models:

1. Publish-subscribe model (Pub-Sub)

In the publish-subscribe model, producers publish messages to one or more topics, and consumers subscribe to those topics and process the messages. This very common messaging model enables broadcast or one-to-many communication. Here is a code example of a simple publish-subscribe setup:

Producer:

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {
    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}

Subscriber:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {
    @KafkaListener(topics = "my-topic")
    public void consume(String message) {
        System.out.println("Consumed message: " + message);
    }
}
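
For completeness, a purely illustrative sketch of wiring the producer so that it publishes one message to my-topic on application startup (the subscriber above will then print it):

import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaDemoConfig {
    // Hypothetical wiring: publish a single message to my-topic when the application starts
    @Bean
    public CommandLineRunner sendOnStartup(KafkaProducer producer) {
        return args -> producer.sendMessage("my-topic", "Hello from the publish-subscribe example");
    }
}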

2. Request-Reply model (Request-Reply)

In the request-reply model, the requester sends a request message, and the responder returns a response after processing it. This model suits scenarios that require synchronous-style processing. With Spring Boot and Kafka it can be built from the same pieces as the publish-subscribe example: in the consumer that handles the request, a KafkaTemplate sends the response message to a designated reply topic, and the original requester obtains the response by listening to that reply topic. The model requires extra topics for requests and replies and therefore adds some complexity to the system, but it provides a clean synchronous-style communication mechanism; a minimal sketch is shown below. Spring for Apache Kafka also offers ReplyingKafkaTemplate, which automates request-reply correlation.
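
A minimal sketch of the manual variant described above, assuming hypothetical topics request-topic and reply-topic (serialization and error handling omitted):

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
class RequestResponder {
    private final KafkaTemplate<String, String> kafkaTemplate;

    RequestResponder(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Responder side: process each request and publish the result to the reply topic
    @KafkaListener(topics = "request-topic")
    public void handleRequest(String request) {
        String response = "processed: " + request; // hypothetical business logic
        kafkaTemplate.send("reply-topic", response);
    }
}

@Service
class ReplyListener {
    // Requester side: pick up the response from the reply topic
    @KafkaListener(topics = "reply-topic")
    public void handleReply(String response) {
        System.out.println("Received reply: " + response);
    }
}

In a real application, the requester also needs to correlate each reply with its request, for example via a correlation-id header; ReplyingKafkaTemplate handles exactly this.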

3. Stream Processing Model (Stream Processing)

Kafka also provides a stream processing model through the Kafka Streams API, which lets you process real-time data streams. In this model, the application acts as a stream processor: it reads data from one or more input topics, applies transformation operations, and writes the results to output topics. This model suits complex real-time data processing scenarios such as data cleaning, deduplication, and aggregation.

The Kafka Streams API provides the following main capabilities:

1. Input/output: Through the Kafka Streams API, you can read data from Kafka topics and write data to new or existing topics.
2. Transformation: The Kafka Streams API provides many transformation operations, such as filter, map, reduce, join, etc. These operations process the data received from the input stream and write it to the output stream in the desired form.
3. Windowing operations: Windowing is very useful when processing time-series data or when time-based aggregation is required. The Kafka Streams API supports tumbling, hopping, sliding, and session windows, all based on record timestamps (event time or processing time).
4. Connecting external systems: Through connectors (Kafka Connect), different data sources and data sinks can be attached to Kafka. This makes Kafka more than a message queue: it can serve as a data pipeline linking different systems and data stores, with Kafka Streams applications processing the data that flows through it.
5. Aggregation: The Kafka Streams API provides aggregation operations such as count, reduce, and aggregate. These allow you to transform and aggregate the data in the message stream while processing it.
6. Window aggregation: Building on windowing, the Kafka Streams API also supports aggregating data within a time window, for example calculating averages, sums, or counts per window.
7. Joins: The Kafka Streams API supports joining two streams. You can use inner, left, or outer joins to merge them; a small join sketch follows this list.
8. Error handling and fault tolerance: Errors are inevitable when processing streaming data. You can process records with built-in operations such as map(), filter(), and mapValues(), and when a record cannot be handled, route it (or an error message describing it) to a designated error-handling topic, then deal with those error records in a separate stream processing step.
9. Message order guarantee: Kafka relies on partitioning and replication for data reliability. In a Kafka cluster, brokers store messages in partitions, and each partition can have multiple replicas, providing redundancy if a broker fails. The Kafka Streams API builds on this: when a task fails, its state is restored from changelog topics (or standby replicas) and processing resumes, preserving the per-partition ordering of messages.
10. Batch and stream processing: Although Kafka is typically used for real-time data streams, the same Kafka Streams topology can also work through large backlogs of historical records, and windowed aggregation can collapse many input records into a single result, which improves processing efficiency for batch-like workloads.
11. Scalability: The Kafka Streams API is extensible. You can implement custom processors with the Processor API and plug them into the topology to extend its functionality.
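
As mentioned in the joins item above, here is a minimal sketch of an inner join between two KStreams over a 10-second window. The topic names orders, payments, and orders-with-payments, as well as the String value types, are assumptions made purely for illustration:

import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.StreamJoined;

public class KafkaStreamsJoinExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> orders = builder.stream("orders", Consumed.with(Serdes.String(), Serdes.String()));
        KStream<String, String> payments = builder.stream("payments", Consumed.with(Serdes.String(), Serdes.String()));

        // Inner join: pair records from both streams that share a key and arrive within 10 seconds of each other
        KStream<String, String> joined = orders.join(
                payments,
                (orderValue, paymentValue) -> orderValue + " / " + paymentValue,
                JoinWindows.of(Duration.ofSeconds(10)),
                StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()));

        joined.to("orders-with-payments", Produced.with(Serdes.String(), Serdes.String()));

        new KafkaStreams(builder.build(), props).start();
    }
}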

The following is a simple Kafka Streams example that reads data from an input topic (inputTopic), filters the data (filter), and writes the results to a new output topic (outputTopic):

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class KafkaStreamsExample {

    public static void main(String[] args) {
        // Minimal required configuration for a Kafka Streams application
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "filter-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        final StreamsBuilder builder = new StreamsBuilder();

        // Define your data processing logic here
        KStream<String, String> stream = builder.stream("inputTopic", Consumed.with(Serdes.String(), Serdes.String()));

        // Filter out null or empty messages
        KStream<String, String> filtered = stream.filter((key, value) -> value != null && !value.isEmpty());

        // Write the result to a new topic
        filtered.to("outputTopic", Produced.with(Serdes.String(), Serdes.String()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start(); // Start the Kafka Streams application
    }
}

This example first defines the Kafka Streams application configuration (application id and bootstrap servers), then uses StreamsBuilder to read data from inputTopic. It uses the filter operation to drop empty messages and writes the results to outputTopic. Finally, it starts the Kafka Streams application.

The following is a simple Kafka Streams example that uses windowed aggregation to count the records for each key in 5-second tumbling windows (a per-key average over the window could be computed the same way by using aggregate() instead of count()):

import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

public class KafkaStreamsExampleWindowAgg {

    public static void main(String[] args) {
        // Minimal required configuration for a Kafka Streams application
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "window-agg-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        final StreamsBuilder builder = new StreamsBuilder();

        // Read <String, Long> records from the input topic
        KStream<String, Long> stream = builder.stream("inputTopic", Consumed.with(Serdes.String(), Serdes.Long()));

        // Group by key and count the records that fall into each 5-second tumbling window
        KTable<Windowed<String>, Long> counts = stream
                .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
                .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
                .count();

        // Flatten the windowed key back to a plain String key and write the counts to the output topic
        counts.toStream()
              .map((windowedKey, count) -> KeyValue.pair(windowedKey.key() + "@" + windowedKey.window().start(), count))
              .to("outputTopic", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start(); // Start the Kafka Streams application
    }
}

This example reads <String, Long> records from the topic named "inputTopic", groups them by key, counts the records that fall into each 5-second tumbling window, and writes the results to the topic named "outputTopic". The windowed key is flattened back into a plain string (the key plus the window start time) purely so that the result can be written with ordinary string serialization. In a practical application, you might keep the windowed key or group by a key with more business meaning.

Note that these examples only demonstrate basic use of the Kafka Streams API. In an actual production environment, you would also need to consider details such as error handling, application resilience, and performance tuning.

3. Summary

In this article, we took an in-depth look at how Kafka integrates with application frameworks such as Spring Boot and at the message-driven models Kafka supports.
On the integration side, we showed how to add the Kafka dependency to a Spring Boot project and configure the corresponding properties so that the application can communicate with a Kafka cluster. We then explained several common message-driven models in detail: the publish-subscribe model, the request-reply model, and the stream processing model, and how the Kafka Streams API can be used to process large-scale real-time data streams.
In addition, we shared simple Kafka Streams examples that show how to process and analyze data using features such as filtering, stream joins, and windowed aggregation.