Skip to content

[Bug]: RuntimeError: athrow(): asynchronous generator is already running on streaming responses #911

@ishymko

Description

@ishymko

What happened?

Streaming a response from an agent that returns a Message event (rather than TaskStatusUpdateEvents) produces an error during event loop shutdown.

Repro
import asyncio
import socket
import threading
import time
from uuid import uuid4

import httpx
import uvicorn
from starlette.applications import Starlette

from a2a.client.client import ClientConfig
from a2a.client.client_factory import ClientFactory
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.server.events.in_memory_queue_manager import InMemoryQueueManager
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.routes import create_agent_card_routes, create_jsonrpc_routes
from a2a.server.tasks.inmemory_task_store import InMemoryTaskStore
from a2a.types import (
    AgentCapabilities, AgentCard, AgentInterface,
    Message, Part, Role, SendMessageRequest,
)
from a2a.utils import TransportProtocol


# Agent that responds with a Message event (triggers the bug)
class MessageExecutor(AgentExecutor):
    async def execute(self, ctx: RequestContext, eq: EventQueue):
        await eq.enqueue_event(Message(
            role=Role.ROLE_AGENT,
            message_id=str(uuid4()),
            parts=[Part(text="Hello from A2A Agent")],
            context_id=ctx.context_id,
            task_id=ctx.task_id,
        ))
    async def cancel(self, ctx, eq):
        pass


# Start server
with socket.socket() as s:
    s.bind(("127.0.0.1", 0))
    port = s.getsockname()[1]

url = f"http://127.0.0.1:{port}"
card = AgentCard(
    name="Test", description="Test", version="1",
    capabilities=AgentCapabilities(streaming=True),
    default_input_modes=["text/plain"], default_output_modes=["text/plain"],
    supported_interfaces=[
        AgentInterface(protocol_binding=TransportProtocol.JSONRPC, url=url),
    ],
)
handler = DefaultRequestHandler(
    agent_executor=MessageExecutor(),
    task_store=InMemoryTaskStore(),
    queue_manager=InMemoryQueueManager(),
)
app = Starlette(routes=[
    *create_agent_card_routes(agent_card=card, card_url="/card"),
    *create_jsonrpc_routes(
        agent_card=card, request_handler=handler,
        extended_agent_card=card, rpc_url="/",
    ),
])
srv = uvicorn.Server(uvicorn.Config(app, host="127.0.0.1", port=port, log_level="warning"))
threading.Thread(target=srv.run, daemon=True).start()
for _ in range(50):
    if srv.started:
        break
    time.sleep(0.1)


# Client — the error appears naturally from asyncio.run()
async def main():
    async with httpx.AsyncClient() as httpx_client:
        client = ClientFactory(
            config=ClientConfig(httpx_client=httpx_client)
        ).create(card)

        msg = Message(
            role=Role.ROLE_USER,
            message_id=f"stream-{uuid4()}",
            parts=[Part(text="hello")],
        )
        async for event in client.send_message(
            request=SendMessageRequest(message=msg)
        ):
            print(event[0])

asyncio.run(main())
srv.should_exit = True

Relevant log output

an error occurred during closing of asynchronous generator <async_generator object aconnect_sse at 0x7f3f5ef44ac0>
asyncgen: <async_generator object aconnect_sse at 0x7f3f5ef44ac0>
RuntimeError: aclose(): asynchronous generator is already running
an error occurred during closing of asynchronous generator <async_generator object AsyncClient.stream at 0x7f3f5ef44b40>
asyncgen: <async_generator object AsyncClient.stream at 0x7f3f5ef44b40>
RuntimeError: aclose(): asynchronous generator is already running

Code of Conduct

  • I agree to follow this project's Code of Conduct

Metadata

Metadata

Assignees

Labels

bugSomething isn't working

Type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions