Real-time data subscription of Mysql using canal and openfire

Article directory

    • 1. Openfire plug-in receives binlog data
      • 1.1. Create user group
      • 1.2. Interface implementation
    • 2. Canal client development
    • 3. Smack message client implementation.


Mysql binlog real-time data subscription

(1)
canal installation and client use

(2)
openfire 4.7.5 Web plug-in development

(3)
Real-time data subscription of Mysql using canal and openfire

The business system generates between 5,000 and 10,000 data every day. The amount of data is not large, but the number of users subscribing to this data is relatively large. Currently, it needs to support more than 100 users and requires real-time synchronization of data. The generated data contains categories. Users subscribe to the data they need based on actual purchases, but the total number of categories is not large. Users can be divided into different groups, and users in the same group subscribe to the same category data.
Simple idea and implementation:
1. To achieve real-time data synchronization, the best way is based on binlog log synchronization. Optional binlog reading tools: canal, floinkcdc, etc.
2. To synchronize data to the user’s database, options:
(1) Save binlog data to kafka, and install a client to read kafka on the user’s server to implement data storage. However, this solution is inconvenient for user management.
(2) The server and client realize data synchronization based on netty. The binlog data is sent to the netty server, and the netty server then synchronizes the data to the netty client through the transmission protocol and stores it in the database. However, this solution is more feasible, but the development workload is relatively large.
(3) Use the open source openfire instant messaging server and the client Smack library to achieve data synchronization on the server side and client side. The development workload of this solution is not large, and Openfire comes with user and group management, and a single machine can also support 10,000 concurrent users. , there will be no pressure for user growth in the future. However, developers need to learn openfire related technologies first, which requires a certain amount of learning time.
This article uses scheme (3) to test the data synchronization scheme.
The data synchronization link is as follows:

1. Openfire plug-in receives binlog data

1.1. Create user group

Create the user group data-group in the openfire background, and add users test1 and test2 to the group. The data in this group is only synchronized to the two users test1 and test2.

1.2, Interface implementation

The implementation code is based on the Jersey interface example of the previous openfire 4.7.5 Web plug-in development, adding the MessageService class, implementing the http interface for receiving data, receiving sql data from the canal client, and sending the data to the Smack client.

package org.igniterealtime.openfire.service;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.igniterealtime.openfire.exampleplugin.DbCdcPlugin;
import org.jivesoftware.admin.AuthCheckFilter;
import org.jivesoftware.openfire.MessageRouter;
import org.jivesoftware.openfire.XMPPServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xmpp.packet.JID;
import org.xmpp.packet.Message;
import javax.annotation.PostConstruct;
import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;
import java.util.*;

@Path("dbcdc/v1/message")
public class MessageService {<!-- -->
    private static final Logger log = LoggerFactory.getLogger(MessageService.class);
    private DbCdcPlugin plugin;
    @PostConstruct
    public void init() {<!-- -->
        plugin = (DbCdcPlugin) XMPPServer.getInstance().getPluginManager().getPlugin( "dbcdc" );
        AuthCheckFilter.addExclude("dbcdc/v1/message/sendMsgToOne");
    }

    // http://localhost:9090/plugins/dbcdc/v1/message/sendMsgToOne?msg=aaaaabbbbbb
    @POST
    @Path("/sendMsgToOne")
    @Produces(MediaType.APPLICATION_JSON)
    @Consumes(MediaType.APPLICATION_FORM_URLENCODED)
    public Map<String, Object> sendMsgToOne(@FormParam("msg") String msg) throws Exception
    {<!-- -->
        List<String> jids = getMembers();
        // Synchronize binlog for users in the group
        Map<String, Object> data = new HashMap<>();
        data.put("msg", msg);
        String from = "[email protected]";
        jids.forEach(jid->{<!-- -->
            String to = jid;
            Message message = new Message();
            message.setFrom(new JID(from));
            message.setTo(new JID(to));
            message.setID(nextID());
            message.setType(Message.Type.chat);
            message.setBody(msg);
            MessageRouter messageRouter = plugin.server.getMessageRouter();
            messageRouter.route(message);
            data.put(jid, "success");
        });
        return data;
    }

    private List<String> getMembers(){<!-- -->
        List<String> list = new ArrayList();
        try{<!-- -->
            // The test is only sent to the data-group user group
            Collection<JID> jids = plugin.groupManager.getGroup("data-group").getMembers();
            for(JID jid:jids){<!-- -->
                list.add(jid.toBareJID());
            }
        }
        catch(Exception e){<!-- -->
           log.error("getMembers error=====",e);
        }
        return list;
    }
    
    public static synchronized String nextID() {<!-- -->
        Random random = new Random();
        int number1 = random.nextInt(899) + 100;
        int number2 = random.nextInt(899) + 100;
        return new StringBuffer().append(number1).append("-").append(number2).toString();
    }
}

2. Canal client development

The Canal client receives the binlog data transmitted by the Canal server, converts it into the corresponding SQL statement, and then sends it to the Openfire server for distribution to users.
Based on the previous canal installation and client use, the CanalClient class is re-implemented. The example only handles three SQL operations: delete, update, and insert.

package com.penngo.canal.component;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.jsoup.Connection;
import org.jsoup.Jsoup;
import org.springframework.stereotype.Component;
import java.util.List;

@Component
@Slf4j
public class CanalClient {<!-- -->

    private final static String imUrl = "http://localhost:9090/plugins/dbcdc/v1/message/sendMsgToOne";
    @Resource
    private CanalConnector canalConnector;
    /**
     * canal storage method
     */
    public void run() {<!-- -->

        int batchSize = 1000;
        try {<!-- -->
            canalConnector.connect();
            canalConnector.subscribe("flinktest\..*");
            canalConnector.rollback();
            try {<!-- -->
                while (true) {<!-- -->
                    Message message = canalConnector.getWithoutAck(batchSize);
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    System.out.println("batchId=======" + batchId + ",size:" + size);
                    if (batchId == -1 || size == 0) {<!-- -->
                        Thread.sleep(1000);
                    } else {<!-- -->
                        dataHandle(message.getEntries());
                    }
                    canalConnector.ack(batchId);

                }
            } catch (InterruptedException e) {<!-- -->
                e.printStackTrace();
            } catch (InvalidProtocolBufferException e) {<!-- -->
                e.printStackTrace();
            }
        } finally {<!-- -->
            canalConnector.disconnect();
        }
    }



    /**
     * data processing
     * @param entries
     */
    private void dataHandle(List<Entry> entries) throws InvalidProtocolBufferException {<!-- -->
        for (Entry entry : entries) {<!-- -->
            if (EntryType.ROWDATA == entry.getEntryType()) {<!-- -->
                RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
                EventType eventType = rowChange.getEventType();
                if (eventType == EventType.DELETE) {<!-- -->
                    genDeleteSql(entry);
                } else if (eventType == EventType.UPDATE) {<!-- -->
                    genUpdateSql(entry);
                } else if (eventType == EventType.INSERT) {<!-- -->
                    genInsertSql(entry);
                }
            }
        }
    }

    private void genUpdateSql(Entry entry) {<!-- -->
        try {<!-- -->
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            List<RowData> rowDatasList = rowChange.getRowDatasList();
            for (RowData rowData : rowDatasList) {<!-- -->
                List<Column> newColumnList = rowData.getAfterColumnsList();
                StringBuffer sql = new StringBuffer("update " + entry.getHeader().getTableName() + " set ");
                for (int i = 0; i < newColumnList.size(); i + + ) {<!-- -->
                    sql.append(" " + newColumnList.get(i).getName()
                             + " = '" + newColumnList.get(i).getValue() + "'");
                    if (i != newColumnList.size() - 1) {<!-- -->
                        sql.append(",");
                    }
                }
                sql.append(" where ");
                List<Column> oldColumnList = rowData.getBeforeColumnsList();
                for (Column column : oldColumnList) {<!-- -->
                    if (column.getIsKey()) {<!-- -->
                        sql.append(column.getName() + "=" + column.getValue());
                        break;
                    }
                }
                this.sqlToIm(sql.toString());
            }
        } catch (Exception e) {<!-- -->
            e.printStackTrace();
        }
    }

    private void genDeleteSql(Entry entry) {<!-- -->
        try {<!-- -->
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            List<RowData> rowDatasList = rowChange.getRowDatasList();
            for (RowData rowData : rowDatasList) {<!-- -->
                List<Column> columnList = rowData.getBeforeColumnsList();
                StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getTableName() + " where ");
                for (Column column : columnList) {<!-- -->
                    if (column.getIsKey()) {<!-- -->
                        sql.append(column.getName() + "=" + column.getValue());
                        break;
                    }
                }
                this.sqlToIm(sql.toString());
            }
        } catch (Exception e) {<!-- -->
            e.printStackTrace();
        }
    }

    private void genInsertSql(Entry entry) {<!-- -->
        try {<!-- -->
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            List<RowData> rowDatasList = rowChange.getRowDatasList();
            for (RowData rowData : rowDatasList) {<!-- -->
                List<Column> columnList = rowData.getAfterColumnsList();
                StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getTableName() + " (");
                for (int i = 0; i < columnList.size(); i + + ) {<!-- -->
                    sql.append(columnList.get(i).getName());
                    if (i != columnList.size() - 1) {<!-- -->
                        sql.append(",");
                    }
                }
                sql.append(") VALUES (");
                for (int i = 0; i < columnList.size(); i + + ) {<!-- -->
                    sql.append("'" + columnList.get(i).getValue() + "'");
                    if (i != columnList.size() - 1) {<!-- -->
                        sql.append(",");
                    }
                }
                sql.append(")");
                this.sqlToIm(sql.toString());
            }
        } catch (Exception e) {<!-- -->
            e.printStackTrace();
        }
    }

    /**
     * Synchronize to openfire server
     * @param sql
     */
    public void sqlToIm(String sql) throws Exception {<!-- -->
        String body = Jsoup.connect(imUrl)
                .ignoreContentType(true)
                .header("Content-Type", "application/x-www-form-urlencoded")
                .data("msg", sql)
                .method(Connection.Method.POST)
                .execute().body();
        log.info("sqlToIm result========" + body);
    }
}

3. Smack message client implementation.

The Smack message client receives messages from the Openfire server and synchronizes the data to the database. Implement code
pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.penngo.example</groupId>
    <artifactId>Smack-Test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <java.version>8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.igniterealtime.smack/smack-core -->
        <dependency>
            <groupId>org.igniterealtime.smack</groupId>
            <artifactId>smack-core</artifactId>
            <version>4.4.6</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.igniterealtime.smack/smack-extensions -->
        <dependency>
            <groupId>org.igniterealtime.smack</groupId>
            <artifactId>smack-extensions</artifactId>
            <version>4.4.6</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.igniterealtime.smack/smack-im -->
        <dependency>
            <groupId>org.igniterealtime.smack</groupId>
            <artifactId>smack-im</artifactId>
            <version>4.4.6</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.igniterealtime.smack/smack-tcp -->
        <dependency>
            <groupId>org.igniterealtime.smack</groupId>
            <artifactId>smack-tcp</artifactId>
            <version>4.4.6</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.igniterealtime.smack/smack-debug -->
        <dependency>
            <groupId>org.igniterealtime.smack</groupId>
            <artifactId>smack-debug</artifactId>
            <version>4.4.6</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.igniterealtime.smack/smack-experimental -->
        <dependency>
            <groupId>org.igniterealtime.smack</groupId>
            <artifactId>smack-experimental</artifactId>
            <version>4.4.6</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.igniterealtime.smack/smack-legacy -->
        <dependency>
            <groupId>org.igniterealtime.smack</groupId>
            <artifactId>smack-legacy</artifactId>
            <version>4.4.6</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.igniterealtime.smack/smack-bosh -->
        <dependency>
            <groupId>org.igniterealtime.smack</groupId>
            <artifactId>smack-bosh</artifactId>
            <version>4.4.6</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.igniterealtime.smack/smack-resolver-minidns -->
        <dependency>
            <groupId>org.igniterealtime.smack</groupId>
            <artifactId>smack-resolver-minidns</artifactId>
            <version>4.4.6</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.igniterealtime.smack/smack-resolver-javax -->
        <dependency>
            <groupId>org.igniterealtime.smack</groupId>
            <artifactId>smack-resolver-javax</artifactId>
            <version>4.4.6</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.igniterealtime.smack/smack-resolver-dnsjava -->
        <dependency>
            <groupId>org.igniterealtime.smack</groupId>
            <artifactId>smack-resolver-dnsjava</artifactId>
            <version>4.4.6</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.igniterealtime.smack/smack-xmlparser-xpp3 -->
        <dependency>
            <groupId>org.igniterealtime.smack</groupId>
            <artifactId>smack-xmlparser-xpp3</artifactId>
            <version>4.4.6</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.igniterealtime.smack/smack-sasl-javax -->
        <dependency>
            <groupId>org.igniterealtime.smack</groupId>
            <artifactId>smack-sasl-javax</artifactId>
            <version>4.4.6</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.igniterealtime.smack/smack-java8 -->
        <dependency>
            <groupId>org.igniterealtime.smack</groupId>
            <artifactId>smack-java8</artifactId>
            <version>4.4.6</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.23</version>
        </dependency>
        <dependency>
            <groupId>commons-dbutils</groupId>
            <artifactId>commons-dbutils</artifactId>
            <version>1.6</version>
        </dependency>
    </dependencies>
    <repositories>
        <repository>
            <id>alimaven</id>
            <name>Maven Aliyun Mirror</name>
            <url>https://maven.aliyun.com/repository/central</url>
        </repository>
    </repositories>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.22.2</version>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

Business logic implementation SqlAgentIM.java

package com.penngo.example;

import org.jivesoftware.smack.AbstractXMPPConnection;
import org.jivesoftware.smack.ConnectionConfiguration;
import org.jivesoftware.smack.SmackConfiguration;
import org.jivesoftware.smack.SmackException;

import org.jivesoftware.smack.chat.Chat;
import org.jivesoftware.smack.chat.ChatManager;
import org.jivesoftware.smack.packet.Message;
import org.jivesoftware.smack.packet.Presence;
import org.jivesoftware.smack.tcp.XMPPTCPConnection;
import org.jivesoftware.smack.tcp.XMPPTCPConnectionConfiguration;
import org.jxmpp.jid.EntityBareJid;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import java.sql.Connection;
import java.sql.DriverManager;

public class SqlAgentIM {
    private static final Logger log = LoggerFactory.getLogger(SqlAgentIM.class);
    private String dburl = "jdbc:mysql://localhost:3306/flinktest2?serverTimezone=Asia/Shanghai & amp;characterEncoding=utf-8 & amp;useSSL=false";
    private AbstractXMPPConnection conn = null;
    private DbDao dbDao = null;
    public SqlAgentIM(){

    }

    public void run(){
        dbDao = new DbDao(dburl, "root", "test123");
        login();
    }

    public void login(){
        try{
            //Open the debug window
            SmackConfiguration.DEBUG = true;
            XMPPTCPConnectionConfiguration config = XMPPTCPConnectionConfiguration.builder()
                    .setUsernameAndPassword("test2", "123456")
                    .setXmppDomain("21doc.net")
                    .setHost("127.0.0.1")
                    .setPort(5222)
                    .setSecurityMode(ConnectionConfiguration.SecurityMode.disabled)
                    .build();

            conn = new XMPPTCPConnection(config);
            conn.connect().login();

            setOnlineStatus();

            ChatManager chatManager = ChatManager.getInstanceFor(conn);
            chatManager.addChatListener((chat, b) -> chat.addMessageListener((chat1, message) ->
            {
                String sql = message.getBody();
                //System.out.println("New message from " + chat1 + ": " + message.getBody());
                System.out.println("New message from " + chat1.getParticipant().asEntityBareJidString() + ": " + sql);
                try{
                    int successCount = dbDao.execute(sql);
                    log.info("execute result====successCount:" + successCount);
                }
                catch(Exception e){
                    e.printStackTrace();
                    log.error("execute error", e);
                }
            }));
        }
        catch(Exception e){
            e.printStackTrace();
        }
    }

    public void setOnlineStatus() throws SmackException.NotConnectedException, InterruptedException {
        Presence presence = new Presence(Presence.Type.available, "online", 1, Presence.Mode.available);
        conn.sendStanza(presence);
        Presence presence2 = new Presence(Presence.Type.subscribe, "online", 1, Presence.Mode.available);
        conn.sendStanza(presence2);
        Presence presence3 = new Presence(Presence.Type.subscribed, "online", 1, Presence.Mode.available);
        conn.sendStanza(presence3);
    }

    class DbDao {
    
        private String jdbc_driver;
        private String jdbc_url;
        private String jdbc_username;
        private String jdbc_password;
        private Connection conn = null;
        public DbDao(String url, String username, String password){
            if(conn == null){
                try{
                    if(jdbc_driver == null){
                        jdbc_driver = "";
                        jdbc_url = url;
                        jdbc_username = username;
                        jdbc_password = password;
                        
                    }
                    DbUtils.loadDriver(jdbc_driver);
                    
                    conn = DriverManager.getConnection(jdbc_url, jdbc_username, jdbc_password);
                }
                catch(Exception e){
                    e.printStackTrace();
                }
            }
        }
    
        public int execute(String sql, Object... params) throws Exception{
            QueryRunner qr = new QueryRunner();
            int i = qr.update(conn, sql, params);
            return i;
        }
        
        public void close(){
            DbUtils.closeQuietly(conn);
        }
    }

    public static void main(String[] args) {
        SqlAgentIM sqlAgentIM = new SqlAgentIM();
        sqlAgentIM.run();
    }
}

This article only implements mysql update when the user is online, and does not implement the following content:

  • 1. Synchronization of offline data when the client is offline;
  • 2. The data is fully updated when the user subscribes to the data for the first time.
    Subsequent optimization and improvement can be made based on the adoption of the plan.