Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/RockBot.A2A/A2ATaskErrorHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ await conversationMemory.AddTurnAsync(

try
{
using var progressCtx = ToolProgressNotifier.SetContext(new ToolProgressContext
{
SessionId = pending.PrimarySessionId,
AgentName = agent.Name,
ReplyTo = UserProxyTopics.UserResponse
});

var finalContent = await agentLoopRunner.RunAsync(
chatMessages, chatOptions, pending.PrimarySessionId,
enableFollowUp: false, cancellationToken: ct);
Expand Down
7 changes: 7 additions & 0 deletions src/RockBot.A2A/A2ATaskResultHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
ToolGuideTools toolGuideTools,
IConversationMemory conversationMemory,
A2ATaskTracker tracker,
ModelBehavior modelBehavior,

Check warning on line 31 in src/RockBot.A2A/A2ATaskResultHandler.cs

View workflow job for this annotation

GitHub Actions / build-and-test

Parameter 'modelBehavior' is unread.
ILogger<A2ATaskResultHandler> logger) : IMessageHandler<AgentTaskResult>
{
public async Task HandleAsync(AgentTaskResult result, MessageHandlerContext context)
Expand Down Expand Up @@ -151,6 +151,13 @@

try
{
using var progressCtx = ToolProgressNotifier.SetContext(new ToolProgressContext
{
SessionId = rawSessionId,
AgentName = agent.Name,
ReplyTo = UserProxyTopics.UserResponse
});

var finalContent = await agentLoopRunner.RunAsync(
chatMessages, chatOptions, rawSessionId,
enableFollowUp: false, cancellationToken: ct);
Expand Down
7 changes: 7 additions & 0 deletions src/RockBot.A2A/A2ATaskStatusHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,13 @@ await conversationMemory.AddTurnAsync(

try
{
using var progressCtx = ToolProgressNotifier.SetContext(new ToolProgressContext
{
SessionId = pending.PrimarySessionId,
AgentName = agent.Name,
ReplyTo = UserProxyTopics.UserResponse
});

var finalContent = await agentLoopRunner.RunAsync(
chatMessages, chatOptions, pending.PrimarySessionId,
enableFollowUp: false, cancellationToken: ct);
Expand Down
11 changes: 10 additions & 1 deletion src/RockBot.Agent/ScheduledTaskHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,21 @@ public async Task HandleAsync(ScheduledTaskMessage message, MessageHandlerContex
return;
}

var replySessionId = message.IsSystemTask ? "scheduled-system" : "scheduled";

string finalText;
try
{
// Use slot.Token so a new user message can preempt this task cleanly.
await using (slot)
{
using var progressCtx = ToolProgressNotifier.SetContext(new ToolProgressContext
{
SessionId = replySessionId,
AgentName = agent.Name,
ReplyTo = UserProxyTopics.UserResponse
});

finalText = await agentLoopRunner.RunAsync(
chatMessages, chatOptions, sessionId: sessionId,
enableFollowUp: false, cancellationToken: slot.Token);
Expand Down Expand Up @@ -133,7 +142,7 @@ public async Task HandleAsync(ScheduledTaskMessage message, MessageHandlerContex
var reply = new AgentReply
{
Content = finalText,
SessionId = "scheduled",
SessionId = replySessionId,
AgentName = agent.Name,
IsFinal = true
};
Expand Down
16 changes: 16 additions & 0 deletions src/RockBot.Agent/UserMessageHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,14 @@ private async Task NativeLlmLoopAsync(

logger.LogInformation("Native LLM loop started for session {SessionId}", sessionId);

using var progressCtx = ToolProgressNotifier.SetContext(new ToolProgressContext
{
SessionId = sessionId,
AgentName = agent.Name,
CorrelationId = correlationId,
ReplyTo = replyTo
});

var lastProgressAt = DateTimeOffset.UtcNow;

var text = await agentLoopRunner.RunAsync(
Expand Down Expand Up @@ -468,6 +476,14 @@ private async Task BackgroundToolLoopAsync(

logger.LogInformation("Background tool loop started for session {SessionId}", sessionId);

using var progressCtx = ToolProgressNotifier.SetContext(new ToolProgressContext
{
SessionId = sessionId,
AgentName = agent.Name,
CorrelationId = correlationId,
ReplyTo = replyTo
});

var lastProgressAt = DateTimeOffset.UtcNow;

var finalContent = await agentLoopRunner.RunAsync(
Expand Down
7 changes: 6 additions & 1 deletion src/RockBot.Host.Abstractions/ScheduledTask.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,15 @@ namespace RockBot.Host;
/// When true the task is automatically cancelled after it fires once.
/// Use this for one-time reminders and deferred actions.
/// </param>
/// <param name="IsSystemTask">
/// When true the task is a system-internal background task (e.g. heartbeat patrol)
/// whose results are collapsed in the UI. User-created tasks default to false.
/// </param>
public sealed record ScheduledTask(
string Name,
string CronExpression,
string Description,
DateTimeOffset CreatedAt,
DateTimeOffset? LastFiredAt = null,
bool RunOnce = false);
bool RunOnce = false,
bool IsSystemTask = false);
2 changes: 1 addition & 1 deletion src/RockBot.Host.Abstractions/ScheduledTaskMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ namespace RockBot.Host;
/// </summary>
/// <param name="TaskName">Name of the scheduled task that fired.</param>
/// <param name="Description">Task description — the agent's instructions for this run.</param>
public sealed record ScheduledTaskMessage(string TaskName, string Description);
public sealed record ScheduledTaskMessage(string TaskName, string Description, bool IsSystemTask = false);
3 changes: 2 additions & 1 deletion src/RockBot.Host/HeartbeatBootstrapService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ await scheduler.ScheduleAsync(new ScheduledTask(
CronExpression: options.Value.CronExpression,
Description: "Run the heartbeat patrol: check calendar, email, active plans, and scheduled task health.",
CreatedAt: patrol?.CreatedAt ?? DateTimeOffset.UtcNow,
RunOnce: false), ct);
RunOnce: false,
IsSystemTask: true), ct);

var action = patrol is null ? "Registered" : "Updated";
logger.LogInformation("{Action} heartbeat patrol (cron: {Cron})", action, options.Value.CronExpression);
Expand Down
2 changes: 1 addition & 1 deletion src/RockBot.Host/SchedulerService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ private async Task FireTaskAsync(ScheduledTask task)

try
{
var message = new ScheduledTaskMessage(task.Name, task.Description);
var message = new ScheduledTaskMessage(task.Name, task.Description, task.IsSystemTask);
var envelope = message.ToEnvelope(source: _identity.Name);
await _pipeline.DispatchAsync(envelope, _cts.Token);
}
Expand Down
1 change: 1 addition & 0 deletions src/RockBot.Host/ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public static IServiceCollection AddRockBotHost(
builder.Build();

services.AddTransient<ILlmClient, LlmClient>();
services.AddSingleton<IToolProgressNotifier, ToolProgressNotifier>();
services.AddTransient<AgentLoopRunner>();
services.AddScoped<AgentContextBuilder>();
services.AddSingleton<SessionStartTracker>();
Expand Down
94 changes: 94 additions & 0 deletions src/RockBot.Host/ToolProgressNotifier.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
using Microsoft.Extensions.Logging;
using RockBot.Messaging;
using RockBot.UserProxy;

namespace RockBot.Host;

/// <summary>
/// Ambient context that carries session routing info for tool-call progress messages.
/// Each handler sets this via <see cref="ToolProgressNotifier.SetContext"/> before calling
/// <see cref="AgentLoopRunner.RunAsync"/> so the notifier knows where to route progress.
/// </summary>
public sealed class ToolProgressContext
{
public required string SessionId { get; init; }
public required string AgentName { get; init; }
public string? CorrelationId { get; init; }
public required string ReplyTo { get; init; }
}

/// <summary>
/// Publishes per-tool-call progress messages to the message bus so the Blazor UI can
/// display a step-by-step activity log. Uses <see cref="AsyncLocal{T}"/> to read the
/// session routing context set by each handler.
/// </summary>
public sealed class ToolProgressNotifier(
IMessagePublisher publisher,
ILogger<ToolProgressNotifier> logger) : IToolProgressNotifier
{
private static readonly AsyncLocal<ToolProgressContext?> _context = new();

/// <summary>
/// Sets the ambient context for the current async flow. Call this before
/// <see cref="AgentLoopRunner.RunAsync"/> so tool-call progress is routed correctly.
/// Returns an <see cref="IDisposable"/> that clears the context on dispose.
/// </summary>
public static IDisposable SetContext(ToolProgressContext context)
{
_context.Value = context;
return new ContextScope();
}

public async Task OnToolInvokingAsync(string toolName, string? argsSummary, CancellationToken ct)
{
var ctx = _context.Value;
if (ctx is null) return;

var content = string.IsNullOrEmpty(argsSummary)
? $"Calling {toolName}…"
: $"Calling {argsSummary}…";

await PublishProgressAsync(ctx, content, ct);
}

public async Task OnToolInvokedAsync(string toolName, string? resultSummary, CancellationToken ct)
{
var ctx = _context.Value;
if (ctx is null) return;

var content = string.IsNullOrEmpty(resultSummary)
? $"{toolName} completed"
: $"{toolName} \u2192 {resultSummary}";

await PublishProgressAsync(ctx, content, ct);
}

private async Task PublishProgressAsync(ToolProgressContext ctx, string content, CancellationToken ct)
{
var reply = new AgentReply
{
Content = content,
SessionId = ctx.SessionId,
AgentName = ctx.AgentName,
IsFinal = false
};

var envelope = reply.ToEnvelope<AgentReply>(
source: ctx.AgentName,
correlationId: ctx.CorrelationId);

try
{
await publisher.PublishAsync(ctx.ReplyTo, envelope, ct);
}
catch (Exception ex) when (ex is not OperationCanceledException)
{
logger.LogDebug(ex, "Failed to publish tool progress for {Tool}", content);
}
}

private sealed class ContextScope : IDisposable
{
public void Dispose() => _context.Value = null;
}
}
7 changes: 7 additions & 0 deletions src/RockBot.Subagent/SubagentResultHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,13 @@ await conversationMemory.AddTurnAsync(

try
{
using var progressCtx = ToolProgressNotifier.SetContext(new ToolProgressContext
{
SessionId = message.PrimarySessionId,
AgentName = agent.Name,
ReplyTo = UserProxyTopics.UserResponse
});

var finalContent = await agentLoopRunner.RunAsync(
chatMessages, chatOptions, message.PrimarySessionId,
enableFollowUp: false, cancellationToken: ct);
Expand Down
8 changes: 8 additions & 0 deletions src/RockBot.Subagent/SubagentRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using RockBot.Messaging;
using RockBot.Skills;
using RockBot.Tools;
using RockBot.UserProxy;

namespace RockBot.Subagent;

Expand Down Expand Up @@ -153,6 +154,13 @@ public async Task RunAsync(

try
{
using var progressCtx = ToolProgressNotifier.SetContext(new ToolProgressContext
{
SessionId = primarySessionId,
AgentName = $"subagent-{taskId}",
ReplyTo = UserProxy.UserProxyTopics.UserResponse
});

finalOutput = await agentLoopRunner.RunAsync(
chatMessages, chatOptions, subagentSessionId,
tier: tier, enableFollowUp: false, enableCompletionEval: false,
Expand Down
Loading
Loading