Calling a Python UDF from Java Flink (SQL and Table API)

1. Environment description

The following steps were tested on CentOS 7.8 + Flink 1.14 + JDK 1.8 + Python 3.8 + PyFlink 1.14. This article only covers installing Python 3.8 and PyFlink 1.14; setting up the other components is left to the reader.

2. Install Python 3.8

  • Python 3.8 installation package download link: Baidu network disk, extraction code: f25h (the same source tarball is also available from python.org)

Upload the installation package and extract it:

tar -zxvf Python-3.8.0.tgz

Environment preparation

Install GCC, which is required to compile Python 3.8:

yum -y install gcc

Install the other build dependencies required by Python 3 (answer y to each prompt):

yum install openssl-devel bzip2-devel expat-devel gdbm-devel
yum install readline-devel sqlite-devel mysql-devel libffi-devel

In the directory of the extracted Python package, run the following commands in order to compile and install:

sudo ./configure
sudo make
sudo make install
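One caution: on CentOS 7, yum itself depends on the system Python 2, so overwriting the default interpreter can cause problems. As an alternative sketch (the prefix path here is an assumption, adjust as needed), Python can be installed under a versioned prefix with make altinstall, which does not touch any existing python3 binaries:

```shell
# Alternative: install under a versioned prefix without overwriting system binaries
sudo ./configure --prefix=/usr/local/python3.8
sudo make
sudo make altinstall   # installs only python3.8, leaves existing python3 untouched
/usr/local/python3.8/bin/python3.8 --version
```

If you go this route, point the symbolic link in the next step at /usr/local/python3.8/bin/python3.8 instead.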

Verify the installation

python3 --version

If the version information is printed, the installation succeeded.

Check the system's default version. On CentOS 7, python generally defaults to 2.7:

python --version

That does not match the version PyFlink requires, so create a symbolic link to make python point to version 3.8 by default.

Create the symbolic link

First back up the existing python under /usr/bin, otherwise ln will report that python already exists.

cd /usr/bin
mv python python_bak
ln -s /usr/local/bin/python3 /usr/bin/python  # python now points to python3; adjust the path if your installation prefix differs

If python --version now prints 3.8.0, the Python installation is complete.

3. Install PyFlink 1.14

When installing Python libraries with pip, downloads can be slow because the default index server is overseas. This article recommends the Tsinghua mirror.

Run the following commands to upgrade pip and permanently switch to the Tsinghua mirror:

pip3 install pip -U   # upgrade pip itself
pip3 config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple

Install PyFlink 1.14:

python -m pip install apache-flink==1.14.4

If no error is reported, the installation is successful.
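Beyond the absence of errors, it is worth confirming that pip resolved the right package and that the pyflink module is importable from the interpreter Flink will use. A quick sanity check:

```shell
# Sanity checks: package metadata and importability of the module
python -m pip show apache-flink
python -c "import pyflink; print(pyflink.__file__)"
```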

4. Test cases

Write the Java Flink job as follows:

package com.xw.flink;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.python.PythonOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class TestPython {
    public static void main(String[] args) throws Exception {

        // StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("172.16.100.9", 8081);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // EnvironmentSettings build = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Configuration configuration = tableEnv.getConfig().getConfiguration();
        // Job name
        configuration.setString("pipeline.name", "dddddd");

        // Location of the Python UDF file on the server
        configuration.setString(PythonOptions.PYTHON_FILES, "/usr/local/phfile/test1.py");
        // Python interpreter for the client and for the UDF workers
        configuration.setString(PythonOptions.PYTHON_CLIENT_EXECUTABLE, "/usr/local/bin/python3");
        configuration.setString(PythonOptions.PYTHON_EXECUTABLE, "/usr/local/bin/python3");
        // Register the Python UDF so it can be referenced from SQL
        tableEnv.executeSql("CREATE TEMPORARY SYSTEM FUNCTION FunctionName AS 'test1.FunctionName' LANGUAGE PYTHON");

        String sourceDDL = "CREATE TABLE table_source(" +
                    "column_name INT, " +
                    "vonvon STRING, " +
                    "bbm DOUBLE )" +
                "WITH(" +
                " 'connector' = 'jdbc', " +
                " 'driver'='com.mysql.cj.jdbc.Driver', " +
                " 'url'='jdbc:mysql://172.16.0.68:3306/titandb?rewriteBatchedStatements=true&useUnicode=true&characterEncoding=utf8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&serverTimezone=GMT%2B8', " +
                " 'table-name'='abcd', " +
                " 'username'='root', " +
                " 'password'='root'," +
                " 'sink.buffer-flush.max-rows' = '20000'," +
                " 'sink.buffer-flush.interval' = '3000'" +
                ")";
        //Create table
        tableEnv.executeSql(sourceDDL);


        String tableSink = "CREATE TABLE table_sink (" +
                "a INT," +
                "b STRING," +
                "c DOUBLE" +
                ") WITH (" +
                "'connector'='print')";

        System.out.println(tableSink);
        //Create table
        tableEnv.executeSql(tableSink);
        // The Python UDF is invoked here
        String insertTable = "insert into table_sink(a,b,c) select column_name, FunctionName(vonvon), bbm from table_source";

        tableEnv.executeSql(insertTable);
        // env.execute(); // not needed: executeSql already submits the job
    }


}
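The long JDBC URL in the source DDL is easy to garble when copied (the original contained HTML-escaped `&amp;` separators). A small Python sketch of assembling it from a dict of options, so the separators and encoding cannot be mistyped (the `build_jdbc_url` helper is hypothetical, not part of Flink; host and option values mirror the article's placeholders):

```python
# Sketch: build the JDBC URL from parts so the "&" separators can't be mis-escaped.
def build_jdbc_url(host, port, database, options):
    """Join JDBC options with '&' (hypothetical helper, not part of Flink)."""
    query = "&".join(f"{k}={v}" for k, v in options.items())
    return f"jdbc:mysql://{host}:{port}/{database}?{query}"

url = build_jdbc_url("172.16.0.68", 3306, "titandb", {
    "useUnicode": "true",
    "characterEncoding": "utf8",
    "useSSL": "false",
    "serverTimezone": "GMT%2B8",  # '+' must be percent-encoded inside a URL
})
print(url)
```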

The Python custom function (test1.py):

#!/usr/bin/env python3

from pyflink.table import DataTypes
from pyflink.table.udf import udf, ScalarFunction


@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def FunctionName(id):
    # Append a constant suffix to the input string
    return f"{id}|123"
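Since the UDF body is plain Python, its transformation can be unit-tested without a Flink cluster. One sketch is to keep the logic in a plain function and wrap it separately (the `append_suffix` name is an assumption for illustration; only `FunctionName` comes from the article):

```python
# The UDF's core logic as a plain function; it can be tested without pyflink.
def append_suffix(value):
    """Same transformation the FunctionName UDF performs."""
    return f"{value}|123"

# In test1.py you would then wrap it, e.g.:
#   FunctionName = udf(append_suffix, result_type=DataTypes.STRING())
print(append_suffix("hello"))  # hello|123
```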

Package the job and submit it through the Flink web UI; the output appears in the TaskManager console. Adjust the code above to your actual environment, and make sure the required connector jars (here, the JDBC connector and the MySQL driver) are under Flink's lib directory. The Table API can also call the Python UDF in the same way; this has been verified.