[BigData] Flink (framework principle)

Article directory

    • Flink overview
    • Flink vs SparkStreaming
    • Deployment mode
      • Session Mode
      • Per-Job Mode
      • Application Mode
    • YARN operating mode (key points)
    • Flink runtime architecture
      • Parallelism
      • Operator Chain
      • Task Slots
      • The relationship between task slots and parallelism
    • Job submission process

Flink Overview

What is Flink

Stateful stream processing
Stateful Stream Processing refers to a streaming computing model in which computing operations can remember and access previously processed data or state information. This is different from stateless stream processing, where each event is independent and has no memory or contextual information.
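To make the distinction concrete, here is a minimal plain-Java sketch. It does not use the Flink API; the class `RunningCount` and its methods are hypothetical names chosen only for illustration. The stateless method depends only on the current event, while the stateful method also depends on what was seen before.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of stateless vs. stateful processing; not Flink API code.
class RunningCount {
    // State kept across events: how many times each word has been seen so far.
    private final Map<String, Long> counts = new HashMap<>();

    // Stateless: the result depends only on the current event.
    static String toUpper(String word) {
        return word.toUpperCase();
    }

    // Stateful: the result depends on the current event and on all earlier events.
    long countSoFar(String word) {
        return counts.merge(word, 1L, Long::sum);
    }

    public static void main(String[] args) {
        RunningCount rc = new RunningCount();
        for (String word : new String[] {"a", "b", "a"}) {
            System.out.println(toUpper(word) + " seen " + rc.countSoFar(word) + " time(s)");
        }
    }
}
```

Calling `countSoFar("a")` twice returns 1 and then 2, whereas `toUpper("a")` returns the same result no matter how many events came before it.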

Flink features

Flink vs SparkStreaming

Deployment mode

Session Mode

Session Mode is the most intuitive of the three modes. A cluster is started first and kept running as a session, and jobs are then submitted to it through the client. All resources are fixed when the cluster starts, so every submitted job competes for the same cluster resources.
Session Mode suits workloads made up of many small, short-running jobs.

Per-Job Mode

To isolate resources better, we can launch a dedicated cluster for each submitted job.
Once the job completes, the cluster is shut down and all resources are released. This isolation makes Per-Job Mode more stable in production, so it is the preferred mode for practical applications.
Note that Flink cannot run this way on its own: Per-Job Mode generally relies on a resource management framework, such as YARN or Kubernetes, to start the cluster.

Application Mode

In the first two modes, the application code is executed on the client, which then submits the job to the JobManager. The drawback is that the client needs a lot of network bandwidth to download dependencies and send binary data to the JobManager, which increases the resource consumption of the node where the client runs.
Application Mode avoids this by submitting the application directly to the JobManager and running it there instead of on the client. This means that a separate JobManager is started for each submitted application, that is, a cluster is created per application. This JobManager exists only to execute that application and is shut down once execution completes.

YARN operating mode (key points)

The deployment process on YARN is as follows: the client submits the Flink application to YARN's ResourceManager, which requests containers from YARN's NodeManagers. Flink deploys JobManager and TaskManager instances in these containers to start the cluster, and then allocates TaskManager resources dynamically according to the number of slots required by the jobs running on the JobManager.
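As a rough illustration of "allocating TaskManagers according to the number of slots required", the arithmetic can be sketched as below. This is a simplified model, not Flink's resource manager logic: it assumes every TaskManager offers the same number of slots (in Flink this is configured with `taskmanager.numberOfTaskSlots`), and the class and method names are hypothetical.

```java
// Hypothetical back-of-the-envelope helper; not part of Flink.
class TaskManagerMath {
    // Smallest number of TaskManagers whose slots cover the requirement (ceiling division).
    static int taskManagersNeeded(int requiredSlots, int slotsPerTaskManager) {
        if (requiredSlots < 0 || slotsPerTaskManager <= 0) {
            throw new IllegalArgumentException("slot counts must be non-negative and slots per TaskManager positive");
        }
        return (requiredSlots + slotsPerTaskManager - 1) / slotsPerTaskManager;
    }

    public static void main(String[] args) {
        // A job needing 5 slots on TaskManagers with 2 slots each.
        System.out.println(taskManagersNeeded(5, 2));
    }
}
```

Under these assumptions, a job that needs 5 slots on TaskManagers with 2 slots each leads to 3 TaskManager containers being requested.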

Flink runtime architecture

Parallelism

1) Parallel subtasks and parallelism
During Flink execution, each operator can contain one or more subtasks (operator subtasks), which are executed completely independently in different threads, different physical machines, or different containers.
The number of subtasks for a specific operator is called its parallelism. In this way, a data flow containing parallel subtasks is a parallel data flow, which requires multiple partitions (stream partitions) to distribute parallel tasks. Generally speaking, the degree of parallelism of a stream program can be considered to be the maximum degree of parallelism among all its operators. In a program, different operators may have different degrees of parallelism.

2) Parallelism setting
Parallelism can be set in several ways, each with a different scope and priority.
(1) Settings in the code (environment and operators)
In our code, we can simply call the setParallelism() method after the operator to set the parallelism of the current operator:
stream.map(word -> Tuple2.of(word, 1L)).setParallelism(2);
The degree of parallelism set in this way is only valid for the current operator.
In addition, we can also directly call the setParallelism() method of the execution environment to set the degree of parallelism globally:
env.setParallelism(2);
In this way, the default parallelism of all operators in the code is 2. We generally do not set the global parallelism in the program, because a hard-coded global parallelism prevents the job from being scaled dynamically.
Note that keyBy is not an operator, so no parallelism can be set on it.
(2) Set when submitting the application
When using the flink run command to submit an application, you can add the -p parameter to specify the parallelism of the current application execution. Its function is similar to the global settings of the execution environment:

bin/flink run -p 2 -c com.atguigu.wc.SocketStreamWordCount ./FlinkTutorial-1.0-SNAPSHOT.jar

If we submit the job directly on the Web UI, we can also directly add the degree of parallelism in the corresponding input box.

(3) Settings in configuration file
We can also directly change the default parallelism in the cluster’s configuration file flink-conf.yaml:
parallelism.default: 2
This setting applies to all jobs submitted to the cluster, and its initial value is 1. Setting the parallelism in the code or through the -p parameter is optional; when neither is specified, the cluster default from the configuration file is used. In a development environment there is no configuration file, and the default parallelism is the number of CPU cores of the current machine.

Priority, from highest to lowest:
Operator > Code environment > Submission (-p) > Configuration file
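The precedence above can be expressed as a small lookup. The sketch below is plain Java rather than Flink code; `ParallelismResolver` and its parameters are hypothetical, and `null` stands for "not set at this level".

```java
// Hypothetical helper that mirrors the precedence described above; not a Flink API.
class ParallelismResolver {
    // Returns the parallelism an operator would effectively run with.
    static int resolve(Integer operator, Integer environment, Integer submitFlag, int clusterDefault) {
        if (operator != null) return operator;       // stream.map(...).setParallelism(n)
        if (environment != null) return environment; // env.setParallelism(n)
        if (submitFlag != null) return submitFlag;   // flink run -p n
        return clusterDefault;                       // parallelism.default in flink-conf.yaml
    }

    public static void main(String[] args) {
        // Operator-level setting wins over everything else.
        System.out.println(resolve(4, 2, 3, 1));
        // Nothing set in code: the -p value is used.
        System.out.println(resolve(null, null, 3, 1));
    }
}
```

For example, an operator with its own setting of 4 runs with parallelism 4 even if the environment says 2 and the job was submitted with -p 3.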

Operator Chain

1) Data transmission between operators
A data stream can pass data between operators in one of two forms: one-to-one (forwarding) or redistributing (shuffle-like). Which form applies depends on the type of operator.
(1) One-to-one, forwarding
In this mode, the data stream preserves the partitioning and ordering of its elements. For example, between a source operator and a map operator, the data read by the source can be sent directly to the map for processing, with no need to repartition it or change its order. This means that the number and order of elements seen by a map subtask are exactly the same as those produced by the corresponding source subtask, which guarantees a “one-to-one” relationship. Operators such as map, filter, and flatMap all have this one-to-one correspondence. The relationship is similar to narrow dependencies in Spark.

Purpose of chaining: merge several operators into a single task.
Prerequisites: a one-to-one connection, chaining not disabled, and the same parallelism.
(2) Redistributing
In this mode, the partitioning of the data stream changes. For example, this relationship holds between a map operator and the keyBy/window operator that follows it, and it can also hold between the keyBy/window operator and the sink operator.
Each operator’s subtask will send data to different downstream target tasks according to the data transmission strategy. These transmission methods will cause the process of repartitioning, which is similar to the shuffle in Spark.
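A key-based redistribution can be pictured with the simplified sketch below. It is only an illustration: Flink's actual keyBy routing goes through key groups and a different hash function, and the class `KeyPartitioner` is a hypothetical name. The point it shows is that the target subtask is chosen from the record's key, so records from one upstream subtask fan out to several downstream subtasks, while records with the same key always land on the same one.

```java
// Simplified model of key-based redistribution; not Flink's real partitioner.
class KeyPartitioner {
    // Pick the index of the downstream subtask that receives records with this key.
    static int targetSubtask(String key, int downstreamParallelism) {
        // floorMod keeps the index non-negative even when hashCode() is negative.
        return Math.floorMod(key.hashCode(), downstreamParallelism);
    }

    public static void main(String[] args) {
        for (String word : new String[] {"hello", "flink", "hello"}) {
            System.out.println(word + " -> subtask " + targetSubtask(word, 4));
        }
    }
}
```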

2) Merge operator chain
In Flink, one-to-one operators with the same parallelism can be chained together into a single “large” task, so that each original operator becomes one part of that task. Each task is executed by one thread. This technique is called an “Operator Chain”.

By default, Flink chains operators whenever the chaining conditions are met. To prevent chaining, or to control where a chain starts, we can configure individual operators in the code:

// Disable chaining for this operator
.map(word -> Tuple2.of(word, 1L)).disableChaining();

// Start a new chain from this operator
.map(word -> Tuple2.of(word, 1L)).startNewChain();
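The chaining conditions can be summarized in a small model. The sketch below is plain Java, not Flink code, and the real decision inside Flink checks more conditions than these three; the class `ChainSketch`, its `Op` type, and the field names are hypothetical. It walks a linear pipeline and puts an operator into the previous task only when the connection is one-to-one, the parallelism matches, and chaining is not disabled on either side.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the chaining rule for a linear pipeline; not Flink code.
class ChainSketch {
    static final class Op {
        final String name;
        final int parallelism;
        final boolean oneToOneInput;   // true if data arrives from the previous operator without repartitioning
        final boolean chainingEnabled; // false models an operator with chaining disabled

        Op(String name, int parallelism, boolean oneToOneInput, boolean chainingEnabled) {
            this.name = name;
            this.parallelism = parallelism;
            this.oneToOneInput = oneToOneInput;
            this.chainingEnabled = chainingEnabled;
        }
    }

    // Group operators into tasks; each inner list is one chained task.
    static List<List<String>> toTasks(List<Op> pipeline) {
        List<List<String>> tasks = new ArrayList<>();
        Op previous = null;
        for (Op op : pipeline) {
            boolean chain = previous != null
                    && op.oneToOneInput
                    && op.parallelism == previous.parallelism
                    && op.chainingEnabled
                    && previous.chainingEnabled;
            if (!chain) {
                tasks.add(new ArrayList<>()); // start a new task
            }
            tasks.get(tasks.size() - 1).add(op.name);
            previous = op;
        }
        return tasks;
    }

    public static void main(String[] args) {
        List<Op> pipeline = new ArrayList<>();
        pipeline.add(new Op("source", 2, false, true));
        pipeline.add(new Op("map", 2, true, true));
        pipeline.add(new Op("window", 2, false, true)); // follows keyBy, so its input is redistributed
        pipeline.add(new Op("sink", 1, false, true));
        System.out.println(toTasks(pipeline));
    }
}
```

In this model, source and map end up in one task, while the window operator and the sink each form their own task, because their inputs are redistributed.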

Task Slots

1) Task Slots
Each TaskManager in Flink is a JVM process, which can start multiple independent threads to execute multiple subtasks in parallel.

Obviously, the computing resources of a TaskManager are limited: the more tasks run in parallel, the fewer resources each thread gets. So how many tasks can a TaskManager handle in parallel? To control the degree of concurrency, the resources that each task occupies on the TaskManager must be clearly divided. These divisions are called task slots.

Each task slot actually represents a fixed-size subset of the computing resources owned by the TaskManager. These resources are used to execute a subtask independently.

The relationship between task slots and parallelism

The number of slots a job requires is the sum, across all slot sharing groups, of the maximum operator parallelism within each group.
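This rule is easy to check with a short calculation. The sketch below is plain Java with hypothetical names (`SlotMath`, `requiredSlots`); it takes, for each slot sharing group, the parallelism of every operator in that group, and adds up the per-group maxima.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical calculator for the rule above; not part of Flink.
class SlotMath {
    // groups: slot sharing group name -> parallelism of each operator in that group.
    static int requiredSlots(Map<String, List<Integer>> groups) {
        int total = 0;
        for (List<Integer> parallelisms : groups.values()) {
            if (!parallelisms.isEmpty()) {
                // Subtasks of different operators in one group can share a slot,
                // so the group needs as many slots as its widest operator.
                total += Collections.max(parallelisms);
            }
        }
        return total;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> groups = new LinkedHashMap<>();
        groups.put("groupA", Arrays.asList(2, 2, 4)); // widest operator: 4
        groups.put("groupB", Arrays.asList(3));       // widest operator: 3
        System.out.println(requiredSlots(groups));    // 4 + 3 = 7
    }
}
```

With one group whose operators have parallelism 2, 2, and 4, and a second group with a single operator of parallelism 3, the job needs 4 + 3 = 7 slots.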

Job submission process