2.2 How to use FlinkSQL to read from and write to the file system (HDFS/Local/Hive)

Table of Contents

1. File system SQL connector

2. How to specify the file system type

3. How to specify the file format

4. Read the file system

4.1 Turn on directory monitoring

4.2 Available Metadata

5. Write to the file system

5.1 Create a partitioned table

5.2 Rolling policy, file compaction, partition commit

5.3 Specify sink parallelism

6. Example: read Kafka through FlinkSQL and write to a Hive table

6.1. Create a Kafka source table for reading Kafka

6.2. Create an HDFS sink table for writing to HDFS

6.3. Insert into hdfs_sink_table

6.4. Query hdfs_sink_table

6.5. Create a Hive table and specify its location


1. File system SQL connector

The file system connector reads data from and writes data to local or distributed file systems.

Official website link: File System SQL Connector

2. How to specify the file system type

Specify the file system type through the URI scheme of the 'path' option when creating the table, in the form 'path' = 'scheme:///path'.

Refer to the official website: File system type

CREATE TABLE filesystem_table (
  id INT,
  name STRING,
  ds STRING
) partitioned by (ds) WITH (
  'connector' = 'filesystem',
  -- Set exactly one 'path'; its URI scheme selects the file system
  -- local file system
  'path' = 'file:///URI',
  -- HDFS file system
  -- 'path' = 'hdfs://URI',
  -- Alibaba Cloud Object Storage
  -- 'path' = 'oss://URI',
  'format' = 'json'
);

3. How to specify the file format

The FlinkSQL file system connector supports multiple formats for reading and writing files.

For example, when the source files are csv, json, or Parquet, specify the corresponding format type when creating the table so that the data is parsed and mapped to the fields of the table.

CREATE TABLE filesystem_table_file_format (
  id INT,
  name STRING,
  ds STRING
) partitioned by (ds) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///URI',
  -- Specify exactly one file format type, for example json, csv, orc, or raw
  'format' = 'json'
);
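
As an illustration of format-specific options, the sketch below reads pipe-delimited CSV files. The table name filesystem_table_csv and the delimiter are assumptions made for this example, and the path is the same placeholder used above; csv.field-delimiter and csv.ignore-parse-errors are options of the Flink CSV format.

```sql
-- Hypothetical table: reads pipe-delimited CSV files
CREATE TABLE filesystem_table_csv (
  id INT,
  name STRING
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///URI',               -- placeholder path, replace with a real one
  'format' = 'csv',
  'csv.field-delimiter' = '|',          -- the default delimiter is ','
  'csv.ignore-parse-errors' = 'true'    -- do not fail the job on malformed rows or fields
);
```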

4. Read the file system

FlinkSQL can read data from a single file or an entire directory into a single table.

Notice:

1. When reading a directory, the files in it are read in no particular order.

2. By default, the source is bounded (batch style): it scans the configured path once and then stops.

When directory monitoring (source.monitor-interval) is enabled, the source becomes unbounded and keeps running as a stream.
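
A minimal sketch of a bounded read in the SQL client, assuming the filesystem_table from section 2 points at a real path. Because source.monitor-interval is not set, the path is scanned once and the query finishes.

```sql
-- Run the query as a batch job; the source scans the path once and stops
SET 'execution.runtime-mode' = 'batch';

SELECT COUNT(*) AS row_cnt FROM filesystem_table;
```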

4.1 Turn on directory monitoring

Turn on directory monitoring by setting the source.monitor-interval option; the source then keeps scanning the path and picks up new files as they appear.

Notice:

Only new files in the specified directory are read; existing files that are modified later are not read again.

-- Directory monitoring
DROP TABLE IF EXISTS filesystem_source_table;
CREATE TABLE filesystem_source_table (
  id INT,
  name STRING,
  `file.name` STRING NOT NULL METADATA
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/1016',
  'format' = 'json',
  -- Enable directory monitoring and set the monitoring interval.
  -- The value is a duration; without a unit it is read as milliseconds (use e.g. '3s' for 3 seconds).
  'source.monitor-interval' = '3'
);

-- Continuous reading
select * from filesystem_source_table;

4.2 Available Metadata

When using FlinkSQL to read data from the file system, file metadata can also be read as table columns.

NOTE: All metadata is read-only

-- available Metadata
DROP TABLE IF EXISTS filesystem_source_table_read_metadata;
CREATE TABLE filesystem_source_table_read_metadata (
  id INT,
  name STRING,
  `file.path` STRING NOT NULL METADATA,
  `file.name` STRING NOT NULL METADATA,
  `file.size` BIGINT NOT NULL METADATA,
  `file.modification-time` TIMESTAMP_LTZ(3) NOT NULL METADATA
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/1012',
  'format' = 'json'
);

select * from filesystem_source_table_read_metadata;
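
Metadata columns can be used like ordinary columns in a query. The sketch below counts rows per input file using the table defined above; the aliases row_cnt and file_size_bytes are names chosen for this example.

```sql
-- Rows per source file, together with the file size reported by the connector
SELECT
  `file.name`,
  COUNT(*) AS row_cnt,
  MAX(`file.size`) AS file_size_bytes
FROM filesystem_source_table_read_metadata
GROUP BY `file.name`;
```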

5. Write to the file system

5.1 Create Partition Table

FlinkSQL supports creating partitioned tables and writing data to them through insert into (append) and insert overwrite (replace existing data).

-- Create a partitioned table
DROP TABLE IF EXISTS filesystem_source_table_partition;
CREATE TABLE filesystem_source_table_partition (
  id INT,
  name STRING,
  ds STRING
) partitioned by (ds) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/1012',
  'partition.default-name' = 'default_partition',
  'format' = 'json'
);

--Dynamic partition writing
insert into filesystem_source_table_partition
SELECT * FROM (VALUES
  (1,'a','20231010')
, (2,'b','20231010')
, (3,'c','20231011')
, (4,'d','20231011')
, (5,'e','20231012')
, (6,'f','20231012')
) AS user1 (id,name,ds);

-- Static partition writing
insert into filesystem_source_table_partition partition(ds = '20231010')
SELECT * FROM (VALUES
  (1,'a')
, (2,'b')
, (3,'c')
, (4,'d')
, (5,'e')
, (6,'f')
) AS user1 (id,name);

-- Query partition table data
select * from filesystem_source_table_partition where ds = '20231010';
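
The insert overwrite variant mentioned above could look like the following sketch. It assumes the job runs in batch mode, since Flink only allows INSERT OVERWRITE for batch jobs; the inserted values and the alias user2 are made up for illustration.

```sql
-- INSERT OVERWRITE is only supported for batch jobs
SET 'execution.runtime-mode' = 'batch';

-- Replace the contents of a single partition (static partition overwrite)
INSERT OVERWRITE filesystem_source_table_partition PARTITION (ds = '20231010')
SELECT * FROM (VALUES
  (7, 'g')
, (8, 'h')
) AS user2 (id, name);

-- The partition now contains only the rows written by the overwrite
SELECT * FROM filesystem_source_table_partition WHERE ds = '20231010';
```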

5.2 Rolling policy, file compaction, partition commit

You can read the previous blog: Flink bucket strategy when writing files

Official website link: Official website bucketing strategy
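
For orientation, the three topics map to connector options roughly as in the sketch below. The table name filesystem_sink_table_policies, the placeholder path, and every value are assumptions chosen for illustration, not recommendations. In streaming mode these options take effect together with checkpointing.

```sql
-- Hypothetical sink table; the path and all values are illustrative only
CREATE TABLE filesystem_sink_table_policies (
  `log` STRING,
  dt STRING,
  `hour` STRING
) PARTITIONED BY (dt, `hour`) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///URI',
  'format' = 'json',
  -- Rolling policy: when a part file is closed and a new one is started
  'sink.rolling-policy.file-size' = '128MB',
  'sink.rolling-policy.rollover-interval' = '30 min',
  'sink.rolling-policy.check-interval' = '1 min',
  -- File compaction: merge small files produced within one checkpoint
  'auto-compaction' = 'true',
  'compaction.file-size' = '128MB',
  -- Partition commit: when and how a partition is announced as complete
  'sink.partition-commit.trigger' = 'process-time',
  'sink.partition-commit.delay' = '1 h',
  'sink.partition-commit.policy.kind' = 'success-file'
);
```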

5.3 Specify Sink Parallelism

When using FlinkSQL to write to the file system, you can set the parallelism of the sink operator through sink.parallelism.

Note: Configuring sink parallelism is supported only when the upstream changelog mode is INSERT-ONLY. Otherwise, the program throws an exception.

CREATE TABLE hdfs_sink_table (
  `log` STRING,
  `dt` STRING, -- partition field, day
  `hour` STRING -- Partition field, hour
) partitioned by (dt,`hour`) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/kafka',
  'sink.parallelism' = '2', -- Specify the degree of parallelism of the sink operator
  'format' = 'raw'
);

6. Example: read Kafka through FlinkSQL and write to a Hive table

Requirements:

Use FlinkSQL to write Kafka data to a specified HDFS directory

Partition by the Kafka record timestamp (partitioned by day and hour)

6.1. Create a Kafka source table for reading Kafka

-- When creating the Kafka source table, also read the Kafka metadata field (the record timestamp)
DROP TABLE IF EXISTS kafka_source_table;
CREATE TABLE kafka_source_table(
  `log` STRING,
  `timestamp` TIMESTAMP(3) METADATA FROM 'timestamp' -- the timestamp of the message
) WITH (
  'connector' = 'kafka',
  'topic' = '20231017',
  'properties.bootstrap.servers' = 'worker01:9092',
  'properties.group.id' = 'FlinkConsumer',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'raw'
);

6.2. Create an HDFS sink table for writing to HDFS

DROP TABLE IF EXISTS hdfs_sink_table;
CREATE TABLE hdfs_sink_table (
  `log` STRING,
  `dt` STRING, -- partition field, day
  `hour` STRING -- Partition field, hour
) partitioned by (dt,`hour`) WITH (
  'connector' = 'filesystem',
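  -- hdfs:/// (three slashes) uses the default NameNode from the Hadoop configuration;
  -- with hdfs://usr/... the segment "usr" would be parsed as the NameNode host
  'path' = 'hdfs:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/kafka',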
  'sink.parallelism' = '2', -- Specify the degree of parallelism of the sink operator
  'format' = 'raw'
);

6.3. Insert into hdfs_sink_table

-- streaming sql, insert into the file system table
insert into hdfs_sink_table
select
`log`
,DATE_FORMAT(`timestamp`,'yyyyMMdd') as dt
,DATE_FORMAT(`timestamp`,'HH') as `hour`
from kafka_source_table;
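
In streaming mode the file system sink only finalizes part files when a checkpoint completes, so without checkpointing the written files stay in an in-progress state and are not visible to readers. A minimal sketch for the SQL client, to be run before the insert statement; the one-minute interval is an assumed value.

```sql
-- Enable periodic checkpoints so that the streaming file sink can commit its files
SET 'execution.checkpointing.interval' = '1min';
```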

6.4. Query hdfs_sink_table

-- batch sql; add a WHERE filter on dt / `hour` to benefit from partition pruning
select * from hdfs_sink_table;
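
A filtered variant could look like the sketch below. The partition values '20231017' and '12' are hypothetical; with filters on the partition columns, only the matching partition directories need to be scanned.

```sql
-- Read a single hour partition as a batch job (partition values are made up)
SET 'execution.runtime-mode' = 'batch';

SELECT * FROM hdfs_sink_table
WHERE dt = '20231017' AND `hour` = '12';
```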

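6.5. Create a Hive table and specify its location

-- The Flink sink above uses the raw format and writes plain text lines,
-- so the Hive table is declared as textfile (an orc table could not read these files)
create table `kafka_to_hive` (
`log` string comment 'log data')
 comment 'buried log data' PARTITIONED BY (dt string,`hour` string)
row format delimited fields terminated by '\t' lines terminated by '\n'
stored as textfile
LOCATION 'hdfs:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/kafka';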
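
Hive does not automatically know about partition directories that Flink created under the table location. One way to register them, sketched here as HiveQL to be run in Hive, is MSCK REPAIR TABLE; the dt value in the final query is a hypothetical example.

```sql
-- Register the dt=.../hour=... directories written by Flink as Hive partitions
MSCK REPAIR TABLE kafka_to_hive;

-- Check which partitions Hive now knows about
SHOW PARTITIONS kafka_to_hive;

-- Sample a few rows from one day (partition value is made up)
SELECT * FROM kafka_to_hive WHERE dt = '20231017' LIMIT 10;
```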