Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
5581192
Add RangeIndex cluster migration with sketch protection
May 5, 2026
4601a22
Log full exception
Jun 15, 2026
4d20264
Record OnError on RangeIndex migration delete failure
Jun 15, 2026
6537386
Log full exception detail in RangeIndex delete-failure activity error
Jun 15, 2026
9127583
Update
Jun 15, 2026
576f6b9
Nest DisposeResult in CooperativeDisposeGuard and add unit tests
Jun 15, 2026
446a2fc
Apply dotnet format (trailing newline fixes)
Jun 15, 2026
6acad11
Collapse RangeIndexManager init throw statements to single line
Jun 15, 2026
10ea719
Nest PublishMigratedIndexResult in RangeIndexManager
Jun 15, 2026
1f2226c
Add component test for RangeIndex receive-state dispose race
Jun 15, 2026
9041e01
Extract RangeIndex receive-state cleanup into DisposeInternal
Jun 15, 2026
917729e
Simplify riLogRoot creation failure to a plain Exception
Jun 15, 2026
a000622
Remove test-seam comment in RangeIndex receive ProcessRecord
Jun 15, 2026
7fa5ec8
Record migrated temp file path in RangeIndex receive activity
Jun 15, 2026
836df3f
Extract RangeIndex receive error handling into HandleError helper
Jun 15, 2026
3b6f68b
Log split key-length header and invalid key length in RI deserializer
Jun 15, 2026
658b687
Log invalid file size error path in RI deserializer
Jun 15, 2026
dab5186
Restore riLogRoot creation throw to match main (drop unnecessary PR d…
Jun 15, 2026
47557b3
Add empty-chunk no-op tests for KeyHeader and FileHeader states
Jun 15, 2026
a7cf5f1
Transition RI deserializer to Error on file I/O failure
Jun 15, 2026
6762af0
Make RI deserializer Dispose tolerant of CloseStream flush failure
Jun 15, 2026
f60978e
Wrap migration temp dir creation in try/catch with explicit throw
Jun 15, 2026
65648e2
Add empty-chunk no-op guard to ReceivingFileData state
Jun 15, 2026
705d1bb
Require exact stub length in RI trailer (reject over-long trailers)
Jun 15, 2026
41822e5
Make RI deserializer CloseStream null the stream even if Dispose throws
Jun 15, 2026
1c64625
Log swallowed CloseStream failure during RI deserializer dispose
Jun 15, 2026
5e998eb
Log swallowed temp-file delete failure during RI deserializer dispose
Jun 15, 2026
5dc72a7
Add blank line before goto case in RI deserializer file-open path
Jun 15, 2026
b30c544
Restore return true after ReceivingFileData try/catch
Jun 15, 2026
63ca950
Add tests for split file header and too-small trailer error paths
Jun 15, 2026
16acfb2
Ensure RangeIndexMigrationReader temp cleanup runs even if stream Dis…
Jun 15, 2026
7519b40
Add reader/serializer parity round-trip tests for RangeIndexMigration…
Jun 15, 2026
40b867a
Document ReadNextChunkAsync completion protocol via IsComplete
Jun 15, 2026
4801115
Validate minimum chunk size in RangeIndexMigrationReader constructor
Jun 15, 2026
c0be907
Comment the written==0 (buffer too small) break in ReadNextChunkAsync
Jun 15, 2026
c13317a
Move MinChunkSize to RangeIndexChunkedSerializer (wire-format owner)
Jun 15, 2026
e4b730a
Add round-trip test at exactly MinChunkSize boundary
Jun 15, 2026
8e9043a
Validate destination (not chunkSize) against MinChunkSize in reader
Jun 15, 2026
2bf76fb
Update ReadNextChunkAsync docs/comment for destination validation
Jun 15, 2026
a57eb7d
Remove redundant PublishMigratedIndex remarks block
Jun 15, 2026
d7b88a2
Update migration docs to match current RangeIndex implementation
Jun 15, 2026
efcf7cd
Trim KeyExists doc to a short inline note
Jun 15, 2026
c53fb4e
Add post-migration RangeIndex lifecycle cluster tests
Jun 16, 2026
effd36a
Harden migration publish: only Created is success; log + TODO race notes
Jun 16, 2026
c6d31f8
Clarify migration race comments; split RMW-update branch from no-reco…
Jun 16, 2026
e1acbd0
Remove stale checkpoint-CPR TODO comment in SnapshotForMigration
Jun 16, 2026
8f39f1e
Move process-crashing migration tests to crash/recovery PR
Jun 16, 2026
efffad9
Merge branch 'main' into tiagonapoli/bftree-migration
tiagonapoli Jun 16, 2026
7ce8a3e
Fix flaky DisposeDuringProcessRecord test: wait on temp file, not chu…
Jun 16, 2026
945b2e2
CI: collect crash + hang dumps for cluster tests
Jun 17, 2026
c8e1832
Merge branch 'main' into tiagonapoli/bftree-migration
tiagonapoli Jun 17, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ jobs:
os: [ ubuntu-latest, windows-latest ]
framework: [ 'net8.0' , 'net10.0']
configuration: [ 'Debug', 'Release' ]
test: [ 'Garnet.test.cluster', 'Garnet.test.cluster.migrate', 'Garnet.test.cluster.replication', 'Garnet.test.cluster.replication.tls', 'Garnet.test.cluster.replication.disklesssync', 'Garnet.test.cluster.replication.rangeindex', 'Garnet.test.cluster.vectorsets', 'Garnet.test.cluster.multilog' ]
test: [ 'Garnet.test.cluster', 'Garnet.test.cluster.migrate', 'Garnet.test.cluster.migrate.rangeindex', 'Garnet.test.cluster.replication', 'Garnet.test.cluster.replication.tls', 'Garnet.test.cluster.replication.disklesssync', 'Garnet.test.cluster.replication.rangeindex', 'Garnet.test.cluster.vectorsets', 'Garnet.test.cluster.multilog' ]
if: needs.changes.outputs.garnet == 'true'
steps:
- name: Check out code
Expand All @@ -175,7 +175,7 @@ jobs:
- name: Install dependencies
run: dotnet restore
- name: Run tests ${{ matrix.test }}
run: dotnet test test/cluster/${{ matrix.test }} -f ${{ matrix.framework }} --configuration ${{ matrix.configuration }} --logger "console;verbosity=detailed" --logger trx --results-directory "GarnetTestResults-${{ matrix.os }}-${{ matrix.framework }}-${{ matrix.configuration }}-${{ matrix.test }}" -- NUnit.DisplayName=FullName
run: dotnet test test/cluster/${{ matrix.test }} -f ${{ matrix.framework }} --configuration ${{ matrix.configuration }} --logger "console;verbosity=detailed" --logger trx --blame-crash --blame-crash-dump-type full --blame-hang-timeout 30m --blame-hang-dump-type full --results-directory "GarnetTestResults-${{ matrix.os }}-${{ matrix.framework }}-${{ matrix.configuration }}-${{ matrix.test }}" -- NUnit.DisplayName=FullName
timeout-minutes: 45
- name: Upload test results
uses: actions/upload-artifact@v7
Expand Down
1 change: 1 addition & 0 deletions Garnet.slnx
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
<Project Path="test/standalone/Garnet.test.vectorset/Garnet.test.vectorset.csproj" />
<Project Path="test/cluster/Garnet.test.cluster/Garnet.test.cluster.csproj" />
<Project Path="test/cluster/Garnet.test.cluster.migrate/Garnet.test.cluster.migrate.csproj" />
<Project Path="test/cluster/Garnet.test.cluster.migrate.rangeindex/Garnet.test.cluster.migrate.rangeindex.csproj" />
Comment thread
tiagonapoli marked this conversation as resolved.
<Project Path="test/cluster/Garnet.test.cluster.multilog/Garnet.test.cluster.multilog.csproj" />
<Project Path="test/cluster/Garnet.test.cluster.multilog.diskless/Garnet.test.cluster.multilog.diskless.csproj" />
<Project Path="test/cluster/Garnet.test.cluster.replication/Garnet.test.cluster.replication.csproj" />
Expand Down
6 changes: 6 additions & 0 deletions libs/client/ClientSession/GarnetClientSessionIncremental.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ public enum MigrationRecordSpanType : byte
/// Bespoke encoding for Vector Set indexes.
/// </summary>
VectorSetIndex = 3,

/// <summary>
/// Chunked serialization stream for a RangeIndex key during migration.
/// The receiver uses a state machine to track the in-progress stream.
/// </summary>
SerializedRangeIndexStream = 4,
}

public sealed unsafe partial class GarnetClientSession : IServerHook, IMessageConsumer
Expand Down
45 changes: 25 additions & 20 deletions libs/cluster/Server/Migration/MigrateOperation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading.Tasks;
using Garnet.client;
using Garnet.server;
Expand All @@ -22,15 +23,20 @@ internal sealed partial class MigrateOperation
public StoreScan storeScan;

private readonly ConcurrentDictionary<byte[], byte[]> vectorSetsIndexKeysToMigrate;
private readonly ConcurrentDictionary<byte[], byte> rangeIndexKeysToMigrate;

readonly MigrateSession session;
readonly GarnetClientSession gcs;
readonly LocalServerSession localServerSession;

public GarnetClientSession Client => gcs;

public LocalServerSession LocalSession => localServerSession;

public IEnumerable<KeyValuePair<byte[], byte[]>> VectorSets => vectorSetsIndexKeysToMigrate;

public IEnumerable<byte[]> RangeIndexKeys => rangeIndexKeysToMigrate.Keys;

public void ThrowIfCancelled() => session._cts.Token.ThrowIfCancellationRequested();

public bool Contains(int slot) => session._sslots.Contains(slot);
Expand All @@ -46,6 +52,8 @@ public bool ContainsNamespace(ReadOnlySpan<byte> namespaceBytes)
public void EncounteredVectorSet(byte[] key, byte[] value)
=> vectorSetsIndexKeysToMigrate.TryAdd(key, value);

/// <summary>
/// Remembers a RangeIndex key so it can be migrated later.
/// A key that has already been recorded is left as-is.
/// </summary>
/// <param name="key">Raw key bytes of the RangeIndex record.</param>
public void AddRangeIndexKey(byte[] key)
{
    // The value byte is a placeholder; only the key set matters.
    _ = rangeIndexKeysToMigrate.TryAdd(key, 0);
}

public MigrateOperation(MigrateSession session, Sketch sketch = null, int batchSize = 1 << 18)
{
this.session = session;
Expand All @@ -55,6 +63,7 @@ public MigrateOperation(MigrateSession session, Sketch sketch = null, int batchS
storeScan = new StoreScan(this);
keysToDelete = [];
vectorSetsIndexKeysToMigrate = new(ByteArrayComparer.Instance);
rangeIndexKeysToMigrate = new(ByteArrayComparer.Instance);
}

public async ValueTask<bool> InitializeAsync()
Expand Down Expand Up @@ -127,18 +136,14 @@ public async Task<bool> TransmitSlotsAsync()
return true;
}

public async Task<bool> TransmitKeysAsync(Dictionary<byte[], byte[]> vectorSetKeysToIgnore)
public async Task<bool> TransmitKeysAsync(Func<PinnedSpanByte, bool> shouldSkipKey)
{
// Use this for both stores; main store will just use the SpanByteAndMemory directly. We want it to be outside iterations
// so we can reuse the SpanByteAndMemory.Memory across iterations.
// TODO: initialize 'output' based on gcs curr and end; make sure it has the initial part of the "send" set, and call gcs.IncrementRecordDirect().
// This will still allow SBAM.Memory to be reused.
var output = new UnifiedOutput();

#if NET9_0_OR_GREATER
var ignoreLookup = vectorSetKeysToIgnore.GetAlternateLookup<ReadOnlySpan<byte>>();
#endif

try
{
var keys = sketch.Keys;
Expand All @@ -152,21 +157,9 @@ public async Task<bool> TransmitKeysAsync(Dictionary<byte[], byte[]> vectorSetKe
if (keys[i].Item2)
continue;

var spanByte = keys[i].Item1;

// Don't transmit if a Vector Set
var isVectorSet =
vectorSetKeysToIgnore.Count > 0 &&
#if NET9_0_OR_GREATER
ignoreLookup.ContainsKey(spanByte.ReadOnlySpan);
#else
vectorSetKeysToIgnore.ContainsKey(spanByte.ToArray());
#endif

if (isVectorSet)
{
// Skip keys that require special handling
if (shouldSkipKey(keys[i].Item1))
continue;
}

if (!await session.WriteOrSendRecordAsync(gcs, localServerSession, keys[i].Item1, ref input, ref output, out var status).ConfigureAwait(false))
return false;
Expand Down Expand Up @@ -283,7 +276,19 @@ public void DeleteVectorSet(PinnedSpanByte key)

var delRes = localServerSession.BasicGarnetApi.DELETE(key);

session.logger?.LogDebug("Deleting Vector Set {key} after migration: {delRes}", System.Text.Encoding.UTF8.GetString(key), delRes);
session.logger?.LogDebug("Deleting Vector Set {key} after migration: {delRes}", Encoding.UTF8.GetString(key), delRes);
}

/// <summary>
/// Removes a RangeIndex key from the local store once it has been migrated.
/// When the session's copy option is set the method does nothing and the source key stays in place
/// (COPY is noted as not yet supported for RangeIndex keys).
/// </summary>
/// <param name="key">Pinned key to delete.</param>
public void DeleteRangeIndex(PinnedSpanByte key)
{
    if (!session._copyOption)
    {
        var deleteStatus = localServerSession.BasicGarnetApi.DELETE(key);
        session.logger?.LogDebug("Deleted RangeIndex key {key} after migration: {delRes}", Encoding.UTF8.GetString(key), deleteStatus);
    }
}
}
}
Expand Down
7 changes: 6 additions & 1 deletion libs/cluster/Server/Migration/MigrateScanFunctions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,12 @@ public bool Reader<TSourceLogRecord>(in TSourceLogRecord srcLogRecord, RecordMet
// Check if key belongs to slot that is being migrated and if it can be added to our buffer
if (migrateOperation.Contains(slot))
{
if (srcLogRecord.RecordType == VectorManager.RecordType)
if (srcLogRecord.RecordType == RangeIndexManager.RangeIndexRecordType)
{
// RangeIndex keys need out-of-band migration (snapshot + chunks)
migrateOperation.AddRangeIndexKey(key.ToArray());
}
else if (srcLogRecord.RecordType == VectorManager.RecordType)
{
// We can't delete the vector set _yet_ nor can we migrate it,
// we just need to remember it to migrate once the associated namespaces are all moved over
Expand Down
168 changes: 168 additions & 0 deletions libs/cluster/Server/Migration/MigrateSession.RangeIndex.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using System;
using System.Buffers;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Garnet.client;
using Garnet.common;
using Garnet.server;
using Microsoft.Extensions.Logging;
using Tsavorite.core;

namespace Garnet.cluster
{
/// <summary>
/// RangeIndex migration support: source-side transmit driver.
/// </summary>
internal sealed partial class MigrateSession : IDisposable
{
/// <summary>
/// Sends one RangeIndex key to the migration target as a series of
/// <see cref="MigrationRecordSpanType.SerializedRangeIndexStream"/> record spans.
/// A reader obtained from <see cref="RangeIndexManager.SnapshotRangeIndexAndCreateReader"/>
/// produces the chunks; after the last chunk the iteration buffer is sent and its response is awaited.
/// </summary>
/// <param name="migrateOperation">Provides the client session used for sending and the local session handed to the snapshot call.</param>
/// <param name="keyBytes">Raw bytes of the RangeIndex key to transmit.</param>
/// <param name="chunkSize">Minimum size of the rented chunk buffer; also forwarded to the reader factory.</param>
/// <param name="cancellationToken">Checked before every chunk read and forwarded to the reader.</param>
/// <returns>
/// <c>true</c> once every chunk was written and the final flush succeeded; <c>false</c> when the
/// RangeIndex preview feature is disabled, a chunk write or the final flush fails, or any exception
/// (cancellation included) is raised during the transfer. Exceptions are logged, not rethrown.
/// </returns>
private async Task<bool> TransmitRangeIndexAsync(MigrateOperation migrateOperation, byte[] keyBytes, int chunkSize, CancellationToken cancellationToken)
{
    // Decoded on demand; only ever used as a log argument.
    string KeyText() => Encoding.UTF8.GetString(keyBytes);

    if (!clusterProvider.serverOptions.EnableRangeIndexPreview)
    {
        logger?.LogError("TransmitRangeIndexAsync: RangeIndex feature is not enabled, skipping key {key}", KeyText());
        return false;
    }

    var manager = clusterProvider.storeWrapper.DefaultDatabase.RangeIndexManager;
    var client = migrateOperation.Client;
    var chunkBuffer = ArrayPool<byte>.Shared.Rent(chunkSize);
    var activity = RangeIndexMigrationActivities.TransmitActivity.StartActivity();

    try
    {
        using var chunkReader = manager.SnapshotRangeIndexAndCreateReader(migrateOperation.LocalSession, keyBytes, chunkSize);
        activity.OnSnapshotCompleted(chunkReader.TotalFileBytes);

        while (!chunkReader.IsComplete)
        {
            cancellationToken.ThrowIfCancellationRequested();

            var bytesInChunk = await chunkReader.ReadNextChunkAsync(chunkBuffer, cancellationToken).ConfigureAwait(false);
            if (bytesInChunk == 0)
            {
                // The reader is not complete yet produced nothing: treat as a failure instead of spinning.
                activity.OnError("Zero-length payload from reader");
                logger?.LogError("TransmitRangeIndexAsync: reader returned zero-length payload with a {Size}-byte buffer for key {key}", chunkSize, KeyText());
                return false;
            }

            var chunkWritten = await WriteOrSendRecordSpanAsync(client, MigrationRecordSpanType.SerializedRangeIndexStream, chunkBuffer.AsSpan(0, bytesInChunk)).ConfigureAwait(false);
            if (!chunkWritten)
            {
                activity.OnError("Failed to write chunk");
                logger?.LogError("TransmitRangeIndexAsync: failed to write chunk for key {key}", KeyText());
                return false;
            }

            activity.OnChunkSent(bytesInChunk);
        }

        // Push out whatever is still buffered and wait for the response.
        var flushed = await HandleMigrateTaskResponseAsync(client.SendAndResetIterationBuffer()).ConfigureAwait(false);
        if (flushed)
            return true;

        activity.OnError("Flush failed");
        logger?.LogError("TransmitRangeIndexAsync: flush failed for key {key}", KeyText());
        return false;
    }
    catch (Exception ex)
    {
        activity.OnError(ex.ToString());
        logger?.LogError(ex, "TransmitRangeIndexAsync: error during snapshot or transmission for key {key}", KeyText());
        return false;
    }
    finally
    {
        // Runs on every exit path: hand the buffer back, then close out the activity.
        ArrayPool<byte>.Shared.Return(chunkBuffer);
        activity.EndAndLogActivity(logger, keyBytes);
    }
}

/// <summary>
/// Migrate a batch of RangeIndex keys with sketch protection.
/// The sketch is cleared and refilled with the given keys under INITIALIZING, then moved to
/// TRANSMITTING (each key is sent via <see cref="TransmitRangeIndexAsync"/>) and then to DELETING
/// (each key is removed via <c>MigrateOperation.DeleteRangeIndex</c>). After each of those two status
/// changes the method awaits <c>WaitForConfigPropagationAsync</c> before doing any work.
/// NOTE(review): no MIGRATED status is set in this method; the sketch is simply cleared in the
/// finally block. Confirm whether a MIGRATED transition is expected to happen elsewhere.
/// </summary>
/// <param name="migrateOperation">Operation whose sketch gates the keys and whose sessions are used to transmit and delete them.</param>
/// <param name="rangeIndexKeys">Raw key bytes of the RangeIndex keys to migrate.</param>
/// <param name="cancellationToken">Checked before each key is transmitted and forwarded to the transmit call.</param>
/// <returns>
/// <c>true</c> when every key was transmitted and the delete loop completed;
/// <c>false</c> as soon as transmitting one key fails (remaining keys are not attempted and nothing is deleted).
/// </returns>
/// <remarks>
/// Exceptions are not swallowed here: cancellation raised by the token check, failures of the awaited
/// helpers, and any exception thrown while deleting a key (rethrown after being recorded and logged)
/// all propagate to the caller. The finally block still clears the sketch and ends the activity.
/// </remarks>
private async Task<bool> MigrateRangeIndexKeysAsync(MigrateOperation migrateOperation, HashSet<byte[]> rangeIndexKeys, CancellationToken cancellationToken)
{
var migrateActivity = RangeIndexMigrationActivities.MigrateActivity.StartActivity(rangeIndexKeys.Count);

try
{
logger?.LogWarning("MigrateRangeIndexKeysAsync: migrating {count} RangeIndex keys", rangeIndexKeys.Count);

// Add all RI keys to sketch during INITIALIZING (no gating yet)
migrateOperation.sketch.Clear();
migrateOperation.sketch.SetStatus(SketchStatus.INITIALIZING);
foreach (var key in rangeIndexKeys)
migrateOperation.sketch.TryHashAndStore(key);

// Exception-injection hook placed just before the TRANSMITTING transition
ExceptionInjectionHelper.TriggerException(ExceptionInjectionType.RangeIndex_Migration_Before_Transmitting);

// Block writes during snapshot + transmit
migrateOperation.sketch.SetStatus(SketchStatus.TRANSMITTING);
await WaitForConfigPropagationAsync().ConfigureAwait(false);
migrateActivity.OnTransmitting();

// Exception-injection hook placed after the TRANSMITTING status has been propagated
await ExceptionInjectionHelper.ResetAndWaitAsync(ExceptionInjectionType.RangeIndex_Migration_After_Transmitting).ConfigureAwait(false);

// Send keys one at a time; the first failure aborts the whole batch
foreach (var key in rangeIndexKeys)
{
cancellationToken.ThrowIfCancellationRequested();

if (!await TransmitRangeIndexAsync(migrateOperation, key, RangeIndexManager.DefaultMigrationChunkSize, cancellationToken).ConfigureAwait(false))
{
migrateActivity.OnError($"failed to transmit key {Encoding.UTF8.GetString(key)}");
logger?.LogError("MigrateRangeIndexKeysAsync: failed to migrate RangeIndex key {key}", Encoding.UTF8.GetString(key));
return false;
}
}

// Exception-injection hook placed just before the DELETING transition
ExceptionInjectionHelper.TriggerException(ExceptionInjectionType.RangeIndex_Migration_Before_Deleting);

// Block reads + writes during delete
migrateOperation.sketch.SetStatus(SketchStatus.DELETING);
await WaitForConfigPropagationAsync().ConfigureAwait(false);
migrateActivity.OnDeleting();

// Exception-injection hook placed after the DELETING status has been propagated
await ExceptionInjectionHelper.ResetAndWaitAsync(ExceptionInjectionType.RangeIndex_Migration_After_Deleting).ConfigureAwait(false);

foreach (var key in rangeIndexKeys)
{
try
{
unsafe
{
// Pin the managed key array so a PinnedSpanByte can be built over it for the delete call
fixed (byte* keyPtr = key)
migrateOperation.DeleteRangeIndex(PinnedSpanByte.FromPinnedPointer(keyPtr, key.Length));
}
}
catch (Exception ex)
{
// Record and log the failure, then rethrow so the caller sees the original exception
migrateActivity.OnError($"failed to delete key {Encoding.UTF8.GetString(key)}: {ex}");
logger?.LogError(ex, "MigrateRangeIndexKeysAsync: failed to delete RangeIndex key {key} after migration", Encoding.UTF8.GetString(key));
throw;
Comment thread
vazois marked this conversation as resolved.
}
}

return true;
}
finally
Comment thread
tiagonapoli marked this conversation as resolved.
{
// Always clean up the sketch, even on failure, to unblock client operations
migrateOperation.sketch.Clear();
migrateActivity.EndAndLogActivity(logger);
}
}
}
}
Loading
Loading