Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add observable utilities

+74
+74
src/observable.ts
··· 1 + import { Source, Sink, SignalKind, TalkbackKind, Observer, Subject, TeardownFn } from './types' 2 + import { push, start, talkbackPlaceholder } from './helpers' 3 + 4 + interface ObservableSubscription { 5 + closed?: boolean; 6 + unsubscribe(): void; 7 + } 8 + 9 + export interface ObservableObserver<T> { 10 + next(value: T): void; 11 + error(error: any): void; 12 + complete(): void; 13 + } 14 + 15 + interface Observable<T> { 16 + subscribe(observer: ObservableObserver<T>): ObservableSubscription; 17 + } 18 + 19 + const observableSymbol: unique symbol = typeof Symbol === 'function' 20 + ? (Symbol as any).observable || ((Symbol as any).observable = Symbol('observable')) 21 + : '@@observable' 22 + 23 + export function fromObservable<T>(input: Observable<T>): Source<T> { 24 + const observable: Observable<T> = input[observableSymbol] 25 + ? (input as any)[observableSymbol]() 26 + : input; 27 + return (sink) => { 28 + const subscription = observable.subscribe({ 29 + next(value: T) { 30 + sink(push(value)); 31 + }, 32 + complete() { 33 + sink(SignalKind.End); 34 + }, 35 + error() {/*noop*/}, 36 + }); 37 + sink(start((signal) => { 38 + if (signal === TalkbackKind.Close) subscription.unsubscribe(); 39 + })); 40 + }; 41 + } 42 + 43 + export function toObservable<T>(source: Source<T>): Observable<T> { 44 + const observable: Observable<T> = { 45 + subscribe(observer: ObservableObserver<T>) { 46 + let talkback = talkbackPlaceholder; 47 + let ended = false; 48 + source((signal) => { 49 + if (ended) { 50 + /*noop*/ 51 + } else if (signal === SignalKind.End) { 52 + ended = true; 53 + observer.complete(); 54 + } else if (signal.tag === SignalKind.Start) { 55 + (talkback = signal[0])(TalkbackKind.Pull); 56 + } else { 57 + observer.next(signal[0]); 58 + talkback(TalkbackKind.Pull); 59 + } 60 + }); 61 + const subscription = { 62 + closed: false, 63 + unsubscribe() { 64 + subscription.closed = true; 65 + ended = true; 66 + talkback(TalkbackKind.Close); 67 + }, 68 + }; 69 + return subscription; 70 + }, 71 + }; 72 + observable[observableSymbol] = () => observable; 73 + return observable; 74 + }