Foreword
This blog is a use case of elasticsearch, including using ES with MybatisPlus, how to ensure the data consistency between MySQL and es, and using RabbitMQ for decoupling and customizing the method of sending messages.
The list of other related Elasticsearch articles is as follows:
-
Installation and parameter settings of the Docker version of Elasticsearch & port opening and browser access
-
Installation of visual Kibana tool for Elasticsearch & installation and use of IK word segmenter
-
Springboot integration of Elasticsearch & Kibana for full query and fuzzy query
Directory
- Preface
- lead out
- Using ES with MybatisPlus
-
- 1.Introduce dependencies
- 2. Configure
- 3. Add annotations to the entity class
- 4.Create the Repository of the operation
- 5. Initialize the data in es
- 6. Perform full query and paging
-
- Conditional paging query
- Data consistency between es and mysql
-
- Delayed double deletion
- Locking method
- Decoupling with rabbitmq
-
- Configuration yml file
- rabbitmq configuration class
- callback callback method
- Custom message sending tool class
- Send messages
- Receive message, update es
- Summarize
Export
1. Use cases of elasticsearch, including using ES in combination with MybatisPlus;
2. How to ensure data consistency between MySQL and es;
3. Used RabbitMQ for decoupling and customized the method of sending messages.
Use ES with MybatisPlus
1.Introduce dependencies
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-elasticsearch</artifactId> </dependency> <!--mysql driver--> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <scope>runtime</scope> </dependency> <!-- druid--> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid-spring-boot-starter</artifactId> </dependency> <!-- springboot integrates mybaits plus --> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
2. Configure
package com.tianju.es.config; import org.elasticsearch.client.RestHighLevelClient; import org.springframework.context.annotation.Configuration; import org.springframework.data.elasticsearch.client.ClientConfiguration; import org.springframework.data.elasticsearch.client.RestClients; import org.springframework.data.elasticsearch.config.AbstractElasticsearchConfiguration; /** * You can also write ESConfig as a general configuration class instead of inheriting the AbstractElasticsearchConfiguration class. * However, the advantage of inheriting AbstractElasticsearchConfiguration is that it has already configured the elasticsearchTemplate for us to use directly. */ @Configuration public class ESConfig extends AbstractElasticsearchConfiguration {<!-- --> @Override public RestHighLevelClient elasticsearchClient() {<!-- --> ClientConfiguration clientConfiguration = ClientConfiguration.builder() .connectedTo("192.168.111.130:9200") .build(); return RestClients.create(clientConfiguration).rest(); } }
3. Add annotations to entity classes
package com.tianju.es.entity; import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.springframework.data.elasticsearch.annotations.Document; import org.springframework.data.elasticsearch.annotations.Field; import org.springframework.data.elasticsearch.annotations.FieldType; import java.math.BigDecimal; /** * Products, including inventory, price information */ @Data @NoArgsConstructor @AllArgsConstructor @TableName("finance_sku") @Document(indexName = "finance_sku") public class FinanceSkuES {<!-- --> @TableId(value = "ID",type = IdType.AUTO) private Long id; @TableField("finance_sku_describe") @Field(index = true,analyzer = "ik_smart", searchAnalyzer = "ik_smart",type = FieldType.Text) private String detail; // details @TableField("finance_sku_price") private BigDecimal price; @TableField("finance_sku_stock") private long stock; @TableField("finance_state") private Integer status; }
Parameter explanation
@Document(indexName = "books", shards = 1, replicas = 0) @Data public class Book {<!-- --> @Id @Field(type = FieldType.Integer) privateInteger id; @Field(type = FieldType.Keyword) private String title; @Field(type = FieldType.Text) private String press; @Field(type = FieldType.Keyword) private String author; @Field(type = FieldType.Keyword,index=false) private BigDecimal price; @Field(type = FieldType.Text) private String description; }
- @Document: the annotation will index all properties in the entity;
indexName = “books”: indicates creating an index named “books”;
shards = 1: Indicates that only one shard is used;
replicas = 0: Indicates that replica backup is not used;
index = false: Cannot index query - @Field(type = FieldType.Keyword): Used to specify the data type of the field.
4. Create a Repository of operations
It inherits a large number of ready-made methods from its ancestors, and in addition, it can define specific methods according to the rules of spring data.
package com.tianju.es.mapper; import com.tianju.es.entity.FinanceSkuES; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; import org.springframework.data.elasticsearch.repository.ElasticsearchRepository; import org.springframework.stereotype.Repository; /** * Operate es, similar to the previous mapper */ @Repository public interface SkuESMapper extends ElasticsearchRepository<FinanceSkuES, Long> {<!-- --> /** * Based on keywords, word segmentation and paging query of sku data * @param detail query conditions * @param pageable paging * @return */ Page<FinanceSkuES> findFinanceSkuESByDetail(String detail, Pageable pageable); /** * Delete based on id * @param id */ void removeFinanceSkuESById(Long id); }
5. Initialize the data in es
Running background information
View es page information, index management
6. Perform full query and paging
Perform full query
{<!-- --> "content": [ {<!-- --> "id": 1, "detail": "HUAWEI MateBook "price": 13999.0, "stock": 50, "status": 1 }, {<!-- --> "id": 2, "detail": "HUAWEI Mate 60 Pro + 16GB + 1TB Xuanbai", "price": 9999.0, "stock": 60, "status": 1 }, {<!-- --> "id": 3, "detail": "iPhone 15 Pro Max Super Retina XDR display", "price": 9299.0, "stock": 46, "status": 1 }, {<!-- --> "id": 4, "detail": "MacBook Air Apple M2 chip 8-core CPU 8-core graphics processor 8GB unified memory 256GB solid state drive", "price": 8999.0, "stock": 60, "status": 1 } ], "pageable": {<!-- --> "sort": {<!-- --> "empty": true, "sorted": false, "unsorted": true }, "offset": 0, "pageSize": 4, "pageNumber": 0, "paged": true, "unpaged": false }, "totalElements": 4, "last": true, "totalPages": 1, "number": 0, "size": 4, "sort": {<!-- --> "empty": true, "sorted": false, "unsorted": true }, "numberOfElements": 4, "first": true, "empty": false }
Conditional paging query
Note that the page of the paging query starts from 0. Try to find the smallest unit after word segmentation that needs to be entered into the word segmenter. For example, hu is not the smallest unit, but HUAWEI is.
The result of word segmentation by word segmenter
Data consistency between es and mysql
Delayed double deletion
@Override public FinanceSkuES updateByIddDoubleDelete(FinanceSkuES financeSkuES) {<!-- --> // Considering es as a cache, how to ensure the data consistency between es and mysql? //Delayed double deletion mode // 1. Delete cache es first skuESMapper.deleteAll(); // 2. Update database mysql updateById(financeSkuES); // 3. Delay operation try {<!-- --> Thread.sleep(3000); } catch (InterruptedException e) {<!-- --> throw new RuntimeException(e); } // 4. Delete cache es again skuESMapper.deleteAll(); // 5.Last update cache es skuESMapper.saveAll(list()); Optional<FinanceSkuES> byId = skuESMapper.findById(financeSkuES.getId()); log.debug("byId: " + byId); return byId.get(); }
There is something wrong with the above code. I am modifying it here. The result is that it is deleted directly from es at the beginning. The modified data should be deleted according to the ID, and then the modified data is set into es.
How to lock
It feels like it’s of no use, I just used it to lock it.
Use rabbitmq for decoupling
Configuration yml file
spring: main: allow-circular-references: true datasource: driver-class-name: com.mysql.cj.jdbc.Driver ### Local database url: jdbc:mysql://127.0.0.1:3306/consumer_finance_product?useUnicode=true & amp;characterEncoding=utf8 & amp;serverTimezone=GMT+8 & amp;allowMultiQueries=true username: root password: 123 #Redis related configuration redis: host: 119.3.162.127 port: 6379 database: 0 password: Pet3927 # rabbitmq related rabbitmq: host: 192.168.111.130 port: 5672 username:admin password: 123 virtual-host: /test # Producer ensures message reliability publisher-returns: true publisher-confirm-type: correlated #Set manual confirmation listener: simple: acknowledge-mode: manual
rabbitmq configuration class
Convert Java object to json string for transmission
package com.tianju.es.rabbit; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig {<!-- --> public static final String ES_EXCHANGE = "es_exchange"; public static final String ES_QUEUE = "es_queue"; public static final String ES_KEY = "es_key"; @Bean public DirectExchange directExchange(){<!-- --> return new DirectExchange(ES_EXCHANGE); } @Bean public Queue esQueue(){<!-- --> return new Queue(ES_QUEUE); } @Bean public Binding esQueueToDirectExchange(){<!-- --> return BindingBuilder.bind(esQueue()) .to(directExchange()) .with(ES_KEY); } /** * Convert object to json string * @return */ @Bean public MessageConverter messageConverter(){<!-- --> return new Jackson2JsonMessageConverter(); } @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){<!-- --> RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(messageConverter());//Modify the converter return rabbitTemplate; } }
callback callback method
package com.tianju.es.rabbit; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; import javax.annotation.PostConstruct; /** * Producer message reliability */ // RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback @Configuration @Slf4j public class CallbackConfig implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {<!-- --> @Autowired private RabbitTemplate rabbitTemplate; // initialization @PostConstruct public void init(){<!-- --> rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnCallback(this); rabbitTemplate.setMandatory(true); } /** * Will be executed regardless of success or failure * @param correlationData correlation object needs to be given when sending a message * @param ack true indicates success, false indicates failure to send * @param cause If it fails, the reason for the failure will be written; if it succeeds, it will return null. */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) {<!-- --> log.debug("ack was successful: " + ack); log.debug("cause information: " + cause); if (correlationData!=null){<!-- --> JSONObject jsonObject = JSON.parseObject(correlationData.getReturnedMessage().getBody()); String exchange = correlationData.getReturnedMessage().getMessageProperties().getReceivedExchange(); String routingKey = correlationData.getReturnedMessage().getMessageProperties().getReceivedRoutingKey(); log.debug("Message body: " + jsonObject); log.debug("switch: " + exchange); log.debug("Routing key: " + routingKey); } if (ack){<!-- --> return; } // Failed // 1. The upper limit of retries (default value 5) will increase each time the interval is retried. // 2. Save the message, switch name, routing key and other related messages to the database. There is a program that scans the relevant messages regularly and then resends the message. // If the upper limit of resending times (default value 5) exceeds the threshold, manual processing will be performed // 2. Save the message body, switch name, routing key and other related messages to the database. There is a program that scans the related messages regularly and then resends the message. // If the upper limit of resending times (default value 5) exceeds the threshold, manual processing will be performed // 2.1 Relevant information needs to be stored in the data, table fields: message body, switch name, routing key, status, times // 2.2 Scheduled tasks (single: spring scheduled tasks distributed: XxL-job), send messages } /** * Will only be executed if it fails * @param message * @param replyCode * @param replyText * @param exchange * @param routingKey */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {<!-- --> // 2. Save the message, switch name, routing key and other related messages to the database. There is a program that scans the relevant messages regularly and then resends the message. } }
Customized messaging tool class
package com.tianju.common.util; import com.alibaba.fastjson2.JSON; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.data.redis.core.StringRedisTemplate; @Slf4j public class RabbitUtil {<!-- --> /** * Delay queue, send message, enter the dead letter queue after arrival time * @param rabbitTemplate the rabbitTemplate called * @param redisTemplate is used to store tokens in redis * @param msg message sent * @param token The token sent to ensure idempotence * @param ttl If it is delayed consumption, the expiration time of the message will enter the dead letter switch and enter the dead letter queue after reaching the changed time. * @param exchange switch name * @param routingKey routing key name * @param <T> The entity class that sends the message */ public static <T> void sendMsg(RabbitTemplate rabbitTemplate, StringRedisTemplate redisTemplate, T msg,String token,Integer ttl, String exchange,String routingKey) {<!-- --> log.debug("Send message {} to switch [{}] through routing key [{}], token is {}", exchange, routingKey, msg, token); MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {<!-- --> @Override public Message postProcessMessage(Message message) throws AmqpException {<!-- --> redisTemplate.opsForValue().set(token, token,5*60000); message.getMessageProperties().setMessageId(token); if (ttl!=null){<!-- --> message.getMessageProperties().setExpiration(ttl.toString()); } return message; } }; CorrelationData correlationData = new CorrelationData(); // message body Message message = new Message(JSON.toJSONBytes(msg)); //Switch name message.getMessageProperties().setReceivedExchange(exchange); // routing key message.getMessageProperties().setReceivedRoutingKey(routingKey); correlationData.setReturnedMessage(message); //Send MQ message rabbitTemplate.convertAndSend(exchange, // Send to switch routingKey, // According to this routingKey, it will be given to the TTL queue. When the time comes, it will become a dead letter. It will be sent to the dead letter switch and sent to the dead letter queue. msg, messagePostProcessor, correlationData ); } }
Send messages
interface
package com.tianju.es.service; import com.baomidou.mybatisplus.extension.service.IService; import com.tianju.es.entity.FinanceSkuES; public interface SkuService extends IService<FinanceSkuES> {<!-- --> /** * Delayed double deletion method ensures data consistency between es cache and mysql database * @param financeSkuES modified data * @return */ FinanceSkuES updateByIddDoubleDelete(FinanceSkuES financeSkuES); /** * Locking method, but it seems useless * @param financeSkuES * @return */ FinanceSkuES updateByIdRedisLock(FinanceSkuES financeSkuES); /** * Decoupling through rabbitmq * @param financeSkuES * @return */ String updateByIdRabbitMQ(FinanceSkuES financeSkuES); }
Implementation class
package com.tianju.es.service.impl; import cn.hutool.core.util.IdUtil; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.tianju.common.util.RabbitUtil; import com.tianju.es.entity.FinanceSkuES; import com.tianju.es.mapper.SkuESMapper; import com.tianju.es.mapper.SkuMybatisPlusMapper; import com.tianju.es.rabbit.RabbitConfig; import com.tianju.es.service.SkuService; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Service; import java.util.Collection; import java.util.Optional; import java.util.UUID; @Service public class SkuServiceImpl extends ServiceImplimplements SkuService { @Autowired private SkuESMapper skuESMapper; @Autowired private StringRedisTemplate stringRedisTemplate; @Autowired private RabbitTemplate rabbitTemplate; @Override public FinanceSkuES updateByIddDoubleDelete(FinanceSkuES financeSkuES) {<!-- --> // Considering es as a cache, how to ensure the data consistency between es and mysql? //Delayed double deletion mode // 1. Delete cache es first skuESMapper.deleteAll(); // 2. Update database mysql updateById(financeSkuES); // 3. Delay operation try {<!-- --> Thread.sleep(3000); } catch (InterruptedException e) {<!-- --> throw new RuntimeException(e); } // 4. Delete cache es again skuESMapper.deleteAll(); // 5.Last update cache es skuESMapper.saveAll(list()); Optional<FinanceSkuES> byId = skuESMapper.findById(financeSkuES.getId()); log.debug("byId: " + byId); return byId.get(); } @Override public FinanceSkuES updateByIdRedisLock(FinanceSkuES financeSkuES) { //Second way to lock String uuid = UUID.randomUUID().toString(); // Equivalent to setnx instruction Boolean skuLock = stringRedisTemplate.opsForValue().setIfAbsent("skuLock", uuid); try { if (skuLock){ // Obtained the lock skuESMapper.deleteAll(); updateById(financeSkuES); } }finally { if (uuid.equals(stringRedisTemplate.opsForValue().get("skuLock"))){ stringRedisTemplate.delete("skuLock"); } } skuESMapper.saveAll(list()); Optional byId = skuESMapper.findById(financeSkuES.getId()); log.debug("byId: " + byId); return byId.get(); } @Override public String updateByIdRabbitMQ(FinanceSkuES financeSkuES) { // Use rabbitmq for decoupling updateById(financeSkuES); FinanceSkuES skuES = getById(financeSkuES.getId()); String uuid = IdUtil.fastUUID(); RabbitUtil.sendMsg( rabbitTemplate,stringRedisTemplate,skuES,uuid,null, RabbitConfig.ES_EXCHANGE,RabbitConfig.ES_KEY ); return "Message has been sent:" + skuES; } }
Receive message, update es
After receiving the message, update es, delete the original one, and set the latest one.
package com.tianju.es.rabbit; import com.rabbitmq.client.Channel; import com.tianju.es.entity.FinanceSkuES; import com.tianju.es.mapper.SkuESMapper; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; import java.io.IOException; @Slf4j @Component public class ESListener {<!-- --> @Autowired private StringRedisTemplate redisTemplate; @Autowired private SkuESMapper skuESMapper; @RabbitListener(queues = RabbitConfig.ES_QUEUE) public void esUpdate(FinanceSkuES financeSkuES, Message message, Channel channel) {<!-- --> String messageId = message.getMessageProperties().getMessageId(); log.debug("Business ----> Monitoring messages from queue {}, messageId is {}", financeSkuES, messageId); try {<!-- --> // Idempotence if (redisTemplate.delete(messageId)){<!-- --> //Delete original es data based on id //Then set the new data in log.debug("Process es business, delete the original one and replace it with the latest one"); skuESMapper.removeFinanceSkuESById(financeSkuES.getId()); skuESMapper.save(financeSkuES); } // Manually sign the message channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }catch (Exception e){<!-- --> // Idempotence redisTemplate.opsForValue().set(messageId,messageId,5*60000); // 1. The upper limit of retries (default value 5) will increase each time the interval is retried. // 2. Save the message, switch name, routing key and other related messages to the database. There is a program that scans the relevant messages regularly and then resends the message. // If the upper limit of resending times (default value 5) exceeds the threshold, manual processing will be performed // Known messages, switches, routers, messages message.getBody() The message is sent to the listening queue try {<!-- --> channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); } catch (IOException ex) {<!-- --> throw new RuntimeException(ex); } } } }
Log printed in background
Summary
1. Use cases of elasticsearch, including using ES in combination with MybatisPlus;
2. How to ensure data consistency between MySQL and es;
3. Used RabbitMQ for decoupling and customized the method of sending messages.