使用Netty定义Client与Server端的协议

client 端协议

flink-client-netty.png

server 端协议

flink-server-netty.png

org.apache.flink.runtime.io.network.netty.NettyProtocol

CreditBasedPartitionRequestClientHandler

/**
 * Receives an inbound message and passes it to {@code decodeMsg}.
 *
 * <p>Any {@link Throwable} raised while decoding is forwarded to
 * {@code notifyAllChannelsOfErrorAndClose} instead of being rethrown from this method.
 *
 * @param ctx the handler context supplied by the caller (not used in this method)
 * @param msg the inbound message to decode
 * @throws Exception declared by the overridden signature
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    try {
        decodeMsg(msg);
    } catch (Throwable decodeFailure) {
        notifyAllChannelsOfErrorAndClose(decodeFailure);
    }
}

/**
 * Dispatches an inbound message according to its exact runtime class.
 *
 * <p>For a {@code NettyMessage.BufferResponse} the target channel is looked up in
 * {@code inputChannels} by {@code receiverId}. If no channel is registered, or the channel
 * is already released, the buffer is released and the request is cancelled. Otherwise the
 * response is passed to {@code decodeBufferOrEvent}; a failure there is reported to that
 * channel through {@code onError}.
 *
 * <p>The {@code NettyMessage.ErrorResponse} branch is elided in this excerpt.
 *
 * @param msg the inbound message; compared by exact class, so subclasses do not match
 * @throws Throwable declared by the signature; failures from decoding a buffer are routed
 *     to the channel's {@code onError} rather than thrown
 */
private void decodeMsg(Object msg) throws Throwable {
final Class<?> msgClazz = msg.getClass();

// ---- Buffer --------------------------------------------------------
if (msgClazz == NettyMessage.BufferResponse.class) {
NettyMessage.BufferResponse bufferOrEvent = (NettyMessage.BufferResponse) msg;

RemoteInputChannel inputChannel = inputChannels.get(bufferOrEvent.receiverId);
if (inputChannel == null || inputChannel.isReleased()) {
// Nobody can consume this response: release its buffer and cancel the request
// for this receiver id.
bufferOrEvent.releaseBuffer();

cancelRequestFor(bufferOrEvent.receiverId);

return;
}

try {
// Decode the buffer
decodeBufferOrEvent(inputChannel, bufferOrEvent);
} catch (Throwable t) {
// Report the failure to the affected channel only.
inputChannel.onError(t);
}

} else if (msgClazz == NettyMessage.ErrorResponse.class) {
// Error-response handling is omitted from this excerpt.
.....
}
}

/**
 * Hands a buffer response to its input channel.
 *
 * <ul>
 *   <li>A data buffer with {@code bufferSize == 0} goes to {@code onEmptyBuffer}.
 *   <li>Otherwise a non-null buffer goes to {@code onBuffer}.
 *   <li>Otherwise the response is invalid and an exception is thrown.
 * </ul>
 *
 * @param inputChannel the channel that receives the response
 * @param bufferOrEvent the response read from the network
 * @throws IllegalStateException if the response is not an empty data buffer and its buffer
 *     is {@code null}
 * @throws Throwable anything raised by the channel callbacks
 */
private void decodeBufferOrEvent(
        RemoteInputChannel inputChannel, NettyMessage.BufferResponse bufferOrEvent)
        throws Throwable {
    final boolean emptyDataBuffer = bufferOrEvent.isBuffer() && bufferOrEvent.bufferSize == 0;
    if (emptyDataBuffer) {
        // TODO (open question from the original notes): does this path trigger backpressure?
        inputChannel.onEmptyBuffer(bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
        return;
    }

    if (bufferOrEvent.getBuffer() == null) {
        throw new IllegalStateException(
                "The read buffer is null in credit-based input channel.");
    }

    // Continues in RemoteInputChannel#onBuffer.
    inputChannel.onBuffer(
            bufferOrEvent.getBuffer(), bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
}

RemoteInputChannel

/**
 * Accepts a buffer delivered for this channel and queues it in {@code receivedBuffers}.
 *
 * <p>Continues from {@code decodeBufferOrEvent} in the client handler, which calls this
 * method for every non-empty buffer response.
 *
 * <p>Buffer ownership: this method recycles {@code buffer} in its {@code finally} block
 * unless the buffer has been handed to the queue. The flag is cleared immediately after the
 * hand-over, so a failure in a later step (announcement, barrier check, persisting) cannot
 * recycle a buffer that the queue still references.
 *
 * @param buffer the received buffer
 * @param sequenceNumber sequence number of the buffer; if it differs from
 *     {@code expectedSequenceNumber} the channel is failed with a
 *     {@code BufferReorderingException} and the buffer is recycled
 * @param backlog backlog value reported with the buffer; passed to {@code onSenderBacklog}
 *     when it is {@code >= 0}
 * @throws IOException declared by the signature; may be raised by the called helpers
 */
public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOException {
    // True while this method still owns the buffer and must recycle it on exit.
    boolean recycleBuffer = true;

    try {
        if (expectedSequenceNumber != sequenceNumber) {
            onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber));
            return;
        }

        final boolean wasEmpty;
        boolean firstPriorityEvent = false;
        synchronized (receivedBuffers) {
            NetworkActionsLogger.traceInput(
                    "RemoteInputChannel#onBuffer",
                    buffer,
                    inputGate.getOwningTaskName(),
                    channelInfo,
                    channelStatePersister,
                    sequenceNumber);
            // Similar to notifyBufferAvailable(), make sure that we never add a buffer
            // after releaseAllResources() released all buffers from receivedBuffers
            // (see above for details).
            if (isReleased.get()) {
                return;
            }

            wasEmpty = receivedBuffers.isEmpty();

            SequenceBuffer sequenceBuffer = new SequenceBuffer(buffer, sequenceNumber);
            DataType dataType = buffer.getDataType();
            if (dataType.hasPriority()) {
                // Priority data: per the original note, it is added to the head of the queue.
                firstPriorityEvent = addPriorityBuffer(sequenceBuffer);
                // The queue owns the buffer from here on; do not recycle it in finally.
                recycleBuffer = false;
            } else {
                receivedBuffers.add(sequenceBuffer);
                // The queue owns the buffer from here on, even if a later step throws.
                recycleBuffer = false;
                if (dataType.requiresAnnouncement()) {
                    // Per the original note, this case applies to checkpoint barriers.
                    firstPriorityEvent = addPriorityBuffer(announce(sequenceBuffer));
                }
            }
            channelStatePersister
                    .checkForBarrier(sequenceBuffer.buffer)
                    .filter(id -> id > lastBarrierId)
                    .ifPresent(
                            id -> {
                                // checkpoint was not yet started by task thread,
                                // so remember the numbers of buffers to spill for the time
                                // when it will be started
                                lastBarrierId = id;
                                lastBarrierSequenceNumber = sequenceBuffer.sequenceNumber;
                            });
            channelStatePersister.maybePersist(buffer);
            ++expectedSequenceNumber;
        }

        if (firstPriorityEvent) {
            // Signal the priority event (per the original note, this calls into the InputGate).
            notifyPriorityEvent(sequenceNumber);
        }
        if (wasEmpty) {
            // The queue was empty before this buffer: signal that data is now available
            // (per the original note, this calls into the InputGate).
            notifyChannelNonEmpty();
        }

        if (backlog >= 0) {
            // Forward the backlog reported by the upstream ResultSubpartition.
            onSenderBacklog(backlog);
        }
    } finally {
        if (recycleBuffer) {
            // The buffer was never handed to the queue; release it here.
            buffer.recycleBuffer();
        }
    }
}

flink-通信栈.png