// Package eventstream implements a single-producer, multiple-consumer event stream.
package eventstream

import (
	"context"
	"errors"
)

// EventStream allows a single producer to publish events to multiple asynchronous consumers, who each
// receive all events. Note that event values are opaque to the event stream and no copies are made:
// consumers should not mutate consumed event values!
type EventStream[T any] interface {
	// Publish adds the next value to the stream. This method can be called concurrently with subscriber reads,
	// but not with other publisher operations. External synchronization is required if there are multiple concurrent
	// publishers.
	Publish(T)

	// Close ends the stream; the same concurrency rules apply to Close() and Publish().
	Close()

	// Subscribe returns a Promise to the next unpublished event. The returned Promise gives the caller
	// the events in the stream from the current position forward.
	Subscribe() Promise[T]
}

// Promise is a handle to the next event in the stream, plus all events following.
// A Promise is effectively immutable and can be shared. It may be used concurrently with Publish operations.
type Promise[T any] interface {
	// Ready returns the ready channel for this Promise; the channel closes when this Promise is ready.
	Ready() <-chan struct{}

	// Next returns the next event in the stream, and the next Promise. Multiple calls return consistent results.
	// Returns (zero, nil) when the stream is closed.
	//
	// Note that this method internally blocks until the Ready() channel is closed!
	// Typical callers will not call Next() until this Promise is ready.
	Next() (T, Promise[T])

	// Iterator creates an Iterator based on this Promise. The Promise is unchanged.
	Iterator() Iterator[T]
}

// ErrDone is returned by Iterator.Next() when the underlying EventStream is closed.
var ErrDone = errors.New("no more items in iterator")

// Iterator iterates an event stream. It may be used concurrently with Publish operations.
//
// Unlike Promises, Iterators are stateful and should not be shared across goroutines.
type Iterator[T any] interface {
	// Next returns the next event in the stream.
	//   - Returns (<event>, nil) when the next event is published.
	//   - Returns (zero, ErrDone) when the stream is exhausted.
	//   - Returns (zero, ctx.Err()) if the context is cancelled.
	//
	// Blocks until one of these three outcomes occurs.
	Next(ctx context.Context) (T, error)

	// Consume iterates the remainder of the stream, calling the provided callback with each successive value.
	//   - Returns `nil` when the stream is exhausted, or if the callback returns ErrDone.
	//   - Returns `<error>` if the callback returns any other non-nil error.
	//   - Returns `ctx.Err()` if the context is cancelled.
	//
	// Blocks until one of these three outcomes occurs.
	Consume(ctx context.Context, callback func(context.Context, T) error) error
}
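
// Usage sketch (not part of this package): the interfaces above could be backed by a singly
// linked list of nodes, each with a ready channel that is closed once the node's value is set.
// The standalone program below is a minimal illustration under that assumption. The names node,
// stream, iterator, collect, and errDone are hypothetical, and the sketch uses concrete types
// rather than the interfaces to stay short; the actual implementation may differ.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errDone mirrors ErrDone from the interface file (redeclared so the sketch is standalone).
var errDone = errors.New("no more items in iterator")

// node is a hypothetical Promise: one link in a singly linked list of events.
type node[T any] struct {
	ready chan struct{} // closed once value and next are final
	value T
	next  *node[T] // nil after ready closes means the stream was closed
}

func newNode[T any]() *node[T] { return &node[T]{ready: make(chan struct{})} }

func (n *node[T]) Ready() <-chan struct{} { return n.ready }

// Next blocks until the node is ready, then returns its value and successor.
func (n *node[T]) Next() (T, *node[T]) {
	<-n.ready
	return n.value, n.next
}

// stream is a hypothetical single-producer stream; tail is the next unpublished node.
type stream[T any] struct{ tail *node[T] }

func newStream[T any]() *stream[T] { return &stream[T]{tail: newNode[T]()} }

// Publish fills the tail node, appends a fresh tail, then wakes waiting consumers.
func (s *stream[T]) Publish(v T) {
	n := s.tail
	n.value, n.next = v, newNode[T]()
	s.tail = n.next
	close(n.ready) // closing the channel makes value and next visible to readers
}

// Close marks the tail ready with a nil successor, which signals end of stream.
func (s *stream[T]) Close() { close(s.tail.ready) }

func (s *stream[T]) Subscribe() *node[T] { return s.tail }

// iterator is a hypothetical stateful cursor over the node list.
type iterator[T any] struct{ cur *node[T] }

// Next waits for the current node or for the context, whichever comes first.
func (it *iterator[T]) Next(ctx context.Context) (T, error) {
	var zero T
	select {
	case <-ctx.Done():
		return zero, ctx.Err()
	case <-it.cur.Ready():
	}
	v, next := it.cur.Next()
	if next == nil {
		return zero, errDone
	}
	it.cur = next
	return v, nil
}

// collect publishes 1..n from a producer goroutine and returns what one consumer observes.
func collect(n int) []int {
	s := newStream[int]()
	it := &iterator[int]{cur: s.Subscribe()} // subscribe before publishing starts
	go func() {
		for i := 1; i <= n; i++ {
			s.Publish(i)
		}
		s.Close()
	}()
	var got []int
	for {
		v, err := it.Next(context.Background())
		if err != nil { // errDone here, since the background context is never cancelled
			return got
		}
		got = append(got, v)
	}
}

func main() {
	fmt.Println(collect(3)) // prints [1 2 3]
}
```

// In this sketch the consumer subscribes before the producer goroutine starts, because
// Subscribe and Publish both touch the tail pointer and the sketch does not synchronize them.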