Skip to content

Commit b1cc893

Browse files
committed
✨ Iterate Subscriptions with each()
A pesistent gotcha that folks have with iterating streams is that there can arise a race condition where you want to start consuming a stream, but items have already been sent to it by the time your consumer fires up. The fix is to capture a live subscription _before_ your consumer loop fires up. But this leaves you in a pickle because now you can't use the convenient `each()` helper to do your iteration. This allows the argument to `each()` to either be a stream or a subscription. That way, you can have the best of both worlds.
1 parent 153e420 commit b1cc893

2 files changed

Lines changed: 31 additions & 2 deletions

File tree

lib/each.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1+
import { spawn } from "./spawn.ts";
2+
import { constant } from "./constant.ts";
13
import { createContext } from "./context.ts";
24
import { useScope } from "./scope.ts";
3-
import { spawn } from "./spawn.ts";
45
import type { Operation, Stream, Subscription } from "./types.ts";
56
import { withResolvers } from "./with-resolvers.ts";
67

@@ -28,9 +29,16 @@ import { withResolvers } from "./with-resolvers.ts";
2829
* @param stream - the stream to iterate
2930
* @returns an operation to iterate `stream`
3031
*/
31-
export function each<T>(stream: Stream<T, unknown>): Operation<Iterable<T>> {
32+
export function each<T>(
33+
enumerable: Stream<T, unknown> | Subscription<T, unknown>,
34+
): Operation<Iterable<T>> {
3235
return {
3336
*[Symbol.iterator]() {
37+
let stream = typeof (enumerable as Subscription<T, unknown>).next ===
38+
"function"
39+
? constant(enumerable as Subscription<T, unknown>)
40+
: enumerable as Stream<T, unknown>;
41+
3442
let scope = yield* useScope();
3543
if (!scope.hasOwn(EachStack)) {
3644
scope.set(EachStack, []);

test/each.test.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { describe, expect, it } from "./suite.ts";
22
import {
3+
createQueue,
34
each,
45
type Operation,
56
resource,
@@ -54,6 +55,26 @@ describe("each", () => {
5455
});
5556
});
5657

58+
it("can iterate subscriptions", async () => {
59+
await run(function* () {
60+
let queue = createQueue<number, void>();
61+
62+
queue.add(1);
63+
queue.add(2);
64+
queue.add(3);
65+
queue.close();
66+
67+
let items = [];
68+
69+
for (let num of yield* each(queue)) {
70+
items.push(num);
71+
yield* each.next();
72+
}
73+
74+
expect(items).toEqual([1, 2, 3]);
75+
});
76+
});
77+
5778
it("handles context correctly if you break out of a loop", async () => {
5879
await expect(run(function* () {
5980
let seq = sequence("hello world");

0 commit comments

Comments
 (0)