21. Integration of Flink’s Table API and DataStream API (2) – batch processing mode and insert-only stream processing

Flink series of articles

1. Flink column

The Flink column systematically introduces each topic and explains it with specific examples.

  • 1. Flink deployment series
    This section introduces the basic content related to the deployment and configuration of Flink.

  • 2. Flink basic series
    This part introduces the basic parts of Flink, such as terminology, architecture, programming model, programming guide, basic datastream API usage, four cornerstones, etc.

  • 3. Flink Table API and SQL basic series
    This section introduces the basic usage of the Flink Table API and SQL, such as creating databases and tables, queries, window functions, catalogs, etc.

  • 4. Flink Table API and SQL improvement and application series
    This part covers the application of the Table API and SQL. It is more closely related to actual production use and includes topics that are harder to develop.

  • 5. Flink monitoring series
    This part is related to actual operation, maintenance and monitoring work.

2. Flink example column

The Flink example column supplements the Flink column. It generally does not explain concepts but instead provides concrete, ready-to-use examples. This column is not divided into categories; the topic of each article can be seen from its link.

For an entry point to all articles in both columns, see: Summary index of Flink series articles

Article directory

  • Flink series of articles
  • 1. Integration of Table API and DataStream API
    • 4. Batch processing mode
      • 1), Changelog Unification
    • 5. Handling of (Insert-Only) Streams
      • 1), fromDataStream example
      • 2), createTemporaryView example
      • 3), toDataStream example

This is the second article on the integration of the Flink Table API and DataStream API. It covers integration in batch mode and the handling of insert-only streams, and explains both with concrete examples.
This article assumes that Flink and Kafka clusters are available and working.
It is divided into two parts: batch mode and insert-only stream handling.
The examples in this article were run on Flink 1.17.

1. Integration of Table API and DataStream API

4. Batch processing mode

Batch runtime mode is a specialized execution mode for bounded Flink programs.

In general, boundedness is a property of a data source that tells us whether all records from that source are known before execution, or whether new data will keep showing up, possibly indefinitely. A job, in turn, is bounded if all of its sources are bounded; otherwise it is unbounded.

Streaming runtime mode, on the other hand, can be used for both bounded and unbounded jobs.

For more information on the different execution modes, see the corresponding DataStream API section.
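The boundedness rule above can be written down as a tiny plain-Java sketch. It does not use Flink; the JobBoundedness class and its nested Boundedness enum are hypothetical names introduced only for this illustration.

```java
import java.util.Arrays;
import java.util.List;

final class JobBoundedness {
    // Hypothetical stand-in for a source's boundedness; not a Flink class.
    enum Boundedness { BOUNDED, UNBOUNDED }

    /** A job is bounded only if every one of its sources is bounded. */
    static Boundedness of(List<Boundedness> sources) {
        boolean allBounded = sources.stream().allMatch(b -> b == Boundedness.BOUNDED);
        return allBounded ? Boundedness.BOUNDED : Boundedness.UNBOUNDED;
    }

    public static void main(String[] args) {
        // Two bounded sources: the job as a whole is bounded.
        System.out.println(of(Arrays.asList(Boundedness.BOUNDED, Boundedness.BOUNDED)));
        // One unbounded source is enough to make the whole job unbounded.
        System.out.println(of(Arrays.asList(Boundedness.BOUNDED, Boundedness.UNBOUNDED)));
    }
}
```

A single unbounded source therefore rules out batch mode for the whole job, which is why every source has to declare itself bounded before the runtime mode is switched.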

The Table API and SQL planner provides a specialized set of optimizer rules and runtime operators for each of these modes.

As of Flink version 1.17, the runtime mode is not automatically derived from the source, so when instantiating a StreamTableEnvironment, the runtime mode must be set explicitly or will be adopted from the StreamExecutionEnvironment:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;

// adopt mode from StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// or

// set mode explicitly for StreamTableEnvironment
// it will be propagated to StreamExecutionEnvironment during planning
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, EnvironmentSettings.inBatchMode());

Before setting the runtime mode to BATCH, the following prerequisites must be met:

  • All sources must declare themselves bounded.
  • As of Flink version 1.17, table sources must emit only insert changes.
  • Operators require sufficient off-heap memory for sorting and other intermediate results.
  • All table operations must be available in batch mode. As of Flink version 1.17, some of them are only available in streaming mode. Please check out the corresponding Table API and SQL pages.

Batch execution has the following implications (among others):

  • Progressive watermarks are neither generated nor used in operators. However, the source emits a maximum watermark before being closed.
  • Depending on execution.batch-shuffle-mode, exchanges between tasks may be blocking. This also means potentially lower resource requirements compared to executing the same pipeline in streaming mode.
  • Checkpointing is disabled. Artificial state backends are inserted.
  • Table operations do not produce incremental updates, but only a complete final result that is converted into an insert-only changelog stream.
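To make the last point more concrete, here is a minimal plain-Java sketch of the two changelog shapes for a grouped sum (think of SELECT name, SUM(score) ... GROUP BY name). It only models the semantics and is not Flink's runtime; the ChangelogSketch class and its methods are hypothetical names, and the row order of a real batch result is not guaranteed.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ChangelogSketch {
    /** Streaming-style: every input record immediately produces changes. */
    static List<String> incremental(String[] names, int[] scores) {
        Map<String, Integer> sums = new LinkedHashMap<>();
        List<String> changelog = new ArrayList<>();
        for (int i = 0; i < names.length; i++) {
            Integer old = sums.get(names[i]);
            if (old == null) {
                // First record for this key: plain insert.
                sums.put(names[i], scores[i]);
                changelog.add("+I[" + names[i] + ", " + scores[i] + "]");
            } else {
                // Later record: retract the old sum, then emit the new one.
                int updated = old + scores[i];
                sums.put(names[i], updated);
                changelog.add("-U[" + names[i] + ", " + old + "]");
                changelog.add("+U[" + names[i] + ", " + updated + "]");
            }
        }
        return changelog;
    }

    /** Batch-style: consume all input first, then emit only final results as inserts. */
    static List<String> finalOnly(String[] names, int[] scores) {
        Map<String, Integer> sums = new LinkedHashMap<>();
        for (int i = 0; i < names.length; i++) {
            sums.merge(names[i], scores[i], Integer::sum);
        }
        List<String> changelog = new ArrayList<>();
        sums.forEach((name, sum) -> changelog.add("+I[" + name + ", " + sum + "]"));
        return changelog;
    }

    public static void main(String[] args) {
        String[] names = {"alan", "alanchan", "alan"};
        int[] scores = {4, 6, 10};
        System.out.println(incremental(names, scores)); // contains -U/+U rows
        System.out.println(finalOnly(names, scores));   // inserts only
    }
}
```

The incremental variant needs update rows as soon as a key appears a second time, while the batch-style variant can wait for the end of the input and emit each key exactly once.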

Since batch processing can be considered a special case of stream processing, we recommend implementing a streaming pipeline first, as it is the most general implementation for both bounded and unbounded data.
In theory, a streaming pipeline can execute all operators. In practice, however, some operations make little sense there because they would lead to ever-growing state, and are therefore not supported. Global sorting is one example that is only available in batch mode. Simply put: it should be possible to run a working streaming pipeline in batch mode, but not necessarily the other way around.

The following example demonstrates how to use a DataGen table source in batch mode. Many sources provide the option to implicitly make a connector bounded, for example, by defining a termination offset or timestamp. In our example, we limit the number of rows using the number-of-rows option.

public static void test5() throws Exception {
    // 1. Create the execution environment and switch it to batch mode
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    // The table environment adopts the runtime mode during initialization
    StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

    // 2. Create the table
    Table table =
        tenv.from(
            TableDescriptor.forConnector("datagen")
                .option("number-of-rows", "5") // make the source bounded
                .schema(
                    Schema.newBuilder()
                        .column("uid", DataTypes.TINYINT())
                        .column("payload", DataTypes.STRING())
                        .build())
                .build());

    // 3. Convert to a DataStream and print the result
    tenv.toDataStream(table)
        .keyBy(r -> r.<Byte>getFieldAs("uid"))
        .map(r -> "alan_payload: " + r.<String>getFieldAs("payload"))
        .executeAndCollect()
        .forEachRemaining(System.out::println);

    // executeAndCollect() already submits the job, so no additional env.execute() is needed
}
  • output
alan_payload: 143dc81ed1cf71d9b7a4f8088cae78b5fd919f0ba2bc57e24828c18dea47fb9e84f4ce6a74d0f18285c8c66b9587947a81b1
alan_payload: c3bc0a98d286c9db33a02896bca16ac327f267183e16bc42c813741297ed3f51b998dc45d23231d2ca06677072c21b222369
alan_payload: ce3bae6e08c4dbef6b4d4517b426c76792b788126747c494110a48e6b4909920602643e37323e64038e64cc2d359476e7495
alan_payload: b22c2ac79d2e9be20caf3c311d12637dc42422f7d25132750b4afbb8e8dd341d0f767e42e70874f7207cf5a24c7d1caea713
alan_payload: d1bb8a7fe2077efaa61dc4befe8fef884c257c5c201c62bbac11787a222b70df021e16cba32d5cfc42527589af45dc968c7f

1), Changelog Unification

In most cases, the pipeline definition itself remains unchanged in both Table API and DataStream API when switching from streaming mode to batch mode and vice versa. However, as mentioned earlier, since incremental operations in batch mode are avoided, the resulting changelog streams may be different.
Time-based operations that rely on event time and use watermarks as completeness markers can produce insert-only changelog streams that are independent of the runtime mode.

The following Java example demonstrates a Flink program that is unified not only at the API level, but also in the generated changelog stream.
This example joins two tables in SQL, UserTable and OrderTable, using interval join based on time attributes in both tables (ts).
It uses the DataStream API to implement a custom operator that deduplicates usernames using KeyedProcessFunction and value state.

The results are shown in the comments after the print calls.

public static void test6() throws Exception {
    // 1. Create the execution environment in batch mode
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

    // Data source userStream
    DataStream<Row> userStream = env
        .fromElements(
            Row.of(LocalDateTime.parse("2023-11-13T17:50:00"), 1, "alan"),
            Row.of(LocalDateTime.parse("2023-11-13T17:55:00"), 2, "alanchan"),
            Row.of(LocalDateTime.parse("2023-11-13T18:00:00"), 2, "alanchanchn"))
        .returns(
            Types.ROW_NAMED(
                new String[] {"ts", "uid", "name"},
                Types.LOCAL_DATE_TIME, Types.INT, Types.STRING));

    // Data source orderStream
    DataStream<Row> orderStream = env
        .fromElements(
            Row.of(LocalDateTime.parse("2023-11-13T17:52:00"), 1, 122),
            Row.of(LocalDateTime.parse("2023-11-13T17:57:00"), 2, 239),
            Row.of(LocalDateTime.parse("2023-11-13T18:01:00"), 2, 999))
        .returns(
            Types.ROW_NAMED(
                new String[] {"ts", "uid", "amount"},
                Types.LOCAL_DATE_TIME, Types.INT, Types.INT));

    // Create view UserTable
    tenv.createTemporaryView(
        "UserTable",
        userStream,
        Schema.newBuilder()
            .column("ts", DataTypes.TIMESTAMP(3))
            .column("uid", DataTypes.INT())
            .column("name", DataTypes.STRING())
            .watermark("ts", "ts - INTERVAL '1' SECOND")
            .build());

    // Create view OrderTable
    tenv.createTemporaryView(
        "OrderTable",
        orderStream,
        Schema.newBuilder()
            .column("ts", DataTypes.TIMESTAMP(3))
            .column("uid", DataTypes.INT())
            .column("amount", DataTypes.INT())
            .watermark("ts", "ts - INTERVAL '1' SECOND")
            .build());

    // Interval join between OrderTable and UserTable
    Table joinedTable =
        tenv.sqlQuery(
            "SELECT U.name, O.amount " +
            "FROM UserTable U, OrderTable O " +
            "WHERE U.uid = O.uid AND O.ts BETWEEN U.ts AND U.ts + INTERVAL '5' MINUTES");

    // Convert the table to a DataStream
    DataStream<Row> joinedStream = tenv.toDataStream(joinedTable);

    joinedStream.print();
    // +I[alanchan, 239]
    // +I[alanchanchn, 999]
    // +I[alan, 122]

    env.execute();
}
  • Use KeyedProcessFunction and ValueState to create a custom operator
    Add the following code to the example above, before env.execute(). It outputs each distinct name once.
// Use KeyedProcessFunction and value state to implement a custom deduplication operator
joinedStream
    .keyBy(r -> r.<String>getFieldAs("name"))
    .process(
        new KeyedProcessFunction<String, Row, String>() {
            // Keyed state: holds the name once it has been seen for the current key
            ValueState<String> seen;

            @Override
            public void open(Configuration parameters) {
                seen = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("seen", String.class));
            }

            @Override
            public void processElement(Row row, Context ctx, Collector<String> out)
                    throws Exception {
                String name = row.getFieldAs("name");
                // Emit the name only the first time it appears
                if (seen.value() == null) {
                    seen.update(name);
                    out.collect(name);
                }
            }
        })
    .print();
// alan
// alanchan
// alanchanchn
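
The commented results above can be checked by hand. The following plain-Java sketch replays the same six records without Flink: it applies the join condition (equal uid, and order time within five minutes after the user time, both bounds inclusive) and then the "first time seen" deduplication. The IntervalJoinSketch class and everything inside it are hypothetical helpers for this illustration only; the row order Flink prints may differ.

```java
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class IntervalJoinSketch {
    static final class User {
        final LocalDateTime ts; final int uid; final String name;
        User(String ts, int uid, String name) {
            this.ts = LocalDateTime.parse(ts); this.uid = uid; this.name = name;
        }
    }

    static final class Order {
        final LocalDateTime ts; final int uid; final int amount;
        Order(String ts, int uid, int amount) {
            this.ts = LocalDateTime.parse(ts); this.uid = uid; this.amount = amount;
        }
    }

    static final class Joined {
        final String name; final int amount;
        Joined(String name, int amount) { this.name = name; this.amount = amount; }
        @Override public String toString() { return "+I[" + name + ", " + amount + "]"; }
    }

    // Same records as userStream and orderStream in the example above.
    static final List<User> USERS = Arrays.asList(
        new User("2023-11-13T17:50:00", 1, "alan"),
        new User("2023-11-13T17:55:00", 2, "alanchan"),
        new User("2023-11-13T18:00:00", 2, "alanchanchn"));

    static final List<Order> ORDERS = Arrays.asList(
        new Order("2023-11-13T17:52:00", 1, 122),
        new Order("2023-11-13T17:57:00", 2, 239),
        new Order("2023-11-13T18:01:00", 2, 999));

    /** U.uid = O.uid AND O.ts BETWEEN U.ts AND U.ts + INTERVAL '5' MINUTES */
    static List<Joined> join(List<User> users, List<Order> orders) {
        List<Joined> rows = new ArrayList<>();
        for (User u : users) {
            LocalDateTime upper = u.ts.plusMinutes(5);
            for (Order o : orders) {
                boolean inInterval = !o.ts.isBefore(u.ts) && !o.ts.isAfter(upper);
                if (u.uid == o.uid && inInterval) {
                    rows.add(new Joined(u.name, o.amount));
                }
            }
        }
        return rows;
    }

    /** Mirrors the keyed "seen" state: a name is emitted only the first time it appears. */
    static List<String> distinctNames(List<Joined> rows) {
        Set<String> seen = new HashSet<>();
        List<String> out = new ArrayList<>();
        for (Joined row : rows) {
            if (seen.add(row.name)) {
                out.add(row.name);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Joined> joined = join(USERS, ORDERS);
        joined.forEach(System.out::println);
        distinctNames(joined).forEach(System.out::println);
    }
}
```

Each user matches exactly one order: the 18:01 order falls outside alanchan's window (17:55 to 18:00), and the 17:57 order lies before alanchanchn's window starts. Since the join result contains only inserts, it is the same in batch and streaming mode.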

5. Handling of (Insert-Only) Streams

StreamTableEnvironment provides the following methods for converting from and to the DataStream API:

  • fromDataStream(DataStream): Interprets a stream of insert-only changes and arbitrary type as a table. By default, event time and watermarks are not propagated.
  • fromDataStream(DataStream, Schema): Interprets a stream of insert-only changes and arbitrary type as a table. The optional schema allows enriching column data types and adding time attributes, watermark strategies, other computed columns, or primary keys.
  • createTemporaryView(String, DataStream): Registers the stream under a name (as a virtual table, i.e. a view) so that it can be accessed in SQL. It is a shortcut for createTemporaryView(String, fromDataStream(DataStream)).
  • createTemporaryView(String, DataStream, Schema): Registers the stream under a name (as a virtual table, i.e. a view) so that it can be accessed in SQL. It is a shortcut for createTemporaryView(String, fromDataStream(DataStream, Schema)).
  • toDataStream(Table): Converts a table into a stream of insert-only changes. The default stream record type is org.apache.flink.types.Row. A single rowtime attribute column is written back into the DataStream API record. Watermarks are propagated as well.
  • toDataStream(Table, AbstractDataType): Converts a table into a stream of insert-only changes. This method accepts a data type that describes the desired stream record type. The planner may insert implicit casts and reorder columns to map them to the fields of the (possibly nested) data type.
  • toDataStream(Table, Class): Shortcut for toDataStream(Table, DataTypes.of(Class)), used to quickly create the desired data type reflectively.

From a Table API perspective, converting from and to the DataStream API is similar to reading from or writing to a virtual table connector that has been defined in SQL with a CREATE TABLE DDL.

The schema portion in a virtual CREATE TABLE name(schema) WITH(options) statement can be automatically derived from the DataStream’s type information, enriched, or completely manually defined using org.apache.flink.table.api.Schema.

The virtual DataStream table connector exposes the following metadata for every row:

Key | Data Type | Description | R/W
rowtime | TIMESTAMP_LTZ(3) NOT NULL | Stream record’s timestamp. | R/W

The virtual DataStream table source implements SupportsSourceWatermark, which allows calling the SOURCE_WATERMARK() built-in function as a watermark strategy to adopt watermarks from the DataStream API.

1), fromDataStream example

The code below shows how to use fromDataStream in different scenarios. The result of each step is shown in the comment after it.

import java.time.Instant;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author alanchan
 *
 */
public class TestFromDataStreamDemo {

    @NoArgsConstructor
    @AllArgsConstructor
    @Data
    public static class User {
        public String name;
        public Integer score;
        public Instant event_time;
    }

    public static void test1() throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // 2. Create the data source
        DataStream<User> dataStream =
            env.fromElements(
                new User("alan", 4, Instant.ofEpochMilli(1000)),
                new User("alanchan", 6, Instant.ofEpochMilli(1001)),
                new User("alanchanchn", 10, Instant.ofEpochMilli(1002)));

        // Example 1: derive all columns automatically and show the table schema
        // Simple use case when time-based operations are not required.
        Table table = tenv.fromDataStream(dataStream);
        // table.printSchema();
        // (
        //   `name` STRING,
        //   `score` INT,
        //   `event_time` TIMESTAMP_LTZ(9)
        // )

        // Example 2: add a processing-time column and show the table schema
        // The most common use case when time-based operations should work in processing time.
        Table table2 = tenv.fromDataStream(
            dataStream,
            Schema.newBuilder()
                .columnByExpression("proc_time", "PROCTIME()")
                .build());
        // table2.printSchema();
        // (
        //   `name` STRING,
        //   `score` INT,
        //   `event_time` TIMESTAMP_LTZ(9),
        //   `proc_time` TIMESTAMP_LTZ(3) NOT NULL *PROCTIME* AS PROCTIME()
        // )

        // Example 3: add a rowtime column and a custom watermark strategy
        Table table3 =
            tenv.fromDataStream(
                dataStream,
                Schema.newBuilder()
                    .columnByExpression("rowtime", "CAST(event_time AS TIMESTAMP_LTZ(3))")
                    .watermark("rowtime", "rowtime - INTERVAL '10' SECOND")
                    .build());
        // table3.printSchema();
        // (
        //   `name` STRING,
        //   `score` INT,
        //   `event_time` TIMESTAMP_LTZ(9),
        //   `rowtime` TIMESTAMP_LTZ(3) *ROWTIME* AS CAST(event_time AS TIMESTAMP_LTZ(3)),
        //   WATERMARK FOR `rowtime`: TIMESTAMP_LTZ(3) AS rowtime - INTERVAL '10' SECOND
        // )

        // Example 4: add a rowtime column from the record timestamp and reuse the DataStream watermarks
        // (SOURCE_WATERMARK() assumes that a watermark strategy has already been defined on dataStream;
        // this part only shows the usage)
        // The most common use case when time-based operations such as windows or interval joins
        // should be part of the pipeline.
        Table table4 =
            tenv.fromDataStream(
                dataStream,
                Schema.newBuilder()
                    .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
                    .watermark("rowtime", "SOURCE_WATERMARK()")
                    .build());
        // table4.printSchema();
        // (
        //   `name` STRING,
        //   `score` INT,
        //   `event_time` TIMESTAMP_LTZ(9),
        //   `rowtime` TIMESTAMP_LTZ(3) *ROWTIME* METADATA,
        //   WATERMARK FOR `rowtime`: TIMESTAMP_LTZ(3) AS SOURCE_WATERMARK()
        // )

        // Example 5: reduce the precision of event_time and use it as the rowtime attribute
        // (SOURCE_WATERMARK() assumes that a watermark strategy has already been defined on dataStream;
        // this part only shows the usage)
        // The schema relies entirely on the user's declaration. This is useful for replacing generic
        // types from the DataStream API (RAW in the Table API) with proper data types.
        Table table5 =
            tenv.fromDataStream(
                dataStream,
                Schema.newBuilder()
                    .column("event_time", "TIMESTAMP_LTZ(3)")
                    .column("name", "STRING")
                    .column("score", "INT")
                    .watermark("event_time", "SOURCE_WATERMARK()")
                    .build());
        table5.printSchema();
        // (
        //   `event_time` TIMESTAMP_LTZ(3) *ROWTIME*,
        //   `name` STRING,
        //   `score` INT
        // )

        // No env.execute() here: this method only prints schemas and defines no
        // DataStream operators, so there is no job to run.
    }

    public static void main(String[] args) throws Exception {
        test1();
    }

}

Since DataType is richer than TypeInformation, we can easily enable immutable POJOs and other complex data structures.
The following Java example shows what is possible.
Also check the DataStream API’s Data Types and Serialization page for more information about the types supported there.

package org.tablesql.convert;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author alanchan
 *
 */
public class TestFromDataStreamDemo {

    // All fields of User2 are declared final, which makes it an immutable POJO
    public static class User2 {
        public final String name;
        public final Integer score;

        public User2(String name, Integer score) {
            this.name = name;
            this.score = score;
        }
    }

    public static void test2() throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // 2. Create the data source
        // The DataStream API does not support immutable POJOs yet, so by default this class
        // results in a generic type that is a RAW type in the Table API.
        DataStream<User2> dataStream = env.fromElements(
            new User2("Alice", 4),
            new User2("Bob", 6),
            new User2("Alice", 10));

        // Example 1: print the table schema
        Table table = tenv.fromDataStream(dataStream);
        // table.printSchema();
        // (
        //   `f0` RAW('org.tablesql.convert.TestFromDataStreamDemo$User2', '...')
        // )

        // Example 2: declare the table schema explicitly
        // Use the Table API's type system in a custom schema to declare a more useful data type
        // for the column, and rename the column in the following `as` projection.
        Table table2 = tenv
            .fromDataStream(
                dataStream,
                Schema.newBuilder()
                    .column("f0", DataTypes.of(User2.class))
                    .build())
            .as("user");
        // table2.printSchema();
        // (
        //   `user` *org.tablesql.convert.TestFromDataStreamDemo$User2<`name` STRING, `score` INT>*
        // )

        // Example 3: data types can be extracted reflectively as above or defined explicitly
        Table table3 = tenv
            .fromDataStream(
                dataStream,
                Schema.newBuilder()
                    .column(
                        "f0",
                        DataTypes.STRUCTURED(
                            User2.class,
                            DataTypes.FIELD("name", DataTypes.STRING()),
                            DataTypes.FIELD("score", DataTypes.INT())))
                    .build())
            .as("user");
        table3.printSchema();
        // (
        //   `user` *org.tablesql.convert.TestFromDataStreamDemo$User2<`name` STRING, `score` INT>*
        // )

        // No env.execute() here: this method only prints schemas and defines no
        // DataStream operators, so there is no job to run.
    }

    public static void main(String[] args) throws Exception {
        test2();
    }

}

2), createTemporaryView example

A DataStream can be registered directly as a view.

Views created from a DataStream can only be registered as temporary views. Due to their inline/anonymous nature, they cannot be registered in a permanent catalog.
The code below shows how to use createTemporaryView in different scenarios. The result of each example is shown in the comment after it.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @author alanchan
 *
 */
public class TestCreateTemporaryViewDemo {

    public static void test1() throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // 2. Create the data source
        DataStream<Tuple2<Long, String>> dataStream =
            env.fromElements(Tuple2.of(12L, "alan"), Tuple2.of(0L, "alanchan"));

        // Example 1: create a view and print its schema
        tenv.createTemporaryView("MyView", dataStream);
        tenv.from("MyView").printSchema();
        // (
        //   `f0` BIGINT NOT NULL,
        //   `f1` STRING
        // )

        // Drop the view before reusing its name; registering it twice fails
        tenv.dropTemporaryView("MyView");

        // Example 2: create a view with an explicitly declared schema, as with fromDataStream
        // In this example the declared column types do not carry NOT NULL
        tenv.createTemporaryView(
            "MyView",
            dataStream,
            Schema.newBuilder()
                .column("f0", "BIGINT")
                .column("f1", "STRING")
                .build());
        tenv.from("MyView").printSchema();
        // (
        //   `f0` BIGINT,
        //   `f1` STRING
        // )

        tenv.dropTemporaryView("MyView");

        // Example 3: rename the columns before creating the view
        // `as` renames the columns; the original names are f0 and f1
        tenv.createTemporaryView(
            "MyView",
            tenv.fromDataStream(dataStream).as("id", "name"));
        tenv.from("MyView").printSchema();
        // (
        //   `id` BIGINT NOT NULL,
        //   `name` STRING
        // )

        // No env.execute() here: this method only prints schemas and defines no
        // DataStream operators, so there is no job to run.
    }

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        test1();
    }

}

3), toDataStream example

The code below shows how to use toDataStream in different scenarios. The result of each example is shown in the comment after it.

import java.time.Instant;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author alanchan
 *
 */
public class TestToDataStreamDemo {

    @NoArgsConstructor
    @AllArgsConstructor
    @Data
    public static class User {
        public String name;
        public Integer score;
        public Instant event_time;
    }

    static final String SQL = "CREATE TABLE GeneratedTable "
        + "("
        + "  name STRING,"
        + "  score INT,"
        + "  event_time TIMESTAMP_LTZ(3),"
        + "  WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND"
        + ") "
        + "WITH ('connector'='datagen')";

    public static void test1() throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // 2. Create the table
        tenv.executeSql(SQL);
        Table table = tenv.from("GeneratedTable");

        // Example 1: table to DataStream
        // Use the default conversion to Row instances.
        // Since `event_time` is a single rowtime attribute, it is inserted into the DataStream
        // metadata and watermarks are propagated.
        // DataStream<Row> dataStream = tenv.toDataStream(table);
        // dataStream.print();
        // Sample output (the source is unbounded, so data keeps arriving):
        // 10> +I[9b979ecef142c06746ff2be0f79f4afe7ef7089f60f267184e052c12ef5f2c2a144c73d3653bee51b351ed5b20ecaf0673ec, -1424631858, 2023-11-14T02:58:56.071Z]
        // 1> +I[444998c8992accc54e2c10cac4f4a976cda516d84817a8fd728c9d013da3d87e91d28537a564f09fb07308142ca83c2548e9, -1240938499, 2023-11-14T02:58:56.071Z]
        // 12> +I[fa42df01fe1f789535df26f81c2e58c02feaeba60338e4cfb7c8fdb06ed96c69b46e9a966d93d0cf811b24dd9434a8ef2253, 2039663083, 2023-11-14T02:58:56.070Z]
        // 1> +I[25aa121a0d656a5355c32148a0c68cc39ac05443bd7de6a0c499a2daae85868422dd024c6803598133dc26a607cd1e60e747, 1912789884, 2023-11-14T02:58:56.071Z]

        // Example 2: table to DataStream
        // Extract the data type from class `User`. The planner reorders fields and inserts implicit
        // casts where possible to convert the internal data structure to the desired structured type.
        // Since `event_time` is a single rowtime attribute, it is inserted into the DataStream
        // metadata and watermarks are propagated.
        DataStream<User> dataStream2 = tenv.toDataStream(table, User.class);
        // dataStream2.print();
        // Sample output (the source is unbounded, so data keeps arriving):
        // 4> TestToDataStreamDemo.User(name=e80b612e48443a292c11e28159c73475b9ef9531b91d5712420753d5d6041a06f5de634348210b151f4fc220b4ec91ed5c72, score=2146560121, event_time=2023-11-14T03:01:17.657Z)
        // 14> TestToDataStreamDemo.User(name=290b48dea62368bdb35567f31e5e2690ad8b5dd50c1c0f7184f15d2e85b24ea84155f1edef875f4c96e3a2133a320fcb6e41, score=2062379192, event_time=2023-11-14T03:01:17.657Z)
        // 12> TestToDataStreamDemo.User(name=a0b31a03ad951b53876445001bbc74178c9818ece7d5e53166635d40cb8ef07980eabd7463ca6be38b34b1f0fbd4e2251df0, score=16953697, event_time=2023-11-14T03:01:17.657Z)

        // Example 3: table to DataStream
        // Data types can be extracted reflectively as above or defined explicitly
        DataStream<User> dataStream3 =
            tenv.toDataStream(
                table,
                DataTypes.STRUCTURED(
                    User.class,
                    DataTypes.FIELD("name", DataTypes.STRING()),
                    DataTypes.FIELD("score", DataTypes.INT()),
                    DataTypes.FIELD("event_time", DataTypes.TIMESTAMP_LTZ(3))));
        dataStream3.print();
        // Sample output (the source is unbounded, so data keeps arriving):
        // 9> TestToDataStreamDemo.User(name=49550693e3cb3a41cd785504c699684bf2015f0ebff5918dbdea454291c265d316773f2d9507ce73dd18f91a2f5fdbd6e500, score=744771891, event_time=2023-11-14T03:06:13.010Z)
        // 2> TestToDataStreamDemo.User(name=60589709fe41decb647fcf4e2f91d45c82961bbe64469f3ea8a9a12b0cac071481ec9cfd65a9c218e3799986dd72ab80e457, score=-1056249244, event_time=2023-11-14T03:06:13.010Z)
        // 15> TestToDataStreamDemo.User(name=d0a179f075c8b521bf5ecb08a32f6c715b5f2c616f815f8173c0a1c2961c53774faf396ddf55a44db49abe8085772f35d75c, score=862651361, event_time=2023-11-14T03:06:13.010Z)

        env.execute();
    }

    public static void main(String[] args) throws Exception {
        test1();
    }

}

toDataStream only supports non-updating tables. In general, time-based operations (such as windows, interval joins, or the MATCH_RECOGNIZE clause) are a good fit for insert-only pipelines, alongside simple operations such as projections and filters.
Pipelines with operations that produce updates can use toChangelogStream instead.

To summarize, this is the second article on the integration of the Flink Table API and DataStream API. It covered integration in batch mode and the handling of insert-only streams, with concrete examples for each.