Spring Cloud Stream Kafka consumer fails to receive a JSON array string

Article directory

    • Scene phenomena
    • Introduction to Spring Cloud Stream
    • Problem handling

Scene phenomena

  • Kafka acts as the message queue: it is the channel through which front-end device data reaches the back end, and its topics are consumed by several different microservices.
  • One service maintains a socket connection with the front-end edge computing device, receives real-time traffic event pushes, and forwards the events to Kafka using Spring Kafka. Normally, the event list is converted to a string before it is sent.
  • The event information has to be stored in the database and pushed in real time. Third-party services also need it, and they consume it from Kafka as well.
  • The service discussed here is a Kafka consumer that reads the event data and sends text messages to specified users. Unlike the producer, it uses Spring Cloud Stream with the functional programming model and binds its functions to Kafka topics.
  • Log statements were added to the consumer in this service. After an event was pushed to Kafka, however, no log was printed and no error was reported, even though the Kafka command-line tools showed that the consumer had consumed the message.
  • On the consumer side, the configuration file is as follows:
spring:
  cloud:
    stream:
      function:
        definition: vipRouteGpsConsumer;offlineEventConsumer;boxEventTopicConsumer
      bindings:
        vipRouteGpsConsumer-in-0:
          destination: vipRouteGps
        offlineEventConsumer-in-0:
          destination: offlineEvent
        boxEventTopicConsumer-in-0:
          destination: boxTopic
  • The source code for consumption is as follows:
@Configuration
public class KafkaConfiguration {
    @Bean
    public Consumer<String> boxEventTopicConsumer() {
        return value -> {
            log.info("Consumer Received (boxTopic): " + value);
            // todo : do somethings
        };
    }
}

Introduction to Spring Cloud Stream

  • Spring Cloud Stream is a framework for building message-driven microservice applications.
  • It is built on Spring Boot and Spring Integration and provides a simple, flexible way to connect applications to message brokers.
  • The core concepts of Spring Cloud Stream include:
    • Binder: the component that adapts the application to a specific message broker. Binders are available for a variety of brokers, such as Kafka and RabbitMQ.
    • Destination: where a message is sent to or read from, which can be a topic or a queue depending on the broker.
    • Message: the basic unit of data, consisting of a payload and some metadata (headers).
    • Producer: sends messages to the message broker.
    • Consumer: receives messages from the message broker.
  • Spring Cloud Stream provides two ways to define message-driven applications:
    • Annotations: define message producers and consumers by annotating methods. This model was deprecated in the 3.x line in favor of the functional model.
    • Functional programming model: define producers and consumers as java.util.function beans (Supplier, Function, Consumer) and bind their inputs and outputs to destinations.
  • Spring Cloud Stream also offers additional features, such as message conversion, message grouping, and message persistence, to cover different scenarios. With it, developers can build and manage message-driven microservice applications more easily.
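  • In the functional model, the default input binding name follows the pattern <functionName>-in-<index>, which is why the configuration above uses names such as boxEventTopicConsumer-in-0. The sketch below only illustrates that naming rule in plain Java. It is not a Spring API: the class BindingNames and its method are hypothetical, and it assumes single-input functions separated by ';' with no function composition ('|').

```java
import java.util.ArrayList;
import java.util.List;

public class BindingNames {

    // Splits a spring.cloud.stream.function.definition value on ';' and
    // derives the default input binding name for each function name.
    // Assumption: every function has exactly one input (index 0).
    static List<String> inputBindingNames(String definition) {
        List<String> names = new ArrayList<>();
        for (String functionName : definition.split(";")) {
            String trimmed = functionName.trim();
            if (!trimmed.isEmpty()) {
                names.add(trimmed + "-in-0");
            }
        }
        return names;
    }

    public static void main(String[] args) {
        String definition = "vipRouteGpsConsumer;offlineEventConsumer;boxEventTopicConsumer";
        // Prints one binding name per line, matching the keys under "bindings".
        for (String name : inputBindingNames(definition)) {
            System.out.println(name);
        }
    }
}
```

  • A binding key in the configuration that does not match the derived name is not attached to the function, so comparing the two lists is a quick first check when a consumer stays silent.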

Problem handling

  • When consuming a multi-partition topic, only the data from one partition was printed and nothing appeared for the others. Checking the consumer offsets, however, showed that the messages on all partitions had been consumed.
2023-09-21 13:45:13.706 INFO 2184 --- [container-0-C-1] c.newatc.data.kafka.KafkaConfiguration: Consumer Received (boxTopic): {"equipmentNo":0,"laneNum":4,"statisticsCycle":60,"time":1695275100110,"vehicleLanes":[{"averageParkingNum":0.5,"averageSpeed":6.1,"lane":1,"laneFlow":2,"maxQueueLength":12,"nonVehicleAverageTravelTime":0.0,"overStopLineFlow":2,"pedestrianAverageTravelTime":0,"vehicleAverageDelay":15.3,"vehicleAverageTravelTime":0.2},{"averageParkingNum":0.0,"averageSpeed":35.7,"lane":2,"laneFlow":1,"maxQueueLength":0,"nonVehicleAverageTravelTime":0.0,"overStopLineFlow":1,"pedestrianAverageTravelTime":0,"vehicleAverageDelay":0.0,"vehicleAverageTravelTime":0.0},{"averageParkingNum":0.0,"averageSpeed":0.0,"lane":3,"laneFlow":3,"maxQueueLength":0,"nonVehicleAverageTravelTime":0.0,"overStopLineFlow":0,"pedestrianAverageTravelTime":0,"vehicleAverageDelay":0.0,"vehicleAverageTravelTime":0.0},{"averageParkingNum":0.0,"averageSpeed":25.4,"lane":4,"laneFlow":1,"maxQueueLength":0,"nonVehicleAverageTravelTime":0.0,"overStopLineFlow":5,"pedestrianAverageTravelTime":0,"vehicleAverageDelay":0.0,"vehicleAverageTravelTime":1.6}]}
  • Further testing showed that when the event list was serialized into a JSON array string, the consumer printed nothing, whereas a string holding a single JSON object was received and printed normally.
  • After changing the consumer's type to String[], an error appeared. The code is as follows:
    @Bean
    public Consumer<String[]> boxEventTopicConsumer(HWTrafficEventManager hwTrafficEventManager) {
        log.info("Consumer Received (boxTopic): " + hwTrafficEventManager);
        return value -> {
            log.info("Consumer Received (boxTopic): " + value.length);
        };
    }
  • The error is reported as follows:
2023-09-21 11:47:59.684 ERROR 9112 --- [container-0-C-1] o.s.kafka.listener.DefaultErrorHandler : Backoff none exhausted for boxTopic-3@29941

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Cannot deserialize value of type `[Ljava.lang.String;` from Object value (token `JsonToken.START_OBJECT`)
 at [Source: (byte[])"[{"equipmentNo":0,"eventCode":8,"eventType":1,"laneTurn":0,"relationNo":1,"time":1695267981221}]"; line: 1, column: 2] (through reference chain: java.lang.Object[][0]); nested exception is com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize value of type `[Ljava.lang.String;` from Object value (token `JsonToken.START_OBJECT`)
 at [Source: (byte[])"[{"equipmentNo":0,"eventCode":8,"eventType":1,"laneTurn":0,"relationNo":1,"time":1695267981221}]"; line: 1, column: 2] (through reference chain: java.lang.Object[][0]), failedMessage=GenericMessage [payload=byte[96], headers={kafka_offset=29941, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@3c31888e, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=3, kafka_receivedMessageKey=[B@3fcf7bd1, kafka_receivedTopic=boxTopic, kafka_receivedTimestamp=1695267871945, contentType=application/json, kafka_groupId=data-center}]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2683)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2649)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2609)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2536)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2427)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2305)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1979)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1364)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1355)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1247)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Cannot deserialize value of type `[Ljava.lang.String;` from Object value (token `JsonToken.START_OBJECT`)
 at [Source: (byte[])"[{"equipmentNo":0,"eventCode":8,"eventType":1,"laneTurn":0,"relationNo":1,"time":1695267981221}]"; line: 1, column: 2] (through reference chain: java.lang.Object[][0]); nested exception is com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize value of type `[Ljava.lang.String;` from Object value (token `JsonToken.START_OBJECT`)
 at [Source: (byte[])"[{"equipmentNo":0,"eventCode":8,"eventType":1,"laneTurn":0,"relationNo":1,"time":1695267981221}]"; line: 1, column: 2] (through reference chain: java.lang.Object[][0])
at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertFromInternal(MappingJackson2MessageConverter.java:237)
at org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter.convertFromInternal(ApplicationJsonMessageMarshallingConverter.java:113)
at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:185)
at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:176)
at org.springframework.cloud.function.context.config.SmartCompositeMessageConverter.fromMessage(SmartCompositeMessageConverter.java:48)
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.convertInputMessageIfNecessary(SimpleFunctionRegistry.java:1282)
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.convertInputIfNecessary(SimpleFunctionRegistry.java:1057)
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:696)
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:551)
at org.springframework.cloud.stream.function.PartitionAwareFunctionWrapper.apply(PartitionAwareFunctionWrapper.java:84)
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionWrapper.apply(FunctionConfiguration.java:754)
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.handleMessageInternal(FunctionConfiguration.java:586)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:216)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:397)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:83)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:454)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:428)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:125)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:255)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:119)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:42)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2629)
... 12 common frames omitted
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize value of type `[Ljava.lang.String;` from Object value (token `JsonToken.START_OBJECT`)
 at [Source: (byte[])"[{"equipmentNo":0,"eventCode":8,"eventType":1,"laneTurn":0,"relationNo":1,"time":1695267981221}]"; line: 1, column: 2] (through reference chain: java.lang.Object[][0])
at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
at com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1741)
at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1515)
at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1420)
at com.fasterxml.jackson.databind.DeserializationContext.extractScalarFromObject(DeserializationContext.java:932)
at com.fasterxml.jackson.databind.deser.std.StdDeserializer._parseString(StdDeserializer.java:1292)
at com.fasterxml.jackson.databind.deser.std.StringArrayDeserializer.deserialize(StringArrayDeserializer.java:166)
at com.fasterxml.jackson.databind.deser.std.StringArrayDeserializer.deserialize(StringArrayDeserializer.java:25)
at com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:322)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4674)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3723)
at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertFromInternal(MappingJackson2MessageConverter.java:223)
... 45 common frames omitted
  • The log shows that the failure occurs while converting the received payload. The message is a JSON array of objects, and Jackson cannot deserialize it into String[], which would require an array of JSON strings.
  • One workaround is on the sending side: appending a “-” to the message lets it be received normally. The consumer then has to delete the last character before parsing the text as JSON.
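  • The “-” workaround can be sketched as a pair of helpers, one for the sending side and one for the consuming side. This is a minimal sketch in plain Java: the class SentinelSuffix and its method names are hypothetical, and it assumes both sides agree on the same marker.

```java
public class SentinelSuffix {

    // Marker appended by the producer; both sides must agree on it.
    static final String SENTINEL = "-";

    // Producer side: append the marker to the JSON text before sending.
    static String wrap(String json) {
        return json + SENTINEL;
    }

    // Consumer side: remove the marker, if present, before JSON parsing.
    static String unwrap(String received) {
        if (received.endsWith(SENTINEL)) {
            return received.substring(0, received.length() - SENTINEL.length());
        }
        return received;
    }

    public static void main(String[] args) {
        String json = "[{\"equipmentNo\":0,\"eventCode\":8}]";
        String sent = wrap(json);
        System.out.println(sent);
        System.out.println(unwrap(sent));
    }
}
```

  • Checking for the marker before stripping keeps the consumer tolerant of messages sent without it. Valid JSON text never ends in “-”, so the check cannot remove real data. The drawback is that every other consumer of the topic, including third parties, has to know about the marker.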
  • Later I found a simpler option that needs no change on the sending side: declare the consumer's input as Object, which arrives as a byte array, and convert it to a string in the consumer.
  • Corresponding configuration:
spring:
  cloud:
    stream:
      function:
        definition: boxEventTopicConsumer
      bindings:
        boxEventTopicConsumer-in-0:
          destination: boxTopic
          group: data-center
          contentType: application/json
  • Consumer code:
@Configuration
public class KafkaConfiguration {
    @Bean
    public Consumer<Object> boxEventTopicConsumer() {
        return value -> {
            // The payload is expected to arrive as raw bytes; decode it as UTF-8 text
            String str = new String((byte[]) value, java.nio.charset.StandardCharsets.UTF_8);
            log.info("Consumer Received (boxTopic): " + str);
            // todo : do somethings
        };
    }
}
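  • The direct cast to byte[] in the consumer throws a ClassCastException if the payload ever arrives as another type. A slightly more defensive conversion could look like the sketch below. The class PayloadText and its method are hypothetical helpers, not part of Spring, and the assumption that a payload might also arrive as a String is mine rather than something observed in this service.

```java
import java.nio.charset.StandardCharsets;

public class PayloadText {

    // Converts a received payload to text without assuming its runtime type.
    static String toText(Object payload) {
        if (payload instanceof byte[]) {
            // Raw bytes from the broker: decode as UTF-8.
            return new String((byte[]) payload, StandardCharsets.UTF_8);
        }
        if (payload instanceof String) {
            // Already converted upstream.
            return (String) payload;
        }
        // Fallback for any other type (also handles null).
        return String.valueOf(payload);
    }

    public static void main(String[] args) {
        byte[] raw = "[{\"equipmentNo\":0,\"eventCode\":8}]".getBytes(StandardCharsets.UTF_8);
        System.out.println(toText(raw));
    }
}
```

  • Inside the Consumer<Object> lambda, the cast line would then become String str = PayloadText.toText(value); and the resulting text can be passed to a JSON parser as before.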