Vulnerability description
Spring Kafka is a module in the Spring Framework ecosystem that is used to simplify the process of integrating Apache Kafka in Spring applications. A record refers to a record in a Kafka message.
In the affected version, ErrorHandlingDeserializer
is not configured for records by default. When the user sets the container property checkDeserExWhenKeyNull
or checkDeserExWhenValueNull
to true (the default is false), and Allowing untrusted sources to publish to a Kafka topic allows an attacker to inject a malicious payload into the Kafka topic and remotely execute arbitrary code when deserializing record headers.
Affected version
2.8.1 <= Spring-Kafka <= 2.9.10 3.0.0 <= Spring-Kafka <= 3.0.9
Vulnerability recurrence
The component affected by this vulnerability is actually Spring-Kafka. Strictly speaking, it is not a kafka vulnerability, but a Spring vulnerability.
Pre-vulnerability knowledge
Let’s first take a look at how SpringBoot and Kafka complete communication/consumption
The workflow is as follows
1. The producer sends the message to a Broker (or multiple) in the Kafka cluster. 2. The Kafka cluster stores the message in one or more partitions and maintains an offset for each partition. 3. Consumption A subscriber subscribes to one or more topics and reads messages from the Kafka cluster. 4. The consumer reads the messages in each partition sequentially and tracks the offset of each partition.
-
? ErrorHandlingDeserializer: It is a deserializer (Deserializer) in Kafka, which can handle exceptions and errors during the deserialization process.
-
? checkDeserExWhenKeyNull & amp; & amp; checkDeserExWhenValueNull: It is a serializer (Serializer) in Kafka. It can check whether the key (key/value) is null during the serialization process, and throws when the value is found to be null. abnormal.
Let’s briefly summarize the vulnerability conditions
In affected versions, logging is not configured by default with the
ErrorHandlingDeserializer
container propertycheckDeserExWhenKeyNull
orcheckDeserExWhenValueNull
set to true
Environment setup
We need to set up a Kafka service to receive messages. It is more troublesome to set up on this machine. We can use docker to quickly build it on vps. It should be noted that Kafka must be able to accept external connections, docker-compose.yml
as follows
version: '2' services: zookeeper: image: zookeeper restart: always ports: - "2181:2181" container_name: zookeeper kafka: image: wurstmeister/kafka restart: always ports: - "9092:9092" - "9094:9094" depends_on: - zookeeper environment: KAFKA_ADVERTISED_HOST_NAME: 124.222.21.138 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9094 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://124.222.21.138:9092,SSL://124.222.21.138:9094 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,SSL:SSL KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT container_name: kafka
Spring Kafka producers and consumers can be written using the KafkaTemplate
and “@KafkaListener` annotations provided by Spring Kafka.
Producers can use KafkaTemplate
to send messages to the Kafka cluster:
package com.drunkbaby.springkafkatest.controller; import com.drunkbaby.springkafkatest.common.KafkaInfo; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.time.LocalDateTime; import java.util.concurrent.ExecutionException; @RestController @RequestMapping("/producer") public class ProducerController { @Autowired private KafkaTemplate<String,String> kafkaTemplate; @PostMapping("/fireAndForget") public String fireAndForget() { kafkaTemplate.send(KafkaInfo.TOPIC_WELCOME, "fireAndForget:" + LocalDateTime.now()); return "success"; } }
Consumers can use the @KafkaListener
annotation to listen for messages in the Kafka cluster:
package com.drunkbaby.springkafkatest.consumer; import com.drunkbaby.springkafkatest.common.KafkaInfo; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.handler.annotation.Headers; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; @Component public class Consumer { @KafkaListener(topics = KafkaInfo.TOPIC_WELCOME) public String consumer2(@Payload String message, @Headers MessageHeaders headers) { System.out.println("Consumer (annotation mode): Received message ==> "); System.out.println(" message: " + message); System.out.println(" headers:"); headers.keySet().forEach(key -> System.out.println(" " + key + ":" + headers.get(key))); return "success"; }
connection succeeded
Visit http://localhost:8083/producer/sync
to send a record
Construct payload
What is actually affected is the Consumer, and the Consumer needs to set checkDeserExWhenKeyNull
or checkDeserExWhenValueNull
to true
ConcurrentKafkaListenerContainerFactory<String, Greeting> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.getContainerProperties().setCheckDeserExWhenValueNull(true); factory.getContainerProperties().setCheckDeserExWhenKeyNull(true);
Payload reference https://github.com/Contrast-Security-OSS/Spring-Kafka-POC-CVE-2023-34040
Vulnerability analysis
Mainly let’s look at the deserialization part
The breakpoint will first go to the org.springframework.kafka.listener.ListenerUtils#getExceptionFromHeader
method, which will get the KEY_DESERIALIZER_EXCEPTION_HEADER
in the PoC and use it as headers
Following up on the byteArrayToDeserializationException()
method, we go directly to the deserialization part, and a resolveClass()
verification is done before deserialization.
The resolveClass()
verification here is one-time, which means that we can construct other payloads, such as CC chains, etc., and confirm that it can be opened.
After that, you will enter the readObject()
method of the corresponding class.
Bug fix
https://github.com/spring-projects/spring-kafka/commit/25ac793a78725e2ca4a3a2888a1506a4bfcf0c9d
It’s equivalent to blackening the header here.