-
Notifications
You must be signed in to change notification settings - Fork 2.7k
Expand file tree
/
Copy pathObservableStream.ts
More file actions
139 lines (122 loc) · 4.4 KB
/
ObservableStream.ts
File metadata and controls
139 lines (122 loc) · 4.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import type { Tester } from "@jest/expect-utils";
import { equals, iterableEquality } from "@jest/expect-utils";
import { expect } from "@jest/globals";
import * as matcherUtils from "jest-matcher-utils";
import type {
Observable,
ObservableSubscription,
} from "../../utilities/index.js";
import { ReadableStream } from "node:stream/web";
export interface TakeOptions {
timeout?: number;
}
type ObservableEvent<T> =
| { type: "next"; value: T }
| { type: "error"; error: any }
| { type: "complete" };
export class ObservableStream<T> {
private reader: ReadableStreamDefaultReader<ObservableEvent<T>>;
private subscription!: ObservableSubscription;
private readerQueue: Array<Promise<ObservableEvent<T>>> = [];
constructor(observable: Observable<T>) {
this.reader = new ReadableStream<ObservableEvent<T>>({
start: (controller) => {
this.subscription = observable.subscribe(
(value) => controller.enqueue({ type: "next", value }),
(error) => controller.enqueue({ type: "error", error }),
() => controller.enqueue({ type: "complete" })
);
},
}).getReader();
}
peek({ timeout = 100 }: TakeOptions = {}) {
// Calling `peek` multiple times in a row should not advance the reader
// multiple times until this value has been consumed.
let readerPromise = this.readerQueue[0];
if (!readerPromise) {
// Since this.reader.read() advances the reader in the stream, we don't
// want to consume this promise entirely, otherwise we will miss it when
// calling `take`. Instead, we push it into a queue that can be consumed
// by `take` the next time its called so that we avoid advancing the
// reader until we are finished processing all peeked values.
readerPromise = this.readNextValue();
this.readerQueue.push(readerPromise);
}
return Promise.race([
readerPromise,
new Promise<ObservableEvent<T>>((_, reject) => {
setTimeout(
reject,
timeout,
new Error("Timeout waiting for next event")
);
}),
]);
}
take({ timeout = 100 }: TakeOptions = {}) {
return Promise.race([
this.readerQueue.shift() || this.readNextValue(),
new Promise<ObservableEvent<T>>((_, reject) => {
setTimeout(
reject,
timeout,
new Error("Timeout waiting for next event")
);
}),
]);
}
unsubscribe() {
this.subscription.unsubscribe();
}
async takeNext(options?: TakeOptions): Promise<T> {
const event = await this.take(options);
validateEquals(event, { type: "next", value: expect.anything() });
return (event as ObservableEvent<T> & { type: "next" }).value;
}
async takeError(options?: TakeOptions): Promise<any> {
const event = await this.take(options);
validateEquals(event, { type: "error", error: expect.anything() });
return (event as ObservableEvent<T> & { type: "error" }).error;
}
async takeComplete(options?: TakeOptions): Promise<void> {
const event = await this.take(options);
validateEquals(event, { type: "complete" });
}
private async readNextValue() {
return this.reader.read().then((result) => result.value!);
}
}
// Lightweight expect(...).toEqual(...) check that avoids using `expect` so that
// `expect.assertions(num)` does not double count assertions when using the take*
// functions inside of expect(stream).toEmit* matchers.
function validateEquals(
actualEvent: ObservableEvent<any>,
expectedEvent: ObservableEvent<any>
) {
// Uses the same matchers as expect(...).toEqual(...)
// https://github.com/jestjs/jest/blob/611d1a4ba0008d67b5dcda485177f0813b2b573e/packages/expect/src/matchers.ts#L626-L629
const isEqual = equals(actualEvent, expectedEvent, [
...getCustomMatchers(),
iterableEquality,
]);
if (isEqual) {
return;
}
const hint = matcherUtils.matcherHint("toEqual", "stream", "expected");
throw new Error(
hint +
"\n\n" +
matcherUtils.printDiffOrStringify(
expectedEvent,
actualEvent,
"Expected",
"Received",
true
)
);
}
function getCustomMatchers(): Array<Tester> {
// https://github.com/jestjs/jest/blob/611d1a4ba0008d67b5dcda485177f0813b2b573e/packages/expect/src/jestMatchersObject.ts#L141-L143
const JEST_MATCHERS_OBJECT = Symbol.for("$$jest-matchers-object");
return (globalThis as any)[JEST_MATCHERS_OBJECT].customEqualityTesters;
}