Skip to content

Commit 2b323d0

Browse files
authored
fix: fix error handling for gRPC and SSE streaming (#879)
Reproduced in `test_client_server_integration.py`. ### gRPC `validate_async_generator` decorator was applied on top of the method above A2A error handling. Compat handler was already refactored in a way which made it possible to apply it on a nested function. It was done there and v1 handler was refactored in the same way. ### SSE streaming Iterator wrapped into `validate_async_generator` is assigned to `EventSourceResponse` and is returned from the method, so when it throws `rest_stream_error_handler` has no effect on it. https://github.com/a2aproject/a2a-python/blob/4630efd0ca4bf6934a7d9215ef2a2986b6e6e73a/src/a2a/server/apps/rest/rest_adapter.py#L155-L163 Instead of throwing on the first iteration, throw on the method invocation itself to avoid more sophisticated error handling (i.e. reading one item to trigger error) by removing separate handling for async generator. Client-level handling is also updated to properly handle non-200 status code for streaming and non-streaming response in case of JSON-RPC error.
1 parent 7437b88 commit 2b323d0

File tree

10 files changed

+317
-283
lines changed

10 files changed

+317
-283
lines changed

src/a2a/client/transports/http_helpers.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,22 @@ async def send_http_stream_request(
7878
async with aconnect_sse(
7979
httpx_client, method, url, **kwargs
8080
) as event_source:
81-
event_source.response.raise_for_status()
81+
try:
82+
event_source.response.raise_for_status()
83+
except httpx.HTTPStatusError as e:
84+
# Read upfront streaming error content immediately, otherwise lower-level handlers
85+
# (e.g. response.json()) crash with 'ResponseNotRead' Access errors.
86+
await event_source.response.aread()
87+
raise e
88+
89+
# If the response is not a stream, read it standardly (e.g., upfront JSON-RPC error payload)
90+
if 'text/event-stream' not in event_source.response.headers.get(
91+
'content-type', ''
92+
):
93+
content = await event_source.response.aread()
94+
yield content.decode('utf-8')
95+
return
96+
8297
async for sse in event_source.aiter_sse():
8398
if not sse.data:
8499
continue

src/a2a/compat/v0_3/grpc_handler.py

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
from a2a.server.request_handlers.request_handler import RequestHandler
3030
from a2a.types.a2a_pb2 import AgentCard
3131
from a2a.utils.errors import A2AError, InvalidParamsError
32-
from a2a.utils.helpers import maybe_await, validate, validate_async_generator
32+
from a2a.utils.helpers import maybe_await, validate
3333

3434

3535
logger = logging.getLogger(__name__)
@@ -170,17 +170,17 @@ async def _handler(
170170
context, _handler, a2a_v0_3_pb2.SendMessageResponse()
171171
)
172172

173-
@validate_async_generator(
174-
lambda self: self.agent_card.capabilities.streaming,
175-
'Streaming is not supported by the agent',
176-
)
177173
async def SendStreamingMessage(
178174
self,
179175
request: a2a_v0_3_pb2.SendMessageRequest,
180176
context: grpc.aio.ServicerContext,
181177
) -> AsyncIterable[a2a_v0_3_pb2.StreamResponse]:
182178
"""Handles the 'SendStreamingMessage' gRPC method (v0.3)."""
183179

180+
@validate(
181+
lambda _: self.agent_card.capabilities.streaming,
182+
'Streaming is not supported by the agent',
183+
)
184184
async def _handler(
185185
server_context: ServerCallContext,
186186
) -> AsyncIterable[a2a_v0_3_pb2.StreamResponse]:
@@ -233,17 +233,17 @@ async def _handler(
233233

234234
return await self._handle_unary(context, _handler, a2a_v0_3_pb2.Task())
235235

236-
@validate_async_generator(
237-
lambda self: self.agent_card.capabilities.streaming,
238-
'Streaming is not supported by the agent',
239-
)
240236
async def TaskSubscription(
241237
self,
242238
request: a2a_v0_3_pb2.TaskSubscriptionRequest,
243239
context: grpc.aio.ServicerContext,
244240
) -> AsyncIterable[a2a_v0_3_pb2.StreamResponse]:
245241
"""Handles the 'TaskSubscription' gRPC method (v0.3)."""
246242

243+
@validate(
244+
lambda _: self.agent_card.capabilities.streaming,
245+
'Streaming is not supported by the agent',
246+
)
247247
async def _handler(
248248
server_context: ServerCallContext,
249249
) -> AsyncIterable[a2a_v0_3_pb2.StreamResponse]:
@@ -260,17 +260,17 @@ async def _handler(
260260
async for item in self._handle_stream(context, _handler):
261261
yield item
262262

263-
@validate(
264-
lambda self: self.agent_card.capabilities.push_notifications,
265-
'Push notifications are not supported by the agent',
266-
)
267263
async def CreateTaskPushNotificationConfig(
268264
self,
269265
request: a2a_v0_3_pb2.CreateTaskPushNotificationConfigRequest,
270266
context: grpc.aio.ServicerContext,
271267
) -> a2a_v0_3_pb2.TaskPushNotificationConfig:
272268
"""Handles the 'CreateTaskPushNotificationConfig' gRPC method (v0.3)."""
273269

270+
@validate(
271+
lambda _: self.agent_card.capabilities.push_notifications,
272+
'Push notifications are not supported by the agent',
273+
)
274274
async def _handler(
275275
server_context: ServerCallContext,
276276
) -> a2a_v0_3_pb2.TaskPushNotificationConfig:

src/a2a/compat/v0_3/rest_handler.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
from a2a.utils import constants
3232
from a2a.utils.helpers import (
3333
validate,
34-
validate_async_generator,
3534
validate_version,
3635
)
3736
from a2a.utils.telemetry import SpanKind, trace_class
@@ -85,7 +84,7 @@ async def on_message_send(
8584
return MessageToDict(pb2_v03_resp)
8685

8786
@validate_version(constants.PROTOCOL_VERSION_0_3)
88-
@validate_async_generator(
87+
@validate(
8988
lambda self: self.agent_card.capabilities.streaming,
9089
'Streaming is not supported by the agent',
9190
)
@@ -143,7 +142,7 @@ async def on_cancel_task(
143142
return MessageToDict(pb2_v03_task)
144143

145144
@validate_version(constants.PROTOCOL_VERSION_0_3)
146-
@validate_async_generator(
145+
@validate(
147146
lambda self: self.agent_card.capabilities.streaming,
148147
'Streaming is not supported by the agent',
149148
)

0 commit comments

Comments
 (0)