Build the Transformation tree

A Transformation is the operation that derives one DataStream from another. Think of a sheet of paper that becomes a paper airplane after a few folds: each “fold” is a Transformation. DataStream provides a series of transformation methods, and each method has its own xxxTransformation class. As these methods are called one after another, a “Transformation tree” (made up of physical and virtual Transformations) gradually takes shape, and the StreamGraph is generated from this tree.
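To make the “tree” idea concrete, here is a minimal sketch. It assumes two hypothetical classes, `TNode` (a stripped-down stand-in for a Transformation) and `TStream` (a stand-in for DataStream). Neither is Flink's API. The sketch only shows that each method call wraps a new node pointing at its upstream node:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified stand-in for a Transformation (NOT Flink's class):
// it only keeps a name and pointers to the nodes it was derived from.
class TNode {
    final String name;
    final List<TNode> inputs;

    TNode(String name, TNode... inputs) {
        this.name = name;
        this.inputs = Arrays.asList(inputs);
    }
}

// Hypothetical stand-in for DataStream: it only wraps a TNode.
class TStream {
    final TNode node;

    TStream(TNode node) {
        this.node = node;
    }

    // Every "transformation method" returns a NEW stream wrapping a NEW node
    // whose input is this stream's node; this stream itself is not modified.
    TStream apply(String operatorName) {
        return new TStream(new TNode(operatorName, this.node));
    }

    // Follows the first input pointer back to the source; names are sink-first.
    static List<String> lineage(TNode node) {
        List<String> names = new ArrayList<>();
        TNode current = node;
        while (current != null) {
            names.add(current.name);
            current = current.inputs.isEmpty() ? null : current.inputs.get(0);
        }
        return names;
    }
}
```

Calling `apply("Filter")` and `apply("FlatMap")` on the same mapped stream yields two nodes that share one upstream node, which is why the structure branches instead of forming a single chain.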

For physical Transformations: each physical transformation method creates a StreamOperator (operator) and a physical Transformation. It then adds the Transformation to the List held by StreamExecutionEnvironment, so that the List can be used to build the StreamGraph later.
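The registration pattern can be modeled in a few lines. This is a hedged sketch with hypothetical classes (`MiniTransformation`, `MiniEnv`, `MiniStream`), not Flink code. The source node is created directly and not registered, purely to keep the sketch short:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

// Hypothetical mini-model (NOT Flink's API): a Transformation with one input.
class MiniTransformation {
    final String name;
    final MiniTransformation input; // null for a source

    MiniTransformation(String name, MiniTransformation input) {
        this.name = name;
        this.input = input;
    }
}

// Plays the role of StreamExecutionEnvironment's List<Transformation<?>>.
class MiniEnv {
    private final List<MiniTransformation> transformations = new ArrayList<>();

    void addOperator(MiniTransformation transformation) {
        transformations.add(Objects.requireNonNull(transformation, "transformation must not be null."));
    }

    List<MiniTransformation> getTransformations() {
        return Collections.unmodifiableList(transformations);
    }
}

class MiniStream {
    final MiniEnv env;
    final MiniTransformation transformation;

    MiniStream(MiniEnv env, MiniTransformation transformation) {
        this.env = env;
        this.transformation = transformation;
    }

    // A "physical" method: build a Transformation whose input is the current
    // one, register it with the environment, and wrap it in a new stream.
    MiniStream map(String operatorName) {
        MiniTransformation result = new MiniTransformation(operatorName, this.transformation);
        env.addOperator(result);
        return new MiniStream(env, result);
    }
}
```

After two `map` calls, the environment's list holds exactly two entries, in call order.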

/**
 * The map transformation method provided by DataStream
 */
public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {

    // Determine the output type of this map. getType() returns the output type of the previous Transformation, i.e., the input type of this one
    TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
                                                                 Utils.getCallLocationName(), true);

    // Build the xxxTransformation for this transformation from its xxxFunction and output type
    return map(mapper, outType);
}

/**
 * map() with an explicit output type: the StreamOperator created here is a StreamMap, and the transformation logic (the xxxFunction) is handed to that StreamOperator to hold
 */
public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper, TypeInformation<R> outputType) {
    // Arguments: operator name, output type, and the StreamOperator itself (its factory is created in transform())
    // The user-defined MapFunction is passed to the constructor of StreamMap, a StreamOperator implementation
    return transform("Map", outputType, new StreamMap<>(clean(mapper)));
}

public <R> SingleOutputStreamOperator<R> transform(
    String operatorName, // operator name
    TypeInformation<R> outTypeInfo, // output type of this transformation
    // StreamMap extends AbstractUdfStreamOperator and implements the OneInputStreamOperator interface
    OneInputStreamOperator<T, R> operator) {

    // SimpleOperatorFactory.of(): creates a SimpleOperatorFactory suited to the StreamOperator's type and hands the operator to that factory
    return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
}

/**
 * Calling a physical transformation method generates the corresponding StreamOperator and physical xxxTransformation.
 * The xxxTransformation is added to the List of StreamExecutionEnvironment, forming the Transformation tree that is later used to build the StreamGraph
 */
protected <R> SingleOutputStreamOperator<R> doTransform(
    String operatorName,
    TypeInformation<R> outTypeInfo,
    StreamOperatorFactory<R> operatorFactory) {

    // read the output type of the input Transform to coax out errors about MissingTypeInfo
    // In other words: if the upstream output type could not be determined, fail here (with an InvalidTypesException) before building this transformation
    transformation.getOutputType();

    // Create the xxxTransformation instance for this physical transformation
    // The physical xxxTransformation holds the StreamOperator as a member field (indirectly, through the StreamOperatorFactory)
    OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
        this.transformation, // the previous Transformation (this one's input)
        operatorName, // name of the current operator
        operatorFactory, // MapFunction --> StreamMap (StreamOperator) --> StreamOperatorFactory
        outTypeInfo, // output type of the current operator (this transformation)
        // parallelism of the current operator; defaults to the environment's overall parallelism
        environment.getParallelism());

    // SingleOutputStreamOperator: the object returned to the developer after each transformation, on which further operations can be chained
    @SuppressWarnings({"unchecked", "rawtypes"})
    SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);

    // Add the xxxTransformation generated by this transformation to the List<Transformation> list, which will be used to generate StreamGraph
    getExecutionEnvironment().addOperator(resultTransform);

    return returnStream;
}

Calling DataStream's map method therefore creates a StreamMap operator and a corresponding Transformation. These objects are linked through member fields: the Transformation holds the StreamOperator (via its StreamOperatorFactory), and the StreamOperator holds the user-defined Function.
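The “holding” chain can be sketched with three hypothetical classes (`SketchMapOperator`, `SketchOperatorFactory`, `SketchOneInputTransformation`). They only mimic the ownership relationships described above. They are not Flink's StreamMap, SimpleOperatorFactory, or OneInputTransformation, and `java.util.function.Function` stands in for MapFunction:

```java
import java.util.function.Function;

// Hypothetical operator: it holds the user-defined function as a member field.
class SketchMapOperator<T, R> {
    final Function<T, R> userFunction;

    SketchMapOperator(Function<T, R> userFunction) {
        this.userFunction = userFunction;
    }

    // Applies the user function to one record.
    R processElement(T value) {
        return userFunction.apply(value);
    }
}

// Hypothetical factory: it holds the operator instance.
class SketchOperatorFactory<T, R> {
    private final SketchMapOperator<T, R> operator;

    SketchOperatorFactory(SketchMapOperator<T, R> operator) {
        this.operator = operator;
    }

    SketchMapOperator<T, R> getOperator() {
        return operator;
    }
}

// Hypothetical transformation: it holds the factory (and so, indirectly, the operator).
class SketchOneInputTransformation<T, R> {
    final String name;
    final SketchOperatorFactory<T, R> operatorFactory;
    final int parallelism;

    SketchOneInputTransformation(String name, SketchOperatorFactory<T, R> operatorFactory, int parallelism) {
        this.name = name;
        this.operatorFactory = operatorFactory;
        this.parallelism = parallelism;
    }
}
```

Starting from the transformation, `t.operatorFactory.getOperator().processElement(x)` reaches the user logic through the chain Transformation -> factory -> operator -> Function.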

Eventually, all physical Transformations are stored in the List of StreamExecutionEnvironment, which gradually forms the Transformation tree:

// Operations between DataStreams produce Transformation objects; every DataStream is backed by a Transformation.
// Physical Transformations (e.g., from map()) are added to this collection. A virtual one, such as the PartitionTransformation
// created by ds.shuffle(), is not added here; it is reached through the input of the downstream Transformation.
// The StreamGraph is constructed from this collection
protected final List<Transformation<?>> transformations = new ArrayList<>();


@Internal
public void addOperator(Transformation<?> transformation) {
    Preconditions.checkNotNull(transformation, "transformation must not be null.");
    // Save the generated Transformation to List<Transformation>
    this.transformations.add(transformation);
}

For virtual Transformations: a virtual transformation method generates only a “virtual Transformation” (no StreamOperator). When this virtual Transformation is processed later, a virtual node is added; in essence, this just records a mapping in a Map: “virtual node id -> Tuple3<>(Transformation ID, Partitioner, ShuffleMode)”.
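The virtual-node bookkeeping can be sketched as a plain map lookup. In this hedged sketch, `VirtualPartitionEntry` stands in for the Tuple3, partitioner and shuffle mode are plain strings, and `SketchStreamGraph` and `describeEdge` are hypothetical names. Only one level of virtual indirection is resolved:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Tuple3<>(Transformation ID, Partitioner, ShuffleMode).
class VirtualPartitionEntry {
    final int originalId;      // id of the real upstream node
    final String partitioner;  // e.g. "SHUFFLE" (a string only in this sketch)
    final String shuffleMode;  // e.g. "UNDEFINED" (a string only in this sketch)

    VirtualPartitionEntry(int originalId, String partitioner, String shuffleMode) {
        this.originalId = originalId;
        this.partitioner = partitioner;
        this.shuffleMode = shuffleMode;
    }
}

// Hypothetical, simplified graph that only tracks virtual partition nodes.
class SketchStreamGraph {
    private final Map<Integer, VirtualPartitionEntry> virtualPartitionNodes = new HashMap<>();

    // Adding a virtual node creates no operator: it only records a mapping.
    void addVirtualPartitionNode(int originalId, int virtualId, String partitioner, String shuffleMode) {
        if (virtualPartitionNodes.containsKey(virtualId)) {
            throw new IllegalStateException("Already has virtual partition node with id " + virtualId);
        }
        virtualPartitionNodes.put(virtualId, new VirtualPartitionEntry(originalId, partitioner, shuffleMode));
    }

    // If an edge starts at a virtual id, draw it from the real upstream node
    // instead, labeled with the recorded partitioner.
    String describeEdge(int sourceId, int targetId) {
        VirtualPartitionEntry entry = virtualPartitionNodes.get(sourceId);
        if (entry == null) {
            return sourceId + " -[default]-> " + targetId;
        }
        return entry.originalId + " -[" + entry.partitioner + "]-> " + targetId;
    }
}
```

Registering virtual id 6 for real node 2 with a shuffle partitioner makes an edge “6 -> 3” come out as `2 -[SHUFFLE]-> 3`. The virtual node never becomes a real vertex.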

/**
 * Shuffle operation
 */
public DataStream<T> shuffle() {
    return setConnectionType(new ShufflePartitioner<T>());
}

/**
 * Generate a "virtual Transformation"
 */
protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
    // Generate a "virtual Transformation"; it is NOT added to the List of StreamExecutionEnvironment
    return new DataStream<>(this.getExecutionEnvironment(), new PartitionTransformation<>(this.getTransformation(), partitioner));
}
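The difference between the two kinds can be shown side by side. This is a hedged sketch with hypothetical classes (`VTransformation`, `VEnv`, `VStream`) in which “virtual” is just a boolean flag. `map` registers its Transformation. `shuffle` only creates one, which then stays reachable solely through the next Transformation's input pointer:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mini-model (NOT Flink's API); "virtual" is just a flag here.
class VTransformation {
    final String name;
    final VTransformation input; // null for a source
    final boolean virtual;

    VTransformation(String name, VTransformation input, boolean virtual) {
        this.name = name;
        this.input = input;
        this.virtual = virtual;
    }
}

class VEnv {
    // Only physical Transformations are registered here.
    final List<VTransformation> transformations = new ArrayList<>();
}

class VStream {
    final VEnv env;
    final VTransformation transformation;

    VStream(VEnv env, VTransformation transformation) {
        this.env = env;
        this.transformation = transformation;
    }

    // Physical: create the Transformation AND register it.
    VStream map(String operatorName) {
        VTransformation result = new VTransformation(operatorName, transformation, false);
        env.transformations.add(result);
        return new VStream(env, result);
    }

    // Virtual: create the Transformation but do NOT register it.
    VStream shuffle() {
        return new VStream(env, new VTransformation("Partition(SHUFFLE)", transformation, true));
    }
}
```

For `source.map("Map").shuffle().map("Map2")`, the list holds only the two map Transformations, yet the partition node still sits between them in the input chain.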

Note that when a Transformation is created, it stores the Transformation of the preceding operation in a member field (its input), which links the two: downstream Transformation -> upstream Transformation.

OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
    this.transformation, // the previous Transformation (this one's input)
    operatorName, // name of the current operator
    operatorFactory, // MapFunction --> StreamMap (StreamOperator) --> StreamOperatorFactory
    outTypeInfo, // output type of the current operator (this transformation)
    // parallelism of the current operator; defaults to the environment's overall parallelism
    environment.getParallelism());

Because every downstream Transformation points to its upstream Transformation, the StreamGraph can be constructed naturally by following these links.
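Following the links can be sketched as a depth-first walk that transforms inputs first and memoizes results so shared upstream nodes are visited once. This is loosely modeled on the recursive approach of Flink's StreamGraphGenerator. `GNode` and `SketchGraphGenerator` are hypothetical. For brevity, the sketch folds a virtual node into a plain edge and drops the partitioner it carries; the real generator keeps it through the virtual-node map described earlier.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simplified node: id, name, virtual flag, and input pointers.
class GNode {
    final int id;
    final String name;
    final boolean virtual;
    final List<GNode> inputs;

    GNode(int id, String name, boolean virtual, GNode... inputs) {
        this.id = id;
        this.name = name;
        this.virtual = virtual;
        this.inputs = Arrays.asList(inputs);
    }
}

class SketchGraphGenerator {
    private final Map<Integer, List<Integer>> alreadyTransformed = new HashMap<>();
    final List<String> nodes = new ArrayList<>();
    final List<String> edges = new ArrayList<>();

    // Returns the ids of the real (non-virtual) nodes that represent t.
    List<Integer> transform(GNode t) {
        List<Integer> cached = alreadyTransformed.get(t.id);
        if (cached != null) {
            return cached; // shared upstream node: reuse the earlier result
        }
        List<Integer> inputIds = new ArrayList<>();
        for (GNode input : t.inputs) {
            inputIds.addAll(transform(input)); // inputs first (depth-first)
        }
        List<Integer> result;
        if (t.virtual) {
            result = inputIds; // no real node; pass upstream ids through
        } else {
            nodes.add(t.name);
            for (int inputId : inputIds) {
                edges.add(inputId + "->" + t.id);
            }
            result = Collections.singletonList(t.id);
        }
        alreadyTransformed.put(t.id, result);
        return result;
    }

    // Walks the registered (physical) Transformations in order.
    void generate(List<GNode> registered) {
        for (GNode t : registered) {
            transform(t);
        }
    }
}
```

Here the source is not in the registered list, yet it still becomes a node, because it is reached through the input pointer of the first registered Transformation.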