19. Custom functions and examples in Flink’s Table API and SQL (4)

Flink series of articles

1. Flink column

The Flink column systematically introduces individual knowledge points and explains them with concrete examples.

  • 1. Flink deployment series
    This section introduces the basic content related to the deployment and configuration of Flink.

  • 2. Flink basic series
    This part introduces the fundamentals of Flink, such as terminology, architecture, the programming model, the programming guide, basic DataStream API usage, the four cornerstones, etc.

  • 3. Flink Table API and SQL basic series
    This section introduces the basic usage of the Flink Table API and SQL, such as creating databases and tables, table usage, queries, window functions, the catalog, etc.

  • 4. Flink Table API and SQL improvement and application series
    This part covers applications of the Table API and SQL that are closer to actual production use, including the parts that are harder to develop.

  • 5. Flink monitoring series
    This part is related to actual operation, maintenance and monitoring work.

2. Flink example column

The Flink example column supplements the Flink column. It generally does not introduce knowledge points; instead it provides concrete, directly usable examples. This column is not divided into categories, and what each article covers can be seen from its link.

For the entry point to all articles in both columns, see: Summary index of Flink series articles

Article directory

  • Flink series of articles
    • 7. Apply custom functions in the SQL Client
      • 1) Implement the custom function
      • 2) Package and upload the jar to the lib directory of Flink
      • 3) Verification
        • 1. Create table
        • 2. Initialize table data
        • 3. Register function
        • 4. Verify the custom function
    • 8. POJO data type application example: table-valued function

This article shows how to apply custom functions in the Flink SQL Client and how to use POJOs in custom functions.
This article assumes that working Flink and Kafka clusters are available.
The article is divided into two parts: applying custom functions in the Flink SQL Client, and using POJO data types in custom functions.
Unless otherwise specified, the examples in this article run on Flink 1.17.

7. Apply custom functions in the SQL Client

This example packages a custom function (implemented below) and applies it in the Flink SQL Client.

1) Implement the custom function

All the examples in this article depend on Maven; for the dependencies, see the earlier article in this series: 17. Flink’s Table API: Operations supported by the Table API (1),
or add the following:

 <!-- flink dependency introduction -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-common</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
  • Sample code
package org.table_sql;

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

/**
 * @author alanchan
 *
 */

@FunctionHint(output = @DataTypeHint("ROW<id int, name String, age int, balance int, rowtime string>"))
public class Alan_SplitFunction extends TableFunction<Row> {

    public void eval(String str) {
        String[] row = str.split(",");
        collect(Row.of(Integer.valueOf(row[0]), row[1], Integer.valueOf(row[2]), Integer.valueOf(row[3]), row[4]));
    }

}
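
Before packaging, the function can be smoke-tested locally through the Table API. The following minimal sketch is only illustrative (the test class name and the sample row are assumptions added for this article, not part of the original example):

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

// Minimal local smoke test for Alan_SplitFunction before it is packaged for the SQL Client.
public class Alan_SplitFunctionLocalTest {

    public static void main(String[] args) throws Exception {
        TableEnvironment tenv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // register the function under the same name that will later be used in the SQL Client
        tenv.createTemporaryFunction("alan_split", Alan_SplitFunction.class);

        // one sample row in the expected "id,name,age,balance,rowtime" format
        Table input = tenv.fromValues("11,alan,18,20,1699341167461").as("userString");

        input.leftOuterJoinLateral(call("alan_split", $("userString")))
             .select($("id"), $("name"), $("age"), $("balance"), $("rowtime"))
             .execute()
             .print();
    }
}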

2) Package and upload the jar to the lib directory of Flink

Package the class into a jar file. In particular, watch for conflicts between the jars bundled into the package and the jars already present in the Flink runtime. The recommended approach is to package only the dependencies that the custom function itself needs and to rely on the jars of the Flink deployment environment for everything else.
The jar produced in this example is named Alan_SplitFunction.jar.
After uploading the jar file to Flink's lib directory, restart the Flink cluster.

3) Verification

1. Create table
Flink SQL> SET sql-client.execution.result-mode = tableau;
[INFO] Session property has been set.

Flink SQL> CREATE TABLE alan_split_table (
> userString STRING
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'alan_split',
> 'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
> 'properties.group.id' = 'testGroup',
> 'scan.startup.mode' = 'earliest-offset',
> 'format' = 'csv'
> );
[INFO] Execute statement succeed.

Flink SQL> select * from alan_split_table;
[INFO] Result retrieval canceled.

2. Initialize table data

In this example the data is inserted through a Kafka topic; this assumes a working Kafka environment.

[alanchan@server1 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic alan_split
>"11,alan,18,20,1699341167461"
>"12,alan,19,25,1699341168464"
>"13,alan,20,30,1699341169472"
>"14,alanchan,18,22,1699341170479"
>"15,alanchan,19,25,1699341171482"


Flink SQL> select * from alan_split_table;
+----+--------------------------------+
| op |                     userString |
+----+--------------------------------+
| +I |    11,alan,18,20,1699341167461 |
| +I |    12,alan,19,25,1699341168464 |
| +I |    13,alan,20,30,1699341169472 |
| +I | 14,alanchan,18,22,169934117... |
| +I | 15,alanchan,19,25,169934117... |
3. Register function

Register the custom function as a temporary function in Flink. A temporary function is only visible in the current session. To register it as a different kind of function, refer to the following syntax:

CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION
  [IF NOT EXISTS] [[catalog_name.]db_name.]function_name
  AS identifier [LANGUAGE JAVA|SCALA|PYTHON]
  [USING JAR '<path_to_filename>.jar' [, JAR '<path_to_filename>.jar']* ]

# TEMPORARY
# Creates a temporary catalog function with a catalog and database namespace, overriding any existing catalog function of the same name.

# TEMPORARY SYSTEM
# Creates a temporary system function without a database namespace, overriding the system's built-in functions of the same name.
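
For comparison, the same distinction exists when registering functions from the Table API. The sketch below is only illustrative (class and function names reuse the Alan_SplitFunction example above; it is not part of the original article):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Table API counterparts of CREATE TEMPORARY [SYSTEM] FUNCTION (illustrative sketch).
public class RegisterFunctionDemo {

    public static void main(String[] args) {
        TableEnvironment tenv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // TEMPORARY: catalog function with a catalog/database namespace, valid only for this session
        tenv.createTemporaryFunction("alan_split", Alan_SplitFunction.class);

        // TEMPORARY SYSTEM: no namespace; can shadow a built-in function with the same name
        tenv.createTemporarySystemFunction("alan_split_sys", Alan_SplitFunction.class);
    }
}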

In this example the function is registered as a temporary function, as follows:

Flink SQL> CREATE TEMPORARY FUNCTION alan_split AS 'org.table_sql.Alan_SplitFunction';
[INFO] Execute statement succeed.

Flink SQL> show functions;
+-----------------------+
|         function name |
+-----------------------+
|                IFNULL |
|      SOURCE_WATERMARK |
|                TYPEOF |
|                   abs |
|                  acos |
|            alan_split |
|                   and |
|                 array |
......
4. Verify the custom function
Flink SQL> SELECT userString, t_id, t_name,t_age,t_balance,t_rowtime
> FROM alan_split_table
> LEFT JOIN LATERAL TABLE(alan_split(userString)) AS T(t_id, t_name,t_age,t_balance,t_rowtime) ON TRUE;
+----+--------------------------------+-------------+--------------------------------+-------------+-------------+--------------------------------+
| op |                     userString |        t_id |                         t_name |       t_age |   t_balance |                      t_rowtime |
+----+--------------------------------+-------------+--------------------------------+-------------+-------------+--------------------------------+
| +I |    11,alan,18,20,1699341167461 |          11 |                           alan |          18 |          20 |                  1699341167461 |
| +I |    12,alan,19,25,1699341168464 |          12 |                           alan |          19 |          25 |                  1699341168464 |
| +I |    13,alan,20,30,1699341169472 |          13 |                           alan |          20 |          30 |                  1699341169472 |
| +I | 14,alanchan,18,22,169934117... |          14 |                       alanchan |          18 |          22 |                  1699341170479 |
| +I | 15,alanchan,19,25,169934117... |          15 |                       alanchan |          19 |          25 |                  1699341171482 |

At this point, registering the custom function in the Flink SQL Client and verifying it is complete.

8. POJO data type application example: table-valued function

For the function itself, refer to [4. Table-valued functions - custom function description and examples] in 19. Custom functions and examples in Flink’s Table API and SQL (2).
This example only shows how to use POJO objects in custom functions.

This example shows just one implementation approach. You can also override getTypeInference and provide all type components programmatically; that alternative is not covered in detail here, but a brief sketch follows the example below.

This example uses a table-valued function; other kinds of custom functions work similarly.

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

import java.util.Arrays;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author alanchan
 *
 */
public class TestUDTableFunctionDemo2 {

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class User {
        private int id;
        private String name;
        private int age;
        private int balance;
        private String rowtime;
    }

//  @FunctionHint(output = @DataTypeHint("User<id int, name String, age int, balance int, rowtime string>"))
//  public static class OverloadedFunction extends TableFunction<Row> {
    @FunctionHint(output = @DataTypeHint(bridgedTo = User.class))
    public static class OverloadedFunction extends TableFunction<User> {

        public void eval(String str) {
            String[] user = str.split(",");
            // using the Row data type:
            // collect(Row.of(Integer.valueOf(user[0]), user[1], Integer.valueOf(user[2]), Integer.valueOf(user[3]), user[4]));
            // using the User POJO data type:
            collect(new User(Integer.valueOf(user[0]), user[1], Integer.valueOf(user[2]), Integer.valueOf(user[3]), user[4]));
        }

    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        DataStream<String> row = env.fromCollection(
                // id, name, age, balance, rowtime
                Arrays.asList(
                        "11,alan,18,20,1699341167461",
                        "12,alan,19,25,1699341168464",
                        "13,alan,20,30,1699341169472",
                        "14,alanchan,18,22,1699341170479",
                        "15,alanchan,19,25,1699341171482"
                )
        );
        Table usersTable2 = tenv.fromDataStream(row, $("userString"));
        tenv.createTemporarySystemFunction("OverloadedFunction", OverloadedFunction.class);
        Table result5 = usersTable2
                .leftOuterJoinLateral(call("OverloadedFunction", $("userString")).as("t_id", "t_name", "t_age", "t_balance", "t_rowtime"))
                .select($("t_id"), $("t_name"), $("t_age"), $("t_balance"), $("t_rowtime"));
//              .select($("userString"), $("t_id"), $("t_name"), $("t_age"), $("t_balance"), $("t_rowtime"));

        DataStream<Tuple2<Boolean, Row>> result5DS = tenv.toRetractStream(result5, Row.class);
        result5DS.print();
        // 15> (true,+I[15, alanchan, 19, 25, 1699341171482])
        // 12> (true,+I[12, alan, 19, 25, 1699341168464])
        // 13> (true,+I[13, alan, 20, 30, 1699341169472])
        // 11> (true,+I[11, alan, 18, 20, 1699341167461])
        // 14> (true,+I[14, alanchan, 18, 22, 1699341170479])

        env.execute();
    }

}
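
As mentioned above, the @FunctionHint annotation can be replaced by overriding getTypeInference. The following is only a hedged sketch of that alternative (the class name is an assumption; it reuses the User POJO from the example and would be added as another nested class of TestUDTableFunctionDemo2, with the extra imports listed at the top):

import java.util.Optional;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.types.inference.TypeInference;

// Illustrative alternative to @FunctionHint: the output type is declared programmatically.
public static class OverloadedFunctionWithInference extends TableFunction<User> {

    public void eval(String str) {
        String[] user = str.split(",");
        collect(new User(Integer.valueOf(user[0]), user[1], Integer.valueOf(user[2]), Integer.valueOf(user[3]), user[4]));
    }

    @Override
    public TypeInference getTypeInference(DataTypeFactory typeFactory) {
        return TypeInference.newBuilder()
                // the function expects a single STRING argument
                .typedArguments(DataTypes.STRING())
                // derive the output type from the User POJO instead of an annotation
                .outputTypeStrategy(callContext -> Optional.of(typeFactory.createDataType(User.class)))
                .build();
    }
}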

This article has shown how to apply custom functions in the Flink SQL Client and how to use POJOs in custom functions.