Skip to content

Commit e2374c2

Browse files
make RpcSerialization.msgPack options configurable (#6161)
Co-authored-by: Tim Smart <hello@timsmart.co>
1 parent c016642 commit e2374c2

4 files changed

Lines changed: 229 additions & 32 deletions

File tree

.changeset/long-buckets-love.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@effect/rpc": patch
3+
---
4+
5+
add RpcSerialization.makeMsgPack
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
import {
2+
ClusterSchema,
3+
Entity,
4+
MessageStorage,
5+
RunnerAddress,
6+
RunnerHealth,
7+
RunnerStorage,
8+
ShardingConfig,
9+
SocketRunner
10+
} from "@effect/cluster"
11+
import { NodeClusterSocket } from "@effect/platform-node"
12+
import { Rpc, RpcSerialization } from "@effect/rpc"
13+
import { describe, it } from "@effect/vitest"
14+
import { BigDecimal, Effect, Layer, Logger, LogLevel, Option, PrimaryKey, Schema } from "effect"
15+
16+
class TestPayload extends Schema.Class<TestPayload>("TestPayload")({
17+
id: Schema.String,
18+
amount: Schema.BigDecimal
19+
}) {
20+
[PrimaryKey.symbol]() {
21+
return this.id
22+
}
23+
}
24+
25+
const TestEntity = Entity
26+
.make("TestEntity", [
27+
Rpc.make("Process", {
28+
payload: TestPayload,
29+
success: Schema.Void
30+
})
31+
])
32+
.annotateRpcs(ClusterSchema.Persisted, true)
33+
.annotateRpcs(ClusterSchema.Uninterruptible, true)
34+
35+
const TestEntityLayer = TestEntity.toLayer(
36+
Effect.succeed({
37+
Process: () => Effect.void
38+
})
39+
)
40+
41+
const RUNNER_PORT = 50_123
42+
// Build shared storage instances once, so runner and client see the same state.
43+
// MessageStorage.layerMemory requires ShardingConfig, so we provide a minimal one.
44+
const SharedStorage = Layer.mergeAll(
45+
RunnerStorage.layerMemory,
46+
MessageStorage.layerMemory
47+
).pipe(
48+
Layer.provide(ShardingConfig.layerDefaults)
49+
)
50+
51+
const makeRunnerLayer = (port: number) =>
52+
TestEntityLayer.pipe(
53+
Layer.provideMerge(SocketRunner.layer),
54+
Layer.provide(RunnerHealth.layerNoop),
55+
Layer.provide(NodeClusterSocket.layerSocketServer),
56+
Layer.provide(NodeClusterSocket.layerClientProtocol),
57+
Layer.provide(ShardingConfig.layer({
58+
runnerAddress: Option.some(RunnerAddress.make("localhost", port)),
59+
entityTerminationTimeout: 0,
60+
entityMessagePollInterval: 5000,
61+
sendRetryInterval: 100
62+
})),
63+
Layer.provide(RpcSerialization.layerMsgPack)
64+
)
65+
66+
const makeClientLayer = (port: number) =>
67+
SocketRunner.layerClientOnly.pipe(
68+
Layer.provide(NodeClusterSocket.layerClientProtocol),
69+
Layer.provide(ShardingConfig.layer({
70+
runnerAddress: Option.some(RunnerAddress.make("localhost", port)),
71+
runnerListenAddress: Option.some(RunnerAddress.make("localhost", port)),
72+
entityTerminationTimeout: 0,
73+
entityMessagePollInterval: 5000,
74+
sendRetryInterval: 100
75+
})),
76+
Layer.provide(RpcSerialization.layerMsgPack)
77+
)
78+
79+
// BigDecimal.normalize creates a circular `normalized` self-reference.
80+
// When a persisted message is sent with discard: true, the notify path in Runners.makeRpc
81+
// passes the raw envelope (with circular BigDecimal payload) to the runner via msgpack,
82+
// causing RangeError: Maximum call stack size exceeded.
83+
describe("SocketRunner", () => {
84+
it.scopedLive(
85+
"entity call with BigDecimal and discard should not stack overflow",
86+
() =>
87+
Effect.gen(function*() {
88+
// Start the runner (with socket server and entity handler)
89+
yield* Layer.launch(makeRunnerLayer(RUNNER_PORT)).pipe(Effect.forkScoped)
90+
91+
// Give the runner time to start and acquire shards
92+
yield* Effect.sleep("2 seconds")
93+
yield* Effect.log("Before starting the client")
94+
95+
// Send a message from the client with discard: true.
96+
// The BigDecimal is normalized to trigger the circular `normalized` self-reference.
97+
yield* Effect.gen(function*() {
98+
yield* Effect.log("Starting the client")
99+
yield* Effect.sleep("2 seconds")
100+
const makeClient = yield* TestEntity.client
101+
// Give the client time to discover the runner
102+
yield* Effect.sleep("3 seconds")
103+
const client = makeClient("entity-1")
104+
105+
const amount = BigDecimal.unsafeFromString("123.45")
106+
107+
yield* client.Process(
108+
TestPayload.make({ id: "req-1", amount }),
109+
{ discard: true }
110+
)
111+
}).pipe(
112+
Effect.provide(makeClientLayer(RUNNER_PORT)),
113+
Effect.scoped
114+
)
115+
}).pipe(Effect.provide(
116+
SharedStorage.pipe(Layer.provideMerge(
117+
Logger.minimumLogLevel(LogLevel.None)
118+
))
119+
)),
120+
30_000
121+
)
122+
})

packages/rpc/src/RpcSerialization.ts

Lines changed: 41 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -352,43 +352,52 @@ interface JsonRpcResponse {
352352
type JsonRpcMessage = JsonRpcRequest | JsonRpcResponse
353353

354354
/**
355+
* Create a MessagePack serialization with custom msgpackr options.
356+
*
355357
* @since 1.0.0
356358
* @category serialization
357359
*/
358-
export const msgPack: RpcSerialization["Type"] = RpcSerialization.of({
359-
contentType: "application/msgpack",
360-
includesFraming: true,
361-
unsafeMake: () => {
362-
const unpackr = new Msgpackr.Unpackr()
363-
const packr = new Msgpackr.Packr()
364-
const encoder = new TextEncoder()
365-
let incomplete: Uint8Array | undefined = undefined
366-
return {
367-
decode: (bytes) => {
368-
let buf = typeof bytes === "string" ? encoder.encode(bytes) : bytes
369-
if (incomplete !== undefined) {
370-
const prev = buf
371-
bytes = new Uint8Array(incomplete.length + buf.length)
372-
bytes.set(incomplete)
373-
bytes.set(prev, incomplete.length)
374-
buf = bytes
375-
incomplete = undefined
376-
}
377-
try {
378-
return unpackr.unpackMultiple(buf)
379-
} catch (error_) {
380-
const error = error_ as any
381-
if (error.incomplete) {
382-
incomplete = buf.subarray(error.lastPosition)
383-
return error.values ?? []
360+
export const makeMsgPack = (options?: Msgpackr.Options | undefined): RpcSerialization["Type"] =>
361+
RpcSerialization.of({
362+
contentType: "application/msgpack",
363+
includesFraming: true,
364+
unsafeMake() {
365+
const unpackr = new Msgpackr.Unpackr(options)
366+
const packr = new Msgpackr.Packr(options)
367+
const encoder = new TextEncoder()
368+
let incomplete: Uint8Array | undefined = undefined
369+
return {
370+
decode(bytes) {
371+
let buf = typeof bytes === "string" ? encoder.encode(bytes) : bytes
372+
if (incomplete !== undefined) {
373+
const prev = buf
374+
bytes = new Uint8Array(incomplete.length + buf.length)
375+
bytes.set(incomplete)
376+
bytes.set(prev, incomplete.length)
377+
buf = bytes
378+
incomplete = undefined
384379
}
385-
return []
386-
}
387-
},
388-
encode: (response) => packr.pack(response)
380+
try {
381+
return unpackr.unpackMultiple(buf)
382+
} catch (error_) {
383+
const error = error_ as any
384+
if (error.incomplete) {
385+
incomplete = buf.subarray(error.lastPosition)
386+
return error.values ?? []
387+
}
388+
throw error_
389+
}
390+
},
391+
encode: (response) => packr.pack(response)
392+
}
389393
}
390-
}
391-
})
394+
})
395+
396+
/**
397+
* @since 1.0.0
398+
* @category serialization
399+
*/
400+
export const msgPack: RpcSerialization["Type"] = makeMsgPack({ useRecords: true })
392401

393402
/**
394403
* A rpc serialization layer that uses JSON for serialization.
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
import { RpcSerialization } from "@effect/rpc"
2+
import { assert, describe, it } from "@effect/vitest"
3+
4+
describe("RpcSerialization", () => {
5+
describe("msgPack", () => {
6+
it("encode and decode correctly", () => {
7+
const parser = RpcSerialization.msgPack.unsafeMake()
8+
const payload = { _tag: "Request", id: 1, method: "echo" }
9+
const encoded = parser.encode(payload)
10+
const decoded = parser.decode(encoded as Uint8Array)
11+
assert.strictEqual(decoded.length, 1)
12+
assert.deepStrictEqual(decoded[0], payload)
13+
})
14+
15+
it("handles incomplete frames gracefully", () => {
16+
const parser = RpcSerialization.msgPack.unsafeMake()
17+
const helper = RpcSerialization.msgPack.unsafeMake()
18+
19+
const msg1 = helper.encode({ a: 1 }) as Uint8Array
20+
const msg2 = helper.encode({ b: 2 }) as Uint8Array
21+
const combined = new Uint8Array(msg1.length + msg2.length)
22+
combined.set(msg1)
23+
combined.set(msg2, msg1.length)
24+
25+
const truncated = combined.subarray(0, msg1.length + 2)
26+
const decoded = parser.decode(truncated)
27+
28+
assert.strictEqual(decoded.length, 1)
29+
assert.deepStrictEqual(decoded[0], { a: 1 })
30+
})
31+
})
32+
33+
describe("makeMsgPack", () => {
34+
it("useRecords false encode and decode correctly", () => {
35+
const parser = RpcSerialization.makeMsgPack({ useRecords: false }).unsafeMake()
36+
const payload = { _tag: "Request", id: 1, method: "echo" }
37+
const encoded = parser.encode(payload)
38+
const decoded = parser.decode(encoded as Uint8Array)
39+
assert.strictEqual(decoded.length, 1)
40+
assert.deepStrictEqual(decoded[0], payload)
41+
})
42+
43+
it("useRecords false handles nested objects with repeated structures", () => {
44+
const parser = RpcSerialization.makeMsgPack({ useRecords: false }).unsafeMake()
45+
const payload = {
46+
_tag: "Chunk",
47+
requestId: "1",
48+
values: [
49+
{ _tag: "Exit", requestId: "1", exit: { _tag: "Success", value: { _tag: "Ok", data: "a" } } },
50+
{ _tag: "Exit", requestId: "2", exit: { _tag: "Success", value: { _tag: "Ok", data: "b" } } },
51+
{ _tag: "Exit", requestId: "3", exit: { _tag: "Success", value: { _tag: "Ok", data: "c" } } },
52+
{ _tag: "Exit", requestId: "4", exit: { _tag: "Success", value: { _tag: "Ok", data: "d" } } }
53+
]
54+
}
55+
const encoded = parser.encode(payload)
56+
const decoded = parser.decode(encoded as Uint8Array)
57+
assert.strictEqual(decoded.length, 1)
58+
assert.deepStrictEqual(decoded[0], payload)
59+
})
60+
})
61+
})

0 commit comments

Comments
 (0)