[Apache Flink] Basic use of Flink DataStream API

Basic use of Flink DataStream API

Article directory

  • Preface
  • 1. Basic usage
  • 2. Core sample code
  • 3. Complete the project code
    • pom.xml
    • WordCountExample
    • Test verification
  • 4. Stream execution environment
  • 5. Reference documentation

Foreword

The Flink DataStream API is mainly used to handle unbounded and bounded data streams.
Unbounded data flow is a data source that continuously generates data without a clear end point, such as real-time transaction data or sensor data. This type of data flow requires continuous processing and analysis using Apache Flink’s real-time processing capabilities.

A bounded data stream is a data set with a definite start and end point, such as a file or database table. This type of data flow is typically used in batch processing scenarios where all the data is already available and can be processed in one go.

Flink’s DataStream API provides a rich set of operators, such as map, filter, reduce, aggregations, windowing, join, etc., to support various complex data processing and analysis requirements. In addition, the DataStream API also provides fault-tolerance guarantees, ensuring that in the event of a failure, the application can recover from the most recent checkpoint, thereby achieving exactly-once processing semantics.

1. Basic usage

  1. Create execution environment:

    Every Flink program needs to create a StreamExecutionEnvironment (execution environment), which can be used to set parameters and create streams for reading data from external systems.

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
  2. Create data flow:

    You can create data streams from various data sources, such as local collections, files, sockets, etc. The following code is an example of creating a data stream from a local collection:

    DataStream<String> dataStream = env.fromElements("hello", "flink");
    
  3. Conversion operation:

    Flink provides a wealth of conversion operations, such as map, filter, reduce, etc. The following code first maps each string to its length, then filters out elements with length greater than 5:

    DataStream<Integer> transformedStream = dataStream
        .map(s -> s.length())
        .filter(l -> l > 5);
    
  4. Data output:

    Flink supports outputting data streams to various storage systems, such as files, sockets, databases, etc. The following code streams data to standard output:

    transformedStream.print();
    
  5. execute program:

    Put all the above steps in the main function, and call the env.execute() method at the end to start the program. The Flink program is lazy loaded and will only start execution when the execute method is called.

    env.execute("Flink Basic API Usage");
    

2. Core sample code

Use the Flink DataStream API to build a real-time Word Count program that reads text data from a socket port, counts the occurrences of each word, and outputs the results to standard output.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCountExample {<!-- -->
    public static void main(String[] args) throws Exception {<!-- -->
        // 1. Create execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. To create a data stream and receive data from the socket, you need to start a socket server with port 9000 locally.
        DataStream<String> textStream = env.socketTextStream("localhost", 9000);

        // 3. Conversion operation
        DataStream<Tuple2<String, Integer>> wordCountStream = textStream
                .flatMap(new LineSplitter()) // Split text lines into words
                .keyBy(0) //Group by word
                .sum(1); // Sum the count of each word

        // 4. Data output
        wordCountStream.print();

        // 5. Execute program
        env.execute("Socket Word Count Example");
    }

    // Customize a FlatMapFunction to divide each line of input text into words and output it as Tuple2. The first element is the word and the second element is the count (initial value is 1)
    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {<!-- -->
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {<!-- -->
            for (String word : line.split(" ")) {<!-- -->
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}

3. Complete the project code

Below is the complete engineering code of a real-time word counting application based on Apache Flink, including the Pom.xml file and all Java classes.

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example</groupId>
    <artifactId>flink-wordcount-example</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <properties>
        <flink.version>1.13.2</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

WordCountExample

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCountExample {<!-- -->
    public static void main(String[] args) throws Exception {<!-- -->
        // 1. Create execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. To create a data stream and receive data from the socket, you need to start a socket server with port 9000 locally.
        DataStream<String> textStream = env.socketTextStream("localhost", 9000);

        // 3. Conversion operation
        DataStream<Tuple2<String, Integer>> wordCountStream = textStream
                .flatMap(new LineSplitter()) // Split text lines into words
                .keyBy(0) //Group by word
                .sum(1); // Sum the count of each word

        // 4. Data output
        wordCountStream.print();

        // 5. Execute program
        env.execute("Socket Word Count Example");
    }

    // Customize a FlatMapFunction to divide each line of input text into words and output it as Tuple2. The first element is the word and the second element is the count (initial value is 1)
    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {<!-- -->
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {<!-- -->
            for (String word : line.split(" ")) {<!-- -->
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}

Now, you can use Maven to compile and run this program. Before starting the program, you need to start a Socket server with port 9000 locally. This can be done using the Netcat tool (nc -lk 9000) or any other tool that can open a port. Then, you can enter lines of text, and the Flink program will count the number of occurrences of each word and print the results in real time.

Test verification

Use py to start a socket server locally and listen to port 9000.

Python is relatively simple to implement a socket communication. Write a Python to verify the example written above.

import socket

server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(("localhost", 9000))
server_socket.listen(1)

print("Waiting for connection...")
client_socket, client_address = server_socket.accept()
print("Connected to:", client_address)

while True:
    data = input("Enter text: ")
    client_socket.sendall(data.encode())

Run the Flink program and Python socket server, and then enter text in the Python program. You will see that the Flink program counts the number of occurrences of each word in real time and outputs it to the console.

4. Stream execution environment

There is no need to pay attention during the development and learning process. Every Flink application requires an execution environment, which in this example is env. Streaming applications require StreamExecutionEnvironment.

The DataStream API builds your application as a job graph and attaches it to a StreamExecutionEnvironment. When env.execute() is called, this graph is packaged and sent to the JobManager, which processes the job in parallel and distributes its subtasks to the Task Manager for execution. Each job’s parallel subtasks will be executed in task slots.

Note that if execute() is not called, the application will not run.

Flink runtime: client, job manager, task managers
This distributed runtime depends on whether your application is serializable. It also requires that all dependencies are available to every node in the cluster.

5. Reference documents

https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/learn-flink/datastream_api/