Commit b983b063 authored by Arthur Verschaeve's avatar Arthur Verschaeve

Webrx: update dependencies

parent 196c517b
......@@ -40,7 +40,7 @@
defaultSubComparer = Rx.helpers.defaultSubComparer = function (x, y) { return x > y ? 1 : (x < y ? -1 : 0); },
defaultKeySerializer = Rx.helpers.defaultKeySerializer = function (x) { return x.toString(); },
defaultError = Rx.helpers.defaultError = function (err) { throw err; },
isPromise = Rx.helpers.isPromise = function (p) { return !!p && typeof p.then === 'function'; },
isPromise = Rx.helpers.isPromise = function (p) { return !!p && typeof p.subscribe !== 'function' && typeof p.then === 'function'; },
asArray = Rx.helpers.asArray = function () { return Array.prototype.slice.call(arguments); },
not = Rx.helpers.not = function (a) { return !a; },
isFunction = Rx.helpers.isFunction = (function () {
......@@ -1053,7 +1053,6 @@
}
});
}
recursiveAction(state);
return group;
}
......@@ -1092,8 +1091,7 @@
* @returns {Disposable} The disposable object used to cancel the scheduled action (best effort).
*/
schedulerProto.scheduleRecursive = function (action) {
return this.scheduleRecursiveWithState(action, function (_action, self) {
_action(function () { self(_action); }); });
return this.scheduleRecursiveWithState(action, scheduleInnerRecursive);
};
/**
......@@ -1610,188 +1608,6 @@
};
}());
var Enumerator = Rx.internals.Enumerator = function (next) {
this._next = next;
};
Enumerator.prototype.next = function () {
return this._next();
};
Enumerator.prototype[$iterator$] = function () { return this; }
var Enumerable = Rx.internals.Enumerable = function (iterator) {
this._iterator = iterator;
};
Enumerable.prototype[$iterator$] = function () {
return this._iterator();
};
Enumerable.prototype.concat = function () {
var sources = this;
return new AnonymousObservable(function (o) {
var e = sources[$iterator$]();
var isDisposed, subscription = new SerialDisposable();
var cancelable = immediateScheduler.scheduleRecursive(function (self) {
if (isDisposed) { return; }
try {
var currentItem = e.next();
} catch (ex) {
return o.onError(ex);
}
if (currentItem.done) {
return o.onCompleted();
}
// Check if promise
var currentValue = currentItem.value;
isPromise(currentValue) && (currentValue = observableFromPromise(currentValue));
var d = new SingleAssignmentDisposable();
subscription.setDisposable(d);
d.setDisposable(currentValue.subscribe(
function(x) { o.onNext(x); },
function(err) { o.onError(err); },
self)
);
});
return new CompositeDisposable(subscription, cancelable, disposableCreate(function () {
isDisposed = true;
}));
});
};
Enumerable.prototype.catchError = function () {
var sources = this;
return new AnonymousObservable(function (o) {
var e = sources[$iterator$]();
var isDisposed, subscription = new SerialDisposable();
var cancelable = immediateScheduler.scheduleRecursiveWithState(null, function (lastException, self) {
if (isDisposed) { return; }
try {
var currentItem = e.next();
} catch (ex) {
return observer.onError(ex);
}
if (currentItem.done) {
if (lastException !== null) {
o.onError(lastException);
} else {
o.onCompleted();
}
return;
}
// Check if promise
var currentValue = currentItem.value;
isPromise(currentValue) && (currentValue = observableFromPromise(currentValue));
var d = new SingleAssignmentDisposable();
subscription.setDisposable(d);
d.setDisposable(currentValue.subscribe(
function(x) { o.onNext(x); },
self,
function() { o.onCompleted(); }));
});
return new CompositeDisposable(subscription, cancelable, disposableCreate(function () {
isDisposed = true;
}));
});
};
Enumerable.prototype.catchErrorWhen = function (notificationHandler) {
var sources = this;
return new AnonymousObservable(function (o) {
var exceptions = new Subject(),
notifier = new Subject(),
handled = notificationHandler(exceptions),
notificationDisposable = handled.subscribe(notifier);
var e = sources[$iterator$]();
var isDisposed,
lastException,
subscription = new SerialDisposable();
var cancelable = immediateScheduler.scheduleRecursive(function (self) {
if (isDisposed) { return; }
try {
var currentItem = e.next();
} catch (ex) {
return o.onError(ex);
}
if (currentItem.done) {
if (lastException) {
o.onError(lastException);
} else {
o.onCompleted();
}
return;
}
// Check if promise
var currentValue = currentItem.value;
isPromise(currentValue) && (currentValue = observableFromPromise(currentValue));
var outer = new SingleAssignmentDisposable();
var inner = new SingleAssignmentDisposable();
subscription.setDisposable(new CompositeDisposable(inner, outer));
outer.setDisposable(currentValue.subscribe(
function(x) { o.onNext(x); },
function (exn) {
inner.setDisposable(notifier.subscribe(self, function(ex) {
o.onError(ex);
}, function() {
o.onCompleted();
}));
exceptions.onNext(exn);
},
function() { o.onCompleted(); }));
});
return new CompositeDisposable(notificationDisposable, subscription, cancelable, disposableCreate(function () {
isDisposed = true;
}));
});
};
var enumerableRepeat = Enumerable.repeat = function (value, repeatCount) {
if (repeatCount == null) { repeatCount = -1; }
return new Enumerable(function () {
var left = repeatCount;
return new Enumerator(function () {
if (left === 0) { return doneEnumerator; }
if (left > 0) { left--; }
return { done: false, value: value };
});
});
};
var enumerableOf = Enumerable.of = function (source, selector, thisArg) {
if (selector) {
var selectorFn = bindCallback(selector, thisArg, 3);
}
return new Enumerable(function () {
var index = -1;
return new Enumerator(
function () {
return ++index < source.length ?
{ done: false, value: !selector ? source[index] : selectorFn(source[index], index, source) } :
doneEnumerator;
});
});
};
/**
* Supports push-style iteration over an observable sequence.
*/
......@@ -2233,66 +2049,296 @@
return ObservableBase;
}(Observable));
/**
* Wraps the source sequence in order to run its observer callbacks on the specified scheduler.
*
* This only invokes observer callbacks on a scheduler. In case the subscription and/or unsubscription actions have side-effects
* that require to be run on a scheduler, use subscribeOn.
*
* @param {Scheduler} scheduler Scheduler to notify observers on.
* @returns {Observable} The source sequence whose observations happen on the specified scheduler.
*/
observableProto.observeOn = function (scheduler) {
var source = this;
return new AnonymousObservable(function (observer) {
return source.subscribe(new ObserveOnObserver(scheduler, observer));
}, source);
};
var Enumerable = Rx.internals.Enumerable = function () { };
/**
* Wraps the source sequence in order to run its subscription and unsubscription logic on the specified scheduler. This operation is not commonly used;
* see the remarks section for more information on the distinction between subscribeOn and observeOn.
var ConcatEnumerableObservable = (function(__super__) {
inherits(ConcatEnumerableObservable, __super__);
function ConcatEnumerableObservable(sources) {
this.sources = sources;
__super__.call(this);
}
ConcatEnumerableObservable.prototype.subscribeCore = function (o) {
var isDisposed, subscription = new SerialDisposable();
var cancelable = immediateScheduler.scheduleRecursiveWithState(this.sources[$iterator$](), function (e, self) {
if (isDisposed) { return; }
var currentItem = tryCatch(e.next).call(e);
if (currentItem === errorObj) { return o.onError(currentItem.e); }
* This only performs the side-effects of subscription and unsubscription on the specified scheduler. In order to invoke observer
* callbacks on a scheduler, use observeOn.
if (currentItem.done) {
return o.onCompleted();
}
* @param {Scheduler} scheduler Scheduler to perform subscription and unsubscription actions on.
* @returns {Observable} The source sequence whose subscriptions and unsubscriptions happen on the specified scheduler.
*/
observableProto.subscribeOn = function (scheduler) {
var source = this;
return new AnonymousObservable(function (observer) {
var m = new SingleAssignmentDisposable(), d = new SerialDisposable();
d.setDisposable(m);
m.setDisposable(scheduler.schedule(function () {
d.setDisposable(new ScheduledDisposable(scheduler, source.subscribe(observer)));
// Check if promise
var currentValue = currentItem.value;
isPromise(currentValue) && (currentValue = observableFromPromise(currentValue));
var d = new SingleAssignmentDisposable();
subscription.setDisposable(d);
d.setDisposable(currentValue.subscribe(new InnerObserver(o, self, e)));
});
return new CompositeDisposable(subscription, cancelable, disposableCreate(function () {
isDisposed = true;
}));
return d;
}, source);
};
function InnerObserver(o, s, e) {
this.o = o;
this.s = s;
this.e = e;
this.isStopped = false;
}
InnerObserver.prototype.onNext = function (x) { if(!this.isStopped) { this.o.onNext(x); } };
InnerObserver.prototype.onError = function (err) {
if (!this.isStopped) {
this.isStopped = true;
this.o.onError(err);
}
};
InnerObserver.prototype.onCompleted = function () {
if (!this.isStopped) {
this.isStopped = true;
this.s(this.e);
}
};
InnerObserver.prototype.dispose = function () { this.isStopped = true; };
InnerObserver.prototype.fail = function (err) {
if (!this.isStopped) {
this.isStopped = true;
this.o.onError(err);
return true;
}
return false;
};
return ConcatEnumerableObservable;
}(ObservableBase));
Enumerable.prototype.concat = function () {
return new ConcatEnumerableObservable(this);
};
var CatchErrorObservable = (function(__super__) {
inherits(CatchErrorObservable, __super__);
function CatchErrorObservable(sources) {
this.sources = sources;
__super__.call(this);
}
CatchErrorObservable.prototype.subscribeCore = function (o) {
var e = this.sources[$iterator$]();
/**
* Converts a Promise to an Observable sequence
* @param {Promise} An ES6 Compliant promise.
* @returns {Observable} An Observable sequence which wraps the existing promise success and failure.
*/
var observableFromPromise = Observable.fromPromise = function (promise) {
return observableDefer(function () {
var subject = new Rx.AsyncSubject();
var isDisposed, subscription = new SerialDisposable();
var cancelable = immediateScheduler.scheduleRecursiveWithState(null, function (lastException, self) {
if (isDisposed) { return; }
var currentItem = tryCatch(e.next).call(e);
if (currentItem === errorObj) { return o.onError(currentItem.e); }
promise.then(
function (value) {
subject.onNext(value);
subject.onCompleted();
},
subject.onError.bind(subject));
if (currentItem.done) {
return lastException !== null ? o.onError(lastException) : o.onCompleted();
}
return subject;
});
// Check if promise
var currentValue = currentItem.value;
isPromise(currentValue) && (currentValue = observableFromPromise(currentValue));
var d = new SingleAssignmentDisposable();
subscription.setDisposable(d);
d.setDisposable(currentValue.subscribe(
function(x) { o.onNext(x); },
self,
function() { o.onCompleted(); }));
});
return new CompositeDisposable(subscription, cancelable, disposableCreate(function () {
isDisposed = true;
}));
};
return CatchErrorObservable;
}(ObservableBase));
Enumerable.prototype.catchError = function () {
return new CatchErrorObservable(this);
};
/*
* Converts an existing observable sequence to an ES6 Compatible Promise
Enumerable.prototype.catchErrorWhen = function (notificationHandler) {
var sources = this;
return new AnonymousObservable(function (o) {
var exceptions = new Subject(),
notifier = new Subject(),
handled = notificationHandler(exceptions),
notificationDisposable = handled.subscribe(notifier);
var e = sources[$iterator$]();
var isDisposed,
lastException,
subscription = new SerialDisposable();
var cancelable = immediateScheduler.scheduleRecursive(function (self) {
if (isDisposed) { return; }
var currentItem = tryCatch(e.next).call(e);
if (currentItem === errorObj) { return o.onError(currentItem.e); }
if (currentItem.done) {
if (lastException) {
o.onError(lastException);
} else {
o.onCompleted();
}
return;
}
// Check if promise
var currentValue = currentItem.value;
isPromise(currentValue) && (currentValue = observableFromPromise(currentValue));
var outer = new SingleAssignmentDisposable();
var inner = new SingleAssignmentDisposable();
subscription.setDisposable(new CompositeDisposable(inner, outer));
outer.setDisposable(currentValue.subscribe(
function(x) { o.onNext(x); },
function (exn) {
inner.setDisposable(notifier.subscribe(self, function(ex) {
o.onError(ex);
}, function() {
o.onCompleted();
}));
exceptions.onNext(exn);
},
function() { o.onCompleted(); }));
});
return new CompositeDisposable(notificationDisposable, subscription, cancelable, disposableCreate(function () {
isDisposed = true;
}));
});
};
var RepeatEnumerable = (function (__super__) {
inherits(RepeatEnumerable, __super__);
function RepeatEnumerable(v, c) {
this.v = v;
this.c = c == null ? -1 : c;
}
RepeatEnumerable.prototype[$iterator$] = function () {
return new RepeatEnumerator(this);
};
function RepeatEnumerator(p) {
this.v = p.v;
this.l = p.c;
}
RepeatEnumerator.prototype.next = function () {
if (this.l === 0) { return doneEnumerator; }
if (this.l > 0) { this.l--; }
return { done: false, value: this.v };
};
return RepeatEnumerable;
}(Enumerable));
var enumerableRepeat = Enumerable.repeat = function (value, repeatCount) {
return new RepeatEnumerable(value, repeatCount);
};
var OfEnumerable = (function(__super__) {
inherits(OfEnumerable, __super__);
function OfEnumerable(s, fn, thisArg) {
this.s = s;
this.fn = fn ? bindCallback(fn, thisArg, 3) : null;
}
OfEnumerable.prototype[$iterator$] = function () {
return new OfEnumerator(this);
};
function OfEnumerator(p) {
this.i = -1;
this.s = p.s;
this.l = this.s.length;
this.fn = p.fn;
}
OfEnumerator.prototype.next = function () {
return ++this.i < this.l ?
{ done: false, value: !this.fn ? this.s[this.i] : this.fn(this.s[this.i], this.i, this.s) } :
doneEnumerator;
};
return OfEnumerable;
}(Enumerable));
var enumerableOf = Enumerable.of = function (source, selector, thisArg) {
return new OfEnumerable(source, selector, thisArg);
};
/**
* Wraps the source sequence in order to run its observer callbacks on the specified scheduler.
*
* This only invokes observer callbacks on a scheduler. In case the subscription and/or unsubscription actions have side-effects
* that require to be run on a scheduler, use subscribeOn.
*
* @param {Scheduler} scheduler Scheduler to notify observers on.
* @returns {Observable} The source sequence whose observations happen on the specified scheduler.
*/
observableProto.observeOn = function (scheduler) {
var source = this;
return new AnonymousObservable(function (observer) {
return source.subscribe(new ObserveOnObserver(scheduler, observer));
}, source);
};
/**
* Wraps the source sequence in order to run its subscription and unsubscription logic on the specified scheduler. This operation is not commonly used;
* see the remarks section for more information on the distinction between subscribeOn and observeOn.
* This only performs the side-effects of subscription and unsubscription on the specified scheduler. In order to invoke observer
* callbacks on a scheduler, use observeOn.
* @param {Scheduler} scheduler Scheduler to perform subscription and unsubscription actions on.
* @returns {Observable} The source sequence whose subscriptions and unsubscriptions happen on the specified scheduler.
*/
observableProto.subscribeOn = function (scheduler) {
var source = this;
return new AnonymousObservable(function (observer) {
var m = new SingleAssignmentDisposable(), d = new SerialDisposable();
d.setDisposable(m);
m.setDisposable(scheduler.schedule(function () {
d.setDisposable(new ScheduledDisposable(scheduler, source.subscribe(observer)));
}));
return d;
}, source);
};
var FromPromiseObservable = (function(__super__) {
inherits(FromPromiseObservable, __super__);
function FromPromiseObservable(p) {
this.p = p;
__super__.call(this);
}
FromPromiseObservable.prototype.subscribeCore = function(o) {
this.p.then(function (data) {
o.onNext(data);
o.onCompleted();
}, function (err) { o.onError(err); });
return disposableEmpty;
};
return FromPromiseObservable;
}(ObservableBase));
/**
* Converts a Promise to an Observable sequence
* @param {Promise} An ES6 Compliant promise.
* @returns {Observable} An Observable sequence which wraps the existing promise success and failure.
*/
var observableFromPromise = Observable.fromPromise = function (promise) {
return new FromPromiseObservable(promise);
};
/*
* Converts an existing observable sequence to an ES6 Compatible Promise
* @example
* var promise = Rx.Observable.return(42).toPromise(RSVP.Promise);
*
......@@ -2325,42 +2371,42 @@
__super__.call(this);
}
ToArrayObservable.prototype.subscribeCore = function(observer) {
return this.source.subscribe(new ToArrayObserver(observer));
ToArrayObservable.prototype.subscribeCore = function(o) {
return this.source.subscribe(new InnerObserver(o));
};
return ToArrayObservable;
}(ObservableBase));
function ToArrayObserver(observer) {
this.observer = observer;
this.a = [];
this.isStopped = false;
}
ToArrayObserver.prototype.onNext = function (x) { if(!this.isStopped) { this.a.push(x); } };
ToArrayObserver.prototype.onError = function (e) {
if (!this.isStopped) {
this.isStopped = true;
this.observer.onError(e);
}
};
ToArrayObserver.prototype.onCompleted = function () {
if (!this.isStopped) {
this.isStopped = true;
this.observer.onNext(this.a);
this.observer.onCompleted();
}
};
ToArrayObserver.prototype.dispose = function () { this.isStopped = true; }
ToArrayObserver.prototype.fail = function (e) {
if (!this.isStopped) {
this.isStopped = true;
this.observer.onError(e);
return true;
function InnerObserver(o) {
this.o = o;
this.a = [];
this.isStopped = false;
}
InnerObserver.prototype.onNext = function (x) { if(!this.isStopped) { this.a.push(x); } };
InnerObserver.prototype.onError = function (e) {
if (!this.isStopped) {
this.isStopped = true;
this.o.onError(e);
}
};
InnerObserver.prototype.onCompleted = function () {
if (!this.isStopped) {
this.isStopped = true;
this.o.onNext(this.a);
this.o.onCompleted();
}
};
InnerObserver.prototype.dispose = function () { this.isStopped = true; }
InnerObserver.prototype.fail = function (e) {
if (!this.isStopped) {
this.isStopped = true;
this.o.onError(e);
return true;
}
return false;
};
return false;
};
return ToArrayObservable;
}(ObservableBase));
/**
* Creates an array from an observable sequence.
......@@ -2753,7 +2799,7 @@
*/
Observable.ofObjectChanges = function(obj) {
if (obj == null) { throw new TypeError('object must not be null or undefined.'); }
if (typeof Object.observe !== 'function' && typeof Object.unobserve !== 'function') { throw new TypeError('Array.observe is not supported on your platform') }
if (typeof Object.observe !== 'function' && typeof Object.unobserve !== 'function') { throw new TypeError('Object.observe is not supported on your platform') }
return new AnonymousObservable(function(observer) {
function observerFn(changes) {
for(var i = 0, len = changes.length; i < len; i++) {
......@@ -2842,7 +2888,7 @@
inherits(RangeObservable, __super__);
function RangeObservable(start, count, scheduler) {
this.start = start;
this.count = count;
this.rangeCount = count;
this.scheduler = scheduler;
__super__.call(this);
}
......@@ -2862,7 +2908,7 @@
}
RangeSink.prototype.run = function () {
var start = this.parent.start, count = this.parent.count, observer = this.observer;
var start = this.parent.start, count = this.parent.rangeCount, observer = this.observer;
function loopRecursive(i, recurse) {
if (i < count) {
observer.onNext(start + i);
......@@ -2989,23 +3035,23 @@
__super__.call(this);
}
ThrowObservable.prototype.subscribeCore = function (observer) {
var sink = new ThrowSink(observer, this);
ThrowObservable.prototype.subscribeCore = function (o) {
var sink = new ThrowSink(o, this);
return sink.run();
};
function ThrowSink(observer, parent) {
this.observer = observer;
this.parent = parent;
function ThrowSink(o, p) {
this.o = o;
this.p = p;
}
function scheduleItem(s, state) {
var error = state[0], observer = state[1];
observer.onError(error);
var e = state[0], o = state[1];
o.onError(e);
}
ThrowSink.prototype.run = function () {
return this.parent.scheduler.scheduleWithState([this.parent.error, this.observer], scheduleItem);
return this.p.scheduler.scheduleWithState([this.p.error, this.o], scheduleItem);
};
return ThrowObservable;
......@@ -3074,36 +3120,24 @@
leftSubscription.setDisposable(leftSource.subscribe(function (left) {
choiceL();
if (choice === leftChoice) {
observer.onNext(left);
}
choice === leftChoice && observer.onNext(left);
}, function (err) {
choiceL();
if (choice === leftChoice) {
observer.onError(err);
}
choice === leftChoice && observer.onError(err);
}, function () {
choiceL();
if (choice === leftChoice) {
observer.onCompleted();
}
choice === leftChoice && observer.onCompleted();
}));
rightSubscription.setDisposable(rightSource.subscribe(function (right) {
choiceR();
if (choice === rightChoice) {
observer.onNext(right);
}
choice === rightChoice && observer.onNext(right);
}, function (err) {
choiceR();
if (choice === rightChoice) {
observer.onError(err);
}
choice === rightChoice && observer.onError(err);
}, function () {
choiceR();
if (choice === rightChoice) {
observer.onCompleted();
}
choice === rightChoice && observer.onCompleted();
}));
return new CompositeDisposable(leftSubscription, rightSubscription);
......@@ -3275,6 +3309,52 @@
return observableConcat.apply(null, args);
};
var ConcatObservable = (function(__super__) {
inherits(ConcatObservable, __super__);
function ConcatObservable(sources) {
this.sources = sources;
__super__.call(this);
}
ConcatObservable.prototype.subscribeCore = function(o) {
var sink = new ConcatSink(this.sources, o);
return sink.run();
};
function ConcatSink(sources, o) {
this.sources = sources;
this.o = o;
}
ConcatSink.prototype.run = function () {
var isDisposed, subscription = new SerialDisposable(), sources = this.sources, length = sources.length, o = this.o;
var cancelable = immediateScheduler.scheduleRecursiveWithState(0, function (i, self) {
if (isDisposed) { return; }
if (i === length) {
return o.onCompleted();
}
// Check if promise
var currentValue = sources[i];
isPromise(currentValue) && (currentValue = observableFromPromise(currentValue));
var d = new SingleAssignmentDisposable();
subscription.setDisposable(d);
d.setDisposable(currentValue.subscribe(
function (x) { o.onNext(x); },
function (e) { o.onError(e); },
function () { self(i + 1); }
));
});
return new CompositeDisposable(subscription, cancelable, disposableCreate(function () {
isDisposed = true;
}));
};
return ConcatObservable;
}(ObservableBase));
/**
* Concatenates all the observable sequences.
* @param {Array | Arguments} args Arguments or an array to concat to the observable sequence.
......@@ -3288,7 +3368,7 @@
args = new Array(arguments.length);
for(var i = 0, len = arguments.length; i < len; i++) { args[i] = arguments[i]; }
}
return enumerableOf(args).concat();
return new ConcatObservable(args);
};
/**
......@@ -3463,12 +3543,7 @@
m.setDisposable(this.source.subscribe(new MergeAllObserver(observer, g)));
return g;
};
return MergeAllObservable;
}(ObservableBase));
var MergeAllObserver = (function() {
function MergeAllObserver(o, g) {
this.o = o;
this.g = g;
......@@ -3540,9 +3615,8 @@
return false;
};
return MergeAllObserver;
}());
return MergeAllObservable;
}(ObservableBase));
/**
* Merges an observable sequence of observable sequences into an observable sequence.
......@@ -3706,87 +3780,171 @@
}, source);
};
var SwitchObservable = (function(__super__) {
inherits(SwitchObservable, __super__);
function SwitchObservable(source) {
this.source = source;
__super__.call(this);
}
SwitchObservable.prototype.subscribeCore = function (o) {
var inner = new SerialDisposable(), s = this.source.subscribe(new SwitchObserver(o, inner));
return new CompositeDisposable(s, inner);
};
function SwitchObserver(o, inner) {
this.o = o;
this.inner = inner;
this.stopped = false;
this.latest = 0;
this.hasLatest = false;
this.isStopped = false;
}
SwitchObserver.prototype.onNext = function (innerSource) {
if (this.isStopped) { return; }
var d = new SingleAssignmentDisposable(), id = ++this.latest;
this.hasLatest = true;
this.inner.setDisposable(d);
isPromise(innerSource) && (innerSource = observableFromPromise(innerSource));
d.setDisposable(innerSource.subscribe(new InnerObserver(this, id)));
};
SwitchObserver.prototype.onError = function (e) {
if (!this.isStopped) {
this.isStopped = true;
this.o.onError(e);
}
};
SwitchObserver.prototype.onCompleted = function () {
if (!this.isStopped) {
this.isStopped = true;
this.stopped = true;
!this.hasLatest && this.o.onCompleted();
}
};
SwitchObserver.prototype.dispose = function () { this.isStopped = true; };
SwitchObserver.prototype.fail = function (e) {
if(!this.isStopped) {
this.isStopped = true;
this.o.onError(e);
return true;
}
return false;
};
function InnerObserver(parent, id) {
this.parent = parent;
this.id = id;
this.isStopped = false;
}
InnerObserver.prototype.onNext = function (x) {
if (this.isStopped) { return; }
this.parent.latest === this.id && this.parent.o.onNext(x);
};
InnerObserver.prototype.onError = function (e) {
if (!this.isStopped) {
this.isStopped = true;
this.parent.latest === this.id && this.parent.o.onError(e);
}
};
InnerObserver.prototype.onCompleted = function () {
if (!this.isStopped) {
this.isStopped = true;
if (this.parent.latest === this.id) {
this.parent.hasLatest = false;
this.parent.isStopped && this.parent.o.onCompleted();
}
}
};
InnerObserver.prototype.dispose = function () { this.isStopped = true; }
InnerObserver.prototype.fail = function (e) {
if(!this.isStopped) {
this.isStopped = true;
this.parent.o.onError(e);
return true;
}
return false;
};
return SwitchObservable;
}(ObservableBase));
/**
* Transforms an observable sequence of observable sequences into an observable sequence producing values only from the most recent observable sequence.
* @returns {Observable} The observable sequence that at any point in time produces the elements of the most recent inner observable sequence that has been received.
*/
* Transforms an observable sequence of observable sequences into an observable sequence producing values only from the most recent observable sequence.
* @returns {Observable} The observable sequence that at any point in time produces the elements of the most recent inner observable sequence that has been received.
*/
observableProto['switch'] = observableProto.switchLatest = function () {
var sources = this;
return new AnonymousObservable(function (observer) {
var hasLatest = false,
innerSubscription = new SerialDisposable(),
isStopped = false,
latest = 0,
subscription = sources.subscribe(
function (innerSource) {
var d = new SingleAssignmentDisposable(), id = ++latest;
hasLatest = true;
innerSubscription.setDisposable(d);
// Check if Promise or Observable
isPromise(innerSource) && (innerSource = observableFromPromise(innerSource));
d.setDisposable(innerSource.subscribe(
function (x) { latest === id && observer.onNext(x); },
function (e) { latest === id && observer.onError(e); },
function () {
if (latest === id) {
hasLatest = false;
isStopped && observer.onCompleted();
}
}));
},
function (e) { observer.onError(e); },
function () {
isStopped = true;
!hasLatest && observer.onCompleted();
});
return new CompositeDisposable(subscription, innerSubscription);
}, sources);
return new SwitchObservable(this);
};
var TakeUntilObservable = (function(__super__) {
inherits(TakeUntilObservable, __super__);
function TakeUntilObservable(source, other) {
this.source = source;
this.other = isPromise(other) ? observableFromPromise(other) : other;
__super__.call(this);
}
TakeUntilObservable.prototype.subscribeCore = function(o) {
return new CompositeDisposable(
this.source.subscribe(o),
this.other.subscribe(new InnerObserver(o))
);
};
function InnerObserver(o) {
this.o = o;
this.isStopped = false;
}
InnerObserver.prototype.onNext = function (x) {
if (this.isStopped) { return; }
this.o.onCompleted();
};
InnerObserver.prototype.onError = function (err) {
if (!this.isStopped) {
this.isStopped = true;
this.o.onError(err);
}
};
InnerObserver.prototype.onCompleted = function () {
!this.isStopped && (this.isStopped = true);
};
InnerObserver.prototype.dispose = function() { this.isStopped = true; };
InnerObserver.prototype.fail = function (e) {
if (!this.isStopped) {
this.isStopped = true;
this.o.onError(e);
return true;
}
return false;
};
return TakeUntilObservable;
}(ObservableBase));
/**
* Returns the values from the source observable sequence until the other observable sequence produces a value.
* @param {Observable | Promise} other Observable sequence or Promise that terminates propagation of elements of the source sequence.
* @returns {Observable} An observable sequence containing the elements of the source sequence up to the point the other sequence interrupted further propagation.
*/
observableProto.takeUntil = function (other) {
var source = this;
return new AnonymousObservable(function (o) {
isPromise(other) && (other = observableFromPromise(other));
return new CompositeDisposable(
source.subscribe(o),
other.subscribe(function () { o.onCompleted(); }, function (e) { o.onError(e); }, noop)
);
}, source);
return new TakeUntilObservable(this, other);
};
function falseFactory() { return false; }
/**
* Merges the specified observable sequences into one observable sequence by using the selector function only when the (first) source observable sequence produces an element.
*
* @example
* 1 - obs = obs1.withLatestFrom(obs2, obs3, function (o1, o2, o3) { return o1 + o2 + o3; });
* 2 - obs = obs1.withLatestFrom([obs2, obs3], function (o1, o2, o3) { return o1 + o2 + o3; });
* @returns {Observable} An observable sequence containing the result of combining elements of the sources using the specified result selector function.
*/
observableProto.withLatestFrom = function () {
var len = arguments.length, args = new Array(len)
for(var i = 0; i < len; i++) { args[i] = arguments[i]; }
var resultSelector = args.pop(), source = this;
if (typeof source === 'undefined') {
throw new Error('Source observable not found for withLatestFrom().');
}
if (typeof resultSelector !== 'function') {
throw new Error('withLatestFrom() expects a resultSelector function.');
}
if (Array.isArray(args[0])) {
args = args[0];
}
Array.isArray(args[0]) && (args = args[0]);
return new AnonymousObservable(function (observer) {
var falseFactory = function () { return false; },
n = args.length,
var n = args.length,
hasValue = arrayInitialize(n, falseFactory),
hasValueAll = false,
values = new Array(n);
......@@ -3800,24 +3958,19 @@
values[i] = x;
hasValue[i] = true;
hasValueAll = hasValue.every(identity);
}, observer.onError.bind(observer), function () {}));
}, function (e) { observer.onError(e); }, noop));
subscriptions[i] = sad;
}(idx));
}
var sad = new SingleAssignmentDisposable();
sad.setDisposable(source.subscribe(function (x) {
var res;
var allValues = [x].concat(values);
if (!hasValueAll) return;
try {
res = resultSelector.apply(null, allValues);
} catch (ex) {
observer.onError(ex);
return;
}
if (!hasValueAll) { return; }
var res = tryCatch(resultSelector).apply(null, allValues);
if (res === errorObj) { return observer.onError(res.e); }
observer.onNext(res);
}, observer.onError.bind(observer), function () {
}, function (e) { observer.onError(e); }, function () {
observer.onCompleted();
}));
subscriptions[n] = sad;
......@@ -3828,21 +3981,17 @@
function zipArray(second, resultSelector) {
var first = this;
return new AnonymousObservable(function (observer) {
return new AnonymousObservable(function (o) {
var index = 0, len = second.length;
return first.subscribe(function (left) {
if (index < len) {
var right = second[index++], result;
try {
result = resultSelector(left, right);
} catch (e) {
return observer.onError(e);
}
observer.onNext(result);
var right = second[index++], res = tryCatch(resultSelector)(left, right);
if (res === errorObj) { return o.onError(res.e); }
o.onNext(res);
} else {
observer.onCompleted();
o.onCompleted();
}
}, function (e) { observer.onError(e); }, function () { observer.onCompleted(); });
}, function (e) { o.onError(e); }, function () { o.onCompleted(); });
}, first);
}
......@@ -3852,10 +4001,6 @@
/**
* Merges the specified observable sequences into one observable sequence by using the selector function whenever all of the observable sequences or an array have produced an element at a corresponding index.
* The last element in the arguments must be a function to invoke for each series of elements at corresponding indexes in the args.
*
* @example
* 1 - res = obs1.zip(obs2, fn);
* 1 - res = x1.zip([1,2,3], fn);
* @returns {Observable} An observable sequence containing the result of combining elements of the args using the specified result selector function.
*/
observableProto.zip = function () {
......@@ -3865,34 +4010,11 @@
var parent = this, resultSelector = args.pop();
args.unshift(parent);
return new AnonymousObservable(function (observer) {
return new AnonymousObservable(function (o) {
var n = args.length,
queues = arrayInitialize(n, emptyArrayFactory),
isDone = arrayInitialize(n, falseFactory);
function next(i) {
var res, queuedValues;
if (queues.every(function (x) { return x.length > 0; })) {
try {
queuedValues = queues.map(function (x) { return x.shift(); });
res = resultSelector.apply(parent, queuedValues);
} catch (ex) {
observer.onError(ex);
return;
}
observer.onNext(res);
} else if (isDone.filter(function (x, j) { return j !== i; }).every(identity)) {
observer.onCompleted();
}
};
function done(i) {
isDone[i] = true;
if (isDone.every(function (x) { return x; })) {
observer.onCompleted();
}
}
var subscriptions = new Array(n);
for (var idx = 0; idx < n; idx++) {
(function (i) {
......@@ -3900,9 +4022,17 @@
isPromise(source) && (source = observableFromPromise(source));
sad.setDisposable(source.subscribe(function (x) {
queues[i].push(x);
next(i);
}, function (e) { observer.onError(e); }, function () {
done(i);
if (queues.every(function (x) { return x.length > 0; })) {
var queuedValues = queues.map(function (x) { return x.shift(); }),
res = tryCatch(resultSelector).apply(parent, queuedValues);
if (res === errorObj) { return o.onError(res.e); }
o.onNext(res);
} else if (isDone.filter(function (x, j) { return j !== i; }).every(identity)) {
o.onCompleted();
}
}, function (e) { o.onError(e); }, function () {
isDone[i] = true;
isDone.every(identity) && o.onCompleted();
}));
subscriptions[i] = sad;
})(idx);
......@@ -3925,6 +4055,9 @@
return first.zip.apply(first, args);
};
function falseFactory() { return false; }
function arrayFactory() { return []; }
/**
* Merges the specified observable sequences into one observable sequence by emitting a list with the elements of the observable sequences at corresponding indexes.
* @param arguments Observable sources.
......@@ -3939,28 +4072,10 @@
sources = new Array(len);
for(var i = 0; i < len; i++) { sources[i] = arguments[i]; }
}
return new AnonymousObservable(function (observer) {
return new AnonymousObservable(function (o) {
var n = sources.length,
queues = arrayInitialize(n, function () { return []; }),
isDone = arrayInitialize(n, function () { return false; });
function next(i) {
if (queues.every(function (x) { return x.length > 0; })) {
var res = queues.map(function (x) { return x.shift(); });
observer.onNext(res);
} else if (isDone.filter(function (x, j) { return j !== i; }).every(identity)) {
observer.onCompleted();
return;
}
};
function done(i) {
isDone[i] = true;
if (isDone.every(identity)) {
observer.onCompleted();
return;
}
}
queues = arrayInitialize(n, arrayFactory),
isDone = arrayInitialize(n, falseFactory);
var subscriptions = new Array(n);
for (var idx = 0; idx < n; idx++) {
......@@ -3968,9 +4083,15 @@
subscriptions[i] = new SingleAssignmentDisposable();
subscriptions[i].setDisposable(sources[i].subscribe(function (x) {
queues[i].push(x);
next(i);
}, function (e) { observer.onError(e); }, function () {
done(i);
if (queues.every(function (x) { return x.length > 0; })) {
var res = queues.map(function (x) { return x.shift(); });
o.onNext(res);
} else if (isDone.filter(function (x, j) { return j !== i; }).every(identity)) {
return o.onCompleted();
}
}, function (e) { o.onError(e); }, function () {
isDone[i] = true;
isDone.every(identity) && o.onCompleted();
}));
})(idx);
}
......@@ -3985,7 +4106,7 @@
*/
observableProto.asObservable = function () {
var source = this;
return new AnonymousObservable(function (o) { return source.subscribe(o); }, this);
return new AnonymousObservable(function (o) { return source.subscribe(o); }, source);
};
/**
......@@ -4039,20 +4160,12 @@
return source.subscribe(function (value) {
var key = value;
if (keySelector) {
try {
key = keySelector(value);
} catch (e) {
o.onError(e);
return;
}
key = tryCatch(keySelector)(value);
if (key === errorObj) { return o.onError(key.e); }
}
if (hasCurrentKey) {
try {
var comparerEquals = comparer(currentKey, key);
} catch (e) {
o.onError(e);
return;
}
var comparerEquals = tryCatch(comparer)(currentKey, key);
if (comparerEquals === errorObj) { return o.onError(comparerEquals.e); }
}
if (!hasCurrentKey || !comparerEquals) {
hasCurrentKey = true;
......@@ -4063,75 +4176,101 @@
}, this);
};
/**
* Invokes an action for each element in the observable sequence and invokes an action upon graceful or exceptional termination of the observable sequence.
* This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline.
* @param {Function | Observer} observerOrOnNext Action to invoke for each element in the observable sequence or an observer.
* @param {Function} [onError] Action to invoke upon exceptional termination of the observable sequence. Used if only the observerOrOnNext parameter is also a function.
* @param {Function} [onCompleted] Action to invoke upon graceful termination of the observable sequence. Used if only the observerOrOnNext parameter is also a function.
* @returns {Observable} The source sequence with the side-effecting behavior applied.
*/
observableProto['do'] = observableProto.tap = observableProto.doAction = function (observerOrOnNext, onError, onCompleted) {
var source = this;
return new AnonymousObservable(function (observer) {
var tapObserver = !observerOrOnNext || isFunction(observerOrOnNext) ?
var TapObservable = (function(__super__) {
inherits(TapObservable,__super__);
function TapObservable(source, observerOrOnNext, onError, onCompleted) {
this.source = source;
this.t = !observerOrOnNext || isFunction(observerOrOnNext) ?
observerCreate(observerOrOnNext || noop, onError || noop, onCompleted || noop) :
observerOrOnNext;
__super__.call(this);
}
return source.subscribe(function (x) {
try {
tapObserver.onNext(x);
} catch (e) {
observer.onError(e);
}
observer.onNext(x);
}, function (err) {
try {
tapObserver.onError(err);
} catch (e) {
observer.onError(e);
}
observer.onError(err);
}, function () {
try {
tapObserver.onCompleted();
} catch (e) {
observer.onError(e);
}
observer.onCompleted();
});
}, this);
TapObservable.prototype.subscribeCore = function(o) {
return this.source.subscribe(new InnerObserver(o, this.t));
};
function InnerObserver(o, t) {
this.o = o;
this.t = t;
this.isStopped = false;
}
InnerObserver.prototype.onNext = function(x) {
if (this.isStopped) { return; }
var res = tryCatch(this.t.onNext).call(this.t, x);
if (res === errorObj) { this.o.onError(res.e); }
this.o.onNext(x);
};
InnerObserver.prototype.onError = function(err) {
if (!this.isStopped) {
this.isStopped = true;
var res = tryCatch(this.t.onError).call(this.t, err);
if (res === errorObj) { return this.o.onError(res.e); }
this.o.onError(err);
}
};
InnerObserver.prototype.onCompleted = function() {
if (!this.isStopped) {
this.isStopped = true;
var res = tryCatch(this.t.onCompleted).call(this.t);
if (res === errorObj) { return this.o.onError(res.e); }
this.o.onCompleted();
}
};
InnerObserver.prototype.dispose = function() { this.isStopped = true; };
InnerObserver.prototype.fail = function (e) {
if (!this.isStopped) {
this.isStopped = true;
this.o.onError(e);
return true;
}
return false;
};
return TapObservable;
}(ObservableBase));
/**
* Invokes an action for each element in the observable sequence and invokes an action upon graceful or exceptional termination of the observable sequence.
* This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline.
* @param {Function | Observer} observerOrOnNext Action to invoke for each element in the observable sequence or an o.
* @param {Function} [onError] Action to invoke upon exceptional termination of the observable sequence. Used if only the observerOrOnNext parameter is also a function.
* @param {Function} [onCompleted] Action to invoke upon graceful termination of the observable sequence. Used if only the observerOrOnNext parameter is also a function.
* @returns {Observable} The source sequence with the side-effecting behavior applied.
*/
observableProto['do'] = observableProto.tap = observableProto.doAction = function (observerOrOnNext, onError, onCompleted) {
return new TapObservable(this, observerOrOnNext, onError, onCompleted);
};
/**
* Invokes an action for each element in the observable sequence.
* This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline.
* @param {Function} onNext Action to invoke for each element in the observable sequence.
* @param {Any} [thisArg] Object to use as this when executing callback.
* @returns {Observable} The source sequence with the side-effecting behavior applied.
*/
* Invokes an action for each element in the observable sequence.
* This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline.
* @param {Function} onNext Action to invoke for each element in the observable sequence.
* @param {Any} [thisArg] Object to use as this when executing callback.
* @returns {Observable} The source sequence with the side-effecting behavior applied.
*/
observableProto.doOnNext = observableProto.tapOnNext = function (onNext, thisArg) {
return this.tap(typeof thisArg !== 'undefined' ? function (x) { onNext.call(thisArg, x); } : onNext);
};
/**
* Invokes an action upon exceptional termination of the observable sequence.
* This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline.
* @param {Function} onError Action to invoke upon exceptional termination of the observable sequence.
* @param {Any} [thisArg] Object to use as this when executing callback.
* @returns {Observable} The source sequence with the side-effecting behavior applied.
*/
* Invokes an action upon exceptional termination of the observable sequence.
* This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline.
* @param {Function} onError Action to invoke upon exceptional termination of the observable sequence.
* @param {Any} [thisArg] Object to use as this when executing callback.
* @returns {Observable} The source sequence with the side-effecting behavior applied.
*/
observableProto.doOnError = observableProto.tapOnError = function (onError, thisArg) {
return this.tap(noop, typeof thisArg !== 'undefined' ? function (e) { onError.call(thisArg, e); } : onError);
};
/**
* Invokes an action upon graceful termination of the observable sequence.
* This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline.
* @param {Function} onCompleted Action to invoke upon graceful termination of the observable sequence.
* @param {Any} [thisArg] Object to use as this when executing callback.
* @returns {Observable} The source sequence with the side-effecting behavior applied.
*/
* Invokes an action upon graceful termination of the observable sequence.
* This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline.
* @param {Function} onCompleted Action to invoke upon graceful termination of the observable sequence.
* @param {Any} [thisArg] Object to use as this when executing callback.
* @returns {Observable} The source sequence with the side-effecting behavior applied.
*/
observableProto.doOnCompleted = observableProto.tapOnCompleted = function (onCompleted, thisArg) {
return this.tap(noop, null, typeof thisArg !== 'undefined' ? function () { onCompleted.call(thisArg); } : onCompleted);
};
......@@ -4171,15 +4310,55 @@
return this.ensure(action);
};
var IgnoreElementsObservable = (function(__super__) {
inherits(IgnoreElementsObservable, __super__);
function IgnoreElementsObservable(source) {
this.source = source;
__super__.call(this);
}
IgnoreElementsObservable.prototype.subscribeCore = function (o) {
return this.source.subscribe(new InnerObserver(o));
};
function InnerObserver(o) {
this.o = o;
this.isStopped = false;
}
InnerObserver.prototype.onNext = noop;
InnerObserver.prototype.onError = function (err) {
if(!this.isStopped) {
this.isStopped = true;
this.o.onError(err);
}
};
InnerObserver.prototype.onCompleted = function () {
if(!this.isStopped) {
this.isStopped = true;
this.o.onCompleted();
}
};
InnerObserver.prototype.dispose = function() { this.isStopped = true; };
InnerObserver.prototype.fail = function (e) {
if (!this.isStopped) {
this.isStopped = true;
this.observer.onError(e);
return true;
}
return false;
};
return IgnoreElementsObservable;
}(ObservableBase));
/**
* Ignores all elements in an observable sequence leaving only the termination messages.
* @returns {Observable} An empty observable sequence that signals termination, successful or exceptional, of the source sequence.
*/
observableProto.ignoreElements = function () {
var source = this;
return new AnonymousObservable(function (o) {
return source.subscribe(noop, function (e) { o.onError(e); }, function () { o.onCompleted(); });
}, source);
return new IgnoreElementsObservable(this);
};
/**
......@@ -4237,16 +4416,78 @@
observableProto.retryWhen = function (notifier) {
return enumerableRepeat(this).catchErrorWhen(notifier);
};
var ScanObservable = (function(__super__) {
inherits(ScanObservable, __super__);
function ScanObservable(source, accumulator, hasSeed, seed) {
this.source = source;
this.accumulator = accumulator;
this.hasSeed = hasSeed;
this.seed = seed;
__super__.call(this);
}
ScanObservable.prototype.subscribeCore = function(observer) {
return this.source.subscribe(new ScanObserver(observer,this));
};
return ScanObservable;
}(ObservableBase));
function ScanObserver(observer, parent) {
this.observer = observer;
this.accumulator = parent.accumulator;
this.hasSeed = parent.hasSeed;
this.seed = parent.seed;
this.hasAccumulation = false;
this.accumulation = null;
this.hasValue = false;
this.isStopped = false;
}
ScanObserver.prototype.onNext = function (x) {
if (this.isStopped) { return; }
!this.hasValue && (this.hasValue = true);
try {
if (this.hasAccumulation) {
this.accumulation = this.accumulator(this.accumulation, x);
} else {
this.accumulation = this.hasSeed ? this.accumulator(this.seed, x) : x;
this.hasAccumulation = true;
}
} catch (e) {
return this.observer.onError(e);
}
this.observer.onNext(this.accumulation);
};
ScanObserver.prototype.onError = function (e) {
if (!this.isStopped) {
this.isStopped = true;
this.observer.onError(e);
}
};
ScanObserver.prototype.onCompleted = function () {
if (!this.isStopped) {
this.isStopped = true;
!this.hasValue && this.hasSeed && this.observer.onNext(this.seed);
this.observer.onCompleted();
}
};
ScanObserver.prototype.dispose = function() { this.isStopped = true; };
ScanObserver.prototype.fail = function (e) {
if (!this.isStopped) {
this.isStopped = true;
this.observer.onError(e);
return true;
}
return false;
};
/**
* Applies an accumulator function over an observable sequence and returns each intermediate result. The optional seed value is used as the initial accumulator value.
* For aggregation behavior with no intermediate results, see Observable.aggregate.
* @example
* var res = source.scan(function (acc, x) { return acc + x; });
* var res = source.scan(0, function (acc, x) { return acc + x; });
* @param {Mixed} [seed] The initial accumulator value.
* @param {Function} accumulator An accumulator function to be invoked on each element.
* @returns {Observable} An observable sequence containing the accumulated values.
*/
* Applies an accumulator function over an observable sequence and returns each intermediate result. The optional seed value is used as the initial accumulator value.
* For aggregation behavior with no intermediate results, see Observable.aggregate.
* @param {Mixed} [seed] The initial accumulator value.
* @param {Function} accumulator An accumulator function to be invoked on each element.
* @returns {Observable} An observable sequence containing the accumulated values.
*/
observableProto.scan = function () {
var hasSeed = false, seed, accumulator, source = this;
if (arguments.length === 2) {
......@@ -4256,32 +4497,7 @@
} else {
accumulator = arguments[0];
}
return new AnonymousObservable(function (o) {
var hasAccumulation, accumulation, hasValue;
return source.subscribe (
function (x) {
!hasValue && (hasValue = true);
try {
if (hasAccumulation) {
accumulation = accumulator(accumulation, x);
} else {
accumulation = hasSeed ? accumulator(seed, x) : x;
hasAccumulation = true;
}
} catch (e) {
o.onError(e);
return;
}
o.onNext(accumulation);
},
function (e) { o.onError(e); },
function () {
!hasValue && hasSeed && o.onNext(seed);
o.onCompleted();
}
);
}, source);
return new ScanObservable(this, accumulator, hasSeed, seed);
};
/**
......@@ -4724,53 +4940,56 @@
this.selector = bindCallback(selector, thisArg, 3);
__super__.call(this);
}
function innerMap(selector, self) {
return function (x, i, o) { return selector.call(this, self.selector(x, i, o), i, o); }
}
MapObservable.prototype.internalMap = function (selector, thisArg) {
var self = this;
return new MapObservable(this.source, function (x, i, o) { return selector.call(this, self.selector(x, i, o), i, o); }, thisArg)
return new MapObservable(this.source, innerMap(selector, this), thisArg);
};
MapObservable.prototype.subscribeCore = function (observer) {
return this.source.subscribe(new MapObserver(observer, this.selector, this));
MapObservable.prototype.subscribeCore = function (o) {
return this.source.subscribe(new InnerObserver(o, this.selector, this));
};
function InnerObserver(o, selector, source) {
this.o = o;
this.selector = selector;
this.source = source;
this.i = 0;
this.isStopped = false;
}
InnerObserver.prototype.onNext = function(x) {
if (this.isStopped) { return; }
var result = tryCatch(this.selector)(x, this.i++, this.source);
if (result === errorObj) {
return this.o.onError(result.e);
}
this.o.onNext(result);
};
InnerObserver.prototype.onError = function (e) {
if(!this.isStopped) { this.isStopped = true; this.o.onError(e); }
};
InnerObserver.prototype.onCompleted = function () {
if(!this.isStopped) { this.isStopped = true; this.o.onCompleted(); }
};
InnerObserver.prototype.dispose = function() { this.isStopped = true; };
InnerObserver.prototype.fail = function (e) {
if (!this.isStopped) {
this.isStopped = true;
this.o.onError(e);
return true;
}
return false;
};
return MapObservable;
}(ObservableBase));
function MapObserver(observer, selector, source) {
this.observer = observer;
this.selector = selector;
this.source = source;
this.i = 0;
this.isStopped = false;
}
MapObserver.prototype.onNext = function(x) {
if (this.isStopped) { return; }
var result = tryCatch(this.selector).call(this, x, this.i++, this.source);
if (result === errorObj) {
return this.observer.onError(result.e);
}
this.observer.onNext(result);
};
MapObserver.prototype.onError = function (e) {
if(!this.isStopped) { this.isStopped = true; this.observer.onError(e); }
};
MapObserver.prototype.onCompleted = function () {
if(!this.isStopped) { this.isStopped = true; this.observer.onCompleted(); }
};
MapObserver.prototype.dispose = function() { this.isStopped = true; };
MapObserver.prototype.fail = function (e) {
if (!this.isStopped) {
this.isStopped = true;
this.observer.onError(e);
return true;
}
return false;
};
/**
* Projects each element of an observable sequence into a new form by incorporating the element's index.
* @param {Function} selector A transform function to apply to each source element; the second parameter of the function represents the index of the source element.
......@@ -4917,6 +5136,51 @@
return this.select(selector, thisArg).switchLatest();
};
var SkipObservable = (function(__super__) {
inherits(SkipObservable, __super__);
function SkipObservable(source, count) {
this.source = source;
this.skipCount = count;
__super__.call(this);
}
SkipObservable.prototype.subscribeCore = function (o) {
return this.source.subscribe(new InnerObserver(o, this.skipCount));
};
function InnerObserver(o, c) {
this.c = c;
this.r = c;
this.o = o;
this.isStopped = false;
}
InnerObserver.prototype.onNext = function (x) {
if (this.isStopped) { return; }
if (this.r <= 0) {
this.o.onNext(x);
} else {
this.r--;
}
};
InnerObserver.prototype.onError = function(e) {
if (!this.isStopped) { this.isStopped = true; this.o.onError(e); }
};
InnerObserver.prototype.onCompleted = function() {
if (!this.isStopped) { this.isStopped = true; this.o.onCompleted(); }
};
InnerObserver.prototype.dispose = function() { this.isStopped = true; };
InnerObserver.prototype.fail = function(e) {
if (!this.isStopped) {
this.isStopped = true;
this.o.onError(e);
return true;
}
return false;
};
return SkipObservable;
}(ObservableBase));
/**
* Bypasses a specified number of elements in an observable sequence and then returns the remaining elements.
* @param {Number} count The number of elements to skip before returning the remaining elements.
......@@ -4924,19 +5188,8 @@
*/
observableProto.skip = function (count) {
if (count < 0) { throw new ArgumentOutOfRangeError(); }
var source = this;
return new AnonymousObservable(function (o) {
var remaining = count;
return source.subscribe(function (x) {
if (remaining <= 0) {
o.onNext(x);
} else {
remaining--;
}
}, function (e) { o.onError(e); }, function () { o.onCompleted(); });
}, source);
return new SkipObservable(this, count);
};
/**
* Bypasses elements in an observable sequence as long as a specified condition is true and then returns the remaining elements.
* The element's index is used in the logic of the predicate function.
......@@ -4984,7 +5237,7 @@
return source.subscribe(function (x) {
if (remaining-- > 0) {
o.onNext(x);
remaining === 0 && o.onCompleted();
remaining <= 0 && o.onCompleted();
}
}, function (e) { o.onError(e); }, function () { o.onCompleted(); });
}, source);
......@@ -5029,51 +5282,54 @@
__super__.call(this);
}
FilterObservable.prototype.subscribeCore = function (observer) {
return this.source.subscribe(new FilterObserver(observer, this.predicate, this));
FilterObservable.prototype.subscribeCore = function (o) {
return this.source.subscribe(new InnerObserver(o, this.predicate, this));
};
function innerPredicate(predicate, self) {
return function(x, i, o) { return self.predicate(x, i, o) && predicate.call(this, x, i, o); }
}
FilterObservable.prototype.internalFilter = function(predicate, thisArg) {
var self = this;
return new FilterObservable(this.source, function(x, i, o) { return self.predicate(x, i, o) && predicate.call(this, x, i, o); }, thisArg);
return new FilterObservable(this.source, innerPredicate(predicate, this), thisArg);
};
function InnerObserver(o, predicate, source) {
this.o = o;
this.predicate = predicate;
this.source = source;
this.i = 0;
this.isStopped = false;
}
InnerObserver.prototype.onNext = function(x) {
if (this.isStopped) { return; }
var shouldYield = tryCatch(this.predicate)(x, this.i++, this.source);
if (shouldYield === errorObj) {
return this.o.onError(shouldYield.e);
}
shouldYield && this.o.onNext(x);
};
InnerObserver.prototype.onError = function (e) {
if(!this.isStopped) { this.isStopped = true; this.o.onError(e); }
};
InnerObserver.prototype.onCompleted = function () {
if(!this.isStopped) { this.isStopped = true; this.o.onCompleted(); }
};
InnerObserver.prototype.dispose = function() { this.isStopped = true; };
InnerObserver.prototype.fail = function (e) {
if (!this.isStopped) {
this.isStopped = true;
this.o.onError(e);
return true;
}
return false;
};
return FilterObservable;
}(ObservableBase));
function FilterObserver(observer, predicate, source) {
this.observer = observer;
this.predicate = predicate;
this.source = source;
this.i = 0;
this.isStopped = false;
}
FilterObserver.prototype.onNext = function(x) {
if (this.isStopped) { return; }
var shouldYield = tryCatch(this.predicate).call(this, x, this.i++, this.source);
if (shouldYield === errorObj) {
return this.observer.onError(shouldYield.e);
}
shouldYield && this.observer.onNext(x);
};
FilterObserver.prototype.onError = function (e) {
if(!this.isStopped) { this.isStopped = true; this.observer.onError(e); }
};
FilterObserver.prototype.onCompleted = function () {
if(!this.isStopped) { this.isStopped = true; this.observer.onCompleted(); }
};
FilterObserver.prototype.dispose = function() { this.isStopped = true; };
FilterObserver.prototype.fail = function (e) {
if (!this.isStopped) {
this.isStopped = true;
this.observer.onError(e);
return true;
}
return false;
};
/**
* Filters the elements of an observable sequence based on a predicate by incorporating the element's index.
* @param {Function} predicate A function to test each source element for a condition; the second parameter of the function represents the index of the source element.
......@@ -5169,44 +5425,80 @@
}, source);
};
var ReduceObservable = (function(__super__) {
inherits(ReduceObservable, __super__);
function ReduceObservable(source, acc, hasSeed, seed) {
this.source = source;
this.acc = acc;
this.hasSeed = hasSeed;
this.seed = seed;
__super__.call(this);
}
ReduceObservable.prototype.subscribeCore = function(observer) {
return this.source.subscribe(new InnerObserver(observer,this));
};
function InnerObserver(o, parent) {
this.o = o;
this.acc = parent.acc;
this.hasSeed = parent.hasSeed;
this.seed = parent.seed;
this.hasAccumulation = false;
this.result = null;
this.hasValue = false;
this.isStopped = false;
}
InnerObserver.prototype.onNext = function (x) {
if (this.isStopped) { return; }
!this.hasValue && (this.hasValue = true);
if (this.hasAccumulation) {
this.result = tryCatch(this.acc)(this.result, x);
} else {
this.result = this.hasSeed ? tryCatch(this.acc)(this.seed, x) : x;
this.hasAccumulation = true;
}
if (this.result === errorObj) { this.o.onError(this.result.e); }
};
InnerObserver.prototype.onError = function (e) {
if (!this.isStopped) { this.isStopped = true; this.o.onError(e); }
};
InnerObserver.prototype.onCompleted = function () {
if (!this.isStopped) {
this.isStopped = true;
this.hasValue && this.o.onNext(this.result);
!this.hasValue && this.hasSeed && this.o.onNext(this.seed);
!this.hasValue && !this.hasSeed && this.o.onError(new EmptyError());
this.o.onCompleted();
}
};
InnerObserver.prototype.dispose = function () { this.isStopped = true; };
InnerObserver.prototype.fail = function(e) {
if (!this.isStopped) {
this.isStopped = true;
this.o.onError(e);
return true;
}
return false;
};
return ReduceObservable;
}(ObservableBase));
/**
* Applies an accumulator function over an observable sequence, returning the result of the aggregation as a single element in the result sequence. The specified seed value is used as the initial accumulator value.
* For aggregation behavior with incremental intermediate results, see Observable.scan.
* @param {Function} accumulator An accumulator function to be invoked on each element.
* @param {Any} [seed] The initial accumulator value.
* @returns {Observable} An observable sequence containing a single element with the final accumulator value.
*/
* Applies an accumulator function over an observable sequence, returning the result of the aggregation as a single element in the result sequence. The specified seed value is used as the initial accumulator value.
* For aggregation behavior with incremental intermediate results, see Observable.scan.
* @param {Function} accumulator An accumulator function to be invoked on each element.
* @param {Any} [seed] The initial accumulator value.
* @returns {Observable} An observable sequence containing a single element with the final accumulator value.
*/
observableProto.reduce = function (accumulator) {
var hasSeed = false, seed, source = this;
var hasSeed = false;
if (arguments.length === 2) {
hasSeed = true;
seed = arguments[1];
var seed = arguments[1];
}
return new AnonymousObservable(function (o) {
var hasAccumulation, accumulation, hasValue;
return source.subscribe (
function (x) {
!hasValue && (hasValue = true);
try {
if (hasAccumulation) {
accumulation = accumulator(accumulation, x);
} else {
accumulation = hasSeed ? accumulator(seed, x) : x;
hasAccumulation = true;
}
} catch (e) {
return o.onError(e);
}
},
function (e) { o.onError(e); },
function () {
hasValue && o.onNext(accumulation);
!hasValue && hasSeed && o.onNext(seed);
!hasValue && !hasSeed && o.onError(new EmptyError());
o.onCompleted();
}
);
}, source);
return new ReduceObservable(this, accumulator, hasSeed, seed);
};
/**
......@@ -6153,8 +6445,9 @@
function createEventListener (el, eventName, handler) {
var disposables = new CompositeDisposable();
// Asume NodeList
if (Object.prototype.toString.call(el) === '[object NodeList]') {
// Asume NodeList or HTMLCollection
var toStr = Object.prototype.toString;
if (toStr.call(el) === '[object NodeList]' || toStr.call(el) === '[object HTMLCollection]') {
for (var i = 0, len = el.length; i < len; i++) {
disposables.add(createEventListener(el.item(i), eventName, handler));
}
......@@ -6333,25 +6626,14 @@
function next(x, i) {
values[i] = x
var res;
hasValue[i] = true;
if (hasValueAll || (hasValueAll = hasValue.every(identity))) {
if (err) {
o.onError(err);
return;
}
try {
res = resultSelector.apply(null, values);
} catch (ex) {
o.onError(ex);
return;
}
if (err) { return o.onError(err); }
var res = tryCatch(resultSelector).apply(null, values);
if (res === errorObj) { return o.onError(res.e); }
o.onNext(res);
}
if (isDone && values[1]) {
o.onCompleted();
}
isDone && values[1] && o.onCompleted();
}
return new CompositeDisposable(
......@@ -6390,6 +6672,8 @@
function subscribe(o) {
var q = [], previousShouldFire;
function drainQueue() { while (q.length > 0) { o.onNext(q.shift()); } }
var subscription =
combineLatestSource(
this.source,
......@@ -6402,11 +6686,7 @@
if (previousShouldFire !== undefined && results.shouldFire != previousShouldFire) {
previousShouldFire = results.shouldFire;
// change in shouldFire
if (results.shouldFire) {
while (q.length > 0) {
o.onNext(q.shift());
}
}
if (results.shouldFire) { drainQueue(); }
} else {
previousShouldFire = results.shouldFire;
// new data
......@@ -6418,17 +6698,11 @@
}
},
function (err) {
// Empty buffer before sending error
while (q.length > 0) {
o.onNext(q.shift());
}
drainQueue();
o.onError(err);
},
function () {
// Empty buffer before sending completion
while (q.length > 0) {
o.onNext(q.shift());
}
drainQueue();
o.onCompleted();
}
);
......@@ -6599,7 +6873,7 @@
* source.request(3); // Reads 3 values
* @param {bool} enableQueue truthy value to determine if values should be queued pending the next request
* @param {Scheduler} scheduler determines how the requests will be scheduled
* @returns {Observable} The observable sequence which is paused based upon the pauser.
* @returns {Observable} The observable sequence which only propagates values on request.
*/
observableProto.controlled = function (enableQueue, scheduler) {
......@@ -7237,6 +7511,27 @@
return ConnectableObservable;
}(Observable));
/**
* Returns an observable sequence that shares a single subscription to the underlying sequence. This observable sequence
* can be resubscribed to, even if all prior subscriptions have ended. (unlike `.publish().refCount()`)
* @returns {Observable} An observable sequence that contains the elements of a sequence produced by multicasting the source.
*/
observableProto.singleInstance = function() {
var source = this, hasObservable = false, observable;
function getObservable() {
if (!hasObservable) {
hasObservable = true;
observable = source.finally(function() { hasObservable = false; }).publish().refCount();
}
return observable;
};
return new AnonymousObservable(function(o) {
return getObservable().subscribe(o);
});
};
var Dictionary = (function () {
var primes = [1, 3, 7, 13, 31, 61, 127, 251, 509, 1021, 2039, 4093, 8191, 16381, 32749, 65521, 131071, 262139, 524287, 1048573, 2097143, 4194301, 8388593, 16777213, 33554393, 67108859, 134217689, 268435399, 536870909, 1073741789, 2147483647],
......@@ -7862,15 +8157,28 @@
];
};
var WhileEnumerable = (function(__super__) {
inherits(WhileEnumerable, __super__);
function WhileEnumerable(c, s) {
this.c = c;
this.s = s;
}
WhileEnumerable.prototype[$iterator$] = function () {
var self = this;
return {
next: function () {
return self.c() ?
{ done: false, value: self.s } :
{ done: true, value: void 0 };
}
};
};
return WhileEnumerable;
}(Enumerable));
function enumerableWhile(condition, source) {
return new Enumerable(function () {
return new Enumerator(function () {
return condition() ?
{ done: false, value: source } :
{ done: true, value: undefined };
});
});
}
return new WhileEnumerable(condition, source);
}
/**
* Returns an observable sequence that is the result of invoking the selector on the source sequence, without sharing subscriptions.
......@@ -8187,7 +8495,7 @@
* @param {Object} scheduler Scheduler used to execute the operation. If not specified, defaults to the ImmediateScheduler.
* @returns {Observable} An observable sequence which results from the comonadic bind operation.
*/
observableProto.manySelect = function (selector, scheduler) {
observableProto.manySelect = observableProto.extend = function (selector, scheduler) {
isScheduler(scheduler) || (scheduler = immediateScheduler);
var source = this;
return observableDefer(function () {
......@@ -8953,25 +9261,33 @@
};
function sampleObservable(source, sampler) {
return new AnonymousObservable(function (observer) {
var atEnd, value, hasValue;
return new AnonymousObservable(function (o) {
var atEnd = false, value, hasValue = false;
function sampleSubscribe() {
if (hasValue) {
hasValue = false;
observer.onNext(value);
o.onNext(value);
}
atEnd && observer.onCompleted();
atEnd && o.onCompleted();
}
return new CompositeDisposable(
source.subscribe(function (newValue) {
var sourceSubscription = new SingleAssignmentDisposable();
sourceSubscription.setDisposable(source.subscribe(
function (newValue) {
hasValue = true;
value = newValue;
}, observer.onError.bind(observer), function () {
},
function (e) { o.onError(e); },
function () {
atEnd = true;
}),
sampler.subscribe(sampleSubscribe, observer.onError.bind(observer), sampleSubscribe)
sourceSubscription.dispose();
}
));
return new CompositeDisposable(
sourceSubscription,
sampler.subscribe(sampleSubscribe, function (e) { o.onError(e); }, sampleSubscribe)
);
}, source);
}
......@@ -9075,12 +9391,9 @@
isScheduler(scheduler) || (scheduler = timeoutScheduler);
return new AnonymousObservable(function (observer) {
var first = true,
hasResult = false,
result,
state = initialState,
time;
return scheduler.scheduleRecursiveWithAbsolute(scheduler.now(), function (self) {
hasResult && observer.onNext(result);
hasResult = false;
return scheduler.scheduleRecursiveWithAbsoluteAndState(initialState, scheduler.now(), function (state, self) {
hasResult && observer.onNext(state);
try {
if (first) {
......@@ -9090,15 +9403,15 @@
}
hasResult = condition(state);
if (hasResult) {
result = resultSelector(state);
time = timeSelector(state);
var result = resultSelector(state);
var time = timeSelector(state);
}
} catch (e) {
observer.onError(e);
return;
}
if (hasResult) {
self(time);
self(result, time);
} else {
observer.onCompleted();
}
......@@ -9129,12 +9442,9 @@
isScheduler(scheduler) || (scheduler = timeoutScheduler);
return new AnonymousObservable(function (observer) {
var first = true,
hasResult = false,
result,
state = initialState,
time;
return scheduler.scheduleRecursiveWithRelative(0, function (self) {
hasResult && observer.onNext(result);
hasResult = false;
return scheduler.scheduleRecursiveWithRelativeAndState(initialState, 0, function (state, self) {
hasResult && observer.onNext(state);
try {
if (first) {
......@@ -9144,15 +9454,15 @@
}
hasResult = condition(state);
if (hasResult) {
result = resultSelector(state);
time = timeSelector(state);
var result = resultSelector(state);
var time = timeSelector(state);
}
} catch (e) {
observer.onError(e);
return;
}
if (hasResult) {
self(time);
self(result, time);
} else {
observer.onCompleted();
}
......
This source diff could not be displayed because it is too large. You can view the blob instead.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment