Flink connects to Kafka and reports: org.apache.kafka.common.errors.TimeoutException

Testing Flink 1.12.7 connecting to Kafka:

package org.test.flink;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class FlinkKafka {
    public static void main(String[] args) throws Exception {
        // 1. Execution environment (env)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Data source (source)
        // 2-1. Properties for consuming Kafka data
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "11.0.24.216:9092");
        props.setProperty("group.id", "test1111");
        System.out.println("--2-2. Build FlinkKafkaConsumer instance object--");
        // 2-2. Construct FlinkKafkaConsumer instance object
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>(
                "testtopic",              // topic to consume
                new SimpleStringSchema(), // deserialization schema for record values
                props                     // consumer properties (bootstrap servers, group id)
        );
        System.out.println("--2-3. Add Source--");
        // 2-3. Add Source
        DataStream<String> kafkaStream = env.addSource(kafkaConsumer);

        // 3. Data transformation (transformation) - none needed for this demo
        // 4. Data sink (sink)
        System.out.println("--kafkaStream.print();--");
        kafkaStream.print();

        // 5. Trigger execution (execute)
        env.execute("ConnectorFlinkKafkaConsumerDemo");
    }
}
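
For reference, the demo needs the Flink Kafka connector on the classpath. A minimal sketch of the Maven dependency, assuming the Scala 2.12 build matching Flink 1.12.7 and that the core Flink dependencies are already declared in the pom:

<!-- Flink Kafka connector (assumed Scala 2.12 build, matching Flink 1.12.7) -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.12.7</version>
</dependency>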

The execution error is as follows:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:117)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1061)
at akka.dispatch.OnComplete.internal(Future.scala:264)
at akka.dispatch.OnComplete.internal(Future.scala:261)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:536)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:91)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:91)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:666)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:446)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
... 4 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition testtopic-0 could be determined

Process finished with exit code 1

After some troubleshooting, I found this article: flink connection kafka report: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic (Future Senior Java Architect's Blog, CSDN)

Because the project's default log4j level is error, first configure resources/log4j.properties and change the log level to info:

log4j.rootLogger = info,console

### Configure output to console ###
log4j.appender.console = org.apache.log4j.ConsoleAppender
log4j.appender.console.Target = System.out
log4j.appender.console.layout = org.apache.log4j.PatternLayout
#log4j.appender.console.layout.ConversionPattern = %d{ABSOLUTE} %5p %c{1}:%L - %m%n

Running it again, the following error appears:

java.net.UnknownHostException: kafka
at java.net.InetAddress.getAllByName0(InetAddress.java:1281)
at java.net.InetAddress.getAllByName(InetAddress.java:1193)
at java.net.InetAddress.getAllByName(InetAddress.java:1127)
at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:104)
at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:403)
at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:363)
at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:151)
at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:949)
at org.apache.kafka.clients.NetworkClient.access$500(NetworkClient.java:71)
at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1122)
at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1010)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:545)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:161)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:240)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.fetchCommittedOffsets(ConsumerCoordinator.java:788)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.refreshCommittedOffsetsIfNeeded(ConsumerCoordinator.java:750)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2338)
at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1725)
at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1684)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread.reassignPartitions(KafkaConsumerThread.java:446)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread.run(KafkaConsumerThread.java:243)

This happens because I installed Kafka on the server via Docker for testing.

The .env configuration is as follows:

KAFKA_LOG_RETENTION_HOURS=16
KAFKA_BROKER_ID=0
KAFKA_ZOOKEEPER_CONNECT=zk-host:2181
# This parameter cannot be 0.0.0.0, otherwise an error is reported; it must be set to the address clients actually use to reach the broker.
KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092

Check the explanation of the KAFKA_ADVERTISED_LISTENERS parameter:

In-depth understanding of KAFKA_ADVERTISED_LISTENERS (jason.zeng@1502207's blog, CSDN)

KAFKA_ADVERTISED_LISTENERS is the address the broker publishes externally; it is registered in ZooKeeper and handed back to clients in metadata responses.

So even though the client bootstraps via 11.0.24.216:9092, it ends up connecting to the advertised host name kafka, which the development machine cannot resolve, hence the UnknownHostException above.
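
To see this directly, the advertised address can be inspected from the client side. A minimal sketch using Kafka's AdminClient (a throwaway class of my own, assuming the broker is reachable at 11.0.24.216:9092):

import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class CheckAdvertisedListeners {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Bootstrap via the raw IP; the broker replies with its advertised listeners.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "11.0.24.216:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Each node's host() is the advertised host name. With the .env above
            // this prints "kafka:9092" - the unresolvable name the consumer fails on.
            admin.describeCluster().nodes().get()
                    .forEach(node -> System.out.println(node.host() + ":" + node.port()));
        }
    }
}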

There are currently two options:

1. Map the host name in the local hosts file:

C:\Windows\System32\drivers\etc\HOSTS
11.0.24.216 kafka

2. Modify the Kafka installation's configuration, changing kafka in KAFKA_ADVERTISED_LISTENERS to a concrete IP (see the sketch below).
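
For option 2, the change is a one-line edit in the .env file (a sketch, assuming clients should reach the broker at the server's IP), followed by a restart of the Kafka container so the new address is re-registered:

KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://11.0.24.216:9092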

I solved it with option 1.
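
To confirm the hosts entry works for the JVM as well, a quick resolution check (a throwaway class of my own, not part of the demo):

import java.net.InetAddress;

public class ResolveKafkaHost {
    public static void main(String[] args) throws Exception {
        // Should print 11.0.24.216 once the HOSTS entry above is in place.
        System.out.println(InetAddress.getByName("kafka").getHostAddress());
    }
}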

Full code for reference (on Gitee): flink-java-1.12.7, a Java demo of Flink 1.12.7 including a Flink word-count example and how to connect to Kafka.

Reference: flink connection kafka report: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic (Future Senior Java Architect's Blog, CSDN)

Docker installation of Kafka: Docker installs Kafka (On the Road of Struggle's Blog, CSDN)