I recently used logstash and canal in my project, record the installation steps.
1. Install logstash
1. Official website download: Official website address, the demo download I am here is version 7.17.10
Step one:
Second step:
Step 3:
2. Unzip:
3. Event Demo:
Step one: Go to the bin directory and open the terminal:
Step 2: Run the logstash command:
./logstash -e "input { stdin { } } output { stdout {} }"
Step 3: Run successfully
Step 4: Enter “zgytest”, success
4. Start with a custom configuration file:
Step 1: Enter the config directory
Step 2: Copy logstash-sample.conf and rename it to mytask.conf
Step 3: Edit mytask.conf
input {<!-- --> udp {<!-- --> port => 514 type => "syslog" } } output {<!-- --> stdout {<!-- --> codec => rubydebug } }
Step 4: Start
./logstash -f /Users/zgy/Downloads/logstash-7.17.10/config/mytask.conf
Start successfully
2. Install canal
1. Enter github: canal
Scroll down to find Download
What I demonstrate here is the installation of version 1.1.6:
Click to download
2. Unzip after downloading:
Go to the bin directory, open the terminal, and execute
./startup.sh
Start canal successfully
3. Quick start: https://github.com/alibaba/canal/wiki/QuickStart
First find the installation directory of mysql and find the configuration file: my.cnf or my.ini. Since I installed mysql (mysql8 version) in docker, the following steps apply to mysql installed in docker
Step one: Since the vim command must be installed in the container to encode the configuration file, we directly copy the configuration file from the container to the local machine:
docker cp mysql-docker:/etc/my.cnf /Users/zgy/Downloads/Java/es/my.cnf mysql-docker: is my container name /etc/my.cnf: is the path to the configuration file in the container /Users/zgy/Downloads/Java/es/my.cnf: is the path where I want to copy the configuration file to the local
Step 2: Edit the configuration file copied to the local device
Add the following configuration
Step 3: Copy the edited configuration file to the container
docker cp /Users/zgy/Downloads/Java/es/my.cnf mysql-docker:/etc/my.cnf
Step 4: Restart the container to make the configuration items take effect
Step 5: Authorize the canal link MySQL account has the permission to serve as a MySQL slave. If you already have an account, you can grant it directly and execute the following command in the database
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; FLUSH PRIVILEGES;
Step 6: Introduce dependencies into the project:
<!-- https://mvnrepository.com/artifact/com.alibaba.otter/canal.client --> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.4</version> </dependency>
Step 7: Write an example:
import java.net.InetSocketAddress; import java.util.List; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.common.utils.AddressUtils; import com.alibaba.otter.canal.protocol.Message; import com.alibaba.otter.canal.protocol.CanalEntry.Column; import com.alibaba.otter.canal.protocol.CanalEntry.Entry; import com.alibaba.otter.canal.protocol.CanalEntry.EntryType; import com.alibaba.otter.canal.protocol.CanalEntry.EventType; import com.alibaba.otter.canal.protocol.CanalEntry.RowChange; import com.alibaba.otter.canal.protocol.CanalEntry.RowData; public class SimpleCanalClientExample {<!-- --> public static void main(String args[]) {<!-- --> //Create link CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "", ""); int batchSize = 1000; int emptyCount = 0; try {<!-- --> connector.connect(); connector.subscribe(".*\..*"); connector.rollback(); int totalEmptyCount = 120; while (emptyCount < totalEmptyCount) {<!-- --> Message message = connector.getWithoutAck(batchSize); // Get the specified amount of data long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) {<!-- --> emptyCount + + ; System.out.println("empty count : " + emptyCount); try {<!-- --> Thread.sleep(1000); } catch (InterruptedException e) {<!-- --> } } else {<!-- --> emptyCount = 0; // System.out.printf("message[batchId=%s,size=%s] \ ", batchId, size); printEntry(message.getEntries()); } connector.ack(batchId); // Submit confirmation // connector.rollback(batchId); // Processing failed, rollback data } System.out.println("empty too many times, exit"); } finally {<!-- --> connector.disconnect(); } } private static void printEntry(List<Entry> entries) {<!-- --> for (Entry entry : entries) {<!-- --> if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {<!-- --> continue; } RowChange rowChage = null; try {<!-- --> rowChage = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) {<!-- --> throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); } EventType eventType = rowChage.getEventType(); System.out.println(String.format("================ > binlog[%s:%s] , name[%s,%s] , eventType: %s", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType)); for (RowData rowData : rowChage.getRowDatasList()) {<!-- --> if (eventType == EventType.DELETE) {<!-- --> printColumn(rowData.getBeforeColumnsList()); } else if (eventType == EventType.INSERT) {<!-- --> printColumn(rowData.getAfterColumnsList()); } else {<!-- --> System.out.println("------- & amp;gt; before"); printColumn(rowData.getBeforeColumnsList()); System.out.println("------- & amp;gt; after"); printColumn(rowData.getAfterColumnsList()); } } } } private static void printColumn(List<Column> columns) {<!-- --> for (Column column : columns) {<!-- --> System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); } }
Step 8: Start
Step 9: Change database data and canal monitor
Successfully monitored
Problems encountered:
Canal is started, and the example is also started, but canal cannot detect changes in the data in the database.
Solution:
Go to the terminal and log in to the canal user created earlier.
mysql -u canal -p -h 127.0.0.1