Flink SQL
1. Parse: parse the SQL string into an abstract syntax tree (AST), represented in Calcite by SqlNode
2. Validate: validate the statement against catalog metadata, e.g. whether the queried tables and the referenced functions actually exist; the result is still a syntax tree made of SqlNode
3. Optimize: optimize the query plan
- First convert the SqlNode syntax tree into relational expressions, a logical tree made of RelNode
- Then apply the rule-based optimizer to perform equivalence transformations, such as the familiar predicate pushdown and column pruning, yielding the optimal query plan
4. Execute: translate the logical plan into a physical execution plan, generate the corresponding executable code, and submit it for execution
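The parse/validate/optimize phases can be sketched with a self-contained toy (all names here are hypothetical stand-ins; real Flink delegates these steps to Calcite's SqlNode/RelNode as described above):

```java
// Toy illustration of the validate -> optimize steps of the pipeline.
// MiniPipeline and Node are hypothetical; they stand in for Calcite's
// SqlNode (before optimization) and RelNode rewriting (constant folding
// plays the role of a rule-based equivalence transformation).
public class MiniPipeline {
    // A minimal "AST" node: either an integer literal leaf or an Add node.
    static class Node {
        final Integer value;      // non-null for a literal leaf
        final Node left, right;   // non-null for an Add node
        Node(int v) { value = v; left = null; right = null; }
        Node(Node l, Node r) { value = null; left = l; right = r; }
    }

    // "Validate": stand-in for checking metadata (tables/functions exist).
    static void validate(Node n) {
        if (n.value != null) {
            if (n.value < 0) throw new IllegalStateException("invalid literal");
        } else {
            validate(n.left);
            validate(n.right);
        }
    }

    // "Optimize": one rewrite rule -- constant folding -- applied bottom-up,
    // analogous to applying equivalence-preserving planner rules.
    static Node optimize(Node n) {
        if (n.value != null) return n;
        Node l = optimize(n.left), r = optimize(n.right);
        if (l.value != null && r.value != null) return new Node(l.value + r.value);
        return new Node(l, r);
    }

    public static void main(String[] args) {
        // Hand-built "parse" result for the expression 1 + (2 + 3).
        Node ast = new Node(new Node(1), new Node(new Node(2), new Node(3)));
        validate(ast);
        System.out.println(optimize(ast).value); // folds to a single literal: 6
    }
}
```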
Flink parser
org.apache.flink.table.delegation.Planner
- getParser(): converts a SQL string into Table API specific objects, e.g. an Operation tree
- provides planning: optimizes and translates Operations (ModifyOperation) into runnable Transformations
```java
@Internal
public interface Planner {

    Parser getParser();

    List<Transformation<?>> translate(List<ModifyOperation> modifyOperations);

    ...
}
```
- org.apache.flink.table.planner.delegation.PlannerBase
```scala
private val planningConfigurationBuilder: PlanningConfigurationBuilder =
  new PlanningConfigurationBuilder(
    config,
    functionCatalog,
    internalSchema,
    expressionBridge)

private val parser: Parser = new ParserImpl(
  catalogManager,
  new JSupplier[FlinkPlannerImpl] {
    override def get(): FlinkPlannerImpl = getFlinkPlanner
  },
  new JSupplier[CalciteParser] {
    override def get(): CalciteParser = planningConfigurationBuilder.createCalciteParser()
  }
)

override def getParser: Parser = parser
```
org.apache.flink.table.planner.ParserImpl
```java
@Override
public List<Operation> parse(String statement) {
    CalciteParser parser = calciteParserSupplier.get();
    FlinkPlannerImpl planner = validatorSupplier.get();
    SqlNode parsed = parser.parse(statement);

    Operation operation =
        SqlToOperationConverter.convert(planner, catalogManager, parsed)
            .orElseThrow(() -> new TableException(
                "Unsupported SQL query! parse() only accepts SQL queries of type "
                    + "SELECT, UNION, INTERSECT, EXCEPT, VALUES, ORDER_BY or INSERT;"
                    + "and SQL DDLs of type "
                    + "CREATE TABLE"));
    return Collections.singletonList(operation);
}
```
org.apache.flink.table.sqlexec.SqlToOperationConverter
```java
public static Optional<Operation> convert(
        FlinkPlannerImpl flinkPlanner,
        CatalogManager catalogManager,
        SqlNode sqlNode) {
    final SqlNode validated = flinkPlanner.validate(sqlNode);
    SqlToOperationConverter converter =
        new SqlToOperationConverter(flinkPlanner, catalogManager);
    if (validated instanceof SqlCreateTable) {
        return Optional.of(converter.convertCreateTable((SqlCreateTable) validated));
    } else if ...
    else if (validated.getKind().belongsTo(SqlKind.QUERY)) {
        return Optional.of(converter.convertSqlQuery(validated));
    } else {
        return Optional.empty();
    }
}

private Operation convertSqlQuery(SqlNode node) {
    return toQueryOperation(flinkPlanner, node);
}

private PlannerQueryOperation toQueryOperation(FlinkPlannerImpl planner, SqlNode validated) {
    RelRoot relational = planner.rel(validated);
    return new PlannerQueryOperation(relational.rel);
}
```
SQL translation and optimization
```scala
abstract class PlannerBase(
    executor: Executor,
    config: TableConfig,
    val functionCatalog: FunctionCatalog,
    val catalogManager: CatalogManager,
    isStreamingMode: Boolean)
  extends Planner {

  override def translate(
      modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]] = {
    if (modifyOperations.isEmpty) {
      return List.empty[Transformation[_]]
    }
    getExecEnv.configure(
      getTableConfig.getConfiguration,
      Thread.currentThread().getContextClassLoader)
    overrideEnvParallelism()

    val relNodes = modifyOperations.map(translateToRel)
    val optimizedRelNodes = optimize(relNodes)
    val execNodes = translateToExecNodePlan(optimizedRelNodes)
    translateToPlan(execNodes)
  }
}
```
An Operation (ModifyOperation) is converted into a RelNode through the visitor pattern formed by QueryOperationConverter (the visitor) and QueryOperation. Once the RelNode is obtained, the plan enters Calcite's RelNode optimization flow.
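The visitor-pattern hand-off can be sketched with a self-contained toy (all names below, such as MiniOperation and MiniConverter, are hypothetical stand-ins for QueryOperation and QueryOperationConverter; a String plays the role of the resulting RelNode):

```java
// Toy sketch of the visitor pattern used in the Operation -> RelNode step:
// each operation node "accepts" a visitor, and the visitor produces the
// relational representation, converting children first.
public class MiniVisitor {
    interface MiniOperation {
        // analogous to QueryOperation accepting a QueryOperationVisitor
        String accept(MiniConverter visitor);
    }

    static class ScanOperation implements MiniOperation {
        final String table;
        ScanOperation(String table) { this.table = table; }
        public String accept(MiniConverter v) { return v.visit(this); }
    }

    static class FilterOperation implements MiniOperation {
        final MiniOperation child;
        final String predicate;
        FilterOperation(MiniOperation child, String predicate) {
            this.child = child;
            this.predicate = predicate;
        }
        public String accept(MiniConverter v) { return v.visit(this); }
    }

    // Stand-in for QueryOperationConverter: one visit method per node type,
    // recursing into children via accept() to build the "RelNode" bottom-up.
    static class MiniConverter {
        String visit(ScanOperation op) { return "TableScan(" + op.table + ")"; }
        String visit(FilterOperation op) {
            return "Filter(" + op.predicate + ", " + op.child.accept(this) + ")";
        }
    }

    public static void main(String[] args) {
        MiniOperation tree = new FilterOperation(new ScanOperation("orders"), "amount > 10");
        System.out.println(tree.accept(new MiniConverter()));
        // prints Filter(amount > 10, TableScan(orders))
    }
}
```

Double dispatch (accept calling back into the matching visit overload) is what lets the converter stay in one class while each QueryOperation subtype picks the right conversion.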
CommonSubGraphBasedOptimizer: optimization based on common sub-graphs
```scala
abstract class CommonSubGraphBasedOptimizer extends Optimizer {

  override def optimize(roots: Seq[RelNode]): Seq[RelNode] = {
    val sinkBlocks = doOptimize(roots)
    val optimizedPlan = sinkBlocks.map { block =>
      val plan = block.getOptimizedPlan
      require(plan != null)
      plan
    }
    expandIntermediateTableScan(optimizedPlan)
  }
}
```
```scala
class StreamCommonSubGraphBasedOptimizer(planner: StreamPlanner)
  extends CommonSubGraphBasedOptimizer {

  override protected def doOptimize(roots: Seq[RelNode]): Seq[RelNodeBlock] = {
    val config = planner.getTableConfig
    val sinkBlocks = RelNodeBlockPlanBuilder.buildRelNodeBlockPlan(roots, config)
    ...
    sinkBlocks.foreach(b => optimizeBlock(b, isSinkBlock = true))
    sinkBlocks
  }
}
```
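The core idea behind the common-sub-graph split can be sketched with a self-contained toy (hypothetical names; the real implementation builds RelNodeBlocks): when several sink plans share a subtree by reference, a node referenced by more than one parent marks a block boundary, so the shared block can be optimized once instead of per sink.

```java
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

// Toy sketch of common-sub-graph detection: two "sink" plans share the same
// source subtree (same object reference); counting parent references finds
// the shared node. Node/countRefs are hypothetical stand-ins for the
// RelNodeBlock machinery in CommonSubGraphBasedOptimizer.
public class CommonSubGraph {
    static class Node {
        final String name;
        final List<Node> inputs = new ArrayList<>();
        Node(String name, Node... in) {
            this.name = name;
            for (Node n : in) inputs.add(n);
        }
    }

    // Count how many parents reference each node (by identity). A node with
    // a count > 1 is the root of a common sub-graph, i.e. a block boundary.
    static Map<Node, Integer> countRefs(List<Node> sinks) {
        Map<Node, Integer> refs = new IdentityHashMap<>();
        for (Node sink : sinks) visit(sink, refs);
        return refs;
    }

    private static void visit(Node node, Map<Node, Integer> refs) {
        refs.merge(node, 1, Integer::sum);
        if (refs.get(node) == 1) { // descend into each subtree only once
            for (Node in : node.inputs) visit(in, refs);
        }
    }

    public static void main(String[] args) {
        Node scan = new Node("scan"); // source shared by both sinks
        Node sink1 = new Node("sink1", new Node("filter", scan));
        Node sink2 = new Node("sink2", new Node("project", scan));
        Map<Node, Integer> refs = countRefs(List.of(sink1, sink2));
        System.out.println(refs.get(scan)); // prints 2 -> scan is a block boundary
    }
}
```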
References:
https://blog.jrwang.me/2019/flink-source-code-sql-overview/