Skip to content

Commit 0788e52

Browse files
feat(anthropic): Support span streaming (#6311)
Drop the `unknown_response` attribute in streaming lifecycle mode.
1 parent ad16009 commit 0788e52

3 files changed

Lines changed: 385 additions & 104 deletions

File tree

sentry_sdk/integrations/anthropic.py

Lines changed: 116 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,12 @@
1717
from sentry_sdk.consts import OP, SPANDATA
1818
from sentry_sdk.integrations import DidNotEnable, Integration, _check_minimum_version
1919
from sentry_sdk.scope import should_send_default_pii
20+
from sentry_sdk.traces import StreamedSpan
21+
from sentry_sdk.tracing import Span
22+
from sentry_sdk.tracing_utils import (
23+
has_span_streaming_enabled,
24+
should_truncate_gen_ai_input,
25+
)
2026
from sentry_sdk.utils import (
2127
capture_internal_exceptions,
2228
event_from_exception,
@@ -78,7 +84,6 @@
7884
)
7985

8086
from sentry_sdk._types import TextPart
81-
from sentry_sdk.tracing import Span
8287

8388

8489
class _RecordedUsage:
@@ -366,7 +371,7 @@ def _transform_system_instructions(
366371

367372

368373
def _set_common_input_data(
369-
span: "Span",
374+
span: "Union[Span, StreamedSpan]",
370375
integration: "AnthropicIntegration",
371376
max_tokens: "int",
372377
messages: "Iterable[MessageParam]",
@@ -380,16 +385,19 @@ def _set_common_input_data(
380385
"""
381386
Set input data for the span based on the provided keyword arguments for the anthropic message creation.
382387
"""
383-
span.set_data(SPANDATA.GEN_AI_SYSTEM, "anthropic")
384-
span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "chat")
388+
set_on_span = (
389+
span.set_attribute if isinstance(span, StreamedSpan) else span.set_data
390+
)
391+
set_on_span(SPANDATA.GEN_AI_SYSTEM, "anthropic")
392+
set_on_span(SPANDATA.GEN_AI_OPERATION_NAME, "chat")
385393
if (
386394
messages is not None
387395
and len(messages) > 0 # type: ignore
388396
and should_send_default_pii()
389397
and integration.include_prompts
390398
):
391399
if isinstance(system, str) or isinstance(system, Iterable):
392-
span.set_data(
400+
set_on_span(
393401
SPANDATA.GEN_AI_SYSTEM_INSTRUCTIONS,
394402
json.dumps(_transform_system_instructions(system)),
395403
)
@@ -442,37 +450,44 @@ def _set_common_input_data(
442450
client = sentry_sdk.get_client()
443451
scope = sentry_sdk.get_current_scope()
444452
messages_data = (
445-
role_normalized_messages
446-
if client.options.get("stream_gen_ai_spans", False)
447-
else truncate_and_annotate_messages(role_normalized_messages, span, scope)
453+
truncate_and_annotate_messages(role_normalized_messages, span, scope)
454+
if should_truncate_gen_ai_input(client.options)
455+
else role_normalized_messages
448456
)
449457
if messages_data is not None:
450458
set_data_normalized(
451459
span, SPANDATA.GEN_AI_REQUEST_MESSAGES, messages_data, unpack=False
452460
)
453461

454462
if max_tokens is not None and _is_given(max_tokens):
455-
span.set_data(SPANDATA.GEN_AI_REQUEST_MAX_TOKENS, max_tokens)
463+
set_on_span(SPANDATA.GEN_AI_REQUEST_MAX_TOKENS, max_tokens)
456464
if model is not None and _is_given(model):
457-
span.set_data(SPANDATA.GEN_AI_REQUEST_MODEL, model)
465+
set_on_span(SPANDATA.GEN_AI_REQUEST_MODEL, model)
458466
if temperature is not None and _is_given(temperature):
459-
span.set_data(SPANDATA.GEN_AI_REQUEST_TEMPERATURE, temperature)
467+
set_on_span(SPANDATA.GEN_AI_REQUEST_TEMPERATURE, temperature)
460468
if top_k is not None and _is_given(top_k):
461-
span.set_data(SPANDATA.GEN_AI_REQUEST_TOP_K, top_k)
469+
set_on_span(SPANDATA.GEN_AI_REQUEST_TOP_K, top_k)
462470
if top_p is not None and _is_given(top_p):
463-
span.set_data(SPANDATA.GEN_AI_REQUEST_TOP_P, top_p)
471+
set_on_span(SPANDATA.GEN_AI_REQUEST_TOP_P, top_p)
464472

465473
if tools is not None and _is_given(tools) and len(tools) > 0: # type: ignore
466-
span.set_data(SPANDATA.GEN_AI_REQUEST_AVAILABLE_TOOLS, safe_serialize(tools))
474+
set_on_span(SPANDATA.GEN_AI_REQUEST_AVAILABLE_TOOLS, safe_serialize(tools))
467475

468476

469477
def _set_create_input_data(
470-
span: "Span", kwargs: "dict[str, Any]", integration: "AnthropicIntegration"
478+
span: "Union[Span, StreamedSpan]",
479+
kwargs: "dict[str, Any]",
480+
integration: "AnthropicIntegration",
471481
) -> None:
472482
"""
473483
Set input data for the span based on the provided keyword arguments for the anthropic message creation.
474484
"""
475-
span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, kwargs.get("stream", False))
485+
if isinstance(span, StreamedSpan):
486+
span.set_attribute(
487+
SPANDATA.GEN_AI_RESPONSE_STREAMING, kwargs.get("stream", False)
488+
)
489+
else:
490+
span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, kwargs.get("stream", False))
476491

477492
_set_common_input_data(
478493
span=span,
@@ -549,7 +564,7 @@ async def _wrap_asynchronous_message_iterator(
549564

550565

551566
def _set_output_data(
552-
span: "Span",
567+
span: "Union[Span, StreamedSpan]",
553568
integration: "AnthropicIntegration",
554569
model: "str | None",
555570
input_tokens: "int | None",
@@ -562,12 +577,15 @@ def _set_output_data(
562577
) -> None:
563578
"""
564579
Set output data for the span based on the AI response."""
580+
set_on_span = (
581+
span.set_attribute if isinstance(span, StreamedSpan) else span.set_data
582+
)
565583
if model is not None:
566-
span.set_data(SPANDATA.GEN_AI_RESPONSE_MODEL, model)
584+
set_on_span(SPANDATA.GEN_AI_RESPONSE_MODEL, model)
567585
if response_id is not None:
568-
span.set_data(SPANDATA.GEN_AI_RESPONSE_ID, response_id)
586+
set_on_span(SPANDATA.GEN_AI_RESPONSE_ID, response_id)
569587
if finish_reason is not None:
570-
span.set_data(SPANDATA.GEN_AI_RESPONSE_FINISH_REASONS, [finish_reason])
588+
set_on_span(SPANDATA.GEN_AI_RESPONSE_FINISH_REASONS, [finish_reason])
571589
if should_send_default_pii() and integration.include_prompts:
572590
output_messages: "dict[str, list[Any]]" = {
573591
"response": [],
@@ -620,12 +638,22 @@ def _sentry_patched_create_sync(f: "Any", *args: "Any", **kwargs: "Any") -> "Any
620638

621639
model = kwargs.get("model", "")
622640

623-
span = get_start_span_function()(
624-
op=OP.GEN_AI_CHAT,
625-
name=f"chat {model}".strip(),
626-
origin=AnthropicIntegration.origin,
627-
)
628-
span.__enter__()
641+
span_streaming = has_span_streaming_enabled(sentry_sdk.get_client().options)
642+
if span_streaming:
643+
span = sentry_sdk.traces.start_span(
644+
name=f"chat {model}".strip(),
645+
attributes={
646+
"sentry.op": OP.GEN_AI_CHAT,
647+
"sentry.origin": AnthropicIntegration.origin,
648+
},
649+
)
650+
else:
651+
span = get_start_span_function()(
652+
op=OP.GEN_AI_CHAT,
653+
name=f"chat {model}".strip(),
654+
origin=AnthropicIntegration.origin,
655+
)
656+
span.__enter__()
629657

630658
_set_create_input_data(span, kwargs, integration)
631659

@@ -680,10 +708,10 @@ def _sentry_patched_create_sync(f: "Any", *args: "Any", **kwargs: "Any") -> "Any
680708
response_id=getattr(result, "id", None),
681709
finish_reason=getattr(result, "stop_reason", None),
682710
)
683-
span.__exit__(None, None, None)
684-
else:
711+
elif isinstance(span, Span):
685712
span.set_data("unknown_response", True)
686-
span.__exit__(None, None, None)
713+
714+
span.__exit__(None, None, None)
687715

688716
return result
689717

@@ -708,12 +736,22 @@ async def _sentry_patched_create_async(
708736

709737
model = kwargs.get("model", "")
710738

711-
span = get_start_span_function()(
712-
op=OP.GEN_AI_CHAT,
713-
name=f"chat {model}".strip(),
714-
origin=AnthropicIntegration.origin,
715-
)
716-
span.__enter__()
739+
span_streaming = has_span_streaming_enabled(sentry_sdk.get_client().options)
740+
if span_streaming:
741+
span = sentry_sdk.traces.start_span(
742+
name=f"chat {model}".strip(),
743+
attributes={
744+
"sentry.op": OP.GEN_AI_CHAT,
745+
"sentry.origin": AnthropicIntegration.origin,
746+
},
747+
)
748+
else:
749+
span = get_start_span_function()(
750+
op=OP.GEN_AI_CHAT,
751+
name=f"chat {model}".strip(),
752+
origin=AnthropicIntegration.origin,
753+
)
754+
span.__enter__()
717755

718756
_set_create_input_data(span, kwargs, integration)
719757

@@ -768,10 +806,10 @@ async def _sentry_patched_create_async(
768806
response_id=getattr(result, "id", None),
769807
finish_reason=getattr(result, "stop_reason", None),
770808
)
771-
span.__exit__(None, None, None)
772-
else:
809+
elif isinstance(span, Span):
773810
span.set_data("unknown_response", True)
774-
span.__exit__(None, None, None)
811+
812+
span.__exit__(None, None, None)
775813

776814
return result
777815

@@ -929,7 +967,8 @@ def _sentry_patched_enter(self: "MessageStreamManager") -> "MessageStream":
929967
if not hasattr(self, "_max_tokens"):
930968
return f(self)
931969

932-
integration = sentry_sdk.get_client().get_integration(AnthropicIntegration)
970+
client = sentry_sdk.get_client()
971+
integration = client.get_integration(AnthropicIntegration)
933972

934973
if integration is None:
935974
return f(self)
@@ -942,14 +981,25 @@ def _sentry_patched_enter(self: "MessageStreamManager") -> "MessageStream":
942981
except TypeError:
943982
return f(self)
944983

945-
span = get_start_span_function()(
946-
op=OP.GEN_AI_CHAT,
947-
name="chat" if self._model is None else f"chat {self._model}".strip(),
948-
origin=AnthropicIntegration.origin,
949-
)
950-
span.__enter__()
984+
if has_span_streaming_enabled(client.options):
985+
span = sentry_sdk.traces.start_span(
986+
name="chat" if self._model is None else f"chat {self._model}".strip(),
987+
attributes={
988+
"sentry.op": OP.GEN_AI_CHAT,
989+
"sentry.origin": AnthropicIntegration.origin,
990+
SPANDATA.GEN_AI_RESPONSE_STREAMING: True,
991+
},
992+
)
993+
else:
994+
span = get_start_span_function()(
995+
op=OP.GEN_AI_CHAT,
996+
name="chat" if self._model is None else f"chat {self._model}".strip(),
997+
origin=AnthropicIntegration.origin,
998+
)
999+
span.__enter__()
1000+
1001+
span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True)
9511002

952-
span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True)
9531003
_set_common_input_data(
9541004
span=span,
9551005
integration=integration,
@@ -1024,7 +1074,8 @@ async def _sentry_patched_aenter(
10241074
if not hasattr(self, "_max_tokens"):
10251075
return await f(self)
10261076

1027-
integration = sentry_sdk.get_client().get_integration(AnthropicIntegration)
1077+
client = sentry_sdk.get_client()
1078+
integration = client.get_integration(AnthropicIntegration)
10281079

10291080
if integration is None:
10301081
return await f(self)
@@ -1037,14 +1088,25 @@ async def _sentry_patched_aenter(
10371088
except TypeError:
10381089
return await f(self)
10391090

1040-
span = get_start_span_function()(
1041-
op=OP.GEN_AI_CHAT,
1042-
name="chat" if self._model is None else f"chat {self._model}".strip(),
1043-
origin=AnthropicIntegration.origin,
1044-
)
1045-
span.__enter__()
1091+
if has_span_streaming_enabled(client.options):
1092+
span = sentry_sdk.traces.start_span(
1093+
name="chat" if self._model is None else f"chat {self._model}".strip(),
1094+
attributes={
1095+
"sentry.op": OP.GEN_AI_CHAT,
1096+
"sentry.origin": AnthropicIntegration.origin,
1097+
SPANDATA.GEN_AI_RESPONSE_STREAMING: True,
1098+
},
1099+
)
1100+
else:
1101+
span = get_start_span_function()(
1102+
op=OP.GEN_AI_CHAT,
1103+
name="chat" if self._model is None else f"chat {self._model}".strip(),
1104+
origin=AnthropicIntegration.origin,
1105+
)
1106+
span.__enter__()
1107+
1108+
span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True)
10461109

1047-
span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True)
10481110
_set_common_input_data(
10491111
span=span,
10501112
integration=integration,

sentry_sdk/tracing_utils.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,15 @@ def has_span_streaming_enabled(options: "Optional[dict[str, Any]]") -> bool:
116116
return (options.get("_experiments") or {}).get("trace_lifecycle") == "stream"
117117

118118

119+
def should_truncate_gen_ai_input(options: "Optional[dict[str, Any]]") -> bool:
120+
if options is None:
121+
return True
122+
123+
return not options.get(
124+
"stream_gen_ai_spans", False
125+
) and not has_span_streaming_enabled(options)
126+
127+
119128
@contextlib.contextmanager
120129
def record_sql_queries(
121130
cursor: "Any",

0 commit comments

Comments
 (0)