diff --git a/changes/8694.feature.md b/changes/8694.feature.md new file mode 100644 index 00000000000..7f081d47d24 --- /dev/null +++ b/changes/8694.feature.md @@ -0,0 +1 @@ +Enable OpenTelemetry distributed tracing in Manager by activating the global TracerProvider and instrumenting aiohttp server/client for W3C Trace Context propagation. diff --git a/src/ai/backend/agent/server.py b/src/ai/backend/agent/server.py index 86ec46bfc41..bb496619a6f 100644 --- a/src/ai/backend/agent/server.py +++ b/src/ai/backend/agent/server.py @@ -1521,6 +1521,8 @@ async def service_discovery_ctx( endpoint=local_config.otel.endpoint, service_instance_id=meta.id, service_instance_name=meta.display_name, + max_queue_size=local_config.otel.max_queue_size, + max_export_batch_size=local_config.otel.max_export_batch_size, ) BraceStyleAdapter.apply_otel(otel_spec) try: diff --git a/src/ai/backend/appproxy/coordinator/server.py b/src/ai/backend/appproxy/coordinator/server.py index a5f75c2354c..f5d706f758f 100644 --- a/src/ai/backend/appproxy/coordinator/server.py +++ b/src/ai/backend/appproxy/coordinator/server.py @@ -767,6 +767,8 @@ async def service_discovery_ctx(root_ctx: RootContext) -> AsyncIterator[None]: endpoint=root_ctx.local_config.otel.endpoint, service_instance_id=meta.id, service_instance_name=meta.display_name, + max_queue_size=root_ctx.local_config.otel.max_queue_size, + max_export_batch_size=root_ctx.local_config.otel.max_export_batch_size, ) BraceStyleAdapter.apply_otel(otel_spec) try: diff --git a/src/ai/backend/appproxy/worker/server.py b/src/ai/backend/appproxy/worker/server.py index fa1577783d5..32cc3b644f0 100644 --- a/src/ai/backend/appproxy/worker/server.py +++ b/src/ai/backend/appproxy/worker/server.py @@ -560,6 +560,8 @@ async def service_discovery_ctx(root_ctx: RootContext) -> AsyncIterator[None]: endpoint=root_ctx.local_config.otel.endpoint, service_instance_id=meta.id, service_instance_name=meta.display_name, + max_queue_size=root_ctx.local_config.otel.max_queue_size, + max_export_batch_size=root_ctx.local_config.otel.max_export_batch_size, ) BraceStyleAdapter.apply_otel(otel_spec) try: diff --git a/src/ai/backend/common/configs/otel.py b/src/ai/backend/common/configs/otel.py index 7fe03bea62d..11d2cd080b9 100644 --- a/src/ai/backend/common/configs/otel.py +++ b/src/ai/backend/common/configs/otel.py @@ -58,3 +58,36 @@ class OTELConfig(BaseConfigSchema): example=ConfigExample(local="http://localhost:4317", prod="http://otel-collector:4317"), ), ] + max_queue_size: Annotated[ + int, + Field( + default=65536, + validation_alias=AliasChoices("max-queue-size", "max_queue_size"), + serialization_alias="max-queue-size", + ), + BackendAIConfigMeta( + description=( + "Maximum number of spans queued for export. " + "Spans are dropped when the queue is full. " + "The default (65536) accommodates burst traffic from GraphQL workloads." + ), + added_version="26.2.0", + example=ConfigExample(local="2048", prod="65536"), + ), + ] + max_export_batch_size: Annotated[ + int, + Field( + default=4096, + validation_alias=AliasChoices("max-export-batch-size", "max_export_batch_size"), + serialization_alias="max-export-batch-size", + ), + BackendAIConfigMeta( + description=( + "Maximum number of spans exported in a single batch. " + "Larger batches reduce export overhead but increase memory usage." + ), + added_version="26.2.0", + example=ConfigExample(local="512", prod="4096"), + ), + ] diff --git a/src/ai/backend/logging/otel.py b/src/ai/backend/logging/otel.py index ad6912b2541..9a2f6c6c7fa 100644 --- a/src/ai/backend/logging/otel.py +++ b/src/ai/backend/logging/otel.py @@ -3,6 +3,7 @@ from collections.abc import Iterable from dataclasses import dataclass +from opentelemetry import trace from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter from opentelemetry.instrumentation.aiohttp_client import AioHttpClientInstrumentor @@ -24,6 +25,8 @@ class OpenTelemetrySpec: endpoint: str service_instance_id: uuid.UUID service_instance_name: str + max_queue_size: int + max_export_batch_size: int def to_resource(self) -> Resource: attributes = { @@ -59,21 +62,23 @@ def apply_otel_loggers(loggers: Iterable[logging.Logger], spec: OpenTelemetrySpe def apply_otel_tracer(spec: OpenTelemetrySpec) -> None: - # TODO: Apply after the setup procedure is decoupled from aiohttp tracer_provider = TracerProvider(resource=spec.to_resource()) span_exporter = OTLPSpanExporter(endpoint=spec.endpoint) - span_processor = BatchSpanProcessor(span_exporter) + span_processor = BatchSpanProcessor( + span_exporter, + max_queue_size=spec.max_queue_size, + max_export_batch_size=spec.max_export_batch_size, + ) tracer_provider.add_span_processor(span_processor) + trace.set_tracer_provider(tracer_provider) logging.info("OpenTelemetry tracing initialized successfully.") def instrument_aiohttp_server() -> None: - # TODO: Apply after the setup procedure is decoupled from aiohttp AioHttpServerInstrumentor().instrument() logging.info("OpenTelemetry tracing for aiohttp server initialized successfully.") def instrument_aiohttp_client() -> None: - # TODO: Apply after the setup procedure is decoupled from aiohttp AioHttpClientInstrumentor().instrument() logging.info("OpenTelemetry tracing for aiohttp client initialized successfully.") diff --git a/src/ai/backend/logging/utils.py b/src/ai/backend/logging/utils.py index f824ffa1707..27c6cbf7e14 100644 --- a/src/ai/backend/logging/utils.py +++ b/src/ai/backend/logging/utils.py @@ -124,9 +124,10 @@ def trace( @classmethod def apply_otel(cls, spec: OpenTelemetrySpec) -> None: - from .otel import apply_otel_loggers + from .otel import apply_otel_loggers, apply_otel_tracer apply_otel_loggers(cls._loggers, spec) + apply_otel_tracer(spec) def enforce_debug_logging(loggers: Iterable[str]) -> None: diff --git a/src/ai/backend/manager/server.py b/src/ai/backend/manager/server.py index 23ae9a94750..3ec4a1f2606 100644 --- a/src/ai/backend/manager/server.py +++ b/src/ai/backend/manager/server.py @@ -41,6 +41,9 @@ import uvloop from aiohttp import web from aiohttp.typedefs import Handler, Middleware +from opentelemetry.instrumentation.aiohttp_server import ( + middleware as otel_server_middleware, +) from setproctitle import setproctitle from zmq.auth.certs import load_certificate @@ -138,7 +141,11 @@ ) from ai.backend.common.utils import env_info from ai.backend.logging import BraceStyleAdapter, Logger, LogLevel -from ai.backend.logging.otel import OpenTelemetrySpec +from ai.backend.logging.otel import ( + OpenTelemetrySpec, + instrument_aiohttp_client, + instrument_aiohttp_server, +) from ai.backend.manager.server_gql_ctx import gql_adapters_ctx from ai.backend.manager.sokovan.deployment.coordinator import DeploymentCoordinator from ai.backend.manager.sokovan.deployment.route.coordinator import RouteCoordinator @@ -837,13 +844,16 @@ async def service_discovery_ctx(root_ctx: RootContext) -> AsyncIterator[None]: if root_ctx.config_provider.config.otel.enabled: meta = root_ctx.sd_loop.metadata + otel_config = root_ctx.config_provider.config.otel otel_spec = OpenTelemetrySpec( service_name=meta.service_group, service_version=meta.version, - log_level=root_ctx.config_provider.config.otel.log_level, - endpoint=root_ctx.config_provider.config.otel.endpoint, + log_level=otel_config.log_level, + endpoint=otel_config.endpoint, service_instance_id=meta.id, service_instance_name=meta.display_name, + max_queue_size=otel_config.max_queue_size, + max_export_batch_size=otel_config.max_export_batch_size, ) BraceStyleAdapter.apply_otel(otel_spec) try: @@ -1797,6 +1807,15 @@ async def webapp_ctx(root_app: web.Application) -> AsyncGenerator[None]: jwt_config = root_ctx.config_provider.config.jwt.to_jwt_config() root_ctx.jwt_validator = JWTValidator(jwt_config) + # TODO: Remove manual middleware injection once the manager startup is + # decoupled from the aiohttp Application lifecycle. Currently root_app is + # instantiated before OTel config is available, so instrument_aiohttp_server() + # (which patches the class via setattr) cannot take effect automatically. + if root_ctx.config_provider.config.otel.enabled: + instrument_aiohttp_server() + instrument_aiohttp_client() + root_app.middlewares.insert(0, otel_server_middleware) + # Plugin webapps should be loaded before runner.setup() because root_app is frozen upon on_startup event. await manager_init_stack.enter_async_context(webapp_plugin_ctx(root_app)) await manager_init_stack.enter_async_context(webapp_ctx(root_app)) diff --git a/src/ai/backend/storage/server.py b/src/ai/backend/storage/server.py index 67c3f9272b1..5a0f0541a87 100644 --- a/src/ai/backend/storage/server.py +++ b/src/ai/backend/storage/server.py @@ -543,6 +543,8 @@ async def service_discovery_ctx( endpoint=local_config.otel.endpoint, service_instance_id=meta.id, service_instance_name=meta.display_name, + max_queue_size=local_config.otel.max_queue_size, + max_export_batch_size=local_config.otel.max_export_batch_size, ) BraceStyleAdapter.apply_otel(otel_spec) try: diff --git a/src/ai/backend/web/server.py b/src/ai/backend/web/server.py index 38600844354..3de0f3f7e64 100644 --- a/src/ai/backend/web/server.py +++ b/src/ai/backend/web/server.py @@ -917,6 +917,8 @@ async def service_discovery_ctx(config: WebServerUnifiedConfig) -> AsyncGenerato endpoint=config.otel.endpoint, service_instance_id=uuid.uuid4(), service_instance_name=instance_name, + max_queue_size=config.otel.max_queue_size, + max_export_batch_size=config.otel.max_export_batch_size, ) BraceStyleAdapter.apply_otel(otel_spec) yield