JavaScript - How do I create an RxJS pausable data flow that doesn't reset on each resume?
I have tried out the documented example for RxJS `pausable`; while it pauses OK, it resets on resume. How do I modify the example below to have the stream resume from where it paused rather than reset?
// Question's example: a one-second interval wrapped directly in pausable().
// As the output below shows, the emitted counter starts again from 0 after
// the flow is paused and resumed -- that reset is what the question is about.
var pauser = new Rx.Subject();

var source = Rx.Observable
  .interval(1000)
  .timeInterval()
  .pausable(pauser);

// Render every notification (next / error / completed) into #result.
var subscription = source.subscribe(
  function (x) {
    $("#result").append('next: ' + JSON.stringify(x) + '<br>');
  },
  function (err) {
    $("#result").append('error: ' + err);
  },
  function () {
    $("#result").append('completed');
  });

// Start the flow: pausable() lets values through while the pauser's latest
// value is true and holds them back while it is false.
pauser.onNext(true);

// Each click on #result flips the flag and pushes the new value to the pauser.
var paused = false;
$("#result").click(function () {
  $(this).append("mouse clicked");
  paused = !paused;
  pauser.onNext(paused);
});
This is giving me the following output:
next: {"value":0,"interval":1002} next: {"value":1,"interval":1000} next: {"value":2,"interval":999} mouse clicked mouse clicked next: {"value":0,"interval":1001} next: {"value":1,"interval":999} next: {"value":2,"interval":1000}
As mentioned in the `pausable` documentation, `pausable` is meant to be used on hot sources.
One way to make a source hot is to use `share`. However, this does not work in conjunction with `pausable`, because `share` disconnects from the source when it has no subscribers, which is what happens when you pause.
So here are 2 ways to make it work. The first is to use `share` and keep a dummy subscriber, so that `share` never disconnects from the source because there is always at least 1 subscriber. The second way is to use `publish`, and `connect` the observable once all the wiring has been made.
Example 1, with a dummy subscriber:
// Example 1: make the interval hot with share() and keep a dummy subscriber
// attached, so the shared source always has at least one subscriber and is
// not disconnected while the pausable subscriber is paused.
var pauser = new Rx.Subject();

function noop() {}

var source = Rx.Observable
  .interval(1000)
  .timeInterval()
  .share();

var pausableSource = source.pausable(pauser);

// Render every notification (next / error / completed) into #ta_result.
var subscription = pausableSource.subscribe(
  function (x) {
    $("#ta_result").append('next: ' + JSON.stringify(x) + '<br>');
  },
  function (err) {
    $("#ta_result").append('error: ' + err);
  },
  function () {
    $("#ta_result").append('completed');
  });

// Dummy subscriber: its only job is to keep the shared source connected.
source.subscribe(noop);

// Begin in the paused state (false = hold values back).
pauser.onNext(false);

// NOTE(review): the flag is sent to the pauser as-is, and pausable() treats
// true as "let values through", so `paused === true` actually means the flow
// is running here. The name is kept from the original example.
var paused = false;
$("#result").click(function () {
  $("#ta_change").append("mouse clicked\n");
  paused = !paused;
  pauser.onNext(paused);
});
Example 2, with `connect`:
// Example 2: make the interval hot with publish() and connect() it once all
// subscriptions have been wired up, so the source keeps running regardless of
// whether the pausable subscriber is currently paused.
var pauser = new Rx.Subject();

var source = Rx.Observable
  .interval(1000)
  .timeInterval()
  .publish();

var pausableSource = source.pausable(pauser);

// The dummy subscriber is left disabled in this variant:
// source.subscribe(function(){});

// Render every notification (next / error / completed) into #ta_result.
var subscription = pausableSource.subscribe(
  function (x) {
    $("#ta_result").append('next: ' + JSON.stringify(x) + '<br>');
  },
  function (err) {
    $("#ta_result").append('error: ' + err);
  },
  function () {
    $("#ta_result").append('completed');
  });

// Start the published source now that the wiring is in place.
source.connect();

// Begin in the paused state (false = hold values back).
pauser.onNext(false);

// NOTE(review): the flag is sent to the pauser as-is, and pausable() treats
// true as "let values through", so `paused === true` actually means the flow
// is running here. The name is kept from the original example.
var paused = false;
$("#result").click(function () {
  $("#ta_change").append("mouse clicked\n");
  paused = !paused;
  pauser.onNext(paused);
});
Comments
Post a Comment