Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,12 @@ internal async ValueTask ImportStateAsync(Checkpoint checkpoint)
.Select(id => this.EnsureExecutorAsync(id, tracer: null).AsTask())
.ToArray();

// Discard queued external deliveries from the superseded timeline so a runtime
// restore cannot apply stale responses after importing the checkpoint state.
while (this._queuedExternalDeliveries.TryDequeue(out _))
Copy link

Copilot AI Apr 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The dequeue loop clears the queue in-place, which can inadvertently discard new external deliveries enqueued concurrently with the restore (e.g., a response arriving just after restore starts). It can also spin longer than intended if enqueues continue. Consider swapping the queue atomically at the start of import (e.g., Interlocked.Exchange to a new ConcurrentQueue, then drain the old instance) so only pre-restore deliveries are dropped and post-restore deliveries are preserved.

Suggested change
while (this._queuedExternalDeliveries.TryDequeue(out _))
// Atomically swap the queue so only deliveries that were already queued before
// restore started are discarded; deliveries arriving concurrently are preserved
// on the new queue.
var queuedExternalDeliveries = Interlocked.Exchange(ref this._queuedExternalDeliveries, new());
while (queuedExternalDeliveries.TryDequeue(out _))

Copilot uses AI. Check for mistakes.
{
}

this._nextStep = new StepContext();
this._nextStep.ImportMessages(importedState.QueuedMessages);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,48 @@ internal async Task Checkpoint_Restore_WithPendingRequests_RepublishesRequestInf
"the workflow should be able to continue after the runtime restore replay");
}

/// <summary>
/// Verifies that restoring a live run clears any queued external responses from the
/// superseded timeline before importing checkpoint state.
/// </summary>
[Fact]
internal async Task Checkpoint_Restore_ClearsQueuedExternalResponsesBeforeImportAsync()
{
Workflow workflow = CreateSimpleRequestWorkflow();
CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
InProcessExecutionEnvironment env = ExecutionEnvironment.InProcess_Lockstep.ToWorkflowExecutionEnvironment();

await using StreamingRun run = await env.WithCheckpointing(checkpointManager)
.RunStreamingAsync(workflow, "Hello");

(ExternalRequest pendingRequest, CheckpointInfo checkpoint) = await CapturePendingRequestAndCheckpointAsync(run);

await run.SendResponseAsync(pendingRequest.CreateResponse("World"));
await run.RestoreCheckpointAsync(checkpoint);

List<WorkflowEvent> restoredEvents = await ReadToHaltAsync(run);
ExternalRequest replayedRequest = restoredEvents.OfType<RequestInfoEvent>()
.Select(evt => evt.Request)
.Should()
.ContainSingle("the restored run should still be waiting for the checkpointed request")
.Subject;

restoredEvents.OfType<WorkflowErrorEvent>().Should().BeEmpty(
"a queued response from the superseded timeline should not be processed after restore");
RunStatus statusAfterRestore = await run.GetStatusAsync();
statusAfterRestore.Should().Be(RunStatus.PendingRequests,
"the restored run should remain pending until a post-restore response is sent");

await run.SendResponseAsync(replayedRequest.CreateResponse("Again"));

List<WorkflowEvent> completionEvents = await ReadToHaltAsync(run);
completionEvents.OfType<WorkflowErrorEvent>().Should().BeEmpty(
"the restored request should complete cleanly once a new response is provided");
RunStatus finalStatus = await run.GetStatusAsync();
finalStatus.Should().Be(RunStatus.Idle,
"the workflow should finish once the replayed request receives a fresh response");
}

/// <summary>
/// Verifies that a resumed parent workflow re-emits pending requests that originated in a subworkflow.
/// </summary>
Expand Down
Loading