diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java
index 0cb573d466f69..69ee4b07914b1 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java
@@ -34,6 +34,7 @@
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
@@ -106,6 +107,8 @@ public class CompactorOperator
     // submitted again while restoring
     private ListState<Map<Long, List<CompactorRequest>>> remainingRequestsState;
 
+    private long lastKnownCheckpointId = CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1;
+
     public CompactorOperator(
             FileCompactStrategy strategy,
             SimpleVersionedSerializer<FileSinkCommittable> committableSerializer,
@@ -136,15 +139,16 @@ public void processElement(StreamRecord<CompactorRequest> element) throws Except
     @Override
     public void endInput() throws Exception {
         // add collecting requests into the final snapshot
-        checkpointRequests.put(CommittableMessage.EOI, collectingRequests);
+        long checkpointId = lastKnownCheckpointId + 1;
+        checkpointRequests.put(checkpointId, collectingRequests);
         collectingRequests = new ArrayList<>();
 
         // submit all requests and wait until they are done
-        submitUntil(CommittableMessage.EOI);
+        submitUntil(checkpointId);
         assert checkpointRequests.isEmpty();
 
         getAllTasksFuture().join();
-        emitCompacted(CommittableMessage.EOI);
+        emitCompacted(checkpointId);
         assert compactingRequests.isEmpty();
     }
 
@@ -222,6 +226,8 @@ private void submitUntil(long checkpointId) {
     }
 
     private void emitCompacted(long checkpointId) throws Exception {
+        lastKnownCheckpointId = checkpointId;
+
         List<FileSinkCommittable> compacted = new ArrayList<>();
         Iterator<Tuple2<CompactorRequest, CompletableFuture<Iterable<FileSinkCommittable>>>> iter =
                 compactingRequests.iterator();
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessage.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessage.java
index 7db0c29ecc6bc..4a2049dbce869 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessage.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessage.java
@@ -28,8 +28,10 @@ public interface CommittableMessage<CommT> {
     /**
      * Special value for checkpointId for the end of input in case of batch commit or final
      * checkpoint.
+     *
+     * @deprecated the special value is not used anymore at all (remove with Flink 2.2)
      */
-    long EOI = Long.MAX_VALUE;
+    @Deprecated long EOI = Long.MAX_VALUE;
 
     /** The subtask that created this committable. */
     int getSubtaskId();
@@ -49,6 +51,8 @@ default OptionalLong getCheckpointId() {
     /**
      * Returns the checkpoint id or EOI if this message belong to the final checkpoint or the batch
      * commit.
+     *
+     * @deprecated the special value EOI is not used anymore
      */
     long getCheckpointIdOrEOI();
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BoundedOneInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BoundedOneInput.java
index 5a745238d69ee..cd02789219324 100755
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BoundedOneInput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BoundedOneInput.java
@@ -35,6 +35,12 @@ public interface BoundedOneInput {
     /**
      * It is notified that no more data will arrive from the input.
      *
+     * <p>Stateful operators need to be aware that a restart with rescaling may occur after
+     * receiving this notification. A changed source split assignment may imply that the same
+     * subtask of this operator that received endInput, has its state after endInput snapshotted,
+     * and will receive new data after restart. Hence, the state should not contain any finalization
+     * that would make it impossible to process new data.
+     *
      * <p><b>WARNING:</b> It is not safe to use this method to commit any transactions or other side
      * effects! You can use this method to flush any buffered data that can later on be committed
      * e.g. in a {@link StreamOperator#notifyCheckpointComplete(long)}.
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
index 10ae86cf10de0..6954ad24e36f2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
@@ -25,6 +25,7 @@
 import org.apache.flink.configuration.SinkOptions;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.metrics.groups.InternalSinkCommitterMetricGroup;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -51,7 +52,6 @@
 import java.util.Collections;
 import java.util.OptionalLong;
 
-import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI;
 import static org.apache.flink.util.IOUtils.closeAll;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -76,11 +76,9 @@ class CommitterOperator<CommT> extends AbstractStreamOperator<CommittableMessage
     private SinkCommitterMetricGroup metricGroup;
     private Committer<CommT> committer;
     private CommittableCollector<CommT> committableCollector;
-    private long lastCompletedCheckpointId = -1;
+    private long lastCompletedCheckpointId = CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1;
     private int maxRetries;
 
-    private boolean endInput = false;
-
     /** The operator's state descriptor. */
     private static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC =
             new ListStateDescriptor<>(
@@ -131,11 +129,11 @@ public void initializeState(StateInitializationContext context) throws Exception
                                 getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(),
                                 getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks(),
                                 metricGroup));
-        if (context.isRestored()) {
+        if (checkpointId.isPresent()) {
             committableCollectorState.get().forEach(cc -> committableCollector.merge(cc));
             lastCompletedCheckpointId = checkpointId.getAsLong();
             // try to re-commit recovered transactions as quickly as possible
-            commitAndEmitCheckpoints();
+            commitAndEmitCheckpoints(lastCompletedCheckpointId);
         }
     }
 
@@ -148,24 +146,23 @@ public void snapshotState(StateSnapshotContext context) throws Exception {
 
     @Override
     public void endInput() throws Exception {
-        endInput = true;
         if (!isCheckpointingEnabled || isBatchMode) {
             // There will be no final checkpoint, all committables should be committed here
-            commitAndEmitCheckpoints();
+            commitAndEmitCheckpoints(lastCompletedCheckpointId + 1);
         }
     }
 
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
         super.notifyCheckpointComplete(checkpointId);
-        lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, checkpointId);
-        commitAndEmitCheckpoints();
+        commitAndEmitCheckpoints(Math.max(lastCompletedCheckpointId, checkpointId));
     }
 
-    private void commitAndEmitCheckpoints() throws IOException, InterruptedException {
-        long completedCheckpointId = endInput ? EOI : lastCompletedCheckpointId;
+    private void commitAndEmitCheckpoints(long checkpointId)
+            throws IOException, InterruptedException {
+        lastCompletedCheckpointId = checkpointId;
         for (CheckpointCommittableManager<CommT> checkpointManager :
-                committableCollector.getCheckpointCommittablesUpTo(completedCheckpointId)) {
+                committableCollector.getCheckpointCommittablesUpTo(checkpointId)) {
             // ensure that all committables of the first checkpoint are fully committed before
             // attempting the next committable
             commitAndEmit(checkpointManager);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
index 7fb78f37c0d81..31397f48b3705 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
@@ -23,7 +23,6 @@
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
 import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
 import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
 import org.apache.flink.api.connector.sink2.Sink;
@@ -52,8 +51,6 @@
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.util.UserCodeClassLoader;
 
-import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
-
 import javax.annotation.Nullable;
 
 import java.io.IOException;
@@ -62,6 +59,7 @@
 import java.util.List;
 import java.util.OptionalLong;
 
+import static org.apache.flink.runtime.checkpoint.CheckpointIDCounter.INITIAL_CHECKPOINT_ID;
 import static org.apache.flink.util.IOUtils.closeAll;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -91,13 +89,6 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab
     @Nullable private final SimpleVersionedSerializer<CommT> committableSerializer;
     private final List<CommT> legacyCommittables = new ArrayList<>();
 
-    /**
-     * Used to remember that EOI has already happened so that we don't emit the last committables of
-     * the final checkpoints twice.
-     */
-    private static final ListStateDescriptor<Boolean> END_OF_INPUT_STATE_DESC =
-            new ListStateDescriptor<>("end_of_input_state", BooleanSerializer.INSTANCE);
-
     /** The runtime information of the input element. */
     private final Context<InputT> context;
 
@@ -115,10 +106,7 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab
     private final MailboxExecutor mailboxExecutor;
 
     private boolean endOfInput = false;
-    /**
-     * Remembers the endOfInput state for (final) checkpoints iff the operator emits committables.
-     */
-    @Nullable private ListState<Boolean> endOfInputState;
+    private long lastKnownCheckpointId = INITIAL_CHECKPOINT_ID - 1;
 
     SinkWriterOperator(
             Sink<InputT> sink,
@@ -146,8 +134,10 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab
     @Override
     public void initializeState(StateInitializationContext context) throws Exception {
         super.initializeState(context);
-        WriterInitContext initContext = createInitContext(context.getRestoredCheckpointId());
-        if (context.isRestored()) {
+        OptionalLong restoredCheckpointId = context.getRestoredCheckpointId();
+        WriterInitContext initContext = createInitContext(restoredCheckpointId);
+        if (restoredCheckpointId.isPresent()) {
+            lastKnownCheckpointId = restoredCheckpointId.getAsLong();
             if (committableSerializer != null) {
                 final ListState<List<CommT>> legacyCommitterState =
                         new SimpleVersionedListState<>(
@@ -161,41 +151,12 @@ public void initializeState(StateInitializationContext context) throws Exception
         }
 
         sinkWriter = writerStateHandler.createWriter(initContext, context);
-
-        if (emitDownstream) {
-            // Figure out if we have seen end of input before and if we can suppress creating
-            // transactions and sending them downstream to the CommitterOperator. We have the
-            // following
-            // cases:
-            // 1. state is empty:
-            //   - First time initialization
-            //   - Restoring from a previous version of Flink that didn't handle EOI
-            //   - Upscaled from a final or regular checkpoint
-            // In all cases, we regularly handle EOI, potentially resulting in duplicate summaries
-            // that the CommitterOperator needs to handle.
-            // 2. state is not empty:
-            //   - This implies Flink restores from a version that handles EOI.
-            //   - If there is one entry, no rescaling happened (for this subtask), so if it's true,
-            //     we recover from a final checkpoint (for this subtask) and can ignore another EOI
-            //     else we have a regular checkpoint.
-            //   - If there are multiple entries, Flink downscaled, and we need to check if all are
-            //     true and do the same as above. As soon as one entry is false, we regularly start
-            //     the writer and potentially emit duplicate summaries if we indeed recovered from a
-            //     final checkpoint.
-            endOfInputState = context.getOperatorStateStore().getListState(END_OF_INPUT_STATE_DESC);
-            ArrayList<Boolean> previousState = Lists.newArrayList(endOfInputState.get());
-            endOfInput = !previousState.isEmpty() && !previousState.contains(false);
-        }
     }
 
     @Override
     public void snapshotState(StateSnapshotContext context) throws Exception {
         super.snapshotState(context);
         writerStateHandler.snapshotState(context.getCheckpointId());
-        if (endOfInputState != null) {
-            endOfInputState.clear();
-            endOfInputState.add(this.endOfInput);
-        }
     }
 
     @Override
@@ -225,17 +186,16 @@ public void processWatermark(Watermark mark) throws Exception {
 
     @Override
     public void endInput() throws Exception {
+        LOG.info("Received endInput");
         if (!endOfInput) {
             endOfInput = true;
-            if (endOfInputState != null) {
-                endOfInputState.add(true);
-            }
             sinkWriter.flush(true);
-            emitCommittables(CommittableMessage.EOI);
+            emitCommittables(lastKnownCheckpointId + 1);
         }
     }
 
     private void emitCommittables(long checkpointId) throws IOException, InterruptedException {
+        lastKnownCheckpointId = checkpointId;
         if (!emitDownstream) {
             // To support SinkV1 topologies with only a writer we have to call prepareCommit
             // although no committables are forwarded
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
index da4491cda617d..816bd55543e51 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
@@ -95,6 +95,7 @@ void addSummary(CommittableSummary<CommT> summary) {
                         summary.getSubtaskId(),
                         checkpointId,
                         metricGroup);
+        // Remove branch once CommittableMessage.EOI has been removed (earliest 2.2)
         if (checkpointId == CommittableMessage.EOI) {
             SubtaskCommittableManager<CommT> merged =
                     subtasksCommittableManagers.merge(
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
index 4e49d73279e48..96585a632d107 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
@@ -33,7 +33,6 @@
 import java.util.Map.Entry;
 import java.util.NavigableMap;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.TreeMap;
 import java.util.stream.Collectors;
 
@@ -49,7 +48,6 @@
  */
 @Internal
 public class CommittableCollector<CommT> {
-    private static final long EOI = Long.MAX_VALUE;
     /** Mapping of checkpoint id to {@link CheckpointCommittableManagerImpl}. */
     private final NavigableMap<Long, CheckpointCommittableManagerImpl<CommT>>
             checkpointCommittables;
@@ -143,15 +141,6 @@ public Collection<? extends CheckpointCommittableManager<CommT>> getCheckpointCo
         return new ArrayList<>(checkpointCommittables.headMap(checkpointId, true).values());
     }
 
-    /**
-     * Returns {@link CheckpointCommittableManager} belonging to the last input.
-     *
-     * @return {@link CheckpointCommittableManager}
-     */
-    public Optional<CheckpointCommittableManager<CommT>> getEndOfInputCommittable() {
-        return Optional.ofNullable(checkpointCommittables.get(EOI));
-    }
-
     /**
      * Returns whether all {@link CheckpointCommittableManager} currently hold by the collector are
      * either committed or failed.
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperatorTest.java
index 641a651e2e406..24f9422d30b72 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperatorTest.java
@@ -32,7 +32,6 @@
 import java.util.Collection;
 import java.util.List;
 
-import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI;
 import static org.assertj.core.api.Assertions.assertThat;
 
 class GlobalCommitterOperatorTest {
@@ -138,38 +137,6 @@ void testStateRestore() throws Exception {
         }
     }
 
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    void testCommitAllCommittablesOnFinalCheckpoint(boolean commitOnInput) throws Exception {
-        final MockCommitter committer = new MockCommitter();
-        final OneInputStreamOperatorTestHarness<CommittableMessage<Integer>, Void> testHarness =
-                createTestHarness(committer, commitOnInput);
-        testHarness.open();
-
-        final CommittableSummary<Integer> committableSummary =
-                new CommittableSummary<>(1, 2, EOI, 1, 1, 0);
-        testHarness.processElement(new StreamRecord<>(committableSummary));
-        final CommittableSummary<Integer> committableSummary2 =
-                new CommittableSummary<>(2, 2, EOI, 1, 1, 0);
-        testHarness.processElement(new StreamRecord<>(committableSummary2));
-
-        final CommittableWithLineage<Integer> first = new CommittableWithLineage<>(1, EOI, 1);
-        testHarness.processElement(new StreamRecord<>(first));
-        final CommittableWithLineage<Integer> second = new CommittableWithLineage<>(2, EOI, 2);
-        testHarness.processElement(new StreamRecord<>(second));
-
-        // commitOnInput implies that the global committer is not using notifyCheckpointComplete
-        if (commitOnInput) {
-            assertThat(committer.committed).containsExactly(1, 2);
-        } else {
-            assertThat(committer.committed).isEmpty();
-            testHarness.notifyOfCompletedCheckpoint(EOI);
-            assertThat(committer.committed).containsExactly(1, 2);
-        }
-
-        assertThat(testHarness.getOutput()).isEmpty();
-    }
-
     private OneInputStreamOperatorTestHarness<CommittableMessage<Integer>, Void> createTestHarness(
             Committer<Integer> committer, boolean commitOnInput) throws Exception {
         return new OneInputStreamOperatorTestHarness<>(
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java
index 756ea0c8022f0..c8b37943846af 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java
@@ -35,7 +35,6 @@
 
 import java.util.function.IntSupplier;
 
-import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI;
 import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableSummary;
 import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableWithLineage;
 import static org.assertj.core.api.Assertions.as;
@@ -126,45 +125,6 @@ void ensureAllCommittablesArrivedBeforeCommitting() throws Exception {
         testHarness.close();
     }
 
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    void testEmitAllCommittablesOnEndOfInput(boolean isBatchMode) throws Exception {
-        SinkAndCounters sinkAndCounters = sinkWithPostCommit();
-        final OneInputStreamOperatorTestHarness<
-                        CommittableMessage<String>, CommittableMessage<String>>
-                testHarness = createTestHarness(sinkAndCounters.sink, isBatchMode, !isBatchMode);
-        testHarness.open();
-
-        final CommittableSummary<String> committableSummary =
-                new CommittableSummary<>(1, 2, EOI, 1, 1, 0);
-        testHarness.processElement(new StreamRecord<>(committableSummary));
-        final CommittableSummary<String> committableSummary2 =
-                new CommittableSummary<>(2, 2, EOI, 1, 1, 0);
-        testHarness.processElement(new StreamRecord<>(committableSummary2));
-
-        final CommittableWithLineage<String> first = new CommittableWithLineage<>("1", EOI, 1);
-        testHarness.processElement(new StreamRecord<>(first));
-        final CommittableWithLineage<String> second = new CommittableWithLineage<>("1", EOI, 2);
-        testHarness.processElement(new StreamRecord<>(second));
-
-        testHarness.endInput();
-        if (!isBatchMode) {
-            assertThat(testHarness.getOutput()).isEmpty();
-            // notify final checkpoint complete
-            testHarness.notifyOfCompletedCheckpoint(1);
-        }
-
-        ListAssert<CommittableMessage<String>> records =
-                assertThat(testHarness.extractOutputValues()).hasSize(3);
-        records.element(0, as(committableSummary()))
-                .hasFailedCommittables(0)
-                .hasOverallCommittables(2)
-                .hasPendingCommittables(0);
-        records.element(1, as(committableWithLineage())).isEqualTo(first.withSubtaskId(0));
-        records.element(2, as(committableWithLineage())).isEqualTo(second.withSubtaskId(0));
-        testHarness.close();
-    }
-
     @Test
     void testStateRestore() throws Exception {
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java
index d6513ab738a6d..e0425dc56c85e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java
@@ -66,7 +66,6 @@
 import java.util.stream.Collectors;
 
 import static org.apache.flink.api.connector.sink2.InitContext.INITIAL_CHECKPOINT_ID;
-import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI;
 import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableSummary;
 import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableWithLineage;
 import static org.assertj.core.api.Assertions.as;
@@ -178,7 +177,7 @@ void testEmitOnEndOfInputInBatchMode() throws Exception {
 
         testHarness.processElement(1, 1);
         testHarness.endInput();
-        assertBasicOutput(testHarness.extractOutputValues(), 1, EOI);
+        assertBasicOutput(testHarness.extractOutputValues(), 1, 1L);
     }
 
     @ParameterizedTest
@@ -467,6 +466,43 @@ public SinkWriter<String> createWriter(InitContext context) {
         testHarness.close();
     }
 
+    @Test
+    void testDoubleEndOfInput() throws Exception {
+        InspectableSink sink = sinkWithCommitter();
+
+        OperatorSubtaskState snapshot;
+        try (OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness =
+                new OneInputStreamOperatorTestHarness<>(
+                        new SinkWriterOperatorFactory<>(sink.getSink()))) {
+            testHarness.open();
+            testHarness.processElement(1, 1);
+
+            testHarness.endInput();
+            testHarness.prepareSnapshotPreBarrier(1);
+            snapshot = testHarness.snapshot(1, 1);
+
+            assertBasicOutput(testHarness.extractOutputValues(), 1, 1L);
+        }
+
+        final InspectableSink restoredSink = sinkWithCommitter();
+        try (OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>>
+                restoredTestHarness =
+                        new OneInputStreamOperatorTestHarness<>(
+                                new SinkWriterOperatorFactory<>(restoredSink.getSink()))) {
+            restoredTestHarness.setRestoredCheckpointId(1L);
+            restoredTestHarness.initializeState(snapshot);
+            restoredTestHarness.open();
+            restoredTestHarness.processElement(2, 2);
+
+            restoredTestHarness.endInput();
+            restoredTestHarness.prepareSnapshotPreBarrier(3);
+            restoredTestHarness.snapshot(3, 1);
+
+            // asserts the guessed checkpoint id which needs
+            assertBasicOutput(restoredTestHarness.extractOutputValues(), 1, 2L);
+        }
+    }
+
     private static void assertContextsEqual(
             Sink.InitContext initContext, WriterInitContext original) {
         assertThat(initContext.getUserCodeClassLoader().asClassLoader())
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java
index 6e55adcc0c572..3181c21361aa8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java
@@ -24,9 +24,6 @@
 
 import org.junit.jupiter.api.Test;
 
-import java.util.Optional;
-
-import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI;
 import static org.assertj.core.api.Assertions.assertThat;
 
 class CommittableCollectorTest {
@@ -44,22 +41,5 @@ void testGetCheckpointCommittablesUpTo() {
         committableCollector.addMessage(new CommittableSummary<>(1, 1, 3L, 1, 0, 0));
 
         assertThat(committableCollector.getCheckpointCommittablesUpTo(2)).hasSize(2);
-
-        assertThat(committableCollector.getEndOfInputCommittable()).isNotPresent();
-    }
-
-    @Test
-    void testGetEndOfInputCommittable() {
-        final CommittableCollector<Integer> committableCollector =
-                new CommittableCollector<>(METRIC_GROUP);
-        CommittableSummary<Integer> first = new CommittableSummary<>(1, 1, EOI, 1, 0, 0);
-        committableCollector.addMessage(first);
-
-        Optional<CheckpointCommittableManager<Integer>> endOfInputCommittable =
-                committableCollector.getEndOfInputCommittable();
-        assertThat(endOfInputCommittable).isPresent();
-        assertThat(endOfInputCommittable)
-                .get()
-                .returns(EOI, CheckpointCommittableManager::getCheckpointId);
     }
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index aa58c4ea8ccd3..8935c61865d97 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -176,6 +176,8 @@ public <K> InternalTimeServiceManager<K> create(
 
     private volatile boolean wasFailedExternally = false;
 
+    private long restoredCheckpointId = 0;
+
     public AbstractStreamOperatorTestHarness(
             StreamOperator<OUT> operator, int maxParallelism, int parallelism, int subtaskIndex)
             throws Exception {
@@ -388,6 +390,10 @@ public StreamConfig getStreamConfig() {
         return config;
     }
 
+    public void setRestoredCheckpointId(long restoredCheckpointId) {
+        this.restoredCheckpointId = restoredCheckpointId;
+    }
+
     /** Get all the output from the task. This contains StreamRecords and Events interleaved. */
     public ConcurrentLinkedQueue<Object> getOutput() {
         return outputList;
@@ -596,16 +602,16 @@ public void initializeState(
             jmTaskStateSnapshot.putSubtaskStateByOperatorID(
                     operator.getOperatorID(), jmOperatorStateHandles);
 
-            taskStateManager.setReportedCheckpointId(0);
+            taskStateManager.setReportedCheckpointId(restoredCheckpointId);
             taskStateManager.setJobManagerTaskStateSnapshotsByCheckpointId(
-                    Collections.singletonMap(0L, jmTaskStateSnapshot));
+                    Collections.singletonMap(restoredCheckpointId, jmTaskStateSnapshot));
 
             if (tmOperatorStateHandles != null) {
                 TaskStateSnapshot tmTaskStateSnapshot = new TaskStateSnapshot();
                 tmTaskStateSnapshot.putSubtaskStateByOperatorID(
                         operator.getOperatorID(), tmOperatorStateHandles);
                 taskStateManager.setTaskManagerTaskStateSnapshotsByCheckpointId(
-                        Collections.singletonMap(0L, tmTaskStateSnapshot));
+                        Collections.singletonMap(restoredCheckpointId, tmTaskStateSnapshot));
             }
         }