Table of Contents
- Data synchronization
  - 1. Approach analysis
    - 1. Synchronous call
    - 2. Asynchronous notification
    - 3. Monitor binlog
    - 4. Select
  - 2. Realize data synchronization
    - 1. Idea
    - 2. Import demo
    - 3. Declare exchanges and queues
      - 3.1 Introducing dependencies
      - 3.2 Configuration file
      - 3.3 Declaring the exchange and queue names
      - 3.4 Declaring the exchange and queues
    - 4. Send MQ message
      - 4.1 Transaction configuration class
      - 4.2 service code
    - 5. Receive MQ messages
Data synchronization
The hotel data in Elasticsearch comes from the MySQL database, so whenever the MySQL data changes, the Elasticsearch data must change accordingly. Keeping the two consistent is the data synchronization between Elasticsearch and MySQL.
1. Approach analysis
There are three common data synchronization schemes:
- synchronous call
- asynchronous notification
- monitor binlog
1. Synchronous call
Solution 1: Synchronous call
The basic steps are as follows:
- hotel-demo provides an interface for modifying the data in Elasticsearch
- After the hotel management service completes the database operation, it directly calls the interface provided by hotel-demo
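The two steps above can be sketched with plain Java stand-ins. `SearchIndexApi`, `InMemoryIndex`, and `HotelAdminService` are hypothetical names used only for illustration; in the real project the call would cross a service boundary to hotel-demo rather than being a local method call.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins: none of these types exist in hotel-admin or hotel-demo.
final class SyncCallSketch {

    /** The interface hotel-demo would expose for updating its index. */
    interface SearchIndexApi {
        void upsert(long id, String name);
    }

    /** In-memory substitute for the Elasticsearch index. */
    static final class InMemoryIndex implements SearchIndexApi {
        final Map<Long, String> docs = new HashMap<>();

        @Override
        public void upsert(long id, String name) {
            docs.put(id, name);
        }
    }

    /** Hotel management service: writes the database, then calls the index directly. */
    static final class HotelAdminService {
        final Map<Long, String> database = new HashMap<>();
        private final SearchIndexApi index;

        HotelAdminService(SearchIndexApi index) {
            this.index = index;
        }

        void save(long id, String name) {
            database.put(id, name);  // 1. database operation
            index.upsert(id, name);  // 2. blocking call: an exception here propagates out of save()
        }
    }

    public static void main(String[] args) {
        InMemoryIndex index = new InMemoryIndex();
        HotelAdminService admin = new HotelAdminService(index);
        admin.save(1L, "Demo Hotel");
        System.out.println(index.docs);
    }
}
```

Because `save` cannot return until the index call finishes, the caller's latency and failure modes include those of the search service, which is the coupling this scheme is criticized for.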
2. Asynchronous notification
Solution 2: Asynchronous notification
The process is as follows:
- hotel-admin sends an MQ message after adding, deleting, or modifying data in the MySQL database
- hotel-demo listens to the MQ and updates the Elasticsearch data after receiving the message
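The flow above can be sketched without a broker by letting a `BlockingQueue` play the role of the MQ. This is only an in-process approximation under that assumption; the class and method names are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a BlockingQueue stands in for the message broker.
final class AsyncNotifySketch {
    final BlockingQueue<Long> queue = new LinkedBlockingQueue<>();
    final Map<Long, String> database = new ConcurrentHashMap<>();
    final Map<Long, String> index = new ConcurrentHashMap<>();

    /** hotel-admin side: write the database, then publish only the id and return. */
    void save(long id, String name) {
        database.put(id, name);
        queue.add(id);
    }

    /** hotel-demo side: wait for one message, re-read the row, update the index. */
    boolean consumeOne(long timeoutMillis) {
        try {
            Long id = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (id == null) {
                return false; // nothing arrived within the timeout
            }
            String row = database.get(id);
            if (row != null) {
                index.put(id, row);
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AsyncNotifySketch sketch = new AsyncNotifySketch();
        Thread consumer = new Thread(() -> sketch.consumeOne(1000));
        consumer.start();
        sketch.save(1L, "Demo Hotel"); // returns without waiting for the index update
        consumer.join();
        System.out.println(sketch.index);
    }
}
```

The producer never calls the consumer, so the two sides only share the message format. The index is briefly stale between `save` and `consumeOne`, and an update is lost if the message is lost.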
3. Monitor binlog
Solution 3: Monitor binlog
The process is as follows:
- Enable the binlog function in MySQL
- Every insert, delete, and update operation in MySQL is recorded in the binlog
- hotel-demo monitors binlog changes using Canal and updates the content in Elasticsearch in real time
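The idea behind log-based synchronization can be sketched as a follower that replays an ordered change log from its last position. This does not use the Canal API or the real binlog format; `RowChange`, `IndexFollower`, and the list-based log are hypothetical simplifications.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of log following; not Canal and not the MySQL binlog format.
final class BinlogFollowerSketch {
    enum Op { INSERT, UPDATE, DELETE }

    /** One row change, as an entry in the ordered log. */
    static final class RowChange {
        final Op op;
        final long id;
        final String name;

        RowChange(Op op, long id, String name) {
            this.op = op;
            this.id = id;
            this.name = name;
        }
    }

    /** Follower that remembers how far it has read and applies unread entries. */
    static final class IndexFollower {
        final Map<Long, String> index = new HashMap<>();
        private int position = 0;

        /** Applies every entry after the saved position; returns how many were applied. */
        int catchUp(List<RowChange> log) {
            int applied = 0;
            while (position < log.size()) {
                RowChange change = log.get(position++);
                if (change.op == Op.DELETE) {
                    index.remove(change.id);
                } else {
                    index.put(change.id, change.name);
                }
                applied++;
            }
            return applied;
        }
    }

    public static void main(String[] args) {
        List<RowChange> log = new ArrayList<>();
        log.add(new RowChange(Op.INSERT, 1L, "Demo Hotel"));
        log.add(new RowChange(Op.UPDATE, 1L, "Demo Hotel (renovated)"));
        IndexFollower follower = new IndexFollower();
        follower.catchUp(log);
        System.out.println(follower.index);
    }
}
```

The writing service contains no synchronization code at all here: the follower learns about changes purely from the log, which is what makes this scheme fully decoupled.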
4. Select
Method 1: Synchronous call
- Advantages: simple and direct to implement
- Disadvantages: high degree of business coupling
Method 2: Asynchronous notification
- Advantages: low coupling, moderate implementation difficulty
- Disadvantages: depends on the reliability of the MQ
Method 3: Monitor binlog
- Advantages: complete decoupling between services
- Disadvantages: enabling binlog increases the load on the database, and the implementation complexity is high
2. Realize data synchronization
1. Idea
Use the hotel-admin project provided by the pre-class materials as a microservice for hotel management. When hotel data is added, deleted, or modified, the same operation must be applied to the data in Elasticsearch.
Steps:
- Import the hotel-admin project provided by the pre-class materials, then start it and test the CRUD operations on hotel data
- Declare the exchange, queues, and RoutingKeys
- Send a message from the add, delete, and modify operations in hotel-admin
- Listen for the messages in hotel-demo and update the data in Elasticsearch
- Start the services and test the data synchronization function
2. Import demo
Import the hotel-admin project provided by the materials.
After starting it, visit http://localhost:8099
The page provides the CRUD functionality for hotels.
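3. Declare exchanges and queues
The MQ structure is as follows: a single topic exchange, hotel.topic, routes messages with the RoutingKey hotel.insert to hotel.insert.queue and messages with the RoutingKey hotel.delete to hotel.delete.queue.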
3.1 Introducing dependencies
Add the RabbitMQ (AMQP) dependency to both hotel-admin and hotel-demo:
<!--amqp-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3.2 Configuration file
spring:
  rabbitmq:
    host: 192.168.1.100
    username: guest
    password: guest
    virtual-host: /
3.3 Declaring the exchange and queue names
Create a new class MqConstants under the cn.itcast.hotel.constants package in both hotel-admin and hotel-demo:
package cn.itcast.hotel.constants;

public class MqConstants {
    /**
     * Exchange
     */
    public final static String HOTEL_EXCHANGE = "hotel.topic";
    /**
     * Queue that receives insert and update messages
     */
    public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";
    /**
     * Queue that receives delete messages
     */
    public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";
    /**
     * RoutingKey for inserts and updates
     */
    public final static String HOTEL_INSERT_KEY = "hotel.insert";
    /**
     * RoutingKey for deletes
     */
    public final static String HOTEL_DELETE_KEY = "hotel.delete";
}
3.4 Declaring the exchange and queues
In hotel-demo, define a configuration class that declares the exchange, the queues, and their bindings:
package cn.itcast.hotel.config;

import cn.itcast.hotel.constants.MqConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MqConfig {

    // Durable, non-auto-delete topic exchange
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(MqConstants.HOTEL_EXCHANGE, true, false);
    }

    // Durable queue for insert and update messages
    @Bean
    public Queue insertQueue() {
        return new Queue(MqConstants.HOTEL_INSERT_QUEUE, true);
    }

    // Durable queue for delete messages
    @Bean
    public Queue deleteQueue() {
        return new Queue(MqConstants.HOTEL_DELETE_QUEUE, true);
    }

    @Bean
    public Binding insertQueueBinding() {
        return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INSERT_KEY);
    }

    @Bean
    public Binding deleteQueueBinding() {
        return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_KEY);
    }
}
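Both bindings use literal keys, so the topic exchange here routes like a direct exchange. For reference, the sketch below illustrates the topic matching rule in general: keys are dot-separated words, `*` matches exactly one word, and `#` matches zero or more words. It is an illustrative re-implementation under those rules, not the broker's code, and `TopicMatchSketch` is a hypothetical name.

```java
// Hypothetical sketch of AMQP topic-pattern matching; not RabbitMQ's implementation.
final class TopicMatchSketch {

    /** Returns true if the routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern exhausted: key must be exhausted too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words; try every possible split
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still needs a word but the key has none left
        }
        boolean wordMatches = p[i].equals("*") || p[i].equals(k[j]);
        return wordMatches && match(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("hotel.insert", "hotel.insert")); // true
        System.out.println(matches("hotel.insert", "hotel.delete")); // false
        System.out.println(matches("hotel.*", "hotel.delete"));      // true
        System.out.println(matches("hotel.#", "hotel"));             // true
    }
}
```

With a pattern such as `hotel.*`, a single queue could receive both the insert and the delete keys; the tutorial keeps them in separate queues so each listener handles one kind of change.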
4. Send MQ message
Send MQ messages from the add, delete, and modify operations in hotel-admin:
4.1 Transaction configuration class
This class ensures that the RabbitMQ message is sent only after the database transaction has committed.
package cn.itcast.hotel.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

@Component("afterCommitExecutor")
public class AfterCommitExecutor extends TransactionSynchronizationAdapter implements Executor {

    // Tasks registered during the current thread's transaction
    private static final ThreadLocal<List<Runnable>> RUNNABLES = new ThreadLocal<>();
    private static final Logger logger = LoggerFactory.getLogger(AfterCommitExecutor.class);
    private ThreadPoolExecutor threadPool;

    @PostConstruct
    public void init() {
        logger.debug("Initialize thread pool...");
        int availableProcessors = Math.max(1, Runtime.getRuntime().availableProcessors());
        int maxPoolSize = (availableProcessors > 5) ? availableProcessors * 2 : 5;
        // SLF4J uses {} placeholders, not %s
        logger.debug("CPU Processors: {} MaxPoolSize: {}", availableProcessors, maxPoolSize);
        threadPool = new ThreadPoolExecutor(
                availableProcessors, maxPoolSize, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(maxPoolSize * 2),
                Executors.defaultThreadFactory(),
                (r, executor) -> {
                    // When saturated, drop the oldest queued task and retry the new one
                    logger.debug("Task: {} rejected", r);
                    if (!executor.isShutdown()) {
                        executor.getQueue().poll();
                        executor.execute(r);
                    }
                });
    }

    @PreDestroy
    public void destroy() {
        logger.debug("Destroy thread pool...");
        if (null != threadPool && !threadPool.isShutdown()) {
            threadPool.shutdown();
        }
    }

    @Override
    public void execute(Runnable runnable) {
        // No active transaction: run immediately
        if (!TransactionSynchronizationManager.isSynchronizationActive()) {
            runnable.run();
            return;
        }
        List<Runnable> threadRunnables = RUNNABLES.get();
        if (threadRunnables == null) {
            threadRunnables = new ArrayList<>();
            RUNNABLES.set(threadRunnables);
            // Register once per transaction so afterCommit() is called back
            TransactionSynchronizationManager.registerSynchronization(this);
        }
        threadRunnables.add(runnable);
    }

    @Override
    public void afterCommit() {
        logger.debug("Transaction commit completed...");
        List<Runnable> threadRunnables = RUNNABLES.get();
        for (Runnable runnable : threadRunnables) {
            try {
                threadPool.execute(runnable);
            } catch (RuntimeException e) {
                logger.error("Failed to submit after-commit task", e);
            }
        }
    }

    @Override
    public void afterCompletion(int status) {
        // Called after commit or rollback: always clear the thread-local list
        logger.debug("Transaction completed...");
        RUNNABLES.remove();
    }
}
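The core of the class above is the deferral pattern: collect tasks per thread while a transaction is open, run them on commit, and discard them on rollback. The sketch below isolates that pattern without Spring. `begin`, `commit`, and `rollback` are hypothetical stand-ins for the transaction manager, and tasks run inline here instead of on a thread pool.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of after-commit deferral; no Spring types are involved.
final class AfterCommitSketch {
    // Tasks registered by the current thread's open transaction; null when none is open
    private static final ThreadLocal<List<Runnable>> PENDING = new ThreadLocal<>();

    static void begin() {
        PENDING.set(new ArrayList<>());
    }

    /** Runs the task immediately outside a transaction, otherwise defers it to commit. */
    static void afterCommit(Runnable task) {
        List<Runnable> pending = PENDING.get();
        if (pending == null) {
            task.run();
            return;
        }
        pending.add(task);
    }

    static void commit() {
        List<Runnable> pending = PENDING.get();
        PENDING.remove(); // clear first so tasks see no open transaction
        if (pending != null) {
            pending.forEach(Runnable::run);
        }
    }

    static void rollback() {
        PENDING.remove(); // deferred tasks are dropped, so no message is sent
    }

    public static void main(String[] args) {
        begin();
        afterCommit(() -> System.out.println("send MQ message"));
        System.out.println("database write done, committing");
        commit();
    }
}
```

Deferring the send matters because a consumer that receives the id before the commit could query the database and miss the row, or index data that is later rolled back.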
4.2 service code
@Override
@Transactional
public boolean save(Hotel hotel) {
    logger.info("----- into insert service -----");
    // Long.getLong(...) reads a system property and would return null here;
    // derive a non-negative id from the UUID's high 64 bits instead
    hotel.setId(UUID.randomUUID().getMostSignificantBits() & Long.MAX_VALUE);
    int i = hotelMapper.insert(hotel);
    // Send the message only after the transaction commits
    afterCommitExecutor.execute(() -> {
        rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId().toString());
        logger.info("----- rabbitmq send message -----");
    });
    return i == 1;
}

@Override
@Transactional
public boolean updateById(Hotel hotel) {
    logger.info("----- into service -----");
    // Modify DB data
    int i = hotelMapper.updateById(hotel);
    // Updates reuse the insert RoutingKey: the consumer re-indexes the document by id
    afterCommitExecutor.execute(() -> {
        rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId().toString());
        logger.info("----- rabbitmq send message -----");
    });
    logger.info("--------- out service ----------");
    return i == 1;
}

@Override
public void removeById(Long id) {
    logger.info("----- into service -----");
    // Delete DB data
    hotelMapper.deleteById(id);
    // No transaction is active here, so AfterCommitExecutor runs the task immediately
    afterCommitExecutor.execute(() -> {
        rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_DELETE_KEY, id);
        logger.info("----- rabbitmq send message -----");
    });
    logger.info("--------- out service ----------");
}
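A pitfall worth knowing when deriving a `Long` id from a UUID: `Long.getLong(String)` does not parse its argument. It looks up a JVM system property with that name and returns `null` when none exists. The sketch below shows this, along with one possible way to obtain a non-negative `long` from a random UUID. The `newId` helper is a hypothetical illustration, not the project's id scheme, and a database-generated key may be the better choice.

```java
import java.util.UUID;

// Hypothetical helper for illustration only.
final class IdSketch {

    /** Non-negative long taken from a random UUID's high 64 bits (collisions are unlikely but possible). */
    static long newId() {
        return UUID.randomUUID().getMostSignificantBits() & Long.MAX_VALUE;
    }

    public static void main(String[] args) {
        String text = UUID.randomUUID().toString();
        // Looks up a system property named after the UUID text; none exists, so this is null
        System.out.println(Long.getLong(text));
        // The sign bit is masked off, so the id is never negative
        System.out.println(newId() >= 0);
    }
}
```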
5. Receive MQ messages
When hotel-demo receives an MQ message, it must do the following:
- Insert or update message: query the hotel information by the hotel id in the message, then add the document to the index (overwriting any existing document with that id)
- Delete message: delete the document with the hotel id in the message from the index
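Both handlers are keyed by id, which makes them safe to repeat if the broker redelivers a message. The sketch below demonstrates that property with in-memory maps standing in for MySQL and the Elasticsearch index. The class name and the null check for a row that no longer exists are assumptions of this sketch, not part of the project code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: in-memory maps replace the database and the search index.
final class IndexSyncHandlerSketch {
    final Map<Long, String> database = new HashMap<>();
    final Map<Long, String> index = new HashMap<>();

    /** Insert and update share one handler: indexing by id overwrites any existing document. */
    void onInsertOrUpdate(long id) {
        String row = database.get(id);
        if (row == null) {
            return; // assumption: the row was deleted before this message was handled
        }
        index.put(id, row);
    }

    /** Deleting an id that is already absent is a no-op. */
    void onDelete(long id) {
        index.remove(id);
    }

    public static void main(String[] args) {
        IndexSyncHandlerSketch handler = new IndexSyncHandlerSketch();
        handler.database.put(1L, "Demo Hotel");
        handler.onInsertOrUpdate(1L);
        handler.onInsertOrUpdate(1L); // redelivery leaves the index unchanged
        System.out.println(handler.index);
    }
}
```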
1) First, declare the insert and delete methods in IHotelService under the cn.itcast.hotel.service package of hotel-demo:
void deleteById(Long id);

void insertById(Long id);
2) Implement the methods in HotelService under the cn.itcast.hotel.service.impl package in hotel-demo:
@Override
public void deleteById(Long id) {
    try {
        // 1. Prepare the request
        DeleteRequest request = new DeleteRequest("hotel", id.toString());
        // 2. Send the request
        restHighLevelClient.delete(request, RequestOptions.DEFAULT);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

@Override
public void insertById(Long id) {
    try {
        // 0. Query the hotel data by id
        Hotel hotel = getById(id);
        // Convert to the document type
        HotelDoc hotelDoc = new HotelDoc(hotel);
        // 1. Prepare the request object
        IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString());
        // 2. Prepare the JSON document
        request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
        // 3. Send the request
        restHighLevelClient.index(request, RequestOptions.DEFAULT);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}
3) Write a listener
Add a new class in the cn.itcast.hotel.mq package in hotel-demo:
package cn.itcast.hotel.mq;

import cn.itcast.hotel.constants.MqConstants;
import cn.itcast.hotel.service.IHotelService;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class HotelListener {

    @Autowired
    private IHotelService hotelService;

    /**
     * Listen for hotel insert and update messages
     * @param id hotel id
     */
    @RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)
    public void listenHotelInsertOrUpdate(Long id) {
        hotelService.insertById(id);
    }

    /**
     * Listen for hotel delete messages
     * @param id hotel id
     */
    @RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)
    public void listenHotelDelete(Long id) {
        hotelService.deleteById(id);
    }
}