Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,22 @@

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;

/**
* Split reader for incremental source (snapshot + incremental phase).
*
* <p><b>Thread safety:</b> This class is NOT thread-safe and is expected to be used from a single
* thread. The {@link #fetch()} method should be called sequentially without concurrent access. The
* {@link #close()} method should be called from the same thread or after all fetch calls have
* completed.
*
* @param <C> The type of source configuration.
*/
@Slf4j
public class IncrementalSourceSplitReader<C extends SourceConfig>
implements SplitReader<SourceRecords, SourceSplitBase> {
private final Queue<SourceSplitBase> splits;
Expand All @@ -49,6 +61,7 @@ public class IncrementalSourceSplitReader<C extends SourceConfig>
private Fetcher<SourceRecords, SourceSplitBase> currentFetcher;

private String currentSplitId;
private String emittedFinishedSplitId;
private final DataSourceDialect<C> dataSourceDialect;
private final C sourceConfig;
private final SchemaChangeResolver schemaChangeResolver;
Expand All @@ -70,16 +83,37 @@ public RecordsWithSplitIds<SourceRecords> fetch() throws IOException {

checkSplitOrStartNext();
checkNeedStopBinlogReader();
if (hasEmittedCurrentSplitFinished()) {
return NoSplitRecords.INSTANCE;
}
Iterator<SourceRecords> dataIt = null;
try {
dataIt = currentFetcher.pollSplitRecords();
} catch (InterruptedException | SeaTunnelException e) {
log.warn("fetch data failed.", e);
throw new IOException(e);
}
return dataIt == null
? finishedSnapshotSplit()
: ChangeEventRecords.forRecords(currentSplitId, dataIt);
if (dataIt == null) {
return finishedSnapshotSplit();
}
if (currentSplitId == null) {
log.warn(
"Invalid state: currentSplitId is null when emitting records. "
+ "emittedFinishedSplitId={}, currentFetcher={}, isFinished={}",
emittedFinishedSplitId,
currentFetcher != null ? currentFetcher.getClass().getSimpleName() : "null",
currentFetcher != null && currentFetcher.isFinished());
throw new IOException(
String.format(
"Invalid state: currentSplitId is null when emitting records. "
+ "emittedFinishedSplitId=%s, currentFetcher=%s, isFinished=%s",
emittedFinishedSplitId,
currentFetcher != null
? currentFetcher.getClass().getSimpleName()
: "null",
currentFetcher != null && currentFetcher.isFinished()));
}
return ChangeEventRecords.forRecords(currentSplitId, dataIt);
}

@Override
Expand All @@ -100,10 +134,14 @@ public void wakeUp() {}

@Override
public void close() throws Exception {
if (currentFetcher != null) {
log.info("Close current fetcher {}", currentFetcher.getClass().getCanonicalName());
currentFetcher.close();
try {
if (currentFetcher != null) {
log.info("Close current fetcher {}", currentFetcher.getClass().getCanonicalName());
currentFetcher.close();
}
} finally {
currentSplitId = null;
emittedFinishedSplitId = null;
}
}

Expand All @@ -123,6 +161,7 @@ protected void checkSplitOrStartNext() throws IOException {
throw new IOException("Cannot fetch from another split - no split remaining.");
}
currentSplitId = nextSplit.splitId();
emittedFinishedSplitId = null;

if (nextSplit.isSnapshotSplit()) {
if (currentFetcher == null) {
Expand Down Expand Up @@ -152,10 +191,52 @@ public boolean canAssignNextSplit() {
return currentFetcher == null || currentFetcher.isFinished();
}

private ChangeEventRecords finishedSnapshotSplit() {
final ChangeEventRecords finishedRecords =
ChangeEventRecords.forFinishedSplit(currentSplitId);
currentSplitId = null;
return finishedRecords;
/**
 * Reports whether the finished marker for the current split has already been emitted.
 *
 * @return {@code true} when {@code currentSplitId} is non-null and equals {@code
 *     emittedFinishedSplitId}; {@code false} otherwise, including when no split is current
 */
private boolean hasEmittedCurrentSplitFinished() {
    final String activeSplitId = currentSplitId;
    if (activeSplitId == null) {
        // Without a current split there is nothing that could have been marked as finished.
        return false;
    }
    return activeSplitId.equals(emittedFinishedSplitId);
}

/**
 * Builds the fetch result that reports the current split as finished, at most once per split.
 *
 * <p>The first call for a given {@code currentSplitId} records that ID in {@code
 * emittedFinishedSplitId} and returns a finished-split result for it. Any further call made while
 * the same ID is still current returns the empty {@link NoSplitRecords} instance instead, so the
 * finished notification is not repeated.
 *
 * @return a finished-split result for the current split, or the empty result if that split was
 *     already reported as finished
 * @throws IOException if {@code currentSplitId} is {@code null}; the message carries the reader
 *     state needed to diagnose how that happened
 */
private RecordsWithSplitIds<SourceRecords> finishedSnapshotSplit() throws IOException {
    final String splitId = currentSplitId;
    if (splitId == null) {
        // Build the diagnostic once so the log line and the exception text cannot diverge.
        final String message =
                String.format(
                        "Invalid state: currentSplitId is null when finishing snapshot split. "
                                + "emittedFinishedSplitId=%s, currentFetcher=%s, isFinished=%s",
                        emittedFinishedSplitId,
                        currentFetcher != null
                                ? currentFetcher.getClass().getSimpleName()
                                : "null",
                        currentFetcher != null && currentFetcher.isFinished());
        log.warn(message);
        throw new IOException(message);
    }
    if (splitId.equals(emittedFinishedSplitId)) {
        // Already reported: hand back an empty result rather than a duplicate finished marker.
        return NoSplitRecords.INSTANCE;
    }
    emittedFinishedSplitId = splitId;
    return ChangeEventRecords.forFinishedSplit(splitId);
}

/**
 * Empty {@link RecordsWithSplitIds} result: it names no split, carries no records and reports no
 * finished splits.
 */
private static final class NoSplitRecords implements RecordsWithSplitIds<SourceRecords> {
    /** Single shared instance of this class. */
    private static final NoSplitRecords INSTANCE = new NoSplitRecords();

    /** @return always an empty set, since this result finishes no split */
    @Override
    public Set<String> finishedSplits() {
        return Collections.<String>emptySet();
    }

    /** @return always {@code null}, since this result contains no split */
    @Override
    public String nextSplit() {
        return null;
    }

    /**
     * Always fails, because this result never exposes a split to read records from.
     *
     * @throws IllegalStateException on every call
     */
    @Override
    public SourceRecords nextRecordFromSplit() {
        throw new IllegalStateException("No split assigned");
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,17 @@ public static ChangeEventRecords forRecords(
return new ChangeEventRecords(splitId, recordsForSplit, Collections.emptySet());
}

/**
 * Builds a {@link ChangeEventRecords} whose only content is the notification that one split has
 * finished: it is constructed with a {@code null} split ID, a {@code null} record iterator and a
 * singleton finished-split set.
 *
 * @param splitId the ID of the finished split; must not be {@code null}
 * @return a new {@link ChangeEventRecords} whose finished-split set contains exactly {@code
 *     splitId}
 * @throws IllegalArgumentException if {@code splitId} is {@code null}
 */
public static ChangeEventRecords forFinishedSplit(final String splitId) {
    if (splitId != null) {
        return new ChangeEventRecords(null, null, Collections.singleton(splitId));
    }
    throw new IllegalArgumentException("splitId must not be null");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.cdc.base.source.reader;

import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.schema.SchemaChangeResolver;
import org.apache.seatunnel.connectors.cdc.base.source.reader.external.Fetcher;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceRecords;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordsWithSplitIds;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.Collections;

/**
 * Unit tests for the finished-split bookkeeping of {@link IncrementalSourceSplitReader}.
 *
 * <p>Every test uses a reader whose {@code checkSplitOrStartNext()} is overridden to do nothing
 * and drives the reader's private state through reflection.
 */
class IncrementalSourceSplitReaderTest {

    private static final String FIRST_SPLIT = "split-1";
    private static final String SECOND_SPLIT = "split-2";

    /** A split whose fetcher yields no iterator is reported as finished exactly once. */
    @Test
    void testFetchFinishedSnapshotSplitEmitsFinishedOnlyOnce() throws Exception {
        IncrementalSourceSplitReader<SourceConfig> reader = newReader();
        Fetcher<SourceRecords, SourceSplitBase> fetcher = newFetcherPollingNull();
        setField(reader, "currentFetcher", fetcher);
        setField(reader, "currentSplitId", FIRST_SPLIT);

        RecordsWithSplitIds<SourceRecords> first = reader.fetch();
        RecordsWithSplitIds<SourceRecords> second = reader.fetch();

        Assertions.assertEquals(Collections.singleton(FIRST_SPLIT), first.finishedSplits());
        Assertions.assertFalse(first.finishedSplits().contains(null));
        Assertions.assertEquals(Collections.emptySet(), second.finishedSplits());
        Assertions.assertFalse(second.finishedSplits().contains(null));
        // The second fetch must return before polling the fetcher again.
        Mockito.verify(fetcher, Mockito.times(1)).pollSplitRecords();
    }

    /** Finishing a split while no split ID is set is rejected with an {@link IOException}. */
    @Test
    void testFetchFinishedSnapshotSplitFailFastWhenCurrentSplitIdIsNull() throws Exception {
        IncrementalSourceSplitReader<SourceConfig> reader = newReader();
        Fetcher<SourceRecords, SourceSplitBase> fetcher = newFetcherPollingNull();
        setField(reader, "currentFetcher", fetcher);
        setField(reader, "currentSplitId", null);

        Assertions.assertThrows(IOException.class, reader::fetch);
    }

    /** Once the current split ID changes, the new split gets its own finished notification. */
    @Test
    void testFetchFinishedSnapshotSplitSupportsNextSplitAfterIdChanges() throws Exception {
        IncrementalSourceSplitReader<SourceConfig> reader = newReader();
        Fetcher<SourceRecords, SourceSplitBase> fetcher = newFetcherPollingNull();
        setField(reader, "currentFetcher", fetcher);
        setField(reader, "currentSplitId", FIRST_SPLIT);

        RecordsWithSplitIds<SourceRecords> first = reader.fetch();
        RecordsWithSplitIds<SourceRecords> idle = reader.fetch();

        setField(reader, "currentSplitId", SECOND_SPLIT);
        RecordsWithSplitIds<SourceRecords> second = reader.fetch();

        Assertions.assertEquals(Collections.singleton(FIRST_SPLIT), first.finishedSplits());
        Assertions.assertEquals(Collections.emptySet(), idle.finishedSplits());
        Assertions.assertEquals(Collections.singleton(SECOND_SPLIT), second.finishedSplits());
        // Only the first and the third fetch reach the fetcher; the idle one returns early.
        Mockito.verify(fetcher, Mockito.times(2)).pollSplitRecords();
    }

    /** Closing the reader closes the fetcher and resets both split-ID fields. */
    @Test
    void testCloseClearsState() throws Exception {
        IncrementalSourceSplitReader<SourceConfig> reader = newReader();
        Fetcher<SourceRecords, SourceSplitBase> fetcher = newFetcher();
        setField(reader, "currentFetcher", fetcher);
        setField(reader, "currentSplitId", FIRST_SPLIT);
        setField(reader, "emittedFinishedSplitId", FIRST_SPLIT);

        reader.close();

        Assertions.assertNull(getField(reader, "currentSplitId"));
        Assertions.assertNull(getField(reader, "emittedFinishedSplitId"));
        Mockito.verify(fetcher, Mockito.times(1)).close();
    }

    /**
     * Creates a reader backed by mocked collaborators whose {@code checkSplitOrStartNext()} does
     * nothing, so that the tests alone decide which fetcher and split ID are current.
     */
    @SuppressWarnings("unchecked")
    private static IncrementalSourceSplitReader<SourceConfig> newReader() {
        DataSourceDialect<SourceConfig> dialect = Mockito.mock(DataSourceDialect.class);
        SourceConfig config = Mockito.mock(SourceConfig.class);
        SchemaChangeResolver resolver = Mockito.mock(SchemaChangeResolver.class);
        return new IncrementalSourceSplitReader<SourceConfig>(0, dialect, config, resolver) {
            @Override
            protected void checkSplitOrStartNext() {}
        };
    }

    /** Creates a plain fetcher mock without any stubbing. */
    @SuppressWarnings("unchecked")
    private static Fetcher<SourceRecords, SourceSplitBase> newFetcher() {
        return Mockito.mock(Fetcher.class);
    }

    /** Creates a fetcher mock whose {@code pollSplitRecords()} is stubbed to return null. */
    private static Fetcher<SourceRecords, SourceSplitBase> newFetcherPollingNull()
            throws Exception {
        Fetcher<SourceRecords, SourceSplitBase> fetcher = newFetcher();
        Mockito.when(fetcher.pollSplitRecords()).thenReturn(null);
        return fetcher;
    }

    /** Writes {@code value} into the named field declared on the reader class. */
    private static void setField(
            IncrementalSourceSplitReader<?> reader, String fieldName, Object value)
            throws Exception {
        accessibleField(fieldName).set(reader, value);
    }

    /** Reads the named field declared on the reader class. */
    private static Object getField(IncrementalSourceSplitReader<?> reader, String fieldName)
            throws Exception {
        return accessibleField(fieldName).get(reader);
    }

    /** Looks up a field declared on {@link IncrementalSourceSplitReader} and opens it up. */
    private static Field accessibleField(String fieldName) throws Exception {
        Field field = IncrementalSourceSplitReader.class.getDeclaredField(fieldName);
        field.setAccessible(true);
        return field;
    }
}