Flink SQL — command line usage

1. Start Flink SQL
First start the Flink cluster, in either standalone cluster mode or YARN session mode. Session mode is used here:

yarn-session.sh -d

Starting the Flink SQL client:
sql-client.sh
2. Kafka SQL connector
To use Kafka as a data source, upload the Kafka SQL connector jar to Flink's lib directory:

/usr/local/soft/flink-1.15.2/lib

Download the jar that matches your Flink version from the official website and upload it.

1. Create table:

Define a table on top of a stream.
Creating a table in Flink is similar to creating a view: the table itself stores no data, and data is read from the underlying source only when the table is queried.


CREATE TABLE students (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'student',
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
);


2. Query data (continuous query):

select clazz,count(1) as c from students group by clazz;


3. The client provides three modes for maintaining and visualizing results:

1. Table mode (the default) materializes the results in memory and displays them as a regular paged table.

SET 'sql-client.execution.result-mode' = 'table';

2. Changelog mode does not materialize the results. It displays the result stream produced by a continuous query, which consists of insertions (+) and retractions (-).

SET 'sql-client.execution.result-mode' = 'changelog';

3. Tableau mode is closer to a traditional database client: results are printed directly to the screen in tabular form. What is displayed depends on the job's runtime mode (execution.runtime-mode; older versions call it execution.type):

SET 'sql-client.execution.result-mode' = 'tableau';

4. Flink SQL unified stream and batch processing:
1. Stream processing:

a. Stream processing can handle both bounded and unbounded streams.

b. Stream processing produces continuously updated results.

c. Under the hood, stream processing uses a continuous-flow model: the upstream and downstream tasks start at the same time and wait for data to arrive.

SET 'execution.runtime-mode' = 'streaming';
2. Batch processing:

a. Batch processing can only process bounded streams.

b. The output is the final result only.

c. Under the hood, batch processing uses a staged model similar to MapReduce: the upstream tasks run first, and the downstream tasks run after them.

SET 'execution.runtime-mode' = 'batch';
Batch processing example that reads a file:

-- Create a bounded table
CREATE TABLE students_hdfs (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
)WITH (
  'connector' = 'filesystem', -- Required: Specify the connector type
  'path' = 'hdfs://master:9000/data/spark/stu/students.txt', -- Required: Specify the path
  'format' = 'csv' -- Required: file system connector specifies format
);


select clazz,count(1) as c from
students_hdfs
group by clazz;
5. Flink SQL connector:
1. Kafka SQL connector

See the official documentation for the full list of connector options.

1. Kafka source

-- Create kafka table
CREATE TABLE students_kafka (
    `offset` BIGINT METADATA VIRTUAL, -- offset
    `event_time` TIMESTAMP(3) METADATA FROM 'timestamp', --the time when data enters kafka, which can be used as event time
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'students', -- the topic of the data
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker list
  'properties.group.id' = 'testGroup', -- consumer group
  'scan.startup.mode' = 'earliest-offset', -- where to start reading: earliest-offset or latest-offset
  'format' = 'csv' -- the format for reading data
);
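
Since event_time comes from the Kafka record timestamp, it can serve as the event-time attribute once a watermark is declared on it. A minimal sketch, assuming a 5-second out-of-orderness bound and a 1-minute tumbling window; both values and the table name students_kafka_wm are illustrative and not part of the original setup:

```sql
-- Hypothetical variant of students_kafka with a watermark on event_time
CREATE TABLE students_kafka_wm (
    `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING,
    -- allow records to be up to 5 seconds out of order (assumed value)
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'students',
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
);

-- Count students per class in 1-minute event-time windows
SELECT window_start, window_end, clazz, COUNT(*) AS num
FROM TABLE(
    TUMBLE(TABLE students_kafka_wm, DESCRIPTOR(event_time), INTERVAL '1' MINUTE)
)
GROUP BY window_start, window_end, clazz;
```

Because the windows close on event time, this query emits one final row per window and class rather than a stream of updates.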

2. Kafka sink

-- Create the Kafka sink table
CREATE TABLE students_kafka_sink (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'students_sink', -- the target topic
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker list
  'properties.group.id' = 'testGroup', -- consumer group (only relevant when this table is read)
  'scan.startup.mode' = 'earliest-offset', -- start position (only relevant when this table is read)
  'format' = 'csv' -- the format for writing data
);

-- Save query results to kafka
insert into students_kafka_sink
select * from students_hdfs;

kafka-console-consumer.sh --bootstrap-server master:9092,node1:9092,node2:9092 --from-beginning --topic students_sink

3. Write an update stream into Kafka

Kafka is a message queue: it only appends messages and does not deduplicate or update records that were already written. To write an update stream, change the format to canal-json so that each message carries its operation type. When the topic is read back through this table, Flink restores the original update stream.

CREATE TABLE clazz_num_kafka (
    clazz STRING,
    num BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'clazz_num', -- the topic of the data
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker list
  'properties.group.id' = 'testGroup', -- consumer group
  'scan.startup.mode' = 'earliest-offset', -- where to start reading: earliest-offset or latest-offset
  'format' = 'canal-json' -- changelog format that records the operation type
);

-- Writing an update stream to Kafka requires the canal-json format; each message contains the operation type.
-- An update is written as a DELETE of the old row followed by an INSERT of the new row. Sample messages:
{"data":[{"clazz":"Liberal Arts Class 1","num":71}],"type":"INSERT"}
{"data":[{"clazz":"Science Class 3","num":67}],"type":"DELETE"}


insert into clazz_num_kafka
select clazz,count(1) as num from
students
group by clazz;


kafka-console-consumer.sh --bootstrap-server master:9092,node1:9092,node2:9092 --from-beginning --topic clazz_num
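
To see that the update stream is restored when the topic is read back, the same table can be queried from the SQL client. A short sketch that uses only the table defined above; changelog result mode is chosen here so that each change row stays visible:

```sql
-- Show individual change rows (the op column marks inserts and deletes)
SET 'sql-client.execution.result-mode' = 'changelog';

-- Reads the canal-json messages back as an update stream
SELECT clazz, num FROM clazz_num_kafka;
```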
2. HDFS SQL connector

1. HDFS source

Flink can read files as either a bounded stream or an unbounded stream.

-- bounded stream
CREATE TABLE students_hdfs_batch (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
)WITH (
  'connector' = 'filesystem', -- Required: Specify the connector type
  'path' = 'hdfs://master:9000/data/student', -- Required: Specify the path
  'format' = 'csv' -- Required: file system connector specifies format
);

select * from students_hdfs_batch;

-- unbounded stream
-- Stream processing on HDFS reads data file by file, so the latency is higher than with Kafka.
CREATE TABLE students_hdfs_stream (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
)WITH (
    'connector' = 'filesystem', -- Required: Specify the connector type
    'path' = 'hdfs://master:9000/data/student', -- Required: Specify the path
    'format' = 'csv' , -- Required: file system connector specifies format
    'source.monitor-interval' = '5000' -- rescan the directory every 5000 ms for new files, which makes the source an unbounded stream
);


select * from students_hdfs_stream;

2. HDFS sink

-- 1. Batch mode (usage and underlying principle are similar to Hive)
SET 'execution.runtime-mode' = 'batch';

--Create table
CREATE TABLE clazz_num_hdfs (
    clazz STRING,
    num BIGINT
)WITH (
  'connector' = 'filesystem', -- Required: Specify the connector type
  'path' = 'hdfs://master:9000/data/clazz_num', -- Required: Specify the path
  'format' = 'csv' -- Required: file system connector specifies format
);
-- Save query results to table
insert into clazz_num_hdfs
select clazz,count(1) as num
from students_hdfs_batch
group by clazz;


-- 2. Stream processing mode
SET 'execution.runtime-mode' = 'streaming';

-- Create the table. If the query returns an update stream, the table must use the canal-json format
CREATE TABLE clazz_num_hdfs_canal_json (
    clazz STRING,
    num BIGINT
)WITH (
  'connector' = 'filesystem', -- Required: Specify the connector type
  'path' = 'hdfs://master:9000/data/clazz_num_canal_json', -- Required: Specify the path
  'format' = 'canal-json' -- Required: file system connector specifies format
);

insert into clazz_num_hdfs_canal_json
select clazz,count(1) as num
from students_hdfs_stream
group by clazz;
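
In streaming mode the filesystem sink only finalizes the files it writes when a checkpoint completes, so without checkpointing the output may stay in in-progress files. A minimal sketch, assuming a 10-second interval is acceptable for this job (the value is illustrative); it would be set before running the INSERT above:

```sql
-- Enable periodic checkpoints so the streaming file sink can commit its files
SET 'execution.checkpointing.interval' = '10s';
```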
3. MySQL SQL connector

1. Integration:

# 1. Upload the dependency jars to Flink's lib directory: /usr/local/soft/flink-1.15.2/lib
flink-connector-jdbc-1.15.2.jar
mysql-connector-java-5.1.49.jar

# 2. Need to restart the flink cluster
yarn application -kill [appid]
yarn-session.sh -d

# 3. Re-enter the sql command line
sql-client.sh

2. MySQL source

-- bounded stream
-- The field names and field types of the Flink table must match the MySQL table
CREATE TABLE students_jdbc (
    id BIGINT,
    name STRING,
    age BIGINT,
    gender STRING,
    clazz STRING,
    PRIMARY KEY (id) NOT ENFORCED -- primary key
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://master:3306/student',
    'table-name' = 'students',
    'username' ='root',
    'password' ='123456'
);

select * from students_jdbc;

3. MySQL sink

-- Create the Kafka source table
CREATE TABLE students_kafka (
    `offset` BIGINT METADATA VIRTUAL, -- offset
    `event_time` TIMESTAMP(3) METADATA FROM 'timestamp', --the time when data enters kafka, which can be used as event time
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'students', -- the topic of the data
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker list
  'properties.group.id' = 'testGroup', -- consumer group
  'scan.startup.mode' = 'earliest-offset', -- the position of reading data earliest-offset latest-offset
  'format' = 'csv' -- the format for reading data
);

--Create mysql sink table
CREATE TABLE clazz_num_mysql (
    clazz STRING,
    num BIGINT,
    PRIMARY KEY (clazz) NOT ENFORCED -- primary key
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://master:3306/student',
    'table-name' = 'clazz_num',
    'username' ='root',
    'password' ='123456'
);

-- Create the target table in MySQL (run this in the MySQL client, before the INSERT below)
CREATE TABLE clazz_num (
    clazz varchar(10),
    num BIGINT,
    PRIMARY KEY (clazz) -- primary key
) ;

-- Write the query results to MySQL in real time.
-- The query produces an update stream; Flink updates the rows in MySQL by primary key.
insert into clazz_num_mysql
select
clazz,
count(1) as num from
students_kafka
group by clazz;

Insert a record into the students topic to trigger an update:
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic students
4. DataGen: generates random data, typically used for performance testing
-- Create a datagen table (can only be used as a source table)
CREATE TABLE students_datagen (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
) WITH (
    'connector' = 'datagen',
    'rows-per-second'='5', -- The amount of randomly generated data per second
    'fields.age.min'='1',
    'fields.age.max'='100',
    'fields.sid.length'='10',
    'fields.name.length'='2',
    'fields.sex.length'='1',
    'fields.clazz.length'='4'
);
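
By default this table is an unbounded source. For a quick check of the generated values, the datagen option 'number-of-rows' can bound it. A small sketch, assuming 10 rows are enough to inspect (the value is illustrative); it uses an OPTIONS hint so the table definition stays unchanged:

```sql
-- Emit 10 random rows and then finish
SELECT * FROM students_datagen /*+ OPTIONS('number-of-rows' = '10') */;
```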

5. print: used for testing and debugging; can only be used as a sink table
CREATE TABLE print_table (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
) WITH (
     'connector' = 'print'
);

insert into print_table
select * from students_datagen;



The results are not shown in the SQL client; they are printed to the stdout log of the submitted job's TaskManager.
6. BlackHole: discards every row it receives. It is used for performance testing and can later be used for Flink back-pressure testing.
CREATE TABLE blackhole_table (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
) WITH (
  'connector' = 'blackhole'
);

insert into blackhole_table
select * from students_datagen;
6. SQL syntax
1. Hints:

Hints pass extra instructions to the engine. In Flink, the OPTIONS hint overrides table options for a single query, so a changed option takes effect without re-creating the table. (In Spark, hints are used for things such as broadcast joins.)

CREATE TABLE students_kafka (
    `offset` BIGINT METADATA VIRTUAL, -- offset
    `event_time` TIMESTAMP(3) METADATA FROM 'timestamp', --the time when data enters kafka, which can be used as event time
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'students', -- the topic of the data
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker list
  'properties.group.id' = 'testGroup', -- consumer group
  'scan.startup.mode' = 'latest-offset', -- the position of reading data earliest-offset latest-offset
  'format' = 'csv' -- the format for reading data
);

-- Dynamically override table options: the Kafka start position can be changed at query time without re-creating the table.
-- The hint must start with /*+ (no space between /* and +); otherwise it is parsed as an ordinary comment and ignored.
select * from students_kafka /*+ OPTIONS('scan.startup.mode' = 'earliest-offset') */;


-- bounded stream
CREATE TABLE students_hdfs (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
)WITH (
    'connector' = 'filesystem', -- Required: Specify the connector type
    'path' = 'hdfs://master:9000/data/student', -- Required: Specify the path
    'format' = 'csv' -- Required: file system connector specifies format
);

-- When querying HDFS, the source can be switched to an unbounded stream without re-creating the table.
select * from students_hdfs /*+ OPTIONS('source.monitor-interval' = '5000' ) */;
2. WITH:

When a subquery is used several times in one statement, WITH gives it a name so that it can be referenced repeatedly. It behaves like a temporary view that exists only for that statement; no real view is created.

CREATE TABLE students_hdfs (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
)WITH (
    'connector' = 'filesystem', -- Required: Specify the connector type
    'path' = 'hdfs://master:9000/data/student', -- Required: Specify the path
    'format' = 'csv' -- Required: file system connector specifies format
);

-- When querying HDFS, the source can be switched to an unbounded stream without re-creating the table.
select * from students_hdfs /*+ OPTIONS('source.monitor-interval' = '5000' ) */;



-- tmp names the subquery so that it can be referenced multiple times in the statement that follows
with tmp as (
    select * from students_hdfs
    /*+ OPTIONS('source.monitor-interval' = '5000' ) */
    where clazz='Class 1 of Liberal Arts'
)
select * from tmp
union all
select * from tmp;
3. DISTINCT:

In Flink stream processing, DISTINCT requires Flink to keep every value seen so far in state. If new values keep arriving, the state grows without bound; as it grows, checkpoints take longer, which can eventually cause problems for the Flink job.

select
    count(distinct sid)
from students_kafka /*+ OPTIONS('scan.startup.mode' = 'earliest-offset') */;

select
    count(sid)
from (
    select
        distinct *
    from students_kafka /*+ OPTIONS('scan.startup.mode' = 'earliest-offset') */
);
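
One common way to keep this state bounded is to set an idle state retention time, after which Flink may drop state that has not been accessed. A sketch, assuming one hour is long enough for this workload (the value is illustrative). Once state has expired, a repeated sid may be counted again, so the result can become approximate:

```sql
-- State that has been idle for more than 1 hour may be cleaned up
SET 'table.exec.state.ttl' = '1 h';

select
    count(distinct sid)
from students_kafka /*+ OPTIONS('scan.startup.mode' = 'earliest-offset') */;
```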

Notes:

1. When the Flink SQL client exits, the tables created in it no longer exist. Their definitions are metadata that is held in memory only.

2. When filtering strings in a WHERE clause, there are three distinct cases: the empty string, a string of spaces, and NULL. Each one needs its own filter condition:

1. Filter out empty strings:
 where s <> ''

2. Filter out space strings:
 where s <> ' '

3. Filter out NULL values (a comparison such as s <> null never evaluates to true, so use IS NOT NULL):

where s is not null
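
The three conditions can be combined in one query. A small sketch against the students table defined earlier, assuming the name column is the one being cleaned (the column choice is illustrative):

```sql
-- Keep only rows whose name is a real, non-blank value
select *
from students
where name is not null      -- drops NULL
  and trim(name) <> '';     -- drops '' and space-only strings
```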
Common functions in Flink SQL:

from_unixtime:
   
    Returns the numeric argument (seconds since the Unix epoch) as a string in the given format (default: 'yyyy-MM-dd HH:mm:ss').

to_timestamp:
    
    Converts string1, in format string2 (default: 'yyyy-MM-dd HH:mm:ss'), to a timestamp.
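
A short sketch of both functions with and without an explicit format. The literal values are made up for illustration, and the strings returned by from_unixtime depend on the session time zone, so no expected output is shown:

```sql
select
    from_unixtime(1660000000) as ts_string,                              -- default format
    from_unixtime(1660000000, 'yyyy-MM-dd') as day_string,               -- custom format
    to_timestamp('2022-08-09 12:00:00') as ts,                           -- default format
    to_timestamp('2022/08/09 12:00', 'yyyy/MM/dd HH:mm') as ts_custom;   -- custom format
```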



