1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187
| package org.apache.flink.table.planner.plan.nodes.physical.stream
import java.util import java.util.{Objects, Optional}
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.Calc import org.apache.calcite.rel.hint.RelHint import org.apache.calcite.rex.{RexCall, RexNode, RexProgram} import org.apache.flink.api.dag.Transformation import org.apache.flink.streaming.api.transformations.OneInputTransformation import org.apache.flink.table.api.ValidationException import org.apache.flink.table.catalog.{Catalog, CatalogBaseTable, FunctionLookup, ObjectPath, UnresolvedIdentifier} import org.apache.flink.table.catalog.exceptions.CatalogException import org.apache.flink.table.data.RowData import org.apache.flink.table.planner.{JList, JMap} import org.apache.flink.table.planner.calcite.{FlinkContext, FlinkTypeFactory} import org.apache.flink.table.planner.codegen.{CalcCodeGenerator, CodeGeneratorContext} import org.apache.flink.table.planner.delegation.StreamPlanner import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction import org.apache.flink.table.planner.hint.FlinkHints import org.apache.flink.table.planner.utils.ShortcutUtils import org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator import org.apache.flink.table.runtime.typeutils.InternalTypeInfo import org.apache.flink.table.types.logical.LogicalType
import scala.collection.JavaConverters._
/**
 * Stream physical Calc node that can additionally generate side-output code.
 *
 * When the node carries hints, the list options of every hint named
 * [[FlinkHints.HINT_SIDE_OUT_PUT]] are read as table names in the current database of the
 * current catalog. For each of those tables the column layout, the function named by the
 * table option "functionName" and the value of the table option "tagName" are collected and
 * handed to [[CalcCodeGenerator.generateSideOutputCode]].
 */
class StreamExecSideOutputCalc(
    cluster: RelOptCluster,
    traitSet: RelTraitSet,
    inputRel: RelNode,
    calcProgram: RexProgram,
    outputRowType: RelDataType,
    hints: JList[RelHint])
  extends StreamExecCalcBase(cluster, traitSet, inputRel, calcProgram, outputRowType, hints) {

  /** Columns of a side-output table as (column index, column name, logical type). */
  type ColumnInfo = JList[(Int, String, LogicalType)]
  /** Side-output table name -> its columns. */
  type TableInfo = JMap[String, ColumnInfo]
  type BSF = BridgingSqlFunction

  // All four maps are keyed by the table name exactly as it is written in the hint and are
  // filled by generatorTableToFieldInfo().
  val sideOutputTableInfo: TableInfo = new util.HashMap[String, ColumnInfo]()
  val tableAndFunction: JMap[String, BSF] = new util.HashMap[String, BSF]()
  val tableAndRexCall: JMap[String, RexCall] = new util.HashMap[String, RexCall]()
  val tableAndTagName: JMap[String, String] = new util.HashMap[String, String]()

  /** Converts a Java [[Optional]] into the equivalent Scala [[Option]]. */
  def toScalaOption[T](op: Optional[T]): Option[T] = {
    if (op.isPresent) Some(op.get()) else None
  }

  /**
   * Resolves every table listed in the side-output hints and fills [[sideOutputTableInfo]],
   * [[tableAndTagName]], [[tableAndFunction]] and [[tableAndRexCall]].
   *
   * @throws CatalogException if the current catalog cannot be resolved
   * @throws ValidationException if no side-output table is listed in the hints, or a listed
   *                             table lacks the "functionName" or "tagName" option
   */
  def generatorTableToFieldInfo(): Unit = {
    val context = this.cluster.getPlanner.getContext.unwrap(classOf[FlinkContext])
    val flinkTypeFactory = ShortcutUtils.unwrapTypeFactory(cluster)
    val manager = context.getCatalogManager
    val functionCatalog = context.getFunctionCatalog
    val rexBuilder = input.getCluster.getRexBuilder

    // Option.foreach never runs for None, so the missing-catalog case has to be handled
    // explicitly to actually surface the error.
    val catalog: Catalog = toScalaOption[Catalog](manager.getCatalog(manager.getCurrentCatalog))
      .getOrElse(throw new CatalogException(s"${manager.getCurrentCatalog} is null!"))

    val sideOutputTables = getHints.asScala
      .filter(_.hintName.equals(FlinkHints.HINT_SIDE_OUT_PUT))
      .flatMap(_.listOptions.asScala)
    // Wrapping the buffer in Option(...) would always yield Some; test for emptiness instead
    // so that a node without side-output tables is rejected here with the intended message.
    if (sideOutputTables.isEmpty) {
      throw new ValidationException("Only support sideOutput hints ...")
    }

    sideOutputTables.foreach { tableName =>
      val tableFullName = s"${manager.getCurrentDatabase}.$tableName"
      val table: CatalogBaseTable = catalog.getTable(ObjectPath.fromString(tableFullName))
      val functionName = validateExits(table.getOptions.get("functionName"), "functionName")
      val tagName = validateExits(table.getOptions.get("tagName"), "tagName")
      tableAndTagName.put(tableName, tagName)

      val schema = table.getSchema
      val types: ColumnInfo = new util.ArrayList[(Int, String, LogicalType)]()
      schema.getTableColumns.asScala.zipWithIndex.foreach { case (column, index) =>
        types.add((index, column.getName, column.getType.getLogicalType))
      }
      sideOutputTableInfo.put(tableName, types)

      // NOTE(review): when the function cannot be found the table stays registered in
      // sideOutputTableInfo / tableAndTagName but gets no function or call. This mirrors the
      // previous behavior; confirm whether an unknown function should be an error instead.
      val lookup = functionCatalog.lookupFunction(UnresolvedIdentifier.of(functionName))
      toScalaOption[FunctionLookup.Result](lookup).foreach { functionLookup =>
        val function = BridgingSqlFunction.of(
          context,
          flinkTypeFactory,
          functionLookup.getFunctionIdentifier,
          functionLookup.getFunctionDefinition)
        tableAndFunction.put(tableName, function)

        // Build a call of the function over identity projections of the table's row type.
        val dataType = flinkTypeFactory.buildRelNodeRowType(schema)
        val operands = new util.ArrayList[RexNode](rexBuilder.identityProjects(dataType))
        val rexCall = rexBuilder.makeCall(dataType, function, operands)
        tableAndRexCall.put(tableName, rexCall.asInstanceOf[RexCall])
      }
    }
  }

  /** Creates a copy of this node with the given traits, input and program; hints are kept. */
  override def copy(traitSet: RelTraitSet, child: RelNode, program: RexProgram): Calc = {
    new StreamExecSideOutputCalc(cluster, traitSet, child, program, outputRowType, getHints)
  }

  /**
   * Returns `v` unchanged when it is non-null.
   *
   * @param v    value to check
   * @param name label of the value used in the error message
   * @throws ValidationException if `v` is null
   */
  private def validateExits[T](v: T, name: String = "value"): T = {
    if (Objects.isNull(v)) {
      throw new ValidationException(s"Required value '$name' is null")
    } else {
      v
    }
  }

  /**
   * Verifies that the table, function and call maps have been populated.
   *
   * @throws ValidationException if any of them is empty
   */
  private def validatePrepareInfo(): Unit = {
    if (sideOutputTableInfo.isEmpty || tableAndFunction.isEmpty || tableAndRexCall.isEmpty) {
      throw new ValidationException("Must call generatorTableToFieldInfo method!")
    }
  }

  /**
   * Translates this node into a [[OneInputTransformation]] running the generated calc
   * operator. Side-output code is generated only when the node carries hints; otherwise an
   * empty string is passed to the calc operator generator.
   */
  override protected def translateToPlanInternal(
      planner: StreamPlanner): Transformation[RowData] = {
    val config = planner.getTableConfig
    val inputTransform = getInputNodes.get(0).translateToPlan(planner)
      .asInstanceOf[Transformation[RowData]]
    // getCondition is null when the program has no filter.
    val condition = Option(calcProgram.getCondition).map(c => calcProgram.expandLocalRef(c))

    val ctx = CodeGeneratorContext(config).setOperatorBaseClass(
      classOf[AbstractProcessStreamOperator[RowData]])
    val outputType = FlinkTypeFactory.toLogicalRowType(getRowType)

    val sideOutputCode = if (getHints.isEmpty) {
      ""
    } else {
      generatorTableToFieldInfo()
      validatePrepareInfo()
      CalcCodeGenerator.generateSideOutputCode(
        ctx,
        inputTransform,
        outputType,
        calcProgram,
        condition,
        retainHeader = true,
        sideOutputTableInfo,
        tableAndFunction,
        tableAndRexCall,
        tableAndTagName
      )
    }

    val substituteStreamOperator = CalcCodeGenerator.generateCalcOperator(
      ctx,
      inputTransform,
      outputType,
      calcProgram,
      condition,
      retainHeader = true,
      sideOutputCode,
      "StreamExecCalc"
    )
    val ret = new OneInputTransformation(
      inputTransform,
      getRelDetailedDescription,
      substituteStreamOperator,
      InternalTypeInfo.of(outputType),
      inputTransform.getParallelism)

    // A singleton input forces this operator to run with a parallelism of exactly one.
    if (inputsContainSingleton()) {
      ret.setParallelism(1)
      ret.setMaxParallelism(1)
    }
    ret
  }
}
|