CVE-2023-34040: Spring Kafka deserialization RCE

Vulnerability description

Spring Kafka is a module in the Spring ecosystem that simplifies integrating Apache Kafka into Spring applications. A record is a single message in a Kafka topic, consisting of a key, a value, and headers.

In affected versions, ErrorHandlingDeserializer is not configured by default. When the user sets the container property checkDeserExWhenKeyNull or checkDeserExWhenValueNull to true (both default to false), and untrusted sources are allowed to publish to a Kafka topic, an attacker can inject a malicious payload into a record header and achieve remote code execution when that header is deserialized.

Affected version

2.8.1 <= Spring-Kafka <= 2.9.10
3.0.0 <= Spring-Kafka <= 3.0.9

Vulnerability recurrence

The component affected by this vulnerability is Spring-Kafka. Strictly speaking, it is not a Kafka vulnerability but a Spring vulnerability.

Pre-vulnerability knowledge

Let’s first take a look at how a Spring Boot application and Kafka communicate and consume messages.


The workflow is as follows

1. The producer sends a message to one or more brokers in the Kafka cluster.
2. The Kafka cluster stores the message in one or more partitions and maintains an offset for each partition.
3. A consumer subscribes to one or more topics and reads messages from the Kafka cluster.
4. The consumer reads the messages in each partition sequentially and tracks the offset of each partition.

  • ErrorHandlingDeserializer: a deserializer (Deserializer) provided by Spring Kafka that handles exceptions and errors raised during the deserialization process.

  • checkDeserExWhenKeyNull / checkDeserExWhenValueNull: container properties (not serializers) that, when set to true, make the listener container check a record whose key or value is null for a deserialization-exception header and rethrow the exception stored there.

Let’s briefly summarize the vulnerability conditions

In affected versions, ErrorHandlingDeserializer is not configured by default, and the vulnerability requires the container property checkDeserExWhenKeyNull or checkDeserExWhenValueNull to be set to true.
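For contrast, a protected consumer wraps its real deserializers in ErrorHandlingDeserializer. Below is a minimal sketch of that wiring using the property constants from the Spring Kafka API; the StringDeserializer delegates are just an example, not part of the vulnerable setup described above:

```java
// Sketch: wrap the actual deserializers in ErrorHandlingDeserializer so that
// deserialization failures are caught instead of killing the container.
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
// Delegates that perform the real deserialization (illustrative choice)
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, StringDeserializer.class);
```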

Environment setup

We need a Kafka service to receive messages. Setting it up on the local machine is cumbersome, so we can use Docker to spin it up quickly on a VPS. Note that Kafka must accept external connections. The docker-compose.yml is as follows:

version: '2'

services:
  zookeeper:
    image: zookeeper
    restart: always
    ports:
      - "2181:2181"
    container_name: zookeeper

  kafka:
    image: wurstmeister/kafka
    restart: always
    ports:
      - "9092:9092"
      - "9094:9094"
    depends_on:
      - zookeeper
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 124.222.21.138
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9094
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://124.222.21.138:9092,SSL://124.222.21.138:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,SSL:SSL
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
    container_name: kafka

Spring Kafka producers and consumers can be written using the `KafkaTemplate` class and the `@KafkaListener` annotation provided by Spring Kafka.

Producers can use KafkaTemplate to send messages to the Kafka cluster:

package com.drunkbaby.springkafkatest.controller;
  
import com.drunkbaby.springkafkatest.common.KafkaInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
  
import java.time.LocalDateTime;
import java.util.concurrent.ExecutionException;
  
@RestController
@RequestMapping("/producer")
public class ProducerController {
    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;
  
    @PostMapping("/fireAndForget")
    public String fireAndForget() {
        kafkaTemplate.send(KafkaInfo.TOPIC_WELCOME, "fireAndForget:" + LocalDateTime.now());
        return "success";
    }
}

Consumers can use the @KafkaListener annotation to listen for messages in the Kafka cluster:

package com.drunkbaby.springkafkatest.consumer;
  
import com.drunkbaby.springkafkatest.common.KafkaInfo;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
  
@Component
public class Consumer {
    @KafkaListener(topics = KafkaInfo.TOPIC_WELCOME)
    public String consumer2(@Payload String message, @Headers MessageHeaders headers) {
        System.out.println("Consumer (annotation mode): Received message ==> ");
        System.out.println(" message: " + message);
        System.out.println(" headers:");
        headers.keySet().forEach(key -> System.out.println(" " + key + ":" + headers.get(key)));
        return "success";
    }
}
The connection succeeds.

Visit http://localhost:8083/producer/sync to send a record


Construct payload

What is actually affected is the consumer, which must set checkDeserExWhenKeyNull or checkDeserExWhenValueNull to true:

ConcurrentKafkaListenerContainerFactory<String, Greeting> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.getContainerProperties().setCheckDeserExWhenValueNull(true);
factory.getContainerProperties().setCheckDeserExWhenKeyNull(true);

Payload reference https://github.com/Contrast-Security-OSS/Spring-Kafka-POC-CVE-2023-34040
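The essential step of the PoC linked above is attaching a serialized Java object as the value of the exception header (named "springDeserializerExceptionKey" in the Spring Kafka source) on a produced record. The sketch below shows only that serialization step with plain JDK classes; Gadget is a harmless stand-in for a real gadget chain such as CommonsCollections, not part of the actual PoC:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class HeaderPayload {
    // Stand-in for an attacker-controlled object whose readObject() has a
    // side effect; a real exploit nests a gadget chain here instead.
    public static class Gadget implements Serializable {
        private void readObject(ObjectInputStream in)
                throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            System.out.println("gadget readObject() executed");
        }
    }

    // Serialize any object into the raw bytes used as the header value.
    public static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] headerValue = serialize(new Gadget());
        // headerValue would be attached to a ProducerRecord as the
        // "springDeserializerExceptionKey" header via the producer API.
        System.out.println("payload size: " + headerValue.length + " bytes");
    }
}
```

The consumer then treats these bytes as a serialized DeserializationException and deserializes them, which is where the analysis below picks up.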

Vulnerability analysis

Let’s focus on the deserialization part.

With a breakpoint set, execution first reaches the org.springframework.kafka.listener.ListenerUtils#getExceptionFromHeader method, which reads the KEY_DESERIALIZER_EXCEPTION_HEADER header that the PoC attached to the record.


Following into the byteArrayToDeserializationException() method, we arrive at the deserialization itself; a resolveClass() check is performed before the object is deserialized.


The resolveClass() check is only applied to the first class in the stream. This means we can nest other payloads, such as CommonsCollections (CC) gadget chains, inside the outer object, and they will be deserialized without any check — confirming the path is exploitable.


After that, execution enters the readObject() method of the corresponding class.
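The flawed pattern can be reproduced with plain JDK classes. The sketch below is a simplified model of byteArrayToDeserializationException() in affected versions, not the actual Spring Kafka source; Inner stands in for a gadget class:

```java
import java.io.*;
import java.util.ArrayList;
import java.util.List;

public class FirstClassOnlyCheck {
    // Object whose readObject() runs on deserialization, standing in for a gadget.
    public static class Inner implements Serializable {
        private void readObject(ObjectInputStream in)
                throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            System.out.println("nested readObject() ran without any check");
        }
    }

    public static Object deserialize(byte[] data)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois =
                new ObjectInputStream(new ByteArrayInputStream(data)) {
            private boolean first = true;

            @Override
            protected Class<?> resolveClass(ObjectStreamClass desc)
                    throws IOException, ClassNotFoundException {
                if (first) {
                    first = false;
                    // Affected code validated only this outer class name
                    // (it had to equal DeserializationException).
                    System.out.println("checked only: " + desc.getName());
                }
                return super.resolveClass(desc); // nested classes pass through
            }
        }) {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // The outer object passes the one-time check; the Inner object
        // nested inside it is deserialized unchecked.
        List<Inner> outer = new ArrayList<>();
        outer.add(new Inner());
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(outer);
        }
        deserialize(bos.toByteArray());
    }
}
```

Running main shows that resolveClass() logs only the outer class while the nested object's readObject() still executes, which is exactly why a gadget chain embedded in the header survives the check.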

Bug fix

https://github.com/spring-projects/spring-kafka/commit/25ac793a78725e2ca4a3a2888a1506a4bfcf0c9d

In effect, the fix blocklists the exception headers here: a deserialization-exception header on an incoming record is only honored if it was created in-process by the framework; anything else is stripped rather than deserialized.
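The core idea of the patch can be modeled in a few lines of plain Java. This is a simplified sketch, not the actual fix; the names mirror the Kafka Header/RecordHeader API and the internal DeserializationExceptionHeader class introduced by the commit:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class HeaderFilterSketch {
    public interface Header { String key(); }

    // Model of a header parsed off the wire: always this plain type.
    public static class RecordHeader implements Header {
        private final String key;
        public RecordHeader(String key) { this.key = key; }
        public String key() { return key; }
    }

    // Model of the trusted type: only created in-process by the framework,
    // never produced by deserializing an incoming record.
    public static class DeserializationExceptionHeader extends RecordHeader {
        public DeserializationExceptionHeader(String key) { super(key); }
    }

    public static final String EXCEPTION_HEADER = "springDeserializerExceptionKey";

    // Drop exception headers that did not originate in-process.
    public static List<Header> filter(List<Header> headers) {
        List<Header> out = new ArrayList<>(headers);
        Iterator<Header> it = out.iterator();
        while (it.hasNext()) {
            Header h = it.next();
            if (EXCEPTION_HEADER.equals(h.key())
                    && !(h instanceof DeserializationExceptionHeader)) {
                it.remove();
            }
        }
        return out;
    }
}
```

Because the Kafka wire protocol has no notion of header subclasses, every header arriving from a remote producer is a plain RecordHeader, so an attacker can never satisfy the instanceof check and the malicious payload is discarded before deserialization.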