[Fix][connector-cdc] Fix NPE when finishing snapshot split due to null splitId #10404
Carl-Zhou-CN merged 2 commits into apache:dev
Issue 1: close() method does not clean up emittedFinishedSplitId
Location:
@Override
public void close() throws Exception {
    if (currentFetcher != null) {
        log.info("Close current fetcher {}", currentFetcher.getClass().getCanonicalName());
        currentFetcher.close();
        currentSplitId = null; // Cleared currentSplitId
    }
    // ⚠️ Missing: emittedFinishedSplitId = null;
}
Related Context:
Problem Description:
Potential Risks:
Impact Scope:
Severity: MINOR
Improvement Suggestion:
@Override
public void close() throws Exception {
    if (currentFetcher != null) {
        log.info("Close current fetcher {}", currentFetcher.getClass().getCanonicalName());
        currentFetcher.close();
        currentSplitId = null;
        emittedFinishedSplitId = null; // Add this line
    }
}
Rationale:
Issue 2: Concurrency safety not explicitly documented
Location:
@Slf4j
public class IncrementalSourceSplitReader<C extends SourceConfig>
        implements SplitReader<SourceRecords, SourceSplitBase> {
    // Field declaration (no volatile, no synchronized)
    private String currentSplitId;
    private String emittedFinishedSplitId;
    // ...
}
Related Context:
Problem Description:
Potential Risks:
Impact Scope:
Severity: MAJOR
Improvement Suggestion:
/**
 * Split reader for incremental source (snapshot + incremental phase).
 *
 * <p><b>Thread safety:</b> This class is NOT thread-safe and should only be called
 * from a single thread. The {@link #fetch()} method is expected to be called
 * sequentially without concurrent access. The {@link #close()} method should also
 * be called from the same thread or after all fetch calls have completed.
 *
 * @param <C> The type of source configuration.
 */
@Slf4j
public class IncrementalSourceSplitReader<C extends SourceConfig>
        implements SplitReader<SourceRecords, SourceSplitBase> {
    // ...
}
Option 2 (Safer): Use volatile fields:
private volatile String currentSplitId;
private volatile String emittedFinishedSplitId;
Rationale:
Issue 3: Exception message lacks context information
Location:
if (splitId == null) {
    throw new IOException("currentSplitId is null when finishing snapshot split");
}
Related Context:
Problem Description:
Potential Risks:
Impact Scope:
Severity: MINOR
Improvement Suggestion:
if (splitId == null) {
    log.warn(
            "Invalid state: currentSplitId is null when finishing snapshot split. "
                    + "emittedFinishedSplitId={}, currentFetcher={}, isFinished={}",
            emittedFinishedSplitId,
            currentFetcher != null ? currentFetcher.getClass().getSimpleName() : "null",
            currentFetcher != null && currentFetcher.isFinished());
    throw new IOException(
            String.format(
                    "currentSplitId is null when finishing snapshot split. "
                            + "emittedFinishedSplitId=%s, currentFetcher=%s, isFinished=%s",
                    emittedFinishedSplitId,
                    currentFetcher != null ? currentFetcher.getClass().getSimpleName() : "null",
                    currentFetcher != null && currentFetcher.isFinished()));
}
Rationale:
Issue 4: Missing unit tests for concurrent scenarios
Location:
Related Context:
Problem Description:
Potential Risks:
Impact Scope:
Severity: MINOR
Improvement Suggestion:
@Test
void testFetchConcurrentWithCloseShouldNotThrowNPE() throws Exception {
    DataSourceDialect<SourceConfig> dialect = Mockito.mock(DataSourceDialect.class);
    SourceConfig config = Mockito.mock(SourceConfig.class);
    SchemaChangeResolver resolver = Mockito.mock(SchemaChangeResolver.class);
    IncrementalSourceSplitReader<SourceConfig> reader =
            new IncrementalSourceSplitReader<SourceConfig>(0, dialect, config, resolver) {
                @Override
                protected void checkSplitOrStartNext() {}
            };
    @SuppressWarnings("unchecked")
    Fetcher<SourceRecords, SourceSplitBase> fetcher = Mockito.mock(Fetcher.class);
    Mockito.when(fetcher.pollSplitRecords()).thenReturn(null);
    Mockito.when(fetcher.isFinished()).thenReturn(true);
    setField(reader, "currentFetcher", fetcher);
    setField(reader, "currentSplitId", "split-1");
    // Simulate concurrent close and fetch
    ExecutorService executor = Executors.newFixedThreadPool(2);
    Future<?> closeFuture = executor.submit(() -> {
        try {
            reader.close();
        } catch (Exception e) {
            // Expected
        }
    });
    Future<RecordsWithSplitIds<SourceRecords>> fetchFuture =
            executor.submit(() -> reader.fetch());
    // Should not throw NPE (may throw other exceptions due to closed state)
    try {
        fetchFuture.get(1, TimeUnit.SECONDS);
    } catch (TimeoutException e) {
        // Timeout is acceptable
    } catch (ExecutionException e) {
        // Failures caused by the closed state are tolerated; only an NPE fails the test
        Assertions.assertFalse(e.getCause() instanceof NullPointerException);
    } finally {
        executor.shutdownNow();
    }
}
Rationale:
Note: This test may need adjustment depending on the actual concurrent call patterns.
Issue 5: Exception handling of ChangeEventRecords.forFinishedSplit() not documented in JavaDoc
Location:
public static ChangeEventRecords forFinishedSplit(final String splitId) {
    if (splitId == null) {
        throw new IllegalArgumentException("splitId must not be null");
    }
    return new ChangeEventRecords(null, null, Collections.singleton(splitId));
}
Related Context:
Problem Description:
Potential Risks:
Impact Scope:
Severity: MINOR
Improvement Suggestion:
/**
 * Creates a {@link ChangeEventRecords} that only indicates a split is finished.
 *
 * @param splitId the ID of the finished split, must not be null
 * @return a new {@link ChangeEventRecords} instance
 * @throws IllegalArgumentException if splitId is null
 */
public static ChangeEventRecords forFinishedSplit(final String splitId) {
    if (splitId == null) {
        throw new IllegalArgumentException("splitId must not be null");
    }
    return new ChangeEventRecords(null, null, Collections.singleton(splitId));
}
Rationale:
Revisions have been made in accordance with the suggestions.
Purpose of this pull request
This PR fixes issue #10398, where a job can fail with a NullPointerException in SourceReaderBase.finishCurrentFetch caused by a null finished splitId propagated from the CDC reader.
The root cause is that IncrementalSourceSplitReader may emit “split finished” more than once for the same split, and in some edge states it can construct a finished event with a null splitId. Since SourceReaderBase removes split state by splitId, a null key leads to an NPE from ConcurrentHashMap.remove(null).
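The null-key failure mode can be reproduced in isolation. The following is a minimal sketch, not code from SeaTunnel: the class name NullKeyRemoveDemo, the helper removeThrowsNpe, and the map contents are hypothetical and only illustrate that java.util.concurrent.ConcurrentHashMap rejects null keys on remove.

```java
import java.util.concurrent.ConcurrentHashMap;

public class NullKeyRemoveDemo {

    /**
     * Returns true if removing the given key from a ConcurrentHashMap
     * throws NullPointerException. The map stands in for per-split state
     * keyed by splitId (an assumption made for illustration).
     */
    static boolean removeThrowsNpe(String splitId) {
        ConcurrentHashMap<String, String> splitStates = new ConcurrentHashMap<>();
        splitStates.put("split-1", "state");
        try {
            splitStates.remove(splitId);
            return false;
        } catch (NullPointerException e) {
            // ConcurrentHashMap does not permit null keys, including on remove
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(removeThrowsNpe("split-1"));
        System.out.println(removeThrowsNpe(null));
    }
}
```

A regular HashMap would silently accept remove(null), which is why this kind of bug only surfaces once the state is held in a concurrent map.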
Does this PR introduce any user-facing change?
Yes.
Previously, some CDC jobs could fail with NullPointerException when finishing a snapshot split (internal null finished splitId). After this change, the finished-split event is emitted exactly once and splitId is guaranteed non-null, so the job no longer crashes with that NPE. If an invalid internal state occurs (currentSplitId == null), it now throws a clearer exception earlier.
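The "exactly once, never null" behavior described above can be sketched as a small guard. This is an illustrative sketch under assumptions, not the code merged in this PR: the class FinishOnceGuard and its methods startSplit and finishCurrentSplit are hypothetical, and only the field names currentSplitId and emittedFinishedSplitId and the exception message are taken from the review comments.

```java
import java.io.IOException;
import java.util.Objects;
import java.util.Optional;

public class FinishOnceGuard {
    private String currentSplitId;
    private String emittedFinishedSplitId;

    /** Records the split that is currently being read (hypothetical helper). */
    void startSplit(String splitId) {
        currentSplitId = Objects.requireNonNull(splitId, "splitId must not be null");
    }

    /**
     * Returns the finished split id the first time it is called for a split,
     * and an empty Optional on repeated calls for the same split.
     * Fails fast if no split is active.
     */
    Optional<String> finishCurrentSplit() throws IOException {
        if (currentSplitId == null) {
            throw new IOException("currentSplitId is null when finishing snapshot split");
        }
        if (currentSplitId.equals(emittedFinishedSplitId)) {
            // Already reported as finished: do not emit a second event
            return Optional.empty();
        }
        emittedFinishedSplitId = currentSplitId;
        return Optional.of(currentSplitId);
    }

    /** Clears both fields so no stale state survives a close. */
    void close() {
        currentSplitId = null;
        emittedFinishedSplitId = null;
    }
}
```

Like the reader discussed in the review, this sketch is not thread-safe and assumes all calls come from a single thread.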
How was this patch tested?
Added unit tests: IncrementalSourceSplitReaderTest
Check list
New License Guide
incompatible-changes.md to describe the incompatibility caused by this PR.