Flink SQL Side Output (Part 2)

A usage example:

1. Define the processing logic

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;

public static class MyProcessFunction extends ScalarFunction {

    // Declare the result type explicitly so the planner can derive the output row schema.
    @DataTypeHint("ROW<id1 STRING, id2 STRING> NOT NULL")
    public Row eval(String id1, String id2) {
        return Row.of(id1, id2);
    }
}

2. Register the UDF

tEnv.createTemporarySystemFunction("MyProcessFunction", MyProcessFunction.class);

3. Create the sink table

-- sideOutput sink table
CREATE TABLE sideOutput_table (
  `data` ROW<id1 STRING, id2 STRING>
) WITH (
  ...
)

4. Query

-- sideOutput_table is the sink table name
-- functionName refers to the registered processing function
-- SIDE_OUT_PUT is the hint keyword

"SELECT /*+ SIDE_OUT_PUT('tableName'='sideOutput_table', 'functionName'='MyProcessFunction') */ T.id2 FROM T"

I. Add the hint definition

public abstract class FlinkHints {
    public static final String HINT_SIDE_OUT_PUT = "SIDE_OUT_PUT";

    /** Returns the kv options of the first SIDE_OUT_PUT hint, or an empty map. */
    public static Map<String, String> getHintedSideOutput(List<RelHint> tableHints) {
        return tableHints.stream()
                .filter(hint -> hint.hintName.equalsIgnoreCase(HINT_SIDE_OUT_PUT))
                .findFirst()
                .map(hint -> hint.kvOptions)
                .orElse(Collections.emptyMap());
    }
}
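
As a quick illustration (not from the original post), this builds a SIDE_OUT_PUT hint by hand via Calcite's RelHint.builder and extracts its options with the helper above; the option values are arbitrary:

import java.util.Collections;
import java.util.Map;
import org.apache.calcite.rel.hint.RelHint;

public class HintExtractionDemo {
    public static void main(String[] args) {
        // Build a SIDE_OUT_PUT hint the way the parser would attach it to a Project.
        RelHint hint = RelHint.builder("SIDE_OUT_PUT")
                .hintOption("tableName", "sideOutput_table")
                .hintOption("functionName", "MyProcessFunction")
                .build();

        Map<String, String> options =
                FlinkHints.getHintedSideOutput(Collections.singletonList(hint));
        // Prints: {tableName=sideOutput_table, functionName=MyProcessFunction}
        System.out.println(options);
    }
}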
public abstract class FlinkHintStrategies {

    /**
     * Customize the {@link HintStrategyTable} which contains hint strategies supported by Flink.
     */
    public static HintStrategyTable createHintStrategyTable() {
        return HintStrategyTable.builder()
                // Configure to always throw when we encounter any hint errors
                // (either the non-registered hint or the hint format).
                .errorHandler(Litmus.THROW)
                .hintStrategy(
                        FlinkHints.HINT_NAME_OPTIONS,
                        HintStrategy.builder(HintPredicates.TABLE_SCAN)
                                .optionChecker(
                                        (hint, errorHandler) ->
                                                errorHandler.check(
                                                        hint.kvOptions.size() > 0,
                                                        "Hint [{}] only support non empty key value options",
                                                        hint.hintName))
                                .build())
                .hintStrategy(
                        FlinkHints.HINT_SIDE_OUT_PUT,
                        HintStrategy.builder(HintPredicates.PROJECT)
                                .optionChecker(
                                        (hint, errorHandler) ->
                                                errorHandler.check(
                                                        hint.kvOptions.size() > 0,
                                                        "Hint [{}] only support non empty key value options",
                                                        hint.hintName))
                                .build())
                .build();
    }
}
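
Two details worth noting: the stock OPTIONS hint is matched with HintPredicates.TABLE_SCAN, whereas SIDE_OUT_PUT is registered with HintPredicates.PROJECT, because the hint is written in the SELECT list and must attach to the LogicalProject node. Litmus.THROW means any unregistered or malformed hint fails the query immediately instead of being silently ignored.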

II. Add validation in SqlToOperationConverter

private PlannerQueryOperation toQueryOperation(FlinkPlannerImpl planner, SqlNode validated) {
    // transform to a relational tree
    RelRoot relational = planner.rel(validated);

    if (!relational.hints.isEmpty()) {
        PlannerQueryOperation queryOperation =
                new PlannerQueryOperation(relational.project(), relational.hints);
        Catalog catalog = catalogManager.getCatalog(catalogManager.getCurrentCatalog()).get();
        try {
            List<String> allTables = catalog.listTables(catalogManager.getCurrentDatabase());
            String sideOutputTable = queryOperation.getSideOutputHints().get("tableName");
            // The table referenced by the hint must already be registered in the catalog.
            if (!allTables.contains(sideOutputTable)) {
                throw new RuntimeException("must register sideOutput table: " + sideOutputTable);
            }
            return queryOperation;
        } catch (DatabaseNotExistException e) {
            // Fail fast instead of swallowing the exception and skipping validation.
            throw new RuntimeException(e);
        }
    }

    return new PlannerQueryOperation(relational.project(), relational.hints);
}
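
The call to queryOperation.getSideOutputHints() assumes PlannerQueryOperation was also extended to carry the hints passed to its constructor; a minimal sketch of that accessor (hypothetical, not part of stock Flink) could simply delegate to the FlinkHints helper from section I:

// Hypothetical accessor added to PlannerQueryOperation: the hints handed to the
// constructor are stored in a field and exposed for validation.
public Map<String, String> getSideOutputHints() {
    return FlinkHints.getHintedSideOutput(this.hints);
}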

III. Changes in RelNodeBlock

import java.util
import java.util.{Collections, Optional}

import scala.collection.mutable

import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.logical.LogicalProject
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.table.catalog.{CatalogTable, ConnectorCatalogTable, FunctionLookup, ObjectPath, UnresolvedIdentifier}
import org.apache.flink.table.functions.UserDefinedFunction
import org.apache.flink.table.planner.calcite.{FlinkContext, FlinkTypeFactory}
import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction
import org.apache.flink.table.planner.functions.inference.OperatorBindingCallContext
import org.apache.flink.table.planner.hint.FlinkHints
import org.apache.flink.table.planner.sinks.{DataStreamTableSink, SelectTableSinkSchemaConverter, StreamSelectTableSink}
import org.apache.flink.table.planner.utils.ShortcutUtils
import org.apache.flink.table.types.DataType
import org.apache.flink.table.types.inference.TypeInferenceUtil

import org.apache.calcite.rex.{RexBuilder, RexCall, RexCallBinding, RexNode}
import org.apache.flink.table.planner.plan.nodes.calcite.{LegacySink, LogicalLegacySink}
import org.apache.flink.table.api.{DataTypes, TableConfig, TableSchema}



def buildRelNodeBlockPlan(
    sinkNodes: Seq[RelNode],
    planner: PlannerBase,
    config: TableConfig): Seq[RelNodeBlock] = {
  require(sinkNodes.nonEmpty)

  // TODO: newly added step: resolve the SIDE_OUT_PUT hints carried on projects;
  // a single sink node may expand into several RelNodes (main sink + side-output sinks).
  val newSinkNodes = expandProject(sinkNodes, planner)

  // expand QueryOperationCatalogViewTable in TableScan
  val shuttle = new ExpandTableScanShuttle
  val convertedRelNodes = newSinkNodes.map(_.accept(shuttle))

  if (convertedRelNodes.size == 1) {
    Seq(new RelNodeBlock(convertedRelNodes.head))
  } else {
    // merge multiple RelNode trees to RelNode dag
    val relNodeDag = reuseRelNodes(convertedRelNodes, config)
    val builder = new RelNodeBlockPlanBuilder(config)
    builder.buildRelNodeBlockPlan(relNodeDag)
  }
}


def expandProject(
    sinkNodes: Seq[RelNode],
    planner: PlannerBase): Seq[RelNode] = {
  // Extract the hint options and rewrite each project; every sink node expands
  // into the rewritten node followed by the side-output sinks extracted from it.
  sinkNodes.map(node => expandHintsOfSideOutput(node, planner))
    .flatMap(t => t._1 +: t._2)
}

def expandHintsOfSideOutput(
    relNode: RelNode,
    planner: PlannerBase): (RelNode, Seq[RelNode]) = {
  import scala.collection.JavaConverters._

  relNode match {
    case project: LogicalProject =>
      // Collect the kv options of all SIDE_OUT_PUT hints attached to this project.
      val tableNames = project.getHints.asScala
        .filter(_.hintName.equals(FlinkHints.HINT_SIDE_OUT_PUT))
        .map(_.kvOptions.asScala)
      if (tableNames.nonEmpty) {
        // Strip the hints from the project and create one sink per hint.
        (project.withHints(Collections.emptyList()), createSink(tableNames, project, planner))
      } else (project, Seq.empty)

    case _ =>
      // Recurse into the inputs and rebuild this node on top of the rewritten children.
      if (!relNode.getInputs.isEmpty) {
        val tuples = relNode.getInputs.asScala.map(node => expandHintsOfSideOutput(node, planner))
        val newRelNode = relNode.copy(
          relNode.getTraitSet, new util.ArrayList[RelNode](tuples.map(_._1).asJava))
        val extractRelNodes = tuples.map(_._2).flatMap(_.toList)
        (newRelNode, extractRelNodes)
      } else (relNode, Seq.empty)
  }
}

def createSink(
    tables: Seq[mutable.Map[String, String]],
    project: LogicalProject,
    planner: PlannerBase): Seq[LogicalLegacySink] = {
  val rexBuilder = project.getCluster.getRexBuilder
  val context = project.getCluster.getPlanner.getContext.unwrap(classOf[FlinkContext])
  val manager = context.getCatalogManager
  val catalog = manager.getCatalog(manager.getCurrentCatalog)
  val functionCatalog = context.getFunctionCatalog
  val flinkTypeFactory = ShortcutUtils.unwrapTypeFactory(project.getCluster)
  val size = project.getProjects.size()

  tables.map { tableMap =>
    val tableName = tableMap("tableName")
    val functionName = tableMap("functionName")
    val tableFullName = s"${manager.getCurrentDatabase}.$tableName"
    val table = catalog.get().getTable(ObjectPath.fromString(tableFullName))
    val functionLookup = functionCatalog.lookupFunction(UnresolvedIdentifier.of(functionName))
    table match {
      case catalogTable: CatalogTable =>
        val schema = table.getSchema
        val sinkSchema = SelectTableSinkSchemaConverter.convertTimeAttributeToRegularTimestamp(
          SelectTableSinkSchemaConverter.changeDefaultConversionClass(schema))

        val op = toOutputTypeInformation(
          context, rexBuilder, schema, flinkTypeFactory, functionLookup, functionName, size)
        op match {
          case Some((typeInfo, rexNode, relDataType)) =>
            // Replace the original projections with a single call to the UDF.
            val newProjects = new util.ArrayList[RexNode]()
            newProjects.add(rexNode)

            val newProject = project.copy(
              project.getTraitSet,
              project.getInput,
              newProjects,
              project.getInput.getRowType)

            val sink = new StreamSelectTableSink(FlinkTypeFactory.toTableSchema(relDataType))
            planner.getExecEnv.addProvider(sink.getSelectResultProvider)

            /**
             * TODO: different kinds of sinks could be created here based on the table metadata.
             *
             * As written, this fails at runtime because no jobClient is ever set on the
             * StreamSelectTableSink. One workaround is to add a List to
             * StreamExecutionEnvironment that stores the side-output sink providers,
             * <code>
             * public final List<Object> sinkProvider = new ArrayList<>();
             *
             * public void addProvider(Object provider) {
             *     sinkProvider.add(provider);
             * }
             * </code>
             * and then fetch the providers in TableEnvironmentImpl:
             * <code>
             * execEnv.getSinkProvider().forEach(provider ->
             *     ((SelectResultProvider) provider).setJobClient(jobClient));
             * </code>
             */
            LogicalLegacySink.create(
              newProject, sink, "collect", ConnectorCatalogTable.sink(sink, false))
          // LogicalLegacySink.create(newProject,
          //   // new StreamSelectTableSink(sinkSchema),
          //   new DataStreamTableSink(sinkSchema, typeInfo, false, false),
          //   "sideOutputSink", catalogTable)
          case None =>
            throw new RuntimeException(s"Function not found: $functionName")
        }
    }
  }
}

def toOutputTypeInformation(
    context: FlinkContext,
    rexBuilder: RexBuilder,
    schema: TableSchema,
    flinkTypeFactory: FlinkTypeFactory,
    result: Optional[FunctionLookup.Result],
    functionName: String,
    index: Int): Option[(TypeInformation[_], RexNode, RelDataType)] = {
  if (result.isPresent) {
    val functionLookup = result.get()
    val definition = functionLookup.getFunctionDefinition
    val function = BridgingSqlFunction.of(
      context,
      flinkTypeFactory,
      functionLookup.getFunctionIdentifier,
      definition)

    // Call the UDF with an identity projection over all input fields.
    val dataType = flinkTypeFactory.buildRelNodeRowType(schema)
    val operands = new util.ArrayList[RexNode](rexBuilder.identityProjects(dataType))
    val rexCall = rexBuilder.makeCall(dataType, function, operands)

    val udf = function.getDefinition.asInstanceOf[UserDefinedFunction]

    val inference = function.getTypeInference

    val callContext = new OperatorBindingCallContext(
      function.getDataTypeFactory,
      udf,
      RexCallBinding.create(
        function.getTypeFactory,
        rexCall.asInstanceOf[RexCall],
        Collections.emptyList()))

    val adaptedCallContext = TypeInferenceUtil.adaptArguments(
      inference,
      callContext,
      null)

    // Infer the UDF's output type once and derive both the RelDataType
    // and the TypeInformation from it.
    val functionReturnType: DataType = TypeInferenceUtil.inferOutputType(
      adaptedCallContext,
      inference.getOutputTypeStrategy)

    val relDataType = flinkTypeFactory.createFieldTypeFromLogicalType(
      DataTypes.ROW(DataTypes.FIELD(s"EXPR$index", functionReturnType)).getLogicalType)

    val clazz = functionReturnType.getConversionClass
    Some((TypeExtractor.createTypeInfo(clazz), rexCall, relDataType))
  } else {
    println(s"Function not found: $functionName")
    None
  }
}
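
For readability, here is the workaround from the TODO comment in createSink as plain Java. These are patches to Flink's own StreamExecutionEnvironment and TableEnvironmentImpl, not stock API; getSinkProvider is assumed to be the getter matching the addProvider call used above.

// Hypothetical patch to StreamExecutionEnvironment: collect the select-result
// providers of all side-output sinks created during planning.
public final List<Object> sinkProvider = new ArrayList<>();

public void addProvider(Object provider) {
    sinkProvider.add(provider);
}

public List<Object> getSinkProvider() {
    return sinkProvider;
}

Then, in TableEnvironmentImpl, once the job has been submitted and a JobClient is available:

// Hypothetical patch to TableEnvironmentImpl: hand the JobClient to every stored
// provider so that StreamSelectTableSink can actually serve results.
execEnv.getSinkProvider().forEach(
        provider -> ((SelectResultProvider) provider).setJobClient(jobClient));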

IV. Modify PlannerBase

@VisibleForTesting
private[flink] def optimize(relNodes: Seq[RelNode]): Seq[RelNode] = {
  val optimizedRelNodes = getOptimizer.optimize(relNodes)
  // optimize can now return more RelNodes than were passed in (one extra sink
  // per side-output hint), so the original assertion must be commented out:
  // require(optimizedRelNodes.size == relNodes.size)
  optimizedRelNodes
}

[Figure: sideoutput(二).png]

Notes

  • This chapter presents an implementation approach.
  • The approach has not been used in production.
  • The implementation is a half-finished prototype.