Flink Hudi DataStream API code sample


Article directory

    • Foreword
    • Code
      • pom
    • Server running
    • Local running and debugging

Foreword

This is a demo of reading and writing Hudi from Flink through the DataStream API, written mainly as a memo for myself.

  • When I first learned Flink I used Flink SQL, so I am familiar with reading and writing Hudi through Flink SQL. I am less familiar with doing the same in code, and some requirements have to be implemented in Flink code, so I need to learn it and write up a summary.
  • There are two ways to read and write Hudi in code. One is to use the Flink Table API, that is, to execute Flink SQL from code, which is much the same as using SQL directly. The other is to use the DataStream API. (In practice, including in online tutorials, the Flink Table API is probably the more widely used.)
  • This article mainly summarizes the DataStream API approach.
  • One advantage of the DataStream API approach is that it makes it easy to debug the Hudi source code locally in IDEA, which helps with learning. The Table API can also be debugged locally, but it is harder for me because I am not familiar with the Flink SQL source code: I don't know where execution passes from the Flink SQL source into the Hudi source, and the path goes through SQL parsing, which may be troublesome (I haven't studied it). For comparison, see my earlier article on the Hudi Spark SQL source code: Hudi Spark SQL Source Code Learning Summary-Create Table.
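
To make the Table API route mentioned above concrete: it comes down to handing a Flink SQL DDL with 'connector' = 'hudi' to the table environment. The sketch below only assembles that DDL string in plain Java, so it runs without Flink on the classpath. The class and method names (HudiDdlSketch, createTable) are hypothetical, and the executeSql calls in the comments assume a TableEnvironment named tableEnv that is not created here.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper: builds the DDL a Table API program would pass to executeSql. */
final class HudiDdlSketch {

    /** columns maps column name to Flink SQL type, in declaration order. */
    static String createTable(String table, Map<String, String> columns,
                              String primaryKey, String partitionField, String path) {
        StringJoiner cols = new StringJoiner(",\n  ", "  ", "");
        columns.forEach((name, type) -> cols.add(name + " " + type));
        return "CREATE TABLE " + table + " (\n"
                + cols + ",\n"
                + "  PRIMARY KEY (" + primaryKey + ") NOT ENFORCED\n"
                + ") PARTITIONED BY (" + partitionField + ") WITH (\n"
                + "  'connector' = 'hudi',\n"
                + "  'path' = '" + path + "'\n"
                + ")";
    }

    public static void main(String[] args) {
        // Same schema as the DataStream demo in this article.
        Map<String, String> columns = new LinkedHashMap<>();
        columns.put("id", "INT");
        columns.put("name", "STRING");
        columns.put("price", "DOUBLE");
        columns.put("ts", "BIGINT");
        columns.put("dt", "STRING");

        String ddl = createTable("t1", columns, "id", "dt", "/tmp/flink/hudi/t1");
        System.out.println(ddl);

        // In a Flink job (assumption: a TableEnvironment named tableEnv exists):
        //   tableEnv.executeSql(ddl);
        //   tableEnv.executeSql("INSERT INTO t1 VALUES (1, 'hudi1', 1.1, 1000, '2023-04-07')");
    }
}
```

The column, primary key and partition declarations here correspond one to one to the column(...), pk(...) and partition(...) calls of the HoodiePipeline builder used in the DataStream version.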

Code

GitHub address: https://github.com/dongkelun/hudi-demo/tree/master/hudi0.13_flink1.15

Official website address: https://hudi.apache.org/docs/flink-quick-start-guide/

package com.dkl.hudi.flink;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.util.HoodiePipeline;

import java.util.HashMap;
import java.util.Map;

public class HudiDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String targetTable = "t1";
        if (args.length > 0) {
            targetTable = args[0];
        }
        String basePath = "/tmp/flink/hudi/" + targetTable;

        Map<String, String> options = new HashMap<>();
        options.put(FlinkOptions.PATH.key(), basePath);
        // options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
        // options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts");
        // options.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), "true");
        options.put("hive_sync.mode", "hms");
        options.put("hive_sync.conf.dir", "/usr/hdp/3.1.0.0-78/hive/conf");
        options.put("hive_sync.db", "hudi");
        options.put("hive_sync.table", targetTable);
        options.put("hive_sync.partition_fields", "dt");
        options.put("hive_sync.partition_extractor_class", "org.apache.hudi.hive.HiveStylePartitionValueExtractor");
        options.put("hoodie.datasource.write.hive_style_partitioning", "true");
        options.put("hoodie.datasource.hive_sync.create_managed_table", "true");

        // options.put(FlinkOptions.READ_AS_STREAMING.key(), "true"); // this option enables the streaming read
        // options.put(FlinkOptions.READ_START_COMMIT.key(), "'20210316134557'"); // specifies the start commit instant time

        DataStream<RowData> dataStream = env.fromElements(
                GenericRowData.of(1, StringData.fromString("hudi1"), 1.1, 1000L, StringData.fromString("2023-04-07")),
                GenericRowData.of(2, StringData.fromString("hudi2"), 2.2, 2000L, StringData.fromString("2023-04-08"))
        );
        // dataStream.print();
        HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable)
                .column("id int")
                .column("name string")
                .column("price double")
                .column("ts bigint")
                .column("dt string")
                .pk("id")
                .partition("dt")
                .options(options);

        builder.sink(dataStream, false); // The second parameter indicates whether the input data stream is bounded
        env.execute("Hudi_Api_Sink");
        DataStream<RowData> rowDataDataStream = builder.source(env);
        rowDataDataStream.print();
        env.execute("Hudi_Api_Source");
    }
}
  • Connecting to the server's Hive from a local machine is troublesome, so turn Hive sync off when running locally; when running on the server you can turn it on (the FlinkOptions.HIVE_SYNC_ENABLED line is commented out in the code above).
  • The code is similar to the official documentation. The main difference is that the official documentation does not show how to construct the DataStream that is written to Hudi, so a simple example is given here.
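
One way to avoid commenting the Hive sync line in and out between local and server runs is to build the options from a flag. The following is a minimal sketch, not the code from the repository: the class HudiOptionsSketch, its methods, and the idea of reading the flag from a second command-line argument are all hypothetical. It uses the plain string keys "path" and "hive_sync.enabled" in place of FlinkOptions.PATH.key() and FlinkOptions.HIVE_SYNC_ENABLED.key() so that it runs without Hudi on the classpath; the other keys and values are copied from the demo above.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: builds the Hudi write options with Hive sync switched by a flag. */
final class HudiOptionsSketch {

    static Map<String, String> writeOptions(String table, boolean hiveSync) {
        Map<String, String> options = new HashMap<>();
        options.put("path", "/tmp/flink/hudi/" + table);
        options.put("hive_sync.enabled", String.valueOf(hiveSync));
        if (hiveSync) {
            // Only meaningful on a server that can reach the Hive metastore.
            options.put("hive_sync.mode", "hms");
            options.put("hive_sync.conf.dir", "/usr/hdp/3.1.0.0-78/hive/conf");
            options.put("hive_sync.db", "hudi");
            options.put("hive_sync.table", table);
            options.put("hive_sync.partition_fields", "dt");
        }
        return options;
    }

    /** Assumption: an optional second program argument "true" turns Hive sync on. */
    static boolean hiveSyncRequested(String[] args) {
        return args.length > 1 && Boolean.parseBoolean(args[1]);
    }

    public static void main(String[] args) {
        String table = args.length > 0 ? args[0] : "t1";
        Map<String, String> options = writeOptions(table, hiveSyncRequested(args));
        options.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

The resulting map could be passed to the builder's options(...) call in place of the hand-built one, so a local run with no extra argument never tries to reach Hive.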

pom

The pom I committed to GitHub has many dependencies, because local debugging in IDEA needs different dependencies from running on the server: running locally requires more of them, and there are many dependency conflicts. If you only run on the server, the following three dependencies are enough:

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-flink1.15-bundle</artifactId>
            <version>${hudi.version}</version>
        </dependency>
    </dependencies>

The dependencies on GitHub work both for local debugging and for packaging and running directly on the server. Because the dependencies are not packaged into the jar, the corresponding jars must be placed in Flink's lib directory on the server in advance.

Server running

bin/flink run -c com.dkl.hudi.flink.HudiDemo /opt/dkl/hudi0.13_flink1.15-1.0.jar flink_hudi_dmeo

The trailing argument, flink_hudi_dmeo, is read as args[0] and becomes the target table name, so the table is written under /tmp/flink/hudi/flink_hudi_dmeo.

Local running and debugging