Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Fix take operator not supporting take(0) and not respecting Close (#47)

* Fix take operator with max <= 0

* Fix passesSinkClose for take operator

The take operator needs to keep track of ended
actively to avoid confusion with the state.taken
number and the max <= 0 case.

* Add !state.ended condition to take's Push signal

authored by kitten.sh and committed by

GitHub 1bdbaccc 01f29fb4

+22 -14
+19 -9
src/wonka_operators.re
··· 773 773 switchMap((. x) => x, source); 774 774 775 775 type takeStateT = { 776 + mutable ended: bool, 776 777 mutable taken: int, 777 778 mutable talkback: (. talkbackT) => unit, 778 779 }; ··· 781 782 let take = (max: int): operatorT('a, 'a) => 782 783 curry(source => 783 784 curry(sink => { 784 - let state: takeStateT = {taken: 0, talkback: talkbackPlaceholder}; 785 + let state: takeStateT = { 786 + ended: false, 787 + taken: 0, 788 + talkback: talkbackPlaceholder, 789 + }; 785 790 786 791 source((. signal) => 787 792 switch (signal) { 793 + | Start(tb) when max <= 0 => 794 + state.ended = true; 795 + sink(. End); 796 + tb(. Close); 788 797 | Start(tb) => state.talkback = tb 789 - | Push(_) when state.taken < max => 798 + | Push(_) when state.taken < max && !state.ended => 790 799 state.taken = state.taken + 1; 791 800 sink(. signal); 792 - 793 - if (state.taken === max) { 801 + if (!state.ended && state.taken >= max) { 802 + state.ended = true; 794 803 sink(. End); 795 804 state.talkback(. Close); 796 805 }; 797 806 | Push(_) => () 798 - | End when state.taken < max => 799 - state.taken = max; 807 + | End when !state.ended => 808 + state.ended = true; 800 809 sink(. End); 801 810 | End => () 802 811 } ··· 805 814 sink(. 806 815 Start( 807 816 (. signal) => 808 - if (state.taken < max) { 817 + if (!state.ended) { 809 818 switch (signal) { 810 - | Pull => state.talkback(. Pull) 819 + | Pull when state.taken < max => state.talkback(. Pull) 820 + | Pull => () 811 821 | Close => 812 - state.taken = max; 822 + state.ended = true; 813 823 state.talkback(. Close); 814 824 }; 815 825 },
+3 -5
src/wonka_operators.test.ts
··· 208 208 if (tb === deriving.pull) { 209 209 pulls++; 210 210 sink(deriving.end()); 211 - sink(deriving.push(0)); 211 + sink(deriving.push(123)); 212 212 } 213 213 })); 214 214 }; ··· 851 851 const noop = operators.take(10); 852 852 passesPassivePull(noop); 853 853 passesActivePush(noop); 854 - // TODO: passesSinkClose(noop); 854 + passesSinkClose(noop); 855 855 passesSourceEnd(noop); 856 856 passesSingleStart(noop); 857 857 passesStrictEnd(noop); 858 858 passesAsyncSequence(noop); 859 859 860 - // TODO: take(0) seems to be broken 861 - const ending = operators.take(1); 862 - passesCloseAndEnd(ending); 860 + passesCloseAndEnd(operators.take(0)); 863 861 864 862 it('emits values until a maximum is reached', () => { 865 863 const { source, next } = sources.makeSubject<number>();