Hello everyone, I am Bucai Chen~
The previous article has introduced in detail how to use Canal middleware to synchronize MySQL data to ElasticSearch. However, since Canal has not been maintained for a long time, you may encounter many problems during use. Therefore, while trying Canal, we can also consider using Logstash to achieve similar functionality. This chapter will focus on how to use Logstash to synchronize MySQL data to ElasticSearch. If you have mastered the previous tutorial on Canal, you can start reading directly from the Logstash part of environment preparation.
Java Technology Guide: https://java-family.cn
Environment preparation
| Tools | Version | Linux (ARM) | Windows |
|---|---|---|---|
| JDK | 1.8 | jdk-8u371-linux-aarch64.tar.gz | jdk-8u371-windows-x64.exe |
| MySQL | 8.0.26 | mysql-community-server-8.0.26-1.el7.aarch64.rpm | mysql-installer-community-8.0.26.0.msi |
| Elasticsearch | 7.17.11 | elasticsearch-7.17.11-linux-aarch64.tar.gz | elasticsearch-7.17.11-windows-x86_64 |
| Logstash | 7.17.10 | logstash-7.17.10-linux-aarch64 | logstash-7.17.10-windows-x86_64 |
JDK
MySQL
Elasticsearch
Windows users can simply unzip the archive, open the bin folder, and run the elasticsearch.bat file.
Unzip
```shell
tar -zxvf elasticsearch-7.17.11-linux-aarch64.tar.gz -C /usr/software
mv /usr/software/elasticsearch-7.17.11-linux-aarch64 /usr/software/elasticsearch-7.17.11
```
Create data directory
```shell
cd /usr/software/elasticsearch-7.17.11/
mkdir data
```
Modify configuration file
```shell
cd /usr/software/elasticsearch-7.17.11/
vi config/elasticsearch.yml
```
In the configuration file, uncomment the relevant lines; the main changes are as follows:
```yaml
# Cluster name
cluster.name: xxx
# Node name
node.name: node-1
# Data and log storage directories
path.data: /usr/software/elasticsearch-7.17.11/data
path.logs: /usr/software/elasticsearch-7.17.11/logs
# Allow access from any host
network.host: 0.0.0.0
# Default port
http.port: 9200
# Initial master node
cluster.initial_master_nodes: ["node-1"]
```
Open port
```shell
firewall-cmd --add-port=9300/tcp --permanent
firewall-cmd --add-port=9200/tcp --permanent
firewall-cmd --reload
systemctl restart firewalld
```
User permissions issue
Note: On Linux systems, Elasticsearch cannot be started as the root user, so a dedicated user needs to be created to run it.
```shell
# Create user
useradd elastic
# Grant ownership
chown -R elastic /usr/software/elasticsearch-7.17.11/
```
Insufficient memory problem
Note: If the server has enough memory, you can skip this step; otherwise an out-of-memory error will be reported at startup!
```shell
cd /usr/software/elasticsearch-7.17.11/
vi config/jvm.options
```

```
# Set heap memory size
-Xms256m
-Xmx256m
```
Other issues
- `max virtual memory areas vm.max_map_count [65530] is too low, increase to at least`

```shell
vi /etc/sysctl.conf
# Add the following configuration at the end:
# vm.max_map_count=655360

# Then apply the change:
sysctl -p
```
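To confirm the kernel setting took effect, the value can also be read back from procfs (a quick sanity check, assuming a Linux host):

```shell
# Print the current kernel limit; after sysctl -p it should match the configured value
cat /proc/sys/vm/max_map_count
```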
- `max file descriptors [4096] for elasticsearch process is too low`

```shell
vi /etc/security/limits.conf
# Add the following configuration at the end:
# * soft nofile 65536
# * hard nofile 131072
# * soft nproc 2048
# * hard nproc 4096
```
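Note that limits.conf only applies to new login sessions. After logging in again as the elastic user, the limits can be checked with the shell's ulimit builtin (a minimal check):

```shell
# Maximum open file descriptors for the current session
ulimit -n
# Maximum user processes for the current session
ulimit -u
```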
Background startup
```shell
su elastic
cd /usr/software/elasticsearch-7.17.11/bin/
./elasticsearch -d
```
Visualization plug-in
Install Multi Elasticsearch Head from the Chrome Web Store.
Click the extension icon in your browser's toolbar.
Note that you don't need to enable CORS with this method.
Logstash
Note: The IP address used in this tutorial is 172.16.138.130, please replace it according to the actual IP!
Unzip
```shell
tar -zxvf logstash-7.17.10-linux-aarch64.tar.gz -C /usr/software
mv /usr/software/logstash-7.17.10-linux-aarch64 /usr/software/logstash-7.17.10
```
Create directory
```shell
cd /usr/software/logstash-7.17.10
# This folder stores the pipeline configuration file and the MySQL driver package
mkdir mysql
```
Download driver
Search for MySQL Connector Java in Maven Repository, select the corresponding MySQL version, and download it.
```shell
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.30/mysql-connector-java-8.0.30.jar
```
Pipeline configuration
Create a configuration file with a `.conf` extension in the `mysql` folder; the file name can be anything:
```shell
cd /usr/software/logstash-7.17.10/mysql
vi jdbc.conf
```
| Configuration | Description |
|---|---|
| input | Specifies the input data source. For supported source types, see Input plugins. This article uses the JDBC data source; for parameter details, see the input parameter descriptions below. |
| filter | Specifies the plugins that filter the input data. For supported plugin types, see Filter plugins. |
| output | Specifies the target data source type. For supported source types, see Output plugins. This article synchronizes MySQL data to Elasticsearch, so the output must specify the target Elasticsearch information. |
The following configuration is based on test data; in real business scenarios, please adjust it to your needs:
```
input {
  jdbc {
    # When synchronizing multiple tables, type distinguishes them; naming it "dbname_tablename" is recommended
    type => "mytest_user"
    # Path to the MySQL JDBC driver jar
    jdbc_driver_library => "/usr/software/logstash-7.17.10/mysql/mysql-connector-java-8.0.30.jar"
    # MySQL driver class
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    # Database connection information
    jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowLoadLocalInfile=false&autoDeserialize=false"
    # Database username
    jdbc_user => "root"
    # Database password
    jdbc_password => "123456"
    # Whether to enable paging
    jdbc_paging_enabled => "true"
    # Page size
    jdbc_page_size => "500"
    # Whether to record the last run. If true, the last tracking_column value is saved to the file specified by last_run_metadata_path.
    record_last_run => true
    # File that stores the last run time
    last_run_metadata_path => "/usr/software/logstash-7.17.10/mysql/last_run_metadata_update_time.txt"
    # Tracking column; must be monotonically increasing, usually the MySQL primary key or an update timestamp
    tracking_column => "update_time"
    # Whether to record the tracking column's value. If true, the latest value of the column specified by tracking_column is recorded and used to find the records to update on the next pipeline run.
    use_column_value => "true"
    # Tracking column type; defaults to numeric
    tracking_column_type => "timestamp"
    # Sync frequency (here: every 5 seconds)
    schedule => "*/5 * * * * *"
    # SQL statement to execute
    statement => "SELECT * FROM user WHERE update_time > :sql_last_value AND update_time < NOW()"
    # Whether to clear the last_run_metadata_path record; defaults to false. If true, every run queries all records from scratch.
    clean_run => "false"
  }
}

filter {
}

output {
  if [type] == "mytest_user" {
    elasticsearch {
      # Elasticsearch cluster address
      hosts => ["127.0.0.1:9200"]
      # Index name (must be lowercase)
      index => "user"
      # Username
      user => ""
      # Password
      password => ""
      # Unique document ID (the database primary key is recommended)
      document_id => "%{id}"
    }
  }
  stdout {
    codec => json_lines
  }
}
```
Note: jdbc_driver_library and last_run_metadata_path must be absolute paths; relative paths may trigger permission errors.
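To make the incremental logic concrete, here is a sketch of the SQL the jdbc input effectively runs once `:sql_last_value` is substituted (the second timestamp is a made-up example of a previously recorded value):

```sql
-- First run: :sql_last_value defaults to 1970-01-01 00:00:00 for a timestamp
-- tracking column, so every existing row is picked up
SELECT * FROM user WHERE update_time > '1970-01-01 00:00:00' AND update_time < NOW();

-- Later runs: :sql_last_value is the update_time saved in
-- last_run_metadata_update_time.txt, so only changed rows are returned
SELECT * FROM user WHERE update_time > '2023-07-20 10:30:00' AND update_time < NOW();
```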
Data synchronization
Finally, we have reached the data synchronization step. The requirement is as follows: synchronize the `user` table in MySQL to the `user` index in ES. Follow along and do it with me!
Create database and tables
```sql
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for user
-- ----------------------------
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user` (
  `id` int NOT NULL AUTO_INCREMENT,
  `username` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'username',
  `age` int DEFAULT NULL COMMENT 'age',
  `gender` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'gender',
  `create_time` datetime(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) COMMENT 'creation time',
  `update_time` datetime(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3) COMMENT 'update time',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

SET FOREIGN_KEY_CHECKS = 1;
```
Start
```shell
cd /usr/software/logstash-7.17.10
bin/logstash -f mysql/jdbc.conf
```
Note: The path to the pipeline configuration file must be correct!
Check whether the startup is successful: if Logstash started correctly, the console logs that the pipeline has started, and since the output is configured with `stdout { codec => json_lines }`, every synchronized record is also printed to the console as a JSON line.
Synchronization test
Add a record to the `user` table in the MySQL database, then check in the Elasticsearch visual interface whether the synchronization succeeded:
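For example, a test record can be inserted like this (the field values are arbitrary):

```sql
INSERT INTO `user` (`username`, `age`, `gender`) VALUES ('zhangsan', 18, 'male');
```

Within the configured 5-second schedule window the new row should appear in the `user` index; it can also be confirmed with a `GET user/_search` request against Elasticsearch.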
FAQ
Delete data
If a record is deleted in MySQL, the deletion is not synchronized to Elasticsearch. To propagate deletions, consider using soft deletion, that is, logical deletion:

Add an `is_deleted` field to the MySQL table to indicate whether a record is valid. Whenever the record is updated, `is_deleted` is synchronized to Elasticsearch along with the other fields. With this approach, queries against MySQL or Elasticsearch must be rewritten to filter out records where `is_deleted` is `true`, which achieves the soft-delete effect.
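A minimal sketch of the soft-delete approach (the `is_deleted` column name and values are assumptions; adapt them to your schema):

```sql
-- Add a flag column: 0 = valid, 1 = deleted
ALTER TABLE `user` ADD COLUMN `is_deleted` tinyint NOT NULL DEFAULT 0;

-- "Delete" a record; update_time changes automatically via ON UPDATE,
-- so the existing Logstash pipeline syncs the change as a normal update
UPDATE `user` SET `is_deleted` = 1 WHERE `id` = 1;
```

Queries then need a matching filter on both sides, e.g. `WHERE is_deleted = 0` in MySQL and a `term` filter such as `{"term": {"is_deleted": 0}}` in the Elasticsearch query DSL.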
One final word (don't just read for free, please follow)
Every article Chen writes is crafted with care. If this article helped or inspired you, please like, read, share, and save it. Your support is my biggest motivation to keep going!
In addition, Chen's Knowledge Planet is now open; it costs only 199 yuan to join, and the value it provides is substantial. Current content includes the Code Yuan chronic disease cloud management practical project, the Spring family bucket practical series, billion-scale data sharding in practice, the DDD microservices practice column, "I want to join a big factory", source-code analyses of Spring, MyBatis, and other frameworks, 22 lectures on architecture practice, RocketMQ, and more...
More introduction
If you need to join the planet, add Chen’s WeChat account: special_coder