final Queue<CircularElement> cache = new ArrayDeque<>(); CircularElement element; boolean cacheOnly = false;
// ------------------- In-Memory Cache ------------------------ // fill cache while (isRunning()) { // take next currWriteBuffer from queue try { // 队列中获取 element element = this.queues.spill.take(); } catch (InterruptedException iex) { thrownew IOException("The spilling thread was interrupted."); }
// Spilling thread, spill phase (excerpt): repeatedly obtains the next element
// and, when its buffer holds data, opens a new channel for it.
final FileIOChannel.Enumerator enumerator = this.ioManager.createChannelEnumerator();

// loop as long as the thread is marked alive and we do not see the final currWriteBuffer
while (isRunning()) {
    try {
        // if the cache is empty, take the next element from the spill queue
        // (blocking); otherwise poll it directly from the cache
        element = cache.isEmpty() ? queues.spill.take() : cache.poll();
    } catch (InterruptedException iex) {
        if (isRunning()) {
            // interrupted without a shutdown request: log and try again
            LOG.error("Spilling thread was interrupted (without being shut down) while grabbing a buffer. " + "Retrying to grab buffer...");
            continue;
        } else {
            return;
        }
    }

    // check if we are still running
    if (!isRunning()) {
        return;
    }
    // check if this is the end-of-work buffer
    if (element == EOF_MARKER) {
        break;
    }

    if (element.buffer.getOccupancy() > 0) {
        // open next channel
        FileIOChannel.ID channel = enumerator.next();
        channelManager.addChannel(channel);

        AbstractChannelWriterOutputView output = null;
        int bytesInLastBuffer;
        int blockCount;
        // NOTE(review): the code that writes the buffer, and the closing braces
        // of this `if` and of the enclosing loop, are not part of the excerpt.
// Merging thread, collection phase (excerpt): takes channel descriptors from
// the merge queue until the final-merge marker is seen.
final List<ChannelWithMeta> spillChannelIDs = new ArrayList<>();
List<ChannelWithMeta> finalMergeChannelIDs = new ArrayList<>();
ChannelWithMeta channelID;

while (isRunning()) {
    try {
        // take the next channelID from the merge queue
        channelID = this.queues.merge.take();
    } catch (InterruptedException iex) {
        if (isRunning()) {
            // interrupted without a shutdown request: log and try again
            LOG.error("Merging thread was interrupted (without being shut down) " + "while grabbing a channel with meta. Retrying...");
            continue;
        } else {
            return;
        }
    }

    if (!isRunning()) {
        return;
    }
    if (channelID == FINAL_MERGE_MARKER) {
        // this channelID is the final-merge marker: move everything collected
        // so far into the final merge list
        finalMergeChannelIDs.addAll(spillChannelIDs);
        spillChannelIDs.clear();
        // sort the channels by their block count (ascending)
        finalMergeChannelIDs.sort(Comparator.comparingInt(ChannelWithMeta::getBlockCount));
        break;
    }
    // NOTE(review): the handling of non-marker channelIDs and the closing brace
    // of this loop are not part of the excerpt.
// check if we have spilled some data at all if (finalMergeChannelIDs.isEmpty()) { if (iterator == null) { // notify 调用方获取 Iterator setResultIterator(EmptyMutableObjectIterator.get()); } } else { // merge channels until sufficient file handles are available while (isRunning() && finalMergeChannelIDs.size() > this.maxFanIn) { finalMergeChannelIDs = merger.mergeChannelList(finalMergeChannelIDs); }
// Beginning final merge.
// no need to call `getReadMemoryFromHeap` again, // because `finalMergeChannelIDs` must become smaller