This example enables RabbitMQ publisher confirms, so that the RabbitMQ server sends the producer a receipt confirmation (ACK) for each published message, and has the consumer manually acknowledge or discard each message it consumes.
Enable publisher confirms and returned-message notifications by configuring `publisher-confirm-type: correlated` and `publisher-returns: true`.
# Application configuration: HTTP port, RabbitMQ cluster connection with
# publisher confirms/returns enabled, and the MySQL datasource (HikariCP pool).
server:
  port: 8014
spring:
  rabbitmq:
    username: admin
    password: 123456
    dynamic: true
    # Single-node alternative to `addresses` below:
    # port: 5672
    # host: 192.168.49.9
    addresses: 192.168.49.10:5672,192.168.49.9:5672,192.168.49.11:5672
    # Deliver broker confirms to RabbitTemplate's confirm callback.
    publisher-confirm-type: correlated
    # Deliver unroutable messages to RabbitTemplate's returns callback.
    publisher-returns: true
  application:
    name: shushan
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://ip/shushan
    username: root
    password:
    hikari:
      minimum-idle: 10
      maximum-pool-size: 20
      idle-timeout: 50000
      max-lifetime: 540000
      connection-test-query: select 1
      connection-timeout: 600000
RabbitConfig:
package com.kexuexiong.shushan.common.config; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.ReturnedMessage; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Slf4j @Configuration public class RabbitConfig {<!-- --> @Bean public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {<!-- --> RabbitTemplate rabbitTemplate = new RabbitTemplate(); rabbitTemplate.setConnectionFactory(connectionFactory); rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {<!-- --> log.info("confirmCallback data: " + correlationData); log.info("confirmCallback ack :" + ack); log.info("confirmCallback cause :" + cause); }); rabbitTemplate.setReturnsCallback(returned -> log.info("returnsCallback msg : " + returned)); return rabbitTemplate; } }
AckReceiver, the consumer that manually acknowledges (or rejects) the messages it receives:
package com.kexuexiong.shushan.common.mq;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;

import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.Map;
import java.util.Objects;

/**
 * Message listener that settles every delivery manually on the channel:
 * deliveries from {@code MqConstant.BUSINESS_QUEUE} are negatively acknowledged
 * without requeue, deliveries from any other queue are acknowledged, and
 * deliveries that cannot be processed are rejected without requeue.
 */
@Slf4j
@Component
public class AckReceiver implements ChannelAwareMessageListener {

    /**
     * Deserializes the message body, logs it together with the headers, and
     * acknowledges or rejects the delivery depending on the queue it came from.
     *
     * @param message the delivered message
     * @param channel the channel the message was delivered on; used to ack/nack/reject
     * @throws Exception if rejecting a failed delivery itself fails
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        String consumerQueue = message.getMessageProperties().getConsumerQueue();

        // NOTE(security): native Java deserialization must only be used on trusted
        // payloads; prefer a JSON converter if the producers are not under your control.
        try (ObjectInputStream inputStream =
                     new ObjectInputStream(new ByteArrayInputStream(message.getBody()))) {
            // The producer in this project sends a serialized Map with String values.
            @SuppressWarnings("unchecked")
            Map<String, String> msg = (Map<String, String>) inputStream.readObject();

            log.info("{}-ack Receiver:{}", consumerQueue, msg);
            log.info("header msg :{}", message.getMessageProperties().getHeaders());

            if (Objects.equals(consumerQueue, MqConstant.BUSINESS_QUEUE)) {
                // Negative acknowledgement for this single delivery, without requeue.
                channel.basicNack(deliveryTag, false, false);
            } else {
                // Every other queue (including MqConstant.DEAD_LETTER_QUEUE) is acknowledged.
                // multiple=true also covers earlier unacknowledged deliveries on this channel.
                channel.basicAck(deliveryTag, true);
            }
        } catch (Exception e) {
            // Log first, with the full stack trace, so the cause is not lost if the reject fails.
            log.error("Failed to process message from queue {} (deliveryTag={})",
                    consumerQueue, deliveryTag, e);
            channel.basicReject(deliveryTag, false);
        }
    }
}
The listener container can listen to several queues at once: pass all of their names to `simpleMessageListenerContainer.setQueueNames(...)`. In this example only `MqConstant.DEAD_LETTER_QUEUE` is registered.
package com.kexuexiong.shushan.common.mq; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class MessageListenerConfig {<!-- --> @Autowired private CachingConnectionFactory connectionFactory; @Autowired private AckReceiver ackReceiver; @Bean public SimpleMessageListenerContainer simpleMessageListenerContainer() {<!-- --> SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory); simpleMessageListenerContainer.setConcurrentConsumers(2); simpleMessageListenerContainer.setMaxConcurrentConsumers(2); simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL); //,MqConstant.demoDirectQueue, MqConstant.FANOUT_A, MqConstant.BIG_CAR_TOPIC simpleMessageListenerContainer.setQueueNames(MqConstant.DEAD_LETTER_QUEUE); simpleMessageListenerContainer.setMessageListener(ackReceiver); return simpleMessageListenerContainer; } }
package com.kexuexiong.shushan.controller.mq;

import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.RandomUtil;
import com.kexuexiong.shushan.common.mq.MqConstant;
import com.kexuexiong.shushan.controller.BaseController;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * Demo endpoints that publish a message to an exchange in order to exercise
 * the publisher confirm and returns callbacks.
 */
@Slf4j
@RestController
@RequestMapping("/mq/")
public class MqController extends BaseController {

    /** Calendar-year pattern ({@code yyyy}); {@code YYYY} would be the week-based year. */
    private static final String CREATE_TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";

    /** Routing key used by both demo endpoints. */
    private static final String DEMO_ROUTING_KEY = "demoDirectRouting";

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * Publishes a demo message to the exchange named {@code noneDirectExchange}.
     *
     * @return the literal string {@code "ok"} once the send call has returned
     */
    @GetMapping("/callback/sendDirectMessage")
    public String sendDirectMessageCallback() {
        rabbitTemplate.convertAndSend("noneDirectExchange", DEMO_ROUTING_KEY, buildDemoMessage());
        return "ok";
    }

    /**
     * Publishes a demo message to {@code MqConstant.lonelyDirectExchange}.
     *
     * @return the literal string {@code "ok"} once the send call has returned
     */
    @GetMapping("/callback/lonelyDirectExchange")
    public String lonelyDirectExchange() {
        rabbitTemplate.convertAndSend(MqConstant.lonelyDirectExchange, DEMO_ROUTING_KEY, buildDemoMessage());
        return "ok";
    }

    /** Builds the demo payload: a random message id, a fixed text and the current time. */
    private Map<String, Object> buildDemoMessage() {
        Map<String, Object> map = new HashMap<>();
        map.put("msgId", UUID.randomUUID().toString());
        map.put("msg", "demo msg ,kexuexiong");
        map.put("createTime", DateUtil.format(new Date(), CREATE_TIME_PATTERN));
        return map;
    }
}
test:
Send a direct message to an exchange that does not exist (the exchange cannot be found):
2023-10-10T17:04:58.492 + 08:00 ERROR 27232 --- [.168.49.10:5672] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply -code=404, reply-text=NOT_FOUND - no exchange 'noneDirectExchange' in vhost '/', class-id=60, method-id=40) 2023-10-10T17:04:58.492 + 08:00 INFO 27232 --- [nectionFactory6] c.k.shushan.common.config.RabbitConfig: confirmCallback data: null 2023-10-10T17:04:58.492 + 08:00 INFO 27232 --- [nectionFactory6] c.k.shushan.common.config.RabbitConfig : confirmCallback ack :false 2023-10-10T17:04:58.492 + 08:00 INFO 27232 --- [nectionFactory6] c.k.shushan.common.config.RabbitConfig : confirmCallback cause :channel error; protocol method: #method<channel.close>(reply-code =404, reply-text=NOT_FOUND - no exchange 'noneDirectExchange' in vhost '/', class-id=60, method-id=40)
The confirm callback reports ack = false, and the cause shows that the exchange was not found.
Send a direct message to an exchange that exists but has no queue bound for the routing key (the queue cannot be found):
2023-10-10T17:05:55.851 + 08:00 INFO 27232 --- [nectionFactory5] c.k.shushan.common.config.RabbitConfig: confirmCallback data: null 2023-10-10T17:05:55.852 + 08:00 INFO 27232 --- [nectionFactory5] c.k.shushan.common.config.RabbitConfig : confirmCallback ack :true 2023-10-10T17:05:55.852 + 08:00 INFO 27232 --- [nectionFactory5] c.k.shushan.common.config.RabbitConfig : confirmCallback cause :null 2023-10-10T17:05:55.865 + 08:00 INFO 27232 --- [nectionFactory6] c.k.shushan.common.config.RabbitConfig : returnsCallback msg : ReturnedMessage [message=(Body:'[serialized object]' MessageProperties [ headers={<!-- -->}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), replyCode=312, replyText=NO_ROUTE, exchange=lonelyDirectExchange, routingKey=demoDirectRouting]
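The confirm callback reports ack = true because the broker accepted the message at the exchange, but the returns callback is also invoked with replyText=NO_ROUTE because no queue matched the routing key.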