Dependencies: besides the two below, the project also uses MyBatis-Plus, the MySQL driver, and Lombok.
```xml
<!-- This dependency is used to monitor MySQL data changes -->
<dependency>
    <groupId>com.zendesk</groupId>
    <artifactId>mysql-binlog-connector-java</artifactId>
    <version>0.27.1</version> <!-- version as of 2022-09-17 -->
</dependency>
<!-- Create a Spring Boot project and add the Redis starter dependency -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
```
Implementation idea: all the data in the database to be synchronized needs to be read out first.
- First query all table names and primary keys in a database
- Then query all the data in each table by table name (since the value of every field is needed, each row is stored as a map of field name to value)
- Store the encapsulated data in the middleware (I use Redis here)
Code implementation:
- Entity class: table name, table primary key, table data (field name, value)
```java
@Data
public class TableDataAndKey {
    String tableName;
    String key;
    List<Map<String, Object>> data;
}
```
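For illustration, a populated instance might look like the sketch below. The `student` table, its columns, and the values are made-up examples, not from the project; it only shows how the nesting (table name, primary key, list of row maps) fits together.

```java
import com.lcx.entity.TableDataAndKey;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TableDataAndKeyExample {
    public static void main(String[] args) {
        // One TableDataAndKey per table; "student" and its columns are invented for illustration.
        TableDataAndKey sample = new TableDataAndKey();
        sample.setTableName("student");   // table name
        sample.setKey("id");              // primary-key column of that table

        Map<String, Object> row = new HashMap<>();
        row.put("id", 1);                 // column name -> value
        row.put("name", "Tom");

        List<Map<String, Object>> rows = new ArrayList<>();
        rows.add(row);                    // one map per row
        sample.setData(rows);

        System.out.println(sample);       // Lombok's @Data also generates toString()
    }
}
```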
- Read the database data. The first query takes the database name and returns all table names with their primary-key columns; the second query takes a table name and returns all of that table's rows as maps. (A sketch of the corresponding mapper interface follows the XML below.)
```xml
<mapper namespace="com.lcx.mapper.MapperDb">
    <select id="allTableName" resultType="com.lcx.entity.TableDataAndKey">
        select a.TABLE_NAME 'tableName', a.column_name 'key'
        from information_schema.Columns a
        where TABLE_SCHEMA = #{dbName} AND COLUMN_KEY = 'PRI'
    </select>
    <select id="selectList" resultType="java.util.Map" parameterType="java.lang.String">
        select * from ${tableName}
    </select>
</mapper>
```
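The MapperDb interface itself is not shown in the post. Based on the statement ids and result types in the XML above, and on how it is called later (`mapperDb.allTableName(...)`, `mapperDb.selectList(...)`), it would look roughly like this; the `@Param` annotations are my own addition:

```java
package com.lcx.mapper;

import com.lcx.entity.TableDataAndKey;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;

import java.util.List;
import java.util.Map;

@Mapper
public interface MapperDb {

    // Matches <select id="allTableName">: every table name and its primary-key column in the schema
    List<TableDataAndKey> allTableName(@Param("dbName") String dbName);

    // Matches <select id="selectList">: every row of the given table as column-name -> value maps
    List<Map<String, Object>> selectList(@Param("tableName") String tableName);
}
```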
```java
@Autowired
MapperDb mapperDb;

// Holds the table data (pojo: table name, primary key, list<map<field name, value>>)
public static List<TableDataAndKey> tableData;

public void tableNameAll(String name) {
    // Read all table names and primary keys in the zy database
    List<TableDataAndKey> tableName = mapperDb.allTableName(name);
    for (TableDataAndKey list : tableName) {
        // Query the data in the table by table name and store it in maps
        List<Map<String, Object>> maps = mapperDb.selectList(list.getTableName());
        list.setData(maps);
    }
    tableData = tableName;
}
```
At this point the data of every table in the database is held in the entity objects, stored in a static list variable. I implement the ApplicationRunner interface and override its run method, so that when the project starts, tableNameAll() is called with the database name and the static list is populated.
- Store the data in middleware (I use Redis here; Kafka or something similar would work as well)
```java
@Component
@Order(2) // specifies the execution order
public class DataToRedis implements ApplicationRunner {

    private static final String PRODUCT_LIST_KEY = "key_test";

    @Autowired
    RedisTemplate redisTemplate;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        redisTemplate.delete(PRODUCT_LIST_KEY);
        // Store the data into Redis
        getProductListByDataSource();
    }

    // Insert into Redis
    private void getProductListByDataSource() {
        // Cache the data in Redis.
        // tableData: the structure built from the database query: (table name, primary-key field, list<map<field name, value>>)
        redisTemplate.opsForList().rightPushAll(PRODUCT_LIST_KEY, tableData);
        // Set the current key to expire after 1 hour
        redisTemplate.expire(PRODUCT_LIST_KEY, 1000 * 60 * 60, TimeUnit.MILLISECONDS);
    }
}
```
In this way, the data is stored in Redis.
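To double-check what actually ended up in Redis, a small sanity-check helper could be added to DataToRedis. This is a hypothetical method, not part of the original post; it assumes the same `redisTemplate` bean and the `PRODUCT_LIST_KEY` constant defined above.

```java
// Hypothetical helper for DataToRedis: confirm what was pushed to Redis.
private void printRedisListSummary() {
    // Number of elements under the key; rightPushAll(Collection) stores one element per table
    Long size = redisTemplate.opsForList().size(PRODUCT_LIST_KEY);
    System.out.println("Entries stored under " + PRODUCT_LIST_KEY + ": " + size);

    // Peek at the first table's data without removing it
    Object firstTable = redisTemplate.opsForList().index(PRODUCT_LIST_KEY, 0);
    System.out.println(firstTable);
}
```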
Next is how to monitor MySQL data changes (you need to enable MySQL's binlog first, otherwise nothing can be monitored).
Just add the following configurations to the my.ini file:
```ini
log-bin=mysql-bin    # [Required] enable the binary log
server-id=100        # [Required] unique server ID; must not be repeated if you run multiple MySQL instances
binlog-format=ROW    # this line must also be added, explained below
```
If you are not sure whether binlog is already enabled, you can check with `SHOW VARIABLES LIKE 'log_bin';` (and remember that changes to my.ini only take effect after restarting MySQL).
The following code can be copied directly; just change the host, port, username, and password to your own.
```java
/**
 * Connect to the MySQL binlog (monitor MySQL data changes)
 */
public void connectMysqlBinLog() {
    log.info("The binlog monitoring service has started");

    // Your own MySQL information: host, port, username, password
    BinaryLogClient client = new BinaryLogClient("localhost", 3308, "root", "root");

    /*
     * Because the binlog is not split per database, this does not monitor a single database
     * but the whole MySQL server we are connected to.
     * Any insert, update or delete in any of its databases will be detected here.
     * So there is no need to set the name of the monitored database (I don't know how to set it,
     * and I haven't found a constructor that takes such a parameter).
     * If you only need to monitor a specific database, the code below shows how to get the name
     * of the database that changed, and you can decide whether to handle the event based on it.
     */
    client.setServerId(100); // keep consistent with the server-id set earlier (although it also worked for me when they differed)

    client.registerEventListener(event -> {
        EventData data = event.getData();
        if (data instanceof TableMapEventData) {
            // Any insert, update or delete on the connected MySQL server ends up here, regardless of database
            TableMapEventData tableMapEventData = (TableMapEventData) data;
            // The TableMapEventData instance tells you which database was changed
            System.out.println("Changed database:" + tableMapEventData.getDatabase());
            System.out.print("TableName:");
            System.out.println(tableMapEventData.getTable());
            System.out.println(tableMapEventData);
        }
        // Triggered when table data is updated
        if (data instanceof UpdateRowsEventData) {
            System.out.print("Update:");
            System.out.println(data);
        // Triggered when table data is inserted
        } else if (data instanceof WriteRowsEventData) {
            System.out.print("Insert:");
            System.out.println(data);
        // Triggered when table data is deleted
        } else if (data instanceof DeleteRowsEventData) {
            System.out.print("Delete:");
            System.out.println(data);
        }
    });

    try {
        client.connect();
    } catch (IOException e) {
        e.printStackTrace();
    }
}
```
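Building on the comment above about deciding based on the database name, here is one possible way to restrict handling to a single database. This is a sketch, not from the original post: the class and method names and the hard-coded "zy" filter are illustrative. The idea is to remember the most recent TableMapEventData per tableId and check its database name when a row event arrives (each rows event is preceded by a TableMapEvent describing its table).

```java
package com.lcx.run; // hypothetical placement, alongside the original class

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.*;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Sketch: only handle changes in the "zy" database, the name used elsewhere in this post.
public class MysqlBinLogDatabaseFilterSketch {

    public void connectMysqlBinLogForOneDatabase() {
        BinaryLogClient client = new BinaryLogClient("localhost", 3308, "root", "root");
        client.setServerId(100);

        Map<Long, TableMapEventData> tableMapCache = new HashMap<>();
        client.registerEventListener(event -> {
            EventData data = event.getData();
            if (data instanceof TableMapEventData) {
                TableMapEventData tableMap = (TableMapEventData) data;
                // Every rows event is preceded by a TableMapEvent describing its table
                tableMapCache.put(tableMap.getTableId(), tableMap);
            } else if (data instanceof WriteRowsEventData) {
                WriteRowsEventData write = (WriteRowsEventData) data;
                TableMapEventData tableMap = tableMapCache.get(write.getTableId());
                if (tableMap != null && "zy".equals(tableMap.getDatabase())) {
                    // Only inserts into the zy database reach this point;
                    // the inserted rows themselves are available via write.getRows()
                    System.out.println("Insert into " + tableMap.getTable()
                            + ", rows affected: " + write.getRows().size());
                }
            }
            // UpdateRowsEventData and DeleteRowsEventData also expose getTableId(),
            // so updates and deletes can be filtered the same way
        });

        try {
            client.connect(); // blocking, so run it in its own thread as in the code above
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
```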
This is almost everything. Below is the complete code. The first file enables the binlog monitoring and stores all of the database's data in the static list variable (both happen at project startup; the execution order is controlled by @Order).
```java
package com.lcx.run;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.*;
import com.lcx.entity.TableDataAndKey;
import com.lcx.mapper.MapperDb;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.List;
import java.util.Map;

// This class monitors inserts, updates and deletes on the MySQL database
@Component
@Slf4j // used to print logs
@Order(1) // specifies the execution order
// Spring Boot provides the ApplicationRunner interface, which has a single run method.
// The run method of this implementation class is executed right after the Spring container has started.
public class MysqlBinLogClient implements ApplicationRunner {

    @Autowired
    MapperDb mapperDb;

    // Holds the table data (pojo: table name, primary key, list<map<field name, value>>)
    public static List<TableDataAndKey> tableData;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        // Connect to the binlog once the project has started
        new Thread(() -> {
            connectMysqlBinLog();
        }).start();
        // Load all tables of the zy database
        tableNameAll("zy");
    }

    /**
     * Connect to the MySQL binlog (monitor MySQL data changes)
     */
    public void connectMysqlBinLog() {
        log.info("The binlog monitoring service has started");

        // Your own MySQL information: host, port, username, password
        BinaryLogClient client = new BinaryLogClient("localhost", 3308, "root", "root");

        /*
         * Because the binlog is not split per database, this does not monitor a single database
         * but the whole MySQL server we are connected to.
         * Any insert, update or delete in any of its databases will be detected here.
         * So there is no need to set the name of the monitored database (I don't know how to set it,
         * and I haven't found a constructor that takes such a parameter).
         * If you only need to monitor a specific database, the code below shows how to get the name
         * of the database that changed, and you can decide whether to handle the event based on it.
         */
        client.setServerId(100); // keep consistent with the server-id set earlier (although it also worked for me when they differed)

        client.registerEventListener(event -> {
            EventData data = event.getData();
            if (data instanceof TableMapEventData) {
                // Any insert, update or delete on the connected MySQL server ends up here, regardless of database
                TableMapEventData tableMapEventData = (TableMapEventData) data;
                // The TableMapEventData instance tells you which database was changed
                System.out.println("Changed database:" + tableMapEventData.getDatabase());
                System.out.print("TableName:");
                System.out.println(tableMapEventData.getTable());
                System.out.println(tableMapEventData);
            }
            // Triggered when table data is updated
            if (data instanceof UpdateRowsEventData) {
                System.out.print("Update:");
                System.out.println(data);
            // Triggered when table data is inserted
            } else if (data instanceof WriteRowsEventData) {
                System.out.print("Insert:");
                System.out.println(data);
            // Triggered when table data is deleted
            } else if (data instanceof DeleteRowsEventData) {
                System.out.print("Delete:");
                System.out.println(data);
            }
        });

        try {
            client.connect();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Data of all tables in the zy database (pojo: table name, primary key, map<field name, value>)
     */
    public void tableNameAll(String name) {
        // Read all table names and primary keys in the zy database
        List<TableDataAndKey> tableName = mapperDb.allTableName(name);
        for (TableDataAndKey list : tableName) {
            // Query the data in the table by table name and store it in maps
            List<Map<String, Object>> maps = mapperDb.selectList(list.getTableName());
            list.setData(maps);
        }
        tableData = tableName;
    }
}
```
The second file stores the static list variable above into Redis:
```java
package com.lcx.run;

import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.lcx.entity.Student;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RequestBody;

import java.util.List;
import java.util.concurrent.TimeUnit;

import static com.lcx.run.MysqlBinLogClient.tableData;

@Component
@Order(2) // specifies the execution order
public class DataToRedis implements ApplicationRunner {

    private static final String PRODUCT_LIST_KEY = "key_test";

    @Autowired
    RedisTemplate redisTemplate;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        // Clear Redis first
        redisTemplate.delete(PRODUCT_LIST_KEY);
        // Store the data into Redis
        getProductListByDataSource();
    }

    public List<Student> redisStuList(@RequestBody Page<Student> page) {
        // Get the paged data from the cache
        List<Student> productList = getProductListByRedis(page);
        if (productList == null || productList.size() == 0) {
            // Get the data from the database, store it in Redis, then read the page again
            getProductListByDataSource();
            productList = getProductListByRedis(page);
        }
        return productList;
    }

    // Read from Redis
    private List<Student> getProductListByRedis(Page<Student> iPage) {
        int size = (int) iPage.getSize();
        // Start position
        int current = (int) (iPage.getCurrent() - 1) * size;
        // End position
        int end = current + size - 1;
        return redisTemplate.opsForList().range(PRODUCT_LIST_KEY, current, end);
    }

    // Insert into Redis
    private void getProductListByDataSource() {
        // Cache the data in Redis.
        // tableData: the structure built from the database query: (table name, primary-key field, list<map<field name, value>>)
        redisTemplate.opsForList().rightPushAll(PRODUCT_LIST_KEY, tableData);
        // Set the current key to expire after 1 hour
        redisTemplate.expire(PRODUCT_LIST_KEY, 1000 * 60 * 60, TimeUnit.MILLISECONDS);
    }
}
```
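If you want to call redisStuList() over HTTP, a hypothetical controller could simply delegate to it. This is not part of the original post; the class name and mapping path are made up, and it just returns whatever objects are cached under the key for the requested page.

```java
package com.lcx.controller; // hypothetical package, not in the original project

import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.lcx.entity.Student;
import com.lcx.run.DataToRedis;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;

// Hypothetical controller showing one way to expose the paged Redis read
@RestController
public class StudentRedisController {

    @Autowired
    private DataToRedis dataToRedis;

    // Accepts a MyBatis-Plus Page (current page + size) in the request body and
    // returns the corresponding slice of the cached list from Redis
    @PostMapping("/students/redis-page")
    public List<Student> pageFromRedis(@RequestBody Page<Student> page) {
        return dataToRedis.redisStuList(page);
    }
}
```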
Storing data in Redis without configuring a serializer results in garbled values, so here is the Redis configuration class:
```java
package com.lcx.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;

@Configuration
public class RedisConfig {

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        // Create the template
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        // Set the connection factory
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        // Set up the serializers
        GenericJackson2JsonRedisSerializer jsonRedisSerializer = new GenericJackson2JsonRedisSerializer();
        // key and hashKey are serialized as strings
        redisTemplate.setKeySerializer(RedisSerializer.string());
        redisTemplate.setHashKeySerializer(RedisSerializer.string());
        // value and hashValue are serialized as JSON
        redisTemplate.setValueSerializer(jsonRedisSerializer);
        redisTemplate.setHashValueSerializer(jsonRedisSerializer);
        return redisTemplate;
    }
}
```
The application.yml file:
```yaml
spring:
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3308/zy?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
    username: root
    password: root
  redis:
    database: 0
    host: 127.0.0.1
    port: 6379
    password: root
    jedis:
      pool:
        # Maximum number of connections
        max-active: 8
        # Maximum blocking wait time (a negative number means no limit)
        max-wait: -1
        # Maximum idle connections
        max-idle: 8
        # Minimum idle connections
        min-idle: 0
  cache:
    redis:
      time-to-live: 360000000

mybatis-plus:
  mapper-locations: classpath*:mapper/*.xml
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
```
Summary: the code is quite simple. I think the main difficulty is querying the table names and primary-key fields of a database. This is the producer-side idea and implementation; take a look if you are interested.
I just found a job after my junior-year internship. I have also built a consumer that writes Kafka data into MySQL; its producer monitors data changes in SQL Server and stores them in Kafka (that SQL Server producer code was written earlier). Having some free time, I studied the code for monitoring MySQL data changes, which is what this post covers.