MySQL implements data synchronization (producer) and stores data in Redis (SpringBoot)

Dependencies: In addition to the following two, there are also mybatisplus, MySQL, and lombok

<!-- This dependency is to monitor MySQL data changes -->
        <dependency>
            <groupId>com.zendesk</groupId>
            <artifactId>mysql-binlog-connector-java</artifactId>
            <version>0.27.1</version> <!--Version 2022.09.17-->
        </dependency>

<!-- Create a SpringBoot project and add the redis starter dependency -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

Implementation idea: All data in the synchronized database needs to be read out

  1. First query all table names and primary keys in a database
  2. Query all the data in the table based on the table name (because the values corresponding to the field names are needed, all are stored in the map)
  3. Store the encapsulated data in the middleware (I use Redis here)

Code implementation:

  1. Entity class: table name, table primary key, table data (field name, value)
    1. @Data
      public class TableDataAndKey {
          String tableName;
          String key;
          
          List<Map<String,Object>> data;
      }
  2. Read database data (the first query passes in the database name, and all table names and primary keys are retrieved. The second query passes in the table name, and all the data in the table is stored in the map)
    1. <mapper namespace="com.lcx.mapper.MapperDb">
          
          <select id="allTableName" resultType="com.lcx.entity.TableDataAndKey">
              select a.TABLE_NAME 'tableName',a.column_name 'key' from information_schema.Columns a
              where TABLE_SCHEMA=#{dbName} AND COLUMN_KEY='PRI'
          </select>
          <select id="selectList" resultType="java.util.Map" parameterType="java.lang.String">
              select * from ${tableName}
          </select>
      
      
      </mapper>

       @Autowired
          MapperDb mapperDb;
          
          //Storage table data (pojo: table name, primary key, map<field name, value>)
          public static List<TableDataAndKey> tableData;
       
          public void tableNameAll(String name){
              //Read all table names and primary keys in the zy database
              List<TableDataAndKey> tableName = mapperDb.allTableName(name);
              for (TableDataAndKey list : tableName) {
                  //Query the data in the table based on the table name and store it in the map
                  List<Map<String, Object>> maps = mapperDb.selectList(list.getTableName());
                  list.setData(maps);
              }
              tableData=tableName;
          }

      In this way, the data of all tables in the database has been stored in the entity class, and then a static list variable is defined to store it. I have implemented the ApplicationRunner interface here and overridden the run method. When the project is started, just call the tableNameAll() method and pass in the database name. In this way, the static list variable has data.

      
      
  3. Store the data in the middleware (the Redis I use here can be used with kafka, etc.)
    1. @Component
      @Order(2)//Specifies the order of execution
      public class DataToRedis implements ApplicationRunner {
          private static final String PRODUCT_LIST_KEY = "key_test";
          @Autowired
          RedisTemplate redisTemplate;
      
      
          @Override
          public void run (ApplicationArguments args) throws Exception {
              redisTemplate.delete(PRODUCT_LIST_KEY);
              //Store data into redis
              getProductListByDataSource();
          }
          
      
      
      
          
          //Insert redis
          private void getProductListByDataSource(){
              //Cache data into redis
              //tableData: The final structure obtained by querying the database is: (table name, primary key field, list<map<field name, value>>)
              redisTemplate.opsForList().rightPushAll(PRODUCT_LIST_KEY,tableData);
              //Set the current key expiration time to 1 hour
              redisTemplate.expire(PRODUCT_LIST_KEY,1000*60*60, TimeUnit.MILLISECONDS);
          }
      
         
      }
      

      In this way, the data is stored in Redis.

Then there is how to monitor MySQL data changes (you need to start the bin-log of mysql, otherwise you will not be able to monitor it)

Just add the following configurations to the my.ini file:

log-bin=mysql-bin #[Must] enable binary log
server-id=100 #[Required] The unique ID of the server. If it is used for multiple MySQLs, the ID should not be repeated.
binlog-format = ROW #This sentence must also be added, explained below

I’m not sure if it’s on or not.

The following code can be copied directly. I also copied it and need to change the IP, port, username, and password.

 /**
     * Connect to mysqlBinLog (monitor MySQL data changes)
     */
    public void connectMysqlBinLog() {
        log.info("Monitoring BinLog service has started");

        //Your own MySQL information. host, port, username, password
        BinaryLogClient client = new BinaryLogClient("localhost", 3308, "root", "root");
        /**Because binlog is not divided in units of databases, monitoring binglog is not a single database monitored, but the entire currently connected MySQL.
         *If any data addition, deletion or modification occurs in any of the libraries, it can be detected here.
         *So there is no need to set the name of the monitored database (I don’t know how to set it, and I haven’t found a constructor that contains this parameter)
         *If you need to monitor only the specified database, you can look at the following code to get the name of the currently changed database. You can decide whether to monitor based on the name
         **/

        client.setServerId(100); //Stay consistent with the server-id I set before, but I don’t know why it can succeed even if it is inconsistent.

        
        client.registerEventListener(event -> {
            EventData data = event.getData();
            if (data instanceof TableMapEventData) {
                //As long as the addition, deletion and modification operations occur in the connected MySQL, they will enter here, no matter which database

                TableMapEventData tableMapEventData = (TableMapEventData) data;

                //You can obtain the currently changed database by converting tableMapEventData into an instance of the TableMapEventData class.
                System.out.println("Changed database:" + tableMapEventData.getDatabase());
                
                System.out.print("TableName:");
                //table name
                System.out.println(tableMapEventData.getTable());
                System.out.println(tableMapEventData);
            }
            //Triggered when table data is modified
            if (data instanceof UpdateRowsEventData) {
                System.out.print("Update:");
                System.out.println(data);
                //Triggered when table data is inserted
            } else if (data instanceof WriteRowsEventData) {
                System.out.print("Insert:");
                System.out.println(data);
                //Triggered after table data is deleted
            } else if (data instanceof DeleteRowsEventData) {
                System.out.print("Delete:");
                System.out.println(data);
            }
        });

        try {
            client.connect();
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

This is almost complete. The following is the complete code: The first file is to enable monitoring and store all data in the database into static lsit variables (both of which are executed when the project is started. The execution sequence is determined by @Order(1 )

package com.lcx.run;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.*;
import com.lcx.entity.TableDataAndKey;
import com.lcx.mapper.MapperDb;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.List;
import java.util.Map;

//This class can monitor the addition, deletion and modification of MySQL database data
@Component
@Slf4j //Used to print logs
@Order(1)//Specifies the order of execution
//In SpringBoot, an interface is provided: ApplicationRunner.
//There is only one run method in this interface. The timing of its execution is: after the spring container is started, the run method of this interface implementation class will be executed immediately.
public class MysqlBinLogClient implements ApplicationRunner {
    @Autowired
    MapperDb mapperDb;
    
    //Storage table data (pojo: table name, primary key, map)
    public static List tableData;
    
    
    @Override
    public void run(ApplicationArguments args) throws Exception {
        //Project startup completes connection bin-log
        new Thread(() -> {
            connectMysqlBinLog();
        }).start();
        //All tables in a database
        tableNameAll("zy");
    }

    /**
     * Connect to mysqlBinLog (monitor MySQL data changes)
     */
    public void connectMysqlBinLog() {
        log.info("Monitoring BinLog service has started");

        //Your own MySQL information. host, port, username, password
        BinaryLogClient client = new BinaryLogClient("localhost", 3308, "root", "root");
        /**Because binlog is not divided in units of databases, monitoring binglog is not a single database monitored, but the entire currently connected MySQL.
         *If any data addition, deletion or modification occurs in any of the libraries, it can be detected here.
         *So there is no need to set the name of the monitored database (I don’t know how to set it, and I haven’t found a constructor that contains this parameter)
         *If you need to monitor only the specified database, you can look at the following code to get the name of the currently changed database. You can decide whether to monitor based on the name
         **/

        client.setServerId(100); //Stay consistent with the server-id I set before, but I don’t know why it can succeed even if it is inconsistent.

        
        client.registerEventListener(event -> {
            EventData data = event.getData();
            if (data instanceof TableMapEventData) {
                //As long as the addition, deletion and modification operations occur in the connected MySQL, they will enter here, no matter which database

                TableMapEventData tableMapEventData = (TableMapEventData) data;

                //You can obtain the currently changed database by converting tableMapEventData into an instance of the TableMapEventData class.
                System.out.println("Changed database:" + tableMapEventData.getDatabase());
                
                System.out.print("TableName:");
                //table name
                System.out.println(tableMapEventData.getTable());
                System.out.println(tableMapEventData);
            }
            //Triggered when table data is modified
            if (data instanceof UpdateRowsEventData) {
                System.out.print("Update:");
                System.out.println(data);
                //Triggered when table data is inserted
            } else if (data instanceof WriteRowsEventData) {
                System.out.print("Insert:");
                System.out.println(data);
                //Triggered after table data is deleted
            } else if (data instanceof DeleteRowsEventData) {
                System.out.print("Delete:");
                System.out.println(data);
            }
        });

        try {
            client.connect();
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    /**
     * Data of all tables in zy database (pojo: table name, primary key, map (field name, value))
     */
    public void tableNameAll(String name){
        //Read all table names and primary keys in the zy database
        List tableName = mapperDb.allTableName(name);
        for (TableDataAndKey list : tableName) {
            //Query the data in the table based on the table name and store it in the map
            List> maps = mapperDb.selectList(list.getTableName());
            list.setData(maps);
        }
        tableData=tableName;
    }
}

This is to store the above static lsit variable into redis

package com.lcx.run;


import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.lcx.entity.Student;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RequestBody;



import java.util.List;
import java.util.concurrent.TimeUnit;

import static com.lcx.run.MysqlBinLogClient.tableData;




@Component
@Order(2)//Specifies the order of execution
public class DataToRedis implements ApplicationRunner {
    private static final String PRODUCT_LIST_KEY = "key_test";
    @Autowired
    RedisTemplate redisTemplate;


    @Override
    public void run (ApplicationArguments args) throws Exception {
        //Clear redis first
        redisTemplate.delete(PRODUCT_LIST_KEY);
        //Store data into redis
        getProductListByDataSource();
    }
    
    public List<Student> redisStuList(@RequestBody Page<Student> page){
        //Get paging data from cache
        List<Student> productList = getProductListByRedis(page);
        if (productList == null || productList.size() == 0) {
            //Get the paging data from the database and store it in redis
            getProductListByDataSource();
            productList = getProductListByRedis(page);
        }
        return productList;
    }
    

    //Read redis
    private List<Student> getProductListByRedis(Page<Student> iPage) {
      
        
        int size= (int) iPage.getSize();
        int current= (int) (iPage.getCurrent()-1)*size;
        //Start position current
        // End position
        int end = current + size-1;
        return redisTemplate.opsForList().range(PRODUCT_LIST_KEY, current, end);
    }

    
    //Insert redis
    private void getProductListByDataSource(){
        //Cache data into redis
        //tableData constant: The final structure obtained by querying the database is: (table name, primary key field, list<map<field name, value>>)
        redisTemplate.opsForList().rightPushAll(PRODUCT_LIST_KEY,tableData);
        //Set the current key expiration time to 1 hour
        redisTemplate.expire(PRODUCT_LIST_KEY,1000*60*60, TimeUnit.MILLISECONDS);
    }

   
}

Storing Redis will cause chaos. The following is the configuration file of Redis

package com.lcx.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;

@Configuration
public class RedisConfig {
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory){
        //Create template
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        // Set up the connection factory
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        // Set up the serialization tool
        GenericJackson2JsonRedisSerializer jsonRedisSerializer =
                new GenericJackson2JsonRedisSerializer();
        // key and hashKey are serialized using string
        redisTemplate.setKeySerializer(RedisSerializer.string());
        redisTemplate.setHashKeySerializer(RedisSerializer.string());
        // value and hashValue are serialized using JSON
        redisTemplate.setValueSerializer(jsonRedisSerializer);
        redisTemplate.setHashValueSerializer(jsonRedisSerializer);
        return redisTemplate;
    }
}

yml file

spring:
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3308/zy?useUnicode=true &characterEncoding=UTF-8 &serverTimezone=UTC
    username: root
    password: root

  redis:
    database: 0
    host: 127.0.0.1
    port: 6379
    password: root
    jedis:
      pool:
        #Maximum number of connections
        max-active: 8
        #Maximum blocking waiting time (negative number means no limit)
        max-wait: -1
        #MaxIdle
        max-idle: 8
        #Minimum idle
        min-idle: 0

  cache:
    redis:
      time-to-live: 360000000
mybatis-plus:
  mapper-locations: classpath*:mapper/*.xml

  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl


Summary: The code is very simple. I think the main difficulty is how to query the table name and primary key field of the database. This is a producer’s idea and code implementation. If you are interested, you can take a look.

I just found a job after my internship in my junior year. Also made a way to consume kafka data into MySQL. Producers are used to monitor data changes in sqlServer. The producer code of sqlServer was written before and stored in kafka. I had nothing to do and studied the code for monitoring MySQL data changes.