Elasticsearch and MySQL data synchronization

Table of Contents

  • Data synchronization
    • 1. Approach analysis
      • 1. Synchronous call
      • 2. Asynchronous notification
      • 3. Monitor binlog
      • 4. Choosing a solution
    • 2. Implementing data synchronization
      • 1. Idea
      • 2. Import the demo
      • 3. Declare exchanges and queues
        • 3.1 Adding dependencies
        • 3.2 Configuration file
        • 3.3 Declaring the queue and exchange names
        • 3.4 Declaring the queues and exchange
      • 4. Send MQ messages
        • 4.1 Transaction configuration class
        • 4.2 Service code
      • 5. Receive MQ messages

Data synchronization

The hotel data in Elasticsearch comes from the MySQL database, so whenever the MySQL data changes, the Elasticsearch data must change with it. Keeping the two consistent is what data synchronization between Elasticsearch and MySQL means.


1. Approach analysis

There are three common data synchronization schemes:

  • synchronous call
  • asynchronous notification
  • monitor binlog

1. Synchronous call

Solution 1: Synchronous call


The basic steps are as follows:

  • hotel-demo provides an interface for modifying the data in Elasticsearch
  • After hotel-admin (the hotel management service) completes its database operation, it directly calls the interface provided by hotel-demo to update Elasticsearch
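The two steps can be sketched in plain Java. This is only an illustration: `HotelIndexApi`, `InMemoryHotelIndex` and `HotelAdminService` are hypothetical names, and in-memory maps stand in for MySQL and Elasticsearch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the interface that hotel-demo exposes to modify Elasticsearch.
interface HotelIndexApi {
    void upsert(long id, String name);
}

// In-memory stand-in for hotel-demo; a real implementation would call Elasticsearch.
class InMemoryHotelIndex implements HotelIndexApi {
    final Map<Long, String> docs = new HashMap<>();

    @Override
    public void upsert(long id, String name) {
        docs.put(id, name);
    }
}

// Stand-in for hotel-admin: writes to the "database", then calls hotel-demo directly.
class HotelAdminService {
    final Map<Long, String> database = new HashMap<>();
    private final HotelIndexApi indexApi;

    HotelAdminService(HotelIndexApi indexApi) {
        this.indexApi = indexApi;
    }

    void save(long id, String name) {
        database.put(id, name);     // 1. database operation
        indexApi.upsert(id, name);  // 2. synchronous call; save() does not return until it completes
    }
}

class SyncCallSketch {
    public static void main(String[] args) {
        InMemoryHotelIndex index = new InMemoryHotelIndex();
        HotelAdminService admin = new HotelAdminService(index);
        admin.save(1L, "Example Hotel");
        System.out.println(index.docs.get(1L));
    }
}
```

Because `save` calls the index interface itself, a slow or failing hotel-demo directly slows down or fails the hotel-admin operation, which is the coupling this scheme is known for.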

2. Asynchronous notification

Solution 2: Asynchronous notification


The process is as follows:

  • hotel-admin sends an MQ message after inserting, deleting, or updating data in the MySQL database
  • hotel-demo listens to the MQ and updates the Elasticsearch data after receiving the message
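As a rough illustration of this flow, the sketch below replaces the MQ broker with an in-memory queue. The class `AsyncNotifySketch` and its members are hypothetical; only the shape of the interaction (publish an id, consume it later, re-read the row) reflects the scheme described above.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

class AsyncNotifySketch {
    // Stand-in for the MQ broker: hotel-admin publishes ids, hotel-demo consumes them.
    static final Queue<Long> queue = new ConcurrentLinkedQueue<>();
    static final Map<Long, String> database = new ConcurrentHashMap<>();
    static final Map<Long, String> searchIndex = new ConcurrentHashMap<>();

    // hotel-admin side: write to the database, publish the id, return immediately.
    static void save(long id, String name) {
        database.put(id, name);
        queue.add(id);
    }

    // hotel-demo side: take one message (if any) and re-read the row to update the index.
    static boolean consumeOne() {
        Long id = queue.poll();
        if (id == null) {
            return false; // nothing to process
        }
        String row = database.get(id);
        if (row != null) {
            searchIndex.put(id, row);
        }
        return true;
    }

    public static void main(String[] args) {
        save(1L, "Example Hotel");
        // The index is not updated until the consumer handles the message.
        System.out.println("before consume: " + searchIndex.containsKey(1L));
        consumeOne();
        System.out.println("after consume: " + searchIndex.get(1L));
    }
}
```

The producer never waits for the consumer, so the two services are decoupled, but the index lags behind the database until the message is processed and is never updated if the message is lost.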

3. Monitor binlog

Solution 3: Monitor binlog


The process is as follows:

  • Enable the binlog function in MySQL
  • Every insert, delete, and update operation in MySQL is then recorded in the binlog
  • hotel-demo uses canal to monitor binlog changes and updates the content in Elasticsearch in near real time
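The idea of driving the index from a change log can be sketched without any canal API. The `RowChange` type below is a hypothetical, heavily simplified event; real binlog events carry far more detail, and this is not how canal represents them.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class BinlogSketch {
    enum Op { INSERT, UPDATE, DELETE }

    // Hypothetical, simplified row-change event.
    static final class RowChange {
        final Op op;
        final long id;
        final String name;

        RowChange(Op op, long id, String name) {
            this.op = op;
            this.id = id;
            this.name = name;
        }
    }

    // Replays row changes, in log order, against an in-memory stand-in for the index.
    static Map<Long, String> apply(List<RowChange> changes) {
        Map<Long, String> index = new HashMap<>();
        for (RowChange change : changes) {
            if (change.op == Op.DELETE) {
                index.remove(change.id);
            } else {
                index.put(change.id, change.name); // INSERT and UPDATE both overwrite
            }
        }
        return index;
    }

    public static void main(String[] args) {
        List<RowChange> log = Arrays.asList(
                new RowChange(Op.INSERT, 1L, "Hotel A"),
                new RowChange(Op.UPDATE, 1L, "Hotel A (renamed)"),
                new RowChange(Op.INSERT, 2L, "Hotel B"),
                new RowChange(Op.DELETE, 2L, null));
        System.out.println(apply(log));
    }
}
```

Note that the service writing to the database appears nowhere in this sketch: the consumer needs only the log, which is why this scheme decouples the services completely.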

4. Choosing a solution

Method 1: Synchronous call

  • Advantages: simple and direct to implement
  • Disadvantages: high coupling between services

Method 2: Asynchronous notification

  • Advantages: low coupling, moderate implementation difficulty
  • Disadvantages: depends on the reliability of the MQ

Method 3: Monitor binlog

  • Advantages: complete decoupling between services
  • Disadvantages: enabling binlog adds load to the database, and the implementation is complex

2. Implementing data synchronization

1. Idea

Use the hotel-admin project provided in the pre-course materials as the microservice for hotel management. Whenever hotel data is inserted, deleted, or updated, the same operation must be applied to the data in Elasticsearch.

Steps:

  • Import the hotel-admin project provided in the pre-course materials, start it, and test the CRUD operations on hotel data
  • Declare the exchange, queues, and RoutingKeys
  • Send a message from the insert, delete, and update operations in hotel-admin
  • Listen for the messages in hotel-demo and update the data in Elasticsearch
  • Start both services and test the data synchronization

2. Import the demo

Import the hotel-admin project provided in the pre-course materials.

After running, visit http://localhost:8099


The page provides the CRUD functionality for hotel data.

3. Declare exchanges and queues

The MQ structure consists of one topic exchange, hotel.topic, and two queues bound to it: hotel.insert.queue (RoutingKey hotel.insert) for insert and update messages, and hotel.delete.queue (RoutingKey hotel.delete) for delete messages.

3.1 Introducing dependencies

Add the RabbitMQ (Spring AMQP) dependency to both hotel-admin and hotel-demo:

<!--amqp-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

3.2 Configuration file

spring:
  rabbitmq:
    host: 192.168.1.100
    username: guest
    password: guest
    virtual-host: /

3.3 Declaring the queue and exchange names

Create a new class MqConstants under the cn.itcast.hotel.constants package in hotel-admin and hotel-demo:

package cn.itcast.hotel.constants;

public class MqConstants {
    /**
     * Exchange
     */
    public static final String HOTEL_EXCHANGE = "hotel.topic";
    /**
     * Queue for insert and update messages
     */
    public static final String HOTEL_INSERT_QUEUE = "hotel.insert.queue";
    /**
     * Queue for delete messages
     */
    public static final String HOTEL_DELETE_QUEUE = "hotel.delete.queue";
    /**
     * RoutingKey for insert and update messages
     */
    public static final String HOTEL_INSERT_KEY = "hotel.insert";
    /**
     * RoutingKey for delete messages
     */
    public static final String HOTEL_DELETE_KEY = "hotel.delete";
}

3.4 Declaring the queues and exchange

In hotel-demo, define a configuration class that declares the exchange, the queues, and their bindings:

package cn.itcast.hotel.config;

import cn.itcast.hotel.constants.MqConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MqConfig {
    // Durable, non-auto-delete topic exchange
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(MqConstants.HOTEL_EXCHANGE, true, false);
    }

    // Durable queue for insert and update messages
    @Bean
    public Queue insertQueue() {
        return new Queue(MqConstants.HOTEL_INSERT_QUEUE, true);
    }

    // Durable queue for delete messages
    @Bean
    public Queue deleteQueue() {
        return new Queue(MqConstants.HOTEL_DELETE_QUEUE, true);
    }

    @Bean
    public Binding insertQueueBinding() {
        return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INSERT_KEY);
    }

    @Bean
    public Binding deleteQueueBinding() {
        return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_KEY);
    }
}
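The bindings above use RoutingKeys without wildcards, so each message reaches exactly one queue. For reference, a topic exchange also accepts binding patterns in which `*` matches exactly one dot-separated word and `#` matches zero or more words. The helper below is a plain-Java sketch of that matching rule, written only for illustration: `TopicMatchSketch` is a hypothetical name and is not part of RabbitMQ or Spring AMQP.

```java
class TopicMatchSketch {
    // Returns true if the routing key satisfies the binding pattern.
    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern exhausted: key must be exhausted too
        }
        if (p[i].equals("#")) {
            // '#' may consume zero or more words; try every possible split
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still has words but the key has none left
        }
        // '*' matches any single word; otherwise the words must be equal
        return (p[i].equals("*") || p[i].equals(k[j])) && match(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("hotel.insert", "hotel.insert"));
        System.out.println(matches("hotel.*", "hotel.delete"));
        System.out.println(matches("hotel.#", "hotel.insert.batch"));
    }
}
```

With exact keys such as hotel.insert and hotel.delete the exchange behaves like a direct exchange; a single queue bound with `hotel.*` would receive both kinds of message.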

4. Send MQ messages

Send an MQ message from each of the insert, delete, and update operations in hotel-admin.

4.1 Transaction configuration class

To guarantee that the RabbitMQ message is sent only after the database transaction has committed, define an executor that defers tasks until the commit:

package cn.itcast.hotel.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

@Component("afterCommitExecutor")
public class AfterCommitExecutor extends TransactionSynchronizationAdapter implements Executor {
    // Tasks registered by the current thread while its transaction is still open
    private static final ThreadLocal<List<Runnable>> RUNNABLES = new ThreadLocal<List<Runnable>>();
    private ThreadPoolExecutor threadPool;

    private final Logger logger = LoggerFactory.getLogger(AfterCommitExecutor.class);

    @PostConstruct
    public void init() {
        logger.debug("Initialize thread pool...");
        int availableProcessors = Runtime.getRuntime().availableProcessors();
        if (0 >= availableProcessors) {
            availableProcessors = 1;
        }
        int maxPoolSize = (availableProcessors > 5) ? availableProcessors * 2 : 5;
        // SLF4J uses {} placeholders, not %s
        logger.debug("CPU Processors: {} MaxPoolSize: {}", availableProcessors, maxPoolSize);
        threadPool = new ThreadPoolExecutor(
            availableProcessors,
            maxPoolSize,
            60,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(maxPoolSize * 2),
            Executors.defaultThreadFactory(),
            new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    logger.debug("Task: {} rejected", r);
                    if (!executor.isShutdown()) {
                        // Note: this drops the oldest queued task to make room for the new one
                        executor.getQueue().poll();
                        executor.execute(r);
                    }
                }
            }
        );
    }

    @PreDestroy
    public void destroy() {
        logger.debug("Destroy thread pool...");
        if (null != threadPool && !threadPool.isShutdown()) {
            threadPool.shutdown();
        }
    }

    @Override
    public void execute(Runnable runnable) {
        // No active transaction: run the task immediately on the calling thread
        if (!TransactionSynchronizationManager.isSynchronizationActive()) {
            runnable.run();
            return;
        }
        // Otherwise queue the task and register this object for the commit callback (once per transaction)
        List<Runnable> threadRunnables = RUNNABLES.get();
        if (threadRunnables == null) {
            threadRunnables = new ArrayList<Runnable>();
            RUNNABLES.set(threadRunnables);
            TransactionSynchronizationManager.registerSynchronization(this);
        }
        threadRunnables.add(runnable);
    }

    @Override
    public void afterCommit() {
        logger.debug("Transaction committed ...");
        // Hand the queued tasks to the thread pool only after a successful commit
        List<Runnable> threadRunnables = RUNNABLES.get();
        for (int i = 0; i < threadRunnables.size(); i++) {
            Runnable runnable = threadRunnables.get(i);
            try {
                threadPool.execute(runnable);
            } catch (RuntimeException e) {
                logger.error("Failed to submit after-commit task", e);
            }
        }
    }

    @Override
    public void afterCompletion(int status) {
        // Called after commit or rollback: always clear the thread-local state
        logger.debug("Transaction completed ...");
        RUNNABLES.remove();
    }
}
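Stripped of Spring and the thread pool, the core idea of this class is small: collect tasks in a thread-local list while the transaction is open, run them only if the work commits, and always clear the list afterwards. The following is a minimal plain-Java sketch of that idea; `AfterCommitSketch` and `inTransaction` are hypothetical names, and "commit" here simply means that the unit of work returned without throwing.

```java
import java.util.ArrayList;
import java.util.List;

class AfterCommitSketch {
    // Tasks registered by the current thread during its "transaction".
    private static final ThreadLocal<List<Runnable>> PENDING =
            ThreadLocal.withInitial(ArrayList::new);

    // Registers a task to run only if the current unit of work commits.
    static void afterCommit(Runnable task) {
        PENDING.get().add(task);
    }

    // Runs the unit of work; deferred tasks run only if it completes without throwing.
    static void inTransaction(Runnable work) {
        try {
            work.run();
            // "Commit" point: iterate over a copy so tasks may register further tasks safely.
            for (Runnable task : new ArrayList<>(PENDING.get())) {
                task.run();
            }
        } finally {
            PENDING.remove(); // mirrors afterCompletion: clear state on commit and on rollback
        }
    }

    public static void main(String[] args) {
        inTransaction(() -> {
            System.out.println("write to database");
            afterCommit(() -> System.out.println("send MQ message"));
        });
    }
}
```

If the unit of work throws, the exception propagates and the registered tasks are discarded, so a consumer never receives a message about a row that was rolled back.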

4.2 Service code

    @Override
    @Transactional
    public boolean save(Hotel hotel) {
        logger.info("----- into insert service -----");
        // Long.getLong(String) looks up a system property and would return null here,
        // so derive a non-negative long id from the UUID instead (an illustrative choice).
        hotel.setId(UUID.randomUUID().getMostSignificantBits() & Long.MAX_VALUE);
        int i = hotelMapper.insert(hotel);

        // Deferred until the surrounding transaction commits
        afterCommitExecutor.execute(new Runnable() {
            @Override
            public void run() {
                rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId().toString());
                logger.info("----- rabbitmq send message -----");
            }
        });

        // Exactly one affected row means the insert succeeded
        return i == 1;
    }

  
    @Override
    @Transactional
    public boolean updateById(Hotel hotel) {
        logger.info("----- into service -----");

        // Modify DB data
        int i = hotelMapper.updateById(hotel);

        // Deferred until the surrounding transaction commits
        afterCommitExecutor.execute(new Runnable() {
            @Override
            public void run() {
                rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId().toString());
                logger.info("----- rabbitmq send message -----");
            }
        });

        logger.info("--------- out service ----------");

        // Exactly one affected row means the update succeeded
        return i == 1;
    }

    @Override
    @Transactional
    public void removeById(Long id) {
        logger.info("----- into service -----");

        // Delete the row from the DB
        hotelMapper.deleteById(id);

        // Deferred until the surrounding transaction commits
        afterCommitExecutor.execute(new Runnable() {
            @Override
            public void run() {
                // Send the id as a string, consistent with save and updateById
                rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_DELETE_KEY, id.toString());
                logger.info("----- rabbitmq send message -----");
            }
        });

        logger.info("--------- out service ----------");
    }

5. Receive MQ messages

When hotel-demo receives an MQ message, it does one of the following:

  • Insert or update message: query the hotel by the hotel id in the message, then write that document to the index library
  • Delete message: delete the document with the hotel id in the message from the index library

1) First, declare the insert and delete operations in IHotelService under the cn.itcast.hotel.service package of hotel-demo:

void deleteById(Long id);

void insertById(Long id);

2) Implement the two methods in HotelService under the cn.itcast.hotel.service.impl package of hotel-demo:

@Override
public void deleteById(Long id) {
    try {
        // 1. Prepare the request
        DeleteRequest request = new DeleteRequest("hotel", id.toString());
        // 2. Send the request
        restHighLevelClient.delete(request, RequestOptions.DEFAULT);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

@Override
public void insertById(Long id) {
    try {
        // 0. Query the hotel data by id
        Hotel hotel = getById(id);
        // Convert to the document type
        HotelDoc hotelDoc = new HotelDoc(hotel);

        // 1. Prepare the request object
        IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString());
        // 2. Prepare the JSON document
        request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
        // 3. Send the request
        restHighLevelClient.index(request, RequestOptions.DEFAULT);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

3) Write a listener

Add a new class HotelListener to the cn.itcast.hotel.mq package in hotel-demo:

package cn.itcast.hotel.mq;

import cn.itcast.hotel.constants.MqConstants;
import cn.itcast.hotel.service.IHotelService;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class HotelListener {

    @Autowired
    private IHotelService hotelService;

    /**
     * Listens for hotel insert and update messages
     * @param id hotel id
     */
    @RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)
    public void listenHotelInsertOrUpdate(Long id) {
        hotelService.insertById(id);
    }

    /**
     * Listens for hotel delete messages
     * @param id hotel id
     */
    @RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)
    public void listenHotelDelete(Long id) {
        hotelService.deleteById(id);
    }
}
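One property of this design is worth spelling out: the message carries only the hotel id, and the handler re-reads the current row and overwrites the document with that id. Handling the same message twice therefore leaves the index in the same state, which matters because an MQ may redeliver messages. The sketch below illustrates this with in-memory maps; `ListenerSketch` is a hypothetical name, and the null check is an added assumption covering a row that was deleted before its insert message was processed.

```java
import java.util.HashMap;
import java.util.Map;

class ListenerSketch {
    static final Map<Long, String> database = new HashMap<>(); // stand-in for MySQL
    static final Map<Long, String> index = new HashMap<>();    // stand-in for the index library

    // Mirrors the insert/update handler: re-read the row by id, then overwrite the document.
    static void onInsertOrUpdate(long id) {
        String row = database.get(id);
        if (row != null) { // assumption: skip rows that no longer exist
            index.put(id, row);
        }
    }

    // Mirrors the delete handler: removing a missing document is a no-op.
    static void onDelete(long id) {
        index.remove(id);
    }

    public static void main(String[] args) {
        database.put(1L, "Hotel A");
        onInsertOrUpdate(1L);
        onInsertOrUpdate(1L); // redelivered message: same result
        System.out.println(index);
        onDelete(1L);
        onDelete(1L);         // redelivered delete: still empty
        System.out.println(index.isEmpty());
    }
}
```

This is also why inserts and updates can share one queue and one handler: in both cases the handler writes the latest version of the row under the same document id.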