Flink Sql解析(一)
Flink Sql1. Parse:语法树解析,把sql语句转换成为一个抽象语法树(AST),在Calcite中用SqlNode来表示
2. Validate:语法校验,根据元数据信息进行校验,例如查询的表、使用的函数是否存在等,校验完成后仍然是SqlNode构成的语法树
3. Optimize:查询计划优化
首先将SqlNode语法树转换成关系表达式,RelNode构成的逻辑树
然后使用优化器基于规则进行等价交换,例如我们熟悉的谓词下推、列裁剪等,经过优化后得到最有查询计划
4. Execute:将逻辑查询计划翻译成物理执行计划,生成对应的可执行代码,提交运行
Flink parserorg.apache.flink.table.delegation.Planner
getParser(): 将SQL字符串转换为Table API特定的对象,例如Operation tree
提供plan,优化并且转换 Operation( ModifyOperation ) 成可运行的Transformation123456@Internalpublic interface Planner & ...
Spark Rpc
Dispatcher
Rpc客户端发送请求
MapOutputTracker用于跟踪map任务的输出状态
Spark Sql
Spark SQL
Analysis
从SQL或者DataFrame API中解析得到抽象语法树,依据catalog元数据校验语法树(表名、列名,或列类型),将Unresolved Logical Plan解析成Resolved Logical Plan
多个性质类似的Rule组成一个Batch,多个Batch构成一个Batchs,这些batches会由RuleExecutor执行,先按一个一个Batch顺序执行,然后对Batch里面的每个Rule顺序执行。每个Batch会执行一次会多次。
Logical Optimizations
基于规则优化,其中包含谓词下推、列裁剪、常亮折叠等。利用Rule(规则)将Resolved Logical Plan解析成Optimized Logical Plan,同样是由RuleExecutor执行
Physical Planning
前面的Logical Plan不能被Spark执行,这个过程是把Logical Plan转换成多个Physical Plan(物理计划),然后利用Cost Mode(代价模型)选择最佳的执行计划;
和前面的逻辑计划 ...
Spark 任务调度
概述主要分为以下四个部分:
1、构建DAG提交的 job 将首先被转化为RDD并通过RDD之间的依赖关系构建DAG,提交到调度系统
2、切分stageDAGScheduler负责接受由RDD构成的DAG,将一系列RDD划分到不同的Stage(ResultStage 和 ShuffleMapStage 两种),给Stage中未完成的Partition创建不同类型的task(ResultTask 和 ShuffleMapTask),DAGScheduler最后将每个Stage中的task以TaskSet的形式提交给TaskScheduler继续处理。
3、调度taskTaskScheduler负责从DAGScheduler接受TaskSet,创建TaskSetManager对TaskSet进行管理,并将TaskSetManager添加到调度池,最后将对Task调度提交给后端接口(SchedulerBackend)处理。
4、执行task执行任务,并将任务中间结果和最终结果存入存储体系。
Task 本地行级别获取task的本地性级别时,都会等待一段时间,超过时间会退而求其次。
...
Spark Shuffle
Spark Shuffle简介初始化SparkContext时候会createSparkEnv,创建ShuffleManager
12345678910...// Let the user specify short names for shuffle managersval shortShuffleMgrNames = Map( "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName, "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffle ...
Spark 扩展功能
SMJ 扩展打印信息执行SortMergeJoinExec(SparkPlan)时打印左右表信息1234// 执行SortMergeJoinExec类中任意位置val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)val queryExecution = SQLExecution.getQueryExecution(executionId.toLong)// 打印 queryExecution.analyzed
执行SortMergeJoinExec(SparkPlan)时输出operator分区数,左右表的输入行数SortMergeJoinExec
1234567891011121314151617181920override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows") ...
Spark
Spark
https://databricks.com/blog
基本概念RDD特点RDD具有容错机制,并且只读不能修改,RDD具有以下几个属性:
只读:不能修改,只能通过转换操作生成新的RDD
分布式:可以分布在多台机器上进行并行处理
弹性:计算过程中内存不够时会和磁盘进行数据交换
基于内存:可以全部或部分缓存在内存中,在多次计算间重用
RDD血缘关系RDD 的最重要的特性之一就是血缘关系(Lineage ),它描述了一个 RDD 是如何从父 RDD 计算得来的。如果某个 RDD 丢失了,则可以根据血缘关系,从父 RDD 计算得来。
RDD依赖类型根据不同的转换操作,RDD血缘关系的依赖分为宽依赖和窄依赖。窄依赖是指父RDD的每个分区都只被子RDD的一个分区使用。宽依赖是指父RDD的每个分区都被子RDD的分区所依赖。map、filter、union 等操作是窄依赖,而 groupByKey、reduceByKey 等操作是宽依赖。join 操作有两种情况,如果 join 操作中使用的每个 Partition 仅仅和固定个 Partition 进行 join,则该 join 操 ...
Hello World
Welcome to Hexo! This is your very first post. Check documentation for more info. If you get any problems when using Hexo, you can find the answer in troubleshooting or you can ask me on GitHub.
Quick StartCreate a new post1$ hexo new "My New Post"
More info: Writing
Run server1$ hexo server
More info: Server
Generate static files1$ hexo generate
More info: Generating
Deploy to remote sites1$ hexo deploy
More info: Deployment