Too many pitfalls syncing data with Canal? Come try Logstash!

Hello everyone, I am Bucai Chen~

The previous article explained in detail how to use the Canal middleware to synchronize MySQL data to Elasticsearch. However, since Canal has not been actively maintained for a long time, you may run into many problems while using it. So, in addition to trying Canal, we can also consider Logstash for the same job. This chapter focuses on how to use Logstash to synchronize MySQL data to Elasticsearch. If you have already worked through the Canal tutorial, you can jump straight to the Logstash part of the environment preparation.

Java Technology Guide: https://java-family.cn

Environment preparation

Tool          | Version | Linux (ARM)                                     | Windows
------------- | ------- | ----------------------------------------------- | ---------------------------------------
JDK           | 1.8     | jdk-8u371-linux-aarch64.tar.gz                  | jdk-8u371-windows-x64.exe
MySQL         | 8.0.26  | mysql-community-server-8.0.26-1.el7.aarch64.rpm | mysql-installer-community-8.0.26.0.msi
Elasticsearch | 7.17.11 | elasticsearch-7.17.11-linux-aarch64.tar.gz      | elasticsearch-7.17.11-windows-x86_64
Logstash      | 7.17.10 | logstash-7.17.10-linux-aarch64                  | logstash-7.17.10-windows-x86_64

JDK

(Screenshot: JDK download and installation)

MySQL

(Screenshot: MySQL download and installation)

Elasticsearch

Windows users can simply unzip the archive, go to the bin folder, and run the elasticsearch.bat file.

Unzip
tar -zxvf elasticsearch-7.17.11-linux-aarch64.tar.gz -C /usr/software

mv /usr/software/elasticsearch-7.17.11-linux-aarch64 /usr/software/elasticsearch-7.17.11
Create data directory
cd /usr/software/elasticsearch-7.17.11/

mkdir data
Modify configuration file
cd /usr/software/elasticsearch-7.17.11/

vi config/elasticsearch.yml

In the configuration file, uncomment the relevant lines; the main changes are as follows:

# Cluster name
cluster.name: xxx
# Node name
node.name: node-1
# Data and log storage directories
path.data: /usr/software/elasticsearch-7.17.11/data
path.logs: /usr/software/elasticsearch-7.17.11/logs
# Allow access from any host
network.host: 0.0.0.0
# Default port
http.port: 9200
# Initial master nodes
cluster.initial_master_nodes: ["node-1"]

Open ports
firewall-cmd --add-port=9300/tcp --permanent

firewall-cmd --add-port=9200/tcp --permanent

firewall-cmd --reload

systemctl restart firewalld
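Optionally, confirm that both ports are now open (firewall-cmd --list-ports is a standard firewalld command):

firewall-cmd --list-ports
# Should list 9200/tcp and 9300/tcp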
User permissions issue

Note: On Linux, Elasticsearch cannot be started as the root user, so a dedicated user must be created to run it.

# Create user
useradd elastic

# Authorization
chown -R elastic /usr/software/elasticsearch-7.17.11/
Insufficient memory problem

Note: If the server memory is large enough, you can skip this step, otherwise an out of memory error will be reported when starting!

cd /usr/software/elasticsearch-7.17.11/

vi config/jvm.options

# Set heap memory size
-Xms256m
-Xmx256m
Other issues
  1. max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]

vi /etc/sysctl.conf

# Add the following configuration at the end
vm.max_map_count=655360

# Apply the change
sysctl -p
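You can verify that the new value has taken effect:

sysctl vm.max_map_count
# Expected output: vm.max_map_count = 655360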
  2. max file descriptors [4096] for elasticsearch process is too low

vi /etc/security/limits.conf

# Add the following configuration at the end
* soft nofile 65536
* hard nofile 131072
* soft nproc 2048
* hard nproc 4096
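After logging in again as the elastic user, you can check that the limits are in place (the expected values assume the configuration above):

ulimit -Sn   # soft nofile, should print 65536
ulimit -Hn   # hard nofile, should print 131072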
Background startup
su elastic

cd /usr/software/elasticsearch-7.17.11/bin/

./elasticsearch -d
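Once Elasticsearch is running, a quick way to confirm it is up is to query its root endpoint (adjust the host and port if you changed them):

curl http://127.0.0.1:9200
# A JSON response whose version "number" field reads 7.17.11 indicates a successful start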
Visualization plug-in

Install Multi Elasticsearch Head from the Chrome Web Store.

Click the extension icon in the toolbar of your web browser.

Note that you don’t need to enable CORS with this method.

Logstash

Note: This tutorial uses the IP address 172.16.138.130; replace it with your actual IP!

Unzip
tar -zxvf logstash-7.17.10-linux-aarch64.tar.gz -C /usr/software

mv /usr/software/logstash-7.17.10-linux-aarch64 /usr/software/logstash-7.17.10
Create directory
cd /usr/software/logstash-7.17.10

# This folder is dedicated to storing pipeline configuration files and driver packages related to MySQL.
mkdir mysql
Download driver

Search for MySQL Connector Java in Maven Repository, select the corresponding MySQL version, and download it.

(Screenshot: MySQL Connector Java in the Maven Repository)

wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.30/mysql-connector-java-8.0.30.jar
Pipeline configuration

Create a .conf extension configuration file in the mysql folder and customize the file name:

cd /usr/software/logstash-7.17.10/mysql

vi jdbc.conf
Configuration Description

input: specifies the input data source. For supported data source types, see Input plugins. This article uses the JDBC data source; the specific parameters are explained in the comments of the pipeline configuration below.
filter: specifies the plug-ins that filter the input data. For supported plug-in types, see Filter plugins.
output: specifies the target data source. For supported data source types, see Output plugins. This article synchronizes MySQL data to Elasticsearch, so the output specifies the target Elasticsearch information.

The following configuration is based on test data; in a real business, adjust it to your actual needs:

input {
   jdbc {
      # When synchronizing multiple tables, "type" distinguishes the table. It is recommended to name it "database-name_table-name"
      type => "mytest_user"
      # Path to the JDBC driver jar for MySQL
      jdbc_driver_library => "/usr/software/logstash-7.17.10/mysql/mysql-connector-java-8.0.30.jar"
      # MySQL driver class
      jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
      # Database connection information
      jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowLoadLocalInfile=false&autoDeserialize=false"
      # Database username
      jdbc_user => "root"
      # Database password
      jdbc_password => "123456"
      # Whether to enable paging
      jdbc_paging_enabled => "true"
      # Page size
      jdbc_page_size => "500"
      # Whether to record the last execution result. If true, the last tracking_column value is saved to the file specified by last_run_metadata_path.
      record_last_run => true
      # File that stores the last run metadata
      last_run_metadata_path => "/usr/software/logstash-7.17.10/mysql/last_run_metadata_update_time.txt"
      # The tracking column, whose value must increase monotonically; usually the MySQL primary key or an update timestamp
      tracking_column => "update_time"
      # Whether to track a column value. If true, the latest value of the column specified by tracking_column is recorded and used to determine which records need updating on the next pipeline run.
      use_column_value => "true"
      # Type of the tracking column; the default is numeric
      tracking_column_type => "timestamp"
      # Sync frequency (a 6-field cron-like expression; here, every 5 seconds)
      schedule => "*/5 * * * * *"
      # SQL statement to execute
      statement => "SELECT * FROM user WHERE update_time > :sql_last_value AND update_time < NOW()"
      # Whether to clear the last_run_metadata_path record; the default is false. If true, every run queries all database records from scratch.
      clean_run => "false"
   }
}

filter {

}

output {
   if [type] == "mytest_user" {
      elasticsearch {
         # Configure ES cluster address
         hosts => ["127.0.0.1:9200"]
         # Index name (must be lowercase)
         index => "user"
         # Username (leave empty if security is not enabled)
         user => ""
         # Password (leave empty if security is not enabled)
         password => ""
         # Data unique index (it is recommended to use database primary key)
         document_id => "%{id}"
      }
   }
   stdout {
      codec => json_lines
   }
}

Note: jdbc_driver_library and last_run_metadata_path must be absolute paths; relative paths may cause permission errors.

Data synchronization

Finally, we get to the data synchronization itself. The requirement is: synchronize the user table in MySQL to the user index in ES. Follow along and do it with me!

Create database and tables

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for user
-- ----------------------------
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user` (
  `id` int NOT NULL AUTO_INCREMENT,
  `username` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'username',
  `age` int DEFAULT NULL COMMENT 'age',
  `gender` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'gender',
  `create_time` datetime(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) COMMENT 'Creation time',
  `update_time` datetime(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3) COMMENT 'update time',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

SET FOREIGN_KEY_CHECKS = 1;
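Note that the script above assumes the mytest database referenced in jdbc_connection_string already exists. If it does not, create it first, for example:

CREATE DATABASE IF NOT EXISTS `mytest` DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;
USE `mytest`;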

Start

cd /usr/software/logstash-7.17.10

bin/logstash -f mysql/jdbc.conf

Note: The path to the pipeline configuration file must be correct!
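Before starting the pipeline for real, you can optionally let Logstash validate the configuration file and exit; --config.test_and_exit is a standard Logstash flag:

bin/logstash -f mysql/jdbc.conf --config.test_and_exit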

Check whether the startup is successful

(Screenshot: Logstash startup log)

Synchronous testing

Add a record to the user table in MySQL, then open the Elasticsearch visualization interface to check whether it has been synchronized:
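For example, insert a test row like the following (the column values are arbitrary sample data), then query the user index, e.g. with curl or the Head plugin:

INSERT INTO `user` (`username`, `age`, `gender`) VALUES ('zhangsan', 25, 'male');

curl "http://127.0.0.1:9200/user/_search?q=username:zhangsan&pretty"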

(Screenshots: the new row in MySQL and the synchronized document in Elasticsearch)

FAQ

Delete data

When a record is deleted from MySQL, the deletion is not synchronized to Elasticsearch. To propagate deletions, consider soft deletion, i.e. logical deletion, as sketched below:

Add an is_deleted field to the MySQL table to indicate whether a record is still valid. A "deletion" then becomes an update, which is synchronized to Elasticsearch like any other change. With this approach, queries against MySQL or Elasticsearch must be rewritten to filter out records where is_deleted is true, achieving the soft-delete effect.
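A minimal sketch, assuming the flag column is named is_deleted (the column name and sample values are assumptions; adapt them to your schema):

-- Hypothetical soft-delete column; a "delete" becomes an UPDATE, which the pipeline picks up via update_time
ALTER TABLE `user` ADD COLUMN `is_deleted` tinyint(1) NOT NULL DEFAULT 0 COMMENT 'Soft-delete flag';
UPDATE `user` SET `is_deleted` = 1 WHERE `id` = 1;

Queries then filter on the flag, for example against Elasticsearch:

curl "http://127.0.0.1:9200/user/_search?q=is_deleted:0&pretty"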

One final word (don’t just read for free; please follow)

Every article Chen writes is carefully crafted. If this one has helped or inspired you, please like, read, share, and bookmark it. Your support is my biggest motivation to keep going!

In addition, Chen’s Knowledge Planet is now open. It costs only 199 yuan to join, and the value it gives back is huge. It currently includes the Code Yuan chronic disease cloud management practical project, the Spring family bucket practical series, billion-row sub-database and sub-table sharding in practice, the DDD microservice practice column, "I want to join a big factory", Spring, MyBatis and other framework source code, 22 lectures on architecture practice, RocketMQ, and more...

More introduction

If you need to join the planet, add Chen’s WeChat account: special_coder

91c4f4b804c36c66299d0d1b02358d7e.png
