Table of Contents
1. File system SQL connector
2. How to specify the file system type
3. How to specify the file format
4. Read the file system
4.1 Turn on directory monitoring
4.2 Available Metadata
5. Write out the file system
5.1 Create partition table
5.2 Rolling strategy, file merging, partition submission
5.3 Specify Sink Parallelism
6. Example: Read Kafka through FlinkSQL and write to a Hive table
6.1. Create a Kafka source table for reading Kafka
6.2. Create an HDFS sink table for writing to HDFS
6.3. Insert into and write to hdfs_sink_table
6.4. Query hdfs_sink_table
6.5. Create Hive table and specify LOCATION
1. File system SQL connector
The file system connector allows reading and writing data on local or distributed file systems.
Official website link: File System SQL Connector
2. How to specify the file system type
Specify the file system type via 'path' = '<scheme>://<path>' when creating the table.
Refer to the official website: File system type
CREATE TABLE filesystem_table (
    id INT,
    name STRING,
    ds STRING
) partitioned by (ds) WITH (
    'connector' = 'filesystem',
    -- Note: only one 'path' option may be set; the alternatives below illustrate the supported schemes
    -- Local file system
    'path' = 'file:///URI',
    -- HDFS file system
    'path' = 'hdfs://URI',
    -- Alibaba Cloud Object Storage
    'path' = 'oss://URI',
    'format' = 'json'
);
3. How to specify the file format
The FlinkSQL file system connector supports multiple file formats for reading and writing.
For example, when the source data is CSV, JSON, Parquet, etc., specify the corresponding format type when creating the table so the data can be parsed and mapped to the table's fields.
CREATE TABLE filesystem_table_file_format (
    id INT,
    name STRING,
    ds STRING
) partitioned by (ds) WITH (
    'connector' = 'filesystem',
    -- Specify one file format type
    'format' = 'json|csv|orc|raw'
);
4. Read the file system
FlinkSQL can read data from a single file or an entire directory into a single table.
Notice:
1. When reading a directory, the files in it are read in no particular order.
2. By default, the source runs in batch mode: it scans the configured path once and then stops. When directory monitoring (source.monitor-interval) is enabled, the source runs in streaming mode and keeps scanning.
4.1 Turn on directory monitoring
Turn on directory monitoring by setting the source.monitor-interval property; the source then keeps scanning the path and picks up new files as they appear.
Notice:
Only new files in the monitored directory are read; updates to existing files are not re-read.
-- Directory monitoring
drop table filesystem_source_table;
CREATE TABLE filesystem_source_table (
    id INT,
    name STRING,
    `file.name` STRING NOT NULL METADATA
) WITH (
    'connector' = 'filesystem',
    'path' = 'file:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/1016',
    'format' = 'json',
    'source.monitor-interval' = '3' -- Enable directory monitoring and set the monitoring interval
);

-- Continuous reading
select * from filesystem_source_table;
4.2 Available Metadata
When using FlinkSQL to read data from the file system, several metadata columns can be read alongside the data.
NOTE: All metadata columns are read-only.
-- Available metadata
drop table filesystem_source_table_read_metadata;
CREATE TABLE filesystem_source_table_read_metadata (
    id INT,
    name STRING,
    `file.path` STRING NOT NULL METADATA,
    `file.name` STRING NOT NULL METADATA,
    `file.size` BIGINT NOT NULL METADATA,
    `file.modification-time` TIMESTAMP_LTZ(3) NOT NULL METADATA
) WITH (
    'connector' = 'filesystem',
    'path' = 'file:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/1012',
    'format' = 'json'
);

select * from filesystem_source_table_read_metadata;
5. Write out the file system
5.1 Create partition table
FlinkSQL supports creating partitioned tables and writing data via insert into (append) and insert overwrite (overwrite).
-- Create partition table
drop table filesystem_source_table_partition;
CREATE TABLE filesystem_source_table_partition (
    id INT,
    name STRING,
    ds STRING
) partitioned by (ds) WITH (
    'connector' = 'filesystem',
    'path' = 'file:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/1012',
    'partition.default-name' = 'default_partition',
    'format' = 'json'
);

-- Dynamic partition writing
insert into filesystem_source_table_partition
SELECT * FROM (VALUES
    (1, 'a', '20231010'),
    (2, 'b', '20231010'),
    (3, 'c', '20231011'),
    (4, 'd', '20231011'),
    (5, 'e', '20231012'),
    (6, 'f', '20231012')
) AS user1 (id, name, ds);

-- Static partition writing
insert into filesystem_source_table_partition partition (ds = '20231010')
SELECT * FROM (VALUES
    (1, 'a'),
    (2, 'b'),
    (3, 'c'),
    (4, 'd'),
    (5, 'e'),
    (6, 'f')
) AS user1 (id, name);

-- Query partition table data
select * from filesystem_source_table_partition where ds = '20231010';
5.2 Rolling strategy, file merging, partition submission
You can read the previous blog post: Flink bucketing strategy when writing files
Official website link: Official website bucketing strategy
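As a sketch of how these three concerns fit together (the table name, path, and option values below are illustrative assumptions, not taken from this post), a file system sink table can combine a rolling policy, automatic small-file compaction, and a partition commit policy in its WITH clause:

```sql
CREATE TABLE filesystem_sink_rolling (
    id INT,
    name STRING,
    ds STRING
) PARTITIONED BY (ds) WITH (
    'connector' = 'filesystem',
    'path' = 'file:///tmp/rolling_demo',  -- illustrative path
    'format' = 'json',
    -- Rolling strategy: start a new file once the current one is large enough or old enough
    'sink.rolling-policy.file-size' = '128MB',
    'sink.rolling-policy.rollover-interval' = '30 min',
    -- File merging: compact the small files produced by each checkpoint
    'auto-compaction' = 'true',
    'compaction.file-size' = '128MB',
    -- Partition submission: commit by partition time after a delay, writing a _SUCCESS file
    'sink.partition-commit.trigger' = 'partition-time',
    'sink.partition-commit.delay' = '1 h',
    'sink.partition-commit.policy.kind' = 'success-file'
);
```

The exact values to choose depend on checkpoint interval and downstream readers; see the official documentation linked above for the full option list.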
5.3 Specify Sink Parallelism
When using FlinkSQL to write out to the file system, you can set the parallelism of the sink operator through sink.parallelism.
Note: Configuring sink parallelism is supported only when the upstream changelog mode is INSERT-ONLY; otherwise, the job throws an exception.
CREATE TABLE hdfs_sink_table (
    `log` STRING,
    `dt` STRING,   -- Partition field: day
    `hour` STRING  -- Partition field: hour
) partitioned by (dt, `hour`) WITH (
    'connector' = 'filesystem',
    'path' = 'file:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/kafka',
    'sink.parallelism' = '2', -- Specify the parallelism of the sink operator
    'format' = 'raw'
);
6. Example: Read Kafka through FlinkSQL and write to a Hive table
Requirement:
Use FlinkSQL to write Kafka data to a specified HDFS directory
Partition based on Kafka's message timestamp (partitioned by day and hour)
6.1. Create a Kafka source table for reading Kafka
-- When creating the kafka source table, also read kafka metadata fields
drop table kafka_source_table;
CREATE TABLE kafka_source_table (
    `log` STRING,
    `timestamp` TIMESTAMP(3) METADATA FROM 'timestamp' -- the timestamp of the message
) WITH (
    'connector' = 'kafka',
    'topic' = '20231017',
    'properties.bootstrap.servers' = 'worker01:9092',
    'properties.group.id' = 'FlinkConsumer',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'raw'
);
6.2. Create an HDFS sink table for writing out to HDFS
drop table hdfs_sink_table;
CREATE TABLE hdfs_sink_table (
    `log` STRING,
    `dt` STRING,   -- Partition field: day
    `hour` STRING  -- Partition field: hour
) partitioned by (dt, `hour`) WITH (
    'connector' = 'filesystem',
    'path' = 'hdfs://usr/local/lib/mavne01/FlinkAPI1.17/data/output/kafka',
    'sink.parallelism' = '2', -- Specify the parallelism of the sink operator
    'format' = 'raw'
);
6.3. Insert into and write to hdfs_sink_table
-- Streaming SQL: insert into the file system table
insert into hdfs_sink_table
select
    log,
    DATE_FORMAT(`timestamp`, 'yyyyMMdd') as dt,
    DATE_FORMAT(`timestamp`, 'HH') as `hour`
from kafka_source_table;
6.4. Query hdfs_sink_table
-- Batch SQL: use partition pruning for selection
select * from hdfs_sink_table;
6.5. Create Hive table and specify LOCATION
create table `kafka_to_hive` (
    `log` string comment 'log data'
)
comment 'buried-point log data'
PARTITIONED BY (dt string, `hour` string)
row format delimited
    fields terminated by '\t'
    lines terminated by '\n'
stored as orc
LOCATION 'hdfs://usr/local/lib/mavne01/FlinkAPI1.17/data/output/kafka';