Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/8694.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
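Enable OpenTelemetry distributed tracing in Manager by activating the global TracerProvider and instrumenting the aiohttp server and client for W3C Trace Context propagation, and add the `max-queue-size` and `max-export-batch-size` options to the OTel config for tuning span export batching.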
2 changes: 2 additions & 0 deletions src/ai/backend/agent/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -1521,6 +1521,8 @@ async def service_discovery_ctx(
endpoint=local_config.otel.endpoint,
service_instance_id=meta.id,
service_instance_name=meta.display_name,
max_queue_size=local_config.otel.max_queue_size,
max_export_batch_size=local_config.otel.max_export_batch_size,
)
BraceStyleAdapter.apply_otel(otel_spec)
try:
Expand Down
2 changes: 2 additions & 0 deletions src/ai/backend/appproxy/coordinator/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -767,6 +767,8 @@ async def service_discovery_ctx(root_ctx: RootContext) -> AsyncIterator[None]:
endpoint=root_ctx.local_config.otel.endpoint,
service_instance_id=meta.id,
service_instance_name=meta.display_name,
max_queue_size=root_ctx.local_config.otel.max_queue_size,
max_export_batch_size=root_ctx.local_config.otel.max_export_batch_size,
)
BraceStyleAdapter.apply_otel(otel_spec)
try:
Expand Down
2 changes: 2 additions & 0 deletions src/ai/backend/appproxy/worker/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,8 @@ async def service_discovery_ctx(root_ctx: RootContext) -> AsyncIterator[None]:
endpoint=root_ctx.local_config.otel.endpoint,
service_instance_id=meta.id,
service_instance_name=meta.display_name,
max_queue_size=root_ctx.local_config.otel.max_queue_size,
max_export_batch_size=root_ctx.local_config.otel.max_export_batch_size,
)
BraceStyleAdapter.apply_otel(otel_spec)
try:
Expand Down
33 changes: 33 additions & 0 deletions src/ai/backend/common/configs/otel.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,36 @@ class OTELConfig(BaseConfigSchema):
example=ConfigExample(local="http://localhost:4317", prod="http://otel-collector:4317"),
),
]
# Capacity of the span export queue. The value is copied into
# OpenTelemetrySpec.max_queue_size and handed to BatchSpanProcessor.
max_queue_size: Annotated[
    int,
    Field(
        default=65536,
        # A queue size must be a positive integer; reject 0 and negatives when
        # the config is loaded instead of failing later during tracer setup.
        gt=0,
        validation_alias=AliasChoices("max-queue-size", "max_queue_size"),
        serialization_alias="max-queue-size",
    ),
    BackendAIConfigMeta(
        description=(
            "Maximum number of spans queued for export. "
            "Spans are dropped when the queue is full. "
            "The default (65536) accommodates burst traffic from GraphQL workloads."
        ),
        added_version="26.2.0",
        example=ConfigExample(local="2048", prod="65536"),
    ),
]
# Upper bound on spans sent per export call. The value is copied into
# OpenTelemetrySpec.max_export_batch_size and handed to BatchSpanProcessor.
# NOTE(review): the processor presumably expects this to be no larger than
# max_queue_size; that cross-field check is not enforced here -- confirm.
max_export_batch_size: Annotated[
    int,
    Field(
        default=4096,
        # A batch size must be a positive integer; reject 0 and negatives when
        # the config is loaded instead of failing later during tracer setup.
        gt=0,
        validation_alias=AliasChoices("max-export-batch-size", "max_export_batch_size"),
        serialization_alias="max-export-batch-size",
    ),
    BackendAIConfigMeta(
        description=(
            "Maximum number of spans exported in a single batch. "
            "Larger batches reduce export overhead but increase memory usage."
        ),
        added_version="26.2.0",
        example=ConfigExample(local="512", prod="4096"),
    ),
]
13 changes: 9 additions & 4 deletions src/ai/backend/logging/otel.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from collections.abc import Iterable
from dataclasses import dataclass

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.aiohttp_client import AioHttpClientInstrumentor
Expand All @@ -24,6 +25,8 @@ class OpenTelemetrySpec:
endpoint: str
service_instance_id: uuid.UUID
service_instance_name: str
max_queue_size: int
max_export_batch_size: int

def to_resource(self) -> Resource:
attributes = {
Expand Down Expand Up @@ -59,21 +62,23 @@ def apply_otel_loggers(loggers: Iterable[logging.Logger], spec: OpenTelemetrySpe


def apply_otel_tracer(spec: OpenTelemetrySpec) -> None:
    """Register a global tracer provider that exports spans to ``spec.endpoint``.

    Builds a ``TracerProvider`` from ``spec.to_resource()``, attaches one
    ``BatchSpanProcessor`` wrapping an ``OTLPSpanExporter``, and installs the
    provider via ``trace.set_tracer_provider``.

    Args:
        spec: OpenTelemetry settings. This function reads ``endpoint``,
            ``max_queue_size``, ``max_export_batch_size`` and ``to_resource()``.
    """
    tracer_provider = TracerProvider(resource=spec.to_resource())
    span_exporter = OTLPSpanExporter(endpoint=spec.endpoint)
    # Build exactly one processor, sized from the spec. Constructing a
    # default-sized processor first and then rebinding the name would discard
    # it unused and ignore the configured limits.
    span_processor = BatchSpanProcessor(
        span_exporter,
        max_queue_size=spec.max_queue_size,
        max_export_batch_size=spec.max_export_batch_size,
    )
    tracer_provider.add_span_processor(span_processor)
    trace.set_tracer_provider(tracer_provider)
    logging.info("OpenTelemetry tracing initialized successfully.")


def instrument_aiohttp_server() -> None:
    """Enable OpenTelemetry instrumentation for the aiohttp server and log it."""
    server_instrumentor = AioHttpServerInstrumentor()
    server_instrumentor.instrument()
    logging.info("OpenTelemetry tracing for aiohttp server initialized successfully.")


def instrument_aiohttp_client() -> None:
    """Enable OpenTelemetry instrumentation for the aiohttp client and log it."""
    client_instrumentor = AioHttpClientInstrumentor()
    client_instrumentor.instrument()
    logging.info("OpenTelemetry tracing for aiohttp client initialized successfully.")
3 changes: 2 additions & 1 deletion src/ai/backend/logging/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,10 @@ def trace(

@classmethod
def apply_otel(cls, spec: OpenTelemetrySpec) -> None:
    """Apply OpenTelemetry log export and tracing using ``spec``.

    Passes the class-level ``_loggers`` collection and ``spec`` to
    ``apply_otel_loggers``, then calls ``apply_otel_tracer`` with ``spec``.

    Args:
        spec: OpenTelemetry settings forwarded unchanged to both helpers.
    """
    # One function-scope import covers both helpers.
    # NOTE(review): presumably kept local so the OTel modules are only loaded
    # when OTel is enabled -- confirm before moving it to module level.
    from .otel import apply_otel_loggers, apply_otel_tracer

    apply_otel_loggers(cls._loggers, spec)
    apply_otel_tracer(spec)


def enforce_debug_logging(loggers: Iterable[str]) -> None:
Expand Down
25 changes: 22 additions & 3 deletions src/ai/backend/manager/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@
import uvloop
from aiohttp import web
from aiohttp.typedefs import Handler, Middleware
from opentelemetry.instrumentation.aiohttp_server import (
middleware as otel_server_middleware,
)
from setproctitle import setproctitle
from zmq.auth.certs import load_certificate

Expand Down Expand Up @@ -138,7 +141,11 @@
)
from ai.backend.common.utils import env_info
from ai.backend.logging import BraceStyleAdapter, Logger, LogLevel
from ai.backend.logging.otel import OpenTelemetrySpec
from ai.backend.logging.otel import (
OpenTelemetrySpec,
instrument_aiohttp_client,
instrument_aiohttp_server,
)
from ai.backend.manager.server_gql_ctx import gql_adapters_ctx
from ai.backend.manager.sokovan.deployment.coordinator import DeploymentCoordinator
from ai.backend.manager.sokovan.deployment.route.coordinator import RouteCoordinator
Expand Down Expand Up @@ -837,13 +844,16 @@ async def service_discovery_ctx(root_ctx: RootContext) -> AsyncIterator[None]:

if root_ctx.config_provider.config.otel.enabled:
meta = root_ctx.sd_loop.metadata
otel_config = root_ctx.config_provider.config.otel
otel_spec = OpenTelemetrySpec(
service_name=meta.service_group,
service_version=meta.version,
log_level=root_ctx.config_provider.config.otel.log_level,
endpoint=root_ctx.config_provider.config.otel.endpoint,
log_level=otel_config.log_level,
endpoint=otel_config.endpoint,
service_instance_id=meta.id,
service_instance_name=meta.display_name,
max_queue_size=otel_config.max_queue_size,
max_export_batch_size=otel_config.max_export_batch_size,
)
BraceStyleAdapter.apply_otel(otel_spec)
try:
Expand Down Expand Up @@ -1797,6 +1807,15 @@ async def webapp_ctx(root_app: web.Application) -> AsyncGenerator[None]:
jwt_config = root_ctx.config_provider.config.jwt.to_jwt_config()
root_ctx.jwt_validator = JWTValidator(jwt_config)

# TODO: Remove manual middleware injection once the manager startup is
# decoupled from the aiohttp Application lifecycle. Currently root_app is
# instantiated before OTel config is available, so instrument_aiohttp_server()
# (which patches the class via setattr) cannot take effect automatically.
if root_ctx.config_provider.config.otel.enabled:
instrument_aiohttp_server()
instrument_aiohttp_client()
root_app.middlewares.insert(0, otel_server_middleware)

# Plugin webapps should be loaded before runner.setup() because root_app is frozen upon on_startup event.
await manager_init_stack.enter_async_context(webapp_plugin_ctx(root_app))
await manager_init_stack.enter_async_context(webapp_ctx(root_app))
Expand Down
2 changes: 2 additions & 0 deletions src/ai/backend/storage/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,8 @@ async def service_discovery_ctx(
endpoint=local_config.otel.endpoint,
service_instance_id=meta.id,
service_instance_name=meta.display_name,
max_queue_size=local_config.otel.max_queue_size,
max_export_batch_size=local_config.otel.max_export_batch_size,
)
BraceStyleAdapter.apply_otel(otel_spec)
try:
Expand Down
2 changes: 2 additions & 0 deletions src/ai/backend/web/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -917,6 +917,8 @@ async def service_discovery_ctx(config: WebServerUnifiedConfig) -> AsyncGenerato
endpoint=config.otel.endpoint,
service_instance_id=uuid.uuid4(),
service_instance_name=instance_name,
max_queue_size=config.otel.max_queue_size,
max_export_batch_size=config.otel.max_export_batch_size,
)
BraceStyleAdapter.apply_otel(otel_spec)
yield
Expand Down
Loading