Flink Stream文件压缩译文
背景
用户经常抱怨写了许多小文件。 小文件将影响文件读取和DFS系统的性能,甚至会影响DFS系统的稳定性。
目标:
- 压缩这个job 单次checkpoint产生的所有文件
- 通过压缩,用户可以设置更小的检查点间隔,甚至可以减少到秒级。
场景:
在checkpoint 间隔内,task不能写足够大的文件
- 第一种情况:分区策略不仅涉及时间,而且还可能包括业务关键字段,例如applicationId。对于具有大量数据的分区,它可以写出所需的文件大小;但是,某些分区的数据量很小,并且无法生成足够大的文件,因此需要进行压缩。
- 第二种情况:流作业中存在数据高峰和低谷,但是接收器的并行性是相同的。 在低谷状态时,将出现较少的数据,并且无法在检查点间隔内写入所需的文件大小。
未满足的场景:
- 每个task持续从上游接收数据。 每个task都可以填满CPU /资源。 然后将检查点间隔设置得太小,从而导致大量文件产生。 这时,压缩器无法在这么小的检查点延迟中完成压缩:
- 一般来说,除非检查点间隔太小,否则不会发生这种情况(测试后,parquet格式文件可以在1秒内写入10MB,也就是说,它可以在12秒内写入足够大的文件)
分析
压缩的时机
单次checkpoint跨task压缩文件:
- 文件写完(可见)后压缩:首先使文件可见,然后转到分区以选择适当的文件进行压缩。这样,写过程和压缩过程解耦。但是,如果压缩没有原子性,会存在一致性问题。
- 文件可见之前进行比较:如何使文件不可见?让文件在隐藏路径下。请注意,根据
Flink
中的检查点周期,文件是可见的,因此这意味着仅单次checkpoint内的文件是压实的。而且,这些文件在压缩完成之前是不可见的。因此,这肯定会增加写文件的延迟。
我们在文件可见之前选择压缩。 我们在单次checkpoint内执行压缩以简化设计。
1 | compact cp-1 compact cp-2 |
我们只能在checkpoint完成后执行压缩(写程序只能生成临时文件,而不能生成最终文件),因为我们的目标是压缩由不同任务生成的文件。 如果有能力,单个任务可以产生足够大的文件。
压缩可视性
- 如果压缩发生在下一个检查点,这些文件什么时候可见? (这与延迟有关)例如,CP-1文件在CP-2中被压缩:
压缩文件在CP-2后可见(压缩仍在CP-2中进行,但最终文件最终承诺在CP-2后最终可见),这将导致更大的延迟, 甚至翻倍
- 在CP-2中,压缩完成后将立即显示结果文件。
确定性
- 输入临时文件是确定性的:一旦获得CP-2中的所有CP-1文件,我们就可以确认这些文件的列表是固定的。 无论在CP-2中是否有
fail over
,它们都是这些文件。 - 压缩文件是确定性的:只要binpack策略是确定性的,再加上文件列表和文件大小是确定性的,则输出是确定性的。
即使在CP-2中(即在压缩过程中)发生故障转移,由于我们的结果是确定的,也不会改变并且一致性也不会受到影响。 因此,我们可以选择使压缩结果立即可见。
注意:我们无法在压缩后立即删除临时文件,因为在此检查点周期内可能会发生FO。 安全的做法是在检查点结束后删除过期的临时文件。
如何压缩,有2种情况
压缩多个文件:
- Reader:在新的
FileSource
中使用BulkFormat
- Writer:使用
StreamingFileSink
的BucketWriter
(用于一致性/可见性保证)
压缩单个文件(实际上,对于满足大小要求的文件,我们只需要原子重命名):
- HDFS的优化:“重命名”可以直接使用,因为“重命名”具有一致性/可见性保证。
- 对象存储的优化:我们可以使用
RecoverableWriter
进行原子字节复制。
依赖
对于StreamingFileSink
,我们需要注册一个FileLifecycleListener
。 需要获取此检查点间隔的临时文件。
目前,我们只能监视文件的生成,不能监视文件的关闭,这意味着此功能只能用于CheckpointRollingPolicy
(批量格式)。 (实际上,对于RowWise writer
,压缩是没有用的)
配置
- auto-compaction
- compaction.file-size: compact target file size, default is rolling-file-size
参考
https://docs.google.com/document/d/1cdlyoqgBq9yJEiHFBziimIoKHapQiEY2-0Tn8IF6G-c/edit?usp=sharing
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 Asura7969 Blog!