Skip to content

Commit f32f79e

Browse files
committed
feat(share): use another observable to control resets
1 parent c5dbfd6 commit f32f79e

1 file changed

Lines changed: 87 additions & 21 deletions

File tree

src/internal/operators/share.ts

Lines changed: 87 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
1+
import { Observable } from '../Observable';
2+
import { from } from '../observable/from';
3+
import { take } from '../operators/take';
14
import { Subject } from '../Subject';
2-
import { MonoTypeOperatorFunction, OperatorFunction, SubjectLike } from '../types';
35
import { SafeSubscriber } from '../Subscriber';
4-
import { from } from '../observable/from';
6+
import { Subscription } from '../Subscription';
7+
import { MonoTypeOperatorFunction, SubjectLike } from '../types';
58
import { operate } from '../util/lift';
69

710
export interface ShareConfig<T> {
@@ -17,25 +20,31 @@ export interface ShareConfig<T> {
1720
* will remain the connecting subject, meaning the resulting observable will not go "cold" again, and subsequent retries
1821
* or resubscriptions will resubscribe to that same subject. In all cases, RxJS subjects will emit the same error again, however
1922
* {@link ReplaySubject} will also push its buffered values before pushing the error.
23+
* It is also possible to pass a notifier factory returning an observable instead which grants more fine-grained
24+
* control over how and when the reset should happen. This allows behaviors like conditional or delayed resets.
2025
*/
21-
resetOnError?: boolean;
26+
resetOnError?: boolean | ((error: any) => Observable<any>);
2227
/**
2328
* If true, the resulting observable will reset internal state on completion from source and return to a "cold" state. This
2429
* allows the resulting observable to be "repeated" after it is done.
2530
* If false, when the source completes, it will push the completion through the connecting subject, and the subject
2631
* will remain the connecting subject, meaning the resulting observable will not go "cold" again, and subsequent repeats
2732
* or resubscriptions will resubscribe to that same subject.
33+
* It is also possible to pass a notifier factory returning an observable instead which grants more fine-grained
34+
* control over how and when the reset should happen. This allows behaviors like conditional or delayed resets.
2835
*/
29-
resetOnComplete?: boolean;
36+
resetOnComplete?: boolean | (() => Observable<any>);
3037
/**
3138
* If true, when the number of subscribers to the resulting observable reaches zero due to those subscribers unsubscribing, the
3239
* internal state will be reset and the resulting observable will return to a "cold" state. This means that the next
3340
* time the resulting observable is subscribed to, a new subject will be created and the source will be subscribed to
3441
* again.
3542
* If false, when the number of subscribers to the resulting observable reaches zero due to unsubscription, the subject
3643
* will remain connected to the source, and new subscriptions to the result will be connected through that same subject.
44+
* It is also possible to pass a notifier factory returning an observable instead which grants more fine-grained
45+
* control over how and when the reset should happen. This allows behaviors like conditional or delayed resets.
3746
*/
38-
resetOnRefCountZero?: boolean;
47+
resetOnRefCountZero?: boolean | (() => Observable<any>);
3948
}
4049

4150
export function share<T>(): MonoTypeOperatorFunction<T>;
@@ -84,30 +93,72 @@ export function share<T>(options: ShareConfig<T>): MonoTypeOperatorFunction<T>;
8493
* // ... and so on
8594
* ```
8695
*
96+
* ## Example with notifier factory: Delayed reset
97+
* ```ts
98+
* import { interval } from 'rxjs';
99+
* import { share, take, timer } from 'rxjs/operators';
100+
*
101+
* const source = interval(1000).pipe(take(3), share({ resetOnRefCountZero: () => timer(1000) }));
102+
*
103+
* const subscriptionOne = source.subscribe(x => console.log('subscription 1: ', x));
104+
* setTimeout(() => subscriptionOne.unsubscribe(), 1300);
105+
*
106+
* setTimeout(() => source.subscribe(x => console.log('subscription 2: ', x)), 1700);
107+
*
108+
* setTimeout(() => source.subscribe(x => console.log('subscription 3: ', x)), 5000);
109+
*
110+
* // Logs:
111+
* // subscription 1: 0
112+
* // (subscription 1 unsubscribes here)
113+
* // (subscription 2 subscribes here ~400ms later, source was not reset)
114+
* // subscription 2: 1
115+
* // subscription 2: 2
116+
* // (subscription 2 unsubscribes here)
117+
* // (subscription 3 subscribes here ~2000ms later, source did reset before)
118+
* // subscription 3: 0
119+
* // subscription 3: 1
120+
* // subscription 3: 2
121+
* ```
122+
*
87123
* @see {@link api/index/function/interval}
88124
* @see {@link map}
89125
*
90126
* @return A function that returns an Observable that mirrors the source.
91127
*/
92-
export function share<T>(options?: ShareConfig<T>): OperatorFunction<T, T> {
93-
options = options || {};
94-
const { connector = () => new Subject<T>(), resetOnComplete = true, resetOnError = true, resetOnRefCountZero = true } = options;
128+
export function share<T>(options: ShareConfig<T> = {}): MonoTypeOperatorFunction<T> {
129+
const { connector = () => new Subject<T>(), resetOnError = true, resetOnComplete = true, resetOnRefCountZero = true } = options;
95130

96131
let connection: SafeSubscriber<T> | null = null;
132+
let resetConnection: Subscription | null = null;
97133
let subject: SubjectLike<T> | null = null;
98134
let refCount = 0;
99135
let hasCompleted = false;
100136
let hasErrored = false;
101137

138+
const cancelReset = () => {
139+
resetConnection?.unsubscribe();
140+
resetConnection = null;
141+
};
102142
// Used to reset the internal state to a "cold"
103143
// state, as though it had never been subscribed to.
104144
const reset = () => {
145+
cancelReset();
105146
connection = subject = null;
106147
hasCompleted = hasErrored = false;
107148
};
149+
const resetAndUnsubscribe = () => {
150+
// We need to capture the connection before
151+
// we reset (if we need to reset).
152+
const conn = connection;
153+
reset();
154+
conn?.unsubscribe();
155+
};
108156

109157
return operate((source, subscriber) => {
110158
refCount++;
159+
if (!hasErrored && !hasCompleted) {
160+
cancelReset();
161+
}
111162

112163
// Create the subject if we don't have one yet.
113164
subject = subject ?? connector();
@@ -129,19 +180,17 @@ export function share<T>(options?: ShareConfig<T>): OperatorFunction<T, T> {
129180
// We need to capture the subject before
130181
// we reset (if we need to reset).
131182
const dest = subject!;
132-
if (resetOnError) {
133-
reset();
134-
}
183+
cancelReset();
184+
resetConnection = handleReset(reset, resetOnError, err);
135185
dest.error(err);
136186
},
137187
complete: () => {
138188
hasCompleted = true;
139-
const dest = subject!;
140189
// We need to capture the subject before
141190
// we reset (if we need to reset).
142-
if (resetOnComplete) {
143-
reset();
144-
}
191+
const dest = subject!;
192+
cancelReset();
193+
resetConnection = handleReset(reset, resetOnComplete);
145194
dest.complete();
146195
},
147196
});
@@ -155,13 +204,30 @@ export function share<T>(options?: ShareConfig<T>): OperatorFunction<T, T> {
155204
// If we're resetting on refCount === 0, and it's 0, we only want to do
156205
// that on "unsubscribe", really. Resetting on error or completion is a different
157206
// configuration.
158-
if (resetOnRefCountZero && !refCount && !hasErrored && !hasCompleted) {
159-
// We need to capture the connection before
160-
// we reset (if we need to reset).
161-
const conn = connection;
162-
reset();
163-
conn?.unsubscribe();
207+
if (refCount === 0 && !hasErrored && !hasCompleted) {
208+
cancelReset(); // paranoia, there should never be a resetConnection, if we reached this point
209+
resetConnection = handleReset(resetAndUnsubscribe, resetOnRefCountZero);
164210
}
165211
};
166212
});
167213
}
214+
215+
function handleReset<T extends unknown[] = never[]>(
216+
fn: () => void,
217+
on: boolean | ((...args: T) => Observable<any>),
218+
...args: T
219+
): Subscription | null {
220+
if (on === true) {
221+
fn();
222+
223+
return null;
224+
}
225+
226+
if (on === false) {
227+
return null;
228+
}
229+
230+
return on(...args)
231+
.pipe(take(1))
232+
.subscribe(() => fn());
233+
}

0 commit comments

Comments
 (0)