1. Environment description
The following steps were tested on CentOS 7.8 + Flink 1.14 + JDK 1.8 + Python 3.8 + PyFlink 1.14. This article only covers the installation of Python 3.8 and PyFlink 1.14; setting up the other components is left to the reader.
2. Environment installation of Python 3.8
- Python 3.8 installation package download: Baidu Netdisk (extraction code: f25h)
Upload the installation package to the server and extract it:
tar -zxvf Python-3.8.0.tgz
Environment preparation
Install GCC, which is required to compile Python 3.8:
yum -y install gcc
Install the other components Python 3 needs (answer y to every prompt):
yum -y install openssl-devel bzip2-devel expat-devel gdbm-devel
yum -y install readline-devel sqlite-devel mysql-devel libffi-devel
In the directory of the extracted Python package, run the configure, build, and install steps in order:
sudo ./configure
sudo make
sudo make install
Check the Python installation result:
python3 --version
If version information is printed, the installation succeeded.
Check the system default version; on CentOS 7 the default python is usually 2.7:
python --version
This does not match the version PyFlink requires, so create a symbolic link to make python point to 3.8 by default.
Create a soft link
Back up the existing python under /usr/bin first, otherwise ln will report that python already exists:
cd /usr/bin
mv python python_bak
ln -s /usr/local/bin/python3 /usr/bin/python  # python now points to python3; the bin path depends on where Python was installed, not necessarily this location
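After switching the link, a quick stdlib-only check (runnable under any Python 3) confirms which interpreter `python` now resolves to:

```python
import sys

# Print the interpreter version; after the symlink this should report 3.8.x
major, minor = sys.version_info[:2]
print(f"Python {major}.{minor}")
```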
If python --version now prints Python 3.8.0, the switch is complete and the Python installation is done.
3. Install pyflink1.14
When installing Python libraries with pip, downloads from the default index can be slow because the server is hosted abroad; this article recommends the Tsinghua mirror source.
Run the following commands to upgrade pip and permanently switch to the Tsinghua mirror:
pip3 install pip -U    # upgrades pip itself; the "pip config" subcommand needs a recent pip
pip3 config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple
Install pyflink1.14
python -m pip install apache-flink==1.14.4
If no error is reported, the installation is successful.
4. Test cases
Write the Java Flink job as follows:

package com.xw.flink;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.python.PythonOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class TestPython {
    public static void main(String[] args) throws Exception {
        // StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("172.16.100.9", 8081);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Configuration configuration = tableEnv.getConfig().getConfiguration();
        // Task name
        configuration.setString("pipeline.name", "dddddd");
        // Location of the Python UDF file on the server
        configuration.setString(PythonOptions.PYTHON_FILES, "/usr/local/phfile/test1.py");
        // Python execution environment
        configuration.setString(PythonOptions.PYTHON_CLIENT_EXECUTABLE, "/usr/local/bin/python3");
        configuration.setString(PythonOptions.PYTHON_EXECUTABLE, "/usr/local/bin/python3");

        // Register the Python UDF
        tableEnv.executeSql("CREATE TEMPORARY SYSTEM FUNCTION FunctionName AS 'test1.FunctionName' LANGUAGE PYTHON");

        String sourceDDL = "CREATE TABLE table_source(" +
                "column_name INT, " +
                "vonvon STRING, " +
                "bbm DOUBLE )" +
                "WITH(" +
                " 'connector' = 'jdbc', " +
                " 'driver'='com.mysql.cj.jdbc.Driver', " +
                " 'url'='jdbc:mysql://172.16.0.68:3306/titandb?rewriteBatchedStatements=true&useUnicode=true&characterEncoding=utf8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&serverTimezone=GMT+8', " +
                " 'table-name'='abcd', " +
                " 'username'='root', " +
                " 'password'='root'," +
                " 'sink.buffer-flush.max-rows' = '20000'," +
                " 'sink.buffer-flush.interval' = '3000'" +
                ")";
        // Create the source table
        tableEnv.executeSql(sourceDDL);

        String tableSink = "CREATE TABLE table_sink (" +
                "a INT," +
                "b STRING," +
                "c DOUBLE" +
                ") WITH (" +
                "'connector'='print')";
        // Create the sink table
        tableEnv.executeSql(tableSink);

        // The Python custom function is used here
        String inserttable = "insert into table_sink(a,b,c) select column_name,FunctionName(vonvon),bbm from table_source";
        tableEnv.executeSql(inserttable);
        // env.execute();
    }
}
The Python custom function (test1.py):

#!/usr/bin/env python3
from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(input_types=DataTypes.STRING(), result_type=DataTypes.STRING())
def FunctionName(id):
    return f"{id}|123"
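Because the UDF body is plain Python, its logic can be sanity-checked locally without Flink; here it is as an ordinary function (function_name is just a local stand-in for the UDF above):

```python
# Plain-Python equivalent of the UDF body, for local testing only:
# it appends "|123" to the input string, as in the example above
def function_name(value):
    return f"{value}|123"

result = function_name("hello")
print(result)  # hello|123
```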
After packaging, submit the jar through the Flink web UI; the output can be seen in the TaskManager console. Adjust the code above to your actual situation, and note that the corresponding connector jars (here, the JDBC connector and the MySQL driver) must be present under Flink's lib directory. The Table API can also call the Python UDF normally; this has been verified.