Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions .changeset/add-otlp-protobuf-support.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
---
"@effect/opentelemetry": minor
---

Add protobuf protocol support for OTLP exporters

This introduces an `OtlpSerialization` service for choosing between JSON and Protobuf encoding.

**Breaking changes:**

- `Otlp.layer` now requires an `OtlpSerialization` layer to be provided for
the desired encoding format.

**JSON encoding:**

```typescript
import { Layer } from "effect"
import { Otlp, OtlpSerialization } from "@effect/opentelemetry"

// Option 1: Explicit JSON layer
const layer = Otlp.layerJson({
baseUrl: "http://localhost:4318",
resource: { serviceName: "my-service" }
})

// Option 2: Use `layer` and provide OtlpSerialization JSON layer
const layer = Otlp.layer({
baseUrl: "http://localhost:4318",
resource: { serviceName: "my-service" }
}).pipe(Layer.provide(OtlpSerialization.layerJson))
```

**Protobuf encoding:**

```typescript
import { Otlp } from "@effect/opentelemetry"

// Simply use layerProtobuf for protobuf encoding
const layer = Otlp.layerProtobuf({
baseUrl: "http://localhost:4318",
resource: { serviceName: "my-service" }
})
```
2 changes: 1 addition & 1 deletion packages/opentelemetry/examples/native-exporter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { Effect, Layer, Schedule } from "effect"
import * as Logger from "effect/Logger"
import * as LogLevel from "effect/LogLevel"

const Observability = Otlp.layer({
const Observability = Otlp.layerJson({
baseUrl: "http://localhost:4318",
resource: {
serviceName: "my-service"
Expand Down
54 changes: 53 additions & 1 deletion packages/opentelemetry/src/Otlp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,18 @@ import type * as Headers from "@effect/platform/Headers"
import type * as HttpClient from "@effect/platform/HttpClient"
import * as HttpClientRequest from "@effect/platform/HttpClientRequest"
import type * as Duration from "effect/Duration"
import { flow } from "effect/Function"
import * as Layer from "effect/Layer"
import type * as Logger from "effect/Logger"
import type * as Tracer from "effect/Tracer"
import * as OtlpLogger from "./OtlpLogger.js"
import * as OtlpMetrics from "./OtlpMetrics.js"
import * as OtlpSerialization from "./OtlpSerialization.js"
import * as OtlpTracer from "./OtlpTracer.js"

/**
* Creates an OTLP layer.
*
* @since 1.0.0
* @category Layers
*/
Expand All @@ -32,7 +36,7 @@ export const layer = (options: {
readonly metricsExportInterval?: Duration.DurationInput | undefined
readonly tracerExportInterval?: Duration.DurationInput | undefined
readonly shutdownTimeout?: Duration.DurationInput | undefined
}): Layer.Layer<never, never, HttpClient.HttpClient> => {
}): Layer.Layer<never, never, HttpClient.HttpClient | OtlpSerialization.OtlpSerialization> => {
const baseReq = HttpClientRequest.get(options.baseUrl)
const url = (path: string) => HttpClientRequest.appendUrl(baseReq, path).url
return Layer.mergeAll(
Expand Down Expand Up @@ -64,3 +68,51 @@ export const layer = (options: {
})
)
}

/**
* Creates an OTLP layer with JSON serialization.
*
* @since 1.0.0
* @category Layers
*/
export const layerJson: (options: {
readonly baseUrl: string
readonly resource?: {
readonly serviceName?: string | undefined
readonly serviceVersion?: string | undefined
readonly attributes?: Record<string, unknown>
}
readonly headers?: Headers.Input | undefined
readonly maxBatchSize?: number | undefined
readonly replaceLogger?: Logger.Logger<any, any> | undefined
readonly tracerContext?: (<X>(f: () => X, span: Tracer.AnySpan) => X) | undefined
readonly loggerExportInterval?: Duration.DurationInput | undefined
readonly loggerExcludeLogSpans?: boolean | undefined
readonly metricsExportInterval?: Duration.DurationInput | undefined
readonly tracerExportInterval?: Duration.DurationInput | undefined
readonly shutdownTimeout?: Duration.DurationInput | undefined
}) => Layer.Layer<never, never, HttpClient.HttpClient> = flow(layer, Layer.provide(OtlpSerialization.layerJson))

/**
* Creates an OTLP layer with Protobuf serialization.
*
* @since 1.0.0
* @category Layers
*/
export const layerProtobuf: (options: {
readonly baseUrl: string
readonly resource?: {
readonly serviceName?: string | undefined
readonly serviceVersion?: string | undefined
readonly attributes?: Record<string, unknown>
}
readonly headers?: Headers.Input | undefined
readonly maxBatchSize?: number | undefined
readonly replaceLogger?: Logger.Logger<any, any> | undefined
readonly tracerContext?: (<X>(f: () => X, span: Tracer.AnySpan) => X) | undefined
readonly loggerExportInterval?: Duration.DurationInput | undefined
readonly loggerExcludeLogSpans?: boolean | undefined
readonly metricsExportInterval?: Duration.DurationInput | undefined
readonly tracerExportInterval?: Duration.DurationInput | undefined
readonly shutdownTimeout?: Duration.DurationInput | undefined
}) => Layer.Layer<never, never, HttpClient.HttpClient> = flow(layer, Layer.provide(OtlpSerialization.layerProtobuf))
25 changes: 15 additions & 10 deletions packages/opentelemetry/src/OtlpLogger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import * as Tracer from "effect/Tracer"
import * as Exporter from "./internal/otlpExporter.js"
import type { AnyValue, Fixed64, KeyValue, Resource } from "./OtlpResource.js"
import * as OtlpResource from "./OtlpResource.js"
import { OtlpSerialization } from "./OtlpSerialization.js"

/**
* @since 1.0.0
Expand All @@ -42,28 +43,32 @@ export const make: (
) => Effect.Effect<
Logger.Logger<unknown, void>,
never,
HttpClient.HttpClient | Scope.Scope
HttpClient.HttpClient | OtlpSerialization | Scope.Scope
> = Effect.fnUntraced(function*(options) {
const otelResource = yield* OtlpResource.fromConfig(options.resource)
const scope: IInstrumentationScope = {
name: OtlpResource.unsafeServiceName(otelResource)
}
const serialization = yield* OtlpSerialization

const exporter = yield* Exporter.make({
label: "OtlpLogger",
url: options.url,
headers: options.headers,
maxBatchSize: options.maxBatchSize ?? 1000,
exportInterval: options.exportInterval ?? Duration.seconds(1),
body: (data): IExportLogsServiceRequest => ({
resourceLogs: [{
resource: otelResource,
scopeLogs: [{
scope,
logRecords: data
body(data) {
const body: IExportLogsServiceRequest = {
resourceLogs: [{
resource: otelResource,
scopeLogs: [{
scope,
logRecords: data
}]
}]
}]
}),
}
return serialization.logs(body)
},
shutdownTimeout: options.shutdownTimeout ?? Duration.seconds(3)
})

Expand Down Expand Up @@ -92,7 +97,7 @@ export const layer = (options: {
readonly maxBatchSize?: number | undefined
readonly shutdownTimeout?: Duration.DurationInput | undefined
readonly excludeLogSpans?: boolean | undefined
}): Layer.Layer<never, never, HttpClient.HttpClient> =>
}): Layer.Layer<never, never, HttpClient.HttpClient | OtlpSerialization> =>
options.replaceLogger ? Logger.replaceScoped(options.replaceLogger, make(options)) : Logger.addScoped(make(options))

// internal
Expand Down
12 changes: 8 additions & 4 deletions packages/opentelemetry/src/OtlpMetrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import type * as Scope from "effect/Scope"
import * as Exporter from "./internal/otlpExporter.js"
import type { Fixed64, KeyValue } from "./OtlpResource.js"
import * as OtlpResource from "./OtlpResource.js"
import { OtlpSerialization } from "./OtlpSerialization.js"

/**
* @since 1.0.0
Expand All @@ -33,7 +34,7 @@ export const make: (options: {
}) => Effect.Effect<
void,
never,
HttpClient.HttpClient | Scope.Scope
HttpClient.HttpClient | OtlpSerialization | Scope.Scope
> = Effect.fnUntraced(function*(options) {
const clock = yield* Effect.clock
const startTime = String(clock.unsafeCurrentTimeNanos())
Expand All @@ -42,8 +43,9 @@ export const make: (options: {
const metricsScope: IInstrumentationScope = {
name: OtlpResource.unsafeServiceName(resource)
}
const serialization = yield* OtlpSerialization

const snapshot = (): IExportMetricsServiceRequest => {
const snapshot = () => {
const snapshot = Metric.unsafeSnapshot()
const nowNanos = clock.unsafeCurrentTimeNanos()
const nowTime = String(nowNanos)
Expand Down Expand Up @@ -254,7 +256,7 @@ export const make: (options: {
}
}

return {
const body: IExportMetricsServiceRequest = {
resourceMetrics: [{
resource,
scopeMetrics: [{
Expand All @@ -263,6 +265,8 @@ export const make: (options: {
}]
}]
}

return serialization.metrics(body)
}

yield* Exporter.make({
Expand Down Expand Up @@ -290,7 +294,7 @@ export const layer = (options: {
readonly headers?: Headers.Input | undefined
readonly exportInterval?: Duration.DurationInput | undefined
readonly shutdownTimeout?: Duration.DurationInput | undefined
}): Layer.Layer<never, never, HttpClient.HttpClient> => Layer.scopedDiscard(make(options))
}): Layer.Layer<never, never, HttpClient.HttpClient | OtlpSerialization> => Layer.scopedDiscard(make(options))

// internal

Expand Down
64 changes: 64 additions & 0 deletions packages/opentelemetry/src/OtlpSerialization.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/**
* OtlpSerialization service for tree-shakable protobuf support.
*
* This module provides the `OtlpSerialization` service that abstracts the
* encoding of OTLP telemetry data to HttpBody.
*
* @since 1.0.0
*/
import * as HttpBody from "@effect/platform/HttpBody"
import * as Context from "effect/Context"
import * as Layer from "effect/Layer"
import * as OtlpProtobuf from "./internal/otlpProtobuf.js"

/**
* @since 1.0.0
* @category Tags
*/
export class OtlpSerialization extends Context.Tag("@effect/opentelemetry/OtlpSerialization")<
OtlpSerialization,
{
/**
* Encodes trace data for transmission.
*/
readonly traces: (data: unknown) => HttpBody.HttpBody
/**
* Encodes metrics data for transmission.
*/
readonly metrics: (data: unknown) => HttpBody.HttpBody
/**
* Encodes logs data for transmission.
*/
readonly logs: (data: unknown) => HttpBody.HttpBody
}
>() {}

/**
* JSON serializer layer for OTLP telemetry data.
*
* It encodes telemetry data as JSON with `application/json` content type.
*
* @since 1.0.0
* @category Layers
*/
export const layerJson: Layer.Layer<OtlpSerialization> = Layer.succeed(OtlpSerialization, {
traces: (data) => HttpBody.unsafeJson(data),
metrics: (data) => HttpBody.unsafeJson(data),
logs: (data) => HttpBody.unsafeJson(data)
})

/**
* Protobuf serializer layer for OTLP telemetry data.
*
* This serializer encodes telemetry data using Protocol Buffers binary
* format with `application/x-protobuf` content type. It provides more
* efficient wire format compared to JSON.
*
* @since 1.0.0
* @category Layers
*/
export const layerProtobuf: Layer.Layer<OtlpSerialization> = Layer.succeed(OtlpSerialization, {
traces: (data) => HttpBody.uint8Array(OtlpProtobuf.encodeTracesData(data as any), "application/x-protobuf"),
metrics: (data) => HttpBody.uint8Array(OtlpProtobuf.encodeMetricsData(data as any), "application/x-protobuf"),
logs: (data) => HttpBody.uint8Array(OtlpProtobuf.encodeLogsData(data as any), "application/x-protobuf")
})
9 changes: 6 additions & 3 deletions packages/opentelemetry/src/OtlpTracer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import * as Exporter from "./internal/otlpExporter.js"
import type { KeyValue, Resource } from "./OtlpResource.js"
import { entriesToAttributes } from "./OtlpResource.js"
import * as OtlpResource from "./OtlpResource.js"
import { OtlpSerialization } from "./OtlpSerialization.js"

const ATTR_EXCEPTION_TYPE = "exception.type"
const ATTR_EXCEPTION_MESSAGE = "exception.message"
Expand All @@ -43,12 +44,13 @@ export const make: (
) => Effect.Effect<
Tracer.Tracer,
never,
HttpClient.HttpClient | Scope.Scope
HttpClient.HttpClient | OtlpSerialization | Scope.Scope
> = Effect.fnUntraced(function*(options) {
const otelResource = yield* OtlpResource.fromConfig(options.resource)
const scope: Scope = {
name: OtlpResource.unsafeServiceName(otelResource)
}
const serialization = yield* OtlpSerialization

const exporter = yield* Exporter.make({
label: "OtlpTracer",
Expand All @@ -66,7 +68,7 @@ export const make: (
}]
}]
}
return data
return serialization.traces(data)
},
shutdownTimeout: options.shutdownTimeout ?? Duration.seconds(3)
})
Expand Down Expand Up @@ -119,7 +121,8 @@ export const layer = (options: {
readonly maxBatchSize?: number | undefined
readonly context?: (<X>(f: () => X, span: Tracer.AnySpan) => X) | undefined
readonly shutdownTimeout?: Duration.DurationInput | undefined
}): Layer.Layer<never, never, HttpClient.HttpClient> => Layer.unwrapScoped(Effect.map(make(options), Layer.setTracer))
}): Layer.Layer<never, never, HttpClient.HttpClient | OtlpSerialization> =>
Layer.unwrapScoped(Effect.map(make(options), Layer.setTracer))

// internal

Expand Down
10 changes: 10 additions & 0 deletions packages/opentelemetry/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,16 @@ export * as OtlpMetrics from "./OtlpMetrics.js"
*/
export * as OtlpResource from "./OtlpResource.js"

/**
* OtlpSerialization service for tree-shakable protobuf support.
*
* This module provides the `OtlpSerialization` service that abstracts the
* encoding of OTLP telemetry data to HttpBody.
*
* @since 1.0.0
*/
export * as OtlpSerialization from "./OtlpSerialization.js"

/**
* @since 1.0.0
*/
Expand Down
10 changes: 6 additions & 4 deletions packages/opentelemetry/src/internal/otlpExporter.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import * as Headers from "@effect/platform/Headers"
import type * as HttpBody from "@effect/platform/HttpBody"
import * as HttpClient from "@effect/platform/HttpClient"
import * as HttpClientError from "@effect/platform/HttpClientError"
import * as HttpClientRequest from "@effect/platform/HttpClientRequest"
Expand Down Expand Up @@ -36,7 +37,7 @@ export const make: (
readonly label: string
readonly exportInterval: Duration.DurationInput
readonly maxBatchSize: number | "disabled"
readonly body: (data: Array<any>) => unknown
readonly body: (data: Array<any>) => HttpBody.HttpBody
readonly shutdownTimeout: Duration.DurationInput
}
) => Effect.Effect<
Expand Down Expand Up @@ -75,9 +76,10 @@ export const make: (
}
buffer = []
}
return client.execute(
HttpClientRequest.bodyUnsafeJson(request, options.body(items))
).pipe(
const body = options.body(items)
const requestWithBody = HttpClientRequest.setBody(request, body)

return client.execute(requestWithBody).pipe(
Effect.asVoid,
Effect.withTracerEnabled(false)
)
Expand Down
Loading