背景

用户经常抱怨写了许多小文件。 小文件将影响文件读取和DFS系统的性能,甚至会影响DFS系统的稳定性。

目标:

  • 压缩这个job 单次checkpoint产生的所有文件
  • 通过压缩,用户可以设置更小的检查点间隔,甚至可以减少到秒级。

场景:

checkpoint 间隔内,task不能写足够大的文件

  • 第一种情况:分区策略不仅涉及时间,而且还可能包括业务关键字段,例如applicationId。对于具有大量数据的分区,它可以写出所需的文件大小;但是,某些分区的数据量很小,并且无法生成足够大的文件,因此需要进行压缩。
  • 第二种情况:流作业中存在数据高峰和低谷,但是接收器的并行性是相同的。 在低谷状态时,将出现较少的数据,并且无法在检查点间隔内写入所需的文件大小。

未满足的场景:

  • 每个task持续从上游接收数据。 每个task都可以填满CPU /资源。 然后将检查点间隔设置得太小,从而导致大量文件产生。 这时,压缩器无法在这么小的检查点延迟中完成压缩:
  • 一般来说,除非检查点间隔太小,否则不会发生这种情况(测试后,parquet格式文件可以在1秒内写入10MB,也就是说,它可以在12秒内写入足够大的文件)

分析

压缩的时机

单次checkpoint跨task压缩文件:

  • 文件写完(可见)后压缩:首先使文件可见,然后转到分区以选择适当的文件进行压缩。这样,写过程和压缩过程解耦。但是,如果压缩没有原子性,会存在一致性问题。
  • 文件可见之前进行比较:如何使文件不可见?让文件在隐藏路径下。请注意,根据Flink中的检查点周期,文件是可见的,因此这意味着仅单次checkpoint内的文件是压实的。而且,这些文件在压缩完成之前是不可见的。因此,这肯定会增加写文件的延迟。

我们在文件可见之前选择压缩。 我们在单次checkpoint内执行压缩以简化设计。

1
2
3
                        compact cp-1         compact cp-2
------------------|---------------------|---------------------|
cp-1 cp-2 cp-3

我们只能在checkpoint完成后执行压缩(写程序只能生成临时文件,而不能生成最终文件),因为我们的目标是压缩由不同任务生成的文件。 如果有能力,单个任务可以产生足够大的文件。

压缩可视性

  • 如果压缩发生在下一个检查点,这些文件什么时候可见? (这与延迟有关)例如,CP-1文件在CP-2中被压缩:

压缩文件在CP-2后可见(压缩仍在CP-2中进行,但最终文件最终承诺在CP-2后最终可见),这将导致更大的延迟, 甚至翻倍

  • 在CP-2中,压缩完成后将立即显示结果文件。

确定性

  • 输入临时文件是确定性的:一旦获得CP-2中的所有CP-1文件,我们就可以确认这些文件的列表是固定的。 无论在CP-2中是否有fail over,它们都是这些文件。
  • 压缩文件是确定性的:只要binpack策略是确定性的,再加上文件列表和文件大小是确定性的,则输出是确定性的。

即使在CP-2中(即在压缩过程中)发生故障转移,由于我们的结果是确定的,也不会改变并且一致性也不会受到影响。 因此,我们可以选择使压缩结果立即可见。

注意:我们无法在压缩后立即删除临时文件,因为在此检查点周期内可能会发生FO。 安全的做法是在检查点结束后删除过期的临时文件。

如何压缩,有2种情况

压缩多个文件:

  • Reader:在新的FileSource中使用BulkFormat
  • Writer:使用StreamingFileSinkBucketWriter(用于一致性/可见性保证)

压缩单个文件(实际上,对于满足大小要求的文件,我们只需要原子重命名):

  • HDFS的优化:“重命名”可以直接使用,因为“重命名”具有一致性/可见性保证。
  • 对象存储的优化:我们可以使用RecoverableWriter进行原子字节复制。

依赖

对于StreamingFileSink,我们需要注册一个FileLifecycleListener。 需要获取此检查点间隔的临时文件。

目前,我们只能监视文件的生成,不能监视文件的关闭,这意味着此功能只能用于CheckpointRollingPolicy(批量格式)。 (实际上,对于RowWise writer,压缩是没有用的)

flinkstreamsink.png

配置

  • auto-compaction
  • compaction.file-size: compact target file size, default is rolling-file-size

参考

https://docs.google.com/document/d/1cdlyoqgBq9yJEiHFBziimIoKHapQiEY2-0Tn8IF6G-c/edit?usp=sharing