1. Extending Calcite syntax parsing in Flink
1) Define the required SqlNode class – take SqlShowCatalogs as an example
a) Class location
flink/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCatalogs.java
Core method:

@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
    writer.keyword("SHOW CATALOGS");
}
b) Class inheritance hierarchy
SqlShowCatalogs extends SqlCall, which in turn extends SqlNode; the rest of the class follows the standard SqlCall pattern, shown in full for SqlXShowCatalogs in section 2.
2) Add the grammar logic to the parserImpls.ftl file in the includes directory
a) File location
flink/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
b) Grammar example

/**
 * Parses a "SHOW CATALOGS" metadata query command.
 */
SqlShowCatalogs SqlShowCatalogs() :
{
}
{
    <SHOW> <CATALOGS>
    {
        return new SqlShowCatalogs(getPos());
    }
}
3) Copy the config.fmpp file from the Calcite source code into the project's src/main/codegen directory, modifying its content to declare the extended parts
a) File location
flink/flink-table/flink-sql-parser/src/main/codegen/config.fmpp
b) config.fmpp content

data: {
    # Parser grammar file path
    parser: tdd(../data/Parser.tdd)
}

# Directory of the extension template files
freemarkerLinks: {
    includes: includes/
}
c) Relevant parts of Parser.tdd

# Package of the generated parser
package: "org.apache.flink.sql.parser.impl",

# Parser class name
class: "FlinkSqlParserImpl",

# Imported dependency classes
imports: [
    "org.apache.flink.sql.parser.dql.SqlShowCatalogs"
]

# New keywords
keywords: [
    "CATALOGS"
]

# New statement parser methods
statementParserMethods: [
    "SqlShowCatalogs()"
]

# Extended grammar files to include
implementationFiles: [
    "parserImpls.ftl"
]
4) Compile template files and grammar files
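The flink-sql-parser module drives this step from Maven: FMPP first expands the Freemarker templates into a JavaCC grammar, and JavaCC then generates the parser classes from it (this description assumes the standard module layout). Regenerating is therefore a single command, run from the module directory:

mvn generate-resources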
5) Configure extended parser class
withParserFactory(FlinkSqlParserImpl.FACTORY)
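As a self-contained illustration, the generated factory can also be exercised directly through the plain Calcite API. This is not Flink's internal wiring; the statement, class name, and lexer choice below are arbitrary:

import org.apache.calcite.config.Lex;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;

public class ParserFactoryDemo {
    public static void main(String[] args) throws Exception {
        // Point Calcite at the generated Flink parser instead of its default one.
        // (Newer Calcite exposes SqlParser.config(); older versions use a ConfigBuilder.)
        SqlParser.Config config = SqlParser.config()
                .withParserFactory(FlinkSqlParserImpl.FACTORY)
                .withLex(Lex.JAVA);
        SqlParser parser = SqlParser.create("SHOW CATALOGS", config);
        SqlNode sqlNode = parser.parseStmt();
        // Unparses back to "SHOW CATALOGS" via SqlShowCatalogs#unparse
        System.out.println(sqlNode.toString());
    }
}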
2. Extending Flink's parser with custom syntax
1) Define the SqlNode class
package org.apache.flink.sql.parser.dql;

import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlSpecialOperator;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;

import java.util.Collections;
import java.util.List;

/** XSHOW CATALOGS sql call. */
public class SqlXShowCatalogs extends SqlCall {

    public static final SqlSpecialOperator OPERATOR =
            new SqlSpecialOperator("XSHOW CATALOGS", SqlKind.OTHER);

    public SqlXShowCatalogs(SqlParserPos pos) {
        super(pos);
    }

    @Override
    public SqlOperator getOperator() {
        return OPERATOR;
    }

    @Override
    public List<SqlNode> getOperandList() {
        return Collections.emptyList();
    }

    @Override
    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
        writer.keyword("XSHOW CATALOGS");
    }
}
2) Modify the parserImpls.ftl file in the includes directory
/**
 * Parses a "XSHOW CATALOGS" metadata query command.
 */
SqlXShowCatalogs SqlXShowCatalogs() :
{
}
{
    <XSHOW> <CATALOGS>
    {
        return new SqlXShowCatalogs(getPos());
    }
}
3) Modify the Parser.tdd file, declaring the extension by appending to the existing lists:

imports: [
    "org.apache.flink.sql.parser.dql.SqlXShowCatalogs"
]

keywords: [
    "XSHOW"
]

statementParserMethods: [
    "SqlXShowCatalogs()"
]
4) Recompile
mvn generate-resources
5) Execute test cases
As shown below, the error reported for the custom SQL changes from a parsing failure to a validation failure.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CustomFlinkSql {

    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance()
                        .useBlinkPlanner()
                        .build());

        // Before extending the custom syntax, "xshow catalogs" failed with:
        //   SQL parse failed. Non-query expression encountered in illegal context
        // After extending it, the same statement fails with:
        //   SQL validation failed. org.apache.flink.sql.parser.dql.SqlXShowCatalogs
        //   cannot be cast to org.apache.calcite.sql.SqlBasicCall
        tEnv.executeSql("xshow catalogs").print();
    }
}
6) View the generated extended parser class
You can see that the parsing method for the custom syntax has been generated in FlinkSqlParserImpl.
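The generated method is ordinary JavaCC output; it looks roughly like the sketch below (abridged and reconstructed, not a verbatim excerpt; the exact boilerplate varies with the JavaCC version):

// Inside the generated FlinkSqlParserImpl:
final public SqlXShowCatalogs SqlXShowCatalogs() throws ParseException {
    jj_consume_token(XSHOW);
    jj_consume_token(CATALOGS);
    // action block copied from parserImpls.ftl
    {if (true) return new SqlXShowCatalogs(getPos());}
    throw new Error("Missing return statement in function");
}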
3. Overview of the validate phase
After adding custom parsing rules to Flink, the error message is as follows:
SQL validation failed. org.apache.flink.sql.parser.dql.SqlXShowCatalogs cannot be cast to org.apache.calcite.sql.SqlBasicCall
The validation code is modified in the following places:
1) FlinkPlannerImpl#validate
Function: validate the SqlNode, returning it directly when it is the custom XSHOW CATALOGS syntax:

// FlinkPlannerImpl is Scala code; short-circuit validation for the custom node
if (sqlNode.isInstanceOf[SqlXShowCatalogs]) {
  return sqlNode
}
2) SqlToOperationConverter#convert
Function: convert the validated SqlNode into an Operation.

else if (validated instanceof SqlXShowCatalogs) {
    return Optional.of(converter.convertXShowCatalogs((SqlXShowCatalogs) validated));
}
3) SqlToOperationConverter#convertXShowCatalogs
/** Converts an XSHOW CATALOGS statement. */
private Operation convertXShowCatalogs(SqlXShowCatalogs sqlXShowCatalogs) {
    return new XShowCatalogsOperation();
}
4) XShowCatalogsOperation
package org.apache.flink.table.operations;

public class XShowCatalogsOperation implements ShowOperation {

    @Override
    public String asSummaryString() {
        return "SHOW CATALOGS";
    }
}
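Note that parse/validate/convert alone only gets the statement to an Operation; for tEnv.executeSql(...).print() in the next section to return rows, the new operation must also be handled where Flink executes its ShowOperations, in TableEnvironmentImpl#executeInternal. A hypothetical branch, modeled on how the built-in SHOW CATALOGS is served (buildShowResult and the surrounding dispatch are assumptions about that method's internals):

// Hypothetical addition to TableEnvironmentImpl#executeInternal:
} else if (operation instanceof XShowCatalogsOperation) {
    // List catalogs the same way the built-in ShowCatalogsOperation branch does.
    return buildShowResult(
            "catalog name",
            catalogManager.listCatalogs().stream().sorted().toArray(String[]::new));
}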
4. Execute test cases
package org.apache.flink.table.examples.java.custom;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CustomFlinkSql {

    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance()
                        .useBlinkPlanner()
                        .build());

        // Syntax natively supported by Flink SQL
        tEnv.executeSql("show catalogs").print();

        // Custom syntax
        tEnv.executeSql("xshow catalogs").print();
    }
}
5. Summary: the FlinkSQL execution process
1) Validate the SQL: final SqlNode validated = flinkPlanner.validate(sqlNode);
   a) Pre-validation rewrite of INSERT statements
   b) Call SqlNode.validate() for validation:
      i) If it is an ExtendedSqlNode [SqlCreateHiveTable, SqlCreateTable, SqlTableLike], it runs its own validation
      ii) If it is SqlKind.DDL, SqlKind.INSERT, etc., no validation is required and the SqlNode is returned directly
      iii) If it is a SqlRichExplain (EXPLAIN statement), validate the wrapped statement
      iv) Otherwise: validator.validate(sqlNode)
         - Validate scope and expressions: validateScopedExpression(topNode, scope)
           * Standardize and rewrite the SqlNode
           * If the SQL is [TOP_LEVEL = concat(QUERY, DML, DDL)], register the query in the parent scope
           * Validate via validateQuery: validateFeature, validateNamespace, validateModality, validateAccess, validateSnapshot
           * If the SQL is not [TOP_LEVEL = concat(QUERY, DML, DDL)], perform type inference
         - Obtain the node type after validation
2) Convert the SqlNode into an Operation: converter.convertSqlQuery(validated)
   a) Generate the logical plan (RelNode): RelRoot relational = planner.rel(validated);
      - Convert the query: sqlToRelConverter.convertQuery(validatedSqlNode)
   b) Create a PlannerQueryOperation: new PlannerQueryOperation(relational.project());
3) Convert the Operation into List<Transformation<?>>: List<Transformation<?>> transformations = planner.translate(Collections.singletonList(modifyOperation));
   a) Optimize the RelNode logical plan: val optimizedRelNodes = optimize(relNodes)
   b) Convert the optimized RelNodes into an ExecNode graph: val execGraph = translateToExecNodeGraph(optimizedRelNodes)
   c) Convert the ExecNode graph into transformations
      - Code generation produces Functions that are later invoked via reflection: val convertFunc = CodeGenUtils.genToInternalConverter(ctx, inputType)
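To tie the outline together, here is an illustrative Java fragment assembled from the calls named above. The variables parser, flinkPlanner, catalogManager, and planner are assumed to be in scope (as they are inside TableEnvironmentImpl); this is a condensation, not a verbatim Flink excerpt:

// Illustrative condensation of the pipeline above.
SqlNode sqlNode = parser.parse(statement);              // parse (sections 1-2)
SqlNode validated = flinkPlanner.validate(sqlNode);     // 1) validate
Operation operation =                                   // 2) SqlNode -> Operation
        SqlToOperationConverter.convert(flinkPlanner, catalogManager, validated)
                .orElseThrow(() -> new TableException("Unsupported SQL: " + statement));
// 3) modify operations are then translated into runtime Transformations
List<Transformation<?>> transformations =
        planner.translate(Collections.singletonList((ModifyOperation) operation));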