Skip to content

✨ Iterate Subscriptions with each()#1079

Open
cowboyd wants to merge 1 commit intov4from
iterate-subscriptions
Open

✨ Iterate Subscriptions with each()#1079
cowboyd wants to merge 1 commit intov4from
iterate-subscriptions

Conversation

@cowboyd
Copy link
Member

@cowboyd cowboyd commented Jan 17, 2026

Motivation

A pesistent gotcha that folks have with iterating streams is that there can arise a race condition where you want to start consuming a stream, but items have already been sent to it by the time your consumer fires up.

// this might be a race condition because stuff could be added to the stream before the sub task starts.
yield* spawn(function*() {
  for (let item of yield* each(stream)) {
    // do work
    yield* each.next();
  }
});

The fix is to capture a live subscription before your consumer loop fires up. But this leaves you in a pickle because now you can't use the convenient each() helper to do your iteration, so you have to manually iterate:

let subscription = yield* stream;
// we are now subscribed, so nothing will be missed.

yield* spawn(function*() {
  let next = yield* subscription.next();
  while (!next.done) {
    //do work with next.value
    next = yield* subscription.next();
  }
});

That's ugly, and you shouldn't have to do it as often as you have to.

Approach

This allows the argument to each() to either be a stream or a subscription. That way, you can have the best of both worlds. The above becomes:

let subscription = yield* stream;
yield* spawn(function*() {
  for (let item of yield* each(subscription)) {
    // do work
    yield* each.next();
  }
});

Now you get the nice syntax, but any items that arrive after you subscribe, but before your consumer task spins up are still delivered.

@pkg-pr-new
Copy link

pkg-pr-new bot commented Jan 17, 2026

Open in StackBlitz

npm i https://pkg.pr.new/thefrontside/effection@1079

commit: b1cc893

A pesistent gotcha that folks have with iterating streams is that
there can arise a race condition where you want to start consuming a
stream, but items have already been sent to it by the time your
consumer fires up. The fix is to capture a live subscription _before_
your consumer loop fires up. But this leaves you in a pickle because
now you can't use the convenient `each()` helper to do your iteration.

This allows the argument to `each()` to either be a stream or a
subscription. That way, you can have the best of both worlds.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants