From 27cd3309eee4373dab7bd4cdb4c66afa44213248 Mon Sep 17 00:00:00 2001 From: David Golightly Date: Fri, 19 Dec 2025 12:12:52 -0800 Subject: [PATCH 01/10] feat(opentelemetry): add protobuf protocol support for OTLP exporters MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add support for Protocol Buffers binary encoding as an alternative to JSON for OTLP trace, metrics, and log exports. This enables more efficient wire format when communicating with OpenTelemetry collectors. - Add `protocol` option ("json" | "protobuf") to Otlp.layer and individual OtlpTracer, OtlpMetrics, OtlpLogger layers - Implement protobuf wire format encoding following opentelemetry-proto specs - Set appropriate Content-Type header (application/x-protobuf vs application/json) - No new dependencies - protobuf encoding implemented from scratch 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .changeset/add-otlp-protobuf-support.md | 22 + packages/opentelemetry/src/Otlp.ts | 22 +- packages/opentelemetry/src/OtlpLogger.ts | 28 + packages/opentelemetry/src/OtlpMetrics.ts | 63 ++ packages/opentelemetry/src/OtlpTracer.ts | 16 + .../src/internal/otlpExporter.ts | 28 +- .../src/internal/otlpProtobuf.ts | 737 ++++++++++++++++++ .../opentelemetry/src/internal/protobuf.ts | 226 ++++++ packages/opentelemetry/test/Protobuf.test.ts | 338 ++++++++ 9 files changed, 1473 insertions(+), 7 deletions(-) create mode 100644 .changeset/add-otlp-protobuf-support.md create mode 100644 packages/opentelemetry/src/internal/otlpProtobuf.ts create mode 100644 packages/opentelemetry/src/internal/protobuf.ts create mode 100644 packages/opentelemetry/test/Protobuf.test.ts diff --git a/.changeset/add-otlp-protobuf-support.md b/.changeset/add-otlp-protobuf-support.md new file mode 100644 index 00000000000..56b6d9fb4cc --- /dev/null +++ b/.changeset/add-otlp-protobuf-support.md @@ -0,0 +1,22 @@ +--- +"@effect/opentelemetry": minor +--- + +Add protobuf protocol support for OTLP exporters. + +This adds a `protocol` option to `Otlp.layer`, `OtlpTracer.layer`, `OtlpMetrics.layer`, and `OtlpLogger.layer` that allows choosing between JSON (default) and Protocol Buffers binary encoding when exporting telemetry data to OpenTelemetry collectors. + +```typescript +import { Otlp } from "@effect/opentelemetry" + +// Use protobuf encoding for more efficient wire format +Otlp.layer({ + baseUrl: "http://localhost:4318", + protocol: "protobuf", + resource: { serviceName: "my-service" } +}) +``` + +- No new dependencies - protobuf encoding implemented from scratch +- Sets appropriate Content-Type header (`application/x-protobuf` vs `application/json`) +- Follows opentelemetry-proto specifications diff --git a/packages/opentelemetry/src/Otlp.ts b/packages/opentelemetry/src/Otlp.ts index 7da41d43fdc..ae1c39012e4 100644 --- a/packages/opentelemetry/src/Otlp.ts +++ b/packages/opentelemetry/src/Otlp.ts @@ -8,10 +8,22 @@ import type * as Duration from "effect/Duration" import * as Layer from "effect/Layer" import type * as Logger from "effect/Logger" import type * as Tracer from "effect/Tracer" +import type { OtlpProtocol as _OtlpProtocol } from "./internal/otlpExporter.js" import * as OtlpLogger from "./OtlpLogger.js" import * as OtlpMetrics from "./OtlpMetrics.js" import * as OtlpTracer from "./OtlpTracer.js" +/** + * OTLP protocol type for encoding telemetry data. + * + * - `"json"`: JSON encoding (default) - uses `application/json` content type + * - `"protobuf"`: Protocol Buffers binary encoding - uses `application/x-protobuf` content type + * + * @since 1.0.0 + * @category Models + */ +export type OtlpProtocol = _OtlpProtocol + /** * @since 1.0.0 * @category Layers @@ -32,6 +44,7 @@ export const layer = (options: { readonly metricsExportInterval?: Duration.DurationInput | undefined readonly tracerExportInterval?: Duration.DurationInput | undefined readonly shutdownTimeout?: Duration.DurationInput | undefined + readonly protocol?: OtlpProtocol | undefined }): Layer.Layer => { const baseReq = HttpClientRequest.get(options.baseUrl) const url = (path: string) => HttpClientRequest.appendUrl(baseReq, path).url @@ -44,14 +57,16 @@ export const layer = (options: { exportInterval: options.loggerExportInterval, maxBatchSize: options.maxBatchSize, shutdownTimeout: options.shutdownTimeout, - excludeLogSpans: options.loggerExcludeLogSpans + excludeLogSpans: options.loggerExcludeLogSpans, + protocol: options.protocol }), OtlpMetrics.layer({ url: url("/v1/metrics"), resource: options.resource, headers: options.headers, exportInterval: options.metricsExportInterval, - shutdownTimeout: options.shutdownTimeout + shutdownTimeout: options.shutdownTimeout, + protocol: options.protocol }), OtlpTracer.layer({ url: url("/v1/traces"), @@ -60,7 +75,8 @@ export const layer = (options: { exportInterval: options.tracerExportInterval, maxBatchSize: options.maxBatchSize, context: options.tracerContext, - shutdownTimeout: options.shutdownTimeout + shutdownTimeout: options.shutdownTimeout, + protocol: options.protocol }) ) } diff --git a/packages/opentelemetry/src/OtlpLogger.ts b/packages/opentelemetry/src/OtlpLogger.ts index 8db8db422b7..900295fa2f3 100644 --- a/packages/opentelemetry/src/OtlpLogger.ts +++ b/packages/opentelemetry/src/OtlpLogger.ts @@ -18,6 +18,8 @@ import * as Option from "effect/Option" import type * as Scope from "effect/Scope" import * as Tracer from "effect/Tracer" import * as Exporter from "./internal/otlpExporter.js" +import type { OtlpProtocol } from "./internal/otlpExporter.js" +import * as OtlpProtobuf from "./internal/otlpProtobuf.js" import type { AnyValue, Fixed64, KeyValue, Resource } from "./OtlpResource.js" import * as OtlpResource from "./OtlpResource.js" @@ -38,6 +40,7 @@ export const make: ( readonly maxBatchSize?: number | undefined readonly shutdownTimeout?: Duration.DurationInput | undefined readonly excludeLogSpans?: boolean | undefined + readonly protocol?: OtlpProtocol | undefined } ) => Effect.Effect< Logger.Logger, @@ -55,6 +58,7 @@ export const make: ( headers: options.headers, maxBatchSize: options.maxBatchSize ?? 1000, exportInterval: options.exportInterval ?? Duration.seconds(1), + protocol: options.protocol, body: (data): IExportLogsServiceRequest => ({ resourceLogs: [{ resource: otelResource, @@ -64,6 +68,29 @@ export const make: ( }] }] }), + bodyProtobuf: (data: Array): Uint8Array => + OtlpProtobuf.encodeLogsData({ + resourceLogs: [{ + resource: otelResource, + scopeLogs: [{ + scope, + logRecords: data.map((record) => ({ + timeUnixNano: String(record.timeUnixNano), + observedTimeUnixNano: record.observedTimeUnixNano !== undefined + ? String(record.observedTimeUnixNano) + : undefined, + severityNumber: record.severityNumber, + severityText: record.severityText, + body: record.body, + attributes: record.attributes, + droppedAttributesCount: record.droppedAttributesCount, + flags: record.flags, + traceId: typeof record.traceId === "string" ? record.traceId : undefined, + spanId: typeof record.spanId === "string" ? record.spanId : undefined + })) + }] + }] + }), shutdownTimeout: options.shutdownTimeout ?? Duration.seconds(3) }) @@ -92,6 +119,7 @@ export const layer = (options: { readonly maxBatchSize?: number | undefined readonly shutdownTimeout?: Duration.DurationInput | undefined readonly excludeLogSpans?: boolean | undefined + readonly protocol?: OtlpProtocol | undefined }): Layer.Layer => options.replaceLogger ? Logger.replaceScoped(options.replaceLogger, make(options)) : Logger.addScoped(make(options)) diff --git a/packages/opentelemetry/src/OtlpMetrics.ts b/packages/opentelemetry/src/OtlpMetrics.ts index e6fa62da432..84e72f3fbc0 100644 --- a/packages/opentelemetry/src/OtlpMetrics.ts +++ b/packages/opentelemetry/src/OtlpMetrics.ts @@ -13,6 +13,8 @@ import * as MetricState from "effect/MetricState" import * as Option from "effect/Option" import type * as Scope from "effect/Scope" import * as Exporter from "./internal/otlpExporter.js" +import type { OtlpProtocol } from "./internal/otlpExporter.js" +import * as OtlpProtobuf from "./internal/otlpProtobuf.js" import type { Fixed64, KeyValue } from "./OtlpResource.js" import * as OtlpResource from "./OtlpResource.js" @@ -30,6 +32,7 @@ export const make: (options: { readonly headers?: Headers.Input | undefined readonly exportInterval?: Duration.DurationInput | undefined readonly shutdownTimeout?: Duration.DurationInput | undefined + readonly protocol?: OtlpProtocol | undefined }) => Effect.Effect< void, never, @@ -265,13 +268,72 @@ export const make: (options: { } } + const snapshotProtobuf = (): Uint8Array => { + const data = snapshot() + return OtlpProtobuf.encodeMetricsData({ + resourceMetrics: data.resourceMetrics.map((rm) => ({ + resource: rm.resource!, + scopeMetrics: rm.scopeMetrics.map((sm) => ({ + scope: sm.scope!, + metrics: sm.metrics.map((m) => ({ + name: m.name, + description: m.description, + unit: m.unit, + gauge: m.gauge + ? { + dataPoints: m.gauge.dataPoints.map((dp) => ({ + attributes: dp.attributes, + startTimeUnixNano: String(dp.startTimeUnixNano ?? "0"), + timeUnixNano: String(dp.timeUnixNano ?? "0"), + asDouble: dp.asDouble !== null ? dp.asDouble : undefined, + asInt: dp.asInt !== undefined ? String(dp.asInt) : undefined + })) + } + : undefined, + sum: m.sum + ? { + dataPoints: m.sum.dataPoints.map((dp) => ({ + attributes: dp.attributes, + startTimeUnixNano: String(dp.startTimeUnixNano ?? "0"), + timeUnixNano: String(dp.timeUnixNano ?? "0"), + asDouble: dp.asDouble !== null ? dp.asDouble : undefined, + asInt: dp.asInt !== undefined ? String(dp.asInt) : undefined + })), + aggregationTemporality: m.sum.aggregationTemporality, + isMonotonic: m.sum.isMonotonic + } + : undefined, + histogram: m.histogram + ? { + dataPoints: m.histogram.dataPoints.map((dp) => ({ + attributes: dp.attributes ?? [], + startTimeUnixNano: String(dp.startTimeUnixNano ?? "0"), + timeUnixNano: String(dp.timeUnixNano ?? "0"), + count: dp.count ?? 0, + sum: dp.sum, + bucketCounts: dp.bucketCounts ?? [], + explicitBounds: dp.explicitBounds ?? [], + min: dp.min, + max: dp.max + })), + aggregationTemporality: m.histogram.aggregationTemporality ?? OtlpProtobuf.AggregationTemporality.Cumulative + } + : undefined + })) + })) + })) + }) + } + yield* Exporter.make({ label: "OtlpMetrics", url: options.url, headers: options.headers, maxBatchSize: "disabled", exportInterval: options.exportInterval ?? Duration.seconds(10), + protocol: options.protocol, body: snapshot, + bodyProtobuf: snapshotProtobuf, shutdownTimeout: options.shutdownTimeout ?? Duration.seconds(3) }) }) @@ -290,6 +352,7 @@ export const layer = (options: { readonly headers?: Headers.Input | undefined readonly exportInterval?: Duration.DurationInput | undefined readonly shutdownTimeout?: Duration.DurationInput | undefined + readonly protocol?: OtlpProtocol | undefined }): Layer.Layer => Layer.scopedDiscard(make(options)) // internal diff --git a/packages/opentelemetry/src/OtlpTracer.ts b/packages/opentelemetry/src/OtlpTracer.ts index 97f8e90a7b3..af5c0e5a6ca 100644 --- a/packages/opentelemetry/src/OtlpTracer.ts +++ b/packages/opentelemetry/src/OtlpTracer.ts @@ -14,6 +14,8 @@ import type * as Scope from "effect/Scope" import * as Tracer from "effect/Tracer" import type { ExtractTag } from "effect/Types" import * as Exporter from "./internal/otlpExporter.js" +import type { OtlpProtocol } from "./internal/otlpExporter.js" +import * as OtlpProtobuf from "./internal/otlpProtobuf.js" import type { KeyValue, Resource } from "./OtlpResource.js" import { entriesToAttributes } from "./OtlpResource.js" import * as OtlpResource from "./OtlpResource.js" @@ -39,6 +41,7 @@ export const make: ( readonly maxBatchSize?: number | undefined readonly context?: ((f: () => X, span: Tracer.AnySpan) => X) | undefined readonly shutdownTimeout?: Duration.DurationInput | undefined + readonly protocol?: OtlpProtocol | undefined } ) => Effect.Effect< Tracer.Tracer, @@ -56,6 +59,7 @@ export const make: ( headers: options.headers, exportInterval: options.exportInterval ?? Duration.seconds(5), maxBatchSize: options.maxBatchSize ?? 1000, + protocol: options.protocol, body(spans) { const data: TraceData = { resourceSpans: [{ @@ -68,6 +72,17 @@ export const make: ( } return data }, + bodyProtobuf(spans) { + return OtlpProtobuf.encodeTracesData({ + resourceSpans: [{ + resource: otelResource, + scopeSpans: [{ + scope, + spans + }] + }] + }) + }, shutdownTimeout: options.shutdownTimeout ?? Duration.seconds(3) }) @@ -119,6 +134,7 @@ export const layer = (options: { readonly maxBatchSize?: number | undefined readonly context?: ((f: () => X, span: Tracer.AnySpan) => X) | undefined readonly shutdownTimeout?: Duration.DurationInput | undefined + readonly protocol?: OtlpProtocol | undefined }): Layer.Layer => Layer.unwrapScoped(Effect.map(make(options), Layer.setTracer)) // internal diff --git a/packages/opentelemetry/src/internal/otlpExporter.ts b/packages/opentelemetry/src/internal/otlpExporter.ts index 877961afcb3..ecc20667ea5 100644 --- a/packages/opentelemetry/src/internal/otlpExporter.ts +++ b/packages/opentelemetry/src/internal/otlpExporter.ts @@ -1,4 +1,5 @@ import * as Headers from "@effect/platform/Headers" +import * as HttpBody from "@effect/platform/HttpBody" import * as HttpClient from "@effect/platform/HttpClient" import * as HttpClientError from "@effect/platform/HttpClientError" import * as HttpClientRequest from "@effect/platform/HttpClientRequest" @@ -10,6 +11,12 @@ import * as Option from "effect/Option" import * as Schedule from "effect/Schedule" import * as Scope from "effect/Scope" +/** + * OTLP protocol type for encoding + * @internal + */ +export type OtlpProtocol = "json" | "protobuf" + const policy = Schedule.forever.pipe( Schedule.passthrough, Schedule.addDelay((error) => { @@ -37,6 +44,8 @@ export const make: ( readonly exportInterval: Duration.DurationInput readonly maxBatchSize: number | "disabled" readonly body: (data: Array) => unknown + readonly bodyProtobuf?: ((data: Array) => Uint8Array) | undefined + readonly protocol?: OtlpProtocol | undefined readonly shutdownTimeout: Duration.DurationInput } ) => Effect.Effect< @@ -53,8 +62,14 @@ export const make: ( HttpClient.retryTransient({ schedule: policy, times: 3 }) ) + const protocol = options.protocol ?? "json" + const contentType = protocol === "protobuf" + ? "application/x-protobuf" + : "application/json" + let headers = Headers.unsafeFromRecord({ - "user-agent": `effect-opentelemetry-${options.label}/0.0.0` + "user-agent": `effect-opentelemetry-${options.label}/0.0.0`, + "content-type": contentType }) if (options.headers) { headers = Headers.merge(Headers.fromInput(options.headers), headers) @@ -75,9 +90,14 @@ export const make: ( } buffer = [] } - return client.execute( - HttpClientRequest.bodyUnsafeJson(request, options.body(items)) - ).pipe( + const requestWithBody = protocol === "protobuf" && options.bodyProtobuf + ? HttpClientRequest.setBody( + request, + HttpBody.uint8Array(options.bodyProtobuf(items), contentType) + ) + : HttpClientRequest.bodyUnsafeJson(request, options.body(items)) + + return client.execute(requestWithBody).pipe( Effect.asVoid, Effect.withTracerEnabled(false) ) diff --git a/packages/opentelemetry/src/internal/otlpProtobuf.ts b/packages/opentelemetry/src/internal/otlpProtobuf.ts new file mode 100644 index 00000000000..de5eb8871cc --- /dev/null +++ b/packages/opentelemetry/src/internal/otlpProtobuf.ts @@ -0,0 +1,737 @@ +/** + * OTLP Protobuf encoding for traces, metrics, and logs. + * + * Implements the protobuf wire format according to: + * https://github.com/open-telemetry/opentelemetry-proto + * + * @internal + */ + +import type { AnyValue, KeyValue, Resource } from "../OtlpResource.js" +import * as Proto from "./protobuf.js" + +// Common types (opentelemetry.proto.common.v1) + +/** + * Encodes an AnyValue message. + * + * message AnyValue { + * oneof value { + * string string_value = 1; + * bool bool_value = 2; + * int64 int_value = 3; + * double double_value = 4; + * ArrayValue array_value = 5; + * KeyValueList kvlist_value = 6; + * bytes bytes_value = 7; + * } + * } + */ +export const encodeAnyValue = (value: AnyValue): Uint8Array => { + if (value.stringValue !== undefined && value.stringValue !== null) { + return Proto.stringField(1, value.stringValue) + } + if (value.boolValue !== undefined && value.boolValue !== null) { + return Proto.boolField(2, value.boolValue) + } + if (value.intValue !== undefined && value.intValue !== null) { + return Proto.varintField(3, BigInt(value.intValue)) + } + if (value.doubleValue !== undefined && value.doubleValue !== null) { + return Proto.doubleField(4, value.doubleValue) + } + if (value.arrayValue !== undefined) { + return Proto.messageField(5, encodeArrayValue(value.arrayValue)) + } + if (value.kvlistValue !== undefined) { + return Proto.messageField(6, encodeKeyValueList(value.kvlistValue)) + } + if (value.bytesValue !== undefined) { + return Proto.lengthDelimitedField(7, value.bytesValue) + } + return new Uint8Array(0) +} + +/** + * Encodes an ArrayValue message. + * + * message ArrayValue { + * repeated AnyValue values = 1; + * } + */ +export const encodeArrayValue = (value: { values: ReadonlyArray }): Uint8Array => + Proto.repeatedField(1, value.values, encodeAnyValue) + +/** + * Encodes a KeyValueList message. + * + * message KeyValueList { + * repeated KeyValue values = 1; + * } + */ +export const encodeKeyValueList = (value: { values: ReadonlyArray }): Uint8Array => + Proto.repeatedField(1, value.values, encodeKeyValue) + +/** + * Encodes a KeyValue message. + * + * message KeyValue { + * string key = 1; + * AnyValue value = 2; + * } + */ +export const encodeKeyValue = (kv: KeyValue): Uint8Array => + Proto.concat( + Proto.stringField(1, kv.key), + Proto.messageField(2, encodeAnyValue(kv.value)) + ) + +/** + * Encodes an InstrumentationScope message. + * + * message InstrumentationScope { + * string name = 1; + * string version = 2; + * repeated KeyValue attributes = 3; + * uint32 dropped_attributes_count = 4; + * } + */ +export const encodeInstrumentationScope = (scope: { + readonly name: string + readonly version?: string + readonly attributes?: ReadonlyArray + readonly droppedAttributesCount?: number +}): Uint8Array => + Proto.concat( + Proto.stringField(1, scope.name), + Proto.optionalStringField(2, scope.version), + scope.attributes ? Proto.repeatedField(3, scope.attributes, encodeKeyValue) : new Uint8Array(0), + scope.droppedAttributesCount ? Proto.varintField(4, scope.droppedAttributesCount) : new Uint8Array(0) + ) + +// Resource types (opentelemetry.proto.resource.v1) + +/** + * Encodes a Resource message. + * + * message Resource { + * repeated KeyValue attributes = 1; + * uint32 dropped_attributes_count = 2; + * } + */ +export const encodeResource = (resource: Resource): Uint8Array => + Proto.concat( + Proto.repeatedField(1, resource.attributes, encodeKeyValue), + resource.droppedAttributesCount > 0 + ? Proto.varintField(2, resource.droppedAttributesCount) + : new Uint8Array(0) + ) + +// Trace types (opentelemetry.proto.trace.v1) + +/** + * Status code enum + */ +export const StatusCode = { + Unset: 0, + Ok: 1, + Error: 2 +} as const + +/** + * SpanKind enum + */ +export const SpanKind = { + Unspecified: 0, + Internal: 1, + Server: 2, + Client: 3, + Producer: 4, + Consumer: 5 +} as const + +/** + * Encodes a Status message. + * + * message Status { + * string message = 2; + * StatusCode code = 3; + * } + */ +export const encodeStatus = (status: { + readonly code: number + readonly message?: string +}): Uint8Array => + Proto.concat( + Proto.optionalStringField(2, status.message), + Proto.varintField(3, status.code) + ) + +/** + * Encodes an Event message. + * + * message Event { + * fixed64 time_unix_nano = 1; + * string name = 2; + * repeated KeyValue attributes = 3; + * uint32 dropped_attributes_count = 4; + * } + */ +export const encodeEvent = (event: { + readonly timeUnixNano: string + readonly name: string + readonly attributes: ReadonlyArray + readonly droppedAttributesCount: number +}): Uint8Array => + Proto.concat( + Proto.fixed64Field(1, BigInt(event.timeUnixNano)), + Proto.stringField(2, event.name), + Proto.repeatedField(3, event.attributes, encodeKeyValue), + event.droppedAttributesCount > 0 + ? Proto.varintField(4, event.droppedAttributesCount) + : new Uint8Array(0) + ) + +/** + * Encodes a Link message. + * + * message Link { + * bytes trace_id = 1; + * bytes span_id = 2; + * string trace_state = 3; + * repeated KeyValue attributes = 4; + * uint32 dropped_attributes_count = 5; + * fixed32 flags = 6; + * } + */ +export const encodeLink = (link: { + readonly traceId: string + readonly spanId: string + readonly traceState?: string + readonly attributes: ReadonlyArray + readonly droppedAttributesCount: number + readonly flags?: number +}): Uint8Array => + Proto.concat( + Proto.bytesFieldFromHex(1, link.traceId), + Proto.bytesFieldFromHex(2, link.spanId), + Proto.optionalStringField(3, link.traceState), + Proto.repeatedField(4, link.attributes, encodeKeyValue), + link.droppedAttributesCount > 0 + ? Proto.varintField(5, link.droppedAttributesCount) + : new Uint8Array(0), + link.flags !== undefined ? Proto.fixed32Field(6, link.flags) : new Uint8Array(0) + ) + +/** + * Encodes a Span message. + * + * message Span { + * bytes trace_id = 1; + * bytes span_id = 2; + * string trace_state = 3; + * bytes parent_span_id = 4; + * string name = 5; + * SpanKind kind = 6; + * fixed64 start_time_unix_nano = 7; + * fixed64 end_time_unix_nano = 8; + * repeated KeyValue attributes = 9; + * uint32 dropped_attributes_count = 10; + * repeated Event events = 11; + * uint32 dropped_events_count = 12; + * repeated Link links = 13; + * uint32 dropped_links_count = 14; + * Status status = 15; + * fixed32 flags = 16; + * } + */ +export const encodeSpan = (span: { + readonly traceId: string + readonly spanId: string + readonly traceState?: string + readonly parentSpanId?: string + readonly name: string + readonly kind: number + readonly startTimeUnixNano: string + readonly endTimeUnixNano: string + readonly attributes: ReadonlyArray + readonly droppedAttributesCount: number + readonly events: ReadonlyArray<{ + readonly timeUnixNano: string + readonly name: string + readonly attributes: ReadonlyArray + readonly droppedAttributesCount: number + }> + readonly droppedEventsCount: number + readonly links: ReadonlyArray<{ + readonly traceId: string + readonly spanId: string + readonly traceState?: string + readonly attributes: ReadonlyArray + readonly droppedAttributesCount: number + readonly flags?: number + }> + readonly droppedLinksCount: number + readonly status: { + readonly code: number + readonly message?: string + } + readonly flags?: number +}): Uint8Array => + Proto.concat( + Proto.bytesFieldFromHex(1, span.traceId), + Proto.bytesFieldFromHex(2, span.spanId), + Proto.optionalStringField(3, span.traceState), + span.parentSpanId !== undefined + ? Proto.bytesFieldFromHex(4, span.parentSpanId) + : new Uint8Array(0), + Proto.stringField(5, span.name), + Proto.varintField(6, span.kind), + Proto.fixed64Field(7, BigInt(span.startTimeUnixNano)), + Proto.fixed64Field(8, BigInt(span.endTimeUnixNano)), + Proto.repeatedField(9, span.attributes, encodeKeyValue), + span.droppedAttributesCount > 0 + ? Proto.varintField(10, span.droppedAttributesCount) + : new Uint8Array(0), + Proto.repeatedField(11, span.events, encodeEvent), + span.droppedEventsCount > 0 + ? Proto.varintField(12, span.droppedEventsCount) + : new Uint8Array(0), + Proto.repeatedField(13, span.links, encodeLink), + span.droppedLinksCount > 0 + ? Proto.varintField(14, span.droppedLinksCount) + : new Uint8Array(0), + Proto.messageField(15, encodeStatus(span.status)), + span.flags !== undefined ? Proto.fixed32Field(16, span.flags) : new Uint8Array(0) + ) + +/** + * Encodes a ScopeSpans message. + * + * message ScopeSpans { + * InstrumentationScope scope = 1; + * repeated Span spans = 2; + * string schema_url = 3; + * } + */ +export const encodeScopeSpans = (scopeSpans: { + readonly scope: { readonly name: string; readonly version?: string } + readonly spans: ReadonlyArray[0]> + readonly schemaUrl?: string +}): Uint8Array => + Proto.concat( + Proto.messageField(1, encodeInstrumentationScope(scopeSpans.scope)), + Proto.repeatedField(2, scopeSpans.spans, encodeSpan), + Proto.optionalStringField(3, scopeSpans.schemaUrl) + ) + +/** + * Encodes a ResourceSpans message. + * + * message ResourceSpans { + * Resource resource = 1; + * repeated ScopeSpans scope_spans = 2; + * string schema_url = 3; + * } + */ +export const encodeResourceSpans = (resourceSpans: { + readonly resource: Resource + readonly scopeSpans: ReadonlyArray[0]> + readonly schemaUrl?: string +}): Uint8Array => + Proto.concat( + Proto.messageField(1, encodeResource(resourceSpans.resource)), + Proto.repeatedField(2, resourceSpans.scopeSpans, encodeScopeSpans), + Proto.optionalStringField(3, resourceSpans.schemaUrl) + ) + +/** + * Encodes a TracesData message (top-level export request). + * + * message TracesData { + * repeated ResourceSpans resource_spans = 1; + * } + */ +export const encodeTracesData = (tracesData: { + readonly resourceSpans: ReadonlyArray[0]> +}): Uint8Array => + Proto.repeatedField(1, tracesData.resourceSpans, encodeResourceSpans) + +// Metrics types (opentelemetry.proto.metrics.v1) + +/** + * AggregationTemporality enum + */ +export const AggregationTemporality = { + Unspecified: 0, + Delta: 1, + Cumulative: 2 +} as const + +/** + * Encodes a NumberDataPoint message. + * + * message NumberDataPoint { + * repeated KeyValue attributes = 7; + * fixed64 start_time_unix_nano = 2; + * fixed64 time_unix_nano = 3; + * oneof value { + * double as_double = 4; + * sfixed64 as_int = 6; + * } + * repeated Exemplar exemplars = 5; + * uint32 flags = 8; + * } + */ +export const encodeNumberDataPoint = (point: { + readonly attributes: ReadonlyArray + readonly startTimeUnixNano: string + readonly timeUnixNano: string + readonly asDouble?: number | undefined + readonly asInt?: string | number | bigint | undefined + readonly flags?: number | undefined +}): Uint8Array => + Proto.concat( + Proto.fixed64Field(2, BigInt(point.startTimeUnixNano)), + Proto.fixed64Field(3, BigInt(point.timeUnixNano)), + point.asDouble !== undefined + ? Proto.doubleField(4, point.asDouble) + : new Uint8Array(0), + point.asInt !== undefined + ? Proto.fixed64Field(6, BigInt(point.asInt)) + : new Uint8Array(0), + Proto.repeatedField(7, point.attributes, encodeKeyValue), + point.flags !== undefined ? Proto.varintField(8, point.flags) : new Uint8Array(0) + ) + +/** + * Encodes a HistogramDataPoint message. + * + * message HistogramDataPoint { + * repeated KeyValue attributes = 9; + * fixed64 start_time_unix_nano = 2; + * fixed64 time_unix_nano = 3; + * fixed64 count = 4; + * optional double sum = 5; + * repeated fixed64 bucket_counts = 6; + * repeated double explicit_bounds = 7; + * optional double min = 11; + * optional double max = 12; + * uint32 flags = 10; + * } + */ +export const encodeHistogramDataPoint = (point: { + readonly attributes: ReadonlyArray + readonly startTimeUnixNano: string + readonly timeUnixNano: string + readonly count: string | number | bigint + readonly sum?: number | undefined + readonly bucketCounts: ReadonlyArray + readonly explicitBounds: ReadonlyArray + readonly min?: number | undefined + readonly max?: number | undefined + readonly flags?: number | undefined +}): Uint8Array => { + // Pack bucket counts as repeated fixed64 + const bucketCountsEncoded = Proto.concat( + ...point.bucketCounts.map((count) => + Proto.fixed64Field(6, BigInt(count)) + ) + ) + // Pack explicit bounds as repeated double + const explicitBoundsEncoded = Proto.concat( + ...point.explicitBounds.map((bound) => + Proto.doubleField(7, bound) + ) + ) + return Proto.concat( + Proto.fixed64Field(2, BigInt(point.startTimeUnixNano)), + Proto.fixed64Field(3, BigInt(point.timeUnixNano)), + Proto.fixed64Field(4, BigInt(point.count)), + point.sum !== undefined ? Proto.doubleField(5, point.sum) : new Uint8Array(0), + bucketCountsEncoded, + explicitBoundsEncoded, + Proto.repeatedField(9, point.attributes, encodeKeyValue), + point.flags !== undefined ? Proto.varintField(10, point.flags) : new Uint8Array(0), + point.min !== undefined ? Proto.doubleField(11, point.min) : new Uint8Array(0), + point.max !== undefined ? Proto.doubleField(12, point.max) : new Uint8Array(0) + ) +} + +/** + * Encodes a Gauge message. + * + * message Gauge { + * repeated NumberDataPoint data_points = 1; + * } + */ +export const encodeGauge = (gauge: { + readonly dataPoints: ReadonlyArray[0]> +}): Uint8Array => + Proto.repeatedField(1, gauge.dataPoints, encodeNumberDataPoint) + +/** + * Encodes a Sum message. + * + * message Sum { + * repeated NumberDataPoint data_points = 1; + * AggregationTemporality aggregation_temporality = 2; + * bool is_monotonic = 3; + * } + */ +export const encodeSum = (sum: { + readonly dataPoints: ReadonlyArray[0]> + readonly aggregationTemporality: number + readonly isMonotonic: boolean +}): Uint8Array => + Proto.concat( + Proto.repeatedField(1, sum.dataPoints, encodeNumberDataPoint), + Proto.varintField(2, sum.aggregationTemporality), + Proto.boolField(3, sum.isMonotonic) + ) + +/** + * Encodes a Histogram message. + * + * message Histogram { + * repeated HistogramDataPoint data_points = 1; + * AggregationTemporality aggregation_temporality = 2; + * } + */ +export const encodeHistogram = (histogram: { + readonly dataPoints: ReadonlyArray[0]> + readonly aggregationTemporality: number +}): Uint8Array => + Proto.concat( + Proto.repeatedField(1, histogram.dataPoints, encodeHistogramDataPoint), + Proto.varintField(2, histogram.aggregationTemporality) + ) + +/** + * Encodes a Metric message. + * + * message Metric { + * string name = 1; + * string description = 2; + * string unit = 3; + * oneof data { + * Gauge gauge = 5; + * Sum sum = 7; + * Histogram histogram = 9; + * ExponentialHistogram exponential_histogram = 10; + * Summary summary = 11; + * } + * } + */ +export const encodeMetric = (metric: { + readonly name: string + readonly description?: string | undefined + readonly unit?: string | undefined + readonly gauge?: Parameters[0] | undefined + readonly sum?: Parameters[0] | undefined + readonly histogram?: Parameters[0] | undefined +}): Uint8Array => + Proto.concat( + Proto.stringField(1, metric.name), + Proto.optionalStringField(2, metric.description), + Proto.optionalStringField(3, metric.unit), + metric.gauge !== undefined + ? Proto.messageField(5, encodeGauge(metric.gauge)) + : new Uint8Array(0), + metric.sum !== undefined + ? Proto.messageField(7, encodeSum(metric.sum)) + : new Uint8Array(0), + metric.histogram !== undefined + ? Proto.messageField(9, encodeHistogram(metric.histogram)) + : new Uint8Array(0) + ) + +/** + * Encodes a ScopeMetrics message. + * + * message ScopeMetrics { + * InstrumentationScope scope = 1; + * repeated Metric metrics = 2; + * string schema_url = 3; + * } + */ +export const encodeScopeMetrics = (scopeMetrics: { + readonly scope: { readonly name: string; readonly version?: string } + readonly metrics: ReadonlyArray[0]> + readonly schemaUrl?: string +}): Uint8Array => + Proto.concat( + Proto.messageField(1, encodeInstrumentationScope(scopeMetrics.scope)), + Proto.repeatedField(2, scopeMetrics.metrics, encodeMetric), + Proto.optionalStringField(3, scopeMetrics.schemaUrl) + ) + +/** + * Encodes a ResourceMetrics message. + * + * message ResourceMetrics { + * Resource resource = 1; + * repeated ScopeMetrics scope_metrics = 2; + * string schema_url = 3; + * } + */ +export const encodeResourceMetrics = (resourceMetrics: { + readonly resource: Resource + readonly scopeMetrics: ReadonlyArray[0]> + readonly schemaUrl?: string +}): Uint8Array => + Proto.concat( + Proto.messageField(1, encodeResource(resourceMetrics.resource)), + Proto.repeatedField(2, resourceMetrics.scopeMetrics, encodeScopeMetrics), + Proto.optionalStringField(3, resourceMetrics.schemaUrl) + ) + +/** + * Encodes a MetricsData message (top-level export request). + * + * message MetricsData { + * repeated ResourceMetrics resource_metrics = 1; + * } + */ +export const encodeMetricsData = (metricsData: { + readonly resourceMetrics: ReadonlyArray[0]> +}): Uint8Array => + Proto.repeatedField(1, metricsData.resourceMetrics, encodeResourceMetrics) + +// Logs types (opentelemetry.proto.logs.v1) + +/** + * SeverityNumber enum + */ +export const SeverityNumber = { + Unspecified: 0, + Trace: 1, + Trace2: 2, + Trace3: 3, + Trace4: 4, + Debug: 5, + Debug2: 6, + Debug3: 7, + Debug4: 8, + Info: 9, + Info2: 10, + Info3: 11, + Info4: 12, + Warn: 13, + Warn2: 14, + Warn3: 15, + Warn4: 16, + Error: 17, + Error2: 18, + Error3: 19, + Error4: 20, + Fatal: 21, + Fatal2: 22, + Fatal3: 23, + Fatal4: 24 +} as const + +/** + * Encodes a LogRecord message. + * + * message LogRecord { + * fixed64 time_unix_nano = 1; + * fixed64 observed_time_unix_nano = 11; + * SeverityNumber severity_number = 2; + * string severity_text = 3; + * AnyValue body = 5; + * repeated KeyValue attributes = 6; + * uint32 dropped_attributes_count = 7; + * fixed32 flags = 8; + * bytes trace_id = 9; + * bytes span_id = 10; + * } + */ +export const encodeLogRecord = (record: { + readonly timeUnixNano: string + readonly observedTimeUnixNano?: string | undefined + readonly severityNumber?: number | undefined + readonly severityText?: string | undefined + readonly body?: AnyValue | undefined + readonly attributes: ReadonlyArray + readonly droppedAttributesCount?: number | undefined + readonly flags?: number | undefined + readonly traceId?: string | undefined + readonly spanId?: string | undefined +}): Uint8Array => + Proto.concat( + Proto.fixed64Field(1, BigInt(record.timeUnixNano)), + record.severityNumber !== undefined + ? Proto.varintField(2, record.severityNumber) + : new Uint8Array(0), + Proto.optionalStringField(3, record.severityText), + record.body !== undefined + ? Proto.messageField(5, encodeAnyValue(record.body)) + : new Uint8Array(0), + Proto.repeatedField(6, record.attributes, encodeKeyValue), + record.droppedAttributesCount !== undefined && record.droppedAttributesCount > 0 + ? Proto.varintField(7, record.droppedAttributesCount) + : new Uint8Array(0), + record.flags !== undefined ? Proto.fixed32Field(8, record.flags) : new Uint8Array(0), + record.traceId !== undefined && record.traceId !== "" + ? Proto.bytesFieldFromHex(9, record.traceId) + : new Uint8Array(0), + record.spanId !== undefined && record.spanId !== "" + ? Proto.bytesFieldFromHex(10, record.spanId) + : new Uint8Array(0), + record.observedTimeUnixNano !== undefined + ? Proto.fixed64Field(11, BigInt(record.observedTimeUnixNano)) + : new Uint8Array(0) + ) + +/** + * Encodes a ScopeLogs message. + * + * message ScopeLogs { + * InstrumentationScope scope = 1; + * repeated LogRecord log_records = 2; + * string schema_url = 3; + * } + */ +export const encodeScopeLogs = (scopeLogs: { + readonly scope: { readonly name: string; readonly version?: string } + readonly logRecords: ReadonlyArray[0]> + readonly schemaUrl?: string +}): Uint8Array => + Proto.concat( + Proto.messageField(1, encodeInstrumentationScope(scopeLogs.scope)), + Proto.repeatedField(2, scopeLogs.logRecords, encodeLogRecord), + Proto.optionalStringField(3, scopeLogs.schemaUrl) + ) + +/** + * Encodes a ResourceLogs message. + * + * message ResourceLogs { + * Resource resource = 1; + * repeated ScopeLogs scope_logs = 2; + * string schema_url = 3; + * } + */ +export const encodeResourceLogs = (resourceLogs: { + readonly resource: Resource + readonly scopeLogs: ReadonlyArray[0]> + readonly schemaUrl?: string +}): Uint8Array => + Proto.concat( + Proto.messageField(1, encodeResource(resourceLogs.resource)), + Proto.repeatedField(2, resourceLogs.scopeLogs, encodeScopeLogs), + Proto.optionalStringField(3, resourceLogs.schemaUrl) + ) + +/** + * Encodes a LogsData message (top-level export request). + * + * message LogsData { + * repeated ResourceLogs resource_logs = 1; + * } + */ +export const encodeLogsData = (logsData: { + readonly resourceLogs: ReadonlyArray[0]> +}): Uint8Array => + Proto.repeatedField(1, logsData.resourceLogs, encodeResourceLogs) diff --git a/packages/opentelemetry/src/internal/protobuf.ts b/packages/opentelemetry/src/internal/protobuf.ts new file mode 100644 index 00000000000..7d26b5dbe7f --- /dev/null +++ b/packages/opentelemetry/src/internal/protobuf.ts @@ -0,0 +1,226 @@ +/** + * Low-level protobuf wire format encoding utilities. + * + * Protobuf wire types: + * - 0: Varint (int32, int64, uint32, uint64, sint32, sint64, bool, enum) + * - 1: 64-bit (fixed64, sfixed64, double) + * - 2: Length-delimited (string, bytes, embedded messages, packed repeated fields) + * - 5: 32-bit (fixed32, sfixed32, float) + * + * @internal + */ + +const enum WireType { + Varint = 0, + Fixed64 = 1, + LengthDelimited = 2, + Fixed32 = 5 +} + +/** + * Encodes a field tag (field number + wire type) + */ +const encodeTag = (fieldNumber: number, wireType: WireType): number => + (fieldNumber << 3) | wireType + +/** + * Encodes a varint (variable-length integer) + */ +export const encodeVarint = (value: number | bigint): Uint8Array => { + const bytes: Array = [] + let n = typeof value === "bigint" ? value : BigInt(value) + while (n > 0x7fn) { + bytes.push(Number(n & 0x7fn) | 0x80) + n >>= 7n + } + bytes.push(Number(n)) + return new Uint8Array(bytes) +} + +/** + * Encodes a signed varint using ZigZag encoding + */ +export const encodeSint = (value: number | bigint): Uint8Array => { + const n = typeof value === "bigint" ? value : BigInt(value) + const zigzag = (n << 1n) ^ (n >> 63n) + return encodeVarint(zigzag) +} + +/** + * Encodes a 64-bit fixed value (little-endian) + */ +export const encodeFixed64 = (value: bigint): Uint8Array => { + const bytes = new Uint8Array(8) + const view = new DataView(bytes.buffer) + view.setBigUint64(0, value, true) + return bytes +} + +/** + * Encodes a 32-bit fixed value (little-endian) + */ +export const encodeFixed32 = (value: number): Uint8Array => { + const bytes = new Uint8Array(4) + const view = new DataView(bytes.buffer) + view.setUint32(0, value, true) + return bytes +} + +/** + * Encodes a double (64-bit float, little-endian) + */ +export const encodeDouble = (value: number): Uint8Array => { + const bytes = new Uint8Array(8) + const view = new DataView(bytes.buffer) + view.setFloat64(0, value, true) + return bytes +} + +/** + * Encodes a string to UTF-8 bytes + */ +export const encodeString = (value: string): Uint8Array => + new TextEncoder().encode(value) + +/** + * Encodes bytes as a hex string to Uint8Array + */ +export const encodeHexBytes = (hex: string): Uint8Array => { + const bytes = new Uint8Array(hex.length / 2) + for (let i = 0; i < hex.length; i += 2) { + bytes[i / 2] = parseInt(hex.slice(i, i + 2), 16) + } + return bytes +} + +/** + * Concatenates multiple Uint8Arrays + */ +export const concat = (...arrays: Array): Uint8Array => { + const totalLength = arrays.reduce((sum, arr) => sum + arr.length, 0) + const result = new Uint8Array(totalLength) + let offset = 0 + for (const arr of arrays) { + result.set(arr, offset) + offset += arr.length + } + return result +} + +// Field encoders + +/** + * Encodes a varint field + */ +export const varintField = (fieldNumber: number, value: number | bigint): Uint8Array => + concat( + encodeVarint(encodeTag(fieldNumber, WireType.Varint)), + encodeVarint(value) + ) + +/** + * Encodes a sint field (ZigZag encoded) + */ +export const sintField = (fieldNumber: number, value: number | bigint): Uint8Array => + concat( + encodeVarint(encodeTag(fieldNumber, WireType.Varint)), + encodeSint(value) + ) + +/** + * Encodes a bool field + */ +export const boolField = (fieldNumber: number, value: boolean): Uint8Array => + varintField(fieldNumber, value ? 1 : 0) + +/** + * Encodes a fixed64 field + */ +export const fixed64Field = (fieldNumber: number, value: bigint): Uint8Array => + concat( + encodeVarint(encodeTag(fieldNumber, WireType.Fixed64)), + encodeFixed64(value) + ) + +/** + * Encodes a fixed32 field + */ +export const fixed32Field = (fieldNumber: number, value: number): Uint8Array => + concat( + encodeVarint(encodeTag(fieldNumber, WireType.Fixed32)), + encodeFixed32(value) + ) + +/** + * Encodes a double field + */ +export const doubleField = (fieldNumber: number, value: number): Uint8Array => + concat( + encodeVarint(encodeTag(fieldNumber, WireType.Fixed64)), + encodeDouble(value) + ) + +/** + * Encodes a length-delimited field (bytes, string, embedded message) + */ +export const lengthDelimitedField = (fieldNumber: number, value: Uint8Array): Uint8Array => + concat( + encodeVarint(encodeTag(fieldNumber, WireType.LengthDelimited)), + encodeVarint(value.length), + value + ) + +/** + * Encodes a string field + */ +export const stringField = (fieldNumber: number, value: string): Uint8Array => + lengthDelimitedField(fieldNumber, encodeString(value)) + +/** + * Encodes a bytes field from hex string + */ +export const bytesFieldFromHex = (fieldNumber: number, hex: string): Uint8Array => + lengthDelimitedField(fieldNumber, encodeHexBytes(hex)) + +/** + * Encodes an embedded message field + */ +export const messageField = (fieldNumber: number, message: Uint8Array): Uint8Array => + lengthDelimitedField(fieldNumber, message) + +/** + * Encodes repeated fields + */ +export const repeatedField = ( + fieldNumber: number, + values: ReadonlyArray, + encode: (value: T) => Uint8Array +): Uint8Array => + concat(...values.map((v) => messageField(fieldNumber, encode(v)))) + +/** + * Encodes repeated varint fields (not packed) + */ +export const repeatedVarintField = ( + fieldNumber: number, + values: ReadonlyArray +): Uint8Array => + concat(...values.map((v) => varintField(fieldNumber, v))) + +/** + * Helper to conditionally encode an optional field + */ +export const optionalField = ( + value: T | undefined, + encode: (v: T) => Uint8Array +): Uint8Array => + value !== undefined ? encode(value) : new Uint8Array(0) + +/** + * Helper to conditionally encode a string field if non-empty + */ +export const optionalStringField = ( + fieldNumber: number, + value: string | undefined +): Uint8Array => + value !== undefined && value !== "" ? stringField(fieldNumber, value) : new Uint8Array(0) diff --git a/packages/opentelemetry/test/Protobuf.test.ts b/packages/opentelemetry/test/Protobuf.test.ts new file mode 100644 index 00000000000..2f91ceeb886 --- /dev/null +++ b/packages/opentelemetry/test/Protobuf.test.ts @@ -0,0 +1,338 @@ +import { describe, expect, it } from "@effect/vitest" +import * as Proto from "../src/internal/protobuf.js" +import * as OtlpProtobuf from "../src/internal/otlpProtobuf.js" + +describe("Protobuf encoding", () => { + describe("primitives", () => { + it("encodeVarint small values", () => { + // 0 encodes to [0] + expect(Proto.encodeVarint(0)).toEqual(new Uint8Array([0])) + // 1 encodes to [1] + expect(Proto.encodeVarint(1)).toEqual(new Uint8Array([1])) + // 127 encodes to [127] (single byte max) + expect(Proto.encodeVarint(127)).toEqual(new Uint8Array([127])) + // 128 encodes to [128, 1] (requires continuation bit) + expect(Proto.encodeVarint(128)).toEqual(new Uint8Array([128, 1])) + // 300 encodes to [172, 2] + expect(Proto.encodeVarint(300)).toEqual(new Uint8Array([172, 2])) + }) + + it("encodeVarint large values", () => { + // 16384 = 0x4000 encodes to [128, 128, 1] + expect(Proto.encodeVarint(16384)).toEqual(new Uint8Array([128, 128, 1])) + }) + + it("encodeFixed64", () => { + const result = Proto.encodeFixed64(BigInt("1234567890123456789")) + expect(result.length).toBe(8) + // Little-endian encoding + const view = new DataView(result.buffer) + expect(view.getBigUint64(0, true)).toBe(BigInt("1234567890123456789")) + }) + + it("encodeFixed32", () => { + const result = Proto.encodeFixed32(0x12345678) + expect(result.length).toBe(4) + const view = new DataView(result.buffer) + expect(view.getUint32(0, true)).toBe(0x12345678) + }) + + it("encodeDouble", () => { + const result = Proto.encodeDouble(3.14159) + expect(result.length).toBe(8) + const view = new DataView(result.buffer) + expect(view.getFloat64(0, true)).toBeCloseTo(3.14159) + }) + + it("encodeString", () => { + const result = Proto.encodeString("hello") + expect(result).toEqual(new Uint8Array([104, 101, 108, 108, 111])) + }) + + it("encodeHexBytes", () => { + const result = Proto.encodeHexBytes("deadbeef") + expect(result).toEqual(new Uint8Array([0xde, 0xad, 0xbe, 0xef])) + }) + + it("concat", () => { + const a = new Uint8Array([1, 2, 3]) + const b = new Uint8Array([4, 5]) + const c = new Uint8Array([6]) + const result = Proto.concat(a, b, c) + expect(result).toEqual(new Uint8Array([1, 2, 3, 4, 5, 6])) + }) + }) + + describe("field encoders", () => { + it("varintField", () => { + // field 1, value 150 + // tag = (1 << 3) | 0 = 8 + // value = 150 = [150, 1] (as varint) + const result = Proto.varintField(1, 150) + expect(result).toEqual(new Uint8Array([8, 150, 1])) + }) + + it("boolField", () => { + // field 2, true + // tag = (2 << 3) | 0 = 16 + const trueResult = Proto.boolField(2, true) + expect(trueResult).toEqual(new Uint8Array([16, 1])) + + const falseResult = Proto.boolField(2, false) + expect(falseResult).toEqual(new Uint8Array([16, 0])) + }) + + it("stringField", () => { + // field 1, value "hi" + // tag = (1 << 3) | 2 = 10 (length-delimited) + // length = 2 + // data = [104, 105] + const result = Proto.stringField(1, "hi") + expect(result).toEqual(new Uint8Array([10, 2, 104, 105])) + }) + + it("fixed64Field", () => { + // field 1, wire type 1 (64-bit) + // tag = (1 << 3) | 1 = 9 + const result = Proto.fixed64Field(1, BigInt(1)) + expect(result[0]).toBe(9) + expect(result.length).toBe(9) // 1 tag + 8 data + }) + + it("messageField", () => { + // field 2, embedded message [1, 2, 3] + // tag = (2 << 3) | 2 = 18 (length-delimited) + // length = 3 + const result = Proto.messageField(2, new Uint8Array([1, 2, 3])) + expect(result).toEqual(new Uint8Array([18, 3, 1, 2, 3])) + }) + }) + + describe("OTLP types", () => { + it("encodeAnyValue - string", () => { + const result = OtlpProtobuf.encodeAnyValue({ stringValue: "test" }) + // field 1 (string_value), length-delimited + // tag = (1 << 3) | 2 = 10 + expect(result[0]).toBe(10) + expect(result[1]).toBe(4) // length + }) + + it("encodeAnyValue - bool", () => { + const result = OtlpProtobuf.encodeAnyValue({ boolValue: true }) + // field 2 (bool_value), varint + // tag = (2 << 3) | 0 = 16 + expect(result).toEqual(new Uint8Array([16, 1])) + }) + + it("encodeAnyValue - int", () => { + const result = OtlpProtobuf.encodeAnyValue({ intValue: 42 }) + // field 3 (int_value), varint + // tag = (3 << 3) | 0 = 24 + expect(result[0]).toBe(24) + }) + + it("encodeAnyValue - double", () => { + const result = OtlpProtobuf.encodeAnyValue({ doubleValue: 3.14 }) + // field 4 (double_value), 64-bit + // tag = (4 << 3) | 1 = 33 + expect(result[0]).toBe(33) + expect(result.length).toBe(9) // 1 tag + 8 data + }) + + it("encodeKeyValue", () => { + const result = OtlpProtobuf.encodeKeyValue({ + key: "test", + value: { stringValue: "value" } + }) + // Should contain field 1 (key) and field 2 (value) + expect(result.length).toBeGreaterThan(0) + // First byte should be tag for field 1 string + expect(result[0]).toBe(10) // (1 << 3) | 2 = 10 + }) + + it("encodeResource", () => { + const result = OtlpProtobuf.encodeResource({ + attributes: [ + { key: "service.name", value: { stringValue: "test-service" } } + ], + droppedAttributesCount: 0 + }) + // Should encode attributes as repeated field 1 + expect(result.length).toBeGreaterThan(0) + }) + + it("encodeStatus", () => { + const okStatus = OtlpProtobuf.encodeStatus({ code: OtlpProtobuf.StatusCode.Ok }) + expect(okStatus.length).toBeGreaterThan(0) + + const errorStatus = OtlpProtobuf.encodeStatus({ + code: OtlpProtobuf.StatusCode.Error, + message: "test error" + }) + expect(errorStatus.length).toBeGreaterThan(okStatus.length) + }) + + it("encodeSpan", () => { + const result = OtlpProtobuf.encodeSpan({ + traceId: "0123456789abcdef0123456789abcdef", + spanId: "0123456789abcdef", + name: "test-span", + kind: OtlpProtobuf.SpanKind.Internal, + startTimeUnixNano: "1000000000", + endTimeUnixNano: "2000000000", + attributes: [ + { key: "test.attr", value: { stringValue: "value" } } + ], + droppedAttributesCount: 0, + events: [], + droppedEventsCount: 0, + links: [], + droppedLinksCount: 0, + status: { code: OtlpProtobuf.StatusCode.Ok } + }) + expect(result.length).toBeGreaterThan(0) + // Should be a valid protobuf message + // Verify it starts with field 1 (trace_id) bytes + expect(result[0]).toBe(10) // (1 << 3) | 2 = 10 (length-delimited) + }) + + it("encodeTracesData", () => { + const result = OtlpProtobuf.encodeTracesData({ + resourceSpans: [{ + resource: { + attributes: [ + { key: "service.name", value: { stringValue: "test" } } + ], + droppedAttributesCount: 0 + }, + scopeSpans: [{ + scope: { name: "test-scope" }, + spans: [{ + traceId: "0123456789abcdef0123456789abcdef", + spanId: "0123456789abcdef", + name: "test-span", + kind: OtlpProtobuf.SpanKind.Server, + startTimeUnixNano: "1000000000000000000", + endTimeUnixNano: "2000000000000000000", + attributes: [], + droppedAttributesCount: 0, + events: [], + droppedEventsCount: 0, + links: [], + droppedLinksCount: 0, + status: { code: OtlpProtobuf.StatusCode.Ok } + }] + }] + }] + }) + expect(result.length).toBeGreaterThan(0) + }) + + it("encodeMetricsData", () => { + const result = OtlpProtobuf.encodeMetricsData({ + resourceMetrics: [{ + resource: { + attributes: [ + { key: "service.name", value: { stringValue: "test" } } + ], + droppedAttributesCount: 0 + }, + scopeMetrics: [{ + scope: { name: "test-scope" }, + metrics: [{ + name: "test.counter", + description: "A test counter", + unit: "1", + sum: { + dataPoints: [{ + attributes: [], + startTimeUnixNano: "1000000000000000000", + timeUnixNano: "2000000000000000000", + asInt: "42" + }], + aggregationTemporality: OtlpProtobuf.AggregationTemporality.Cumulative, + isMonotonic: true + } + }] + }] + }] + }) + expect(result.length).toBeGreaterThan(0) + }) + + it("encodeLogsData", () => { + const result = OtlpProtobuf.encodeLogsData({ + resourceLogs: [{ + resource: { + attributes: [ + { key: "service.name", value: { stringValue: "test" } } + ], + droppedAttributesCount: 0 + }, + scopeLogs: [{ + scope: { name: "test-scope" }, + logRecords: [{ + timeUnixNano: "1000000000000000000", + severityNumber: OtlpProtobuf.SeverityNumber.Info, + severityText: "INFO", + body: { stringValue: "Test log message" }, + attributes: [ + { key: "log.key", value: { stringValue: "log.value" } } + ], + droppedAttributesCount: 0 + }] + }] + }] + }) + expect(result.length).toBeGreaterThan(0) + }) + }) + + describe("edge cases", () => { + it("handles empty arrays", () => { + const result = OtlpProtobuf.encodeTracesData({ + resourceSpans: [] + }) + // Empty repeated field should produce empty output + expect(result.length).toBe(0) + }) + + it("handles optional fields", () => { + const result = OtlpProtobuf.encodeSpan({ + traceId: "0123456789abcdef0123456789abcdef", + spanId: "0123456789abcdef", + name: "test", + kind: OtlpProtobuf.SpanKind.Internal, + startTimeUnixNano: "0", + endTimeUnixNano: "0", + attributes: [], + droppedAttributesCount: 0, + events: [], + droppedEventsCount: 0, + links: [], + droppedLinksCount: 0, + status: { code: OtlpProtobuf.StatusCode.Unset } + }) + expect(result.length).toBeGreaterThan(0) + }) + + it("handles special characters in strings", () => { + const result = OtlpProtobuf.encodeAnyValue({ + stringValue: "hello\nworld\t\r\n" + }) + expect(result.length).toBeGreaterThan(0) + }) + + it("handles unicode strings", () => { + const result = OtlpProtobuf.encodeAnyValue({ + stringValue: "hello" + }) + expect(result.length).toBeGreaterThan(0) + }) + + it("handles large numbers", () => { + const result = Proto.encodeVarint(BigInt("9223372036854775807")) + expect(result.length).toBe(9) // Max varint size for 64-bit + }) + }) +}) From 93dc380aa297299e0d5d34f619fa50c00677b1ea Mon Sep 17 00:00:00 2001 From: David Golightly Date: Tue, 20 Jan 2026 10:23:08 -0800 Subject: [PATCH 02/10] feat(opentelemetry): add OtlpSerializer service for tree-shakable protobuf support Refactors OTLP exporters to use a serializer service pattern, enabling tree-shaking of protobuf code when only JSON encoding is used. - Add OtlpSerializer service with JSON layer (default) - Add OtlpSerializerProtobuf layer for explicit protobuf opt-in - Remove direct protobuf imports from exporters (key for tree-shaking) - Update Otlp.layer to provide JSON serializer by default - Backwards compatible: existing JSON code works unchanged Co-Authored-By: Claude Opus 4.5 --- .changeset/add-otlp-protobuf-support.md | 30 ++++++-- packages/opentelemetry/src/Otlp.ts | 62 ++++++++++----- packages/opentelemetry/src/OtlpLogger.ts | 34 +------- packages/opentelemetry/src/OtlpMetrics.ts | 69 +---------------- packages/opentelemetry/src/OtlpSerializer.ts | 77 +++++++++++++++++++ .../src/OtlpSerializerProtobuf.ts | 40 ++++++++++ packages/opentelemetry/src/OtlpTracer.ts | 23 ++---- packages/opentelemetry/src/index.ts | 10 +++ .../src/internal/otlpExporter.ts | 32 ++++---- .../src/internal/otlpProtobuf.ts | 20 ++--- .../opentelemetry/src/internal/protobuf.ts | 21 ++--- packages/opentelemetry/test/Protobuf.test.ts | 50 ++++++++++++ .../test/SerializerOverride.test.ts | 53 +++++++++++++ packages/opentelemetry/test/Tracer.test.ts | 3 +- 14 files changed, 343 insertions(+), 181 deletions(-) create mode 100644 packages/opentelemetry/src/OtlpSerializer.ts create mode 100644 packages/opentelemetry/src/OtlpSerializerProtobuf.ts create mode 100644 packages/opentelemetry/test/SerializerOverride.test.ts diff --git a/.changeset/add-otlp-protobuf-support.md b/.changeset/add-otlp-protobuf-support.md index 56b6d9fb4cc..7bdf999c9ed 100644 --- a/.changeset/add-otlp-protobuf-support.md +++ b/.changeset/add-otlp-protobuf-support.md @@ -2,21 +2,41 @@ "@effect/opentelemetry": minor --- -Add protobuf protocol support for OTLP exporters. +Add tree-shakable protobuf protocol support for OTLP exporters. -This adds a `protocol` option to `Otlp.layer`, `OtlpTracer.layer`, `OtlpMetrics.layer`, and `OtlpLogger.layer` that allows choosing between JSON (default) and Protocol Buffers binary encoding when exporting telemetry data to OpenTelemetry collectors. +This introduces an `OtlpSerializer` service that allows choosing between JSON (default) and Protocol Buffers binary encoding when exporting telemetry data to OpenTelemetry collectors. The design ensures protobuf code is only included in your bundle when you explicitly opt into it. +**JSON encoding (default) - no changes required:** ```typescript import { Otlp } from "@effect/opentelemetry" -// Use protobuf encoding for more efficient wire format -Otlp.layer({ +// Works exactly as before - protobuf code is tree-shaken away +const layer = Otlp.layer({ baseUrl: "http://localhost:4318", - protocol: "protobuf", resource: { serviceName: "my-service" } }) ``` +**Protobuf encoding - explicit opt-in:** +```typescript +import { Otlp, OtlpSerializerProtobuf } from "@effect/opentelemetry" +import * as Layer from "effect/Layer" + +// Use layerWithSerializer to control the serialization format +// Protobuf code is only included when you import OtlpSerializerProtobuf +const layer = Otlp.layerWithSerializer({ + baseUrl: "http://localhost:4318", + resource: { serviceName: "my-service" } +}).pipe(Layer.provide(OtlpSerializerProtobuf.protobuf)) +``` + +**New exports:** +- `Otlp.layerWithSerializer` - OTLP layer that requires `OtlpSerializer` to be provided +- `OtlpSerializer` - Service definition and JSON layer (`OtlpSerializer.json`) +- `OtlpSerializerProtobuf` - Protobuf layer (`OtlpSerializerProtobuf.protobuf`) + +**Features:** - No new dependencies - protobuf encoding implemented from scratch +- Tree-shakable - protobuf code not in bundle unless explicitly imported - Sets appropriate Content-Type header (`application/x-protobuf` vs `application/json`) - Follows opentelemetry-proto specifications diff --git a/packages/opentelemetry/src/Otlp.ts b/packages/opentelemetry/src/Otlp.ts index ae1c39012e4..b944f25170a 100644 --- a/packages/opentelemetry/src/Otlp.ts +++ b/packages/opentelemetry/src/Otlp.ts @@ -8,27 +8,18 @@ import type * as Duration from "effect/Duration" import * as Layer from "effect/Layer" import type * as Logger from "effect/Logger" import type * as Tracer from "effect/Tracer" -import type { OtlpProtocol as _OtlpProtocol } from "./internal/otlpExporter.js" import * as OtlpLogger from "./OtlpLogger.js" import * as OtlpMetrics from "./OtlpMetrics.js" +import * as OtlpSerializer from "./OtlpSerializer.js" import * as OtlpTracer from "./OtlpTracer.js" /** - * OTLP protocol type for encoding telemetry data. - * - * - `"json"`: JSON encoding (default) - uses `application/json` content type - * - `"protobuf"`: Protocol Buffers binary encoding - uses `application/x-protobuf` content type + * Options for OTLP layer configuration. * * @since 1.0.0 * @category Models */ -export type OtlpProtocol = _OtlpProtocol - -/** - * @since 1.0.0 - * @category Layers - */ -export const layer = (options: { +export interface OtlpLayerOptions { readonly baseUrl: string readonly resource?: { readonly serviceName?: string | undefined @@ -44,8 +35,23 @@ export const layer = (options: { readonly metricsExportInterval?: Duration.DurationInput | undefined readonly tracerExportInterval?: Duration.DurationInput | undefined readonly shutdownTimeout?: Duration.DurationInput | undefined - readonly protocol?: OtlpProtocol | undefined -}): Layer.Layer => { +} + +/** + * Creates an OTLP layer that requires an `OtlpSerializer` to be provided. + * + * Use this when you want to explicitly control the serialization format: + * - For JSON: `Otlp.layerWithSerializer(options).pipe(Layer.provide(OtlpSerializer.json))` + * - For Protobuf: `Otlp.layerWithSerializer(options).pipe(Layer.provide(OtlpSerializerProtobuf.protobuf))` + * + * For convenience, use `Otlp.layer` which provides JSON serialization by default. + * + * @since 1.0.0 + * @category Layers + */ +export const layerWithSerializer = ( + options: OtlpLayerOptions +): Layer.Layer => { const baseReq = HttpClientRequest.get(options.baseUrl) const url = (path: string) => HttpClientRequest.appendUrl(baseReq, path).url return Layer.mergeAll( @@ -57,16 +63,14 @@ export const layer = (options: { exportInterval: options.loggerExportInterval, maxBatchSize: options.maxBatchSize, shutdownTimeout: options.shutdownTimeout, - excludeLogSpans: options.loggerExcludeLogSpans, - protocol: options.protocol + excludeLogSpans: options.loggerExcludeLogSpans }), OtlpMetrics.layer({ url: url("/v1/metrics"), resource: options.resource, headers: options.headers, exportInterval: options.metricsExportInterval, - shutdownTimeout: options.shutdownTimeout, - protocol: options.protocol + shutdownTimeout: options.shutdownTimeout }), OtlpTracer.layer({ url: url("/v1/traces"), @@ -75,8 +79,26 @@ export const layer = (options: { exportInterval: options.tracerExportInterval, maxBatchSize: options.maxBatchSize, context: options.tracerContext, - shutdownTimeout: options.shutdownTimeout, - protocol: options.protocol + shutdownTimeout: options.shutdownTimeout }) ) } + +/** + * Creates an OTLP layer with JSON serialization (default). + * + * This is the recommended way to create an OTLP layer for most use cases. + * For protobuf encoding, use `layerWithSerializer` with `OtlpSerializerProtobuf.protobuf`. + * + * @example + * ```typescript + * import { Otlp } from "@effect/opentelemetry" + * + * const layer = Otlp.layer({ baseUrl: "http://localhost:4318" }) + * ``` + * + * @since 1.0.0 + * @category Layers + */ +export const layer = (options: OtlpLayerOptions): Layer.Layer => + layerWithSerializer(options).pipe(Layer.provide(OtlpSerializer.json)) diff --git a/packages/opentelemetry/src/OtlpLogger.ts b/packages/opentelemetry/src/OtlpLogger.ts index 900295fa2f3..3a322c77797 100644 --- a/packages/opentelemetry/src/OtlpLogger.ts +++ b/packages/opentelemetry/src/OtlpLogger.ts @@ -18,10 +18,9 @@ import * as Option from "effect/Option" import type * as Scope from "effect/Scope" import * as Tracer from "effect/Tracer" import * as Exporter from "./internal/otlpExporter.js" -import type { OtlpProtocol } from "./internal/otlpExporter.js" -import * as OtlpProtobuf from "./internal/otlpProtobuf.js" import type { AnyValue, Fixed64, KeyValue, Resource } from "./OtlpResource.js" import * as OtlpResource from "./OtlpResource.js" +import type { OtlpSerializer } from "./OtlpSerializer.js" /** * @since 1.0.0 @@ -40,12 +39,11 @@ export const make: ( readonly maxBatchSize?: number | undefined readonly shutdownTimeout?: Duration.DurationInput | undefined readonly excludeLogSpans?: boolean | undefined - readonly protocol?: OtlpProtocol | undefined } ) => Effect.Effect< Logger.Logger, never, - HttpClient.HttpClient | Scope.Scope + HttpClient.HttpClient | OtlpSerializer | Scope.Scope > = Effect.fnUntraced(function*(options) { const otelResource = yield* OtlpResource.fromConfig(options.resource) const scope: IInstrumentationScope = { @@ -58,7 +56,7 @@ export const make: ( headers: options.headers, maxBatchSize: options.maxBatchSize ?? 1000, exportInterval: options.exportInterval ?? Duration.seconds(1), - protocol: options.protocol, + kind: "logs", body: (data): IExportLogsServiceRequest => ({ resourceLogs: [{ resource: otelResource, @@ -68,29 +66,6 @@ export const make: ( }] }] }), - bodyProtobuf: (data: Array): Uint8Array => - OtlpProtobuf.encodeLogsData({ - resourceLogs: [{ - resource: otelResource, - scopeLogs: [{ - scope, - logRecords: data.map((record) => ({ - timeUnixNano: String(record.timeUnixNano), - observedTimeUnixNano: record.observedTimeUnixNano !== undefined - ? String(record.observedTimeUnixNano) - : undefined, - severityNumber: record.severityNumber, - severityText: record.severityText, - body: record.body, - attributes: record.attributes, - droppedAttributesCount: record.droppedAttributesCount, - flags: record.flags, - traceId: typeof record.traceId === "string" ? record.traceId : undefined, - spanId: typeof record.spanId === "string" ? record.spanId : undefined - })) - }] - }] - }), shutdownTimeout: options.shutdownTimeout ?? Duration.seconds(3) }) @@ -119,8 +94,7 @@ export const layer = (options: { readonly maxBatchSize?: number | undefined readonly shutdownTimeout?: Duration.DurationInput | undefined readonly excludeLogSpans?: boolean | undefined - readonly protocol?: OtlpProtocol | undefined -}): Layer.Layer => +}): Layer.Layer => options.replaceLogger ? Logger.replaceScoped(options.replaceLogger, make(options)) : Logger.addScoped(make(options)) // internal diff --git a/packages/opentelemetry/src/OtlpMetrics.ts b/packages/opentelemetry/src/OtlpMetrics.ts index 84e72f3fbc0..e8f2a4881d2 100644 --- a/packages/opentelemetry/src/OtlpMetrics.ts +++ b/packages/opentelemetry/src/OtlpMetrics.ts @@ -13,10 +13,9 @@ import * as MetricState from "effect/MetricState" import * as Option from "effect/Option" import type * as Scope from "effect/Scope" import * as Exporter from "./internal/otlpExporter.js" -import type { OtlpProtocol } from "./internal/otlpExporter.js" -import * as OtlpProtobuf from "./internal/otlpProtobuf.js" import type { Fixed64, KeyValue } from "./OtlpResource.js" import * as OtlpResource from "./OtlpResource.js" +import type { OtlpSerializer } from "./OtlpSerializer.js" /** * @since 1.0.0 @@ -32,11 +31,10 @@ export const make: (options: { readonly headers?: Headers.Input | undefined readonly exportInterval?: Duration.DurationInput | undefined readonly shutdownTimeout?: Duration.DurationInput | undefined - readonly protocol?: OtlpProtocol | undefined }) => Effect.Effect< void, never, - HttpClient.HttpClient | Scope.Scope + HttpClient.HttpClient | OtlpSerializer | Scope.Scope > = Effect.fnUntraced(function*(options) { const clock = yield* Effect.clock const startTime = String(clock.unsafeCurrentTimeNanos()) @@ -268,72 +266,14 @@ export const make: (options: { } } - const snapshotProtobuf = (): Uint8Array => { - const data = snapshot() - return OtlpProtobuf.encodeMetricsData({ - resourceMetrics: data.resourceMetrics.map((rm) => ({ - resource: rm.resource!, - scopeMetrics: rm.scopeMetrics.map((sm) => ({ - scope: sm.scope!, - metrics: sm.metrics.map((m) => ({ - name: m.name, - description: m.description, - unit: m.unit, - gauge: m.gauge - ? { - dataPoints: m.gauge.dataPoints.map((dp) => ({ - attributes: dp.attributes, - startTimeUnixNano: String(dp.startTimeUnixNano ?? "0"), - timeUnixNano: String(dp.timeUnixNano ?? "0"), - asDouble: dp.asDouble !== null ? dp.asDouble : undefined, - asInt: dp.asInt !== undefined ? String(dp.asInt) : undefined - })) - } - : undefined, - sum: m.sum - ? { - dataPoints: m.sum.dataPoints.map((dp) => ({ - attributes: dp.attributes, - startTimeUnixNano: String(dp.startTimeUnixNano ?? "0"), - timeUnixNano: String(dp.timeUnixNano ?? "0"), - asDouble: dp.asDouble !== null ? dp.asDouble : undefined, - asInt: dp.asInt !== undefined ? String(dp.asInt) : undefined - })), - aggregationTemporality: m.sum.aggregationTemporality, - isMonotonic: m.sum.isMonotonic - } - : undefined, - histogram: m.histogram - ? { - dataPoints: m.histogram.dataPoints.map((dp) => ({ - attributes: dp.attributes ?? [], - startTimeUnixNano: String(dp.startTimeUnixNano ?? "0"), - timeUnixNano: String(dp.timeUnixNano ?? "0"), - count: dp.count ?? 0, - sum: dp.sum, - bucketCounts: dp.bucketCounts ?? [], - explicitBounds: dp.explicitBounds ?? [], - min: dp.min, - max: dp.max - })), - aggregationTemporality: m.histogram.aggregationTemporality ?? OtlpProtobuf.AggregationTemporality.Cumulative - } - : undefined - })) - })) - })) - }) - } - yield* Exporter.make({ label: "OtlpMetrics", url: options.url, headers: options.headers, maxBatchSize: "disabled", exportInterval: options.exportInterval ?? Duration.seconds(10), - protocol: options.protocol, + kind: "metrics", body: snapshot, - bodyProtobuf: snapshotProtobuf, shutdownTimeout: options.shutdownTimeout ?? Duration.seconds(3) }) }) @@ -352,8 +292,7 @@ export const layer = (options: { readonly headers?: Headers.Input | undefined readonly exportInterval?: Duration.DurationInput | undefined readonly shutdownTimeout?: Duration.DurationInput | undefined - readonly protocol?: OtlpProtocol | undefined -}): Layer.Layer => Layer.scopedDiscard(make(options)) +}): Layer.Layer => Layer.scopedDiscard(make(options)) // internal diff --git a/packages/opentelemetry/src/OtlpSerializer.ts b/packages/opentelemetry/src/OtlpSerializer.ts new file mode 100644 index 00000000000..bf07b789c00 --- /dev/null +++ b/packages/opentelemetry/src/OtlpSerializer.ts @@ -0,0 +1,77 @@ +/** + * OtlpSerializer service for tree-shakable protobuf support. + * + * This module provides the `OtlpSerializer` service that abstracts the + * encoding of OTLP telemetry data. By default, the JSON serializer is used. + * To use protobuf encoding, provide the `OtlpSerializerProtobuf.protobuf` layer. + * + * @example + * ```typescript + * import { Otlp } from "@effect/opentelemetry" + * + * // JSON encoding (default) - protobuf code is tree-shaken away + * const jsonLayer = Otlp.layer({ baseUrl: "http://localhost:4318" }) + * + * // Protobuf encoding - explicitly import to include in bundle + * import { OtlpSerializerProtobuf } from "@effect/opentelemetry" + * + * const protobufLayer = Otlp.layer({ baseUrl: "http://localhost:4318" }).pipe( + * Layer.provide(OtlpSerializerProtobuf.protobuf) + * ) + * ``` + * + * @since 1.0.0 + */ +import * as Context from "effect/Context" +import * as Layer from "effect/Layer" + +/** + * Service interface for serializing OTLP telemetry data. + * + * @since 1.0.0 + * @category Models + */ +export interface OtlpSerializer { + /** + * The content type header to use for HTTP requests. + */ + readonly contentType: string + /** + * Encodes trace data for transmission. + */ + readonly encodeTraces: (data: unknown) => Uint8Array | string + /** + * Encodes metrics data for transmission. + */ + readonly encodeMetrics: (data: unknown) => Uint8Array | string + /** + * Encodes logs data for transmission. + */ + readonly encodeLogs: (data: unknown) => Uint8Array | string +} + +/** + * Tag for the OtlpSerializer service. + * + * @since 1.0.0 + * @category Tags + */ +export const OtlpSerializer: Context.Tag = Context.GenericTag( + "@effect/opentelemetry/OtlpSerializer" +) + +/** + * JSON serializer layer for OTLP telemetry data. + * + * This is the default serializer used by OTLP exporters. It encodes + * telemetry data as JSON strings with `application/json` content type. + * + * @since 1.0.0 + * @category Layers + */ +export const json: Layer.Layer = Layer.succeed(OtlpSerializer, { + contentType: "application/json", + encodeTraces: (data) => JSON.stringify(data), + encodeMetrics: (data) => JSON.stringify(data), + encodeLogs: (data) => JSON.stringify(data) +}) diff --git a/packages/opentelemetry/src/OtlpSerializerProtobuf.ts b/packages/opentelemetry/src/OtlpSerializerProtobuf.ts new file mode 100644 index 00000000000..cc3c5a103a9 --- /dev/null +++ b/packages/opentelemetry/src/OtlpSerializerProtobuf.ts @@ -0,0 +1,40 @@ +/** + * Protobuf serializer for OTLP telemetry data. + * + * This module provides the protobuf-based serializer for OTLP exporters. + * Import this module only when you need protobuf encoding - it will bring + * the protobuf encoding code into your bundle. + * + * @example + * ```typescript + * import { Otlp, OtlpSerializerProtobuf } from "@effect/opentelemetry" + * import * as Layer from "effect/Layer" + * + * // Use protobuf encoding for more efficient wire format + * const layer = Otlp.layer({ baseUrl: "http://localhost:4318" }).pipe( + * Layer.provide(OtlpSerializerProtobuf.protobuf) + * ) + * ``` + * + * @since 1.0.0 + */ +import * as Layer from "effect/Layer" +import * as OtlpProtobuf from "./internal/otlpProtobuf.js" +import { OtlpSerializer } from "./OtlpSerializer.js" + +/** + * Protobuf serializer layer for OTLP telemetry data. + * + * This serializer encodes telemetry data using Protocol Buffers binary + * format with `application/x-protobuf` content type. It provides more + * efficient wire format compared to JSON. + * + * @since 1.0.0 + * @category Layers + */ +export const protobuf: Layer.Layer = Layer.succeed(OtlpSerializer, { + contentType: "application/x-protobuf", + encodeTraces: (data: any) => OtlpProtobuf.encodeTracesData(data), + encodeMetrics: (data: any) => OtlpProtobuf.encodeMetricsData(data), + encodeLogs: (data: any) => OtlpProtobuf.encodeLogsData(data) +}) diff --git a/packages/opentelemetry/src/OtlpTracer.ts b/packages/opentelemetry/src/OtlpTracer.ts index af5c0e5a6ca..900a5e15c6d 100644 --- a/packages/opentelemetry/src/OtlpTracer.ts +++ b/packages/opentelemetry/src/OtlpTracer.ts @@ -14,11 +14,10 @@ import type * as Scope from "effect/Scope" import * as Tracer from "effect/Tracer" import type { ExtractTag } from "effect/Types" import * as Exporter from "./internal/otlpExporter.js" -import type { OtlpProtocol } from "./internal/otlpExporter.js" -import * as OtlpProtobuf from "./internal/otlpProtobuf.js" import type { KeyValue, Resource } from "./OtlpResource.js" import { entriesToAttributes } from "./OtlpResource.js" import * as OtlpResource from "./OtlpResource.js" +import type { OtlpSerializer } from "./OtlpSerializer.js" const ATTR_EXCEPTION_TYPE = "exception.type" const ATTR_EXCEPTION_MESSAGE = "exception.message" @@ -41,12 +40,11 @@ export const make: ( readonly maxBatchSize?: number | undefined readonly context?: ((f: () => X, span: Tracer.AnySpan) => X) | undefined readonly shutdownTimeout?: Duration.DurationInput | undefined - readonly protocol?: OtlpProtocol | undefined } ) => Effect.Effect< Tracer.Tracer, never, - HttpClient.HttpClient | Scope.Scope + HttpClient.HttpClient | OtlpSerializer | Scope.Scope > = Effect.fnUntraced(function*(options) { const otelResource = yield* OtlpResource.fromConfig(options.resource) const scope: Scope = { @@ -59,7 +57,7 @@ export const make: ( headers: options.headers, exportInterval: options.exportInterval ?? Duration.seconds(5), maxBatchSize: options.maxBatchSize ?? 1000, - protocol: options.protocol, + kind: "traces", body(spans) { const data: TraceData = { resourceSpans: [{ @@ -72,17 +70,6 @@ export const make: ( } return data }, - bodyProtobuf(spans) { - return OtlpProtobuf.encodeTracesData({ - resourceSpans: [{ - resource: otelResource, - scopeSpans: [{ - scope, - spans - }] - }] - }) - }, shutdownTimeout: options.shutdownTimeout ?? Duration.seconds(3) }) @@ -134,8 +121,8 @@ export const layer = (options: { readonly maxBatchSize?: number | undefined readonly context?: ((f: () => X, span: Tracer.AnySpan) => X) | undefined readonly shutdownTimeout?: Duration.DurationInput | undefined - readonly protocol?: OtlpProtocol | undefined -}): Layer.Layer => Layer.unwrapScoped(Effect.map(make(options), Layer.setTracer)) +}): Layer.Layer => + Layer.unwrapScoped(Effect.map(make(options), Layer.setTracer)) // internal diff --git a/packages/opentelemetry/src/index.ts b/packages/opentelemetry/src/index.ts index ee515958921..c61c3172769 100644 --- a/packages/opentelemetry/src/index.ts +++ b/packages/opentelemetry/src/index.ts @@ -33,6 +33,16 @@ export * as OtlpMetrics from "./OtlpMetrics.js" */ export * as OtlpResource from "./OtlpResource.js" +/** + * @since 1.0.0 + */ +export * as OtlpSerializer from "./OtlpSerializer.js" + +/** + * @since 1.0.0 + */ +export * as OtlpSerializerProtobuf from "./OtlpSerializerProtobuf.js" + /** * @since 1.0.0 */ diff --git a/packages/opentelemetry/src/internal/otlpExporter.ts b/packages/opentelemetry/src/internal/otlpExporter.ts index ecc20667ea5..43cb0a0bf96 100644 --- a/packages/opentelemetry/src/internal/otlpExporter.ts +++ b/packages/opentelemetry/src/internal/otlpExporter.ts @@ -10,12 +10,13 @@ import * as Num from "effect/Number" import * as Option from "effect/Option" import * as Schedule from "effect/Schedule" import * as Scope from "effect/Scope" +import { OtlpSerializer } from "../OtlpSerializer.js" /** - * OTLP protocol type for encoding + * OTLP telemetry kind for encoding * @internal */ -export type OtlpProtocol = "json" | "protobuf" +export type OtlpKind = "traces" | "metrics" | "logs" const policy = Schedule.forever.pipe( Schedule.passthrough, @@ -44,14 +45,13 @@ export const make: ( readonly exportInterval: Duration.DurationInput readonly maxBatchSize: number | "disabled" readonly body: (data: Array) => unknown - readonly bodyProtobuf?: ((data: Array) => Uint8Array) | undefined - readonly protocol?: OtlpProtocol | undefined + readonly kind: OtlpKind readonly shutdownTimeout: Duration.DurationInput } ) => Effect.Effect< { readonly push: (data: unknown) => void }, never, - HttpClient.HttpClient | Scope.Scope + HttpClient.HttpClient | OtlpSerializer | Scope.Scope > = Effect.fnUntraced(function*(options) { const clock = yield* Effect.clock const scope = yield* Effect.scope @@ -62,14 +62,16 @@ export const make: ( HttpClient.retryTransient({ schedule: policy, times: 3 }) ) - const protocol = options.protocol ?? "json" - const contentType = protocol === "protobuf" - ? "application/x-protobuf" - : "application/json" + const serializer = yield* OtlpSerializer + const encode = options.kind === "traces" + ? serializer.encodeTraces + : options.kind === "metrics" + ? serializer.encodeMetrics + : serializer.encodeLogs let headers = Headers.unsafeFromRecord({ "user-agent": `effect-opentelemetry-${options.label}/0.0.0`, - "content-type": contentType + "content-type": serializer.contentType }) if (options.headers) { headers = Headers.merge(Headers.fromInput(options.headers), headers) @@ -90,12 +92,14 @@ export const make: ( } buffer = [] } - const requestWithBody = protocol === "protobuf" && options.bodyProtobuf - ? HttpClientRequest.setBody( + const data = options.body(items) + const encoded = encode(data) + const requestWithBody = typeof encoded === "string" + ? HttpClientRequest.bodyUnsafeJson(request, data) + : HttpClientRequest.setBody( request, - HttpBody.uint8Array(options.bodyProtobuf(items), contentType) + HttpBody.uint8Array(encoded, serializer.contentType) ) - : HttpClientRequest.bodyUnsafeJson(request, options.body(items)) return client.execute(requestWithBody).pipe( Effect.asVoid, diff --git a/packages/opentelemetry/src/internal/otlpProtobuf.ts b/packages/opentelemetry/src/internal/otlpProtobuf.ts index de5eb8871cc..26d7f46e602 100644 --- a/packages/opentelemetry/src/internal/otlpProtobuf.ts +++ b/packages/opentelemetry/src/internal/otlpProtobuf.ts @@ -354,8 +354,7 @@ export const encodeResourceSpans = (resourceSpans: { */ export const encodeTracesData = (tracesData: { readonly resourceSpans: ReadonlyArray[0]> -}): Uint8Array => - Proto.repeatedField(1, tracesData.resourceSpans, encodeResourceSpans) +}): Uint8Array => Proto.repeatedField(1, tracesData.resourceSpans, encodeResourceSpans) // Metrics types (opentelemetry.proto.metrics.v1) @@ -434,15 +433,11 @@ export const encodeHistogramDataPoint = (point: { }): Uint8Array => { // Pack bucket counts as repeated fixed64 const bucketCountsEncoded = Proto.concat( - ...point.bucketCounts.map((count) => - Proto.fixed64Field(6, BigInt(count)) - ) + ...point.bucketCounts.map((count) => Proto.fixed64Field(6, BigInt(count))) ) // Pack explicit bounds as repeated double const explicitBoundsEncoded = Proto.concat( - ...point.explicitBounds.map((bound) => - Proto.doubleField(7, bound) - ) + ...point.explicitBounds.map((bound) => Proto.doubleField(7, bound)) ) return Proto.concat( Proto.fixed64Field(2, BigInt(point.startTimeUnixNano)), @@ -467,8 +462,7 @@ export const encodeHistogramDataPoint = (point: { */ export const encodeGauge = (gauge: { readonly dataPoints: ReadonlyArray[0]> -}): Uint8Array => - Proto.repeatedField(1, gauge.dataPoints, encodeNumberDataPoint) +}): Uint8Array => Proto.repeatedField(1, gauge.dataPoints, encodeNumberDataPoint) /** * Encodes a Sum message. @@ -595,8 +589,7 @@ export const encodeResourceMetrics = (resourceMetrics: { */ export const encodeMetricsData = (metricsData: { readonly resourceMetrics: ReadonlyArray[0]> -}): Uint8Array => - Proto.repeatedField(1, metricsData.resourceMetrics, encodeResourceMetrics) +}): Uint8Array => Proto.repeatedField(1, metricsData.resourceMetrics, encodeResourceMetrics) // Logs types (opentelemetry.proto.logs.v1) @@ -733,5 +726,4 @@ export const encodeResourceLogs = (resourceLogs: { */ export const encodeLogsData = (logsData: { readonly resourceLogs: ReadonlyArray[0]> -}): Uint8Array => - Proto.repeatedField(1, logsData.resourceLogs, encodeResourceLogs) +}): Uint8Array => Proto.repeatedField(1, logsData.resourceLogs, encodeResourceLogs) diff --git a/packages/opentelemetry/src/internal/protobuf.ts b/packages/opentelemetry/src/internal/protobuf.ts index 7d26b5dbe7f..aea5c9812ba 100644 --- a/packages/opentelemetry/src/internal/protobuf.ts +++ b/packages/opentelemetry/src/internal/protobuf.ts @@ -20,8 +20,7 @@ const enum WireType { /** * Encodes a field tag (field number + wire type) */ -const encodeTag = (fieldNumber: number, wireType: WireType): number => - (fieldNumber << 3) | wireType +const encodeTag = (fieldNumber: number, wireType: WireType): number => (fieldNumber << 3) | wireType /** * Encodes a varint (variable-length integer) @@ -79,8 +78,7 @@ export const encodeDouble = (value: number): Uint8Array => { /** * Encodes a string to UTF-8 bytes */ -export const encodeString = (value: string): Uint8Array => - new TextEncoder().encode(value) +export const encodeString = (value: string): Uint8Array => new TextEncoder().encode(value) /** * Encodes bytes as a hex string to Uint8Array @@ -130,8 +128,7 @@ export const sintField = (fieldNumber: number, value: number | bigint): Uint8Arr /** * Encodes a bool field */ -export const boolField = (fieldNumber: number, value: boolean): Uint8Array => - varintField(fieldNumber, value ? 1 : 0) +export const boolField = (fieldNumber: number, value: boolean): Uint8Array => varintField(fieldNumber, value ? 1 : 0) /** * Encodes a fixed64 field @@ -195,8 +192,7 @@ export const repeatedField = ( fieldNumber: number, values: ReadonlyArray, encode: (value: T) => Uint8Array -): Uint8Array => - concat(...values.map((v) => messageField(fieldNumber, encode(v)))) +): Uint8Array => concat(...values.map((v) => messageField(fieldNumber, encode(v)))) /** * Encodes repeated varint fields (not packed) @@ -204,8 +200,7 @@ export const repeatedField = ( export const repeatedVarintField = ( fieldNumber: number, values: ReadonlyArray -): Uint8Array => - concat(...values.map((v) => varintField(fieldNumber, v))) +): Uint8Array => concat(...values.map((v) => varintField(fieldNumber, v))) /** * Helper to conditionally encode an optional field @@ -213,8 +208,7 @@ export const repeatedVarintField = ( export const optionalField = ( value: T | undefined, encode: (v: T) => Uint8Array -): Uint8Array => - value !== undefined ? encode(value) : new Uint8Array(0) +): Uint8Array => value !== undefined ? encode(value) : new Uint8Array(0) /** * Helper to conditionally encode a string field if non-empty @@ -222,5 +216,4 @@ export const optionalField = ( export const optionalStringField = ( fieldNumber: number, value: string | undefined -): Uint8Array => - value !== undefined && value !== "" ? stringField(fieldNumber, value) : new Uint8Array(0) +): Uint8Array => value !== undefined && value !== "" ? stringField(fieldNumber, value) : new Uint8Array(0) diff --git a/packages/opentelemetry/test/Protobuf.test.ts b/packages/opentelemetry/test/Protobuf.test.ts index 2f91ceeb886..a99d50fef07 100644 --- a/packages/opentelemetry/test/Protobuf.test.ts +++ b/packages/opentelemetry/test/Protobuf.test.ts @@ -1,6 +1,9 @@ import { describe, expect, it } from "@effect/vitest" +import * as Effect from "effect/Effect" import * as Proto from "../src/internal/protobuf.js" import * as OtlpProtobuf from "../src/internal/otlpProtobuf.js" +import * as OtlpSerializer from "../src/OtlpSerializer.js" +import * as OtlpSerializerProtobuf from "../src/OtlpSerializerProtobuf.js" describe("Protobuf encoding", () => { describe("primitives", () => { @@ -335,4 +338,51 @@ describe("Protobuf encoding", () => { expect(result.length).toBe(9) // Max varint size for 64-bit }) }) + + describe("OtlpSerializer", () => { + const sampleTracesData = { + resourceSpans: [{ + resource: { + attributes: [{ key: "service.name", value: { stringValue: "test" } }], + droppedAttributesCount: 0 + }, + scopeSpans: [{ + scope: { name: "test-scope" }, + spans: [{ + traceId: "0123456789abcdef0123456789abcdef", + spanId: "0123456789abcdef", + name: "test-span", + kind: 1, + startTimeUnixNano: "1000000000000000000", + endTimeUnixNano: "2000000000000000000", + attributes: [], + droppedAttributesCount: 0, + events: [], + droppedEventsCount: 0, + links: [], + droppedLinksCount: 0, + status: { code: 1 } + }] + }] + }] + } + + it.effect("json serializer returns string", () => + Effect.gen(function*() { + const serializer = yield* OtlpSerializer.OtlpSerializer + expect(serializer.contentType).toBe("application/json") + const result = serializer.encodeTraces(sampleTracesData) + expect(typeof result).toBe("string") + expect(JSON.parse(result as string)).toEqual(sampleTracesData) + }).pipe(Effect.provide(OtlpSerializer.json))) + + it.effect("protobuf serializer returns Uint8Array", () => + Effect.gen(function*() { + const serializer = yield* OtlpSerializer.OtlpSerializer + expect(serializer.contentType).toBe("application/x-protobuf") + const result = serializer.encodeTraces(sampleTracesData) + expect(result).toBeInstanceOf(Uint8Array) + expect((result as Uint8Array).length).toBeGreaterThan(0) + }).pipe(Effect.provide(OtlpSerializerProtobuf.protobuf))) + }) }) diff --git a/packages/opentelemetry/test/SerializerOverride.test.ts b/packages/opentelemetry/test/SerializerOverride.test.ts new file mode 100644 index 00000000000..920a29a9cfe --- /dev/null +++ b/packages/opentelemetry/test/SerializerOverride.test.ts @@ -0,0 +1,53 @@ +import { describe, expect, it } from "@effect/vitest" +import * as Effect from "effect/Effect" +import * as Layer from "effect/Layer" +import * as Scope from "effect/Scope" +import * as OtlpSerializer from "../src/OtlpSerializer.js" +import * as OtlpSerializerProtobuf from "../src/OtlpSerializerProtobuf.js" + +describe("OtlpSerializer override behavior", () => { + it.effect("json layer provides json contentType", () => + Effect.gen(function*() { + const serializer = yield* OtlpSerializer.OtlpSerializer + expect(serializer.contentType).toBe("application/json") + }).pipe(Effect.provide(OtlpSerializer.json))) + + it.effect("protobuf layer provides protobuf contentType", () => + Effect.gen(function*() { + const serializer = yield* OtlpSerializer.OtlpSerializer + expect(serializer.contentType).toBe("application/x-protobuf") + }).pipe(Effect.provide(OtlpSerializerProtobuf.protobuf))) + + it.effect("layerWithSerializer pattern allows protobuf override", () => + Effect.gen(function*() { + // This simulates: Otlp.layerWithSerializer(...).pipe(Layer.provide(protobuf)) + // The layerWithSerializer does NOT provide json internally, so protobuf can be provided + + // Layer that requires OtlpSerializer (like layerWithSerializer) + const innerLayer = Layer.scopedDiscard( + Effect.gen(function*() { + const serializer = yield* OtlpSerializer.OtlpSerializer + // Protobuf should be used since we provide it externally + expect(serializer.contentType).toBe("application/x-protobuf") + }) + ) + + // Provide protobuf externally + const finalLayer = innerLayer.pipe(Layer.provide(OtlpSerializerProtobuf.protobuf)) + + yield* Layer.build(finalLayer).pipe(Scope.extend(yield* Effect.scope)) + }).pipe(Effect.scoped)) + + it.effect("layer pattern uses json by default", () => + Effect.gen(function*() { + // This simulates: Otlp.layer(...) which provides json internally + const innerLayer = Layer.scopedDiscard( + Effect.gen(function*() { + const serializer = yield* OtlpSerializer.OtlpSerializer + expect(serializer.contentType).toBe("application/json") + }) + ).pipe(Layer.provide(OtlpSerializer.json)) + + yield* Layer.build(innerLayer).pipe(Scope.extend(yield* Effect.scope)) + }).pipe(Effect.scoped)) +}) diff --git a/packages/opentelemetry/test/Tracer.test.ts b/packages/opentelemetry/test/Tracer.test.ts index 11fd074e1be..3df8e7c430f 100644 --- a/packages/opentelemetry/test/Tracer.test.ts +++ b/packages/opentelemetry/test/Tracer.test.ts @@ -1,4 +1,5 @@ import * as NodeSdk from "@effect/opentelemetry/NodeSdk" +import * as OtlpSerializer from "@effect/opentelemetry/OtlpSerializer" import * as OtlpTracer from "@effect/opentelemetry/OtlpTracer" import * as Tracer from "@effect/opentelemetry/Tracer" import { HttpClient } from "@effect/platform" @@ -137,7 +138,7 @@ describe("Tracer", () => { resource: { serviceName: "test-otlp" } - }).pipe(Layer.provide(MockHttpClient)) + }).pipe(Layer.provide(MockHttpClient), Layer.provide(OtlpSerializer.json)) it.effect("currentOtelSpan works with OTLP tracer", () => Effect.provide( From d05bf166de385c82da9d08d387176b91e5f2283b Mon Sep 17 00:00:00 2001 From: David Golightly Date: Tue, 20 Jan 2026 17:40:06 -0800 Subject: [PATCH 03/10] feat(opentelemetry): adopt OtlpSerialization design from effect-smol Simplifies the protobuf support API based on tim-smart's design from effect-smol. API changes: - Added Otlp.layerJson and Otlp.layerProtobuf for explicit serialization choice - Otlp.layer remains as backwards-compatible alias for layerJson - Removed layerWithSerializer (use layerJson/layerProtobuf instead) - Replaced OtlpSerializer/OtlpSerializerProtobuf with OtlpSerialization service - OtlpSerialization now returns HttpBody directly instead of Uint8Array | string Co-Authored-By: Claude Opus 4.5 --- .changeset/add-otlp-protobuf-support.md | 40 +++++---- packages/opentelemetry/src/Otlp.ts | 56 ++++++++---- packages/opentelemetry/src/OtlpLogger.ts | 6 +- packages/opentelemetry/src/OtlpMetrics.ts | 6 +- .../opentelemetry/src/OtlpSerialization.ts | 86 +++++++++++++++++++ packages/opentelemetry/src/OtlpSerializer.ts | 77 ----------------- .../src/OtlpSerializerProtobuf.ts | 40 --------- packages/opentelemetry/src/OtlpTracer.ts | 6 +- packages/opentelemetry/src/index.ts | 7 +- .../src/internal/otlpExporter.ts | 28 +++--- packages/opentelemetry/test/Protobuf.test.ts | 31 +++---- .../test/SerializerOverride.test.ts | 55 ++++++------ packages/opentelemetry/test/Tracer.test.ts | 4 +- 13 files changed, 210 insertions(+), 232 deletions(-) create mode 100644 packages/opentelemetry/src/OtlpSerialization.ts delete mode 100644 packages/opentelemetry/src/OtlpSerializer.ts delete mode 100644 packages/opentelemetry/src/OtlpSerializerProtobuf.ts diff --git a/.changeset/add-otlp-protobuf-support.md b/.changeset/add-otlp-protobuf-support.md index 7bdf999c9ed..c2383c935f1 100644 --- a/.changeset/add-otlp-protobuf-support.md +++ b/.changeset/add-otlp-protobuf-support.md @@ -2,41 +2,51 @@ "@effect/opentelemetry": minor --- -Add tree-shakable protobuf protocol support for OTLP exporters. +Add tree-shakable protobuf protocol support for OTLP exporters with simplified API. -This introduces an `OtlpSerializer` service that allows choosing between JSON (default) and Protocol Buffers binary encoding when exporting telemetry data to OpenTelemetry collectors. The design ensures protobuf code is only included in your bundle when you explicitly opt into it. +This introduces an `OtlpSerialization` service and simplified layer functions for choosing between JSON and Protocol Buffers encoding when exporting telemetry data. -**JSON encoding (default) - no changes required:** +**JSON encoding (default):** ```typescript import { Otlp } from "@effect/opentelemetry" -// Works exactly as before - protobuf code is tree-shaken away +// Option 1: Explicit JSON layer +const layer = Otlp.layerJson({ + baseUrl: "http://localhost:4318", + resource: { serviceName: "my-service" } +}) + +// Option 2: Use `layer` alias (backwards compatible) const layer = Otlp.layer({ baseUrl: "http://localhost:4318", resource: { serviceName: "my-service" } }) ``` -**Protobuf encoding - explicit opt-in:** +**Protobuf encoding:** ```typescript -import { Otlp, OtlpSerializerProtobuf } from "@effect/opentelemetry" -import * as Layer from "effect/Layer" +import { Otlp } from "@effect/opentelemetry" -// Use layerWithSerializer to control the serialization format -// Protobuf code is only included when you import OtlpSerializerProtobuf -const layer = Otlp.layerWithSerializer({ +// Simply use layerProtobuf for protobuf encoding +const layer = Otlp.layerProtobuf({ baseUrl: "http://localhost:4318", resource: { serviceName: "my-service" } -}).pipe(Layer.provide(OtlpSerializerProtobuf.protobuf)) +}) ``` **New exports:** -- `Otlp.layerWithSerializer` - OTLP layer that requires `OtlpSerializer` to be provided -- `OtlpSerializer` - Service definition and JSON layer (`OtlpSerializer.json`) -- `OtlpSerializerProtobuf` - Protobuf layer (`OtlpSerializerProtobuf.protobuf`) +- `Otlp.layerJson` - OTLP layer with JSON serialization +- `Otlp.layerProtobuf` - OTLP layer with Protobuf serialization +- `Otlp.layer` - Alias for `layerJson` (backwards compatible) +- `OtlpSerialization` - Service definition and layers (`layerJson`, `layerProtobuf`) + +**Breaking changes:** +- Removed `Otlp.layerWithSerializer` - use `Otlp.layerJson` or `Otlp.layerProtobuf` instead +- Removed `OtlpSerializer` module - use `OtlpSerialization` instead +- Removed `OtlpSerializerProtobuf` module - use `Otlp.layerProtobuf` or `OtlpSerialization.layerProtobuf` instead **Features:** - No new dependencies - protobuf encoding implemented from scratch -- Tree-shakable - protobuf code not in bundle unless explicitly imported +- Tree-shakable - protobuf code not in bundle unless `layerProtobuf` is used - Sets appropriate Content-Type header (`application/x-protobuf` vs `application/json`) - Follows opentelemetry-proto specifications diff --git a/packages/opentelemetry/src/Otlp.ts b/packages/opentelemetry/src/Otlp.ts index b944f25170a..cd4a1455029 100644 --- a/packages/opentelemetry/src/Otlp.ts +++ b/packages/opentelemetry/src/Otlp.ts @@ -10,7 +10,7 @@ import type * as Logger from "effect/Logger" import type * as Tracer from "effect/Tracer" import * as OtlpLogger from "./OtlpLogger.js" import * as OtlpMetrics from "./OtlpMetrics.js" -import * as OtlpSerializer from "./OtlpSerializer.js" +import * as OtlpSerialization from "./OtlpSerialization.js" import * as OtlpTracer from "./OtlpTracer.js" /** @@ -37,21 +37,9 @@ export interface OtlpLayerOptions { readonly shutdownTimeout?: Duration.DurationInput | undefined } -/** - * Creates an OTLP layer that requires an `OtlpSerializer` to be provided. - * - * Use this when you want to explicitly control the serialization format: - * - For JSON: `Otlp.layerWithSerializer(options).pipe(Layer.provide(OtlpSerializer.json))` - * - For Protobuf: `Otlp.layerWithSerializer(options).pipe(Layer.provide(OtlpSerializerProtobuf.protobuf))` - * - * For convenience, use `Otlp.layer` which provides JSON serialization by default. - * - * @since 1.0.0 - * @category Layers - */ -export const layerWithSerializer = ( +const makeLayer = ( options: OtlpLayerOptions -): Layer.Layer => { +): Layer.Layer => { const baseReq = HttpClientRequest.get(options.baseUrl) const url = (path: string) => HttpClientRequest.appendUrl(baseReq, path).url return Layer.mergeAll( @@ -84,11 +72,42 @@ export const layerWithSerializer = ( ) } +/** + * Creates an OTLP layer with JSON serialization. + * + * @example + * ```typescript + * import { Otlp } from "@effect/opentelemetry" + * + * const layer = Otlp.layerJson({ baseUrl: "http://localhost:4318" }) + * ``` + * + * @since 1.0.0 + * @category Layers + */ +export const layerJson = (options: OtlpLayerOptions): Layer.Layer => + makeLayer(options).pipe(Layer.provide(OtlpSerialization.layerJson)) + +/** + * Creates an OTLP layer with Protobuf serialization. + * + * @example + * ```typescript + * import { Otlp } from "@effect/opentelemetry" + * + * const layer = Otlp.layerProtobuf({ baseUrl: "http://localhost:4318" }) + * ``` + * + * @since 1.0.0 + * @category Layers + */ +export const layerProtobuf = (options: OtlpLayerOptions): Layer.Layer => + makeLayer(options).pipe(Layer.provide(OtlpSerialization.layerProtobuf)) + /** * Creates an OTLP layer with JSON serialization (default). * - * This is the recommended way to create an OTLP layer for most use cases. - * For protobuf encoding, use `layerWithSerializer` with `OtlpSerializerProtobuf.protobuf`. + * This is an alias for `layerJson` for backwards compatibility. * * @example * ```typescript @@ -100,5 +119,4 @@ export const layerWithSerializer = ( * @since 1.0.0 * @category Layers */ -export const layer = (options: OtlpLayerOptions): Layer.Layer => - layerWithSerializer(options).pipe(Layer.provide(OtlpSerializer.json)) +export const layer = layerJson diff --git a/packages/opentelemetry/src/OtlpLogger.ts b/packages/opentelemetry/src/OtlpLogger.ts index 3a322c77797..3a9e51d563e 100644 --- a/packages/opentelemetry/src/OtlpLogger.ts +++ b/packages/opentelemetry/src/OtlpLogger.ts @@ -20,7 +20,7 @@ import * as Tracer from "effect/Tracer" import * as Exporter from "./internal/otlpExporter.js" import type { AnyValue, Fixed64, KeyValue, Resource } from "./OtlpResource.js" import * as OtlpResource from "./OtlpResource.js" -import type { OtlpSerializer } from "./OtlpSerializer.js" +import type { OtlpSerialization } from "./OtlpSerialization.js" /** * @since 1.0.0 @@ -43,7 +43,7 @@ export const make: ( ) => Effect.Effect< Logger.Logger, never, - HttpClient.HttpClient | OtlpSerializer | Scope.Scope + HttpClient.HttpClient | OtlpSerialization | Scope.Scope > = Effect.fnUntraced(function*(options) { const otelResource = yield* OtlpResource.fromConfig(options.resource) const scope: IInstrumentationScope = { @@ -94,7 +94,7 @@ export const layer = (options: { readonly maxBatchSize?: number | undefined readonly shutdownTimeout?: Duration.DurationInput | undefined readonly excludeLogSpans?: boolean | undefined -}): Layer.Layer => +}): Layer.Layer => options.replaceLogger ? Logger.replaceScoped(options.replaceLogger, make(options)) : Logger.addScoped(make(options)) // internal diff --git a/packages/opentelemetry/src/OtlpMetrics.ts b/packages/opentelemetry/src/OtlpMetrics.ts index e8f2a4881d2..a3b6bc93680 100644 --- a/packages/opentelemetry/src/OtlpMetrics.ts +++ b/packages/opentelemetry/src/OtlpMetrics.ts @@ -15,7 +15,7 @@ import type * as Scope from "effect/Scope" import * as Exporter from "./internal/otlpExporter.js" import type { Fixed64, KeyValue } from "./OtlpResource.js" import * as OtlpResource from "./OtlpResource.js" -import type { OtlpSerializer } from "./OtlpSerializer.js" +import type { OtlpSerialization } from "./OtlpSerialization.js" /** * @since 1.0.0 @@ -34,7 +34,7 @@ export const make: (options: { }) => Effect.Effect< void, never, - HttpClient.HttpClient | OtlpSerializer | Scope.Scope + HttpClient.HttpClient | OtlpSerialization | Scope.Scope > = Effect.fnUntraced(function*(options) { const clock = yield* Effect.clock const startTime = String(clock.unsafeCurrentTimeNanos()) @@ -292,7 +292,7 @@ export const layer = (options: { readonly headers?: Headers.Input | undefined readonly exportInterval?: Duration.DurationInput | undefined readonly shutdownTimeout?: Duration.DurationInput | undefined -}): Layer.Layer => Layer.scopedDiscard(make(options)) +}): Layer.Layer => Layer.scopedDiscard(make(options)) // internal diff --git a/packages/opentelemetry/src/OtlpSerialization.ts b/packages/opentelemetry/src/OtlpSerialization.ts new file mode 100644 index 00000000000..789968b3a73 --- /dev/null +++ b/packages/opentelemetry/src/OtlpSerialization.ts @@ -0,0 +1,86 @@ +/** + * OtlpSerialization service for tree-shakable protobuf support. + * + * This module provides the `OtlpSerialization` service that abstracts the + * encoding of OTLP telemetry data to HttpBody. By default, the JSON serializer is used. + * To use protobuf encoding, use `Otlp.layerProtobuf` or provide the `layerProtobuf` layer. + * + * @example + * ```typescript + * import { Otlp } from "@effect/opentelemetry" + * + * // JSON encoding (default) + * const jsonLayer = Otlp.layerJson({ baseUrl: "http://localhost:4318" }) + * + * // Protobuf encoding + * const protobufLayer = Otlp.layerProtobuf({ baseUrl: "http://localhost:4318" }) + * ``` + * + * @since 1.0.0 + */ +import * as HttpBody from "@effect/platform/HttpBody" +import * as Context from "effect/Context" +import * as Layer from "effect/Layer" +import * as OtlpProtobuf from "./internal/otlpProtobuf.js" + +/** + * Service interface for serializing OTLP telemetry data to HttpBody. + * + * @since 1.0.0 + * @category Models + */ +export interface OtlpSerialization { + /** + * Encodes trace data for transmission. + */ + readonly traces: (data: unknown) => HttpBody.HttpBody + /** + * Encodes metrics data for transmission. + */ + readonly metrics: (data: unknown) => HttpBody.HttpBody + /** + * Encodes logs data for transmission. + */ + readonly logs: (data: unknown) => HttpBody.HttpBody +} + +/** + * Tag for the OtlpSerialization service. + * + * @since 1.0.0 + * @category Tags + */ +export const OtlpSerialization: Context.Tag = Context.GenericTag< + OtlpSerialization +>("@effect/opentelemetry/OtlpSerialization") + +/** + * JSON serializer layer for OTLP telemetry data. + * + * This is the default serializer used by OTLP exporters. It encodes + * telemetry data as JSON with `application/json` content type. + * + * @since 1.0.0 + * @category Layers + */ +export const layerJson: Layer.Layer = Layer.succeed(OtlpSerialization, { + traces: (data) => HttpBody.unsafeJson(data), + metrics: (data) => HttpBody.unsafeJson(data), + logs: (data) => HttpBody.unsafeJson(data) +}) + +/** + * Protobuf serializer layer for OTLP telemetry data. + * + * This serializer encodes telemetry data using Protocol Buffers binary + * format with `application/x-protobuf` content type. It provides more + * efficient wire format compared to JSON. + * + * @since 1.0.0 + * @category Layers + */ +export const layerProtobuf: Layer.Layer = Layer.succeed(OtlpSerialization, { + traces: (data) => HttpBody.uint8Array(OtlpProtobuf.encodeTracesData(data as any), "application/x-protobuf"), + metrics: (data) => HttpBody.uint8Array(OtlpProtobuf.encodeMetricsData(data as any), "application/x-protobuf"), + logs: (data) => HttpBody.uint8Array(OtlpProtobuf.encodeLogsData(data as any), "application/x-protobuf") +}) diff --git a/packages/opentelemetry/src/OtlpSerializer.ts b/packages/opentelemetry/src/OtlpSerializer.ts deleted file mode 100644 index bf07b789c00..00000000000 --- a/packages/opentelemetry/src/OtlpSerializer.ts +++ /dev/null @@ -1,77 +0,0 @@ -/** - * OtlpSerializer service for tree-shakable protobuf support. - * - * This module provides the `OtlpSerializer` service that abstracts the - * encoding of OTLP telemetry data. By default, the JSON serializer is used. - * To use protobuf encoding, provide the `OtlpSerializerProtobuf.protobuf` layer. - * - * @example - * ```typescript - * import { Otlp } from "@effect/opentelemetry" - * - * // JSON encoding (default) - protobuf code is tree-shaken away - * const jsonLayer = Otlp.layer({ baseUrl: "http://localhost:4318" }) - * - * // Protobuf encoding - explicitly import to include in bundle - * import { OtlpSerializerProtobuf } from "@effect/opentelemetry" - * - * const protobufLayer = Otlp.layer({ baseUrl: "http://localhost:4318" }).pipe( - * Layer.provide(OtlpSerializerProtobuf.protobuf) - * ) - * ``` - * - * @since 1.0.0 - */ -import * as Context from "effect/Context" -import * as Layer from "effect/Layer" - -/** - * Service interface for serializing OTLP telemetry data. - * - * @since 1.0.0 - * @category Models - */ -export interface OtlpSerializer { - /** - * The content type header to use for HTTP requests. - */ - readonly contentType: string - /** - * Encodes trace data for transmission. - */ - readonly encodeTraces: (data: unknown) => Uint8Array | string - /** - * Encodes metrics data for transmission. - */ - readonly encodeMetrics: (data: unknown) => Uint8Array | string - /** - * Encodes logs data for transmission. - */ - readonly encodeLogs: (data: unknown) => Uint8Array | string -} - -/** - * Tag for the OtlpSerializer service. - * - * @since 1.0.0 - * @category Tags - */ -export const OtlpSerializer: Context.Tag = Context.GenericTag( - "@effect/opentelemetry/OtlpSerializer" -) - -/** - * JSON serializer layer for OTLP telemetry data. - * - * This is the default serializer used by OTLP exporters. It encodes - * telemetry data as JSON strings with `application/json` content type. - * - * @since 1.0.0 - * @category Layers - */ -export const json: Layer.Layer = Layer.succeed(OtlpSerializer, { - contentType: "application/json", - encodeTraces: (data) => JSON.stringify(data), - encodeMetrics: (data) => JSON.stringify(data), - encodeLogs: (data) => JSON.stringify(data) -}) diff --git a/packages/opentelemetry/src/OtlpSerializerProtobuf.ts b/packages/opentelemetry/src/OtlpSerializerProtobuf.ts deleted file mode 100644 index cc3c5a103a9..00000000000 --- a/packages/opentelemetry/src/OtlpSerializerProtobuf.ts +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Protobuf serializer for OTLP telemetry data. - * - * This module provides the protobuf-based serializer for OTLP exporters. - * Import this module only when you need protobuf encoding - it will bring - * the protobuf encoding code into your bundle. - * - * @example - * ```typescript - * import { Otlp, OtlpSerializerProtobuf } from "@effect/opentelemetry" - * import * as Layer from "effect/Layer" - * - * // Use protobuf encoding for more efficient wire format - * const layer = Otlp.layer({ baseUrl: "http://localhost:4318" }).pipe( - * Layer.provide(OtlpSerializerProtobuf.protobuf) - * ) - * ``` - * - * @since 1.0.0 - */ -import * as Layer from "effect/Layer" -import * as OtlpProtobuf from "./internal/otlpProtobuf.js" -import { OtlpSerializer } from "./OtlpSerializer.js" - -/** - * Protobuf serializer layer for OTLP telemetry data. - * - * This serializer encodes telemetry data using Protocol Buffers binary - * format with `application/x-protobuf` content type. It provides more - * efficient wire format compared to JSON. - * - * @since 1.0.0 - * @category Layers - */ -export const protobuf: Layer.Layer = Layer.succeed(OtlpSerializer, { - contentType: "application/x-protobuf", - encodeTraces: (data: any) => OtlpProtobuf.encodeTracesData(data), - encodeMetrics: (data: any) => OtlpProtobuf.encodeMetricsData(data), - encodeLogs: (data: any) => OtlpProtobuf.encodeLogsData(data) -}) diff --git a/packages/opentelemetry/src/OtlpTracer.ts b/packages/opentelemetry/src/OtlpTracer.ts index 900a5e15c6d..47d228e43ed 100644 --- a/packages/opentelemetry/src/OtlpTracer.ts +++ b/packages/opentelemetry/src/OtlpTracer.ts @@ -17,7 +17,7 @@ import * as Exporter from "./internal/otlpExporter.js" import type { KeyValue, Resource } from "./OtlpResource.js" import { entriesToAttributes } from "./OtlpResource.js" import * as OtlpResource from "./OtlpResource.js" -import type { OtlpSerializer } from "./OtlpSerializer.js" +import type { OtlpSerialization } from "./OtlpSerialization.js" const ATTR_EXCEPTION_TYPE = "exception.type" const ATTR_EXCEPTION_MESSAGE = "exception.message" @@ -44,7 +44,7 @@ export const make: ( ) => Effect.Effect< Tracer.Tracer, never, - HttpClient.HttpClient | OtlpSerializer | Scope.Scope + HttpClient.HttpClient | OtlpSerialization | Scope.Scope > = Effect.fnUntraced(function*(options) { const otelResource = yield* OtlpResource.fromConfig(options.resource) const scope: Scope = { @@ -121,7 +121,7 @@ export const layer = (options: { readonly maxBatchSize?: number | undefined readonly context?: ((f: () => X, span: Tracer.AnySpan) => X) | undefined readonly shutdownTimeout?: Duration.DurationInput | undefined -}): Layer.Layer => +}): Layer.Layer => Layer.unwrapScoped(Effect.map(make(options), Layer.setTracer)) // internal diff --git a/packages/opentelemetry/src/index.ts b/packages/opentelemetry/src/index.ts index c61c3172769..89c7091eda6 100644 --- a/packages/opentelemetry/src/index.ts +++ b/packages/opentelemetry/src/index.ts @@ -36,12 +36,7 @@ export * as OtlpResource from "./OtlpResource.js" /** * @since 1.0.0 */ -export * as OtlpSerializer from "./OtlpSerializer.js" - -/** - * @since 1.0.0 - */ -export * as OtlpSerializerProtobuf from "./OtlpSerializerProtobuf.js" +export * as OtlpSerialization from "./OtlpSerialization.js" /** * @since 1.0.0 diff --git a/packages/opentelemetry/src/internal/otlpExporter.ts b/packages/opentelemetry/src/internal/otlpExporter.ts index 43cb0a0bf96..d6693571ff5 100644 --- a/packages/opentelemetry/src/internal/otlpExporter.ts +++ b/packages/opentelemetry/src/internal/otlpExporter.ts @@ -1,5 +1,5 @@ import * as Headers from "@effect/platform/Headers" -import * as HttpBody from "@effect/platform/HttpBody" +import type * as HttpBody from "@effect/platform/HttpBody" import * as HttpClient from "@effect/platform/HttpClient" import * as HttpClientError from "@effect/platform/HttpClientError" import * as HttpClientRequest from "@effect/platform/HttpClientRequest" @@ -10,7 +10,7 @@ import * as Num from "effect/Number" import * as Option from "effect/Option" import * as Schedule from "effect/Schedule" import * as Scope from "effect/Scope" -import { OtlpSerializer } from "../OtlpSerializer.js" +import { OtlpSerialization } from "../OtlpSerialization.js" /** * OTLP telemetry kind for encoding @@ -51,7 +51,7 @@ export const make: ( ) => Effect.Effect< { readonly push: (data: unknown) => void }, never, - HttpClient.HttpClient | OtlpSerializer | Scope.Scope + HttpClient.HttpClient | OtlpSerialization | Scope.Scope > = Effect.fnUntraced(function*(options) { const clock = yield* Effect.clock const scope = yield* Effect.scope @@ -62,16 +62,15 @@ export const make: ( HttpClient.retryTransient({ schedule: policy, times: 3 }) ) - const serializer = yield* OtlpSerializer - const encode = options.kind === "traces" - ? serializer.encodeTraces + const serialization = yield* OtlpSerialization + const encode: (data: unknown) => HttpBody.HttpBody = options.kind === "traces" + ? serialization.traces : options.kind === "metrics" - ? serializer.encodeMetrics - : serializer.encodeLogs + ? serialization.metrics + : serialization.logs let headers = Headers.unsafeFromRecord({ - "user-agent": `effect-opentelemetry-${options.label}/0.0.0`, - "content-type": serializer.contentType + "user-agent": `effect-opentelemetry-${options.label}/0.0.0` }) if (options.headers) { headers = Headers.merge(Headers.fromInput(options.headers), headers) @@ -93,13 +92,8 @@ export const make: ( buffer = [] } const data = options.body(items) - const encoded = encode(data) - const requestWithBody = typeof encoded === "string" - ? HttpClientRequest.bodyUnsafeJson(request, data) - : HttpClientRequest.setBody( - request, - HttpBody.uint8Array(encoded, serializer.contentType) - ) + const body = encode(data) + const requestWithBody = HttpClientRequest.setBody(request, body) return client.execute(requestWithBody).pipe( Effect.asVoid, diff --git a/packages/opentelemetry/test/Protobuf.test.ts b/packages/opentelemetry/test/Protobuf.test.ts index a99d50fef07..6e10af8f5de 100644 --- a/packages/opentelemetry/test/Protobuf.test.ts +++ b/packages/opentelemetry/test/Protobuf.test.ts @@ -2,8 +2,7 @@ import { describe, expect, it } from "@effect/vitest" import * as Effect from "effect/Effect" import * as Proto from "../src/internal/protobuf.js" import * as OtlpProtobuf from "../src/internal/otlpProtobuf.js" -import * as OtlpSerializer from "../src/OtlpSerializer.js" -import * as OtlpSerializerProtobuf from "../src/OtlpSerializerProtobuf.js" +import * as OtlpSerialization from "../src/OtlpSerialization.js" describe("Protobuf encoding", () => { describe("primitives", () => { @@ -339,7 +338,7 @@ describe("Protobuf encoding", () => { }) }) - describe("OtlpSerializer", () => { + describe("OtlpSerialization", () => { const sampleTracesData = { resourceSpans: [{ resource: { @@ -367,22 +366,18 @@ describe("Protobuf encoding", () => { }] } - it.effect("json serializer returns string", () => + it.effect("json serializer returns HttpBody with json content type", () => Effect.gen(function*() { - const serializer = yield* OtlpSerializer.OtlpSerializer - expect(serializer.contentType).toBe("application/json") - const result = serializer.encodeTraces(sampleTracesData) - expect(typeof result).toBe("string") - expect(JSON.parse(result as string)).toEqual(sampleTracesData) - }).pipe(Effect.provide(OtlpSerializer.json))) - - it.effect("protobuf serializer returns Uint8Array", () => + const serialization = yield* OtlpSerialization.OtlpSerialization + const body = serialization.traces(sampleTracesData) + expect(body.contentType).toBe("application/json") + }).pipe(Effect.provide(OtlpSerialization.layerJson))) + + it.effect("protobuf serializer returns HttpBody with protobuf content type", () => Effect.gen(function*() { - const serializer = yield* OtlpSerializer.OtlpSerializer - expect(serializer.contentType).toBe("application/x-protobuf") - const result = serializer.encodeTraces(sampleTracesData) - expect(result).toBeInstanceOf(Uint8Array) - expect((result as Uint8Array).length).toBeGreaterThan(0) - }).pipe(Effect.provide(OtlpSerializerProtobuf.protobuf))) + const serialization = yield* OtlpSerialization.OtlpSerialization + const body = serialization.traces(sampleTracesData) + expect(body.contentType).toBe("application/x-protobuf") + }).pipe(Effect.provide(OtlpSerialization.layerProtobuf))) }) }) diff --git a/packages/opentelemetry/test/SerializerOverride.test.ts b/packages/opentelemetry/test/SerializerOverride.test.ts index 920a29a9cfe..76749846f4f 100644 --- a/packages/opentelemetry/test/SerializerOverride.test.ts +++ b/packages/opentelemetry/test/SerializerOverride.test.ts @@ -2,52 +2,49 @@ import { describe, expect, it } from "@effect/vitest" import * as Effect from "effect/Effect" import * as Layer from "effect/Layer" import * as Scope from "effect/Scope" -import * as OtlpSerializer from "../src/OtlpSerializer.js" -import * as OtlpSerializerProtobuf from "../src/OtlpSerializerProtobuf.js" +import * as OtlpSerialization from "../src/OtlpSerialization.js" -describe("OtlpSerializer override behavior", () => { - it.effect("json layer provides json contentType", () => +describe("OtlpSerialization override behavior", () => { + it.effect("json layer provides json HttpBody", () => Effect.gen(function*() { - const serializer = yield* OtlpSerializer.OtlpSerializer - expect(serializer.contentType).toBe("application/json") - }).pipe(Effect.provide(OtlpSerializer.json))) + const serialization = yield* OtlpSerialization.OtlpSerialization + const body = serialization.traces({ test: "data" }) + expect(body.contentType).toBe("application/json") + }).pipe(Effect.provide(OtlpSerialization.layerJson))) - it.effect("protobuf layer provides protobuf contentType", () => + it.effect("protobuf layer provides protobuf HttpBody", () => Effect.gen(function*() { - const serializer = yield* OtlpSerializer.OtlpSerializer - expect(serializer.contentType).toBe("application/x-protobuf") - }).pipe(Effect.provide(OtlpSerializerProtobuf.protobuf))) + const serialization = yield* OtlpSerialization.OtlpSerialization + const body = serialization.traces({ resourceSpans: [] }) + expect(body.contentType).toBe("application/x-protobuf") + }).pipe(Effect.provide(OtlpSerialization.layerProtobuf))) - it.effect("layerWithSerializer pattern allows protobuf override", () => + it.effect("custom layer can be provided", () => Effect.gen(function*() { - // This simulates: Otlp.layerWithSerializer(...).pipe(Layer.provide(protobuf)) - // The layerWithSerializer does NOT provide json internally, so protobuf can be provided + // This simulates: makeLayer(...).pipe(Layer.provide(customSerialization)) + // The makeLayer does NOT provide json internally, so custom can be provided - // Layer that requires OtlpSerializer (like layerWithSerializer) + // Layer that requires OtlpSerialization (like makeLayer) const innerLayer = Layer.scopedDiscard( Effect.gen(function*() { - const serializer = yield* OtlpSerializer.OtlpSerializer + const serialization = yield* OtlpSerialization.OtlpSerialization // Protobuf should be used since we provide it externally - expect(serializer.contentType).toBe("application/x-protobuf") + const body = serialization.traces({ resourceSpans: [] }) + expect(body.contentType).toBe("application/x-protobuf") }) ) // Provide protobuf externally - const finalLayer = innerLayer.pipe(Layer.provide(OtlpSerializerProtobuf.protobuf)) + const finalLayer = innerLayer.pipe(Layer.provide(OtlpSerialization.layerProtobuf)) yield* Layer.build(finalLayer).pipe(Scope.extend(yield* Effect.scope)) }).pipe(Effect.scoped)) - it.effect("layer pattern uses json by default", () => + it.effect("json layer produces valid HttpBody", () => Effect.gen(function*() { - // This simulates: Otlp.layer(...) which provides json internally - const innerLayer = Layer.scopedDiscard( - Effect.gen(function*() { - const serializer = yield* OtlpSerializer.OtlpSerializer - expect(serializer.contentType).toBe("application/json") - }) - ).pipe(Layer.provide(OtlpSerializer.json)) - - yield* Layer.build(innerLayer).pipe(Scope.extend(yield* Effect.scope)) - }).pipe(Effect.scoped)) + const serialization = yield* OtlpSerialization.OtlpSerialization + const body = serialization.traces({ test: "data" }) + expect(body._tag).toBe("Uint8Array") + expect(body.contentType).toBe("application/json") + }).pipe(Effect.provide(OtlpSerialization.layerJson))) }) diff --git a/packages/opentelemetry/test/Tracer.test.ts b/packages/opentelemetry/test/Tracer.test.ts index 3df8e7c430f..62a4f8629c6 100644 --- a/packages/opentelemetry/test/Tracer.test.ts +++ b/packages/opentelemetry/test/Tracer.test.ts @@ -1,5 +1,5 @@ import * as NodeSdk from "@effect/opentelemetry/NodeSdk" -import * as OtlpSerializer from "@effect/opentelemetry/OtlpSerializer" +import * as OtlpSerialization from "@effect/opentelemetry/OtlpSerialization" import * as OtlpTracer from "@effect/opentelemetry/OtlpTracer" import * as Tracer from "@effect/opentelemetry/Tracer" import { HttpClient } from "@effect/platform" @@ -138,7 +138,7 @@ describe("Tracer", () => { resource: { serviceName: "test-otlp" } - }).pipe(Layer.provide(MockHttpClient), Layer.provide(OtlpSerializer.json)) + }).pipe(Layer.provide(MockHttpClient), Layer.provide(OtlpSerialization.layerJson)) it.effect("currentOtelSpan works with OTLP tracer", () => Effect.provide( From ab080b0fa0754373f698e01a7a477ee7eee78f7e Mon Sep 17 00:00:00 2001 From: David Golightly Date: Tue, 20 Jan 2026 18:03:58 -0800 Subject: [PATCH 04/10] refactor(opentelemetry): address PR feedback - Use class syntax with type ID for OtlpSerialization (Context.Tag pattern) - Remove OtlpKind from otlpExporter - exporter doesn't need to know data type - Lift encode function selection to individual services (OtlpTracer, OtlpMetrics, OtlpLogger) - Exporter now receives encode function directly via options Co-Authored-By: Claude Opus 4.5 --- packages/opentelemetry/src/OtlpLogger.ts | 5 +++-- packages/opentelemetry/src/OtlpMetrics.ts | 5 +++-- .../opentelemetry/src/OtlpSerialization.ts | 9 +++++---- packages/opentelemetry/src/OtlpTracer.ts | 5 +++-- .../src/internal/otlpExporter.ts | 20 +++---------------- 5 files changed, 17 insertions(+), 27 deletions(-) diff --git a/packages/opentelemetry/src/OtlpLogger.ts b/packages/opentelemetry/src/OtlpLogger.ts index 3a9e51d563e..3655df6b908 100644 --- a/packages/opentelemetry/src/OtlpLogger.ts +++ b/packages/opentelemetry/src/OtlpLogger.ts @@ -20,7 +20,7 @@ import * as Tracer from "effect/Tracer" import * as Exporter from "./internal/otlpExporter.js" import type { AnyValue, Fixed64, KeyValue, Resource } from "./OtlpResource.js" import * as OtlpResource from "./OtlpResource.js" -import type { OtlpSerialization } from "./OtlpSerialization.js" +import { OtlpSerialization } from "./OtlpSerialization.js" /** * @since 1.0.0 @@ -49,6 +49,7 @@ export const make: ( const scope: IInstrumentationScope = { name: OtlpResource.unsafeServiceName(otelResource) } + const serialization = yield* OtlpSerialization const exporter = yield* Exporter.make({ label: "OtlpLogger", @@ -56,7 +57,7 @@ export const make: ( headers: options.headers, maxBatchSize: options.maxBatchSize ?? 1000, exportInterval: options.exportInterval ?? Duration.seconds(1), - kind: "logs", + encode: serialization.logs, body: (data): IExportLogsServiceRequest => ({ resourceLogs: [{ resource: otelResource, diff --git a/packages/opentelemetry/src/OtlpMetrics.ts b/packages/opentelemetry/src/OtlpMetrics.ts index a3b6bc93680..f2d251ed64d 100644 --- a/packages/opentelemetry/src/OtlpMetrics.ts +++ b/packages/opentelemetry/src/OtlpMetrics.ts @@ -15,7 +15,7 @@ import type * as Scope from "effect/Scope" import * as Exporter from "./internal/otlpExporter.js" import type { Fixed64, KeyValue } from "./OtlpResource.js" import * as OtlpResource from "./OtlpResource.js" -import type { OtlpSerialization } from "./OtlpSerialization.js" +import { OtlpSerialization } from "./OtlpSerialization.js" /** * @since 1.0.0 @@ -43,6 +43,7 @@ export const make: (options: { const metricsScope: IInstrumentationScope = { name: OtlpResource.unsafeServiceName(resource) } + const serialization = yield* OtlpSerialization const snapshot = (): IExportMetricsServiceRequest => { const snapshot = Metric.unsafeSnapshot() @@ -272,7 +273,7 @@ export const make: (options: { headers: options.headers, maxBatchSize: "disabled", exportInterval: options.exportInterval ?? Duration.seconds(10), - kind: "metrics", + encode: serialization.metrics, body: snapshot, shutdownTimeout: options.shutdownTimeout ?? Duration.seconds(3) }) diff --git a/packages/opentelemetry/src/OtlpSerialization.ts b/packages/opentelemetry/src/OtlpSerialization.ts index 789968b3a73..dca2fcb16fd 100644 --- a/packages/opentelemetry/src/OtlpSerialization.ts +++ b/packages/opentelemetry/src/OtlpSerialization.ts @@ -29,7 +29,7 @@ import * as OtlpProtobuf from "./internal/otlpProtobuf.js" * @since 1.0.0 * @category Models */ -export interface OtlpSerialization { +export interface Service { /** * Encodes trace data for transmission. */ @@ -50,9 +50,10 @@ export interface OtlpSerialization { * @since 1.0.0 * @category Tags */ -export const OtlpSerialization: Context.Tag = Context.GenericTag< - OtlpSerialization ->("@effect/opentelemetry/OtlpSerialization") +export class OtlpSerialization extends Context.Tag("@effect/opentelemetry/OtlpSerialization")< + OtlpSerialization, + Service +>() {} /** * JSON serializer layer for OTLP telemetry data. diff --git a/packages/opentelemetry/src/OtlpTracer.ts b/packages/opentelemetry/src/OtlpTracer.ts index 47d228e43ed..efcf1ce7d39 100644 --- a/packages/opentelemetry/src/OtlpTracer.ts +++ b/packages/opentelemetry/src/OtlpTracer.ts @@ -17,7 +17,7 @@ import * as Exporter from "./internal/otlpExporter.js" import type { KeyValue, Resource } from "./OtlpResource.js" import { entriesToAttributes } from "./OtlpResource.js" import * as OtlpResource from "./OtlpResource.js" -import type { OtlpSerialization } from "./OtlpSerialization.js" +import { OtlpSerialization } from "./OtlpSerialization.js" const ATTR_EXCEPTION_TYPE = "exception.type" const ATTR_EXCEPTION_MESSAGE = "exception.message" @@ -50,6 +50,7 @@ export const make: ( const scope: Scope = { name: OtlpResource.unsafeServiceName(otelResource) } + const serialization = yield* OtlpSerialization const exporter = yield* Exporter.make({ label: "OtlpTracer", @@ -57,7 +58,7 @@ export const make: ( headers: options.headers, exportInterval: options.exportInterval ?? Duration.seconds(5), maxBatchSize: options.maxBatchSize ?? 1000, - kind: "traces", + encode: serialization.traces, body(spans) { const data: TraceData = { resourceSpans: [{ diff --git a/packages/opentelemetry/src/internal/otlpExporter.ts b/packages/opentelemetry/src/internal/otlpExporter.ts index d6693571ff5..53bae407d1c 100644 --- a/packages/opentelemetry/src/internal/otlpExporter.ts +++ b/packages/opentelemetry/src/internal/otlpExporter.ts @@ -10,13 +10,6 @@ import * as Num from "effect/Number" import * as Option from "effect/Option" import * as Schedule from "effect/Schedule" import * as Scope from "effect/Scope" -import { OtlpSerialization } from "../OtlpSerialization.js" - -/** - * OTLP telemetry kind for encoding - * @internal - */ -export type OtlpKind = "traces" | "metrics" | "logs" const policy = Schedule.forever.pipe( Schedule.passthrough, @@ -45,13 +38,13 @@ export const make: ( readonly exportInterval: Duration.DurationInput readonly maxBatchSize: number | "disabled" readonly body: (data: Array) => unknown - readonly kind: OtlpKind + readonly encode: (data: unknown) => HttpBody.HttpBody readonly shutdownTimeout: Duration.DurationInput } ) => Effect.Effect< { readonly push: (data: unknown) => void }, never, - HttpClient.HttpClient | OtlpSerialization | Scope.Scope + HttpClient.HttpClient | Scope.Scope > = Effect.fnUntraced(function*(options) { const clock = yield* Effect.clock const scope = yield* Effect.scope @@ -62,13 +55,6 @@ export const make: ( HttpClient.retryTransient({ schedule: policy, times: 3 }) ) - const serialization = yield* OtlpSerialization - const encode: (data: unknown) => HttpBody.HttpBody = options.kind === "traces" - ? serialization.traces - : options.kind === "metrics" - ? serialization.metrics - : serialization.logs - let headers = Headers.unsafeFromRecord({ "user-agent": `effect-opentelemetry-${options.label}/0.0.0` }) @@ -92,7 +78,7 @@ export const make: ( buffer = [] } const data = options.body(items) - const body = encode(data) + const body = options.encode(data) const requestWithBody = HttpClientRequest.setBody(request, body) return client.execute(requestWithBody).pipe( From 9069886726c977c87f3493a3e29621759f3c2282 Mon Sep 17 00:00:00 2001 From: Tim Smart Date: Wed, 21 Jan 2026 15:34:38 +1300 Subject: [PATCH 05/10] further effect 4 alignment --- .../opentelemetry/examples/native-exporter.ts | 2 +- packages/opentelemetry/src/Otlp.ts | 110 +++++++++--------- packages/opentelemetry/src/OtlpLogger.ts | 20 ++-- packages/opentelemetry/src/OtlpMetrics.ts | 7 +- .../opentelemetry/src/OtlpSerialization.ts | 50 +++----- packages/opentelemetry/src/OtlpTracer.ts | 3 +- .../src/internal/otlpExporter.ts | 6 +- packages/opentelemetry/test/Protobuf.test.ts | 2 +- 8 files changed, 89 insertions(+), 111 deletions(-) diff --git a/packages/opentelemetry/examples/native-exporter.ts b/packages/opentelemetry/examples/native-exporter.ts index 18ed9fec637..ed11600ca93 100644 --- a/packages/opentelemetry/examples/native-exporter.ts +++ b/packages/opentelemetry/examples/native-exporter.ts @@ -4,7 +4,7 @@ import { Effect, Layer, Schedule } from "effect" import * as Logger from "effect/Logger" import * as LogLevel from "effect/LogLevel" -const Observability = Otlp.layer({ +const Observability = Otlp.layerJson({ baseUrl: "http://localhost:4318", resource: { serviceName: "my-service" diff --git a/packages/opentelemetry/src/Otlp.ts b/packages/opentelemetry/src/Otlp.ts index cd4a1455029..35a43b3dc75 100644 --- a/packages/opentelemetry/src/Otlp.ts +++ b/packages/opentelemetry/src/Otlp.ts @@ -5,6 +5,7 @@ import type * as Headers from "@effect/platform/Headers" import type * as HttpClient from "@effect/platform/HttpClient" import * as HttpClientRequest from "@effect/platform/HttpClientRequest" import type * as Duration from "effect/Duration" +import { flow } from "effect/Function" import * as Layer from "effect/Layer" import type * as Logger from "effect/Logger" import type * as Tracer from "effect/Tracer" @@ -14,31 +15,29 @@ import * as OtlpSerialization from "./OtlpSerialization.js" import * as OtlpTracer from "./OtlpTracer.js" /** - * Options for OTLP layer configuration. + * Creates an OTLP layer. * * @since 1.0.0 - * @category Models + * @category Layers */ -export interface OtlpLayerOptions { - readonly baseUrl: string - readonly resource?: { - readonly serviceName?: string | undefined - readonly serviceVersion?: string | undefined - readonly attributes?: Record +export const layer = ( + options: { + readonly baseUrl: string + readonly resource?: { + readonly serviceName?: string | undefined + readonly serviceVersion?: string | undefined + readonly attributes?: Record + } + readonly headers?: Headers.Input | undefined + readonly maxBatchSize?: number | undefined + readonly replaceLogger?: Logger.Logger | undefined + readonly tracerContext?: ((f: () => X, span: Tracer.AnySpan) => X) | undefined + readonly loggerExportInterval?: Duration.DurationInput | undefined + readonly loggerExcludeLogSpans?: boolean | undefined + readonly metricsExportInterval?: Duration.DurationInput | undefined + readonly tracerExportInterval?: Duration.DurationInput | undefined + readonly shutdownTimeout?: Duration.DurationInput | undefined } - readonly headers?: Headers.Input | undefined - readonly maxBatchSize?: number | undefined - readonly replaceLogger?: Logger.Logger | undefined - readonly tracerContext?: ((f: () => X, span: Tracer.AnySpan) => X) | undefined - readonly loggerExportInterval?: Duration.DurationInput | undefined - readonly loggerExcludeLogSpans?: boolean | undefined - readonly metricsExportInterval?: Duration.DurationInput | undefined - readonly tracerExportInterval?: Duration.DurationInput | undefined - readonly shutdownTimeout?: Duration.DurationInput | undefined -} - -const makeLayer = ( - options: OtlpLayerOptions ): Layer.Layer => { const baseReq = HttpClientRequest.get(options.baseUrl) const url = (path: string) => HttpClientRequest.appendUrl(baseReq, path).url @@ -75,48 +74,47 @@ const makeLayer = ( /** * Creates an OTLP layer with JSON serialization. * - * @example - * ```typescript - * import { Otlp } from "@effect/opentelemetry" - * - * const layer = Otlp.layerJson({ baseUrl: "http://localhost:4318" }) - * ``` - * * @since 1.0.0 * @category Layers */ -export const layerJson = (options: OtlpLayerOptions): Layer.Layer => - makeLayer(options).pipe(Layer.provide(OtlpSerialization.layerJson)) +export const layerJson: (options: { + readonly baseUrl: string + readonly resource?: { + readonly serviceName?: string | undefined + readonly serviceVersion?: string | undefined + readonly attributes?: Record + } + readonly headers?: Headers.Input | undefined + readonly maxBatchSize?: number | undefined + readonly replaceLogger?: Logger.Logger | undefined + readonly tracerContext?: ((f: () => X, span: Tracer.AnySpan) => X) | undefined + readonly loggerExportInterval?: Duration.DurationInput | undefined + readonly loggerExcludeLogSpans?: boolean | undefined + readonly metricsExportInterval?: Duration.DurationInput | undefined + readonly tracerExportInterval?: Duration.DurationInput | undefined + readonly shutdownTimeout?: Duration.DurationInput | undefined +}) => Layer.Layer = flow(layer, Layer.provide(OtlpSerialization.layerJson)) /** * Creates an OTLP layer with Protobuf serialization. * - * @example - * ```typescript - * import { Otlp } from "@effect/opentelemetry" - * - * const layer = Otlp.layerProtobuf({ baseUrl: "http://localhost:4318" }) - * ``` - * - * @since 1.0.0 - * @category Layers - */ -export const layerProtobuf = (options: OtlpLayerOptions): Layer.Layer => - makeLayer(options).pipe(Layer.provide(OtlpSerialization.layerProtobuf)) - -/** - * Creates an OTLP layer with JSON serialization (default). - * - * This is an alias for `layerJson` for backwards compatibility. - * - * @example - * ```typescript - * import { Otlp } from "@effect/opentelemetry" - * - * const layer = Otlp.layer({ baseUrl: "http://localhost:4318" }) - * ``` - * * @since 1.0.0 * @category Layers */ -export const layer = layerJson +export const layerProtobuf: (options: { + readonly baseUrl: string + readonly resource?: { + readonly serviceName?: string | undefined + readonly serviceVersion?: string | undefined + readonly attributes?: Record + } + readonly headers?: Headers.Input | undefined + readonly maxBatchSize?: number | undefined + readonly replaceLogger?: Logger.Logger | undefined + readonly tracerContext?: ((f: () => X, span: Tracer.AnySpan) => X) | undefined + readonly loggerExportInterval?: Duration.DurationInput | undefined + readonly loggerExcludeLogSpans?: boolean | undefined + readonly metricsExportInterval?: Duration.DurationInput | undefined + readonly tracerExportInterval?: Duration.DurationInput | undefined + readonly shutdownTimeout?: Duration.DurationInput | undefined +}) => Layer.Layer = flow(layer, Layer.provide(OtlpSerialization.layerProtobuf)) diff --git a/packages/opentelemetry/src/OtlpLogger.ts b/packages/opentelemetry/src/OtlpLogger.ts index 3655df6b908..651cef64ce4 100644 --- a/packages/opentelemetry/src/OtlpLogger.ts +++ b/packages/opentelemetry/src/OtlpLogger.ts @@ -57,16 +57,18 @@ export const make: ( headers: options.headers, maxBatchSize: options.maxBatchSize ?? 1000, exportInterval: options.exportInterval ?? Duration.seconds(1), - encode: serialization.logs, - body: (data): IExportLogsServiceRequest => ({ - resourceLogs: [{ - resource: otelResource, - scopeLogs: [{ - scope, - logRecords: data + body(data) { + const body: IExportLogsServiceRequest = { + resourceLogs: [{ + resource: otelResource, + scopeLogs: [{ + scope, + logRecords: data + }] }] - }] - }), + } + return serialization.logs(body) + }, shutdownTimeout: options.shutdownTimeout ?? Duration.seconds(3) }) diff --git a/packages/opentelemetry/src/OtlpMetrics.ts b/packages/opentelemetry/src/OtlpMetrics.ts index f2d251ed64d..54a221a0a11 100644 --- a/packages/opentelemetry/src/OtlpMetrics.ts +++ b/packages/opentelemetry/src/OtlpMetrics.ts @@ -45,7 +45,7 @@ export const make: (options: { } const serialization = yield* OtlpSerialization - const snapshot = (): IExportMetricsServiceRequest => { + const snapshot = () => { const snapshot = Metric.unsafeSnapshot() const nowNanos = clock.unsafeCurrentTimeNanos() const nowTime = String(nowNanos) @@ -256,7 +256,7 @@ export const make: (options: { } } - return { + const body: IExportMetricsServiceRequest = { resourceMetrics: [{ resource, scopeMetrics: [{ @@ -265,6 +265,8 @@ export const make: (options: { }] }] } + + return serialization.metrics(body) } yield* Exporter.make({ @@ -273,7 +275,6 @@ export const make: (options: { headers: options.headers, maxBatchSize: "disabled", exportInterval: options.exportInterval ?? Duration.seconds(10), - encode: serialization.metrics, body: snapshot, shutdownTimeout: options.shutdownTimeout ?? Duration.seconds(3) }) diff --git a/packages/opentelemetry/src/OtlpSerialization.ts b/packages/opentelemetry/src/OtlpSerialization.ts index dca2fcb16fd..add3dd616a6 100644 --- a/packages/opentelemetry/src/OtlpSerialization.ts +++ b/packages/opentelemetry/src/OtlpSerialization.ts @@ -2,19 +2,7 @@ * OtlpSerialization service for tree-shakable protobuf support. * * This module provides the `OtlpSerialization` service that abstracts the - * encoding of OTLP telemetry data to HttpBody. By default, the JSON serializer is used. - * To use protobuf encoding, use `Otlp.layerProtobuf` or provide the `layerProtobuf` layer. - * - * @example - * ```typescript - * import { Otlp } from "@effect/opentelemetry" - * - * // JSON encoding (default) - * const jsonLayer = Otlp.layerJson({ baseUrl: "http://localhost:4318" }) - * - * // Protobuf encoding - * const protobufLayer = Otlp.layerProtobuf({ baseUrl: "http://localhost:4318" }) - * ``` + * encoding of OTLP telemetry data to HttpBody. * * @since 1.0.0 */ @@ -23,27 +11,6 @@ import * as Context from "effect/Context" import * as Layer from "effect/Layer" import * as OtlpProtobuf from "./internal/otlpProtobuf.js" -/** - * Service interface for serializing OTLP telemetry data to HttpBody. - * - * @since 1.0.0 - * @category Models - */ -export interface Service { - /** - * Encodes trace data for transmission. - */ - readonly traces: (data: unknown) => HttpBody.HttpBody - /** - * Encodes metrics data for transmission. - */ - readonly metrics: (data: unknown) => HttpBody.HttpBody - /** - * Encodes logs data for transmission. - */ - readonly logs: (data: unknown) => HttpBody.HttpBody -} - /** * Tag for the OtlpSerialization service. * @@ -52,7 +19,20 @@ export interface Service { */ export class OtlpSerialization extends Context.Tag("@effect/opentelemetry/OtlpSerialization")< OtlpSerialization, - Service + { + /** + * Encodes trace data for transmission. + */ + readonly traces: (data: unknown) => HttpBody.HttpBody + /** + * Encodes metrics data for transmission. + */ + readonly metrics: (data: unknown) => HttpBody.HttpBody + /** + * Encodes logs data for transmission. + */ + readonly logs: (data: unknown) => HttpBody.HttpBody + } >() {} /** diff --git a/packages/opentelemetry/src/OtlpTracer.ts b/packages/opentelemetry/src/OtlpTracer.ts index efcf1ce7d39..1a6130d1227 100644 --- a/packages/opentelemetry/src/OtlpTracer.ts +++ b/packages/opentelemetry/src/OtlpTracer.ts @@ -58,7 +58,6 @@ export const make: ( headers: options.headers, exportInterval: options.exportInterval ?? Duration.seconds(5), maxBatchSize: options.maxBatchSize ?? 1000, - encode: serialization.traces, body(spans) { const data: TraceData = { resourceSpans: [{ @@ -69,7 +68,7 @@ export const make: ( }] }] } - return data + return serialization.traces(data) }, shutdownTimeout: options.shutdownTimeout ?? Duration.seconds(3) }) diff --git a/packages/opentelemetry/src/internal/otlpExporter.ts b/packages/opentelemetry/src/internal/otlpExporter.ts index 53bae407d1c..1e54798713b 100644 --- a/packages/opentelemetry/src/internal/otlpExporter.ts +++ b/packages/opentelemetry/src/internal/otlpExporter.ts @@ -37,8 +37,7 @@ export const make: ( readonly label: string readonly exportInterval: Duration.DurationInput readonly maxBatchSize: number | "disabled" - readonly body: (data: Array) => unknown - readonly encode: (data: unknown) => HttpBody.HttpBody + readonly body: (data: Array) => HttpBody.HttpBody readonly shutdownTimeout: Duration.DurationInput } ) => Effect.Effect< @@ -77,8 +76,7 @@ export const make: ( } buffer = [] } - const data = options.body(items) - const body = options.encode(data) + const body = options.body(items) const requestWithBody = HttpClientRequest.setBody(request, body) return client.execute(requestWithBody).pipe( diff --git a/packages/opentelemetry/test/Protobuf.test.ts b/packages/opentelemetry/test/Protobuf.test.ts index 6e10af8f5de..154bc7e9028 100644 --- a/packages/opentelemetry/test/Protobuf.test.ts +++ b/packages/opentelemetry/test/Protobuf.test.ts @@ -1,7 +1,7 @@ import { describe, expect, it } from "@effect/vitest" import * as Effect from "effect/Effect" -import * as Proto from "../src/internal/protobuf.js" import * as OtlpProtobuf from "../src/internal/otlpProtobuf.js" +import * as Proto from "../src/internal/protobuf.js" import * as OtlpSerialization from "../src/OtlpSerialization.js" describe("Protobuf encoding", () => { From a86f06a189cb175cc2673d95713cf25eaf6420f3 Mon Sep 17 00:00:00 2001 From: Tim Smart Date: Wed, 21 Jan 2026 15:40:22 +1300 Subject: [PATCH 06/10] simplify changeset --- .changeset/add-otlp-protobuf-support.md | 37 ++++++++++--------------- 1 file changed, 14 insertions(+), 23 deletions(-) diff --git a/.changeset/add-otlp-protobuf-support.md b/.changeset/add-otlp-protobuf-support.md index c2383c935f1..6f3fe54fa00 100644 --- a/.changeset/add-otlp-protobuf-support.md +++ b/.changeset/add-otlp-protobuf-support.md @@ -2,13 +2,20 @@ "@effect/opentelemetry": minor --- -Add tree-shakable protobuf protocol support for OTLP exporters with simplified API. +Add protobuf protocol support for OTLP exporters -This introduces an `OtlpSerialization` service and simplified layer functions for choosing between JSON and Protocol Buffers encoding when exporting telemetry data. +This introduces an `OtlpSerialization` service for choosing between JSON and Protobuf encoding. + +**Breaking changes:** + +- `Otlp.layer` now requires an `OtlpSerialization` layer to be provided for + the desired encoding format. + +**JSON encoding:** -**JSON encoding (default):** ```typescript -import { Otlp } from "@effect/opentelemetry" +import { Layer } from "effect" +import { Otlp, OtlpSerialization } from "@effect/opentelemetry" // Option 1: Explicit JSON layer const layer = Otlp.layerJson({ @@ -16,14 +23,15 @@ const layer = Otlp.layerJson({ resource: { serviceName: "my-service" } }) -// Option 2: Use `layer` alias (backwards compatible) +// Option 2: Use `layer` and provide OtlpSerialization JSON layer const layer = Otlp.layer({ baseUrl: "http://localhost:4318", resource: { serviceName: "my-service" } -}) +}).pipe(Layer.provide(OtlpSerialization.layerJson)) ``` **Protobuf encoding:** + ```typescript import { Otlp } from "@effect/opentelemetry" @@ -33,20 +41,3 @@ const layer = Otlp.layerProtobuf({ resource: { serviceName: "my-service" } }) ``` - -**New exports:** -- `Otlp.layerJson` - OTLP layer with JSON serialization -- `Otlp.layerProtobuf` - OTLP layer with Protobuf serialization -- `Otlp.layer` - Alias for `layerJson` (backwards compatible) -- `OtlpSerialization` - Service definition and layers (`layerJson`, `layerProtobuf`) - -**Breaking changes:** -- Removed `Otlp.layerWithSerializer` - use `Otlp.layerJson` or `Otlp.layerProtobuf` instead -- Removed `OtlpSerializer` module - use `OtlpSerialization` instead -- Removed `OtlpSerializerProtobuf` module - use `Otlp.layerProtobuf` or `OtlpSerialization.layerProtobuf` instead - -**Features:** -- No new dependencies - protobuf encoding implemented from scratch -- Tree-shakable - protobuf code not in bundle unless `layerProtobuf` is used -- Sets appropriate Content-Type header (`application/x-protobuf` vs `application/json`) -- Follows opentelemetry-proto specifications From f9b0402dfd96584cbb82a3017dacf8742232b34e Mon Sep 17 00:00:00 2001 From: Tim Smart Date: Wed, 21 Jan 2026 15:41:12 +1300 Subject: [PATCH 07/10] wip --- packages/opentelemetry/src/OtlpSerialization.ts | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/packages/opentelemetry/src/OtlpSerialization.ts b/packages/opentelemetry/src/OtlpSerialization.ts index add3dd616a6..2174fab69de 100644 --- a/packages/opentelemetry/src/OtlpSerialization.ts +++ b/packages/opentelemetry/src/OtlpSerialization.ts @@ -12,8 +12,6 @@ import * as Layer from "effect/Layer" import * as OtlpProtobuf from "./internal/otlpProtobuf.js" /** - * Tag for the OtlpSerialization service. - * * @since 1.0.0 * @category Tags */ @@ -38,8 +36,7 @@ export class OtlpSerialization extends Context.Tag("@effect/opentelemetry/OtlpSe /** * JSON serializer layer for OTLP telemetry data. * - * This is the default serializer used by OTLP exporters. It encodes - * telemetry data as JSON with `application/json` content type. + * It encodes telemetry data as JSON with `application/json` content type. * * @since 1.0.0 * @category Layers From 354d7df94583e341307cc717790c8251edd55e10 Mon Sep 17 00:00:00 2001 From: Tim Smart Date: Wed, 21 Jan 2026 15:42:24 +1300 Subject: [PATCH 08/10] wip --- packages/opentelemetry/src/Otlp.ts | 34 ++++++++++++++---------------- 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/packages/opentelemetry/src/Otlp.ts b/packages/opentelemetry/src/Otlp.ts index 35a43b3dc75..72d6c356253 100644 --- a/packages/opentelemetry/src/Otlp.ts +++ b/packages/opentelemetry/src/Otlp.ts @@ -20,25 +20,23 @@ import * as OtlpTracer from "./OtlpTracer.js" * @since 1.0.0 * @category Layers */ -export const layer = ( - options: { - readonly baseUrl: string - readonly resource?: { - readonly serviceName?: string | undefined - readonly serviceVersion?: string | undefined - readonly attributes?: Record - } - readonly headers?: Headers.Input | undefined - readonly maxBatchSize?: number | undefined - readonly replaceLogger?: Logger.Logger | undefined - readonly tracerContext?: ((f: () => X, span: Tracer.AnySpan) => X) | undefined - readonly loggerExportInterval?: Duration.DurationInput | undefined - readonly loggerExcludeLogSpans?: boolean | undefined - readonly metricsExportInterval?: Duration.DurationInput | undefined - readonly tracerExportInterval?: Duration.DurationInput | undefined - readonly shutdownTimeout?: Duration.DurationInput | undefined +export const layer = (options: { + readonly baseUrl: string + readonly resource?: { + readonly serviceName?: string | undefined + readonly serviceVersion?: string | undefined + readonly attributes?: Record } -): Layer.Layer => { + readonly headers?: Headers.Input | undefined + readonly maxBatchSize?: number | undefined + readonly replaceLogger?: Logger.Logger | undefined + readonly tracerContext?: ((f: () => X, span: Tracer.AnySpan) => X) | undefined + readonly loggerExportInterval?: Duration.DurationInput | undefined + readonly loggerExcludeLogSpans?: boolean | undefined + readonly metricsExportInterval?: Duration.DurationInput | undefined + readonly tracerExportInterval?: Duration.DurationInput | undefined + readonly shutdownTimeout?: Duration.DurationInput | undefined +}): Layer.Layer => { const baseReq = HttpClientRequest.get(options.baseUrl) const url = (path: string) => HttpClientRequest.appendUrl(baseReq, path).url return Layer.mergeAll( From c973e33a191d4549ec142e2d8850423c615fea20 Mon Sep 17 00:00:00 2001 From: Tim Smart Date: Wed, 21 Jan 2026 15:46:54 +1300 Subject: [PATCH 09/10] cleanup tests --- .../test/OtlpSerialization.test.ts | 22 ++++++++ .../test/SerializerOverride.test.ts | 50 ------------------- 2 files changed, 22 insertions(+), 50 deletions(-) create mode 100644 packages/opentelemetry/test/OtlpSerialization.test.ts delete mode 100644 packages/opentelemetry/test/SerializerOverride.test.ts diff --git a/packages/opentelemetry/test/OtlpSerialization.test.ts b/packages/opentelemetry/test/OtlpSerialization.test.ts new file mode 100644 index 00000000000..92964915c90 --- /dev/null +++ b/packages/opentelemetry/test/OtlpSerialization.test.ts @@ -0,0 +1,22 @@ +import { assert, describe, expect, it } from "@effect/vitest" +import * as Effect from "effect/Effect" +import * as OtlpSerialization from "../src/OtlpSerialization.js" + +describe("OtlpSerialization override behavior", () => { + it.effect("json roundtrip", () => + Effect.gen(function*() { + const serialization = yield* OtlpSerialization.OtlpSerialization + const body = serialization.traces({ test: "data" }) + assert(body._tag === "Uint8Array") + expect(body.contentType).toBe("application/json") + const result = JSON.parse(new TextDecoder().decode(body.body)) + expect(result).toEqual({ test: "data" }) + }).pipe(Effect.provide(OtlpSerialization.layerJson))) + + it.effect("protobuf layer provides protobuf HttpBody", () => + Effect.gen(function*() { + const serialization = yield* OtlpSerialization.OtlpSerialization + const body = serialization.traces({ resourceSpans: [] }) + expect(body.contentType).toBe("application/x-protobuf") + }).pipe(Effect.provide(OtlpSerialization.layerProtobuf))) +}) diff --git a/packages/opentelemetry/test/SerializerOverride.test.ts b/packages/opentelemetry/test/SerializerOverride.test.ts deleted file mode 100644 index 76749846f4f..00000000000 --- a/packages/opentelemetry/test/SerializerOverride.test.ts +++ /dev/null @@ -1,50 +0,0 @@ -import { describe, expect, it } from "@effect/vitest" -import * as Effect from "effect/Effect" -import * as Layer from "effect/Layer" -import * as Scope from "effect/Scope" -import * as OtlpSerialization from "../src/OtlpSerialization.js" - -describe("OtlpSerialization override behavior", () => { - it.effect("json layer provides json HttpBody", () => - Effect.gen(function*() { - const serialization = yield* OtlpSerialization.OtlpSerialization - const body = serialization.traces({ test: "data" }) - expect(body.contentType).toBe("application/json") - }).pipe(Effect.provide(OtlpSerialization.layerJson))) - - it.effect("protobuf layer provides protobuf HttpBody", () => - Effect.gen(function*() { - const serialization = yield* OtlpSerialization.OtlpSerialization - const body = serialization.traces({ resourceSpans: [] }) - expect(body.contentType).toBe("application/x-protobuf") - }).pipe(Effect.provide(OtlpSerialization.layerProtobuf))) - - it.effect("custom layer can be provided", () => - Effect.gen(function*() { - // This simulates: makeLayer(...).pipe(Layer.provide(customSerialization)) - // The makeLayer does NOT provide json internally, so custom can be provided - - // Layer that requires OtlpSerialization (like makeLayer) - const innerLayer = Layer.scopedDiscard( - Effect.gen(function*() { - const serialization = yield* OtlpSerialization.OtlpSerialization - // Protobuf should be used since we provide it externally - const body = serialization.traces({ resourceSpans: [] }) - expect(body.contentType).toBe("application/x-protobuf") - }) - ) - - // Provide protobuf externally - const finalLayer = innerLayer.pipe(Layer.provide(OtlpSerialization.layerProtobuf)) - - yield* Layer.build(finalLayer).pipe(Scope.extend(yield* Effect.scope)) - }).pipe(Effect.scoped)) - - it.effect("json layer produces valid HttpBody", () => - Effect.gen(function*() { - const serialization = yield* OtlpSerialization.OtlpSerialization - const body = serialization.traces({ test: "data" }) - expect(body._tag).toBe("Uint8Array") - expect(body.contentType).toBe("application/json") - }).pipe(Effect.provide(OtlpSerialization.layerJson))) -}) From bf369fa793335f5e5e736d17ca6f2cbea1f0fa94 Mon Sep 17 00:00:00 2001 From: Tim Smart Date: Wed, 21 Jan 2026 15:59:43 +1300 Subject: [PATCH 10/10] codegen --- packages/opentelemetry/src/index.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/packages/opentelemetry/src/index.ts b/packages/opentelemetry/src/index.ts index 89c7091eda6..2e5985a4de8 100644 --- a/packages/opentelemetry/src/index.ts +++ b/packages/opentelemetry/src/index.ts @@ -34,6 +34,11 @@ export * as OtlpMetrics from "./OtlpMetrics.js" export * as OtlpResource from "./OtlpResource.js" /** + * OtlpSerialization service for tree-shakable protobuf support. + * + * This module provides the `OtlpSerialization` service that abstracts the + * encoding of OTLP telemetry data to HttpBody. + * * @since 1.0.0 */ export * as OtlpSerialization from "./OtlpSerialization.js"