Skip to content

Commit 7194e73

Browse files
committed
Fix race condition in SharedTsBlockQueue async listener causing NPE in MemoryPool.free()
When SharedTsBlockQueue.add() encounters memory pressure, it registers an async listener on a MemoryReservationFuture to add the TsBlock later. If the upstream FragmentInstance finishes and calls abort()/close() before the listener executes, the following race occurs: 1. abort() sets closed=true, clears the queue, frees bufferRetainedSizeInBytes 2. deRegisterFragmentInstanceFromMemoryPool removes the upstream FI's memory mapping 3. The async listener fires and adds the TsBlock to the closed queue 4. The downstream consumer calls remove() -> MemoryPool.free() with the upstream FI's IDs, but the mapping no longer exists -> NPE Fix: Check the `closed` flag inside the async listener before adding the TsBlock. When closed, skip the add (memory was already freed by abort/close) and complete channelBlocked to prevent hangs. Also add a unit test that reproduces this race condition by using a manually-controlled SettableFuture to simulate the blocked-on-memory path.
1 parent d9b692b commit 7194e73

File tree

2 files changed

+107
-9
lines changed

2 files changed

+107
-9
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,8 @@ public TsBlock remove() {
205205
localPlanNodeId,
206206
tsBlock.getSizeInBytes());
207207
bufferRetainedSizeInBytes -= tsBlock.getSizeInBytes();
208-
// Every time LocalSourceHandle consumes a TsBlock, it needs to send the event to
208+
// Every time LocalSourceHandle consumes a TsBlock, it needs to send the event
209+
// to
209210
// corresponding LocalSinkChannel.
210211
if (sinkChannel != null) {
211212
sinkChannel.checkAndInvokeOnFinished();
@@ -257,6 +258,16 @@ public ListenableFuture<Void> add(TsBlock tsBlock) {
257258
blockedOnMemory.addListener(
258259
() -> {
259260
synchronized (this) {
261+
// If the queue has been closed or aborted before this listener executes,
262+
// we must not add the TsBlock. The memory reserved for this TsBlock has
263+
// already been freed by abort()/close() via bufferRetainedSizeInBytes.
264+
// Adding it would cause a downstream NPE in MemoryPool.free() when
265+
// the consumer calls remove(), because the upstream FI's memory mapping
266+
// has already been deregistered.
267+
if (closed) {
268+
channelBlocked.set(null);
269+
return;
270+
}
260271
queue.add(tsBlock);
261272
if (!blocked.isDone()) {
262273
blocked.set(null);
@@ -266,8 +277,10 @@ public ListenableFuture<Void> add(TsBlock tsBlock) {
266277
},
267278
// Use directExecutor() here could lead to deadlock. Thread A holds lock of
268279
// SharedTsBlockQueueA and tries to invoke the listener of
269-
// SharedTsBlockQueueB(when freeing memory to complete MemoryReservationFuture) while
270-
// Thread B holds lock of SharedTsBlockQueueB and tries to invoke the listener of
280+
// SharedTsBlockQueueB(when freeing memory to complete MemoryReservationFuture)
281+
// while
282+
// Thread B holds lock of SharedTsBlockQueueB and tries to invoke the listener
283+
// of
271284
// SharedTsBlockQueueA
272285
executorService);
273286
return channelBlocked;
@@ -307,13 +320,18 @@ public void close() {
307320
bufferRetainedSizeInBytes = 0;
308321
}
309322
if (sinkChannel != null) {
310-
// attention: LocalSinkChannel of this SharedTsBlockQueue could be null when we close
311-
// LocalSourceHandle(with limit clause it's possible) before constructing the corresponding
323+
// attention: LocalSinkChannel of this SharedTsBlockQueue could be null when we
324+
// close
325+
// LocalSourceHandle(with limit clause it's possible) before constructing the
326+
// corresponding
312327
// LocalSinkChannel.
313-
// If this close method is invoked by LocalSourceHandle, listener of LocalSourceHandle will
314-
// remove the LocalSourceHandle from the map of MppDataExchangeManager and later when
328+
// If this close method is invoked by LocalSourceHandle, listener of
329+
// LocalSourceHandle will
330+
// remove the LocalSourceHandle from the map of MppDataExchangeManager and later
331+
// when
315332
// LocalSinkChannel is initialized, it will construct a new SharedTsBlockQueue.
316-
// It is still safe that we let the LocalSourceHandle close successfully in this case. Because
333+
// It is still safe that we let the LocalSourceHandle close successfully in this
334+
// case. Because
317335
// the QueryTerminator will do the final cleaning logic.
318336
sinkChannel.close();
319337
}

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueueTest.java

Lines changed: 81 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,10 @@
2525
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
2626

2727
import com.google.common.util.concurrent.ListenableFuture;
28+
import com.google.common.util.concurrent.SettableFuture;
2829
import org.apache.tsfile.external.commons.lang3.Validate;
30+
import org.apache.tsfile.read.common.block.TsBlock;
31+
import org.apache.tsfile.utils.Pair;
2932
import org.junit.Assert;
3033
import org.junit.Test;
3134
import org.mockito.Mockito;
@@ -37,12 +40,89 @@
3740
import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
3841

3942
public class SharedTsBlockQueueTest {
43+
44+
/**
45+
* Test that when add() goes into the async listener path (memory blocked) and the queue is
46+
* aborted before the listener fires, the listener does NOT add the TsBlock to the closed queue.
47+
* This reproduces the race condition that caused NPE in MemoryPool.free().
48+
*/
49+
@Test
50+
public void testAsyncListenerAfterAbortDoesNotAddTsBlock() {
51+
final String queryId = "q0";
52+
final long mockTsBlockSize = 1024L;
53+
final TFragmentInstanceId fragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
54+
final String planNodeId = "test";
55+
56+
// Use a SettableFuture to manually control when the blocked-on-memory future
57+
// completes.
58+
SettableFuture<Void> manualFuture = SettableFuture.create();
59+
60+
// Create a mock MemoryPool that returns the manually-controlled future
61+
// (simulating blocked).
62+
LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
63+
MemoryPool mockMemoryPool = Mockito.mock(MemoryPool.class);
64+
Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(mockMemoryPool);
65+
66+
// reserve() returns (manualFuture, false) — simulating memory blocked
67+
Mockito.when(
68+
mockMemoryPool.reserve(
69+
Mockito.anyString(),
70+
Mockito.anyString(),
71+
Mockito.anyString(),
72+
Mockito.anyLong(),
73+
Mockito.anyLong()))
74+
.thenReturn(new Pair<>(manualFuture, Boolean.FALSE));
75+
// tryCancel returns 0 — simulating future already completed (can't cancel)
76+
Mockito.when(mockMemoryPool.tryCancel(Mockito.any())).thenReturn(0L);
77+
78+
// Use a direct executor so that when we complete manualFuture, the listener
79+
// runs immediately.
80+
SharedTsBlockQueue queue =
81+
new SharedTsBlockQueue(
82+
fragmentInstanceId, planNodeId, mockLocalMemoryManager, newDirectExecutorService());
83+
queue.getCanAddTsBlock().set(null);
84+
queue.setMaxBytesCanReserve(Long.MAX_VALUE);
85+
86+
TsBlock mockTsBlock = Utils.createMockTsBlock(mockTsBlockSize);
87+
88+
// Step 1: add() goes into async path — listener is registered on manualFuture.
89+
// reserve() returns (manualFuture, false), so the TsBlock is NOT yet added to
90+
// the queue.
91+
ListenableFuture<Void> addFuture;
92+
synchronized (queue) {
93+
addFuture = queue.add(mockTsBlock);
94+
}
95+
// The addFuture (channelBlocked) should not be done yet
96+
Assert.assertFalse(addFuture.isDone());
97+
// Queue should be empty — TsBlock is waiting for memory
98+
Assert.assertTrue(queue.isEmpty());
99+
100+
// Step 2: Abort the queue (simulates upstream FI state change listener calling
101+
// abort)
102+
synchronized (queue) {
103+
queue.abort();
104+
}
105+
Assert.assertTrue(queue.isClosed());
106+
107+
// Step 3: Now complete the manualFuture — this triggers the async listener.
108+
// Before the fix, this would add the TsBlock to the closed queue.
109+
// After the fix, the listener detects closed==true and returns without adding.
110+
manualFuture.set(null);
111+
112+
// Verify: queue should still be empty (TsBlock was NOT added to the closed
113+
// queue)
114+
Assert.assertTrue(queue.isEmpty());
115+
// The channelBlocked future should be completed (no hang)
116+
Assert.assertTrue(addFuture.isDone());
117+
}
118+
40119
@Test(timeout = 15000L)
41120
public void concurrencyTest() {
42121
final String queryId = "q0";
43122
final long mockTsBlockSize = 1024L * 1024L;
44123

45-
// Construct a mock LocalMemoryManager with capacity 5 * mockTsBlockSize per query.
124+
// Construct a mock LocalMemoryManager with capacity 5 * mockTsBlockSize per
125+
// query.
46126
LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
47127
MemoryManager memoryManager = Mockito.spy(new MemoryManager(10 * mockTsBlockSize));
48128
MemoryPool spyMemoryPool =

0 commit comments

Comments
 (0)