diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceSplitReader.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceSplitReader.java index 53f97362734f..d6db68e8cc61 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceSplitReader.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceSplitReader.java @@ -37,10 +37,22 @@ import java.io.IOException; import java.util.ArrayDeque; +import java.util.Collections; import java.util.Iterator; import java.util.Queue; +import java.util.Set; @Slf4j +/** + * Split reader for incremental source (snapshot + incremental phase). + * + *

Thread safety: This class is NOT thread-safe and is expected to be used from a single + * thread. The {@link #fetch()} method should be called sequentially without concurrent access. The + * {@link #close()} method should be called from the same thread or after all fetch calls have + * completed. + * + * @param The type of source configuration. + */ public class IncrementalSourceSplitReader implements SplitReader { private final Queue splits; @@ -49,6 +61,7 @@ public class IncrementalSourceSplitReader private Fetcher currentFetcher; private String currentSplitId; + private String emittedFinishedSplitId; private final DataSourceDialect dataSourceDialect; private final C sourceConfig; private final SchemaChangeResolver schemaChangeResolver; @@ -70,6 +83,9 @@ public RecordsWithSplitIds fetch() throws IOException { checkSplitOrStartNext(); checkNeedStopBinlogReader(); + if (hasEmittedCurrentSplitFinished()) { + return NoSplitRecords.INSTANCE; + } Iterator dataIt = null; try { dataIt = currentFetcher.pollSplitRecords(); @@ -77,9 +93,27 @@ public RecordsWithSplitIds fetch() throws IOException { log.warn("fetch data failed.", e); throw new IOException(e); } - return dataIt == null - ? finishedSnapshotSplit() - : ChangeEventRecords.forRecords(currentSplitId, dataIt); + if (dataIt == null) { + return finishedSnapshotSplit(); + } + if (currentSplitId == null) { + log.warn( + "Invalid state: currentSplitId is null when emitting records. " + + "emittedFinishedSplitId={}, currentFetcher={}, isFinished={}", + emittedFinishedSplitId, + currentFetcher != null ? currentFetcher.getClass().getSimpleName() : "null", + currentFetcher != null && currentFetcher.isFinished()); + throw new IOException( + String.format( + "Invalid state: currentSplitId is null when emitting records. " + + "emittedFinishedSplitId=%s, currentFetcher=%s, isFinished=%s", + emittedFinishedSplitId, + currentFetcher != null + ? currentFetcher.getClass().getSimpleName() + : "null", + currentFetcher != null && currentFetcher.isFinished())); + } + return ChangeEventRecords.forRecords(currentSplitId, dataIt); } @Override @@ -100,10 +134,14 @@ public void wakeUp() {} @Override public void close() throws Exception { - if (currentFetcher != null) { - log.info("Close current fetcher {}", currentFetcher.getClass().getCanonicalName()); - currentFetcher.close(); + try { + if (currentFetcher != null) { + log.info("Close current fetcher {}", currentFetcher.getClass().getCanonicalName()); + currentFetcher.close(); + } + } finally { currentSplitId = null; + emittedFinishedSplitId = null; } } @@ -123,6 +161,7 @@ protected void checkSplitOrStartNext() throws IOException { throw new IOException("Cannot fetch from another split - no split remaining."); } currentSplitId = nextSplit.splitId(); + emittedFinishedSplitId = null; if (nextSplit.isSnapshotSplit()) { if (currentFetcher == null) { @@ -152,10 +191,52 @@ public boolean canAssignNextSplit() { return currentFetcher == null || currentFetcher.isFinished(); } - private ChangeEventRecords finishedSnapshotSplit() { - final ChangeEventRecords finishedRecords = - ChangeEventRecords.forFinishedSplit(currentSplitId); - currentSplitId = null; - return finishedRecords; + private boolean hasEmittedCurrentSplitFinished() { + return currentSplitId != null && currentSplitId.equals(emittedFinishedSplitId); + } + + private RecordsWithSplitIds finishedSnapshotSplit() throws IOException { + final String splitId = currentSplitId; + if (splitId == null) { + log.warn( + "Invalid state: currentSplitId is null when finishing snapshot split. " + + "emittedFinishedSplitId={}, currentFetcher={}, isFinished={}", + emittedFinishedSplitId, + currentFetcher != null ? currentFetcher.getClass().getSimpleName() : "null", + currentFetcher != null && currentFetcher.isFinished()); + throw new IOException( + String.format( + "Invalid state: currentSplitId is null when finishing snapshot split. " + + "emittedFinishedSplitId=%s, currentFetcher=%s, isFinished=%s", + emittedFinishedSplitId, + currentFetcher != null + ? currentFetcher.getClass().getSimpleName() + : "null", + currentFetcher != null && currentFetcher.isFinished())); + } + if (splitId.equals(emittedFinishedSplitId)) { + return NoSplitRecords.INSTANCE; + } + emittedFinishedSplitId = splitId; + return ChangeEventRecords.forFinishedSplit(splitId); + } + + private static final class NoSplitRecords implements RecordsWithSplitIds { + private static final NoSplitRecords INSTANCE = new NoSplitRecords(); + + @Override + public String nextSplit() { + return null; + } + + @Override + public SourceRecords nextRecordFromSplit() { + throw new IllegalStateException("No split assigned"); + } + + @Override + public Set finishedSplits() { + return Collections.emptySet(); + } } } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/ChangeEventRecords.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/ChangeEventRecords.java index 44c80a5ec5ed..cd81883d11ff 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/ChangeEventRecords.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/ChangeEventRecords.java @@ -74,7 +74,17 @@ public static ChangeEventRecords forRecords( return new ChangeEventRecords(splitId, recordsForSplit, Collections.emptySet()); } + /** + * Creates a {@link ChangeEventRecords} that only indicates a split is finished. + * + * @param splitId the ID of the finished split, must not be null + * @return a new {@link ChangeEventRecords} instance + * @throws IllegalArgumentException if splitId is null + */ public static ChangeEventRecords forFinishedSplit(final String splitId) { + if (splitId == null) { + throw new IllegalArgumentException("splitId must not be null"); + } return new ChangeEventRecords(null, null, Collections.singleton(splitId)); } } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceSplitReaderTest.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceSplitReaderTest.java new file mode 100644 index 000000000000..11b452fd3a3c --- /dev/null +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceSplitReaderTest.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.cdc.base.source.reader; + +import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig; +import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect; +import org.apache.seatunnel.connectors.cdc.base.schema.SchemaChangeResolver; +import org.apache.seatunnel.connectors.cdc.base.source.reader.external.Fetcher; +import org.apache.seatunnel.connectors.cdc.base.source.split.SourceRecords; +import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase; +import org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordsWithSplitIds; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.Collections; + +class IncrementalSourceSplitReaderTest { + + @Test + void testFetchFinishedSnapshotSplitEmitsFinishedOnlyOnce() throws Exception { + DataSourceDialect dialect = Mockito.mock(DataSourceDialect.class); + SourceConfig config = Mockito.mock(SourceConfig.class); + SchemaChangeResolver resolver = Mockito.mock(SchemaChangeResolver.class); + + IncrementalSourceSplitReader reader = + new IncrementalSourceSplitReader(0, dialect, config, resolver) { + @Override + protected void checkSplitOrStartNext() {} + }; + + @SuppressWarnings("unchecked") + Fetcher fetcher = Mockito.mock(Fetcher.class); + Mockito.when(fetcher.pollSplitRecords()).thenReturn(null); + + setField(reader, "currentFetcher", fetcher); + setField(reader, "currentSplitId", "split-1"); + + RecordsWithSplitIds first = reader.fetch(); + RecordsWithSplitIds second = reader.fetch(); + + Assertions.assertEquals(Collections.singleton("split-1"), first.finishedSplits()); + Assertions.assertFalse(first.finishedSplits().contains(null)); + Assertions.assertEquals(Collections.emptySet(), second.finishedSplits()); + Assertions.assertFalse(second.finishedSplits().contains(null)); + Mockito.verify(fetcher, Mockito.times(1)).pollSplitRecords(); + } + + @Test + void testFetchFinishedSnapshotSplitFailFastWhenCurrentSplitIdIsNull() throws Exception { + DataSourceDialect dialect = Mockito.mock(DataSourceDialect.class); + SourceConfig config = Mockito.mock(SourceConfig.class); + SchemaChangeResolver resolver = Mockito.mock(SchemaChangeResolver.class); + + IncrementalSourceSplitReader reader = + new IncrementalSourceSplitReader(0, dialect, config, resolver) { + @Override + protected void checkSplitOrStartNext() {} + }; + + @SuppressWarnings("unchecked") + Fetcher fetcher = Mockito.mock(Fetcher.class); + Mockito.when(fetcher.pollSplitRecords()).thenReturn(null); + + setField(reader, "currentFetcher", fetcher); + setField(reader, "currentSplitId", null); + + Assertions.assertThrows(IOException.class, reader::fetch); + } + + @Test + void testFetchFinishedSnapshotSplitSupportsNextSplitAfterIdChanges() throws Exception { + DataSourceDialect dialect = Mockito.mock(DataSourceDialect.class); + SourceConfig config = Mockito.mock(SourceConfig.class); + SchemaChangeResolver resolver = Mockito.mock(SchemaChangeResolver.class); + + IncrementalSourceSplitReader reader = + new IncrementalSourceSplitReader(0, dialect, config, resolver) { + @Override + protected void checkSplitOrStartNext() {} + }; + + @SuppressWarnings("unchecked") + Fetcher fetcher = Mockito.mock(Fetcher.class); + Mockito.when(fetcher.pollSplitRecords()).thenReturn(null); + + setField(reader, "currentFetcher", fetcher); + setField(reader, "currentSplitId", "split-1"); + + RecordsWithSplitIds first = reader.fetch(); + RecordsWithSplitIds idle = reader.fetch(); + + setField(reader, "currentSplitId", "split-2"); + RecordsWithSplitIds second = reader.fetch(); + + Assertions.assertEquals(Collections.singleton("split-1"), first.finishedSplits()); + Assertions.assertEquals(Collections.emptySet(), idle.finishedSplits()); + Assertions.assertEquals(Collections.singleton("split-2"), second.finishedSplits()); + Mockito.verify(fetcher, Mockito.times(2)).pollSplitRecords(); + } + + @Test + void testCloseClearsState() throws Exception { + DataSourceDialect dialect = Mockito.mock(DataSourceDialect.class); + SourceConfig config = Mockito.mock(SourceConfig.class); + SchemaChangeResolver resolver = Mockito.mock(SchemaChangeResolver.class); + + IncrementalSourceSplitReader reader = + new IncrementalSourceSplitReader(0, dialect, config, resolver) { + @Override + protected void checkSplitOrStartNext() {} + }; + + @SuppressWarnings("unchecked") + Fetcher fetcher = Mockito.mock(Fetcher.class); + + setField(reader, "currentFetcher", fetcher); + setField(reader, "currentSplitId", "split-1"); + setField(reader, "emittedFinishedSplitId", "split-1"); + + reader.close(); + + Assertions.assertNull(getField(reader, "currentSplitId")); + Assertions.assertNull(getField(reader, "emittedFinishedSplitId")); + Mockito.verify(fetcher, Mockito.times(1)).close(); + } + + private static void setField( + IncrementalSourceSplitReader reader, String fieldName, Object value) + throws Exception { + Field field = IncrementalSourceSplitReader.class.getDeclaredField(fieldName); + field.setAccessible(true); + field.set(reader, value); + } + + private static Object getField(IncrementalSourceSplitReader reader, String fieldName) + throws Exception { + Field field = IncrementalSourceSplitReader.class.getDeclaredField(fieldName); + field.setAccessible(true); + return field.get(reader); + } +}