[BFTree] Add RangeIndex cluster migration #1731
Merged
51 commits
5581192
Add RangeIndex cluster migration with sketch protection
4601a22
Log full exception
4d20264
Record OnError on RangeIndex migration delete failure
6537386
Log full exception detail in RangeIndex delete-failure activity error
9127583
Update
576f6b9
Nest DisposeResult in CooperativeDisposeGuard and add unit tests
446a2fc
Apply dotnet format (trailing newline fixes)
6acad11
Collapse RangeIndexManager init throw statements to single line
10ea719
Nest PublishMigratedIndexResult in RangeIndexManager
1f2226c
Add component test for RangeIndex receive-state dispose race
9041e01
Extract RangeIndex receive-state cleanup into DisposeInternal
917729e
Simplify riLogRoot creation failure to a plain Exception
a000622
Remove test-seam comment in RangeIndex receive ProcessRecord
7fa5ec8
Record migrated temp file path in RangeIndex receive activity
836df3f
Extract RangeIndex receive error handling into HandleError helper
3b6f68b
Log split key-length header and invalid key length in RI deserializer
658b687
Log invalid file size error path in RI deserializer
dab5186
Restore riLogRoot creation throw to match main (drop unnecessary PR d…
47557b3
Add empty-chunk no-op tests for KeyHeader and FileHeader states
a7cf5f1
Transition RI deserializer to Error on file I/O failure
6762af0
Make RI deserializer Dispose tolerant of CloseStream flush failure
f60978e
Wrap migration temp dir creation in try/catch with explicit throw
65648e2
Add empty-chunk no-op guard to ReceivingFileData state
705d1bb
Require exact stub length in RI trailer (reject over-long trailers)
41822e5
Make RI deserializer CloseStream null the stream even if Dispose throws
1c64625
Log swallowed CloseStream failure during RI deserializer dispose
5e998eb
Log swallowed temp-file delete failure during RI deserializer dispose
5dc72a7
Add blank line before goto case in RI deserializer file-open path
b30c544
Restore return true after ReceivingFileData try/catch
63ca950
Add tests for split file header and too-small trailer error paths
16acfb2
Ensure RangeIndexMigrationReader temp cleanup runs even if stream Dis…
7519b40
Add reader/serializer parity round-trip tests for RangeIndexMigration…
40b867a
Document ReadNextChunkAsync completion protocol via IsComplete
4801115
Validate minimum chunk size in RangeIndexMigrationReader constructor
c0be907
Comment the written==0 (buffer too small) break in ReadNextChunkAsync
c13317a
Move MinChunkSize to RangeIndexChunkedSerializer (wire-format owner)
e4b730a
Add round-trip test at exactly MinChunkSize boundary
8e9043a
Validate destination (not chunkSize) against MinChunkSize in reader
2bf76fb
Update ReadNextChunkAsync docs/comment for destination validation
a57eb7d
Remove redundant PublishMigratedIndex remarks block
d7b88a2
Update migration docs to match current RangeIndex implementation
efcf7cd
Trim KeyExists doc to a short inline note
c53fb4e
Add post-migration RangeIndex lifecycle cluster tests
effd36a
Harden migration publish: only Created is success; log + TODO race notes
c6d31f8
Clarify migration race comments; split RMW-update branch from no-reco…
e1acbd0
Remove stale checkpoint-CPR TODO comment in SnapshotForMigration
8f39f1e
Move process-crashing migration tests to crash/recovery PR
efffad9
Merge branch 'main' into tiagonapoli/bftree-migration (tiagonapoli)
7ce8a3e
Fix flaky DisposeDuringProcessRecord test: wait on temp file, not chu…
945b2e2
CI: collect crash + hang dumps for cluster tests
c8e1832
Merge branch 'main' into tiagonapoli/bftree-migration (tiagonapoli)
168 changes: 168 additions & 0 deletions
libs/cluster/Server/Migration/MigrateSession.RangeIndex.cs
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,168 @@ | ||
| // Copyright (c) Microsoft Corporation. | ||
| // Licensed under the MIT license. | ||
| | ||
| using System; | ||
| using System.Buffers; | ||
| using System.Collections.Generic; | ||
| using System.Text; | ||
| using System.Threading; | ||
| using System.Threading.Tasks; | ||
| using Garnet.client; | ||
| using Garnet.common; | ||
| using Garnet.server; | ||
| using Microsoft.Extensions.Logging; | ||
| using Tsavorite.core; | ||
| | ||
| namespace Garnet.cluster | ||
| { | ||
| /// <summary> | ||
| /// RangeIndex migration support: source-side transmit driver. | ||
| /// </summary> | ||
| internal sealed partial class MigrateSession : IDisposable | ||
| { | ||
| /// <summary> | ||
| /// Transmit a single RangeIndex key to the destination node. | ||
| /// Uses <see cref="RangeIndexManager.SnapshotRangeIndexAndCreateReader"/> to obtain an async | ||
| /// migration reader that snapshots and streams the BfTree data. | ||
| /// </summary> | ||
| private async Task<bool> TransmitRangeIndexAsync(MigrateOperation migrateOperation, byte[] keyBytes, int chunkSize, CancellationToken cancellationToken) | ||
| { | ||
| if (!clusterProvider.serverOptions.EnableRangeIndexPreview) | ||
| { | ||
| logger?.LogError("TransmitRangeIndexAsync: RangeIndex feature is not enabled, skipping key {key}", Encoding.UTF8.GetString(keyBytes)); | ||
| return false; | ||
| } | ||
| | ||
| var rangeIndexManager = clusterProvider.storeWrapper.DefaultDatabase.RangeIndexManager; | ||
| | ||
| var sessionClient = migrateOperation.Client; | ||
| var buffer = ArrayPool<byte>.Shared.Rent(chunkSize); | ||
| var transmitActivity = RangeIndexMigrationActivities.TransmitActivity.StartActivity(); | ||
| try | ||
| { | ||
| using var reader = rangeIndexManager.SnapshotRangeIndexAndCreateReader(migrateOperation.LocalSession, keyBytes, chunkSize); | ||
| transmitActivity.OnSnapshotCompleted(reader.TotalFileBytes); | ||
| | ||
| while (!reader.IsComplete) | ||
| { | ||
| cancellationToken.ThrowIfCancellationRequested(); | ||
| | ||
| var payloadLen = await reader.ReadNextChunkAsync(buffer, cancellationToken).ConfigureAwait(false); | ||
| if (payloadLen == 0) | ||
| { | ||
| transmitActivity.OnError("Zero-length payload from reader"); | ||
| logger?.LogError("TransmitRangeIndexAsync: reader returned zero-length payload with a {Size}-byte buffer for key {key}", chunkSize, Encoding.UTF8.GetString(keyBytes)); | ||
| return false; | ||
| } | ||
| | ||
| if (!await WriteOrSendRecordSpanAsync(sessionClient, MigrationRecordSpanType.SerializedRangeIndexStream, buffer.AsSpan(0, payloadLen)).ConfigureAwait(false)) | ||
| { | ||
| transmitActivity.OnError("Failed to write chunk"); | ||
| logger?.LogError("TransmitRangeIndexAsync: failed to write chunk for key {key}", Encoding.UTF8.GetString(keyBytes)); | ||
| return false; | ||
| } | ||
| | ||
| transmitActivity.OnChunkSent(payloadLen); | ||
| } | ||
| | ||
| // Force flush and await ACK | ||
| if (!await HandleMigrateTaskResponseAsync(sessionClient.SendAndResetIterationBuffer()).ConfigureAwait(false)) | ||
| { | ||
| transmitActivity.OnError("Flush failed"); | ||
| logger?.LogError("TransmitRangeIndexAsync: flush failed for key {key}", Encoding.UTF8.GetString(keyBytes)); | ||
| return false; | ||
| } | ||
| | ||
| return true; | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| transmitActivity.OnError(ex.ToString()); | ||
| logger?.LogError(ex, "TransmitRangeIndexAsync: error during snapshot or transmission for key {key}", Encoding.UTF8.GetString(keyBytes)); | ||
| return false; | ||
| } | ||
| finally | ||
| { | ||
| ArrayPool<byte>.Shared.Return(buffer); | ||
| transmitActivity.EndAndLogActivity(logger, keyBytes); | ||
| } | ||
| } | ||
| | ||
| /// <summary> | ||
| /// Migrate a batch of RangeIndex keys with sketch protection. | ||
| /// Adds all keys to the sketch, transitions through TRANSMITTING → DELETING → MIGRATED | ||
| /// with epoch barriers, ensuring concurrent operations are properly gated. | ||
| /// </summary> | ||
| private async Task<bool> MigrateRangeIndexKeysAsync(MigrateOperation migrateOperation, HashSet<byte[]> rangeIndexKeys, CancellationToken cancellationToken) | ||
| { | ||
| var migrateActivity = RangeIndexMigrationActivities.MigrateActivity.StartActivity(rangeIndexKeys.Count); | ||
| | ||
| try | ||
| { | ||
| logger?.LogWarning("MigrateRangeIndexKeysAsync: migrating {count} RangeIndex keys", rangeIndexKeys.Count); | ||
| | ||
| // Add all RI keys to sketch during INITIALIZING (no gating yet) | ||
| migrateOperation.sketch.Clear(); | ||
| migrateOperation.sketch.SetStatus(SketchStatus.INITIALIZING); | ||
| foreach (var key in rangeIndexKeys) | ||
| migrateOperation.sketch.TryHashAndStore(key); | ||
| | ||
| ExceptionInjectionHelper.TriggerException(ExceptionInjectionType.RangeIndex_Migration_Before_Transmitting); | ||
| | ||
| // Block writes during snapshot + transmit | ||
| migrateOperation.sketch.SetStatus(SketchStatus.TRANSMITTING); | ||
| await WaitForConfigPropagationAsync().ConfigureAwait(false); | ||
| migrateActivity.OnTransmitting(); | ||
| | ||
| await ExceptionInjectionHelper.ResetAndWaitAsync(ExceptionInjectionType.RangeIndex_Migration_After_Transmitting).ConfigureAwait(false); | ||
| | ||
| foreach (var key in rangeIndexKeys) | ||
| { | ||
| cancellationToken.ThrowIfCancellationRequested(); | ||
| | ||
| if (!await TransmitRangeIndexAsync(migrateOperation, key, RangeIndexManager.DefaultMigrationChunkSize, cancellationToken).ConfigureAwait(false)) | ||
| { | ||
| migrateActivity.OnError($"failed to transmit key {Encoding.UTF8.GetString(key)}"); | ||
| logger?.LogError("MigrateRangeIndexKeysAsync: failed to migrate RangeIndex key {key}", Encoding.UTF8.GetString(key)); | ||
| return false; | ||
| } | ||
| } | ||
| | ||
| ExceptionInjectionHelper.TriggerException(ExceptionInjectionType.RangeIndex_Migration_Before_Deleting); | ||
| | ||
| // Block reads + writes during delete | ||
| migrateOperation.sketch.SetStatus(SketchStatus.DELETING); | ||
| await WaitForConfigPropagationAsync().ConfigureAwait(false); | ||
| migrateActivity.OnDeleting(); | ||
| | ||
| await ExceptionInjectionHelper.ResetAndWaitAsync(ExceptionInjectionType.RangeIndex_Migration_After_Deleting).ConfigureAwait(false); | ||
| | ||
| foreach (var key in rangeIndexKeys) | ||
| { | ||
| try | ||
| { | ||
| unsafe | ||
| { | ||
| fixed (byte* keyPtr = key) | ||
| migrateOperation.DeleteRangeIndex(PinnedSpanByte.FromPinnedPointer(keyPtr, key.Length)); | ||
| } | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| migrateActivity.OnError($"failed to delete key {Encoding.UTF8.GetString(key)}: {ex}"); | ||
| logger?.LogError(ex, "MigrateRangeIndexKeysAsync: failed to delete RangeIndex key {key} after migration", Encoding.UTF8.GetString(key)); | ||
| throw; | ||
| } | ||
| } | ||
| | ||
| return true; | ||
| } | ||
| finally | ||
| { | ||
| // Always clean up the sketch, even on failure, to unblock client operations | ||
| migrateOperation.sketch.Clear(); | ||
| migrateActivity.EndAndLogActivity(logger); | ||
| } | ||
| } | ||
| } | ||
| } | ||