diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceSplitReader.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceSplitReader.java
index 53f97362734f..d6db68e8cc61 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceSplitReader.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceSplitReader.java
@@ -37,10 +37,22 @@
import java.io.IOException;
import java.util.ArrayDeque;
+import java.util.Collections;
import java.util.Iterator;
import java.util.Queue;
+import java.util.Set;
@Slf4j
+/**
+ * Split reader for incremental source (snapshot + incremental phase).
+ *
+ *
Thread safety: This class is NOT thread-safe and is expected to be used from a single
+ * thread. The {@link #fetch()} method should be called sequentially without concurrent access. The
+ * {@link #close()} method should be called from the same thread or after all fetch calls have
+ * completed.
+ *
+ * @param The type of source configuration.
+ */
public class IncrementalSourceSplitReader
implements SplitReader {
private final Queue splits;
@@ -49,6 +61,7 @@ public class IncrementalSourceSplitReader
private Fetcher currentFetcher;
private String currentSplitId;
+ private String emittedFinishedSplitId;
private final DataSourceDialect dataSourceDialect;
private final C sourceConfig;
private final SchemaChangeResolver schemaChangeResolver;
@@ -70,6 +83,9 @@ public RecordsWithSplitIds fetch() throws IOException {
checkSplitOrStartNext();
checkNeedStopBinlogReader();
+ if (hasEmittedCurrentSplitFinished()) {
+ return NoSplitRecords.INSTANCE;
+ }
Iterator dataIt = null;
try {
dataIt = currentFetcher.pollSplitRecords();
@@ -77,9 +93,27 @@ public RecordsWithSplitIds fetch() throws IOException {
log.warn("fetch data failed.", e);
throw new IOException(e);
}
- return dataIt == null
- ? finishedSnapshotSplit()
- : ChangeEventRecords.forRecords(currentSplitId, dataIt);
+ if (dataIt == null) {
+ return finishedSnapshotSplit();
+ }
+ if (currentSplitId == null) {
+ log.warn(
+ "Invalid state: currentSplitId is null when emitting records. "
+ + "emittedFinishedSplitId={}, currentFetcher={}, isFinished={}",
+ emittedFinishedSplitId,
+ currentFetcher != null ? currentFetcher.getClass().getSimpleName() : "null",
+ currentFetcher != null && currentFetcher.isFinished());
+ throw new IOException(
+ String.format(
+ "Invalid state: currentSplitId is null when emitting records. "
+ + "emittedFinishedSplitId=%s, currentFetcher=%s, isFinished=%s",
+ emittedFinishedSplitId,
+ currentFetcher != null
+ ? currentFetcher.getClass().getSimpleName()
+ : "null",
+ currentFetcher != null && currentFetcher.isFinished()));
+ }
+ return ChangeEventRecords.forRecords(currentSplitId, dataIt);
}
@Override
@@ -100,10 +134,14 @@ public void wakeUp() {}
@Override
public void close() throws Exception {
- if (currentFetcher != null) {
- log.info("Close current fetcher {}", currentFetcher.getClass().getCanonicalName());
- currentFetcher.close();
+ try {
+ if (currentFetcher != null) {
+ log.info("Close current fetcher {}", currentFetcher.getClass().getCanonicalName());
+ currentFetcher.close();
+ }
+ } finally {
currentSplitId = null;
+ emittedFinishedSplitId = null;
}
}
@@ -123,6 +161,7 @@ protected void checkSplitOrStartNext() throws IOException {
throw new IOException("Cannot fetch from another split - no split remaining.");
}
currentSplitId = nextSplit.splitId();
+ emittedFinishedSplitId = null;
if (nextSplit.isSnapshotSplit()) {
if (currentFetcher == null) {
@@ -152,10 +191,52 @@ public boolean canAssignNextSplit() {
return currentFetcher == null || currentFetcher.isFinished();
}
- private ChangeEventRecords finishedSnapshotSplit() {
- final ChangeEventRecords finishedRecords =
- ChangeEventRecords.forFinishedSplit(currentSplitId);
- currentSplitId = null;
- return finishedRecords;
+ private boolean hasEmittedCurrentSplitFinished() {
+ return currentSplitId != null && currentSplitId.equals(emittedFinishedSplitId);
+ }
+
+ private RecordsWithSplitIds finishedSnapshotSplit() throws IOException {
+ final String splitId = currentSplitId;
+ if (splitId == null) {
+ log.warn(
+ "Invalid state: currentSplitId is null when finishing snapshot split. "
+ + "emittedFinishedSplitId={}, currentFetcher={}, isFinished={}",
+ emittedFinishedSplitId,
+ currentFetcher != null ? currentFetcher.getClass().getSimpleName() : "null",
+ currentFetcher != null && currentFetcher.isFinished());
+ throw new IOException(
+ String.format(
+ "Invalid state: currentSplitId is null when finishing snapshot split. "
+ + "emittedFinishedSplitId=%s, currentFetcher=%s, isFinished=%s",
+ emittedFinishedSplitId,
+ currentFetcher != null
+ ? currentFetcher.getClass().getSimpleName()
+ : "null",
+ currentFetcher != null && currentFetcher.isFinished()));
+ }
+ if (splitId.equals(emittedFinishedSplitId)) {
+ return NoSplitRecords.INSTANCE;
+ }
+ emittedFinishedSplitId = splitId;
+ return ChangeEventRecords.forFinishedSplit(splitId);
+ }
+
+ private static final class NoSplitRecords implements RecordsWithSplitIds {
+ private static final NoSplitRecords INSTANCE = new NoSplitRecords();
+
+ @Override
+ public String nextSplit() {
+ return null;
+ }
+
+ @Override
+ public SourceRecords nextRecordFromSplit() {
+ throw new IllegalStateException("No split assigned");
+ }
+
+ @Override
+ public Set finishedSplits() {
+ return Collections.emptySet();
+ }
}
}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/ChangeEventRecords.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/ChangeEventRecords.java
index 44c80a5ec5ed..cd81883d11ff 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/ChangeEventRecords.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/ChangeEventRecords.java
@@ -74,7 +74,17 @@ public static ChangeEventRecords forRecords(
return new ChangeEventRecords(splitId, recordsForSplit, Collections.emptySet());
}
+ /**
+ * Creates a {@link ChangeEventRecords} that only indicates a split is finished.
+ *
+ * @param splitId the ID of the finished split, must not be null
+ * @return a new {@link ChangeEventRecords} instance
+ * @throws IllegalArgumentException if splitId is null
+ */
public static ChangeEventRecords forFinishedSplit(final String splitId) {
+ if (splitId == null) {
+ throw new IllegalArgumentException("splitId must not be null");
+ }
return new ChangeEventRecords(null, null, Collections.singleton(splitId));
}
}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceSplitReaderTest.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceSplitReaderTest.java
new file mode 100644
index 000000000000..11b452fd3a3c
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceSplitReaderTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.cdc.base.source.reader;
+
+import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
+import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
+import org.apache.seatunnel.connectors.cdc.base.schema.SchemaChangeResolver;
+import org.apache.seatunnel.connectors.cdc.base.source.reader.external.Fetcher;
+import org.apache.seatunnel.connectors.cdc.base.source.split.SourceRecords;
+import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
+import org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordsWithSplitIds;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Collections;
+
+class IncrementalSourceSplitReaderTest {
+
+ @Test
+ void testFetchFinishedSnapshotSplitEmitsFinishedOnlyOnce() throws Exception {
+ DataSourceDialect dialect = Mockito.mock(DataSourceDialect.class);
+ SourceConfig config = Mockito.mock(SourceConfig.class);
+ SchemaChangeResolver resolver = Mockito.mock(SchemaChangeResolver.class);
+
+ IncrementalSourceSplitReader reader =
+ new IncrementalSourceSplitReader(0, dialect, config, resolver) {
+ @Override
+ protected void checkSplitOrStartNext() {}
+ };
+
+ @SuppressWarnings("unchecked")
+ Fetcher fetcher = Mockito.mock(Fetcher.class);
+ Mockito.when(fetcher.pollSplitRecords()).thenReturn(null);
+
+ setField(reader, "currentFetcher", fetcher);
+ setField(reader, "currentSplitId", "split-1");
+
+ RecordsWithSplitIds first = reader.fetch();
+ RecordsWithSplitIds second = reader.fetch();
+
+ Assertions.assertEquals(Collections.singleton("split-1"), first.finishedSplits());
+ Assertions.assertFalse(first.finishedSplits().contains(null));
+ Assertions.assertEquals(Collections.emptySet(), second.finishedSplits());
+ Assertions.assertFalse(second.finishedSplits().contains(null));
+ Mockito.verify(fetcher, Mockito.times(1)).pollSplitRecords();
+ }
+
+ @Test
+ void testFetchFinishedSnapshotSplitFailFastWhenCurrentSplitIdIsNull() throws Exception {
+ DataSourceDialect dialect = Mockito.mock(DataSourceDialect.class);
+ SourceConfig config = Mockito.mock(SourceConfig.class);
+ SchemaChangeResolver resolver = Mockito.mock(SchemaChangeResolver.class);
+
+ IncrementalSourceSplitReader reader =
+ new IncrementalSourceSplitReader(0, dialect, config, resolver) {
+ @Override
+ protected void checkSplitOrStartNext() {}
+ };
+
+ @SuppressWarnings("unchecked")
+ Fetcher fetcher = Mockito.mock(Fetcher.class);
+ Mockito.when(fetcher.pollSplitRecords()).thenReturn(null);
+
+ setField(reader, "currentFetcher", fetcher);
+ setField(reader, "currentSplitId", null);
+
+ Assertions.assertThrows(IOException.class, reader::fetch);
+ }
+
+ @Test
+ void testFetchFinishedSnapshotSplitSupportsNextSplitAfterIdChanges() throws Exception {
+ DataSourceDialect dialect = Mockito.mock(DataSourceDialect.class);
+ SourceConfig config = Mockito.mock(SourceConfig.class);
+ SchemaChangeResolver resolver = Mockito.mock(SchemaChangeResolver.class);
+
+ IncrementalSourceSplitReader reader =
+ new IncrementalSourceSplitReader(0, dialect, config, resolver) {
+ @Override
+ protected void checkSplitOrStartNext() {}
+ };
+
+ @SuppressWarnings("unchecked")
+ Fetcher fetcher = Mockito.mock(Fetcher.class);
+ Mockito.when(fetcher.pollSplitRecords()).thenReturn(null);
+
+ setField(reader, "currentFetcher", fetcher);
+ setField(reader, "currentSplitId", "split-1");
+
+ RecordsWithSplitIds first = reader.fetch();
+ RecordsWithSplitIds idle = reader.fetch();
+
+ setField(reader, "currentSplitId", "split-2");
+ RecordsWithSplitIds second = reader.fetch();
+
+ Assertions.assertEquals(Collections.singleton("split-1"), first.finishedSplits());
+ Assertions.assertEquals(Collections.emptySet(), idle.finishedSplits());
+ Assertions.assertEquals(Collections.singleton("split-2"), second.finishedSplits());
+ Mockito.verify(fetcher, Mockito.times(2)).pollSplitRecords();
+ }
+
+ @Test
+ void testCloseClearsState() throws Exception {
+ DataSourceDialect dialect = Mockito.mock(DataSourceDialect.class);
+ SourceConfig config = Mockito.mock(SourceConfig.class);
+ SchemaChangeResolver resolver = Mockito.mock(SchemaChangeResolver.class);
+
+ IncrementalSourceSplitReader reader =
+ new IncrementalSourceSplitReader(0, dialect, config, resolver) {
+ @Override
+ protected void checkSplitOrStartNext() {}
+ };
+
+ @SuppressWarnings("unchecked")
+ Fetcher fetcher = Mockito.mock(Fetcher.class);
+
+ setField(reader, "currentFetcher", fetcher);
+ setField(reader, "currentSplitId", "split-1");
+ setField(reader, "emittedFinishedSplitId", "split-1");
+
+ reader.close();
+
+ Assertions.assertNull(getField(reader, "currentSplitId"));
+ Assertions.assertNull(getField(reader, "emittedFinishedSplitId"));
+ Mockito.verify(fetcher, Mockito.times(1)).close();
+ }
+
+ private static void setField(
+ IncrementalSourceSplitReader> reader, String fieldName, Object value)
+ throws Exception {
+ Field field = IncrementalSourceSplitReader.class.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ field.set(reader, value);
+ }
+
+ private static Object getField(IncrementalSourceSplitReader> reader, String fieldName)
+ throws Exception {
+ Field field = IncrementalSourceSplitReader.class.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ return field.get(reader);
+ }
+}