Flink SQL –Command line usage (02)

1. Window function:
1. Create table:
-- Create kafka table
CREATE TABLE bid (
    bidtime TIMESTAMP(3),
    price DECIMAL(10, 2) ,
    item STRING,
    WATERMARK FOR bidtime AS bidtime
) WITH (
  'connector' = 'kafka',
  'topic' = 'bid', -- the topic of the data
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker list
  'properties.group.id' = 'testGroup', -- consumer group
  'scan.startup.mode' = 'latest-offset', -- the position of reading data earliest-offset latest-offset
  'format' = 'csv' -- the format for reading data
);

kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic bid
2020-04-15 08:05:00,4.00,C
2020-04-15 08:07:00,2.00,A
2020-04-15 08:09:00,5.00,D
2020-04-15 08:11:00,3.00,B
2020-04-15 08:13:00,1.00,E
2020-04-15 08:17:00,6.00,F
2. Scroll window:
1. Rolling event time window:
-- TUMBLE: rolling window function, the function adds [window start time, window end time, window time] based on the original table.
-- TABLE; table function, converts the results of the functions inside into a dynamic table
SELECT * FROM
TABLE(
   TUMBLE(TABLE bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)
);


-- Perform aggregation calculations based on fields provided by window functions
-- Real-time statistics of the total amount of each product, every 10 minutes
SELECT
    item,
    window_start,
    window_end,
    sum(price) as sum_price
FROM
TABLE(
    -- rolling event time window
   TUMBLE(TABLE bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)
)
group by item,window_start,window_end;
2. Rolling processing time window:
CREATE TABLE words (
    word STRING,
    proctime as PROCTIME() -- defines processing time, PROCTIME: function to obtain processing time
) WITH (
  'connector' = 'kafka',
  'topic' = 'words', -- the topic of the data
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker list
  'properties.group.id' = 'testGroup', -- consumer group
  'scan.startup.mode' = 'latest-offset', -- the position of reading data earliest-offset latest-offset
  'format' = 'csv' -- the format for reading data
);

kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic words
java
spark

-- There is no difference in the sql syntax of processing time and event time in flink SQL
SELECT * FROM
TABLE(
   TUMBLE(TABLE words, DESCRIPTOR(proctime), INTERVAL '5' SECOND)
);


SELECT
    word,window_start,window_end,
    count(1) as c
FROM
TABLE(
   TUMBLE(TABLE words, DESCRIPTOR(proctime), INTERVAL '5' SECOND)
)
group by
    word,window_start,window_end
3. Sliding window:
-- HOP: sliding window function
-- A piece of data in a sliding window may fall into multiple windows

SELECT * FROM
TABLE(
   HOP(TABLE bid, DESCRIPTOR(bidtime),INTERVAL '5' MINUTES, INTERVAL '10' MINUTES)
);


-- Calculate the total amount of all products in the last 10 minutes every 5 minutes
SELECT
     window_start,
     window_end,
     sum(price) as sum_price
FROM
TABLE(
   HOP(TABLE bid, DESCRIPTOR(bidtime),INTERVAL '5' MINUTES, INTERVAL '10' MINUTES)
)
group by
    window_start,window_end
4. Session window:
CREATE TABLE words (
    word STRING,
    proctime as PROCTIME() -- defines processing time, PROCTIME: function to obtain processing time
) WITH (
  'connector' = 'kafka',
  'topic' = 'words', -- the topic of the data
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker list
  'properties.group.id' = 'testGroup', -- consumer group
  'scan.startup.mode' = 'latest-offset', -- the position of reading data earliest-offset latest-offset
  'format' = 'csv' -- the format for reading data
);

kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic words
java
spark

select
    word,
    SESSION_START(proctime,INTERVAL '5' SECOND) as window_start,
    SESSION_END(proctime,INTERVAL '5' SECOND) as window_end,
    count(1) as c
from
    words
group by
    word,SESSION(proctime,INTERVAL '5' SECOND);
2. OVER aggregation:
1. Batch processing:

In the batch processing mode in Flink, the over function is consistent with hive.

SET 'execution.runtime-mode' = 'batch';
-- bounded flow
CREATE TABLE students_hdfs_batch (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
)WITH (
  'connector' = 'filesystem', -- Required: Specify the connector type
  'path' = 'hdfs://master:9000/data/student', -- Required: Specify the path
  'format' = 'csv' -- Required: file system connector specifies format
);

-- row_number,sum,count,avg,lag,lead,max,min
-- It should be noted that sum, when sorted, is an aggregation, and when it is not sorted, it is a global aggregation.
-- Get the top two oldest students in each class

select *
from(
    select
    *,
    row_number() over(partition by clazz order by age desc) as r
    from
    students_hdfs_batch
) as a
where r <=2
2. Stream processing:

Limitations on the use of over aggregation in flink stream processing

1. The order by field must be sorted in ascending order by the time field or you can add conditional filtering when using over_number.

2. In stream processing, Flink currently only supports over windows defined in ascending order of time attributes. Because in batch processing, the size of the data volume is fixed and no new data will be generated, so when doing sorting, only one sorting is required, so the sorting field can be specified casually, but in stream processing, the data volume is It is continuously generated. Every time you do a sorting, you need to take out all the previous data and store it. As time goes by, the amount of data will continue to increase, and the amount of calculations during sorting is very large. But according to the order of time, time is in order, which can reduce the cost of calculation.

3. You can also choose top N to reduce the amount of calculation.

4. When doing sorting in Flink, you need to consider the calculation cost. The generally used sorting field is the time field.

SET 'execution.runtime-mode' = 'streaming';
--Create kafka table
CREATE TABLE students_kafka (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING,
    proctime as PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'students', -- the topic of the data
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker list
  'properties.group.id' = 'testGroup', -- consumer group
  'scan.startup.mode' = 'earliest-offset', -- the position of reading data earliest-offset latest-offset
  'format' = 'csv' -- the format for reading data
);
-- In stream processing mode, flink can only sort by time field in ascending order.

-- If you sort by a common field, in stream processing mode, every new piece of data needs to be recalculated, and the calculation cost is too high.
-- Adding conditions based on row_number can limit the increasing cost of calculations

select * from (
select
    *,
    row_number() over(partition by clazz order by age desc) as r
from
    students_kafka
)
where r <= 2;


-- In stream processing mode, flink can only sort by time field in ascending order.
select
*,
sum(age) over(partition by clazz order by proctime)
from
students_kafka


-- time boundary
-- RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW
select
*,
sum(age) over(
    partition byclazz
    order by proctime
    -- Statistics of the last 10 seconds
    RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW
)
from
students_kafka /* + OPTIONS('scan.startup.mode' = 'latest-offset') */;


-- data boundaries
--ROWS BETWEEN 10 PRECEDING AND CURRENT ROW
select
*,
sum(age) over(
    partition byclazz
    order by proctime
    -- Statistics of the last 10 seconds
   ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
)
from
students_kafka /* + OPTIONS('scan.startup.mode' = 'latest-offset') */;


kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic students

1500100003,tom,22,female,Science Class 6
3. Order By:

When using order by to sort, the time field must be used in the sorted field:

-- The sorting field must be sorted in ascending order by time, using the time field: proctime
select * from
students_kafka
order by proctime,age;

--Limit the calculation cost of sorting and avoid global sorting. When using restrictions, you only need to sort the restricted ones when doing sorting, which reduces the calculation cost.

select *
from
students_kafka
order by age
limit 10;
4. Row_number deduplication
CREATE TABLE students_kafka (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING,
    proctime as PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'students', -- the topic of the data
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker list
  'properties.group.id' = 'testGroup', -- consumer group
  'scan.startup.mode' = 'earliest-offset', -- the position of reading data earliest-offset latest-offset
  'format' = 'csv' -- the format for reading data
);
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic students
1500100003,tom,22,female,Science Class 6

select * from (
select
sid,name,age,
row_number() over(partition by sid order by proctime) as r
from students_kafka /* + OPTIONS('scan.startup.mode' = 'latest-offset') */
)
where r = 1;
5. JOIN

Regular Joins: Mainly used for batch processing. If used for stream processing, the status will become larger and larger.

Interval Join: Mainly used for dual-stream join

Temporal Joins: used to associate temporal tables with flow tables (different time states, such as exchange rate tables)

Lookup Join: used for flow table associated dimension tables (tables that do not change much)

1. Regular Joins
1. Batch processing:
CREATE TABLE students_hdfs_batch (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
)WITH (
  'connector' = 'filesystem', -- Required: Specify the connector type
  'path' = 'hdfs://master:9000/data/student', -- Required: Specify the path
  'format' = 'csv' -- Required: file system connector specifies format
);

CREATE TABLE score_hdfs_batch (
    sid STRING,
    cid STRING,
    score INT
)WITH (
  'connector' = 'filesystem', -- Required: Specify the connector type
  'path' = 'hdfs://master:9000/data/score', -- Required: Specify the path
  'format' = 'csv' -- Required: file system connector specifies format
);

SET 'execution.runtime-mode' = 'batch';

-- inner join
select a.sid,a.name,b.score from
students_hdfs_batch as a
inner join
score_hdfs_batch as b
on a.sid=b.sid;

--left join
select a.sid,a.name,b.score from
students_hdfs_batch as a
left join
score_hdfs_batch as b
on a.sid=b.sid;

-- full join
select a.sid,a.name,b.score from
students_hdfs_batch as a
full join
score_hdfs_batch as b
on a.sid=b.sid;
2. Stream processing:
CREATE TABLE students_kafka (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
)WITH (
    'connector' = 'kafka',
    'topic' = 'students', -- the topic of the data
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker list
    'properties.group.id' = 'testGroup', -- consumer group
    'scan.startup.mode' = 'latest-offset', -- the position of reading data earliest-offset latest-offset
    'format' = 'csv', -- the format for reading data
    'csv.ignore-parse-errors' = 'true' -- Automatically skip the current row if data parsing exception occurs
);
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic students
1500100001,tom,22,female,Class 6, Liberal Arts
1500100002, tom1, 24, male, liberal arts class six
1500100003,tom2,22,female,Science Class 6

CREATE TABLE score_kafka (
    sid STRING,
    cid STRING,
    score INT
)WITH (
    'connector' = 'kafka',
    'topic' = 'scores', -- the topic of the data
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker list
    'properties.group.id' = 'testGroup', -- consumer group
    'scan.startup.mode' = 'latest-offset', -- the position of reading data earliest-offset latest-offset
    'format' = 'csv', -- the format for reading data
    'csv.ignore-parse-errors' = 'true'
);
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic scores
1500100001,1000001,98
1500100001,1000002,5
1500100001,1000003,137


SET 'execution.runtime-mode' = 'streaming';

-- Use regular association methods for stream processing. Flink will always save the data of the two tables in the state, and the state will become larger and larger.
-- You can set the status validity period to prevent the status from increasing indefinitely
SET 'table.exec.state.ttl' = '5000';

-- full join
select a.sid,b.sid,a.name,b.score from
students_kafka as a
full join
score_kafka as b
on a.sid=b.sid;
Note: When using stream processing join, first in stream processing mode, the real-time data in the two tables will be stored in the current state

Hypothesis: The premise is the stream processing mode, and the names and grades in the two real-time tables need to be associated together. At this time, join is used. After a long period of time, say one year, the student names and grades can still be associated. Together, the reason is that the previous data will be stored in the state, but it will also cause problems. As time goes by, there will be more and more data in the state. May cause the task to fail.

You can specify the time to save the state through parameters. Once the time has passed, the state will disappear and the data will no longer exist:

-- Use the regular association method for stream processing. Flink will always save the data of the two tables in the state, and the state will become larger and larger.
-- You can set the status validity period to prevent the status from increasing indefinitely
SET 'table.exec.state.ttl' = '5000';



  'csv.ignore-parse-errors' = 'true'
-- Automatically skip the current line if data parsing exception occurs
2. Interval Join

When two tables are joined, only data within a period of time is associated. Previous data does not need to be saved in the state, which can avoid infinite growth of the state.

CREATE TABLE students_kafka_time (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING,
    ts TIMESTAMP(3),
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
)WITH (
    'connector' = 'kafka',
    'topic' = 'students', -- the topic of the data
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker list
    'properties.group.id' = 'testGroup', -- consumer group
    'scan.startup.mode' = 'latest-offset', -- the position of reading data earliest-offset latest-offset
    'format' = 'csv', -- the format for reading data
    'csv.ignore-parse-errors' = 'true' -- Automatically skip the current row if data parsing exception occurs
);
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic students
1500100001,tom,22,female,Class 6, Liberal Arts,2023-11-10 17:10:10
1500100001, tom1, 24, male, liberal arts class 6, 2023-11-10 17:10:11
1500100001,tom2,22,female,Science Class 6,2023-11-10 17:10:12

CREATE TABLE score_kafka_time (
    sid STRING,
    cid STRING,
    score INT,
    ts TIMESTAMP(3),
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
)WITH (
    'connector' = 'kafka',
    'topic' = 'scores', -- the topic of the data
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker list
    'properties.group.id' = 'testGroup', -- consumer group
    'scan.startup.mode' = 'latest-offset', -- the position of reading data earliest-offset latest-offset
    'format' = 'csv', -- the format for reading data
    'csv.ignore-parse-errors' = 'true'
);
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic scores
1500100001,1000001,98,2023-11-10 17:10:09
1500100001,1000002,5,2023-11-10 17:10:11
1500100001,1000003,137,2023-11-10 17:10:12

-- a.ts BETWEEN b.ts - INTERVAL '5' SECOND AND b.ts
--The time of table a data needs to be within the range of the time of table b data minus 5 seconds to the time of table b data
SELECT a.sid,b.sid,a.name,b.score
FROM students_kafka_time a, score_kafka_time b
WHERE a.sid = b.sid
AND a.ts BETWEEN b.ts - INTERVAL '5' SECOND AND b.ts
3. Temporal Joins

1. Used to associate temporal tables with flow tables, such as order tables and exchange rate tables.

2. Each time data will have different states. If you just use ordinary correlation, it can be correlated to the latest data.

-- order form
CREATE TABLE orders (
    order_id STRING, -- order number
    price DECIMAL(32,2), --order amount
    currency STRING, -- exchange rate number
    order_time TIMESTAMP(3), -- order time
    WATERMARK FOR order_time AS order_time -- water level line
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders', -- the topic of data
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker list
  'properties.group.id' = 'testGroup', -- consumer group
  'scan.startup.mode' = 'latest-offset', -- the position of reading data earliest-offset latest-offset
  'format' = 'csv' -- the format for reading data
);

kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic orders
001,100,CN,2023-11-11 09:48:10
002,200,CN,2023-11-11 09:48:11
003,300,CN,2023-11-11 09:48:14
004,400,CN,2023-11-11 09:48:16
005,500,CN,2023-11-11 09:48:18

-- Exchange rate table
CREATE TABLE currency_rates (
    currency STRING, -- exchange rate number
    conversion_rate DECIMAL(32, 2), -- exchange rate
    update_time TIMESTAMP(3), -- exchange rate update time
    WATERMARK FOR update_time AS update_time, -- water level
    PRIMARY KEY(currency) NOT ENFORCED -- primary key
) WITH (
  'connector' = 'kafka',
  'topic' = 'currency_rates', -- the topic of data
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker list
  'properties.group.id' = 'testGroup', -- consumer group
  'scan.startup.mode' = 'earliest-offset', -- the position of reading data earliest-offset latest-offset
  'format' = 'canal-json' -- the format for reading data
);

insert into currency_rates
values
('CN',7.2,TIMESTAMP'2023-11-11 09:48:05'),
('CN',7.1,TIMESTAMP'2023-11-11 09:48:10'),
('CN',6.9,TIMESTAMP'2023-11-11 09:48:15'),
('CN',7.4,TIMESTAMP'2023-11-11 09:48:20');

kafka-console-consumer.sh --bootstrap-server master:9092,node1:9092,node2:9092 --from-beginning --topic currency_rates

-- If you use the regular correlation method, the latest exchange rate will be obtained, not the exchange rate at the corresponding time.
select a.order_id,b.* from
orders as a
left join
currency_rates as b
on a.currency=b.currency;


-- Temporal table join
-- FOR SYSTEM_TIME AS OF orders.order_time: Use the time of the order table to query the data of the corresponding time in the exchange rate table
SELECT
     order_id,
     price,
     conversion_rate,
     order_time
FROM orders
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
ON orders.currency = currency_rates.currency;

4. Look Join: Mainly used to associate dimension tables. Dimension table: refers to a table whose data does not change much.

1. The traditional way is to read all the data in the database into the flow table, and when a piece of data comes, it will be associated with a piece of data. If the student table in the database is updated, Flink does not know and cannot associate the latest data.

2. The principle of Look Join: when the data in the flow table changes, the data will be queried from the data source of the associated field dimension table.

optimization:

You can use cache to cache data when using it. However, as time goes by, the number of caches will become larger and larger. At this time, you can set an expiration time for the cache. You can set parameters when creating the table:

 'lookup.cache.max-rows' = '1000', -- the maximum number of cached rows
 'lookup.cache.ttl' = '20000' -- Cache expiration time
--student table
CREATE TABLE students_jdbc (
    id BIGINT,
    name STRING,
    age BIGINT,
    gender STRING,
    clazz STRING,
    PRIMARY KEY (id) NOT ENFORCED -- primary key
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://master:3306/student',
    'table-name' = 'students',
    'username' ='root',
    'password' ='123456',
    'lookup.cache.max-rows' = '1000', -- the maximum number of cached rows
    'lookup.cache.ttl' = '20000' -- cache expiration time
);

-- Score table
CREATE TABLE score_kafka (
    sid BIGINT,
    cid STRING,
    score INT,
    proc_time as PROCTIME()
)WITH (
    'connector' = 'kafka',
    'topic' = 'scores', -- the topic of the data
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker list
    'properties.group.id' = 'testGroup', -- consumer group
    'scan.startup.mode' = 'latest-offset', -- the position of reading data earliest-offset latest-offset
    'format' = 'csv', -- the format for reading data
    'csv.ignore-parse-errors' = 'true'
);
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic scores
1500100001,1000001,98
1500100001,1000002,5
1500100001,1000003,137


-- Use conventional association methods to associate dimension tables
-- 1. When the task is started, the dimension table will be loaded into the flink state. If the student table in the database is updated, flink will not know and cannot associate the latest data.
select
b.id,b.name,a.score
from
score_kafka as a
left join
students_jdbc as b
on a.sid=b.id;


-- lookup join
-- FOR SYSTEM_TIME AS OF a.proc_time: Use related fields to query the latest data in the dimension table
-- Advantages: Every time a piece of data comes from the flow table, it will be queried in mysql, which can be associated with the latest data.
-- Each query to mysql will reduce performance
select
b.id,b.name,a.score
from
score_kafka as a
left join
students_jdbc FOR SYSTEM_TIME AS OF a.proc_time as b
on a.sid=b.id;