diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java index 0cb573d466f69..69ee4b07914b1 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java @@ -34,6 +34,7 @@ import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; @@ -106,6 +107,8 @@ public class CompactorOperator // submitted again while restoring private ListState<Map<Long, List<CompactorRequest>>> remainingRequestsState; + private long lastKnownCheckpointId = CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1; + public CompactorOperator( FileCompactStrategy strategy, SimpleVersionedSerializer<FileSinkCommittable> committableSerializer, @@ -136,15 +139,16 @@ public void processElement(StreamRecord<CompactorRequest> element) throws Except @Override public void endInput() throws Exception { // add collecting requests into the final snapshot - checkpointRequests.put(CommittableMessage.EOI, collectingRequests); + long checkpointId = lastKnownCheckpointId + 1; + checkpointRequests.put(checkpointId, collectingRequests); collectingRequests = new ArrayList<>(); // submit all requests and wait until they are done - submitUntil(CommittableMessage.EOI); + submitUntil(checkpointId); assert 
checkpointRequests.isEmpty(); getAllTasksFuture().join(); - emitCompacted(CommittableMessage.EOI); + emitCompacted(checkpointId); assert compactingRequests.isEmpty(); } @@ -222,6 +226,8 @@ private void submitUntil(long checkpointId) { } private void emitCompacted(long checkpointId) throws Exception { + lastKnownCheckpointId = checkpointId; + List<FileSinkCommittable> compacted = new ArrayList<>(); Iterator<Tuple2<CompactorRequest, CompletableFuture<Iterable<FileSinkCommittable>>>> iter = compactingRequests.iterator(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessage.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessage.java index 7db0c29ecc6bc..4a2049dbce869 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessage.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessage.java @@ -28,8 +28,10 @@ public interface CommittableMessage<CommT> { /** * Special value for checkpointId for the end of input in case of batch commit or final * checkpoint. + * + * @deprecated the special value is not used anymore at all (remove with Flink 2.2) */ - long EOI = Long.MAX_VALUE; + @Deprecated long EOI = Long.MAX_VALUE; /** The subtask that created this committable. */ int getSubtaskId(); @@ -49,6 +51,8 @@ default OptionalLong getCheckpointId() { /** * Returns the checkpoint id or EOI if this message belong to the final checkpoint or the batch * commit. 
+ * + * @deprecated the special value EOI is not used anymore */ long getCheckpointIdOrEOI(); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BoundedOneInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BoundedOneInput.java index 5a745238d69ee..cd02789219324 100755 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BoundedOneInput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BoundedOneInput.java @@ -35,6 +35,12 @@ public interface BoundedOneInput { /** * It is notified that no more data will arrive from the input. * + * <p>Stateful operators need to be aware that a restart with rescaling may occur after + * receiving this notification. A changed source split assignment may imply that the same + * subtask of this operator that received endInput, has its state after endInput snapshotted, + * and will receive new data after restart. Hence, the state should not contain any finalization + * that would make it impossible to process new data. + * * <p><b>WARNING:</b> It is not safe to use this method to commit any transactions or other side * effects! You can use this method to flush any buffered data that can later on be committed * e.g. in a {@link StreamOperator#notifyCheckpointComplete(long)}. 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java index 10ae86cf10de0..6954ad24e36f2 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java @@ -25,6 +25,7 @@ import org.apache.flink.configuration.SinkOptions; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.metrics.groups.SinkCommitterMetricGroup; +import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; import org.apache.flink.runtime.metrics.groups.InternalSinkCommitterMetricGroup; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; @@ -51,7 +52,6 @@ import java.util.Collections; import java.util.OptionalLong; -import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI; import static org.apache.flink.util.IOUtils.closeAll; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -76,11 +76,9 @@ class CommitterOperator<CommT> extends AbstractStreamOperator<CommittableMessage private SinkCommitterMetricGroup metricGroup; private Committer<CommT> committer; private CommittableCollector<CommT> committableCollector; - private long lastCompletedCheckpointId = -1; + private long lastCompletedCheckpointId = CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1; private int maxRetries; - private boolean endInput = false; - /** The operator's state descriptor. 
*/ private static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC = new ListStateDescriptor<>( @@ -131,11 +129,11 @@ public void initializeState(StateInitializationContext context) throws Exception getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(), getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks(), metricGroup)); - if (context.isRestored()) { + if (checkpointId.isPresent()) { committableCollectorState.get().forEach(cc -> committableCollector.merge(cc)); lastCompletedCheckpointId = checkpointId.getAsLong(); // try to re-commit recovered transactions as quickly as possible - commitAndEmitCheckpoints(); + commitAndEmitCheckpoints(lastCompletedCheckpointId); } } @@ -148,24 +146,23 @@ public void snapshotState(StateSnapshotContext context) throws Exception { @Override public void endInput() throws Exception { - endInput = true; if (!isCheckpointingEnabled || isBatchMode) { // There will be no final checkpoint, all committables should be committed here - commitAndEmitCheckpoints(); + commitAndEmitCheckpoints(lastCompletedCheckpointId + 1); } } @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { super.notifyCheckpointComplete(checkpointId); - lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, checkpointId); - commitAndEmitCheckpoints(); + commitAndEmitCheckpoints(Math.max(lastCompletedCheckpointId, checkpointId)); } - private void commitAndEmitCheckpoints() throws IOException, InterruptedException { - long completedCheckpointId = endInput ? 
EOI : lastCompletedCheckpointId; + private void commitAndEmitCheckpoints(long checkpointId) + throws IOException, InterruptedException { + lastCompletedCheckpointId = checkpointId; for (CheckpointCommittableManager<CommT> checkpointManager : - committableCollector.getCheckpointCommittablesUpTo(completedCheckpointId)) { + committableCollector.getCheckpointCommittablesUpTo(checkpointId)) { // ensure that all committables of the first checkpoint are fully committed before // attempting the next committable commitAndEmit(checkpointManager); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java index 7fb78f37c0d81..31397f48b3705 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java @@ -23,7 +23,6 @@ import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.base.BooleanSerializer; import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; import org.apache.flink.api.connector.sink2.CommittingSinkWriter; import org.apache.flink.api.connector.sink2.Sink; @@ -52,8 +51,6 @@ import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.util.UserCodeClassLoader; -import org.apache.flink.shaded.guava31.com.google.common.collect.Lists; - import javax.annotation.Nullable; import java.io.IOException; @@ -62,6 +59,7 @@ import java.util.List; import java.util.OptionalLong; +import static org.apache.flink.runtime.checkpoint.CheckpointIDCounter.INITIAL_CHECKPOINT_ID; import static 
org.apache.flink.util.IOUtils.closeAll; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -91,13 +89,6 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab @Nullable private final SimpleVersionedSerializer<CommT> committableSerializer; private final List<CommT> legacyCommittables = new ArrayList<>(); - /** - * Used to remember that EOI has already happened so that we don't emit the last committables of - * the final checkpoints twice. - */ - private static final ListStateDescriptor<Boolean> END_OF_INPUT_STATE_DESC = - new ListStateDescriptor<>("end_of_input_state", BooleanSerializer.INSTANCE); - /** The runtime information of the input element. */ private final Context<InputT> context; @@ -115,10 +106,7 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab private final MailboxExecutor mailboxExecutor; private boolean endOfInput = false; - /** - * Remembers the endOfInput state for (final) checkpoints iff the operator emits committables. 
- */ - @Nullable private ListState<Boolean> endOfInputState; + private long lastKnownCheckpointId = INITIAL_CHECKPOINT_ID - 1; SinkWriterOperator( Sink<InputT> sink, @@ -146,8 +134,10 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab @Override public void initializeState(StateInitializationContext context) throws Exception { super.initializeState(context); - WriterInitContext initContext = createInitContext(context.getRestoredCheckpointId()); - if (context.isRestored()) { + OptionalLong restoredCheckpointId = context.getRestoredCheckpointId(); + WriterInitContext initContext = createInitContext(restoredCheckpointId); + if (restoredCheckpointId.isPresent()) { + lastKnownCheckpointId = restoredCheckpointId.getAsLong(); if (committableSerializer != null) { final ListState<List<CommT>> legacyCommitterState = new SimpleVersionedListState<>( @@ -161,41 +151,12 @@ public void initializeState(StateInitializationContext context) throws Exception } sinkWriter = writerStateHandler.createWriter(initContext, context); - - if (emitDownstream) { - // Figure out if we have seen end of input before and if we can suppress creating - // transactions and sending them downstream to the CommitterOperator. We have the - // following - // cases: - // 1. state is empty: - // - First time initialization - // - Restoring from a previous version of Flink that didn't handle EOI - // - Upscaled from a final or regular checkpoint - // In all cases, we regularly handle EOI, potentially resulting in duplicate summaries - // that the CommitterOperator needs to handle. - // 2. state is not empty: - // - This implies Flink restores from a version that handles EOI. - // - If there is one entry, no rescaling happened (for this subtask), so if it's true, - // we recover from a final checkpoint (for this subtask) and can ignore another EOI - // else we have a regular checkpoint. 
- // - If there are multiple entries, Flink downscaled, and we need to check if all are - // true and do the same as above. As soon as one entry is false, we regularly start - // the writer and potentially emit duplicate summaries if we indeed recovered from a - // final checkpoint. - endOfInputState = context.getOperatorStateStore().getListState(END_OF_INPUT_STATE_DESC); - ArrayList<Boolean> previousState = Lists.newArrayList(endOfInputState.get()); - endOfInput = !previousState.isEmpty() && !previousState.contains(false); - } } @Override public void snapshotState(StateSnapshotContext context) throws Exception { super.snapshotState(context); writerStateHandler.snapshotState(context.getCheckpointId()); - if (endOfInputState != null) { - endOfInputState.clear(); - endOfInputState.add(this.endOfInput); - } } @Override @@ -225,17 +186,16 @@ public void processWatermark(Watermark mark) throws Exception { @Override public void endInput() throws Exception { + LOG.info("Received endInput"); if (!endOfInput) { endOfInput = true; - if (endOfInputState != null) { - endOfInputState.add(true); - } sinkWriter.flush(true); - emitCommittables(CommittableMessage.EOI); + emitCommittables(lastKnownCheckpointId + 1); } } private void emitCommittables(long checkpointId) throws IOException, InterruptedException { + lastKnownCheckpointId = checkpointId; if (!emitDownstream) { // To support SinkV1 topologies with only a writer we have to call prepareCommit // although no committables are forwarded diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java index da4491cda617d..816bd55543e51 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java +++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java @@ -95,6 +95,7 @@ void addSummary(CommittableSummary<CommT> summary) { summary.getSubtaskId(), checkpointId, metricGroup); + // Remove branch once CommittableMessage.EOI has been removed (earliest 2.2) if (checkpointId == CommittableMessage.EOI) { SubtaskCommittableManager<CommT> merged = subtasksCommittableManagers.merge( diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java index 4e49d73279e48..96585a632d107 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java @@ -33,7 +33,6 @@ import java.util.Map.Entry; import java.util.NavigableMap; import java.util.Objects; -import java.util.Optional; import java.util.TreeMap; import java.util.stream.Collectors; @@ -49,7 +48,6 @@ */ @Internal public class CommittableCollector<CommT> { - private static final long EOI = Long.MAX_VALUE; /** Mapping of checkpoint id to {@link CheckpointCommittableManagerImpl}. */ private final NavigableMap<Long, CheckpointCommittableManagerImpl<CommT>> checkpointCommittables; @@ -143,15 +141,6 @@ public Collection<? extends CheckpointCommittableManager<CommT>> getCheckpointCo return new ArrayList<>(checkpointCommittables.headMap(checkpointId, true).values()); } - /** - * Returns {@link CheckpointCommittableManager} belonging to the last input. 
- * - * @return {@link CheckpointCommittableManager} - */ - public Optional<CheckpointCommittableManager<CommT>> getEndOfInputCommittable() { - return Optional.ofNullable(checkpointCommittables.get(EOI)); - } - /** * Returns whether all {@link CheckpointCommittableManager} currently hold by the collector are * either committed or failed. diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperatorTest.java index 641a651e2e406..24f9422d30b72 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperatorTest.java @@ -32,7 +32,6 @@ import java.util.Collection; import java.util.List; -import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI; import static org.assertj.core.api.Assertions.assertThat; class GlobalCommitterOperatorTest { @@ -138,38 +137,6 @@ void testStateRestore() throws Exception { } } - @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testCommitAllCommittablesOnFinalCheckpoint(boolean commitOnInput) throws Exception { - final MockCommitter committer = new MockCommitter(); - final OneInputStreamOperatorTestHarness<CommittableMessage<Integer>, Void> testHarness = - createTestHarness(committer, commitOnInput); - testHarness.open(); - - final CommittableSummary<Integer> committableSummary = - new CommittableSummary<>(1, 2, EOI, 1, 1, 0); - testHarness.processElement(new StreamRecord<>(committableSummary)); - final CommittableSummary<Integer> committableSummary2 = - new CommittableSummary<>(2, 2, EOI, 1, 1, 0); - testHarness.processElement(new StreamRecord<>(committableSummary2)); - - final CommittableWithLineage<Integer> first = new CommittableWithLineage<>(1, EOI, 
1); - testHarness.processElement(new StreamRecord<>(first)); - final CommittableWithLineage<Integer> second = new CommittableWithLineage<>(2, EOI, 2); - testHarness.processElement(new StreamRecord<>(second)); - - // commitOnInput implies that the global committer is not using notifyCheckpointComplete - if (commitOnInput) { - assertThat(committer.committed).containsExactly(1, 2); - } else { - assertThat(committer.committed).isEmpty(); - testHarness.notifyOfCompletedCheckpoint(EOI); - assertThat(committer.committed).containsExactly(1, 2); - } - - assertThat(testHarness.getOutput()).isEmpty(); - } - private OneInputStreamOperatorTestHarness<CommittableMessage<Integer>, Void> createTestHarness( Committer<Integer> committer, boolean commitOnInput) throws Exception { return new OneInputStreamOperatorTestHarness<>( diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java index 756ea0c8022f0..c8b37943846af 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java @@ -35,7 +35,6 @@ import java.util.function.IntSupplier; -import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI; import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableSummary; import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableWithLineage; import static org.assertj.core.api.Assertions.as; @@ -126,45 +125,6 @@ void ensureAllCommittablesArrivedBeforeCommitting() throws Exception { testHarness.close(); } - @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testEmitAllCommittablesOnEndOfInput(boolean isBatchMode) throws 
Exception { - SinkAndCounters sinkAndCounters = sinkWithPostCommit(); - final OneInputStreamOperatorTestHarness< - CommittableMessage<String>, CommittableMessage<String>> - testHarness = createTestHarness(sinkAndCounters.sink, isBatchMode, !isBatchMode); - testHarness.open(); - - final CommittableSummary<String> committableSummary = - new CommittableSummary<>(1, 2, EOI, 1, 1, 0); - testHarness.processElement(new StreamRecord<>(committableSummary)); - final CommittableSummary<String> committableSummary2 = - new CommittableSummary<>(2, 2, EOI, 1, 1, 0); - testHarness.processElement(new StreamRecord<>(committableSummary2)); - - final CommittableWithLineage<String> first = new CommittableWithLineage<>("1", EOI, 1); - testHarness.processElement(new StreamRecord<>(first)); - final CommittableWithLineage<String> second = new CommittableWithLineage<>("1", EOI, 2); - testHarness.processElement(new StreamRecord<>(second)); - - testHarness.endInput(); - if (!isBatchMode) { - assertThat(testHarness.getOutput()).isEmpty(); - // notify final checkpoint complete - testHarness.notifyOfCompletedCheckpoint(1); - } - - ListAssert<CommittableMessage<String>> records = - assertThat(testHarness.extractOutputValues()).hasSize(3); - records.element(0, as(committableSummary())) - .hasFailedCommittables(0) - .hasOverallCommittables(2) - .hasPendingCommittables(0); - records.element(1, as(committableWithLineage())).isEqualTo(first.withSubtaskId(0)); - records.element(2, as(committableWithLineage())).isEqualTo(second.withSubtaskId(0)); - testHarness.close(); - } - @Test void testStateRestore() throws Exception { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java index d6513ab738a6d..e0425dc56c85e 100644 --- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java @@ -66,7 +66,6 @@ import java.util.stream.Collectors; import static org.apache.flink.api.connector.sink2.InitContext.INITIAL_CHECKPOINT_ID; -import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI; import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableSummary; import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableWithLineage; import static org.assertj.core.api.Assertions.as; @@ -178,7 +177,7 @@ void testEmitOnEndOfInputInBatchMode() throws Exception { testHarness.processElement(1, 1); testHarness.endInput(); - assertBasicOutput(testHarness.extractOutputValues(), 1, EOI); + assertBasicOutput(testHarness.extractOutputValues(), 1, 1L); } @ParameterizedTest @@ -467,6 +466,43 @@ public SinkWriter<String> createWriter(InitContext context) { testHarness.close(); } + @Test + void testDoubleEndOfInput() throws Exception { + InspectableSink sink = sinkWithCommitter(); + + OperatorSubtaskState snapshot; + try (OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness = + new OneInputStreamOperatorTestHarness<>( + new SinkWriterOperatorFactory<>(sink.getSink()))) { + testHarness.open(); + testHarness.processElement(1, 1); + + testHarness.endInput(); + testHarness.prepareSnapshotPreBarrier(1); + snapshot = testHarness.snapshot(1, 1); + + assertBasicOutput(testHarness.extractOutputValues(), 1, 1L); + } + + final InspectableSink restoredSink = sinkWithCommitter(); + try (OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> + restoredTestHarness = + new OneInputStreamOperatorTestHarness<>( + new SinkWriterOperatorFactory<>(restoredSink.getSink()))) { + restoredTestHarness.setRestoredCheckpointId(1L); + 
restoredTestHarness.initializeState(snapshot); + restoredTestHarness.open(); + restoredTestHarness.processElement(2, 2); + + restoredTestHarness.endInput(); + restoredTestHarness.prepareSnapshotPreBarrier(3); + restoredTestHarness.snapshot(3, 1); + + // asserts the guessed checkpoint id, which must be larger than the restored checkpoint id + assertBasicOutput(restoredTestHarness.extractOutputValues(), 1, 2L); + } + } + private static void assertContextsEqual( Sink.InitContext initContext, WriterInitContext original) { assertThat(initContext.getUserCodeClassLoader().asClassLoader()) diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java index 6e55adcc0c572..3181c21361aa8 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java @@ -24,9 +24,6 @@ import org.junit.jupiter.api.Test; -import java.util.Optional; - -import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI; import static org.assertj.core.api.Assertions.assertThat; class CommittableCollectorTest { @@ -44,22 +41,5 @@ void testGetCheckpointCommittablesUpTo() { committableCollector.addMessage(new CommittableSummary<>(1, 1, 3L, 1, 0, 0)); assertThat(committableCollector.getCheckpointCommittablesUpTo(2)).hasSize(2); - - assertThat(committableCollector.getEndOfInputCommittable()).isNotPresent(); - } - - @Test - void testGetEndOfInputCommittable() { - final CommittableCollector<Integer> committableCollector = - new CommittableCollector<>(METRIC_GROUP); - CommittableSummary<Integer> first = new CommittableSummary<>(1, 1, EOI, 1, 0, 0); - committableCollector.addMessage(first); - 
Optional<CheckpointCommittableManager<Integer>> endOfInputCommittable = - committableCollector.getEndOfInputCommittable(); - assertThat(endOfInputCommittable).isPresent(); - assertThat(endOfInputCommittable) - .get() - .returns(EOI, CheckpointCommittableManager::getCheckpointId); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java index aa58c4ea8ccd3..8935c61865d97 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java @@ -176,6 +176,8 @@ public <K> InternalTimeServiceManager<K> create( private volatile boolean wasFailedExternally = false; + private long restoredCheckpointId = 0; + public AbstractStreamOperatorTestHarness( StreamOperator<OUT> operator, int maxParallelism, int parallelism, int subtaskIndex) throws Exception { @@ -388,6 +390,10 @@ public StreamConfig getStreamConfig() { return config; } + public void setRestoredCheckpointId(long restoredCheckpointId) { + this.restoredCheckpointId = restoredCheckpointId; + } + /** Get all the output from the task. This contains StreamRecords and Events interleaved. 
*/ public ConcurrentLinkedQueue<Object> getOutput() { return outputList; @@ -596,16 +602,16 @@ public void initializeState( jmTaskStateSnapshot.putSubtaskStateByOperatorID( operator.getOperatorID(), jmOperatorStateHandles); - taskStateManager.setReportedCheckpointId(0); + taskStateManager.setReportedCheckpointId(restoredCheckpointId); taskStateManager.setJobManagerTaskStateSnapshotsByCheckpointId( - Collections.singletonMap(0L, jmTaskStateSnapshot)); + Collections.singletonMap(restoredCheckpointId, jmTaskStateSnapshot)); if (tmOperatorStateHandles != null) { TaskStateSnapshot tmTaskStateSnapshot = new TaskStateSnapshot(); tmTaskStateSnapshot.putSubtaskStateByOperatorID( operator.getOperatorID(), tmOperatorStateHandles); taskStateManager.setTaskManagerTaskStateSnapshotsByCheckpointId( - Collections.singletonMap(0L, tmTaskStateSnapshot)); + Collections.singletonMap(restoredCheckpointId, tmTaskStateSnapshot)); } }