Characteristic function for an Observable
instance, that
creates the subscription and that eventually starts the streaming
of events to the given Observer, being meant to be overridden
in custom combinators or in classes implementing Observable.
Characteristic function for an Observable
instance, that
creates the subscription and that eventually starts the streaming
of events to the given Observer, being meant to be overridden
in custom combinators or in classes implementing Observable.
This function is "unsafe" to call because it does not protect the
calls to the given Observer implementation in regards to
unexpected exceptions that violate the contract, therefore the
given instance must respect its contract and not throw any
exceptions when the observable calls onNext
, onComplete
and
onError
. If it does, then the behavior is undefined.
Concatenates the source Observable with the other Observable, as specified.
Concatenates the source Observable with the other Observable, as specified.
Ordering of subscription is preserved, so the second observable
starts only after the source observable is completed successfully with
an onComplete
. On the other hand, the second observable is never
subscribed if the source completes with an error.
val concat = Observable(1,2,3) ++ Observable(4,5) concat.dump("O").subscribe() // 0: O-->1 // 1: O-->2 // 2: O-->3 // 3: O-->4 // 4: O-->5 // 5: O completed
Creates a new Observable that emits the given element and then it also emits the events of the source (prepend operation).
Creates a new Observable that emits the given element and then it also emits the events of the source (prepend operation).
val source = 1 +: Observable(2, 3, 4) source.dump("O").subscribe() // 0: O-->1 // 1: O-->2 // 2: O-->3 // 3: O-->4 // 4: O completed
Creates a new Observable that emits the events of the source and then it also emits the given element (appended to the stream).
Creates a new Observable that emits the events of the source and then it also emits the given element (appended to the stream).
val source = Observable(1, 2, 3) :+ 4 source.dump("O").subscribe() // 0: O-->1 // 1: O-->2 // 2: O-->3 // 3: O-->4 // 4: O completed
Given the source observable and another Observable
, emits all of the items
from the first of these Observables to emit an item and cancel the other.
Returns the first generated result as a Future and then cancels the subscription.
Forces a buffered asynchronous boundary.
Forces a buffered asynchronous boundary.
Internally it wraps the observer implementation given to
onSubscribe
into a
BufferedSubscriber.
Normally Monifu's implementation guarantees that events
are not emitted concurrently, and that the publisher MUST
NOT emit the next event without acknowledgement from the
consumer that it may proceed, however for badly behaved
publishers, this wrapper provides the guarantee that the
downstream Observer given in
subscribe
will not receive concurrent events.
WARNING: if the buffer created by this operator is unbounded, it can blow up the process if the data source is pushing events faster than what the observer can consume, as it introduces an asynchronous boundary that eliminates the back-pressure requirements of the data source. Unbounded is the default overflowStrategy, see OverflowStrategy for options.
- the overflow strategy used for buffering, which specifies what to do in case we're dealing with a slow consumer - should an unbounded buffer be used, should back-pressure be applied, should the pipeline drop newer or older events, should it drop the whole buffer? See OverflowStrategy for more details
- a function that is used for signaling a special event used to inform the consumers that an overflow event happened, function that receives the number of dropped events as a parameter (see OverflowStrategy.Evicted)
Forces a buffered asynchronous boundary.
Forces a buffered asynchronous boundary.
Internally it wraps the observer implementation given to
onSubscribe
into a
BufferedSubscriber.
Normally Monifu's implementation guarantees that events
are not emitted concurrently, and that the publisher MUST
NOT emit the next event without acknowledgement from the
consumer that it may proceed, however for badly behaved
publishers, this wrapper provides the guarantee that the
downstream Observer given in
subscribe
will not receive concurrent events.
WARNING: if the buffer created by this operator is unbounded, it can blow up the process if the data source is pushing events faster than what the observer can consume, as it introduces an asynchronous boundary that eliminates the back-pressure requirements of the data source. Unbounded is the default overflowStrategy, see OverflowStrategy for options.
- the overflow strategy used for buffering, which specifies what to do in case we're dealing with a slow consumer - should an unbounded buffer be used, should back-pressure be applied, should the pipeline drop newer or older events, should it drop the whole buffer? See OverflowStrategy for more details
Converts this observable into a multicast observable, useful for turning a cold observable into a hot one (i.e.
Converts this observable into a multicast observable, useful for turning a cold observable into a hot one (i.e. whose source is shared by all observers). The underlying subject used is a BehaviorSubject.
Periodically gather items emitted by an Observable into bundles and emit these bundles rather than emitting the items one at a time.
Periodically gather items emitted by an Observable into bundles and emit these bundles rather than emitting the items one at a time.
The resulting Observable emits connected, non-overlapping buffers, each of
a fixed duration specified by the timespan
argument or a maximum size
specified by the maxSize
argument (whichever is reached first). When the
source Observable completes or encounters an error, the resulting
Observable emits the current buffer and propagates the notification from
the source Observable.
the interval of time at which it should emit the buffered bundle
is the maximum bundle size
Periodically gather items emitted by an Observable into bundles and emit these bundles rather than emitting the items one at a time.
Periodically gather items emitted by an Observable into bundles and emit these bundles rather than emitting the items one at a time.
This version of buffer
emits a new bundle of items periodically,
every timespan amount of time, containing all items emitted by the
source Observable since the previous bundle emission.
the interval of time at which it should emit the buffered bundle
Returns an Observable that emits buffers of items it collects from the source Observable.
Returns an Observable that emits buffers of items it collects from the
source Observable. The resulting Observable emits buffers every skip
items, each containing count
items. When the source Observable completes
or encounters an error, the resulting Observable emits the current buffer
and propagates the notification from the source Observable.
There are 3 possibilities:
1. in case skip == count
, then there are no items dropped and no overlap,
the call being equivalent to window(count)
2. in case skip < count
, then overlap between windows happens, with the
number of elements being repeated being count - skip
3. in case skip > count
, then skip - count
elements start getting
dropped between windows
the maximum size of each buffer before it should be emitted
how many items emitted by the source Observable should be
skipped before starting a new buffer. Note that when skip and
count are equal, this is the same operation as buffer(count)
Periodically gather items emitted by an Observable into bundles and emit these bundles rather than emitting the items one at a time.
Periodically gather items emitted by an Observable into bundles and emit
these bundles rather than emitting the items one at a time. This version
of buffer
is emitting items once the internal buffer has the reached the
given count.
the maximum size of each buffer before it should be emitted
Buffers signals while busy, after which it emits the buffered events as a single bundle.
Buffers signals while busy, after which it emits the buffered events as a single bundle.
This operator starts applying back-pressure when the underlying buffer's size is exceeded.
Caches the emissions from the source Observable and replays them in order to any subsequent Subscribers.
Caches the emissions from the source Observable and replays them in order to any subsequent Subscribers. This method has similar behavior to replay except that this auto-subscribes to the source Observable rather than returning a ConnectableObservable for which you must call connect to activate the subscription.
When you call cache, it does not yet subscribe to the source Observable
and so does not yet begin caching items. This only happens when the
first Subscriber calls the resulting Observable's subscribe
method.
is the maximum buffer size after which old events start being dropped (according to what happens when using ReplaySubject.createWithSize)
an Observable that, when first subscribed to, caches all of its items and notifications for the benefit of subsequent subscribers
Caches the emissions from the source Observable and replays them in order to any subsequent Subscribers.
Caches the emissions from the source Observable and replays them in order to any subsequent Subscribers. This method has similar behavior to replay except that this auto-subscribes to the source Observable rather than returning a ConnectableObservable for which you must call connect to activate the subscription.
When you call cache, it does not yet subscribe to the source Observable
and so does not yet begin caching items. This only happens when the
first Subscriber calls the resulting Observable's subscribe
method.
Note: You sacrifice the ability to cancel the origin when you use the cache operator so be careful not to use this on Observables that emit an infinite or very large number of items that will use up memory.
an Observable that, when first subscribed to, caches all of its items and notifications for the benefit of subsequent subscribers
Returns an Observable by applying the given partial function to the source observable for each element for which the given partial function is defined.
Returns an Observable by applying the given partial function to the source observable for each element for which the given partial function is defined.
Useful to be used instead of a filter & map combination.
the function that filters and maps the resulting observable
an Observable that emits the transformed items by the given partial function
Creates a new Observable from this Observable and another given Observable.
Creates a new Observable from this Observable and another given Observable.
This operator behaves in a similar way to zip, but while zip
emits items
only when all of the zipped source Observables have emitted a previously unzipped item,
combine
emits an item whenever any of the source Observables emits
an item (so long as each of the source Observables has emitted at least one item).
Creates a new Observable from this Observable and another given Observable.
Creates a new Observable from this Observable and another given Observable.
This operator behaves in a similar way to zip, but while zip
emits items
only when all of the zipped source Observables have emitted a previously unzipped item,
combine
emits an item whenever any of the source Observables emits
an item (so long as each of the source Observables has emitted at least one item).
This version of combineLatest
is reserving onError
notifications until all of the combined Observables
complete and only then passing it along to the observers.
Ignores all items emitted by the source Observable and only calls onCompleted or onError.
Ignores all items emitted by the source Observable and only calls onCompleted or onError.
an empty Observable that only calls onCompleted or onError, based on which one is called by the source Observable
Concatenates the sequence of Observables emitted by the source into one Observable, without any transformation.
Concatenates the sequence of Observables emitted by the source into one Observable, without any transformation.
You can combine the items emitted by multiple Observables so that they act like a single Observable by using this method.
The difference between the concat
operation and
merge is that concat
cares about
ordering of emitted items (e.g. all items emitted by the
first observable in the sequence will come before the
elements emitted by the second observable), whereas
merge
doesn't care about that (elements get emitted as
they come). Because of back-pressure applied to
observables, Observable!.concat is safe to use in all
contexts, whereas merge
requires buffering.
an Observable that emits items that are the result of
flattening the items emitted by the Observables emitted
by this
Concatenates the sequence of Observables emitted by the source into one Observable, without any transformation.
Concatenates the sequence of Observables emitted by the source into one Observable, without any transformation.
You can combine the items emitted by multiple Observables so that they act like a single Observable by using this method.
The difference between the concat
operation and
merge is that concat
cares about
ordering of emitted items (e.g. all items emitted by the
first observable in the sequence will come before the
elements emitted by the second observable), whereas
merge
doesn't care about that (elements get emitted as
they come). Because of back-pressure applied to
observables, Observable!.concat is safe to use in all
contexts, whereas merge
requires buffering.
This version is reserving onError notifications until all of the Observables complete and only then passing the issued errors(s) along to the observers. Note that the streamed error is a CompositeException, since multiple errors from multiple streams can happen.
an Observable that emits items that are the result of
flattening the items emitted by the Observables emitted
by this
Creates a new Observable by applying a function that you supply to each item emitted by the source Observable, where that function returns an Observable, and then concatenating those resulting Observables and emitting the results of this concatenation.
Creates a new Observable by applying a function that you supply to each item emitted by the source Observable, where that function returns an Observable, and then concatenating those resulting Observables and emitting the results of this concatenation.
a function that, when applied to an item emitted by the source Observable, returns an Observable
an Observable that emits the result of applying the transformation function to each item emitted by the source Observable and concatenating the results of the Observables obtained from this transformation.
Creates a new Observable by applying a function that you supply to each item emitted by the source Observable, where that function returns an Observable, and then concatenating those resulting Observables and emitting the results of this concatenation.
Creates a new Observable by applying a function that you supply to each item emitted by the source Observable, where that function returns an Observable, and then concatenating those resulting Observables and emitting the results of this concatenation.
It's like Observable!.concatMap, except that the created observable is reserving onError notifications until all of the merged Observables complete and only then passing it along to the observers.
a function that, when applied to an item emitted by the source Observable, returns an Observable
an Observable that emits the result of applying the transformation function to each item emitted by the source Observable and concatenating the results of the Observables obtained from this transformation.
Creates a new Observable that emits the total number of onNext
events
that were emitted by the source.
Creates a new Observable that emits the total number of onNext
events
that were emitted by the source.
Note that this Observable emits only one item after the source is complete. And in case the source emits an error, then only that error will be emitted.
Only emit an item from an Observable if a particular
timespan has passed without it emitting another item,
a timespan indicated by the completion of an observable
generated the selector
function.
Only emit an item from an Observable if a particular
timespan has passed without it emitting another item,
a timespan indicated by the completion of an observable
generated the selector
function.
Note: If the source Observable keeps emitting items more frequently than the length of the time window then no items will be emitted by the resulting Observable.
function to retrieve a sequence that indicates the throttle duration for each item
is a function that receives the last element generated by the source, generating an observable to be subscribed when the source is timing out
Only emit an item from an Observable if a particular
timespan has passed without it emitting another item,
a timespan indicated by the completion of an observable
generated the selector
function.
Only emit an item from an Observable if a particular
timespan has passed without it emitting another item,
a timespan indicated by the completion of an observable
generated the selector
function.
Note: If the source Observable keeps emitting items more frequently than the length of the time window then no items will be emitted by the resulting Observable.
function to retrieve a sequence that indicates the throttle duration for each item
Doesn't emit anything until a timeout
period passes without
the source emitting anything.
Doesn't emit anything until a timeout
period passes without
the source emitting anything. When that timeout happens,
we subscribe to the observable generated by the given function,
an observable that will keep emitting until the source will
break the silence by emitting another event.
Note: If the source Observable keeps emitting items more frequently than the length of the time window, then no items will be emitted by the resulting Observable.
the length of the window of time that must pass after the emission of an item from the source Observable in which that Observable emits no items in order for the item to be emitted by the resulting Observable
is a function that receives the last element generated by the source, generating an observable to be subscribed when the source is timing out
Only emit an item from an Observable if a particular timespan has passed without it emitting another item.
Only emit an item from an Observable if a particular timespan has passed without it emitting another item.
Note: If the source Observable keeps emitting items more frequently than the length of the time window then no items will be emitted by the resulting Observable.
the length of the window of time that must pass after the emission of an item from the source Observable in which that Observable emits no items in order for the item to be emitted by the resulting Observable
echoOnce for a similar operator that also mirrors the source observable
Emits the last item from the source Observable if a particular timespan has passed without it emitting another item, and keeps emitting that item at regular intervals until the source breaks the silence.
Emits the last item from the source Observable if a particular timespan has passed without it emitting another item, and keeps emitting that item at regular intervals until the source breaks the silence.
So compared to regular debounce it keeps emitting the last item of the source.
Note: If the source Observable keeps emitting items more frequently than the length of the time window then no items will be emitted by the resulting Observable.
the length of the window of time that must pass after the emission of an item from the source Observable in which that Observable emits no items in order for the item to be emitted by the resulting Observable at regular intervals, also determined by period
echoRepeated for a similar operator that also mirrors the source observable
Emit items from the source Observable, or emit a default item if the source Observable completes after emitting no items.
Returns an Observable that emits the items emitted by the source Observable shifted forward in time.
Returns an Observable that emits the items emitted by the source Observable shifted forward in time.
This variant of delay
sets its delay duration on a per-item basis by
passing each item from the source Observable into a function that returns
an Observable and then monitoring those Observables. When any such
Observable emits an item or completes, the Observable returned
by delay emits the associated item.
- a function that returns an Observable for each item
emitted by the source Observable, which is then used
to delay the emission of that item by the resulting
Observable until the Observable returned
from selector
emits an item
the source Observable shifted in time by the specified delay
delay(duration) for the other variant
Returns an Observable that emits the items emitted by the source Observable shifted forward in time by a specified delay.
Returns an Observable that emits the items emitted by the source Observable shifted forward in time by a specified delay.
Each time the source Observable emits an item, delay starts a timer, and when that timer reaches the given duration, the Observable returned from delay emits the same item.
NOTE: this delay refers strictly to the time between the onNext
event coming from our source and the time it takes the downstream
observer to get this event. On the other hand the operator is also
applying back-pressure, so on slow observers the actual time passing
between two successive events may be higher than the
specified duration
.
- the delay to shift the source by
the source Observable shifted in time by the specified delay
Hold an Observer's subscription request for a specified amount of time before passing it on to the source Observable.
Hold an Observer's subscription request for a specified amount of time before passing it on to the source Observable.
is the time to wait before the subscription is being initiated.
Hold an Observer's subscription request until the given trigger
observable either emits an item or completes, before passing it on to
the source Observable.
Hold an Observer's subscription request until the given trigger
observable either emits an item or completes, before passing it on to
the source Observable.
If the given trigger
completes in error, then the subscription is
terminated with onError
.
- the observable that must either emit an item or complete in order for the source to be subscribed.
Given a function that returns a key for each element emitted by the source Observable, suppress duplicates items.
Given a function that returns a key for each element emitted by the source Observable, suppress duplicates items.
WARNING: this requires unbounded buffering.
Suppress the duplicate elements emitted by the source Observable.
Suppress the duplicate elements emitted by the source Observable.
WARNING: this requires unbounded buffering.
Suppress duplicate consecutive items emitted by the source Observable
Suppress duplicate consecutive items emitted by the source Observable
Executes the given callback if the downstream observer has canceled the streaming.
Executes the given callback when the stream has ended, but before the complete event is emitted.
Executes the given callback when the stream has ended, but before the complete event is emitted.
the callback to execute when the subscription is canceled
Executes the given callback when the stream is interrupted
with an error, before the onError
event is emitted downstream.
Executes the given callback when the stream is interrupted
with an error, before the onError
event is emitted downstream.
NOTE: should protect the code in this callback, because if it
throws an exception the onError
event will prefer signaling the
original exception and otherwise the behavior is undefined.
Executes the given callback only for the first element generated by the source Observable, useful for doing a piece of computation only when the stream started.
Executes the given callback only for the first element generated by the source Observable, useful for doing a piece of computation only when the stream started.
a new Observable that executes the specified callback only for the first element
Executes the given callback for each element generated by the source Observable, useful for doing side-effects.
Executes the given callback for each element generated by the source Observable, useful for doing side-effects.
a new Observable that executes the specified callback for each element
Drops the first n elements (from the start).
Drops the first n elements (from the start).
the number of elements to drop
a new Observable that drops the first n elements emitted by the source
Creates a new Observable that drops the events of the source, only
for the specified timestamp
window.
Creates a new Observable that drops the events of the source, only
for the specified timestamp
window.
the window of time during which the new Observable is must drop the events emitted by the source
Drops the longest prefix of elements that satisfy the given predicate and returns a new Observable that emits the rest.
Drops the longest prefix of elements that satisfy the given function and returns a new Observable that emits the rest.
Drops the longest prefix of elements that satisfy the given function and returns a new Observable that emits the rest. In comparison with dropWhile, this version accepts a function that takes an additional parameter: the zero-based index of the element.
Utility that can be used for debugging purposes.
Mirror the source observable as long as the source keeps emitting items,
otherwise if timeout
passes without the source emitting anything new
then the observable will emit the last item.
Mirror the source observable as long as the source keeps emitting items,
otherwise if timeout
passes without the source emitting anything new
then the observable will emit the last item.
This is the rough equivalent of:
Observable.merge(source, source.debounce(period))
Note: If the source Observable keeps emitting items more frequently than the length of the time window then the resulting observable will mirror the source exactly.
the window of silence that must pass in order for the observable to echo the last item
Mirror the source observable as long as the source keeps emitting items,
otherwise if timeout
passes without the source emitting anything new
then the observable will start emitting the last item repeatedly.
Mirror the source observable as long as the source keeps emitting items,
otherwise if timeout
passes without the source emitting anything new
then the observable will start emitting the last item repeatedly.
This is the rough equivalent of:
source.switch { e =>
e +: Observable.intervalWithFixedDelay(delay, delay)
}
Note: If the source Observable keeps emitting items more frequently than the length of the time window then the resulting observable will mirror the source exactly.
the window of silence that must pass in order for the observable to start echoing the last item
Creates a new Observable that emits the events of the source and then it also emits the given elements (appended to the stream).
Emits the given exception instead of onComplete
.
Emits the given exception instead of onComplete
.
the exception to emit onComplete
a new Observable that emits an exception onComplete
Returns an Observable that emits a single Throwable, in case an error was thrown by the source Observable, otherwise it isn't going to emit anything.
Returns an Observable which emits a single value, either true, in case the given predicate holds for at least one item, or false otherwise.
Returns an Observable which emits a single value, either true, in case the given predicate holds for at least one item, or false otherwise.
a function that evaluates the items emitted by the source Observable, returning true
if they pass the filter
an Observable that emits only true or false in case the given predicate holds or not for at least one item
Returns an Observable which only emits those items for which the given predicate holds.
Returns an Observable which only emits those items for which the given predicate holds.
a function that evaluates the items emitted by the source Observable,
returning true
if they pass the filter
an Observable that emits only those items in the original Observable
for which the filter evaluates as true
Returns an Observable which only emits the first item for which the predicate holds.
Returns an Observable which only emits the first item for which the predicate holds.
a function that evaluates the items emitted by the source Observable, returning true
if they pass the filter
an Observable that emits only the first item in the original Observable for which the filter evaluates as true
Emits the first element emitted by the source, or otherwise if the source is completed without
emitting anything, then the default
is emitted.
Emits the first element emitted by the source, or otherwise if the source is completed without
emitting anything, then the default
is emitted.
Alias for headOrElse
.
Creates a new Observable by applying a function that you supply to each item emitted by the source Observable, where that function returns an Observable, and then concatenating those resulting Observables and emitting the results of this concatenation.
Creates a new Observable by applying a function that you supply to each item emitted by the source Observable, where that function returns an Observable, and then concatenating those resulting Observables and emitting the results of this concatenation.
a function that, when applied to an item emitted by the source Observable, returns an Observable
an Observable that emits the result of applying the transformation function to each item emitted by the source Observable and concatenating the results of the Observables obtained from this transformation.
Creates a new Observable by applying a function that you supply to each item emitted by the source Observable, where that function returns an Observable, and then concatenating those resulting Observables and emitting the results of this concatenation.
Creates a new Observable by applying a function that you supply to each item emitted by the source Observable, where that function returns an Observable, and then concatenating those resulting Observables and emitting the results of this concatenation.
It's an alias for Observable!.concatMapDelayError.
a function that, when applied to an item emitted by the source Observable, returns an Observable
an Observable that emits the result of applying the transformation function to each item emitted by the source Observable and concatenating the results of the Observables obtained from this transformation.
An alias of Observable!.switchMap.
An alias of Observable!.switchMap.
Returns a new Observable that emits the items emitted by the Observable most recently generated by the mapping function.
Applies a binary operator to a start value and to elements produced by the source observable, going from left to right, producing and concatenating observables along the way.
Applies a binary operator to a start value and to elements produced by the source observable, going from left to right, producing and concatenating observables along the way.
It's the combination between scan and monifu.reactive.Observable.flatten.
Applies a binary operator to a start value and to elements produced by the source observable, going from left to right, producing and concatenating observables along the way.
Applies a binary operator to a start value and to elements produced by the source observable, going from left to right, producing and concatenating observables along the way.
It's the combination between scan and monifu.reactive.Observable.flattenDelayError.
Alias for Observable!.concat.
Alias for Observable!.concat.
Concatenates the sequence of Observables emitted by the source into one Observable, without any transformation.
You can combine the items emitted by multiple Observables so that they act like a single Observable by using this method.
The difference between the concat
operation and
merge is that concat
cares about
ordering of emitted items (e.g. all items emitted by the
first observable in the sequence will come before the
elements emitted by the second observable), whereas
merge
doesn't care about that (elements get emitted as
they come). Because of back-pressure applied to
observables, Observable!.concat is safe to use in all
contexts, whereas merge
requires buffering.
an Observable that emits items that are the result of
flattening the items emitted by the Observables emitted
by this
Alias for Observable!.concatDelayError.
Alias for Observable!.concatDelayError.
Concatenates the sequence of Observables emitted by the source into one Observable, without any transformation.
You can combine the items emitted by multiple Observables so that they act like a single Observable by using this method.
The difference between the concat
operation and
merge is that concat
cares about
ordering of emitted items (e.g. all items emitted by the
first observable in the sequence will come before the
elements emitted by the second observable), whereas
merge
doesn't care about that (elements get emitted as
they come). Because of back-pressure applied to
observables, Observable!.concat is safe to use in all
contexts, whereas merge
requires buffering.
This version is reserving onError notifications until all of the Observables complete and only then passing the issued errors(s) along to the observers. Note that the streamed error is a CompositeException, since multiple errors from multiple streams can happen.
an Observable that emits items that are the result of
flattening the items emitted by the Observables emitted
by this
Alias for Observable!.switch
Alias for Observable!.switch
Convert an Observable that emits Observables into a single Observable that emits the items emitted by the most-recently-emitted of those Observables.
Applies a binary operator to a start value and all elements of this Observable,
going left to right and returns a new Observable that emits only one item
before onComplete
.
Returns an Observable that emits a single boolean, either true, in case the given predicate holds for all the items emitted by the source, or false in case at least one item is not verifying the given predicate.
Returns an Observable that emits a single boolean, either true, in case the given predicate holds for all the items emitted by the source, or false in case at least one item is not verifying the given predicate.
a function that evaluates the items emitted by the source Observable, returning true
if they pass the filter
an Observable that emits only true or false in case the given predicate holds or not for all the items
Subscribes to the source Observable
and foreach element emitted by the source
it executes the given callback.
Groups the items emitted by an Observable according to a specified criterion, and emits these grouped items as GroupedObservables, one GroupedObservable per group.
Groups the items emitted by an Observable according to a specified criterion, and emits these grouped items as GroupedObservables, one GroupedObservable per group.
A GroupedObservable
will cache the items it is to emit until such time as it is
subscribed to. For this reason, in order to avoid memory leaks,
you should not simply ignore those GroupedObservables that do not
concern you. Instead, you can signal to them that they may
discard their buffers by doing something like source.take(0)
.
This variant of groupBy
specifies a keyBufferSize
representing the
size of the buffer that holds our keys. We cannot block when emitting
new GroupedObservable
. So by specifying a buffer size, on overflow
the resulting observable will terminate with an onError.
- the buffer size used for buffering keys
- a function that extracts the key for each item
Groups the items emitted by an Observable according to a specified criterion, and emits these grouped items as GroupedObservables, one GroupedObservable per group.
Groups the items emitted by an Observable according to a specified criterion, and emits these grouped items as GroupedObservables, one GroupedObservable per group.
Note: A GroupedObservable
will cache the items it is to emit until such time as it is
subscribed to. For this reason, in order to avoid memory leaks,
you should not simply ignore those GroupedObservables that do not
concern you. Instead, you can signal to them that they may
discard their buffers by doing something like source.take(0)
.
- a function that extracts the key for each item
Only emits the first element emitted by the source observable, after which it's completed immediately.
Emits the first element emitted by the source, or otherwise if the source is completed without
emitting anything, then the default
is emitted.
Alias for Observable!.complete.
Alias for Observable!.complete.
Ignores all items emitted by the source Observable and only calls onCompleted or onError.
an empty Observable that only calls onCompleted or onError, based on which one is called by the source Observable
Returns an Observable that emits true if the source Observable is empty, otherwise false.
Only emits the last element emitted by the source observable, after which it's completed immediately.
Given a function that transforms an Observable[T]
into an Observable[U]
,
it transforms the source observable into an Observable[U]
.
Returns an Observable that applies the given function to each item emitted by an Observable and emits the result.
Returns an Observable that applies the given function to each item emitted by an Observable and emits the result.
a function to apply to each item emitted by the Observable
an Observable that emits the items from the source Observable, transformed by the given function
Converts the source Observable that emits T
into an Observable
that emits Notification[T]
.
Converts the source Observable that emits T
into an Observable
that emits Notification[T]
.
NOTE: onComplete
is still emitted after an onNext(OnComplete)
notification
however an onError(ex)
notification is emitted as an onNext(OnError(ex))
followed by an onComplete
.
Takes the elements of the source Observable and emits the maximum value, after the source has completed.
Takes the elements of the source Observable and emits the element that has
the maximum key value, where the key is generated by the given function f
.
Merges the sequence of Observables emitted by the source into one Observable, without any transformation.
Merges the sequence of Observables emitted by the source into one Observable, without any transformation.
You can combine the items emitted by multiple Observables so that they act like a single Observable by using this method.
- the overflow strategy used for buffering, which specifies what to do in case we're dealing with a slow consumer - should an unbounded buffer be used, should back-pressure be applied, should the pipeline drop newer or older events, should it drop the whole buffer? See OverflowStrategy for more details
- a function that is used for signaling a special event used to inform the consumers that an overflow event happened, function that receives the number of dropped events as a parameter (see OverflowStrategy.Evicted)
an Observable that emits items that are the
result of flattening the items emitted by the Observables
emitted by this
Merges the sequence of Observables emitted by the source into one Observable, without any transformation.
Merges the sequence of Observables emitted by the source into one Observable, without any transformation.
You can combine the items emitted by multiple Observables so that they act like a single Observable by using this method.
- the overflow strategy used for buffering, which specifies what to do in case we're dealing with a slow consumer - should an unbounded buffer be used, should back-pressure be applied, should the pipeline drop newer or older events, should it drop the whole buffer? See OverflowStrategy for more details
an Observable that emits items that are the
result of flattening the items emitted by the Observables
emitted by this
Merges the sequence of Observables emitted by the source into one Observable, without any transformation.
Merges the sequence of Observables emitted by the source into one Observable, without any transformation.
You can combine the items emitted by multiple Observables so that they act like a single Observable by using this method.
an Observable that emits items that are the
result of flattening the items emitted by the Observables
emitted by this
this operation needs to do buffering and by not specifying an OverflowStrategy, the default strategy is being used.
Merges the sequence of Observables emitted by the source into one Observable, without any transformation.
Merges the sequence of Observables emitted by the source into one Observable, without any transformation.
You can combine the items emitted by multiple Observables so that they act like a single Observable by using this method.
This version is reserving onError notifications until all of the Observables complete and only then passing the issued errors(s) along to the observers. Note that the streamed error is a CompositeException, since multiple errors from multiple streams can happen.
- the overflow strategy used for buffering, which specifies what to do in case we're dealing with a slow consumer - should an unbounded buffer be used, should back-pressure be applied, should the pipeline drop newer or older events, should it drop the whole buffer? See OverflowStrategy for more details
- a function that is used for signaling a special event used to inform the consumers that an overflow event happened, function that receives the number of dropped events as a parameter (see OverflowStrategy.Evicted)
an Observable that emits items that are the
result of flattening the items emitted by the Observables
emitted by this
Merges the sequence of Observables emitted by the source into one Observable, without any transformation.
Merges the sequence of Observables emitted by the source into one Observable, without any transformation.
You can combine the items emitted by multiple Observables so that they act like a single Observable by using this method.
This version is reserving onError notifications until all of the Observables complete and only then passing the issued errors(s) along to the observers. Note that the streamed error is a CompositeException, since multiple errors from multiple streams can happen.
- the overflow strategy used for buffering, which specifies what to do in case we're dealing with a slow consumer - should an unbounded buffer be used, should back-pressure be applied, should the pipeline drop newer or older events, should it drop the whole buffer? See OverflowStrategy for more details
an Observable that emits items that are the
result of flattening the items emitted by the Observables
emitted by this
Merges the sequence of Observables emitted by the source into one Observable, without any transformation.
Merges the sequence of Observables emitted by the source into one Observable, without any transformation.
You can combine the items emitted by multiple Observables so that they act like a single Observable by using this method.
This version is reserving onError notifications until all of the Observables complete and only then passing the issued errors(s) along to the observers. Note that the streamed error is a CompositeException, since multiple errors from multiple streams can happen.
an Observable that emits items that are the
result of flattening the items emitted by the Observables
emitted by this
this operation needs to do buffering and by not specifying an OverflowStrategy, the default strategy is being used.
Creates a new Observable by applying a function that you supply to each item emitted by the source Observable, where that function returns an Observable, and then merging those resulting Observables and emitting the results of this merger.
Creates a new Observable by applying a function that you supply to each item emitted by the source Observable, where that function returns an Observable, and then merging those resulting Observables and emitting the results of this merger.
This function is the equivalent of observable.map(f).merge
.
The difference between concat and
merge
is that concat
cares about ordering of emitted
items (e.g. all items emitted by the first observable in
the sequence will come before the elements emitted by the
second observable), whereas merge
doesn't care about that
(elements get emitted as they come). Because of
back-pressure applied to observables, concat
is safe to use in all contexts, whereas
merge requires buffering.
- the transformation function
an Observable that emits the result of applying the transformation function to each item emitted by the source Observable and merging the results of the Observables obtained from this transformation.
Creates a new Observable by applying a function that you supply to each item emitted by the source Observable, where that function returns an Observable, and then merging those resulting Observables and emitting the results of this merger.
Creates a new Observable by applying a function that you supply to each item emitted by the source Observable, where that function returns an Observable, and then merging those resulting Observables and emitting the results of this merger.
This function is the equivalent of observable.map(f).merge
.
The difference between concat and
merge
is that concat
cares about ordering of emitted
items (e.g. all items emitted by the first observable in
the sequence will come before the elements emitted by the
second observable), whereas merge
doesn't care about that
(elements get emitted as they come). Because of
back-pressure applied to observables, concat
is safe to use in all contexts, whereas
merge requires buffering.
This version is reserving onError notifications until all of the Observables complete and only then passing the issued errors(s) along to the observers. Note that the streamed error is a CompositeException, since multiple errors from multiple streams can happen.
- the transformation function
an Observable that emits the result of applying the transformation function to each item emitted by the source Observable and merging the results of the Observables obtained from this transformation.
Takes the elements of the source Observable and emits the minimum value, after the source has completed.
Takes the elements of the source Observable and emits the element that has
the minimum key value, where the key is generated by the given function f
.
Converts this observable into a multicast observable, useful for turning a cold observable into a hot one (i.e.
Converts this observable into a multicast observable, useful for turning a cold observable into a hot one (i.e. whose source is shared by all observers).
Returns an Observable that emits false if the source Observable is empty, otherwise true.
Returns an Observable that mirrors the behavior of the source,
unless the source is terminated with an onError
, in which
case the streaming of events continues with the specified
backup sequence.
Returns an Observable that mirrors the behavior of the source,
unless the source is terminated with an onError
, in which
case the streaming of events continues with the specified
backup sequence.
The created Observable mirrors the behavior of the source in case the source does not end with an error.
NOTE that compared with onErrorResumeNext
from Rx.NET,
the streaming is not resumed in case the source is
terminated normally with an onComplete
.
- a backup sequence that's being subscribed in case the source terminates with an error.
Returns an Observable that mirrors the behavior of the source,
unless the source is terminated with an onError
, in which
case the streaming of events continues with the specified
backup sequence generated by the given partial function.
Returns an Observable that mirrors the behavior of the source,
unless the source is terminated with an onError
, in which
case the streaming of events continues with the specified
backup sequence generated by the given partial function.
The created Observable mirrors the behavior of the source
in case the source does not end with an error or if the
thrown Throwable
is not matched.
NOTE that compared with onErrorResumeNext
from Rx.NET,
the streaming is not resumed in case the source is
terminated normally with an onComplete
.
- a partial function that matches errors with a backup throwable that is subscribed when the source throws an error.
Returns an Observable that mirrors the behavior of the source,
unless the source is terminated with an onError
, in which case
it tries subscribing to the source again in the hope that
it will complete without an error.
Returns an Observable that mirrors the behavior of the source,
unless the source is terminated with an onError
, in which case
it tries subscribing to the source again in the hope that
it will complete without an error.
The number of retries is limited by the specified maxRetries
parameter, so for an Observable that always ends in error the
total number of subscriptions that will eventually happen is
maxRetries + 1
.
Returns an Observable that mirrors the behavior of the source,
unless the source is terminated with an onError
, in which case
it tries subscribing to the source again in the hope that
it will complete without an error.
Returns an Observable that mirrors the behavior of the source,
unless the source is terminated with an onError
, in which case
it tries subscribing to the source again in the hope that
it will complete without an error.
The given predicate establishes if the subscription should be retried or not.
Returns an Observable that mirrors the behavior of the source,
unless the source is terminated with an onError
, in which case
it tries subscribing to the source again in the hope that
it will complete without an error.
Returns an Observable that mirrors the behavior of the source,
unless the source is terminated with an onError
, in which case
it tries subscribing to the source again in the hope that
it will complete without an error.
NOTE: The number of retries is unlimited, so something like
Observable.error(new RuntimeException).onErrorRetryUnlimited
will loop
forever.
Subscribes to the stream.
Subscribes to the stream.
This function is "unsafe" to call because it does not protect the
calls to the given Observer implementation in regards to
unexpected exceptions that violate the contract, therefore the
given instance must respect its contract and not throw any
exceptions when the observable calls onNext
, onComplete
and
onError
. If it does, then the behavior is undefined.
Converts this observable into a multicast observable, useful for turning a cold observable into a hot one (i.e.
Converts this observable into a multicast observable, useful for turning a cold observable into a hot one (i.e. whose source is shared by all observers). The underlying subject used is a PublishSubject.
Converts this observable into a multicast observable, useful for turning a cold observable into a hot one (i.e.
Converts this observable into a multicast observable, useful for turning a cold observable into a hot one (i.e. whose source is shared by all observers). The underlying subject used is a AsyncSubject.
Applies a binary operator to a start value and all elements of this Observable,
going left to right and returns a new Observable that emits only one item
before onComplete
.
Repeats the items emitted by this Observable continuously.
Repeats the items emitted by this Observable continuously. It caches the generated items until onComplete
and repeats them ad infinitum. On error it terminates.
Converts this observable into a multicast observable, useful for turning a cold observable into a hot one (i.e.
Converts this observable into a multicast observable, useful for turning a cold observable into a hot one (i.e. whose source is shared by all observers). The underlying subject used is a ReplaySubject.
is the size of the buffer limiting the number of items that can be replayed (on overflow the head starts being dropped)
Converts this observable into a multicast observable, useful for turning a cold observable into a hot one (i.e.
Converts this observable into a multicast observable, useful for turning a cold observable into a hot one (i.e. whose source is shared by all observers). The underlying subject used is a ReplaySubject.
Returns an Observable that, when the specified sampler Observable emits an item or completes, emits the most recently emitted item (if any) emitted by the source Observable since the previous emission from the sampler Observable.
Returns an Observable that, when the specified sampler Observable emits an item or completes, emits the most recently emitted item (if any) emitted by the source Observable since the previous emission from the sampler Observable.
Use the sample() method to periodically look at an Observable to see what item it has most recently emitted since the previous sampling. Note that if the source Observable has emitted no items since the last time it was sampled, the Observable that results from the sample( ) operator will emit no item.
- the Observable to use for sampling the source Observable
Emit the most recent items emitted by an Observable within periodic time intervals.
Emit the most recent items emitted by an Observable within periodic time intervals.
Use the sample() method to periodically look at an Observable to see what item it has most recently emitted since the previous sampling. Note that if the source Observable has emitted no items since the last time it was sampled, the Observable that results from the sample( ) operator will emit no item for that sampling period.
the initial delay after which sampling can happen
the timespan at which sampling occurs and note that this is
not accurate as it is subject to back-pressure concerns - as in
if the delay is 1 second and the processing of an event on onNext
in the observer takes one second, then the actual sampling delay
will be 2 seconds.
Emit the most recent items emitted by an Observable within periodic time intervals.
Emit the most recent items emitted by an Observable within periodic time intervals.
Use the sample
operator to periodically look at an Observable
to see what item it has most recently emitted since the previous
sampling. Note that if the source Observable has emitted no
items since the last time it was sampled, the Observable that
results from the sample( ) operator will emit no item for
that sampling period.
the timespan at which sampling occurs and note that this is
not accurate as it is subject to back-pressure concerns - as in
if the delay is 1 second and the processing of an event on onNext
in the observer takes one second, then the actual sampling delay
will be 2 seconds.
Returns an Observable that, when the specified sampler Observable emits an item or completes, emits the most recently emitted item (if any) emitted by the source Observable since the previous emission from the sampler Observable.
Returns an Observable that, when the specified sampler Observable emits an item or completes, emits the most recently emitted item (if any) emitted by the source Observable since the previous emission from the sampler Observable. If no new value has been emitted since the last time it was sampled, the emit the last emitted value anyway.
- the Observable to use for sampling the source Observable
Emit the most recent items emitted by an Observable within periodic time intervals.
Emit the most recent items emitted by an Observable within periodic time intervals. If no new value has been emitted since the last time it was sampled, the emit the last emitted value anyway.
Also see sample.
the initial delay after which sampling can happen
the timespan at which sampling occurs and note that this is
not accurate as it is subject to back-pressure concerns - as in
if the delay is 1 second and the processing of an event on onNext
in the observer takes one second, then the actual sampling delay
will be 2 seconds.
Emit the most recent items emitted by an Observable within periodic time intervals.
Emit the most recent items emitted by an Observable within periodic time intervals. If no new value has been emitted since the last time it was sampled, the emit the last emitted value anyway.
Also see Observable.sample.
the timespan at which sampling occurs and note that this is
not accurate as it is subject to back-pressure concerns - as in
if the delay is 1 second and the processing of an event on onNext
in the observer takes one second, then the actual sampling delay
will be 2 seconds.
Applies a binary operator to a start value and all elements of this Observable, going left to right and returns a new Observable that emits on each step the result of the applied function.
Applies a binary operator to a start value and all elements of this Observable, going left to right and returns a new Observable that emits on each step the result of the applied function.
Similar to foldLeft, but emits the state on each step. Useful for modeling finite state machines.
Returns a new Observable that multi-casts (shares) the original Observable.
Creates a new Observable that emits the given elements and then it also emits the events of the source (prepend operation).
Subscribes to the stream.
Subscribes to the stream.
a subscription that can be used to cancel the streaming.
Subscribes to the stream.
Subscribes to the stream.
a subscription that can be used to cancel the streaming.
Subscribes to the stream.
Subscribes to the stream.
a subscription that can be used to cancel the streaming.
Subscribes to the stream.
Subscribes to the stream.
a subscription that can be used to cancel the streaming.
Subscribes to the stream.
Subscribes to the stream.
a subscription that can be used to cancel the streaming.
Subscribes to the stream.
Subscribes to the stream.
a subscription that can be used to cancel the streaming.
Returns a new Observable that uses the specified
Scheduler
for initiating the subscription.
Given a source that emits numeric values, the sum
operator
sums up all values and at onComplete it emits the total.
Convert an Observable that emits Observables into a single Observable that emits the items emitted by the most-recently-emitted of those Observables.
Returns a new Observable that emits the items emitted by the Observable most recently generated by the mapping function.
Drops the first element of the source observable, emitting the rest.
Creates a new Observable that emits the events of the source, only
for the specified timestamp
, after which it completes.
Creates a new Observable that emits the events of the source, only
for the specified timestamp
, after which it completes.
the window of time during which the new Observable is allowed to emit the events of the source
Selects the first n elements (from the start).
Selects the first n elements (from the start).
the number of elements to take
a new Observable that emits only the first n elements from the source
Creates a new Observable that only emits the last n
elements
emitted by the source.
Takes longest prefix of elements that satisfy the given predicate and returns a new Observable that emits those elements.
Takes longest prefix of elements that satisfy the given predicate and returns a new Observable that emits those elements.
Returns an Observable that emits only the first item emitted by the source Observable during sequential time windows of a specified duration.
Returns an Observable that emits only the first item emitted by the source Observable during sequential time windows of a specified duration.
This differs from Observable!.throttleLast in that this only tracks
passage of time whereas throttleLast
ticks at scheduled intervals.
time to wait before emitting another item after emitting the last item
Returns an Observable that emits only the last item emitted by the source Observable during sequential time windows of a specified duration.
Returns an Observable that emits only the last item emitted by the source Observable during sequential time windows of a specified duration.
This differs from Observable!.throttleFirst in that this ticks along
at a scheduled interval whereas throttleFirst
does not tick, it just
tracks passage of time.
- duration of windows within which the last item emitted by the source Observable will be emitted
Alias for debounce.
Alias for debounce.
Returns an Observable that only emits those items emitted by the source Observable that are not followed by another emitted item within a specified time window.
Note: If the source Observable keeps emitting items more frequently than the length of the time window then no items will be emitted by the resulting Observable.
- the length of the window of time that must pass after the emission of an item from the source Observable in which that Observable emits no items in order for the item to be emitted by the resulting Observable
Returns an Observable that mirrors the source Observable but applies a timeout overflowStrategy for each emitted item.
Returns an Observable that mirrors the source Observable but applies a timeout overflowStrategy for each emitted item. If the next item isn't emitted within the specified timeout duration starting from its predecessor, the resulting Observable begins instead to mirror a backup Observable.
maximum duration between emitted items before a timeout occurs
is the backup observable to subscribe to in case of a timeout
Returns an Observable that mirrors the source Observable but applies a timeout overflowStrategy for each emitted item.
Returns an Observable that mirrors the source Observable but applies a timeout overflowStrategy for each emitted item. If the next item isn't emitted within the specified timeout duration starting from its predecessor, the resulting Observable terminates and notifies observers of a TimeoutException.
maximum duration between emitted items before a timeout occurs
Wraps this Observable into a org.reactivestreams.Publisher
.
Wraps this Observable into a org.reactivestreams.Publisher
.
See the Reactive Streams
protocol that Monifu implements.
While the destination observer is busy, buffers events, applying the given overflowStrategy.
While the destination observer is busy, buffers events, applying the given overflowStrategy.
- the overflow strategy used for buffering, which specifies what to do in case we're dealing with a slow consumer - should an unbounded buffer be used, should back-pressure be applied, should the pipeline drop newer or older events, should it drop the whole buffer? See OverflowStrategy for more details
- a function that is used for signaling a special event used to inform the consumers that an overflow event happened, function that receives the number of dropped events as a parameter (see OverflowStrategy.Evicted)
While the destination observer is busy, buffers events, applying the given overflowStrategy.
While the destination observer is busy, buffers events, applying the given overflowStrategy.
- the overflow strategy used for buffering, which specifies what to do in case we're dealing with a slow consumer - should an unbounded buffer be used, should back-pressure be applied, should the pipeline drop newer or older events, should it drop the whole buffer? See OverflowStrategy for more details
While the destination observer is busy, drop the incoming events.
While the destination observer is busy, drop the incoming events. When the downstream recovers, we can signal a special event meant to inform the downstream observer how many events where dropped.
- a function that is used for signaling a special event used to inform the consumers that an overflow event happened, function that receives the number of dropped events as a parameter (see OverflowStrategy.Evicted)
While the destination observer is busy, drop the incoming events.
Periodically subdivide items from an Observable into Observable windows and emit these windows rather than emitting the items one at a time.
Periodically subdivide items from an Observable into Observable windows and emit these windows rather than emitting the items one at a time.
The resulting Observable emits connected, non-overlapping windows, each of a fixed duration specified by the timespan argument. When the source Observable completes or encounters an error, the resulting Observable emits the current window and propagates the notification from the source Observable.
the interval of time at which it should complete the current window and emit a new one
the maximum size of each window
Periodically subdivide items from an Observable into Observable windows and emit these windows rather than emitting the items one at a time.
Periodically subdivide items from an Observable into Observable windows and emit these windows rather than emitting the items one at a time.
The resulting Observable emits connected, non-overlapping windows, each of a fixed duration specified by the timespan argument. When the source Observable completes or encounters an error, the resulting Observable emits the current window and propagates the notification from the source Observable.
the interval of time at which it should complete the current window and emit a new one
Returns an Observable that emits windows of items it collects from the source Observable.
Returns an Observable that emits windows of items it collects from the source Observable. The resulting Observable emits windows every skip items, each containing no more than count items. When the source Observable completes or encounters an error, the resulting Observable emits the current window and propagates the notification from the source Observable.
There are 3 possibilities:
1. in case skip == count
, then there are no items dropped and no overlap,
the call being equivalent to window(count)
2. in case skip < count
, then overlap between windows happens, with the
number of elements being repeated being count - skip
3. in case skip > count
, then skip - count
elements start getting
dropped between windows
- the maximum size of each window before it should be emitted
- how many items need to be skipped before starting a new window
Periodically subdivide items from an Observable into Observable windows and emit these windows rather than emitting the items one at a time.
Periodically subdivide items from an Observable into Observable windows and emit these windows rather than emitting the items one at a time.
This variant of window opens its first window immediately. It closes the currently open window and immediately opens a new one whenever the current window has emitted count items. It will also close the currently open window if it receives an onCompleted or onError notification from the source Observable. This variant of window emits a series of non-overlapping windows whose collective emissions correspond one-to-one with those of the source Observable.
the bundle size
Creates a new Observable from this Observable and another given Observable, by emitting elements combined in pairs.
Creates a new Observable from this Observable and another given Observable, by emitting elements combined in pairs. If one of the Observable emits fewer events than the other, then the rest of the unpaired events are ignored.
Zips the emitted elements of the source with their indices.
The Observable interface in the Rx pattern.
Interface
An Observable is characterized by a
onSubscribe
method that needs to be implemented. In simple terms, an Observable might as well be just a function like:In other words an Observable is something that provides a side-effecting function that can connect a Subscriber to a stream of data. A
Subscriber
is a cross between an Observer and a Scheduler. We need aScheduler
when callingsubscribe
because that's when the side-effects happen and a context capable of scheduling tasks for asynchronous execution is needed. An Observer on the other hand is the interface implemented by consumer and that receives events according to the Rx grammar.On
onSubscribe
, because we need the interesting operators and the polymorphic behavior provided by OOP, the Observable is being described as an interface that has to be implemented:Of course, you don't need to inherit from this trait, as you can just use Observable.create, the following example being equivalent to the above:
The above is describing how to create your own Observables, however Monifu already provides already made utilities in the Observable companion object. For example, to periodically make a request to a web service, you could do it like this:
As you might notice, in the above example we are doing Observable!.flatMap on an Observable that emits
Future
instances. And it works, because Monifu considers Scala's Futures to be just a subset of Observables, see the automatic FutureIsObservable conversion that it defines. Or you could just use Observable.fromFuture for explicit conversions, an Observable builder available amongst others.Contract
Observables must obey Monifu's contract, this is why if you get away with already built and tested observables, that would be better than implementing your own by means of inheriting the interface or by using create. The contract is this:
onSubscribe
method MUST NOT throw exceptions, any unforeseen errors that happen in user-code must be emitted to the observers and the streaming closedonNext* (onComplete | onError)
onNext
eventsonComplete
oronError
onNext
event must happen only after the previousonNext
completed with a ContinueonNext
event is signaling a CancelonComplete
andonError
must happen only after the previousonNext
was completed with a Continue acknowledgementonNext
event can be sent directly, since there are no previous eventsonNext
events, then streams can be closed withonComplete
andonError
directlyonError
happens such that its delivery is prioritizedOn Dealing with the contract
Back-pressure means in essence that the speed with which the data-source produces events is adjusted to the speed with which the consumer consumes.
For example, lets say we want to feed an iterator into an observer, similar to what we are doing in Observer.feed, we might build a loop like this:
There are cases in which the data-source can't be slowed down in response to the demand signaled through back-pressure. For such cases buffering is needed.
For example to "imperatively" build an Observable, we could use channels:
In Monifu a Channel is much like a Subject, meaning that it can be used to construct observables, except that a
Channel
has a buffer attached and IS NOT anObserver
(like theSubject
is). In Monifu (compared to Rx implementations) Subjects are subject to back-pressure concerns as well, so they can't be used in an imperative way, like described above, hence the need for Channels.Or for more serious and lower level jobs, you can simply take an
Observer
and wrap it into a BufferedSubscriber.Channel, which are meant for imperatively building Observables without back-pressure concerns
Subject, which are both Observables and Observers
Cancelable, the type returned by higher level
subscribe
variants and that can be used to cancel subscriptionsSubscriber, the cross between an Observer and a Scheduler
Scheduler, our enhanced
ExecutionContext
Observer, the interface that must be implemented by consumers