Install and use logstash and canal on Mac (mysql is installed in a docker container)

I recently used logstash and canal in my project, record the installation steps.

1. Install logstash

1. Official website download: Official website address, the demo download I am here is version 7.17.10

Step one:
Second step:

Step 3:

2. Unzip:

3. Event Demo:

Step one: Go to the bin directory and open the terminal:

Step 2: Run the logstash command:
./logstash -e "input { stdin { } } output { stdout {} }"


Step 3: Run successfully

Step 4: Enter “zgytest”, success

4. Start with a custom configuration file:

Step 1: Enter the config directory

Step 2: Copy logstash-sample.conf and rename it to mytask.conf

Step 3: Edit mytask.conf
input {<!-- -->
  udp {<!-- -->
    port => 514
    type => "syslog"
  }
}

output {<!-- -->
  stdout {<!-- --> codec => rubydebug }
}
Step 4: Start
./logstash -f /Users/zgy/Downloads/logstash-7.17.10/config/mytask.conf

Start successfully

2. Install canal

1. Enter github: canal


Scroll down to find Download

What I demonstrate here is the installation of version 1.1.6:
Click to download


2. Unzip after downloading:


Go to the bin directory, open the terminal, and execute

./startup.sh

Start canal successfully

3. Quick start: https://github.com/alibaba/canal/wiki/QuickStart

First find the installation directory of mysql and find the configuration file: my.cnf or my.ini. Since I installed mysql (mysql8 version) in docker, the following steps apply to mysql installed in docker

Step one: Since the vim command must be installed in the container to encode the configuration file, we directly copy the configuration file from the container to the local machine:
docker cp mysql-docker:/etc/my.cnf /Users/zgy/Downloads/Java/es/my.cnf

mysql-docker: is my container name
/etc/my.cnf: is the path to the configuration file in the container
/Users/zgy/Downloads/Java/es/my.cnf: is the path where I want to copy the configuration file to the local
Step 2: Edit the configuration file copied to the local device

Add the following configuration

Step 3: Copy the edited configuration file to the container
docker cp /Users/zgy/Downloads/Java/es/my.cnf mysql-docker:/etc/my.cnf
Step 4: Restart the container to make the configuration items take effect
Step 5: Authorize the canal link MySQL account has the permission to serve as a MySQL slave. If you already have an account, you can grant it directly and execute the following command in the database
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
Step 6: Introduce dependencies into the project:
<!-- https://mvnrepository.com/artifact/com.alibaba.otter/canal.client -->
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.4</version>
</dependency>
Step 7: Write an example:
import java.net.InetSocketAddress;
import java.util.List;


import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;


public class SimpleCanalClientExample {<!-- -->


public static void main(String args[]) {<!-- -->
    //Create link
    CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                                                                                        11111), "example", "", "");
    int batchSize = 1000;
    int emptyCount = 0;
    try {<!-- -->
        connector.connect();
        connector.subscribe(".*\..*");
        connector.rollback();
        int totalEmptyCount = 120;
        while (emptyCount < totalEmptyCount) {<!-- -->
            Message message = connector.getWithoutAck(batchSize); // Get the specified amount of data
            long batchId = message.getId();
            int size = message.getEntries().size();
            if (batchId == -1 || size == 0) {<!-- -->
                emptyCount + + ;
                System.out.println("empty count : " + emptyCount);
                try {<!-- -->
                    Thread.sleep(1000);
                } catch (InterruptedException e) {<!-- -->
                }
            } else {<!-- -->
                emptyCount = 0;
                // System.out.printf("message[batchId=%s,size=%s] \
", batchId, size);
                printEntry(message.getEntries());
            }

            connector.ack(batchId); // Submit confirmation
            // connector.rollback(batchId); // Processing failed, rollback data
        }

        System.out.println("empty too many times, exit");
    } finally {<!-- -->
        connector.disconnect();
    }
}

private static void printEntry(List<Entry> entries) {<!-- -->
    for (Entry entry : entries) {<!-- -->
        if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {<!-- -->
            continue;
        }

        RowChange rowChage = null;
        try {<!-- -->
            rowChage = RowChange.parseFrom(entry.getStoreValue());
        } catch (Exception e) {<!-- -->
            throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                                       e);
        }

        EventType eventType = rowChage.getEventType();
        System.out.println(String.format("================ &gt; binlog[%s:%s] , name[%s,%s] , eventType: %s",
                                         entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                                         entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                                         eventType));

        for (RowData rowData : rowChage.getRowDatasList()) {<!-- -->
            if (eventType == EventType.DELETE) {<!-- -->
                printColumn(rowData.getBeforeColumnsList());
            } else if (eventType == EventType.INSERT) {<!-- -->
                printColumn(rowData.getAfterColumnsList());
            } else {<!-- -->
                System.out.println("------- & amp;gt; before");
                printColumn(rowData.getBeforeColumnsList());
                System.out.println("------- & amp;gt; after");
                printColumn(rowData.getAfterColumnsList());
            }
        }
    }
}

private static void printColumn(List<Column> columns) {<!-- -->
    for (Column column : columns) {<!-- -->
        System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
    }
}
Step 8: Start

Step 9: Change database data and canal monitor

Successfully monitored

Problems encountered:
Canal is started, and the example is also started, but canal cannot detect changes in the data in the database.
Solution:
Go to the terminal and log in to the canal user created earlier.

mysql -u canal -p -h 127.0.0.1