Using Elasticsearch: ES with MybatisPlus, data consistency between es and MySQL, and decoupling with RabbitMQ

Foreword

This blog walks through a use case of Elasticsearch: using ES together with MybatisPlus, ensuring data consistency between MySQL and es, and using RabbitMQ for decoupling with a custom method for sending messages.

The list of other related Elasticsearch articles is as follows:

  • Installation and parameter settings of the Docker version of Elasticsearch & port opening and browser access

  • Installation of visual Kibana tool for Elasticsearch & installation and use of IK word segmenter

  • Springboot integration of Elasticsearch & Kibana for full query and fuzzy query

Directory

  • Foreword
  • Introduction
  • Use ES with MybatisPlus
    • 1. Introduce dependencies
    • 2. Configure
    • 3. Add annotations to entity classes
    • 4. Create the Repository for es operations
    • 5. Initialize the data in es
    • 6. Perform full query and paging
      • Conditional paging query
  • Data consistency between es and mysql
    • Delayed double deletion
    • Locking method
  • Use rabbitmq for decoupling
    • Configuration yml file
    • rabbitmq configuration class
    • Callback method
    • Customized messaging tool class
    • Send messages
    • Receive message, update es
  • Summary

Introduction

1. A use case of elasticsearch, including using ES in combination with MybatisPlus;
2. How to ensure data consistency between MySQL and es;
3. Using RabbitMQ for decoupling, with a custom method for sending messages.

Use ES with MybatisPlus

1. Introduce dependencies

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
        </dependency>

        <!--mysql driver-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>

        <!-- druid-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
        </dependency>

        <!-- springboot integrates mybatis plus -->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

2. Configure

package com.tianju.es.config;


import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.RestClients;
import org.springframework.data.elasticsearch.config.AbstractElasticsearchConfiguration;

/**
 * You can also write ESConfig as a general configuration class instead of inheriting the AbstractElasticsearchConfiguration class.
 * However, the advantage of inheriting AbstractElasticsearchConfiguration is that it has already configured the elasticsearchTemplate for us to use directly.
 */
@Configuration
public class ESConfig extends AbstractElasticsearchConfiguration {
    @Override
    public RestHighLevelClient elasticsearchClient() {
        ClientConfiguration clientConfiguration =
                ClientConfiguration.builder()
                        .connectedTo("192.168.111.130:9200")
                        .build();
        return RestClients.create(clientConfiguration).rest();
    }
}

3. Add annotations to entity classes

package com.tianju.es.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;

import java.math.BigDecimal;

/**
 * Products, including inventory, price information
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
@TableName("finance_sku")
@Document(indexName = "finance_sku")
public class FinanceSkuES {
    @TableId(value = "ID",type = IdType.AUTO)
    private Long id;
    @TableField("finance_sku_describe")
    @Field(index = true,analyzer = "ik_smart",
            searchAnalyzer = "ik_smart",type = FieldType.Text)
    private String detail; // details
    @TableField("finance_sku_price")
    private BigDecimal price;
    @TableField("finance_sku_stock")
    private long stock;
    @TableField("finance_state")
    private Integer status;
}

Parameter explanation

@Document(indexName = "books", shards = 1, replicas = 0)
@Data
public class Book {
    @Id
    @Field(type = FieldType.Integer)
    private Integer id;
    
    @Field(type = FieldType.Keyword)
    private String title;
    
    @Field(type = FieldType.Text)
    private String press;
    
    @Field(type = FieldType.Keyword)
    private String author;
    
    @Field(type = FieldType.Keyword,index=false)
    private BigDecimal price;
    
    @Field(type = FieldType.Text)
    private String description;
}
  • @Document: marks the class as an es document, so its properties are mapped to fields of the index;
    indexName = "books": the index is named "books";
    shards = 1: the index uses a single shard;
    replicas = 0: no replica copies are kept;
  • @Field(type = FieldType.Keyword): specifies the data type of the field;
    index = false: the field is not indexed, so it cannot be used in queries.

4. Create the Repository for es operations

The repository inherits many ready-made methods from its parent interfaces. It can also declare additional query methods whose names follow the Spring Data naming rules.

package com.tianju.es.mapper;

import com.tianju.es.entity.FinanceSkuES;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Repository;

/**
 * Operate es, similar to the previous mapper
 */
@Repository
public interface SkuESMapper extends ElasticsearchRepository<FinanceSkuES, Long> {

    /**
     * Based on keywords, word segmentation and paging query of sku data
     * @param detail query conditions
     * @param pageable paging
     * @return
     */
    Page<FinanceSkuES> findFinanceSkuESByDetail(String detail, Pageable pageable);

    /**
     * Delete based on id
     * @param id
     */
    void removeFinanceSkuESById(Long id);

}

5. Initialize the data in es

Running background information

View es page information, index management

6. Perform full query and paging

Perform full query

{
  "content": [
    {
      "id": 1,
      "detail": "HUAWEI MateBook
      "price": 13999.0,
      "stock": 50,
      "status": 1
    },
    {
      "id": 2,
      "detail": "HUAWEI Mate 60 Pro + 16GB + 1TB Xuanbai",
      "price": 9999.0,
      "stock": 60,
      "status": 1
    },
    {
      "id": 3,
      "detail": "iPhone 15 Pro Max Super Retina XDR display",
      "price": 9299.0,
      "stock": 46,
      "status": 1
    },
    {
      "id": 4,
      "detail": "MacBook Air Apple M2 chip 8-core CPU 8-core graphics processor 8GB unified memory 256GB solid state drive",
      "price": 8999.0,
      "stock": 60,
      "status": 1
    }
  ],
  "pageable": {
    "sort": {
      "empty": true,
      "sorted": false,
      "unsorted": true
    },
    "offset": 0,
    "pageSize": 4,
    "pageNumber": 0,
    "paged": true,
    "unpaged": false
  },
  "totalElements": 4,
  "last": true,
  "totalPages": 1,
  "number": 0,
  "size": 4,
  "sort": {
    "empty": true,
    "sorted": false,
    "unsorted": true
  },
  "numberOfElements": 4,
  "first": true,
  "empty": false
}

Conditional paging query

Note that page numbers in a paging query start from 0. The search keyword must be a complete token as produced by the word segmenter. For example, hu is not a token on its own, but HUAWEI is.
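To make the two points above concrete, here is a small plain-Java sketch. It does not call es at all: `PagingAndTermSketch`, `page`, `tokenize` and `matches` are hypothetical names, and the whitespace tokenizer is only a rough stand-in for the real ik_smart analyzer, which splits text differently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

// Hypothetical helper, not part of the original project.
class PagingAndTermSketch {

    // Pages are zero-based: page 0 covers items [0, pageSize).
    static <T> List<T> page(List<T> all, int pageNumber, int pageSize) {
        int from = Math.min(pageNumber * pageSize, all.size());
        int to = Math.min(from + pageSize, all.size());
        return new ArrayList<>(all.subList(from, to));
    }

    // Rough stand-in for an analyzer: lower-case, then split on whitespace.
    static List<String> tokenize(String text) {
        return Arrays.asList(text.toLowerCase(Locale.ROOT).trim().split("\\s+"));
    }

    // A query matches only when one of its tokens equals an indexed token.
    static boolean matches(String indexedText, String query) {
        List<String> indexed = tokenize(indexedText);
        for (String q : tokenize(query)) {
            if (indexed.contains(q)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String detail = "HUAWEI Mate 60 Pro";
        System.out.println(matches(detail, "hu"));      // false: "hu" is only a prefix
        System.out.println(matches(detail, "HUAWEI"));  // true: a whole token
        System.out.println(page(Arrays.asList(1, 2, 3, 4, 5), 0, 2)); // [1, 2]
    }
}
```

The same reasoning explains why a keyword that is only part of a token returns an empty page, while the full token returns the expected documents.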

The result of word segmentation by word segmenter

Data consistency between es and mysql

Delayed double deletion

    @Override
    public FinanceSkuES updateByIddDoubleDelete(FinanceSkuES financeSkuES) {
        // Considering es as a cache, how to ensure the data consistency between es and mysql?
        // Delayed double deletion mode
        // 1. Delete cache es first
        skuESMapper.deleteAll();
        // 2. Update database mysql
        updateById(financeSkuES);
        // 3. Delay operation
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        // 4. Delete cache es again
        skuESMapper.deleteAll();

        // 5. Finally update cache es
        skuESMapper.saveAll(list());
        Optional<FinanceSkuES> byId = skuESMapper.findById(financeSkuES.getId());
        log.debug("byId: " + byId);
        return byId.get();
    }

The code above has a problem that I have not yet corrected: it deletes everything from es at the start and then reloads every row. Instead, only the modified document should be deleted by its ID, and then only the modified data should be written back into es.
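A minimal sketch of that by-ID variant, assuming two in-memory maps as stand-ins for MySQL and es. The class `DelayedDoubleDeleteSketch` and its fields are hypothetical and only illustrate the order of the steps; in the real service the map operations would be the corresponding repository and MybatisPlus calls.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-ins: "db" plays MySQL, "index" plays Elasticsearch.
class DelayedDoubleDeleteSketch {
    final Map<Long, String> db = new ConcurrentHashMap<>();
    final Map<Long, String> index = new ConcurrentHashMap<>();

    String update(long id, String newDetail, long delayMillis) {
        index.remove(id);            // 1. delete only the affected document
        db.put(id, newDetail);       // 2. update the source of truth
        try {
            Thread.sleep(delayMillis); // 3. let in-flight readers finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        index.remove(id);            // 4. delete again, in case stale data was re-indexed
        index.put(id, db.get(id));   // 5. re-index just this row from the database
        return index.get(id);
    }

    public static void main(String[] args) {
        DelayedDoubleDeleteSketch s = new DelayedDoubleDeleteSketch();
        s.db.put(1L, "old");
        s.index.put(1L, "old");
        s.db.put(2L, "other");
        s.index.put(2L, "other");
        System.out.println(s.update(1L, "new", 10)); // new
        System.out.println(s.index.get(2L));         // other (untouched)
    }
}
```

Because only one document is removed, searches on every other document keep working during the delay.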

Locking method

This approach did not seem to help much here; I simply wrapped the update in a Redis lock.
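For reference, the lock pattern itself can be sketched in plain Java. This is an assumption-laden illustration: a `ConcurrentHashMap` stands in for Redis, `putIfAbsent` plays the role of setnx, and `TokenLockSketch` with its methods is a hypothetical name. With a real Redis server the lock would normally also carry an expiry time, and the compare-and-delete step would need to be atomic on the server side.

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical in-memory stand-in for a Redis lock.
class TokenLockSketch {
    private final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();

    // Returns a token when the lock was acquired, or null if it is already held.
    String tryLock(String key) {
        String token = UUID.randomUUID().toString();
        return store.putIfAbsent(key, token) == null ? token : null;
    }

    // Releases the lock only if the caller still owns it; remove(key, value) is atomic.
    boolean unlock(String key, String token) {
        return token != null && store.remove(key, token);
    }

    // Runs the action only when the lock was acquired; returns whether it ran.
    boolean runLocked(String key, Runnable action) {
        String token = tryLock(key);
        if (token == null) {
            return false;
        }
        try {
            action.run();
            return true;
        } finally {
            unlock(key, token);
        }
    }

    public static void main(String[] args) {
        TokenLockSketch lock = new TokenLockSketch();
        boolean ran = lock.runLocked("skuLock", () -> System.out.println("updating"));
        System.out.println(ran); // true
    }
}
```

Returning whether the action ran lets the caller decide what to do when the lock is busy, instead of silently continuing with the es refresh.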

Use rabbitmq for decoupling

Configuration yml file

spring:
  main:
    allow-circular-references: true
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    ### Local database
    url: jdbc:mysql://127.0.0.1:3306/consumer_finance_product?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT+8&allowMultiQueries=true
    username: root
    password: 123

  #Redis related configuration
  redis:
    host: 119.3.162.127
    port: 6379
    database: 0
    password: Pet3927

  # rabbitmq related
  rabbitmq:
    host: 192.168.111.130
    port: 5672
    username: admin
    password: 123
    virtual-host: /test

    # Producer ensures message reliability
    publisher-returns: true
    publisher-confirm-type: correlated

    #Set manual confirmation
    listener:
      simple:
        acknowledge-mode: manual

rabbitmq configuration class

Convert Java object to json string for transmission

package com.tianju.es.rabbit;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    public static final String ES_EXCHANGE = "es_exchange";
    public static final String ES_QUEUE = "es_queue";
    public static final String ES_KEY = "es_key";

    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(ES_EXCHANGE);
    }

    @Bean
    public Queue esQueue() {
        return new Queue(ES_QUEUE);
    }

    @Bean
    public Binding esQueueToDirectExchange() {
        return BindingBuilder.bind(esQueue())
                .to(directExchange())
                .with(ES_KEY);
    }

    /**
     * Convert object to json string
     * @return
     */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter()); // Replace the default converter
        return rabbitTemplate;
    }

}

Callback method

package com.tianju.es.rabbit;

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

/**
 * Producer message reliability
 */
// RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback
@Configuration
@Slf4j
public class CallbackConfig
        implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // Register this class as the confirm and return callback after injection
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
        rabbitTemplate.setMandatory(true);
    }

    /**
     * Executed for every message, whether sending succeeded or failed
     * @param correlationData the correlation object supplied when the message was sent
     * @param ack true indicates success, false indicates failure to send
     * @param cause the reason for the failure; null on success
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.debug("ack was successful: " + ack);
        log.debug("cause information: " + cause);

        // Guard against a missing message to avoid a NullPointerException
        if (correlationData != null && correlationData.getReturnedMessage() != null) {
            Message returned = correlationData.getReturnedMessage();
            JSONObject jsonObject = JSON.parseObject(returned.getBody());
            String exchange = returned.getMessageProperties().getReceivedExchange();
            String routingKey = returned.getMessageProperties().getReceivedRoutingKey();
            log.debug("Message body: " + jsonObject);
            log.debug("Exchange: " + exchange);
            log.debug("Routing key: " + routingKey);
        }

        if (ack) {
            return;
        }

        // Sending failed. Two possible strategies:
        // 1. Retry up to an upper limit (default 5), increasing the interval before each retry.
        // 2. Save the message body, exchange name, routing key and related data to the database.
        //    A scheduled program scans these records and resends the messages. Once the resend
        //    limit (default 5) is exceeded, the message is handed over for manual processing.
        //    2.1 Table fields: message body, exchange name, routing key, status, times
        //    2.2 Scheduled task (single node: Spring scheduled tasks; distributed: XXL-JOB) resends the messages
    }

    /**
     * Executed only when the message could not be routed to a queue
     * @param message
     * @param replyCode
     * @param replyText
     * @param exchange
     * @param routingKey
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        // Save the message, exchange name, routing key and related data to the database.
        // A scheduled program scans these records and resends the messages.
    }
}
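The "save to the database, scan, resend" strategy described in the comments above can be sketched as follows. This is a hypothetical illustration, not the project's code: an in-memory list stands in for the database table, the `sender` predicate stands in for the real send call, and the names `ResendSketch`, `FailedMessage` and `scanAndResend` are invented for the example. The fields mirror the table fields listed in the comments (message body, exchange name, routing key, status, times).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch: an in-memory list stands in for the database table.
class ResendSketch {
    static final int MAX_RETRIES = 5;

    enum Status { PENDING, SENT, MANUAL }

    // One row of the failed-message table.
    static class FailedMessage {
        final String body;
        final String exchange;
        final String routingKey;
        Status status = Status.PENDING;
        int times = 0;

        FailedMessage(String body, String exchange, String routingKey) {
            this.body = body;
            this.exchange = exchange;
            this.routingKey = routingKey;
        }
    }

    final List<FailedMessage> table = new ArrayList<>();

    // Called from the failure branch of the callback.
    FailedMessage record(String body, String exchange, String routingKey) {
        FailedMessage m = new FailedMessage(body, exchange, routingKey);
        table.add(m);
        return m;
    }

    // One pass of the scheduled job; sender returns true when the resend succeeds.
    void scanAndResend(Predicate<FailedMessage> sender) {
        for (FailedMessage m : table) {
            if (m.status != Status.PENDING) {
                continue;
            }
            m.times++;
            if (sender.test(m)) {
                m.status = Status.SENT;
            } else if (m.times >= MAX_RETRIES) {
                m.status = Status.MANUAL; // hand over for manual processing
            }
        }
    }

    public static void main(String[] args) {
        ResendSketch job = new ResendSketch();
        FailedMessage m = job.record("{\"id\":1}", "es_exchange", "es_key");
        for (int i = 0; i < 6; i++) {
            job.scanAndResend(msg -> false); // the resend never succeeds
        }
        System.out.println(m.status + " after " + m.times + " attempts"); // MANUAL after 5 attempts
    }
}
```

In a real deployment the scan would be triggered by a scheduler, and the interval between passes could grow with the `times` counter.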

Customized messaging tool class

package com.tianju.common.util;


import com.alibaba.fastjson2.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;

import java.util.concurrent.TimeUnit;


@Slf4j
public class RabbitUtil {

    /**
     * Send a message. When a ttl is given (delay queue), the message expires after that time,
     * is routed to the dead letter exchange and ends up in the dead letter queue.
     * @param rabbitTemplate the RabbitTemplate used to send
     * @param redisTemplate used to store the token in redis
     * @param msg the message to send
     * @param token the token sent with the message to ensure idempotence
     * @param ttl expiration time of the message in milliseconds for delayed consumption; null for no expiration
     * @param exchange exchange name
     * @param routingKey routing key name
     * @param <T> the entity class of the message
     */
    public static <T> void sendMsg(RabbitTemplate rabbitTemplate,
                                   StringRedisTemplate redisTemplate,
                                   T msg, String token, Integer ttl,
                                   String exchange, String routingKey) {

        // Arguments are listed in the same order as the placeholders
        log.debug("Send message {} to exchange [{}] through routing key [{}], token is {}", msg, exchange, routingKey, token);

        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // Store the token for 5 minutes; set(key, value, long) without a TimeUnit
                // would treat the number as a string offset, not as an expiry
                redisTemplate.opsForValue().set(token, token, 5, TimeUnit.MINUTES);
                message.getMessageProperties().setMessageId(token);
                if (ttl != null) {
                    message.getMessageProperties().setExpiration(ttl.toString());
                }
                return message;
            }
        };

        CorrelationData correlationData = new CorrelationData();
        // Message body
        Message message = new Message(JSON.toJSONBytes(msg));
        // Exchange name
        message.getMessageProperties().setReceivedExchange(exchange);
        // Routing key
        message.getMessageProperties().setReceivedRoutingKey(routingKey);
        correlationData.setReturnedMessage(message);

        // Send MQ message
        rabbitTemplate.convertAndSend(exchange, // Send to this exchange
                routingKey, // With a TTL queue, this key routes to it; expired messages become dead letters and go to the dead letter exchange and queue
                msg,
                messagePostProcessor,
                correlationData
        );
    }
}

Send messages

Interface

package com.tianju.es.service;

import com.baomidou.mybatisplus.extension.service.IService;
import com.tianju.es.entity.FinanceSkuES;

public interface SkuService extends IService<FinanceSkuES> {

    /**
     * Delayed double deletion method ensures data consistency between es cache and mysql database
     * @param financeSkuES modified data
     * @return
     */
    FinanceSkuES updateByIddDoubleDelete(FinanceSkuES financeSkuES);

    /**
     * Locking method, but it seems useless
     * @param financeSkuES
     * @return
     */
    FinanceSkuES updateByIdRedisLock(FinanceSkuES financeSkuES);

    /**
     * Decoupling through rabbitmq
     * @param financeSkuES
     * @return
     */
    String updateByIdRabbitMQ(FinanceSkuES financeSkuES);
}

Implementation class

package com.tianju.es.service.impl;

import cn.hutool.core.util.IdUtil;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.tianju.common.util.RabbitUtil;
import com.tianju.es.entity.FinanceSkuES;
import com.tianju.es.mapper.SkuESMapper;
import com.tianju.es.mapper.SkuMybatisPlusMapper;
import com.tianju.es.rabbit.RabbitConfig;
import com.tianju.es.service.SkuService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

import java.util.Optional;
import java.util.UUID;

@Slf4j
@Service
public class SkuServiceImpl extends ServiceImpl<SkuMybatisPlusMapper, FinanceSkuES>
        implements SkuService {

    @Autowired
    private SkuESMapper skuESMapper;

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public FinanceSkuES updateByIddDoubleDelete(FinanceSkuES financeSkuES) {
        // Considering es as a cache, how to ensure the data consistency between es and mysql?
        // Delayed double deletion mode
        // 1. Delete cache es first
        skuESMapper.deleteAll();
        // 2. Update database mysql
        updateById(financeSkuES);
        // 3. Delay operation
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        // 4. Delete cache es again
        skuESMapper.deleteAll();

        // 5. Finally update cache es
        skuESMapper.saveAll(list());
        Optional<FinanceSkuES> byId = skuESMapper.findById(financeSkuES.getId());
        log.debug("byId: " + byId);
        return byId.get();
    }

    @Override
    public FinanceSkuES updateByIdRedisLock(FinanceSkuES financeSkuES) {
        // Second approach: locking
        String uuid = UUID.randomUUID().toString();
        // Equivalent to the setnx instruction
        Boolean skuLock = stringRedisTemplate.opsForValue().setIfAbsent("skuLock", uuid);
        try {
            // Boolean.TRUE.equals avoids a NullPointerException when the result is null
            if (Boolean.TRUE.equals(skuLock)) { // Obtained the lock
                skuESMapper.deleteAll();
                updateById(financeSkuES);
            }
        } finally {
            // Release the lock only if it is still held by this caller
            if (uuid.equals(stringRedisTemplate.opsForValue().get("skuLock"))) {
                stringRedisTemplate.delete("skuLock");
            }
        }
        skuESMapper.saveAll(list());
        Optional<FinanceSkuES> byId = skuESMapper.findById(financeSkuES.getId());
        log.debug("byId: " + byId);
        return byId.get();
    }

    @Override
    public String updateByIdRabbitMQ(FinanceSkuES financeSkuES) {
        // Use rabbitmq for decoupling

        updateById(financeSkuES);
        FinanceSkuES skuES = getById(financeSkuES.getId());
        String uuid = IdUtil.fastUUID();

        RabbitUtil.sendMsg(
                rabbitTemplate,stringRedisTemplate,skuES,uuid,null,
                RabbitConfig.ES_EXCHANGE,RabbitConfig.ES_KEY
        );

        return "Message has been sent:" + skuES;
    }
}

Receive message, update es

After receiving the message, update es: delete the original document and save the latest one.

package com.tianju.es.rabbit;

import com.rabbitmq.client.Channel;
import com.tianju.es.entity.FinanceSkuES;
import com.tianju.es.mapper.SkuESMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

@Slf4j
@Component
public class ESListener {

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Autowired
    private SkuESMapper skuESMapper;

    @RabbitListener(queues = RabbitConfig.ES_QUEUE)
    public void esUpdate(FinanceSkuES financeSkuES, Message message, Channel channel) {
        String messageId = message.getMessageProperties().getMessageId();
        log.debug("Business ----> Received message {} from the queue, messageId is {}", financeSkuES, messageId);

        try {
            // Idempotence: the token can be deleted only once, so a duplicate message is skipped
            if (Boolean.TRUE.equals(redisTemplate.delete(messageId))) {
                // Delete the original es data based on id,
                // then save the new data
                log.debug("Process es business, delete the original one and replace it with the latest one");
                skuESMapper.removeFinanceSkuESById(financeSkuES.getId());
                skuESMapper.save(financeSkuES);
            }

            // Manually acknowledge the message
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

        } catch (Exception e) {
            // Idempotence: restore the token (5 minute expiry) so the message can be processed again
            redisTemplate.opsForValue().set(messageId, messageId, 5, TimeUnit.MINUTES);

            // Possible follow-up strategies:
            // 1. Retry up to an upper limit (default 5), increasing the interval before each retry.
            // 2. Save the message, exchange name, routing key and related data to the database.
            //    A scheduled program scans these records and resends the messages. Once the resend
            //    limit (default 5) is exceeded, the message is handed over for manual processing.
            // The message body (message.getBody()), exchange and routing key are all known here,
            // so the message can be sent to the listening queue again.

            try {
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
            } catch (IOException ex) {
                throw new RuntimeException(ex);
            }
        }
    }
}
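The idempotence check used by the listener can be shown in isolation. The sketch below is hypothetical and uses an in-memory set in place of Redis and a map in place of es. The names `IdempotentConsumerSketch`, `registerToken` and `handle` are invented for the example. The idea it illustrates is the one above: the producer registers a token, and the consumer applies the message only if it manages to remove that token.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: a set stands in for Redis, a map stands in for es.
class IdempotentConsumerSketch {
    private final Set<String> tokens = ConcurrentHashMap.newKeySet();
    final Map<Long, String> index = new ConcurrentHashMap<>();

    // Producer side: store the token before the message is sent.
    void registerToken(String messageId) {
        tokens.add(messageId);
    }

    // Consumer side: returns true if the message was applied, false for a duplicate.
    boolean handle(String messageId, long id, String detail) {
        if (!tokens.remove(messageId)) {
            return false; // token already consumed, skip the duplicate
        }
        try {
            index.put(id, detail); // overwrite the document with the latest data
            return true;
        } catch (RuntimeException e) {
            tokens.add(messageId); // restore the token so a retry can be processed
            throw e;
        }
    }

    public static void main(String[] args) {
        IdempotentConsumerSketch consumer = new IdempotentConsumerSketch();
        consumer.registerToken("m1");
        System.out.println(consumer.handle("m1", 1L, "first"));  // true
        System.out.println(consumer.handle("m1", 1L, "second")); // false
        System.out.println(consumer.index.get(1L));              // first
    }
}
```

Removing the token and checking the result in one step matters: a separate "exists, then delete" pair would let two concurrent deliveries both pass the check.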

Log printed in background

Summary

1. A use case of elasticsearch, including using ES in combination with MybatisPlus;
2. How to ensure data consistency between MySQL and es;
3. Using RabbitMQ for decoupling, with a custom method for sending messages.