Flink Stream文件压缩译文
背景用户经常抱怨写了许多小文件。 小文件将影响文件读取和DFS系统的性能,甚至会影响DFS系统的稳定性。
目标:
压缩这个job 单次checkpoint产生的所有文件
通过压缩,用户可以设置更小的检查点间隔,甚至可以减少到秒级。
场景:在checkpoint 间隔内,task不能写足够大的文件
第一种情况:分区策略不仅涉及时间,而且还可能包括业务关键字段,例如applicationId。对于具有大量数据的分区,它可以写出所需的文件大小;但是,某些分区的数据量很小,并且无法生成足够大的文件,因此需要进行压缩。
第二种情况:流作业中存在数据高峰和低谷,但是接收器的并行性是相同的。 在低谷状态时,将出现较少的数据,并且无法在检查点间隔内写入所需的文件大小。
未满足的场景:
每个task持续从上游接收数据。 每个task都可以填满CPU /资源。 然后将检查点间隔设置得太小,从而导致大量文件产生。 这时,压缩器无法在这么小的检查点延迟中完成压缩:
一般来说,除非检查点间隔太小,否则不会发生这种情况(测试后,parquet格式文件可以在1秒内写入10MB,也就是说,它可以在12秒内写入足 ...
Flink Checkpoint 恢复流程
Checkpoint简介Flink定期保存数据,failover后从上次成功的保存点处恢复,并提供Exactly-Once的投递保障机制
CheckpointCoordinator123456789101112131415161718192021222324252627282930313233// 恢复保存点public boolean restoreSavepoint( String savepointPointer, boolean allowNonRestored, Map<JobVertexID, ExecutionJobVertex> tasks, ClassLoader userClassLoader) throws Exception { Preconditions.checkNotNull(savepointPointer, "The savepoint path cannot be null."); LOG.info("Starting job ...
Flink方案记录
flink 快慢流双流join
1、自定义UDF函数,join不到sleep一下
2、在join operator处数据等一会再查
3、如果没有join到,把数据重新回流,等再次消费后再次join
4、如果source端的MQ支持延迟消息,直接使用MQ的延迟功能(例如kafka的时间轮)
5、扩展Flink Source,例如在Kafka Connector中加入time.wait属性,当用户设置这个属性就让source的数据等一会
6、对于未匹配上的数据可以先导入外部存储,后续进行匹配
7、如果source端是kafka,数据在写入kafka时候设置key(后续的join key)的值,使得同key的数据延迟度降低
flink 作业迁移注意事项flink对接kafka, kafka offset如何保证?背景:kafka与yarn集群同时迁移到新集群,保证flink作业不丢数据。使用kafka mirror,从原有kafka集群copy数据到新集群,flink作业做savepoint,地址是新集群地址,在新的yarn集群上启动flink作业。注意:必要时可以使用state pr ...
Flink Graph
StreamGraph是通过用户的Stream API编写的代码生成最初的图,用来表示程序的拓扑结构
StreamNode:用来代表operator的类,并具有所有相关的属性,如并发度、入边和出边等。
StreamEdge:表示连接两个StreamNode的边。
JobGraphStreamGraph经过优化后生成JobGraph,提交给JobManager数据结构。主要是将多个符合条件的节点chain在一起作为一个节点,这样可以减少数据在节点之间流动所需的序列化/反序列化/传输消耗。
JobVertex:经过优化后符合条件的多个StreamNode可能会chain在一起生成一个JobVertex,即一个JobVertex包含一个或多个operator,JobVertex的输入是JobEdge,输出是IntermediateDataSet。
IntermediateDataSet:表示JobVertex的输出,即经过operator处理产生的数据集。producer是JobVertex,consumer是JobEdge。
JobEdge:代表了job graph中的一条数据传输通 ...
预测算法
环境安装conda1234567891011121314151617181920# 安装的模块conda list# 创建指不定版本的环境conda create -n pEvn3_7 python=3.7# 环境激活脚本source activate pEvn3_7# 安装模块conda install -n pEvn3_7 [package]# 退出环境source deactivate# 安装tensorflowpip install tensorflow==1.15# 卸载tensorflowpip uninstall tensorflow
prophet1conda install -c conda-forge fbprophet
算法库statsmodels
算法示例指数平滑算法
Expand Flink Sql
扩展format
输入数据格式
12ANALYSIS,2020-08-06 17:20:24.066,2,{"condition":{"mail":"zhangsan@qq.com","guid":"1111111","request_id":"b1ca18645f03e01abfd9bcfe5b2e0f3a","status":"200"},"entityMata":{"ipAddress":"0.0.0.0","logValue":2.0,"metric":"metric1","service":"service1"},"traceId":"385307967614d7 ...
Flink Retraction
Flink Retraction MechanismFlink Sql最终生成Physical Planning后会对每个RelNode打标,判断当前RelNode是哪种ChangelogMode
FlinkChangelogModeInferenceProgram
12345678910111213141516171819202122232425262728293031323334353637383940414243444546override def optimize( root: RelNode, context: StreamOptimizeContext): RelNode = { // step1: 确定每个节点的变更类型 // 先从source节点开始标记节点属于哪种 ModifyKindSetTrait(I,U,D,Empty) val physicalRoot = root.asInstanceOf[StreamPhysicalRel] val rootWithModifyKindSet = SATISFY_MODIFY_KIND_SET ...
Flink Sql SMJ
Sort Merge Join简介
1.In most cases, its performance is weaker than HashJoin.
2.It is more stable than HashJoin, and most of the data can be sorted stably.
3.SortMergeJoin should be the best choice if sort can be omitted in the case of multi-level join cascade with the same key.
Join流程除了sort、spill、merge,join流程大致与spark类似
源码BinaryExternalSorter分别包含3个异步线程(sort, spill, merger),三个线程通过circularQueues实现通信
SortingThread
123456789101112131415161718192021222324252627282930313233343536373839404142434445 ...
Flink window
Flink WindowKeyed Windows
123456789stream .key(...) .window(...) [.trigger(...)] [.evictor(...)] [.allowedLateness(...)] [.sideOutputLateData(...)] .reduce/aggregate/fold/apply() [.getSideOutput(...)]
No-Keyed Windows
12345678stream .windowAll(...) [.trigger(...)] [.evictor(...)] [.allowedLateness(...)] [.sideOutputLateData(...)] .reduce/aggregate/fold/apply() [.getSideOutput(...)]
Evictor可以在执行window trigger之前或之后对window中 ...
GC
GC 算法重点关注的GC case1、**System.gc()**: 手动触发GC操作。
2、CMS: CMS GC 在执行过程中的一些动作,重点关注 CMS Initial Mark 和 CMS Final Remark 两个 STW 阶段
3、Promotion Failure: Old 区没有足够的空间分配给 Young 区晋升的对象(即使总可用内存足够大)
4、Concurrent Mode Failure: CMS GC 运行期间,Old 区预留的空间不足以分配给新的对象,此时收集器会发生退化,严重影响 GC 性能,下面的一个案例即为这种场景
5、GCLocker Initiated GC: 如果线程执行在 JNI 临界区时,刚好需要进行 GC,此时 GC Locker 将会阻止 GC 的发生,同时阻止其他线程进入 JNI 临界区,直到最后一个线程退出临界区时触发一次 GC
GC 问题分类:Unexpected GC: 意外发生的 GC,实际上不需要发生,我们可以通过一些手段去避免
Space Shock: 空间震荡问题,参见“场景一:动态扩容引起的空间震荡” 现象:服 ...