Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Agents.AI.Hosting.AGUI.AspNetCore.Shared;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Http;
Expand All @@ -13,6 +15,7 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Microsoft.Agents.AI.Hosting;

namespace Microsoft.Agents.AI.Hosting.AGUI.AspNetCore;

Expand Down Expand Up @@ -42,6 +45,7 @@ public static IEndpointConventionBuilder MapAGUI(

var jsonOptions = context.RequestServices.GetRequiredService<IOptions<Microsoft.AspNetCore.Http.Json.JsonOptions>>();
var jsonSerializerOptions = jsonOptions.Value.SerializerOptions;
var sessionStore = context.RequestServices.GetRequiredService<AgentSessionStore>();

var messages = input.Messages.AsChatMessages(jsonSerializerOptions);
var clientTools = input.Tools?.AsAITools().ToList();
Expand All @@ -63,11 +67,17 @@ public static IEndpointConventionBuilder MapAGUI(
}
};

AgentSession? session = await GetOrCreateSessionAsync(aiAgent, input.ThreadId, sessionStore, cancellationToken).ConfigureAwait(false);

// Run the agent and convert to AG-UI events
var events = aiAgent.RunStreamingAsync(
var events = RunStreamingWithSessionPersistenceAsync(
aiAgent,
messages,
options: runOptions,
cancellationToken: cancellationToken)
runOptions,
session,
input.ThreadId,
sessionStore,
cancellationToken)
.AsChatResponseUpdatesAsync()
.FilterServerToolsFromMixedToolInvocationsAsync(clientTools, cancellationToken)
.AsAGUIEventStreamAsync(
Expand All @@ -80,4 +90,59 @@ public static IEndpointConventionBuilder MapAGUI(
return new AGUIServerSentEventsResult(events, sseLogger);
});
}

private static async ValueTask<AgentSession?> GetOrCreateSessionAsync(
AIAgent aiAgent,
string? threadId,
AgentSessionStore sessionStore,
CancellationToken cancellationToken)
{
if (string.IsNullOrWhiteSpace(threadId))
{
return null;
}

return await sessionStore.GetSessionAsync(aiAgent, threadId, cancellationToken).ConfigureAwait(false);
}

private static async ValueTask PersistSessionAsync(
AIAgent aiAgent,
string? threadId,
AgentSession? session,
AgentSessionStore sessionStore,
CancellationToken cancellationToken)
{
if (session is null || string.IsNullOrWhiteSpace(threadId))
{
return;
}

await sessionStore.SaveSessionAsync(aiAgent, threadId, session, cancellationToken).ConfigureAwait(false);
}

private static async IAsyncEnumerable<AgentResponseUpdate> RunStreamingWithSessionPersistenceAsync(
AIAgent aiAgent,
IEnumerable<ChatMessage> messages,
AgentRunOptions runOptions,
AgentSession? session,
string? threadId,
AgentSessionStore sessionStore,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
try
{
await foreach (AgentResponseUpdate update in aiAgent.RunStreamingAsync(
messages,
session,
runOptions,
CancellationToken.None).ConfigureAwait(false))
{
yield return update;
}
}
finally
{
await PersistSessionAsync(aiAgent, threadId, session, sessionStore, cancellationToken).ConfigureAwait(false);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
// Copyright (c) Microsoft. All rights reserved.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Agents.AI.Hosting;
using Microsoft.Extensions.Caching.Memory;

namespace Microsoft.Agents.AI.Hosting.AGUI.AspNetCore;

/// <summary>
/// Provides an in-memory <see cref="AgentSessionStore"/> implementation for AG-UI hosted agents.
/// </summary>
/// <remarks>
/// This store is intended for single-instance development and testing scenarios. Applications that need
/// durable or distributed session persistence can replace the registered <see cref="AgentSessionStore"/>
/// service with a custom implementation.
/// </remarks>
public sealed class AGUIInMemorySessionStore : AgentSessionStore, IDisposable
{
private readonly MemoryCache _cache;
private readonly MemoryCacheEntryOptions _entryOptions;
private readonly ConcurrentDictionary<SessionCacheKey, TaskCompletionSource<AgentSession>> _sessionInitializationTasks = new();

/// <summary>
/// Initializes a new instance of the <see cref="AGUIInMemorySessionStore"/> class with default options.
/// </summary>
public AGUIInMemorySessionStore()
: this(options: null)
{
}

/// <summary>
/// Initializes a new instance of the <see cref="AGUIInMemorySessionStore"/> class.
/// </summary>
/// <param name="options">The cache options to apply. If <see langword="null"/>, default options are used.</param>
public AGUIInMemorySessionStore(AGUIInMemorySessionStoreOptions? options)
{
AGUIInMemorySessionStoreOptions resolvedOptions = options ?? new();
this._cache = new MemoryCache(resolvedOptions.ToMemoryCacheOptions());
this._entryOptions = resolvedOptions.ToMemoryCacheEntryOptions();
}

/// <inheritdoc/>
public override ValueTask<AgentSession> GetSessionAsync(AIAgent agent, string conversationId, CancellationToken cancellationToken = default)
=> this.GetOrCreateSessionAsync(agent, conversationId, cancellationToken);

/// <summary>
/// Gets the session for the specified conversation or creates a new one when none exists.
/// </summary>
/// <param name="agent">The agent that owns the session.</param>
/// <param name="threadId">The conversation or thread identifier.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>The existing or newly created session.</returns>
public ValueTask<AgentSession> GetOrCreateSessionAsync(AIAgent agent, string threadId, CancellationToken cancellationToken = default)
{
SessionCacheKey key = GetKey(agent, threadId);
if (this._cache.TryGetValue(key, out AgentSession? session) && session is not null)
{
return new(session);
}

return this.GetOrCreateSessionCoreAsync(agent, key, cancellationToken);
}

/// <inheritdoc/>
public override ValueTask SaveSessionAsync(AIAgent agent, string conversationId, AgentSession session, CancellationToken cancellationToken = default)
{
this._cache.Set(GetKey(agent, conversationId), session, this._entryOptions);
return ValueTask.CompletedTask;
}

private async ValueTask<AgentSession> GetOrCreateSessionCoreAsync(AIAgent agent, SessionCacheKey key, CancellationToken cancellationToken)
{
TaskCompletionSource<AgentSession> initializationTask = new(TaskCreationOptions.RunContinuationsAsynchronously);
TaskCompletionSource<AgentSession> sharedInitializationTask = this._sessionInitializationTasks.GetOrAdd(key, initializationTask);

if (ReferenceEquals(sharedInitializationTask, initializationTask))
{
if (this._cache.TryGetValue(key, out AgentSession? existingSession) && existingSession is not null)
{
initializationTask.TrySetResult(existingSession);
this._sessionInitializationTasks.TryRemove(new KeyValuePair<SessionCacheKey, TaskCompletionSource<AgentSession>>(key, initializationTask));
}
else
{
_ = this.InitializeSessionAsync(agent, key, initializationTask);
}
}

return await sharedInitializationTask.Task.WaitAsync(cancellationToken).ConfigureAwait(false);
}

private async Task<AgentSession> CreateAndStoreSessionAsync(AIAgent agent, SessionCacheKey key)
{
AgentSession session = await agent.CreateSessionAsync(CancellationToken.None).ConfigureAwait(false);
return this._cache.GetOrCreate(key, entry =>
{
entry.SetOptions(this._entryOptions);
return session;
})!;
}

private async Task InitializeSessionAsync(AIAgent agent, SessionCacheKey key, TaskCompletionSource<AgentSession> initializationTask)
{
try
{
AgentSession session = await this.CreateAndStoreSessionAsync(agent, key).ConfigureAwait(false);
initializationTask.TrySetResult(session);
}
catch (Exception ex)
{
initializationTask.TrySetException(ex);
}
finally
{
this._sessionInitializationTasks.TryRemove(new KeyValuePair<SessionCacheKey, TaskCompletionSource<AgentSession>>(key, initializationTask));
}
}

/// <summary>
/// Releases the underlying memory cache.
/// </summary>
public void Dispose()
{
this._cache.Dispose();
}

private static SessionCacheKey GetKey(AIAgent agent, string threadId) => new(agent.Id, threadId);

private sealed record SessionCacheKey(string AgentId, string ThreadId);
}

/// <summary>
/// Configures the default <see cref="AGUIInMemorySessionStore"/> registration used by <c>AddAGUI</c>.
/// </summary>
public sealed class AGUIInMemorySessionStoreOptions
{
/// <summary>
/// Gets or sets the maximum number of sessions to retain in memory.
/// </summary>
public long? SizeLimit { get; set; } = 1000;

/// <summary>
/// Gets or sets the absolute expiration applied to cached sessions.
/// </summary>
public TimeSpan? AbsoluteExpirationRelativeToNow { get; set; }

/// <summary>
/// Gets or sets the sliding expiration applied to cached sessions.
/// </summary>
public TimeSpan? SlidingExpiration { get; set; } = TimeSpan.FromHours(1);

internal MemoryCacheOptions ToMemoryCacheOptions() => new()
{
SizeLimit = this.SizeLimit
};

internal MemoryCacheEntryOptions ToMemoryCacheEntryOptions() => new()
{
AbsoluteExpirationRelativeToNow = this.AbsoluteExpirationRelativeToNow,
SlidingExpiration = this.SlidingExpiration,
Size = 1
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

<ItemGroup>
<ProjectReference Include="..\Microsoft.Agents.AI\Microsoft.Agents.AI.csproj" />
<ProjectReference Include="..\Microsoft.Agents.AI.Hosting\Microsoft.Agents.AI.Hosting.csproj" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@

using System;
using Microsoft.Agents.AI;
using Microsoft.Agents.AI.Hosting;
using Microsoft.Agents.AI.Hosting.AGUI.AspNetCore;
using Microsoft.AspNetCore.Http.Json;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Options;

namespace Microsoft.Extensions.DependencyInjection;

Expand All @@ -22,6 +25,26 @@ public static IServiceCollection AddAGUI(this IServiceCollection services)
ArgumentNullException.ThrowIfNull(services);

services.Configure<JsonOptions>(options => options.SerializerOptions.TypeInfoResolverChain.Add(AGUIJsonSerializerOptions.Default.TypeInfoResolver!));
services.AddOptions<AGUIInMemorySessionStoreOptions>();
services.TryAddSingleton<AgentSessionStore>(sp =>
new AGUIInMemorySessionStore(sp.GetRequiredService<IOptions<AGUIInMemorySessionStoreOptions>>().Value));

return services;
}

/// <summary>
/// Adds support for exposing <see cref="AIAgent"/> instances via AG-UI and configures the default in-memory session store.
/// </summary>
/// <param name="services">The <see cref="IServiceCollection"/> to configure.</param>
/// <param name="configureSessionStore">Configures the default <see cref="AGUIInMemorySessionStoreOptions"/>.</param>
/// <returns>The <see cref="IServiceCollection"/> for method chaining.</returns>
public static IServiceCollection AddAGUI(this IServiceCollection services, Action<AGUIInMemorySessionStoreOptions> configureSessionStore)
{
ArgumentNullException.ThrowIfNull(services);
ArgumentNullException.ThrowIfNull(configureSessionStore);

services.AddAGUI();
services.Configure(configureSessionStore);

return services;
}
Expand Down
Loading
Loading