diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs index f0bb8cac26..d6c7d301e3 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs @@ -419,6 +419,12 @@ internal async ValueTask ImportStateAsync(Checkpoint checkpoint) .Select(id => this.EnsureExecutorAsync(id, tracer: null).AsTask()) .ToArray(); + // Discard queued external deliveries from the superseded timeline so a runtime + // restore cannot apply stale responses after importing the checkpoint state. + while (this._queuedExternalDeliveries.TryDequeue(out _)) + { + } + this._nextStep = new StepContext(); this._nextStep.ImportMessages(importedState.QueuedMessages); diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointResumeTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointResumeTests.cs index 9d4b514af7..53ea644712 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointResumeTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointResumeTests.cs @@ -279,6 +279,48 @@ internal async Task Checkpoint_Restore_WithPendingRequests_RepublishesRequestInf "the workflow should be able to continue after the runtime restore replay"); } + /// + /// Verifies that restoring a live run clears any queued external responses from the + /// superseded timeline before importing checkpoint state. + /// + [Fact] + internal async Task Checkpoint_Restore_ClearsQueuedExternalResponsesBeforeImportAsync() + { + Workflow workflow = CreateSimpleRequestWorkflow(); + CheckpointManager checkpointManager = CheckpointManager.CreateInMemory(); + InProcessExecutionEnvironment env = ExecutionEnvironment.InProcess_Lockstep.ToWorkflowExecutionEnvironment(); + + await using StreamingRun run = await env.WithCheckpointing(checkpointManager) + .RunStreamingAsync(workflow, "Hello"); + + (ExternalRequest pendingRequest, CheckpointInfo checkpoint) = await CapturePendingRequestAndCheckpointAsync(run); + + await run.SendResponseAsync(pendingRequest.CreateResponse("World")); + await run.RestoreCheckpointAsync(checkpoint); + + List restoredEvents = await ReadToHaltAsync(run); + ExternalRequest replayedRequest = restoredEvents.OfType() + .Select(evt => evt.Request) + .Should() + .ContainSingle("the restored run should still be waiting for the checkpointed request") + .Subject; + + restoredEvents.OfType().Should().BeEmpty( + "a queued response from the superseded timeline should not be processed after restore"); + RunStatus statusAfterRestore = await run.GetStatusAsync(); + statusAfterRestore.Should().Be(RunStatus.PendingRequests, + "the restored run should remain pending until a post-restore response is sent"); + + await run.SendResponseAsync(replayedRequest.CreateResponse("Again")); + + List completionEvents = await ReadToHaltAsync(run); + completionEvents.OfType().Should().BeEmpty( + "the restored request should complete cleanly once a new response is provided"); + RunStatus finalStatus = await run.GetStatusAsync(); + finalStatus.Should().Be(RunStatus.Idle, + "the workflow should finish once the replayed request receives a fresh response"); + } + /// /// Verifies that a resumed parent workflow re-emits pending requests that originated in a subworkflow. ///