Skip to content

feat: Production-Grade: Persistent Store & Distributed Event Bus#343

Open
hashtekconsulting wants to merge 5 commits intoa2aproject:mainfrom
hashtek-fusion:main
Open

feat: Production-Grade: Persistent Store & Distributed Event Bus#343
hashtekconsulting wants to merge 5 commits intoa2aproject:mainfrom
hashtek-fusion:main

Conversation

@hashtekconsulting
Copy link

feat: Persistent Task Store and Distributed Event Bus/Queue for scaling the A2A server instances in production environment

By default the SDK ships with InMemoryTaskStore and DefaultExecutionEventBusManager, which are perfect for a single-process server. In a production deployment with multiple server instances behind a load balancer you need:

Persistent task state — so any instance can serve a tasks/get request regardless of which instance originally handled the task.
Distributed SSE fan-out — so a client that opens an SSE stream on Instance B receives events published by the executor running on Instance A.
This feature introduced new components to support A2A server scalability in production environment

Rajesh Ramamoorthy and others added 3 commits March 3, 2026 14:44
…yments

Introduce DynamoDB-backed TaskStore and SNS/SQS-based ExecutionEventBusManager
to support horizontally-scaled A2A server deployments.

- DynamoDBTaskStore: persistent TaskStore backed by DynamoDB with optimistic
  locking (version attribute + conditional writes), configurable exponential
  back-off on conflicts, and optional TTL for automatic task expiry.
- SnsEventBusManager: drop-in replacement for DefaultExecutionEventBusManager
  that fans out execution events across instances via SNS/SQS, enabling SSE
  delivery to clients connected to any node in the cluster.
- SqsEventPoller: per-instance SQS long-poll loop that deduplicates messages
  by instanceId to prevent double-delivery on the originating node.
- TaskStoreError hierarchy: typed errors (TaskNotFoundError, TaskConflictError,
  StoreUnavailableError) with retryable classification.
- Add AWS SDK dependencies: @aws-sdk/client-dynamodb, @aws-sdk/client-sns,
  @aws-sdk/client-sqs, @aws-sdk/lib-dynamodb.
- Add test dependencies: aws-sdk-client-mock, aws-sdk-client-mock-vitest.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…nents part of a2a SDK and also made it optional.
@hashtekconsulting hashtekconsulting requested a review from a team as a code owner March 3, 2026 22:56
@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the A2A SDK's server-side capabilities by introducing robust solutions for production-grade scalability. It replaces in-memory task storage and event management with AWS-backed persistent storage (DynamoDB) and a distributed event bus (SNS/SQS). These changes ensure that task states are durable and events are reliably fanned out across multiple server instances, which is crucial for high-availability and load-balanced deployments.

Highlights

  • Production-Grade Scalability: Introduced new components to enable the A2A server to scale effectively in production environments, supporting multi-instance deployments behind load balancers.
  • Persistent Task Store: Implemented DynamoDBTaskStore as a drop-in replacement for InMemoryTaskStore, providing persistent storage for task states using AWS DynamoDB with features like optimistic locking and configurable TTL.
  • Distributed Event Bus: Developed SnsEventBusManager and QueueLifecycleManager to facilitate distributed Server-Sent Events (SSE) fan-out across multiple server instances using AWS SNS and SQS, ensuring clients receive events regardless of which instance processed the task.
  • Ephemeral SQS Queue Management: QueueLifecycleManager handles the dynamic creation and deletion of dedicated SQS queues for each server instance on boot and shutdown, addressing challenges in ECS auto-scaling environments.
  • Comprehensive Documentation: Added extensive documentation to the README.md detailing the installation, infrastructure requirements, IAM permissions, queue lifecycle, and usage examples for these new distributed components.
  • Modular AWS SDK Dependencies: Configured AWS SDK packages (@aws-sdk/client-dynamodb, @aws-sdk/lib-dynamodb, @aws-sdk/client-sns, @aws-sdk/client-sqs) as optional peer dependencies, keeping the base SDK lightweight for single-instance deployments.
Changelog
  • README.md
    • Added a new comprehensive section on 'Production-Grade: Persistent Store & Distributed Event Bus'.
    • Included detailed instructions for installation, infrastructure requirements, and IAM permissions.
    • Provided a clear explanation of the queue lifecycle in ECS auto-scaling and how the components connect.
    • Documented environment variables and key behaviors of DynamoDBTaskStore, QueueLifecycleManager, and SnsEventBusManager.
  • package.json
    • Updated exports field to expose the new server/distributed sub-path.
    • Added @aws-sdk/client-dynamodb, @aws-sdk/lib-dynamodb, @aws-sdk/client-sns, and @aws-sdk/client-sqs as optional peer dependencies.
    • Included aws-sdk-client-mock and aws-sdk-client-mock-vitest as dev dependencies for testing AWS SDK interactions.
  • src/server/distributed/index.ts
    • Added a new entry point file for distributed server components.
    • Exported DynamoDBTaskStore, QueueLifecycleManager, and SnsEventBusManager for modular access.
  • src/server/events/queue_lifecycle_manager.ts
    • Added a new class QueueLifecycleManager responsible for creating, configuring, and tearing down per-instance SQS queues and SNS subscriptions.
    • Implemented idempotent provision() and best-effort teardown() methods for queue lifecycle management.
    • Included logic for setting SQS queue attributes, policies, and SNS subscriptions with rollback on failure.
  • src/server/events/sns_sqs_event_bus_manager.ts
    • Added a new class SnsEventBusManager as a distributed implementation of ExecutionEventBusManager.
    • Introduced DistributedExecutionEventBus which extends DefaultExecutionEventBus to publish events to SNS for cross-instance fan-out.
    • Implemented SqsEventPoller to poll SQS queues, decode SNS messages, and deliver them to local event buses, including instance-based deduplication.
  • src/server/store/dynamo_task_store.ts
    • Added a new class DynamoDBTaskStore to provide persistent task storage using AWS DynamoDB.
    • Implemented optimistic locking with configurable retries for save() operations.
    • Ensured consistent reads for load() operations and optional TTL support for task expiry.
  • src/server/store/errors.ts
    • Added custom error classes TaskStoreError, TaskNotFoundError, TaskConflictError, and StoreUnavailableError for the distributed task store layer.
    • Categorized errors by their retryable nature to guide application-level error handling.
  • test/server/events/queue_lifecycle_manager.spec.ts
    • Added unit tests for QueueLifecycleManager covering provisioning, teardown, idempotency, and error handling.
  • test/server/events/sns_sqs_event_bus_manager.spec.ts
    • Added unit tests for SnsEventBusManager, DistributedExecutionEventBus, and SqsEventPoller.
    • Verified local delivery, SNS publishing, SQS message processing, and instance deduplication.
  • test/server/integration/distributed_stack.spec.ts
    • Added integration tests to validate the interaction between DynamoDBTaskStore, SnsEventBusManager, and DefaultRequestHandler.
    • Confirmed end-to-end task persistence and cross-instance event fan-out.
  • test/server/store/dynamo_task_store.spec.ts
    • Added unit tests for DynamoDBTaskStore covering save and load operations, optimistic locking, TTL, and error handling.
  • tsup.config.ts
    • Updated the tsup build configuration to include the new src/server/distributed/index.ts entry point.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces significant new functionality for running the A2A server in a distributed, production environment, featuring a persistent TaskStore using DynamoDB and a distributed event bus with SNS/SQS. While the code is well-structured with robust error handling and comprehensive tests, a critical memory leak has been identified in the distributed event bus implementation, where every server instance creates and retains event bus objects for every task in the fleet, regardless of whether a local client is connected. Additionally, a resource leak in the SQS polling loop can cause async functions to hang indefinitely upon shutdown, which could lead to system instability and Denial of Service attacks. Addressing these, along with a couple of other identified issues (one critical for usability), is essential.

@github-actions
Copy link

github-actions bot commented Mar 4, 2026

🧪 Code Coverage

⬇️ Download Full Report

Main PR Delta
src/server/request_handler/default_request_handler.ts 79.69% 80.07% 🟢 +0.38%
src/server/distributed/index.ts (new) 0%
src/server/events/queue_lifecycle_manager.ts (new) 91.92%
src/server/events/sns_sqs_event_bus_manager.ts (new) 96.05%
src/server/store/dynamo_task_store.ts (new) 96.92%
src/server/store/errors.ts (new) 92.85%
Total 80.2% 81.64% 🟢 +1.44%

Generated by coverage-comment.yml

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant