Flink Sql 侧流输出(一)
Flink Sql 侧流输出(一)
本章并没有实现完成, 到最后发现 sink 端不好搞,不感兴趣的可以直接看第二篇内容(本章内容中涉及到的代码与第二章大部分一样,只是处理的环节不同)
实现原理
预想的使用示例如下:
1、定义处理逻辑
12345678910public class MyProcessFunction extends ScalarFunction { public Atest eval(Integer id1, String id2) { Atest a = new Atest(); a.setId1(id1); a.setId2(id2); return a; }}
2、注册UDF函数
1tEnv.createTemporarySystemFunction("SumFunction", SumFunction.class);
3、创建sink表
123456789## sideOutput 输出端CREATE TABLE sideOutpu ...
Mini-Batch 维表 Join
Mini-Batch 维表 Join背景主要针对一些 I/O 请求比较高,系统又支持 batch 请求的能力,比如说 RPC、HBase、Redis 等。以往的方式都是逐条的请求,且 Async I/O 只能解决 I/O 延迟的问题,并不能解决访问量的问题。通过实现 Mini-Batch 版本的维表算子,大量降低维表关联访问外部存储次数
DAG在 Lookup join 的 operator 前加一个攒批的operator
加规则前
加规则后
实现Rule实现12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061package org.apache.flink.table.planner.plan.rules.physical.streamimport java.util.Collectionsimport org.apache.calcite.plan.RelOptRule.{any, operand ...
Flink Sql解析(二)
Flink Sql解析
Flink sql执行过程
SQL -> SqlNode1234567891011121314@Overridepublic List<Operation> parse(String statement) { CalciteParser parser = calciteParserSupplier.get(); FlinkPlannerImpl planner = validatorSupplier.get(); // parse the sql query // 最终由 FlinkSqlParserImpl 执行解析 SqlNode parsed = parser.parse(statement); // SqlNode 会封装成 Operation Operation operation = SqlToOperationConverter.convert(planner, catalogManager, parsed) . ...
Flink SQL LookupJoin With KeyBy
背景维表 Join 的场景下因为维表经常发生变化尤其是新增维度,而 Join 操作发生在维度新增之前,经常导致关联不上。如果 Join 不到,则暂时将数据缓存起来之后再进行尝试,并且可以控制尝试次数,能够自定义延迟 Join 的规则。
Flink SQL 在字节跳动的优化与实践
先看下实现后的效果图:
实现思路Flink sql 整个的执行流程梳理
在哪实现(where)?实现的地方很多,图1-1 中最引人注意的是 TEMPORAL_JOIN_REWRITE(至少我是这么想的…), 但后来实现过程中由于对 calcite API 不熟悉,实在无奈,只能另辟蹊径了。
最后看到 PHYSICAL_RWRITE, 先大致看了下这个规则下Rule的实现过程, 主要就是对已生成的 physical node 进行重写。
怎么实现(how)?接下来就是依葫芦画瓢了
KeyByLookupRule 实现
由于实现过程需要 temporalTable 和 calcOnTemporalTable, 而 CommonLookupJoin 中并没有获取这两个对象的方法,因此只能自己手动添加了
如何校验 ...
Flink DataStream与Transformation
DataStream 如何转换为 Transformation ?简介本文就以中WordCount为例
123456789final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> text = env.fromElements(WordCountData.WORDS);text.flatMap(new Tokenizer()) .keyBy(value -> value.f0).sum(1) .addSink(new CustomSink());env.execute("Streaming WordCount");
DataStreamSource12345678910111213141516public DataStreamSource( StreamExecutionEnvironment environment, TypeInformation<T> ...
SparkStreaming
SparkStreaming
参考github地址
Netty在Flink中的应用
使用Netty定义Client与Server端的协议client 端协议
server 端协议
org.apache.flink.runtime.io.network.netty.NettyProtocol
CreditBasedPartitionRequestClientHandler
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { decodeMsg(msg); } catch (Throwable t) { notifyAllChannelsOfErrorAndClose(t); }}private void decodeMsg(Object ...
Flink1.12 Checkpoint源码解析
CheckpointCheckpointCoordinatorstartTriggeringCheckpoint123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899private void startTriggeringCheckpoint(CheckpointTriggerRequest request) { ... // 1、初始化checkpointId和checkpoint存储state位置 final CompletableFuture<PendingCheckpoint> pendingCheckpointCompletableFuture = initializeCheckpoint(req ...
Unaligned Checkpointing
Unaligned Checkpointing背景Flink Checkpoint 基于 Chandy-Lamport 算法实现的。
目前的 Checkpoint 算法在大多数情况下运行良好,然而当作业出现反压时,阻塞式的 Barrier 对齐反而会加剧作业的反压,甚至导致作业的不稳定。
当前checkpoint机制
当operator 接收到 第一个 barrier (b1) 时,会把其所在的后续数据写入 buffer ,直到 buffer 写满阻塞 channel
同时处理其它未接受到 barrier (b1) 的 channel,这些 channel 的数据会输出到该 operator 的 outputChannel中,往下游节点发送
当所有channel接受到 barrier (b1) 后,该 operator 会先往 outputChannel 发送 b1,再把 buffer 中的数据与 channel 中的后续数据输出到下游
Unaligned Checkpoint
当第一个barrier (b1) 快到达 operator 时,会优先处理 barrier(b1), ...
FLP-30 Unified Catalog APIs译文
背景随着其在流处理中的广泛采用,Flink也显示了其在批处理中的潜力。 改进Flink的批处理,尤其是在SQL方面,将使Flink在流处理之外得到更大的采用,并为用户提供一套完整的解决方案,以满足他们的流和批处理需求。另一方面,Hive已将重点放在大数据技术及其完整的生态系统上。 对于大多数大数据用户而言,Hive不仅是用于大数据分析和ETL的SQL引擎,还是一个数据管理平台,可以在该平台上查询,定义和演化数据。 换句话说,Hive是Hadoop上大数据的事实上的标准。因此,Flink必须与Hive生态系统集成,以进一步将其扩展到批处理和SQL用户。 为此,必须与Hive元数据和数据集成。
Hive Metastore集成有两个方面:1.使Hive的元对象(如表和视图)可供Flink使用,并且Flink也能够为Hive和在Hive中创建此类元对象; 2.使用Hive Metastore作为持久性存储,使Flink的元对象(表,视图和UDF)持久化。本文档涵盖Flink和Hive生态系统集成的三个部分之一。 它不仅涉及Hive集成,还涉及重新构建目录界面以及TableEnvironmen ...