// Enqueue the received buffer while holding the receivedBuffers lock, then send the
// notifications only after the lock has been released.

// Whether receivedBuffers was empty right before this buffer was queued. Assigned inside the
// synchronized block and read after it to decide whether to call notifyChannelNonEmpty().
final boolean wasEmpty;
// Set from the return value of addPriorityBuffer(...); when true, notifyPriorityEvent(...)
// is called after the lock is released.
boolean firstPriorityEvent = false;
synchronized (receivedBuffers) {
    NetworkActionsLogger.traceInput(
            "RemoteInputChannel#onBuffer",
            buffer,
            inputGate.getOwningTaskName(),
            channelInfo,
            channelStatePersister,
            sequenceNumber);
    // Similar to notifyBufferAvailable(), make sure that we never add a buffer
    // after releaseAllResources() released all buffers from receivedBuffers
    // (see above for details).
    // NOTE(review): on this early return recycleBuffer is left untouched; presumably the
    // enclosing method recycles the buffer in that case -- confirm against the full method.
    if (isReleased.get()) {
        return;
    }

    wasEmpty = receivedBuffers.isEmpty();

    SequenceBuffer sequenceBuffer = new SequenceBuffer(buffer, sequenceNumber);
    DataType dataType = buffer.getDataType();
    if (dataType.hasPriority()) {
        // If the buffer has priority, add it to the head of the queue.
        firstPriorityEvent = addPriorityBuffer(sequenceBuffer);
    } else {
        receivedBuffers.add(sequenceBuffer);
        if (dataType.requiresAnnouncement()) {
            // For CheckpointBarrier elements: in addition to the regular enqueue above,
            // an announcement of the buffer is added as a priority buffer.
            firstPriorityEvent = addPriorityBuffer(announce(sequenceBuffer));
        }
    }

    channelStatePersister
            .checkForBarrier(sequenceBuffer.buffer)
            .filter(id -> id > lastBarrierId)
            .ifPresent(
                    id -> {
                        // checkpoint was not yet started by task thread,
                        // so remember the numbers of buffers to spill for the time when
                        // it will be started
                        lastBarrierId = id;
                        lastBarrierSequenceNumber = sequenceBuffer.sequenceNumber;
                    });
    channelStatePersister.maybePersist(buffer);
    ++expectedSequenceNumber;
}
// The buffer has been handed over to the channel's queues, so it must not be recycled here.
recycleBuffer = false;

if (firstPriorityEvent) {
    // Notify about the priority event (this method calls into the InputGate).
    notifyPriorityEvent(sequenceNumber);
}
if (wasEmpty) {
    // Notify the downstream consumer that data is available (this method calls into the
    // InputGate).
    notifyChannelNonEmpty();
}