Using Elasticsearch: ES with MybatisPlus, data consistency between es and MySQL, and decoupling with RabbitMQ


This post walks through a use case for Elasticsearch (es): using ES together with MybatisPlus, keeping data consistent between MySQL and es, and using RabbitMQ for decoupling, with a custom utility for sending messages.

  • Preface
  • Introduction
  • Using ES with MybatisPlus
    • 1. Introduce dependencies
    • 2. Configure
    • 3. Add annotations to the entity class
    • 4. Create the Repository for es operations
    • 5. Initialize the data in es
    • 6. Perform full query and paging
      • Conditional paging query
  • Data consistency between es and mysql
    • Delayed double deletion
    • Locking method
  • Decoupling with rabbitmq
    • Configuration yml file
    • rabbitmq configuration class
    • Callback methods
    • Custom message-sending utility class
    • Send messages
    • Receive message, update es
  • Summary


Use ES with MybatisPlus

1. Introduce dependencies


        <!-- mysql driver -->

        <!-- druid -->

        <!-- springboot integrates mybatis plus -->


2. Configure

package com.tianju.es.config;

import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.RestClients;
import org.springframework.data.elasticsearch.config.AbstractElasticsearchConfiguration;

/**
 * ESConfig could also be written as a plain configuration class that does not extend
 * AbstractElasticsearchConfiguration. The advantage of extending it is that the
 * elasticsearchTemplate is already configured and can be used directly.
 */
@Configuration
public class ESConfig extends AbstractElasticsearchConfiguration {
    @Override
    @Bean
    public RestHighLevelClient elasticsearchClient() {
        // The address is not shown in the original; "localhost:9200" is a placeholder.
        ClientConfiguration clientConfiguration =
                ClientConfiguration.builder().connectedTo("localhost:9200").build();
        return RestClients.create(clientConfiguration).rest();
    }
}

3. Add annotations to entity classes

package com.tianju.es.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;

import java.math.BigDecimal;

/**
 * Product (SKU), including stock and price information
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
@TableName("finance_sku") // table name assumed here; it is not shown in the original
@Document(indexName = "finance_sku")
public class FinanceSkuES {
    @TableId(value = "ID", type = IdType.AUTO)
    private Long id;
    @Field(index = true, analyzer = "ik_smart",
            searchAnalyzer = "ik_smart", type = FieldType.Text)
    private String detail; // product details, analyzed with ik_smart
    private BigDecimal price;
    private long stock;
    private Integer status;
}

Parameter explanation

@Document(indexName = "books", shards = 1, replicas = 0)
public class Book {
    @Field(type = FieldType.Integer)
    private Integer id;
    @Field(type = FieldType.Keyword)
    private String title;
    @Field(type = FieldType.Text)
    private String press;
    @Field(type = FieldType.Keyword)
    private String author;
    @Field(type = FieldType.Keyword, index = false)
    private BigDecimal price;
    @Field(type = FieldType.Text)
    private String description;
}

  • @Document: marks the class as a document stored in an Elasticsearch index;
    indexName = "books": the index is named "books";
    shards = 1: the index uses a single primary shard;
    replicas = 0: no replica shards are kept.
  • @Field(type = FieldType.Keyword): specifies the data type of the field;
    index = false: the field is not indexed, so it cannot be searched.

4. Create the Repository for es operations

The repository interface inherits many ready-made methods from its parent interfaces, and it can also declare custom query methods that follow the Spring Data method-naming rules.

package com.tianju.es.mapper;

import com.tianju.es.entity.FinanceSkuES;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Repository;

/**
 * Operates on es, similar to the MybatisPlus mapper used earlier
 */
@Repository
public interface SkuESMapper extends ElasticsearchRepository<FinanceSkuES, Long> {

    /**
     * Paged query of sku data by keyword; the detail field is analyzed (word segmentation)
     * @param detail query condition
     * @param pageable paging information
     * @return one page of matching documents
     */
    Page<FinanceSkuES> findFinanceSkuESByDetail(String detail, Pageable pageable);

    /**
     * Delete by id
     * @param id document id
     */
    void removeFinanceSkuESById(Long id);
}


5. Initialize the data in es

Backend log output after running

Index information shown on the es index management page

6. Perform full query and paging

Perform full query

{
  "content": [
    {
      "id": 1,
      "detail": "HUAWEI MateBook ...",
      "price": 13999.0,
      "stock": 50,
      "status": 1
    },
    {
      "id": 2,
      "detail": "HUAWEI Mate 60 Pro + 16GB + 1TB Xuanbai",
      "price": 9999.0,
      "stock": 60,
      "status": 1
    },
    {
      "id": 3,
      "detail": "iPhone 15 Pro Max Super Retina XDR display",
      "price": 9299.0,
      "stock": 46,
      "status": 1
    },
    {
      "id": 4,
      "detail": "MacBook Air Apple M2 chip 8-core CPU 8-core graphics processor 8GB unified memory 256GB solid state drive",
      "price": 8999.0,
      "stock": 60,
      "status": 1
    }
  ],
  "pageable": {
    "sort": {
      "empty": true,
      "sorted": false,
      "unsorted": true
    },
    "offset": 0,
    "pageSize": 4,
    "pageNumber": 0,
    "paged": true,
    "unpaged": false
  },
  "totalElements": 4,
  "last": true,
  "totalPages": 1,
  "number": 0,
  "size": 4,
  "sort": {
    "empty": true,
    "sorted": false,
    "unsorted": true
  },
  "numberOfElements": 4,
  "first": true,
  "empty": false
}

Conditional paging query

Note that page numbers in a paged query start from 0. The search term must also be a whole token as produced by the analyzer: for example, hu is not a token on its own, but HUAWEI is.

Tokens produced by the analyzer
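The whole-token rule and the zero-based page numbering can be illustrated without es at all. The sketch below is a simplification and not how ik_smart works: the hypothetical `tokenize` helper only lower-cases the text and splits on whitespace, `matches` succeeds only when a query token equals an indexed token, and `offset` shows the arithmetic behind page 0.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

final class TermMatchSketch {

    // Hypothetical stand-in for an analyzer: lower-case, then split on whitespace.
    // ik_smart produces different tokens, especially for Chinese text.
    static List<String> tokenize(String text) {
        return Arrays.stream(text.toLowerCase(Locale.ROOT).trim().split("\\s+"))
                .collect(Collectors.toList());
    }

    // A query matches only when one of its tokens equals an indexed token.
    static boolean matches(String indexedText, String query) {
        List<String> indexed = tokenize(indexedText);
        return tokenize(query).stream().anyMatch(indexed::contains);
    }

    // Zero-based paging: page 0 starts at offset 0.
    static long offset(int pageNumber, int pageSize) {
        return (long) pageNumber * pageSize;
    }

    public static void main(String[] args) {
        String detail = "HUAWEI Mate 60 Pro + 16GB + 1TB Xuanbai";
        System.out.println(matches(detail, "hu"));     // false: "hu" is only a prefix of a token
        System.out.println(matches(detail, "HUAWEI")); // true: equals the token "huawei"
        System.out.println(offset(0, 4));              // 0
    }
}
```

With the real repository, the equivalent call would pass a zero-based page request as the Pageable argument of findFinanceSkuESByDetail.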

Data consistency between es and mysql

Delayed double deletion

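    // The statements for each step are not shown in the original;
    // the calls below are one plausible reading of the step comments.
    public FinanceSkuES updateByIddDoubleDelete(FinanceSkuES financeSkuES) {
        // Treating es as a cache, how do we keep es consistent with mysql?
        // Delayed double deletion
        // 1. Delete the cached copy in es first
        skuESMapper.deleteById(financeSkuES.getId());
        // 2. Update the mysql database
        updateById(financeSkuES);
        // 3. Delay (3 seconds is a placeholder; the original duration is not shown)
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        // 4. Delete the cached copy in es again
        skuESMapper.deleteById(financeSkuES.getId());

        // 5. Finally, write the latest data back into es
        skuESMapper.save(getById(financeSkuES.getId()));
        Optional<FinanceSkuES> byId = skuESMapper.findById(financeSkuES.getId());
        log.debug("byId: " + byId);
        return byId.get();
    }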

The code above originally had a problem that I am correcting here: the data simply ended up deleted from es at the start. The document should be deleted by ID, and the modified data should then be set into es.
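The order of the five steps is easier to see with the storage stripped away. The following is a minimal sketch under stated assumptions: two in-memory maps stand in for MySQL and the es index, and the class and method names are hypothetical rather than taken from the project.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class DelayedDoubleDeleteSketch {
    // Hypothetical stand-ins: "database" plays MySQL, "cache" plays the es index.
    final Map<Long, String> database = new ConcurrentHashMap<>();
    final Map<Long, String> cache = new ConcurrentHashMap<>();

    String update(long id, String newValue, long delayMillis) {
        cache.remove(id);               // 1. delete the cached copy first
        database.put(id, newValue);     // 2. update the source of truth
        try {
            Thread.sleep(delayMillis);  // 3. give in-flight readers time to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during the delay", e);
        }
        cache.remove(id);               // 4. delete again, dropping any stale copy written meanwhile
        String fresh = database.get(id);
        cache.put(id, fresh);           // 5. write the latest value back
        return fresh;
    }

    public static void main(String[] args) {
        DelayedDoubleDeleteSketch sketch = new DelayedDoubleDeleteSketch();
        sketch.database.put(1L, "price=9999");
        sketch.cache.put(1L, "price=9999");
        System.out.println(sketch.update(1L, "price=8999", 50)); // price=8999
        System.out.println(sketch.cache.get(1L));                // price=8999
    }
}
```

The second deletion matters because a reader that loaded the old row before step 2 may write it into the cache after step 1; the delay is meant to outlast such readers.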

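Locking method

I am not convinced this approach is of much use here; I only used it to take a lock around the update. The lock is taken with setIfAbsent on a fixed key, which is equivalent to the setnx instruction, and it is released only when the stored value still equals the caller's UUID.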
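The lock semantics can be sketched with a plain map. This is an illustration under assumptions, not the project's code: a ConcurrentHashMap stands in for Redis, `putIfAbsent` plays the role of setnx, and `remove(key, value)` releases the lock only if the caller still owns it. The class name is hypothetical.

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class OwnerLockSketch {
    // In-memory stand-in for the Redis key space.
    private final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();

    // Mirrors setnx: succeeds only when the key is absent.
    boolean tryLock(String key, String owner) {
        return store.putIfAbsent(key, owner) == null;
    }

    // Removes the key only if it still holds this owner's value, as one atomic step.
    boolean unlock(String key, String owner) {
        return store.remove(key, owner);
    }

    public static void main(String[] args) {
        OwnerLockSketch lock = new OwnerLockSketch();
        String first = UUID.randomUUID().toString();
        String second = UUID.randomUUID().toString();
        System.out.println(lock.tryLock("skuLock", first));  // true
        System.out.println(lock.tryLock("skuLock", second)); // false: already held
        System.out.println(lock.unlock("skuLock", second));  // false: not the owner
        System.out.println(lock.unlock("skuLock", first));   // true
    }
}
```

With Redis itself, a separate get followed by delete is not atomic, so the compare-and-delete is usually done in a Lua script. A lock without an expiry is also never released if its holder crashes.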

Use rabbitmq for decoupling

Configuration yml file

spring:
  main:
    allow-circular-references: true
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    ### Local database
    url: jdbc:mysql://...&characterEncoding=utf8&serverTimezone=GMT+8&allowMultiQueries=true
    username: root
    password: 123

  # Redis related configuration
  redis:
    port: 6379
    database: 0
    password: Pet3927

  # rabbitmq related
  rabbitmq:
    port: 5672
    password: 123
    virtual-host: /test

    # Producer ensures message reliability
    publisher-returns: true
    publisher-confirm-type: correlated

    # Set manual confirmation
    listener:
      simple:
        acknowledge-mode: manual

rabbitmq configuration class

Convert Java object to json string for transmission

package com.tianju.es.rabbit;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    public static final String ES_EXCHANGE = "es_exchange";
    public static final String ES_QUEUE = "es_queue";
    public static final String ES_KEY = "es_key";

    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(ES_EXCHANGE);
    }

    @Bean
    public Queue esQueue() {
        return new Queue(ES_QUEUE);
    }

    @Bean
    public Binding esQueueToDirectExchange() {
        return BindingBuilder.bind(esQueue())
                .to(directExchange())
                .with(ES_KEY);
    }

    /**
     * Converts objects to JSON strings
     * @return the JSON message converter
     */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter()); // replace the default converter
        return rabbitTemplate;
    }
}


Callback methods

package com.tianju.es.rabbit;

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

/**
 * Producer-side message reliability
 */
@Slf4j
@Configuration
public class CallbackConfig
        implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // Initialization: register this class for both callbacks
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    /**
     * Executed whether the send succeeds or fails
     * @param correlationData correlation object, which must be supplied when the message is sent
     * @param ack true if the send succeeded, false if it failed
     * @param cause the reason for the failure; null on success
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.debug("ack was successful: " + ack);
        log.debug("cause information: " + cause);

        if (correlationData != null && correlationData.getReturnedMessage() != null) {
            JSONObject jsonObject = JSON.parseObject(correlationData.getReturnedMessage().getBody());
            String exchange = correlationData.getReturnedMessage().getMessageProperties().getReceivedExchange();
            String routingKey = correlationData.getReturnedMessage().getMessageProperties().getReceivedRoutingKey();
            log.debug("Message body: " + jsonObject);
            log.debug("Exchange: " + exchange);
            log.debug("Routing key: " + routingKey);
        }

        if (ack) {
            return;
        }

        // Failed. Two options:
        // 1. Retry up to an upper limit (default 5), increasing the interval before each retry.
        // 2. Save the message body, exchange name, routing key and related information to the database.
        //    A program scans these records regularly and resends the messages; once the resend
        //    count exceeds the upper limit (default 5), the message is handled manually.
        // 2.1 Table fields needed: message body, exchange name, routing key, status, retry count
        // 2.2 Scheduled task (single node: Spring scheduled tasks; distributed: XXL-JOB) resends the messages
    }

    /**
     * Executed only when the message cannot be routed to a queue
     * @param message the returned message
     * @param replyCode reply code from the broker
     * @param replyText reply text from the broker
     * @param exchange exchange the message was sent to
     * @param routingKey routing key that was used
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        // 2. Save the message, exchange name, routing key and related information to the database.
        //    A program scans these records regularly and resends the messages.
    }
}
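The first failure-handling option in the comments, retrying up to five times with a growing interval, can be sketched as a small schedule calculation. The doubling rule and the base delay are assumptions of this sketch; the source only states that the interval increases and that the limit defaults to 5. The class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class RetryBackoffSketch {
    // Upper limit on retries, matching the default of 5 mentioned in the comments.
    static final int MAX_RETRIES = 5;

    // Delay before retry number `attempt` (1-based). Doubling is an assumption of this sketch.
    static long delayMillis(int attempt, long baseMillis) {
        if (attempt < 1 || attempt > MAX_RETRIES) {
            throw new IllegalArgumentException("attempt must be between 1 and " + MAX_RETRIES);
        }
        return baseMillis << (attempt - 1);
    }

    // True once the retry budget is used up and the message should be handled manually.
    static boolean needsManualHandling(int attemptsMade) {
        return attemptsMade >= MAX_RETRIES;
    }

    // All delays for one message, in order.
    static List<Long> schedule(long baseMillis) {
        List<Long> delays = new ArrayList<>();
        for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
            delays.add(delayMillis(attempt, baseMillis));
        }
        return delays;
    }

    public static void main(String[] args) {
        System.out.println(schedule(1000));         // [1000, 2000, 4000, 8000, 16000]
        System.out.println(needsManualHandling(5)); // true
    }
}
```

For the second option, the same attempt counter would live in the "times" column of the stored message record, and the scheduled task would compare it against the limit before resending.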


Custom message-sending utility class

package com.tianju.common.util;

import com.alibaba.fastjson2.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;

import java.util.concurrent.TimeUnit;

@Slf4j
public class RabbitUtil {

    /**
     * Sends a message. With a ttl the message acts as a delayed message: when it expires,
     * it goes to the dead-letter exchange and then into the dead-letter queue.
     * @param rabbitTemplate the RabbitTemplate used to send
     * @param redisTemplate used to store the token in redis
     * @param msg the message to send
     * @param token token sent with the message to ensure idempotence
     * @param ttl for delayed consumption, the expiration time of the message in milliseconds; null for no expiration
     * @param exchange exchange name
     * @param routingKey routing key
     * @param <T> type of the entity being sent
     */
    public static <T> void sendMsg(RabbitTemplate rabbitTemplate,
                                    StringRedisTemplate redisTemplate,
                                    T msg, String token, Integer ttl,
                                    String exchange, String routingKey) {

        log.debug("Send message {} to exchange [{}] through routing key [{}], token is {}", msg, exchange, routingKey, token);

        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // Keep the token for 5 minutes. The set(key, value, long) overload writes at an
                // offset and does not set an expiry, so the TimeUnit overload is used instead.
                redisTemplate.opsForValue().set(token, token, 5, TimeUnit.MINUTES);
                // The listener reads the token back from the message id
                message.getMessageProperties().setMessageId(token);
                if (ttl != null) {
                    message.getMessageProperties().setExpiration(ttl.toString());
                }
                return message;
            }
        };

        // Carry the body, exchange name and routing key so the confirm callback can read them
        CorrelationData correlationData = new CorrelationData();
        Message message = new Message(JSON.toJSONBytes(msg));
        message.getMessageProperties().setReceivedExchange(exchange);
        message.getMessageProperties().setReceivedRoutingKey(routingKey);
        correlationData.setReturnedMessage(message);

        // Send the MQ message
        rabbitTemplate.convertAndSend(exchange, // target exchange
                routingKey, // with a ttl, this key leads to the TTL queue; on expiry the message is dead-lettered
                msg, messagePostProcessor, correlationData);
    }
}

Send messages


package com.tianju.es.service;

import com.baomidou.mybatisplus.extension.service.IService;
import com.tianju.es.entity.FinanceSkuES;

public interface SkuService extends IService<FinanceSkuES> {

    /**
     * Delayed double deletion, to keep the es cache consistent with the mysql database
     * @param financeSkuES the modified data
     * @return the data read back from es after the update
     */
    FinanceSkuES updateByIddDoubleDelete(FinanceSkuES financeSkuES);

    /**
     * Locking approach, although it seems of little use
     * @param financeSkuES the modified data
     * @return the data read back from es after the update
     */
    FinanceSkuES updateByIdRedisLock(FinanceSkuES financeSkuES);

    /**
     * Decoupling through rabbitmq
     * @param financeSkuES the modified data
     * @return a confirmation that the message has been sent
     */
    String updateByIdRabbitMQ(FinanceSkuES financeSkuES);
}

Implementation class

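package com.tianju.es.service.impl;

import cn.hutool.core.util.IdUtil;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.tianju.common.util.RabbitUtil;
import com.tianju.es.entity.FinanceSkuES;
import com.tianju.es.mapper.SkuESMapper;
import com.tianju.es.mapper.SkuMybatisPlusMapper;
import com.tianju.es.rabbit.RabbitConfig;
import com.tianju.es.service.SkuService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

import java.util.Optional;
import java.util.UUID;

// Statements that are not shown in the original are filled in from the step comments
// and marked as assumptions where they appear.
@Slf4j
@Service
public class SkuServiceImpl extends ServiceImpl<SkuMybatisPlusMapper, FinanceSkuES>
        implements SkuService {

    @Autowired
    private SkuESMapper skuESMapper;

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public FinanceSkuES updateByIddDoubleDelete(FinanceSkuES financeSkuES) {
        // Treating es as a cache, how do we keep es consistent with mysql?
        // Delayed double deletion (the call for each step is an assumption)
        // 1. Delete the cached copy in es first
        skuESMapper.deleteById(financeSkuES.getId());
        // 2. Update the mysql database
        updateById(financeSkuES);
        // 3. Delay (3 seconds is a placeholder; the original duration is not shown)
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        // 4. Delete the cached copy in es again
        skuESMapper.deleteById(financeSkuES.getId());

        // 5. Finally, write the latest data back into es
        skuESMapper.save(getById(financeSkuES.getId()));
        Optional<FinanceSkuES> byId = skuESMapper.findById(financeSkuES.getId());
        log.debug("byId: " + byId);
        return byId.get();
    }

    @Override
    public FinanceSkuES updateByIdRedisLock(FinanceSkuES financeSkuES) {
        // Second approach: locking
        String uuid = UUID.randomUUID().toString();
        // Equivalent to the setnx instruction
        Boolean skuLock = stringRedisTemplate.opsForValue().setIfAbsent("skuLock", uuid);
        try {
            if (Boolean.TRUE.equals(skuLock)) { // obtained the lock
                // Assumed body: update mysql, then refresh the document in es
                updateById(financeSkuES);
                skuESMapper.deleteById(financeSkuES.getId());
                skuESMapper.save(getById(financeSkuES.getId()));
            }
        } finally {
            // Release the lock only if this caller still owns it
            if (uuid.equals(stringRedisTemplate.opsForValue().get("skuLock"))) {
                stringRedisTemplate.delete("skuLock");
            }
        }
        Optional<FinanceSkuES> byId = skuESMapper.findById(financeSkuES.getId());
        log.debug("byId: " + byId);
        return byId.get();
    }

    @Override
    public String updateByIdRabbitMQ(FinanceSkuES financeSkuES) {
        // Use rabbitmq for decoupling: update mysql here and let the listener refresh es
        updateById(financeSkuES); // assumed; the original statement is not shown
        FinanceSkuES skuES = getById(financeSkuES.getId());
        String uuid = IdUtil.fastUUID();

        // Assumed call: no ttl, sent through the es exchange and routing key
        RabbitUtil.sendMsg(rabbitTemplate, stringRedisTemplate, skuES, uuid, null,
                RabbitConfig.ES_EXCHANGE, RabbitConfig.ES_KEY);

        return "Message has been sent:" + skuES;
    }
}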

Receive message, update es

After receiving the message, the listener updates es: it deletes the original document and sets the latest one.

package com.tianju.es.rabbit;

import com.rabbitmq.client.Channel;
import com.tianju.es.entity.FinanceSkuES;
import com.tianju.es.mapper.SkuESMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Slf4j
@Component
public class ESListener {

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Autowired
    private SkuESMapper skuESMapper;

    @RabbitListener(queues = RabbitConfig.ES_QUEUE)
    public void esUpdate(FinanceSkuES financeSkuES, Message message, Channel channel) {
        String messageId = message.getMessageProperties().getMessageId();
        log.debug("Business ----> received message {} from the queue, messageId is {}", financeSkuES, messageId);

        try {
            // Idempotence: only the first delivery finds, and deletes, the token in redis
            if (Boolean.TRUE.equals(redisTemplate.delete(messageId))) {
                // Delete the original es data by id
                skuESMapper.deleteById(financeSkuES.getId());
                // Then set the new data in
                skuESMapper.save(financeSkuES);
                log.debug("Process es business, delete the original one and replace it with the latest one");
            }

            // Manually acknowledge the message
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

        } catch (Exception e) {
            // Idempotence still has to hold when the message is retried.
            // 1. Retry up to an upper limit (default 5), increasing the interval before each retry.
            // 2. Save the message, exchange name, routing key and related information to the database.
            //    A program scans these records regularly and resends the messages; once the resend
            //    count exceeds the upper limit (default 5), the message is handled manually.
            // Known here: the message body (message.getBody()), the exchange and the routing key.

            try {
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
            } catch (IOException ex) {
                throw new RuntimeException(ex);
            }
        }
    }
}
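The idempotence check in the listener relies on a delete that succeeds only once per token. Here is a minimal sketch of that idea, assuming an in-memory set in place of redis; the class and method names are hypothetical.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class IdempotentConsumerSketch {
    // In-memory stand-in for the tokens the producer stores in redis.
    private final Set<String> tokens = ConcurrentHashMap.newKeySet();
    // Counts how many times the business logic actually ran.
    final AtomicInteger processed = new AtomicInteger();

    // Producer side: remember the token before the message is sent.
    void register(String token) {
        tokens.add(token);
    }

    // Consumer side: remove() returns true only for the first delivery of a token,
    // so a redelivered message is acknowledged without repeating the business logic.
    boolean consume(String token) {
        if (token != null && tokens.remove(token)) {
            processed.incrementAndGet(); // the es update would happen here
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        IdempotentConsumerSketch consumer = new IdempotentConsumerSketch();
        consumer.register("token-1");
        System.out.println(consumer.consume("token-1")); // true: first delivery
        System.out.println(consumer.consume("token-1")); // false: duplicate delivery
        System.out.println(consumer.processed.get());    // 1
    }
}
```

One consequence of this design is that a message whose processing fails after the token was removed will be skipped on redelivery unless the token is stored again.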

Log output from the backend


