Skip to content

Commit c3e3acc

Browse files
feat(anthropic): Emit AI Client Spans for synchronous messages.stream() (#5565)
Patch `Messages.stream()` and `MessageStreamManager.__enter__()` to create AI Client Spans. Re-use existing code for setting attributes on AI Client Spans based on arguments to `anthropic` functions. Adapt tests that return a synchronous response stream with `create(stream=True)`.
1 parent c84b6d8 commit c3e3acc

File tree

2 files changed

+660
-28
lines changed

2 files changed

+660
-28
lines changed

sentry_sdk/integrations/anthropic.py

Lines changed: 142 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939

4040
from anthropic import Stream, AsyncStream
4141
from anthropic.resources import AsyncMessages, Messages
42+
from anthropic.lib.streaming import MessageStreamManager
4243

4344
from anthropic.types import (
4445
MessageStartEvent,
@@ -59,7 +60,14 @@
5960
from sentry_sdk.tracing import Span
6061
from sentry_sdk._types import TextPart
6162

62-
from anthropic.types import RawMessageStreamEvent
63+
from anthropic.types import (
64+
RawMessageStreamEvent,
65+
MessageParam,
66+
ModelParam,
67+
TextBlockParam,
68+
ToolUnionParam,
69+
)
70+
from anthropic.lib.streaming import MessageStream
6371

6472

6573
class _RecordedUsage:
@@ -84,6 +92,11 @@ def setup_once() -> None:
8492
Messages.create = _wrap_message_create(Messages.create)
8593
AsyncMessages.create = _wrap_message_create_async(AsyncMessages.create)
8694

95+
Messages.stream = _wrap_message_stream(Messages.stream)
96+
MessageStreamManager.__enter__ = _wrap_message_stream_manager_enter(
97+
MessageStreamManager.__enter__
98+
)
99+
87100

88101
def _capture_exception(exc: "Any") -> None:
89102
set_span_errored()
@@ -258,28 +271,33 @@ def _transform_system_instructions(
258271
]
259272

260273

261-
def _set_input_data(
262-
span: "Span", kwargs: "dict[str, Any]", integration: "AnthropicIntegration"
274+
def _set_common_input_data(
275+
span: "Span",
276+
integration: "AnthropicIntegration",
277+
max_tokens: "int",
278+
messages: "Iterable[MessageParam]",
279+
model: "ModelParam",
280+
system: "Optional[Union[str, Iterable[TextBlockParam]]]",
281+
temperature: "Optional[float]",
282+
top_k: "Optional[int]",
283+
top_p: "Optional[float]",
284+
tools: "Optional[Iterable[ToolUnionParam]]",
263285
) -> None:
264286
"""
265287
Set input data for the span based on the provided keyword arguments for the anthropic message creation.
266288
"""
267289
span.set_data(SPANDATA.GEN_AI_SYSTEM, "anthropic")
268290
span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "chat")
269-
system_instructions: "Union[str, Iterable[TextBlockParam]]" = kwargs.get("system") # type: ignore
270-
messages = kwargs.get("messages")
271291
if (
272292
messages is not None
273-
and len(messages) > 0
293+
and len(messages) > 0 # type: ignore
274294
and should_send_default_pii()
275295
and integration.include_prompts
276296
):
277-
if isinstance(system_instructions, str) or isinstance(
278-
system_instructions, Iterable
279-
):
297+
if isinstance(system, str) or isinstance(system, Iterable):
280298
span.set_data(
281299
SPANDATA.GEN_AI_SYSTEM_INSTRUCTIONS,
282-
json.dumps(_transform_system_instructions(system_instructions)),
300+
json.dumps(_transform_system_instructions(system)),
283301
)
284302

285303
normalized_messages = []
@@ -335,25 +353,41 @@ def _set_input_data(
335353
span, SPANDATA.GEN_AI_REQUEST_MESSAGES, messages_data, unpack=False
336354
)
337355

356+
if max_tokens is not None and _is_given(max_tokens):
357+
span.set_data(SPANDATA.GEN_AI_REQUEST_MAX_TOKENS, max_tokens)
358+
if model is not None and _is_given(model):
359+
span.set_data(SPANDATA.GEN_AI_REQUEST_MODEL, model)
360+
if temperature is not None and _is_given(temperature):
361+
span.set_data(SPANDATA.GEN_AI_REQUEST_TEMPERATURE, temperature)
362+
if top_k is not None and _is_given(top_k):
363+
span.set_data(SPANDATA.GEN_AI_REQUEST_TOP_K, top_k)
364+
if top_p is not None and _is_given(top_p):
365+
span.set_data(SPANDATA.GEN_AI_REQUEST_TOP_P, top_p)
366+
367+
if tools is not None and _is_given(tools) and len(tools) > 0: # type: ignore
368+
span.set_data(SPANDATA.GEN_AI_REQUEST_AVAILABLE_TOOLS, safe_serialize(tools))
369+
370+
371+
def _set_create_input_data(
372+
span: "Span", kwargs: "dict[str, Any]", integration: "AnthropicIntegration"
373+
) -> None:
374+
"""
375+
Set input data for the span based on the provided keyword arguments for the anthropic message creation.
376+
"""
338377
span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, kwargs.get("stream", False))
339378

340-
kwargs_keys_to_attributes = {
341-
"max_tokens": SPANDATA.GEN_AI_REQUEST_MAX_TOKENS,
342-
"model": SPANDATA.GEN_AI_REQUEST_MODEL,
343-
"temperature": SPANDATA.GEN_AI_REQUEST_TEMPERATURE,
344-
"top_k": SPANDATA.GEN_AI_REQUEST_TOP_K,
345-
"top_p": SPANDATA.GEN_AI_REQUEST_TOP_P,
346-
}
347-
for key, attribute in kwargs_keys_to_attributes.items():
348-
value = kwargs.get(key)
349-
350-
if value is not None and _is_given(value):
351-
span.set_data(attribute, value)
352-
353-
# Input attributes: Tools
354-
tools = kwargs.get("tools")
355-
if tools is not None and _is_given(tools) and len(tools) > 0:
356-
span.set_data(SPANDATA.GEN_AI_REQUEST_AVAILABLE_TOOLS, safe_serialize(tools))
379+
_set_common_input_data(
380+
span=span,
381+
integration=integration,
382+
max_tokens=kwargs.get("max_tokens"), # type: ignore
383+
messages=kwargs.get("messages"), # type: ignore
384+
model=kwargs.get("model"),
385+
system=kwargs.get("system"),
386+
temperature=kwargs.get("temperature"),
387+
top_k=kwargs.get("top_k"),
388+
top_p=kwargs.get("top_p"),
389+
tools=kwargs.get("tools"),
390+
)
357391

358392

359393
def _wrap_synchronous_message_iterator(
@@ -566,7 +600,7 @@ def _sentry_patched_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "A
566600
)
567601
span.__enter__()
568602

569-
_set_input_data(span, kwargs, integration)
603+
_set_create_input_data(span, kwargs, integration)
570604

571605
result = yield f, args, kwargs
572606

@@ -695,6 +729,86 @@ async def _sentry_patched_create_async(*args: "Any", **kwargs: "Any") -> "Any":
695729
return _sentry_patched_create_async
696730

697731

732+
def _wrap_message_stream(f: "Any") -> "Any":
733+
"""
734+
Attaches user-provided arguments to the returned context manager.
735+
The attributes are set on AI Client Spans in the patch for the context manager.
736+
"""
737+
738+
@wraps(f)
739+
def _sentry_patched_stream(*args: "Any", **kwargs: "Any") -> "MessageStreamManager":
740+
stream_manager = f(*args, **kwargs)
741+
742+
stream_manager._max_tokens = kwargs.get("max_tokens")
743+
stream_manager._messages = kwargs.get("messages")
744+
stream_manager._model = kwargs.get("model")
745+
stream_manager._system = kwargs.get("system")
746+
stream_manager._temperature = kwargs.get("temperature")
747+
stream_manager._top_k = kwargs.get("top_k")
748+
stream_manager._top_p = kwargs.get("top_p")
749+
stream_manager._tools = kwargs.get("tools")
750+
751+
return stream_manager
752+
753+
return _sentry_patched_stream
754+
755+
756+
def _wrap_message_stream_manager_enter(f: "Any") -> "Any":
757+
"""
758+
Creates and manages AI Client Spans.
759+
"""
760+
761+
@wraps(f)
762+
def _sentry_patched_enter(self: "MessageStreamManager") -> "MessageStream":
763+
stream = f(self)
764+
if not hasattr(self, "_max_tokens"):
765+
return stream
766+
767+
integration = sentry_sdk.get_client().get_integration(AnthropicIntegration)
768+
769+
if integration is None:
770+
return stream
771+
772+
if self._messages is None:
773+
return stream
774+
775+
try:
776+
iter(self._messages)
777+
except TypeError:
778+
return stream
779+
780+
span = get_start_span_function()(
781+
op=OP.GEN_AI_CHAT,
782+
name="chat" if self._model is None else f"chat {self._model}".strip(),
783+
origin=AnthropicIntegration.origin,
784+
)
785+
span.__enter__()
786+
787+
span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True)
788+
_set_common_input_data(
789+
span=span,
790+
integration=integration,
791+
max_tokens=self._max_tokens,
792+
messages=self._messages,
793+
model=self._model,
794+
system=self._system,
795+
temperature=self._temperature,
796+
top_k=self._top_k,
797+
top_p=self._top_p,
798+
tools=self._tools,
799+
)
800+
801+
stream._iterator = _wrap_synchronous_message_iterator(
802+
iterator=stream._iterator,
803+
span=span,
804+
integration=integration,
805+
)
806+
807+
return stream
808+
809+
return _sentry_patched_enter
810+
811+
698812
def _is_given(obj: "Any") -> bool:
699813
"""
700814
Check for givenness safely across different anthropic versions.

0 commit comments

Comments
 (0)