1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| public void checkpointState( CheckpointMetaData metadata, CheckpointOptions options, CheckpointMetricsBuilder metrics, OperatorChain<?, ?> operatorChain, Supplier<Boolean> isCanceled) throws Exception {
checkNotNull(options); checkNotNull(metrics);
if (lastCheckpointId >= metadata.getCheckpointId()) { LOG.info("Out of order checkpoint barrier (aborted previously?): {} >= {}", lastCheckpointId, metadata.getCheckpointId()); channelStateWriter.abort(metadata.getCheckpointId(), new CancellationException(), true); checkAndClearAbortedStatus(metadata.getCheckpointId()); return; }
lastCheckpointId = metadata.getCheckpointId(); if (checkAndClearAbortedStatus(metadata.getCheckpointId())) { operatorChain.broadcastEvent(new CancelCheckpointMarker(metadata.getCheckpointId())); LOG.info("Checkpoint {} has been notified as aborted, would not trigger any checkpoint.", metadata.getCheckpointId()); return; }
operatorChain.prepareSnapshotPreBarrier(metadata.getCheckpointId());
operatorChain.broadcastEvent( new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options), options.isUnalignedCheckpoint());
if (options.isUnalignedCheckpoint()) { channelStateWriter.finishOutput(metadata.getCheckpointId()); }
Map<OperatorID, OperatorSnapshotFutures> snapshotFutures = new HashMap<>(operatorChain.getNumberOfOperators()); try { if (takeSnapshotSync(snapshotFutures, metadata, metrics, options, operatorChain, isCanceled)) { finishAndReportAsync(snapshotFutures, metadata, metrics, options); } else { cleanup(snapshotFutures, metadata, metrics, new Exception("Checkpoint declined")); } } catch (Exception ex) { cleanup(snapshotFutures, metadata, metrics, ex); throw ex; } }
|