(Part 2) Extending Flink SQL syntax by modifying the source code

1. How Flink extends Calcite's syntax parsing
1) Define the required SqlNode class, using SqlShowCatalogs as an example
a) Class location

flink/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCatalogs.java

Core methods:

@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
    writer.keyword("SHOW CATALOGS");
}
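For completeness, the class's remaining members mirror the custom SqlXShowCatalogs class built in section 2 (where the full class, including imports, is shown):

// Operator describing the statement; SHOW CATALOGS has no dedicated SqlKind.
public static final SqlSpecialOperator OPERATOR =
        new SqlSpecialOperator("SHOW CATALOGS", SqlKind.OTHER);

@Override
public SqlOperator getOperator() {
    return OPERATOR;
}

@Override
public List<SqlNode> getOperandList() {
    // SHOW CATALOGS takes no operands.
    return Collections.emptyList();
}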
b) Class hierarchy: SqlShowCatalogs -> SqlCall -> SqlNode

2) Modify the .ftl files in the includes directory: add the grammar rule to the parserImpls.ftl file
a) File location

flink/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
b) Grammar examples
/**
 * Parse a "SHOW CATALOGS" metadata query command.
 */
SqlShowCatalogs SqlShowCatalogs() :
{
}
{
    <SHOW> <CATALOGS>
    {
        return new SqlShowCatalogs(getPos());
    }
}
3) Copy the config.fmpp file from the Calcite source tree into the project's src/main/codegen directory, then modify its content to declare the extensions
a) File location

flink/flink-table/flink-sql-parser/src/main/codegen/config.fmpp
b) config.fmpp content

data: {
  # Parser declaration file path
  parser: tdd(../data/Parser.tdd)
}

# Directory holding the extension template files
freemarkerLinks: {
  includes: includes/
}
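Taken together, the entries above imply the following layout under src/main/codegen (paths inferred from config.fmpp):

src/main/codegen/
    config.fmpp               -- FMPP configuration shown above
    data/Parser.tdd           -- parser declarations, referenced as tdd(../data/Parser.tdd)
    includes/parserImpls.ftl  -- extended grammar rules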
c) Part of Parser.tdd

# Package of the generated parser
package: "org.apache.flink.sql.parser.impl",

# Parser class name
class: "FlinkSqlParserImpl",

# Imported dependency classes
imports: [
  "org.apache.flink.sql.parser.dql.SqlShowCatalogs"
]

# New keywords
keywords: [
  "CATALOGS"
]

# New statement parsing methods
statementParserMethods: [
  "SqlShowCatalogs()"
]

# Extended grammar template files to include
implementationFiles: [
  "parserImpls.ftl"
]
4) Compile the template and grammar files to generate the parser
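This is the same Maven step used for the recompile in section 2: running it from the flink-sql-parser module expands the FreeMarker templates and generates the parser sources.

mvn generate-resources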

5) Configure extended parser class
withParserFactory(FlinkSqlParserImpl.FACTORY)
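As a minimal sketch of what this wiring means, the generated factory can be handed straight to Calcite's SqlParser API (FlinkSqlParserImpl is the class declared in Parser.tdd above; the rest is standard Calcite, assuming a Calcite version with the ConfigBuilder API):

import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParser;

import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;

public class ParserFactoryDemo {
    public static void main(String[] args) throws Exception {
        // Use the generated Flink parser instead of Calcite's default parser.
        SqlParser.Config config = SqlParser.configBuilder()
                .setParserFactory(FlinkSqlParserImpl.FACTORY)
                .build();

        SqlParser parser = SqlParser.create("SHOW CATALOGS", config);
        SqlNode sqlNode = parser.parseStmt();

        // Prints: org.apache.flink.sql.parser.dql.SqlShowCatalogs
        System.out.println(sqlNode.getClass().getName());
    }
}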
2. Extending Flink's parser with custom syntax
1) Define the SqlNode class
package org.apache.flink.sql.parser.dql;

import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlSpecialOperator;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;

import java.util.Collections;
import java.util.List;

/** XSHOW CATALOGS sql call. */
public class SqlXShowCatalogs extends SqlCall {
    public static final SqlSpecialOperator OPERATOR =
            new SqlSpecialOperator("XSHOW CATALOGS", SqlKind.OTHER);

    public SqlXShowCatalogs(SqlParserPos pos) {
        super(pos);
    }

    @Override
    public SqlOperator getOperator() {
        return OPERATOR;
    }

    @Override
    public List<SqlNode> getOperandList() {
        return Collections.emptyList();
    }

    @Override
    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
        writer.keyword("XSHOW CATALOGS");
    }
}

2) Modify the parserImpls.ftl file in the includes directory
/**
 * Parse an "XSHOW CATALOGS" metadata query command.
 */
SqlXShowCatalogs SqlXShowCatalogs() :
{
}
{
    <XSHOW> <CATALOGS>
    {
       return new SqlXShowCatalogs(getPos());
    }
}
3) Modify the Parser.tdd file, adding the new declarations for the extension

imports:
  "org.apache.flink.sql.parser.dql.SqlXShowCatalogs"

keywords:
  "XSHOW"

statementParserMethods:
  "SqlXShowCatalogs()"
4) Recompile
 mvn generate-resources
5) Execute test cases

As the test below shows, the error reported for the custom SQL changes from a parse failure to a validation failure.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CustomFlinkSql {
    public static void main(String[] args) throws Exception {

        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .build());

        // Before extending the custom syntax, "xshow catalogs" fails to parse:
        // SQL parse failed. Non-query expression encountered in illegal context
        tEnv.executeSql("xshow catalogs").print();

        // After extending the custom syntax, the same statement fails later, during validation:
        // SQL validation failed. org.apache.flink.sql.parser.dql.SqlXShowCatalogs cannot be cast to org.apache.calcite.sql.SqlBasicCall
    }
}

6) View the generated extended parser class

In the generated FlinkSqlParserImpl you can see that a parsing method for the custom syntax has been generated.
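Roughly, that generated method looks like this (a simplified sketch; the real JavaCC output carries extra token-management code, and jj_consume_token is JavaCC's standard token-matching helper):

final public SqlXShowCatalogs SqlXShowCatalogs() throws ParseException {
    jj_consume_token(XSHOW);
    jj_consume_token(CATALOGS);
    return new SqlXShowCatalogs(getPos());
}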

3. Overview of the validate phase

After the custom parsing rule is added to Flink, the error message is as follows:

SQL validation failed. org.apache.flink.sql.parser.dql.SqlXShowCatalogs cannot be cast to org.apache.calcite.sql.SqlBasicCall

To make the new statement pass validation, modify the following pieces of the validate phase:
1) FlinkPlannerImpl#validate

Purpose: validate the SqlNode; if it is the custom xshow catalogs syntax, return it directly instead of validating it (FlinkPlannerImpl is Scala code):

if (sqlNode.isInstanceOf[SqlXShowCatalogs]) {
  return sqlNode
}
2) SqlToOperationConverter#convert

Purpose: convert the validated SqlNode into an Operation.

else if (validated instanceof SqlXShowCatalogs) {
    return Optional.of(converter.convertXShowCatalogs((SqlXShowCatalogs) validated));
}
3) SqlToOperationConverter#convertXShowCatalogs

/** Convert an XSHOW CATALOGS statement. */
private Operation convertXShowCatalogs(SqlXShowCatalogs sqlXShowCatalogs) {
    return new XShowCatalogsOperation();
}
4) XShowCatalogsOperation
package org.apache.flink.table.operations;

public class XShowCatalogsOperation implements ShowOperation {
    @Override
    public String asSummaryString() {
        return "SHOW CATALOGS";
    }
}
4. Execute test cases
package org.apache.flink.table.examples.java.custom;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CustomFlinkSql {
    public static void main(String[] args) throws Exception {

        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .build());

        // Syntax natively supported by Flink SQL
        tEnv.executeSql("show catalogs").print();

        // Custom syntax
        tEnv.executeSql("xshow catalogs").print();
    }
}

5. Summary: the FlinkSQL execution process

1. Validate the SQL

final SqlNode validated = flinkPlanner.validate(sqlNode);

1) Rewrite INSERT statements before validation

2) Call SqlNode.validate() for validation
   a) If it is an ExtendedSqlNode [SqlCreateHiveTable, SqlCreateTable, SqlTableLike]
   b) If it is SqlKind.DDL, SqlKind.INSERT, etc., no validation is required; the SqlNode is returned directly
   c) If it is a SqlRichExplain
   d) Otherwise: validator.validate(sqlNode), which proceeds as follows:

      1. Validate scope and expressions: validateScopedExpression(topNode, scope)
         a) Normalize and rewrite the SqlNode
         b) If the SQL is in [TOP_LEVEL = concat(QUERY, DML, DDL)], register the query in its parent scope
         c) Validate via validateQuery:
            i) validateFeature
            ii) validateNamespace
            iii) validateModality
            iv) validateAccess
            v) validateSnapshot
         d) If the SQL is not in [TOP_LEVEL = concat(QUERY, DML, DDL)], perform type inference

      2. Get the node type after validation

2. Convert the SqlNode into an Operation

converter.convertSqlQuery(validated)

1) Generate the logical execution plan (RelNode)

RelRoot relational = planner.rel(validated);

   1. Convert the query
      sqlToRelConverter.convertQuery(validatedSqlNode)

2) Create a PlannerQueryOperation

new PlannerQueryOperation(relational.project());

3. Convert the Operation into List<Transformation<?>>

List<Transformation<?>> transformations = planner.translate(Collections.singletonList(modifyOperation));

1) Optimize the RelNode logical execution plan to obtain optimizedRelNodes

val optimizedRelNodes = optimize(relNodes)

2) Convert the optimizedRelNodes into an execGraph

val execGraph = translateToExecNodeGraph(optimizedRelNodes)

3) Convert the execGraph into transformations

   1. Code generation produces a Function that can later be invoked via reflection
      val convertFunc = CodeGenUtils.genToInternalConverter(ctx, inputType)