How Is a DataStream Converted into a Transformation?

Introduction

This article walks through the conversion using the WordCount example.

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = env.fromElements(WordCountData.WORDS);

text.flatMap(new Tokenizer())
    .keyBy(value -> value.f0)
    .sum(1)
    .addSink(new CustomSink());

env.execute("Streaming WordCount");

flink-transformation.png

DataStreamSource

public DataStreamSource(
        StreamExecutionEnvironment environment,
        TypeInformation<T> outTypeInfo,
        StreamSource<T, ?> operator,
        boolean isParallel,
        String sourceName,
        Boundedness boundedness) {
    // Pass the transformation to the parent class DataStream, which stores it;
    // the later flatMap, keyBy, and addSink calls are all DataStream API methods
    super(
            environment,
            new LegacySourceTransformation<>(
                    sourceName, operator, outTypeInfo, environment.getParallelism(), boundedness));

    this.isParallel = isParallel;
    if (!isParallel) {
        setParallelism(1);
    }
}

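  • Calling fromElements constructs a DataStreamSource and a LegacySourceTransformation (SourceTransformation).
  • The transformation is then passed to the parent class DataStream and stored there. The subsequent flatMap, keyBy, and addSink calls are all DataStream API methods.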
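
The constructor logic above can be sketched without any Flink dependency. The following is a minimal model, not Flink's real code: Env, Transformation, Stream, and SourceStream are hypothetical stand-ins for StreamExecutionEnvironment, LegacySourceTransformation, DataStream, and DataStreamSource. It only illustrates that the source builds its transformation, hands it to the parent class, and forces parallelism 1 for a non-parallel source.

```java
public class SourceSketch {
    /** Hypothetical stand-in for a Transformation: a name plus a parallelism. */
    static class Transformation {
        final String name;
        int parallelism;

        Transformation(String name, int parallelism) {
            this.name = name;
            this.parallelism = parallelism;
        }
    }

    /** Hypothetical stand-in for the execution environment. */
    static class Env {
        final int defaultParallelism;

        Env(int defaultParallelism) {
            this.defaultParallelism = defaultParallelism;
        }
    }

    /** Hypothetical stand-in for DataStream: it only stores env and transformation. */
    static class Stream {
        final Env env;
        final Transformation transformation;

        Stream(Env env, Transformation transformation) {
            this.env = env;
            this.transformation = transformation;
        }
    }

    /** Hypothetical stand-in for DataStreamSource. */
    static class SourceStream extends Stream {
        final boolean isParallel;

        SourceStream(Env env, String sourceName, boolean isParallel) {
            // Build the source transformation and hand it to the parent class.
            super(env, new Transformation(sourceName, env.defaultParallelism));
            this.isParallel = isParallel;
            if (!isParallel) {
                // A non-parallel source always runs with parallelism 1.
                transformation.parallelism = 1;
            }
        }
    }

    public static void main(String[] args) {
        Env env = new Env(4);
        SourceStream source = new SourceStream(env, "Collection Source", false);
        // prints "Collection Source parallelism=1"
        System.out.println(source.transformation.name
                + " parallelism=" + source.transformation.parallelism);
    }
}
```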

FlatMap

protected <R> SingleOutputStreamOperator<R> doTransform(
        String operatorName,
        TypeInformation<R> outTypeInfo,
        StreamOperatorFactory<R> operatorFactory) {

    // read the output type of the input Transform to coax out errors about MissingTypeInfo
    transformation.getOutputType();

    OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
            this.transformation,
            operatorName,
            operatorFactory,
            outTypeInfo,
            environment.getParallelism());

    @SuppressWarnings({"unchecked", "rawtypes"})
    SingleOutputStreamOperator<R> returnStream =
            new SingleOutputStreamOperator(environment, resultTransform);
    // Add this transformation to the env's list
    getExecutionEnvironment().addOperator(resultTransform);

    return returnStream;
}
  • getExecutionEnvironment() returns the env that was passed to DataStream in the previous section.
  • addOperator simply appends the transformation to the env's list.
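
The two points above can be illustrated with a small dependency-free model. This is a sketch under simplifying assumptions, not Flink's implementation: Env, Transformation, and Stream are hypothetical stand-ins, and type information and operator factories are left out. It shows the two things doTransform does: link the new transformation to its input, and register it in the env's list.

```java
import java.util.ArrayList;
import java.util.List;

public class DoTransformSketch {
    /** Hypothetical stand-in for a one-input Transformation. */
    static class Transformation {
        final String name;
        final Transformation input; // null for a source

        Transformation(String name, Transformation input) {
            this.name = name;
            this.input = input;
        }
    }

    /** Hypothetical stand-in for the execution environment. */
    static class Env {
        final List<Transformation> transformations = new ArrayList<>();

        void addOperator(Transformation t) {
            transformations.add(t);
        }
    }

    /** Hypothetical stand-in for DataStream. */
    static class Stream {
        final Env env;
        final Transformation transformation;

        Stream(Env env, Transformation transformation) {
            this.env = env;
            this.transformation = transformation;
        }

        Stream doTransform(String operatorName) {
            // The new transformation keeps a reference to the upstream one.
            Transformation result = new Transformation(operatorName, this.transformation);
            // Register it with the environment, then wrap it in a new stream.
            env.addOperator(result);
            return new Stream(env, result);
        }
    }

    public static void main(String[] args) {
        Env env = new Env();
        Stream source = new Stream(env, new Transformation("Source", null));
        Stream mapped = source.doTransform("Flat Map");
        // prints "registered: 1"
        System.out.println("registered: " + env.transformations.size());
        // prints "input of Flat Map: Source"
        System.out.println("input of " + mapped.transformation.name
                + ": " + mapped.transformation.input.name);
    }
}
```

In this model the source itself is never added to the list; it stays reachable only through the input reference of the downstream transformation.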

keyBy

KeyedStream(
        DataStream<T> stream,
        PartitionTransformation<T> partitionTransformation,
        KeySelector<T, KEY> keySelector,
        TypeInformation<KEY> keyType) {

    super(stream.getExecutionEnvironment(), partitionTransformation);
    this.keySelector = clean(keySelector);
    this.keyType = validateKeyType(keyType);
}
  • keyBy constructs a new DataStream (here a KeyedStream) and passes the upstream env on to it.
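
As a rough illustration of that constructor, the sketch below models keyBy with plain Java. All class names are hypothetical stand-ins rather than Flink classes, and java.util.function.Function takes the place of KeySelector. The point is only that the keyed stream reuses the upstream env and wraps a partition transformation whose input is the upstream transformation.

```java
import java.util.function.Function;

public class KeyBySketch {
    /** Hypothetical stand-in for the execution environment. */
    static class Env {
    }

    /** Hypothetical stand-in for a Transformation with a single input. */
    static class Transformation {
        final String name;
        final Transformation input;

        Transformation(String name, Transformation input) {
            this.name = name;
            this.input = input;
        }
    }

    /** Hypothetical stand-in for DataStream. */
    static class Stream<T> {
        final Env env;
        final Transformation transformation;

        Stream(Env env, Transformation transformation) {
            this.env = env;
            this.transformation = transformation;
        }

        <K> KeyedStream<T, K> keyBy(Function<T, K> keySelector) {
            // The partition transformation points back at the upstream one.
            Transformation partition = new Transformation("Partition", this.transformation);
            return new KeyedStream<>(this, partition, keySelector);
        }
    }

    /** Hypothetical stand-in for KeyedStream. */
    static class KeyedStream<T, K> extends Stream<T> {
        final Function<T, K> keySelector;

        KeyedStream(Stream<T> upstream, Transformation partition, Function<T, K> keySelector) {
            // Reuse the upstream env; only the transformation is new.
            super(upstream.env, partition);
            this.keySelector = keySelector;
        }
    }

    public static void main(String[] args) {
        Env env = new Env();
        Stream<String> words = new Stream<>(env, new Transformation("Flat Map", null));
        KeyedStream<String, String> keyed = words.keyBy(w -> w.substring(0, 1));
        // prints "same env: true"
        System.out.println("same env: " + (keyed.env == words.env));
        // prints "upstream: Flat Map"
        System.out.println("upstream: " + keyed.transformation.input.name);
    }
}
```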

addSink

public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {

    // read the output type of the input Transform to coax out errors about MissingTypeInfo
    transformation.getOutputType();

    // configure the type if needed
    if (sinkFunction instanceof InputTypeConfigurable) {
        ((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig());
    }

    StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));

    DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);

    getExecutionEnvironment().addOperator(sink.getTransformation());
    return sink;
}
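  • The flow is similar to flatMap: the sink's transformation is created and then registered with the env through addOperator.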

execute

  • Calling the execute method builds the StreamGraph.
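
To give a feel for how a graph can be derived from the collected transformations, here is a simplified sketch. It is not Flink's StreamGraph generator: Transformation, Graph, and generate are hypothetical names, every transformation is treated as an ordinary node, and the example assumes that only the flatMap, sum, and sink transformations were registered through addOperator. The traversal follows the input reference of each registered transformation, so upstream transformations that are not in the list are still discovered.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class GraphSketch {
    /** Hypothetical stand-in for a Transformation with a single input. */
    static class Transformation {
        final String name;
        final Transformation input; // null for a source

        Transformation(String name, Transformation input) {
            this.name = name;
            this.input = input;
        }
    }

    /** Hypothetical, heavily simplified graph: node names and "a -> b" edges. */
    static class Graph {
        final List<String> nodes = new ArrayList<>();
        final List<String> edges = new ArrayList<>();
    }

    static Graph generate(List<Transformation> registered) {
        Graph graph = new Graph();
        Set<Transformation> seen = new HashSet<>();
        for (Transformation t : registered) {
            visit(t, seen, graph);
        }
        return graph;
    }

    private static void visit(Transformation t, Set<Transformation> seen, Graph graph) {
        if (!seen.add(t)) {
            return; // already translated
        }
        if (t.input != null) {
            // Translate the upstream transformation first, then connect to it.
            visit(t.input, seen, graph);
            graph.edges.add(t.input.name + " -> " + t.name);
        }
        graph.nodes.add(t.name);
    }

    public static void main(String[] args) {
        Transformation source = new Transformation("Source", null);
        Transformation flatMap = new Transformation("Flat Map", source);
        Transformation partition = new Transformation("Partition", flatMap);
        Transformation sum = new Transformation("Keyed Aggregation", partition);
        Transformation sink = new Transformation("Sink", sum);

        // Assumption: only these three were registered through addOperator.
        Graph graph = generate(Arrays.asList(flatMap, sum, sink));
        // prints [Source, Flat Map, Partition, Keyed Aggregation, Sink]
        System.out.println(graph.nodes);
        System.out.println(graph.edges);
    }
}
```

Even though the source and the partition step are not in the registered list, they end up in the graph because each registered transformation carries a reference to its input.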