trait Observable[+A] extends ObservableLike[A, Observable]
The Observable type that implements the Reactive Pattern.
Provides methods of subscribing to the Observable and operators for combining observable sources, filtering, modifying, throttling, buffering, error handling and others.
See the available documentation at: https://monix.io
- Self Type
- Observable[A]
- Source
- Observable.scala
Abstract Value Members
-
abstract
def
unsafeSubscribeFn(subscriber: Subscriber[A]): Cancelable
Characteristic function for an Observable instance, that creates the subscription and eventually starts the streaming of events to the given Observer. This is the method that implementations are meant to provide.
This function is "unsafe" to call because it does not protect the calls to the given Observer implementation against unexpected exceptions that violate the contract. The given instance must therefore respect its contract and not throw any exceptions when the observable calls onNext, onComplete and onError. If it does, then the behavior is undefined.
Concrete Value Members
-
final
def
!=(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
-
final
def
##(): Int
- Definition Classes
- AnyRef → Any
-
def
++[B >: A](other: Observable[B]): Observable[B]
Concatenates the source with another observable.
Ordering of subscription is preserved, so the second observable starts only after the source observable is completed successfully with an onComplete. On the other hand, the second observable is never subscribed if the source completes with an error.
- Definition Classes
- ObservableLike
-
def
+:[B >: A](elem: B): Observable[B]
Creates a new Observable that emits the given element and then the events of the source (prepend operation).
- Definition Classes
- ObservableLike
-
def
:+[B >: A](elem: B): Observable[B]
Creates a new Observable that emits the events of the source and then the given element (appended to the stream).
- Definition Classes
- ObservableLike
-
final
def
==(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
-
def
ambWith[B >: A](other: Observable[B]): Observable[B]
Given the source observable and another Observable, emits all of the items from whichever of the two is first to emit an item, and cancels the other.
- Definition Classes
- ObservableLike
-
final
def
asInstanceOf[T0]: T0
- Definition Classes
- Any
-
def
asyncBoundary[B >: A](overflowStrategy: OverflowStrategy[B]): Observable[B]
Forces a buffered asynchronous boundary.
Internally it wraps the observer implementation given to onSubscribe into a BufferedSubscriber.
Normally Monix's implementation guarantees that events are not emitted concurrently, and that the publisher MUST NOT emit the next event without acknowledgement from the consumer that it may proceed. For badly behaved publishers, however, this wrapper provides the guarantee that the downstream Observer given in subscribe will not receive concurrent events.
WARNING: if the buffer created by this operator is unbounded, it can blow up the process if the data source is pushing events faster than the observer can consume them, as it introduces an asynchronous boundary that eliminates the back-pressure requirements of the data source. Unbounded is the default overflowStrategy, see OverflowStrategy for options.
- overflowStrategy
the overflow strategy used for buffering, which specifies what to do in case we're dealing with a slow consumer: should an unbounded buffer be used, should back-pressure be applied, should the pipeline drop newer or older events, should it drop the whole buffer? See OverflowStrategy for more details.
- Definition Classes
- ObservableLike
-
def
behavior[B >: A](initialValue: B)(implicit s: Scheduler): ConnectableObservable[B]
Converts this observable into a multicast observable, useful for turning a cold observable into a hot one (i.e. whose source is shared by all observers). The underlying subject used is a BehaviorSubject.
-
def
bufferIntrospective(maxSize: Int): Observable[List[A]]
Buffers signals while busy, after which it emits the buffered events as a single bundle.
This operator starts applying back-pressure when the underlying buffer's size is exceeded.
- Definition Classes
- ObservableLike
-
def
bufferSliding(count: Int, skip: Int): Observable[Seq[A]]
Returns an observable that emits buffers of items it collects from the source observable. The resulting observable emits buffers every skip items, each containing count items.
If the source observable completes, then the current buffer gets signaled downstream. If the source triggers an error, then the current buffer is dropped and the error gets propagated immediately.
For count and skip there are 3 possibilities:
- in case skip == count, then there are no items dropped and no overlap, the call being equivalent to buffer(count)
- in case skip < count, then overlap between buffers happens, with the number of repeated elements being count - skip
- in case skip > count, then skip - count elements start getting dropped between windows
- count
the maximum size of each buffer before it should be emitted
- skip
how many items emitted by the source observable should be skipped before starting a new buffer. Note that when skip and count are equal, this is the same operation as buffer(count)
- Definition Classes
- ObservableLike
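The three skip/count cases can be previewed on a plain Scala collection, whose standard sliding(size, step) method shows the same overlap and drop pattern. This is only a collection-level analogy and not the Monix operator: it ignores time, back-pressure and error handling, and the object and method names are hypothetical.

```scala
object BufferSlidingModel {
  // Hypothetical stand-in for bufferSliding(count, skip), built on the
  // standard library's sliding(size, step). Not part of the Monix API.
  def windows[A](items: List[A], count: Int, skip: Int): List[List[A]] =
    items.sliding(count, skip).toList

  def main(args: Array[String]): Unit = {
    val source = List(1, 2, 3, 4, 5, 6)

    // skip == count: no overlap, nothing dropped
    println(windows(source, 2, 2)) // List(List(1, 2), List(3, 4), List(5, 6))

    // skip < count: count - skip = 2 elements are repeated in the next window
    println(windows(source, 3, 1))

    // skip > count: skip - count = 1 element is dropped between windows
    println(windows(source, 2, 3)) // List(List(1, 2), List(4, 5))
  }
}
```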
-
def
bufferTimed(timespan: FiniteDuration): Observable[Seq[A]]
Periodically gather items emitted by an observable into bundles and emit these bundles rather than emitting the items one at a time.
This version of buffer emits a new bundle of items periodically, every timespan amount of time, containing all items emitted by the source Observable since the previous bundle emission.
If the source observable completes, then the current buffer gets signaled downstream. If the source triggers an error, then the current buffer is dropped and the error gets propagated immediately.
- timespan
the interval of time at which it should emit the buffered bundle
- Definition Classes
- ObservableLike
-
def
bufferTimedAndCounted(timespan: FiniteDuration, maxCount: Int): Observable[Seq[A]]
Periodically gather items emitted by an observable into bundles and emit these bundles rather than emitting the items one at a time.
The resulting observable emits connected, non-overlapping buffers, each of a fixed duration specified by the timespan argument or a maximum size specified by the maxCount argument (whichever is reached first).
If the source observable completes, then the current buffer gets signaled downstream. If the source triggers an error, then the current buffer is dropped and the error gets propagated immediately.
- timespan
the interval of time at which it should emit the buffered bundle
- maxCount
is the maximum bundle size, after which the buffered bundle gets forcefully emitted
- Definition Classes
- ObservableLike
-
def
bufferTimedWithPressure(period: FiniteDuration, maxSize: Int): Observable[Seq[A]]
Periodically gather items emitted by an observable into bundles and emit these bundles rather than emitting the items one at a time. Back-pressure the source when the buffer is full.
The resulting observable emits connected, non-overlapping buffers, each of a fixed duration specified by the period argument.
The bundles are emitted at a fixed rate. If the source is silent, then the resulting observable will start emitting empty sequences.
If the source observable completes, then the current buffer gets signaled downstream. If the source triggers an error, then the current buffer is dropped and the error gets propagated immediately.
A maxSize argument is specified as the capacity of the bundle. In case the source is too fast and maxSize is reached, then the source will be back-pressured.
The difference with bufferTimedAndCounted is that bufferTimedWithPressure applies back-pressure from the time when the buffer is full until the buffer is emitted, whereas bufferTimedAndCounted will forcefully emit the buffer when it's full.
- period
the interval of time at which it should emit the buffered bundle
- maxSize
is the maximum buffer size, after which the source starts being back-pressured
- Definition Classes
- ObservableLike
-
def
bufferTumbling(count: Int): Observable[Seq[A]]
Periodically gather items emitted by an observable into bundles and emit these bundles rather than emitting the items one at a time. This version of buffer emits a bundle once the internal buffer has reached the given count.
If the source observable completes, then the current buffer gets signaled downstream. If the source triggers an error, then the current buffer is dropped and the error gets propagated immediately.
- count
the maximum size of each buffer before it should be emitted
- Definition Classes
- ObservableLike
-
def
bufferWithSelector[S](selector: Observable[S], maxSize: Int): Observable[Seq[A]]
Periodically gather items emitted by an observable into bundles and emit these bundles rather than emitting the items one at a time, whenever the selector observable signals an event.
The resulting observable collects the elements of the source in a buffer and emits that buffer whenever the given selector observable emits an onNext event, at which point the buffer is emitted as a sequence downstream and then reset. Thus the resulting observable emits connected, non-overlapping bundles triggered by the given selector.
If selector terminates with an onComplete, then the resulting observable also terminates normally. If selector terminates with an onError, then the resulting observable also terminates with an error.
If the source observable completes, then the current buffer gets signaled downstream. If the source triggers an error, then the current buffer is dropped and the error gets propagated immediately.
A maxSize argument is specified as the capacity of the bundle. In case the source is too fast and maxSize is reached, then the source will be back-pressured.
- selector
is the observable that triggers the signaling of the current buffer
- maxSize
is the maximum bundle size, after which the source starts being back-pressured
- Definition Classes
- ObservableLike
-
def
bufferWithSelector[S](selector: Observable[S]): Observable[Seq[A]]
Periodically gather items emitted by an observable into bundles and emit these bundles rather than emitting the items one at a time, whenever the selector observable signals an event.
The resulting observable collects the elements of the source in a buffer and emits that buffer whenever the given selector observable emits an onNext event, at which point the buffer is emitted as a sequence downstream and then reset. Thus the resulting observable emits connected, non-overlapping bundles triggered by the given selector.
If selector terminates with an onComplete, then the resulting observable also terminates normally. If selector terminates with an onError, then the resulting observable also terminates with an error.
If the source observable completes, then the current buffer gets signaled downstream. If the source triggers an error, then the current buffer is dropped and the error gets propagated immediately.
- selector
is the observable that triggers the signaling of the current buffer
- Definition Classes
- ObservableLike
-
def
cache(maxCapacity: Int): Observable[A]
Caches the emissions from the source Observable and replays them in order to any subsequent Subscribers. This operator has similar behavior to replay except that this auto-subscribes to the source Observable rather than returning a ConnectableObservable for which you must call connect to activate the subscription.
When you call cache, it does not yet subscribe to the source Observable and so does not yet begin caching items. This only happens when the first Subscriber calls the resulting Observable's subscribe method.
- maxCapacity
is the maximum buffer size after which old events start being dropped (according to what happens when using ReplaySubject.createLimited)
- returns
an Observable that, when first subscribed to, caches all of its items and notifications for the benefit of subsequent subscribers
-
def
cache: Observable[A]
Caches the emissions from the source Observable and replays them in order to any subsequent Subscribers. This operator has similar behavior to replay except that this auto-subscribes to the source Observable rather than returning a ConnectableObservable for which you must call connect to activate the subscription.
When you call cache, it does not yet subscribe to the source Observable and so does not yet begin caching items. This only happens when the first Subscriber calls the resulting Observable's subscribe method.
Note: You sacrifice the ability to cancel the origin when you use the cache operator, so be careful not to use this on Observables that emit an infinite or very large number of items that will use up memory.
- returns
an Observable that, when first subscribed to, caches all of its items and notifications for the benefit of subsequent subscribers
-
def
clone(): AnyRef
- Attributes
- protected[java.lang]
- Definition Classes
- AnyRef
- Annotations
- @throws( ... )
-
def
collect[B](pf: PartialFunction[A, B]): Observable[B]
Applies the given partial function to each element of the source for which it is defined; elements for which it is not defined are skipped.
- pf
the function that filters and maps the source
- returns
an observable that emits the transformed items by the given partial function
- Definition Classes
- ObservableLike
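Because collect takes an ordinary PartialFunction[A, B], the filter-and-map behaviour can be tried out on a plain List first. The sketch below uses the standard collections' collect as a stand-in, assuming the observable operator applies pf element by element in the same way; halveEvens and CollectModel are hypothetical names.

```scala
object CollectModel {
  // Defined only for even numbers: odd inputs are filtered out, even inputs are mapped.
  val halveEvens: PartialFunction[Int, String] = {
    case n if n % 2 == 0 => s"half=${n / 2}"
  }

  // Stand-in for source.collect(halveEvens), using the standard library on a List.
  def run(items: List[Int]): List[String] = items.collect(halveEvens)

  def main(args: Array[String]): Unit =
    println(run(List(1, 2, 3, 4))) // List(half=1, half=2)
}
```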
-
def
combineLatest[B](other: Observable[B]): Observable[(A, B)]
Creates a new observable from the source and another given observable, by emitting elements combined in pairs. A pair is emitted whenever either observable emits, combining that element with the latest element seen from the other observable, so nothing is emitted until both have produced at least one element.
See zip for an alternative that pairs the items in strict sequence.
- other
is an observable that gets paired with the source
- Definition Classes
- ObservableLike
-
def
combineLatestMap[B, R](other: Observable[B])(f: (A, B) ⇒ R): Observable[R]
Creates a new observable from the source and another given observable, by emitting elements combined in pairs and mapped with the given function. A pair is formed whenever either observable emits, combining that element with the latest element seen from the other observable, so nothing is emitted until both have produced at least one element.
See zipMap for an alternative that pairs the items in strict sequence.
- other
is an observable that gets paired with the source
- f
is a mapping function over the generated pairs
- Definition Classes
- ObservableLike
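To make the contrast between combineLatest and zip concrete, the following sketch models both on a single timeline of events, where Left marks an element from the source and Right an element from the other observable. It assumes the usual combine-latest semantics (emit on every event once both sides have a latest value) and is a pure-collections model with hypothetical names, not Monix's implementation; it ignores concurrency and back-pressure.

```scala
object CombineLatestModel {
  // Emits a pair on every event, once both sides have produced a value.
  def combineLatest[A, B](timeline: List[Either[A, B]]): List[(A, B)] = {
    val start = (Option.empty[A], Option.empty[B], Vector.empty[(A, B)])
    val (_, _, out) = timeline.foldLeft(start) {
      case ((_, lastB, acc), Left(a)) =>
        (Some(a), lastB, acc ++ lastB.map(b => (a, b)).toList)
      case ((lastA, _, acc), Right(b)) =>
        (lastA, Some(b), acc ++ lastA.map(a => (a, b)).toList)
    }
    out.toList
  }

  // Pairs the n-th element of one side with the n-th element of the other.
  def zipInOrder[A, B](timeline: List[Either[A, B]]): List[(A, B)] = {
    val lefts  = timeline.collect { case Left(a) => a }
    val rights = timeline.collect { case Right(b) => b }
    lefts.zip(rights)
  }

  val sample: List[Either[Int, String]] =
    List(Left(1), Left(2), Right("a"), Left(3), Right("b"))

  def main(args: Array[String]): Unit = {
    println(combineLatest(sample)) // List((2,a), (3,a), (3,b))
    println(zipInOrder(sample))    // List((1,a), (2,b))
  }
}
```

In this model the element 1 never appears in the combineLatest output because it was superseded by 2 before the other side emitted anything, whereas zip still pairs it with "a".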
-
def
completed: Observable[Nothing]
Ignores all items emitted by the source Observable and only calls onComplete or onError.
- returns
an empty Observable that only calls onComplete or onError, based on which one is called by the source Observable
- Definition Classes
- ObservableLike
-
def
completedL: Task[Unit]
Creates a new Task that will consume the source observable and upon completion of the source it will complete with Unit.
-
def
concat[B](implicit ev: <:<[A, Observable[B]]): Observable[B]
Concatenates the sequence of observables emitted by the source into one observable, without any transformation.
You can combine the items emitted by multiple observables so that they act like a single sequence by using this operator.
The difference between the concat operation and merge is that concat cares about the ordering of sequences (e.g. all items emitted by the first observable in the sequence will come before the elements emitted by the second observable), whereas merge doesn't care about that (elements get emitted as they come). Because of back-pressure applied to observables, concat is safe to use in all contexts, whereas merge requires buffering.
- returns
an observable that emits items that are the result of flattening the items emitted by the observables emitted by the source
- Definition Classes
- ObservableLike
-
def
concatDelayErrors[B](implicit ev: <:<[A, Observable[B]]): Observable[B]
Concatenates the sequence of observables emitted by the source into one observable, without any transformation.
You can combine the items emitted by multiple observables so that they act like a single sequence by using this operator.
The difference between the concat operation and merge is that concat cares about the ordering of sequences (e.g. all items emitted by the first observable in the sequence will come before the elements emitted by the second observable), whereas merge doesn't care about that (elements get emitted as they come). Because of back-pressure applied to observables, concat is safe to use in all contexts, whereas merge requires buffering.
This version reserves onError notifications until all of the observables complete and only then passes the issued error(s) downstream. Note that the streamed error is a CompositeException, since multiple errors from multiple streams can happen.
- returns
an observable that emits items that are the result of flattening the items emitted by the observables emitted by the source
- Definition Classes
- ObservableLike
-
def
concatMap[B](f: (A) ⇒ Observable[B]): Observable[B]
Applies a function that you supply to each item emitted by the source observable, where that function returns observables, then concatenates the resulting sequences and emits the results of this concatenation.
The difference between the concat operation and merge is that concat cares about the ordering of sequences (e.g. all items emitted by the first observable in the sequence will come before the elements emitted by the second observable), whereas merge doesn't care about that (elements get emitted as they come). Because of back-pressure applied to observables, concat is safe to use in all contexts, whereas merge requires buffering.
- Definition Classes
- ObservableLike
-
def
concatMapDelayErrors[B](f: (A) ⇒ Observable[B]): Observable[B]
Applies a function that you supply to each item emitted by the source observable, where that function returns sequences, then concatenates the resulting sequences and emits the results of this concatenation.
This version reserves onError notifications until all of the observables complete and only then passes the issued error(s) downstream. Note that the streamed error is a CompositeException, since multiple errors from multiple streams can happen.
- f
a function that, when applied to an item emitted by the source, returns an observable
- returns
an observable that emits items that are the result of flattening the items emitted by the observables emitted by the source
- Definition Classes
- ObservableLike
-
def
consumeWith[R](f: Consumer[A, R]): Task[R]
On execution, consumes the source observable with the given Consumer, effectively transforming the source observable into a Task.
-
def
countF: Observable[Long]
Creates a new Observable that emits the total number of onNext events that were emitted by the source.
Note that this Observable emits only one item after the source is complete. And in case the source emits an error, then only that error will be emitted.
- Definition Classes
- ObservableLike
-
def
countL: Task[Long]
Creates a task that emits the total number of onNext events that were emitted by the source.
-
def
debounce(timeout: FiniteDuration): Observable[A]
Only emit an item from an observable if a particular timespan has passed without it emitting another item.
Note: If the source observable keeps emitting items more frequently than the length of the time window, then no items will be emitted by the resulting observable.
- timeout
the length of the window of time that must pass after the emission of an item from the source observable in which that observable emits no items in order for the item to be emitted by the resulting observable
- Definition Classes
- ObservableLike
- See also
echoOnce for a similar operator that also mirrors the source observable
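The debounce rule can be sketched as a pure function over timestamped events: an item survives only if the source stays silent for at least the timeout after it. The model below rests on two assumptions that the description above does not settle: a gap exactly equal to the timeout counts as silence, and the stream stays open long enough after the last item for it to be emitted. DebounceModel and its method are hypothetical names, and the code is not Monix's implementation.

```scala
object DebounceModel {
  // events: (timestampMillis, value) pairs, sorted by timestamp.
  def debounce[A](events: List[(Long, A)], timeoutMillis: Long): List[A] = {
    // For each event, the timestamp of the event that follows it (None for the last one).
    val nextTimes: List[Option[Long]] =
      events.drop(1).map(e => Option(e._1)) :+ None

    events.zip(nextTimes).collect {
      // Keep the value if nothing follows, or if the following event is late enough.
      case ((t, value), next) if next.forall(n => n - t >= timeoutMillis) => value
    }
  }

  def main(args: Array[String]): Unit = {
    val events = List((0L, "a"), (100L, "b"), (150L, "c"), (1000L, "d"))
    println(debounce(events, 200L)) // List(c, d)
  }
}
```

With a timeout shorter than every gap all items pass through, and with a source that never pauses for the full timeout only the final item remains, which matches the note above about overly frequent sources.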
-
def
debounceRepeated(period: FiniteDuration): Observable[A]
Emits the last item from the source Observable if a particular timespan has passed without it emitting another item, and keeps emitting that item at regular intervals until the source breaks the silence.
So, compared to the regular debounceTo, this version keeps emitting the last item of the source.
Note: If the source Observable keeps emitting items more frequently than the length of the time window then no items will be emitted by the resulting Observable.
- period
the length of the window of time that must pass after the emission of an item from the source Observable in which that Observable emits no items in order for the item to be emitted by the resulting Observable at regular intervals, also determined by period
- Definition Classes
- ObservableLike
- See also
echoRepeated for a similar operator that also mirrors the source observable
-
def
debounceTo[B](timeout: FiniteDuration, f: (A) ⇒ Observable[B]): Observable[B]
Doesn't emit anything until a timeout period passes without the source emitting anything. When that timeout happens, we subscribe to the observable generated by the given function, an observable that will keep emitting until the source breaks the silence by emitting another event.
Note: If the source observable keeps emitting items more frequently than the length of the time window, then no items will be emitted by the resulting Observable.
- timeout
the length of the window of time that must pass after the emission of an item from the source Observable in which that Observable emits no items in order for the item to be emitted by the resulting Observable
- f
is a function that receives the last element generated by the source, generating an observable to be subscribed when the source is timing out
- Definition Classes
- ObservableLike
-
def
defaultIfEmpty[B >: A](default: ⇒ B): Observable[B]
Emit items from the source, or emit a default item if the source completes after emitting no items.
- Definition Classes
- ObservableLike
-
def
delayOnComplete(delay: FiniteDuration): Observable[A]
Delays emitting the final onComplete event by the specified amount.
- Definition Classes
- ObservableLike
-
def
delayOnNext(duration: FiniteDuration): Observable[A]
Returns an Observable that emits the items emitted by the source Observable shifted forward in time by a specified delay.
Each time the source Observable emits an item, delayOnNext starts a timer, and when that timer reaches the given duration, the Observable returned from delayOnNext emits the same item.
NOTE: this delay refers strictly to the time between the onNext event coming from our source and the time it takes the downstream observer to get this event. On the other hand, the operator also applies back-pressure, so on slow observers the actual time passing between two successive events may be higher than the specified duration.
- duration
the delay to shift the source by
- returns
the source Observable shifted in time by the specified delay
- Definition Classes
- ObservableLike
-
def
delayOnNextBySelector[B](selector: (A) ⇒ Observable[B]): Observable[A]
Returns an Observable that emits the items emitted by the source Observable shifted forward in time.
This variant of delay sets its delay duration on a per-item basis by passing each item from the source Observable into a function that returns an Observable and then monitoring those Observables. When any such Observable emits an item or completes, the Observable returned by delay emits the associated item.
- selector
is a function that returns an Observable for each item emitted by the source Observable, which is then used to delay the emission of that item by the resulting Observable until the Observable returned from selector emits an item
- returns
the source Observable shifted in time by the specified delay
- Definition Classes
- ObservableLike
-
def
delaySubscription(timespan: FiniteDuration): Observable[A]
Hold an Observer's subscription request for a specified amount of time before passing it on to the source Observable.
- timespan
is the time to wait before the subscription is being initiated.
- Definition Classes
- ObservableLike
-
def
delaySubscriptionWith(trigger: Observable[Any]): Observable[A]
Hold an Observer's subscription request until the given trigger observable either emits an item or completes, before passing it on to the source Observable.
If the given trigger completes in error, then the subscription is terminated with onError.
- trigger
the observable that must either emit an item or complete in order for the source to be subscribed.
- Definition Classes
- ObservableLike
-
def
dematerialize[B](implicit ev: <:<[A, Notification[B]]): Observable[B]
Converts the source Observable that emits Notification[A] (the result of materialize) back to an Observable that emits A.
- Definition Classes
- ObservableLike
-
def
distinct: Observable[A]
Suppress the duplicate elements emitted by the source Observable.
WARNING: this requires unbounded buffering.
- Definition Classes
- ObservableLike
-
def
distinctByKey[K](key: (A) ⇒ K): Observable[A]
Given a function that returns a key for each element emitted by the source Observable, suppress duplicate items.
WARNING: this requires unbounded buffering.
- Definition Classes
- ObservableLike
-
def
distinctUntilChanged: Observable[A]
Suppress duplicate consecutive items emitted by the source Observable.
- Definition Classes
- ObservableLike
-
def
distinctUntilChangedByKey[K](key: (A) ⇒ K): Observable[A]
Suppress duplicate consecutive items emitted by the source Observable, comparing items by the key that the given function returns for them.
- Definition Classes
- ObservableLike
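The difference between the distinct and distinctUntilChanged families, and the reason only the former carries an unbounded-buffering warning, can be seen in a small collections-based model. This is a sketch with hypothetical names, not Monix's implementation: the distinct variants must remember every key seen so far, while the untilChanged variants only need the previously emitted item.

```scala
object DistinctModel {
  // Remembers every key seen so far: memory grows with the number of distinct keys.
  def distinctByKey[A, K](items: List[A])(key: A => K): List[A] = {
    val (_, out) = items.foldLeft((Set.empty[K], Vector.empty[A])) {
      case ((seen, acc), item) =>
        val k = key(item)
        if (seen(k)) (seen, acc) else (seen + k, acc :+ item)
    }
    out.toList
  }

  def distinct[A](items: List[A]): List[A] =
    distinctByKey(items)(a => a)

  // Compares only against the last emitted item: constant memory.
  def distinctUntilChangedByKey[A, K](items: List[A])(key: A => K): List[A] =
    items.foldLeft(Vector.empty[A]) { (acc, item) =>
      if (acc.lastOption.exists(last => key(last) == key(item))) acc
      else acc :+ item
    }.toList

  def distinctUntilChanged[A](items: List[A]): List[A] =
    distinctUntilChangedByKey(items)(a => a)

  def main(args: Array[String]): Unit = {
    println(distinct(List(1, 2, 1, 3, 2)))             // List(1, 2, 3)
    println(distinctUntilChanged(List(1, 1, 2, 2, 1))) // List(1, 2, 1)
    println(distinctByKey(List("apple", "avocado", "banana"))(_.head)) // List(apple, banana)
  }
}
```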
-
def
doAfterSubscribe(cb: () ⇒ Unit): Observable[A]
Executes the given callback just after the subscription happens.
- Definition Classes
- ObservableLike
- See also
doOnSubscribe for executing a callback just before a subscription happens.
-
def
doAfterTerminate(cb: (Option[Throwable]) ⇒ Unit): Observable[A]
Executes the given callback after the stream has ended either with an onComplete or onError event, or when the streaming stops by a downstream Stop being signaled.
This differs from doOnTerminate in that this happens *after* the onComplete or onError notification.
- Definition Classes
- ObservableLike
- See also
doAfterTerminateEval for a version that allows for asynchronous evaluation by means of Task.
-
def
doAfterTerminateEval(cb: (Option[Throwable]) ⇒ Task[Unit]): Observable[A]
Evaluates the task generated by the given callback after the stream has ended either with an onComplete or onError event, or when the streaming stops by a downstream Stop being signaled.
This operation subsumes doOnEarlyStopEval. When applied for Stop events returned by onNext, the callback-generated Task back-pressures the source, so the upstream source receives the Stop result only after the task has finished executing.
This differs from doOnTerminateEval in that this happens *after* the onComplete or onError notification.
- Definition Classes
- ObservableLike
- See also
doAfterTerminate for a simpler version that doesn't allow asynchronous execution.
-
def
doOnComplete(cb: () ⇒ Unit): Observable[A]
Executes the given callback when the stream has ended with an onComplete event, but before the complete event is emitted.
Unless you know what you're doing, you probably want to use doOnTerminate and doOnSubscriptionCancel for proper disposal of resources on completion.
- cb
the callback to execute when the onComplete event gets emitted
- Definition Classes
- ObservableLike
- See also
doOnCompleteEval for a version that allows for asynchronous evaluation by means of Task.
-
def
doOnCompleteEval(task: Task[Unit]): Observable[A]
Evaluates the given task when the stream has ended with an onComplete event, but before the complete event is emitted.
The task gets evaluated and is finished *before* the onComplete signal gets sent downstream.
Unless you know what you're doing, you probably want to use doOnTerminateEval and doOnSubscriptionCancel for proper disposal of resources on completion.
- task
the task to execute when the onComplete event gets emitted
- Definition Classes
- ObservableLike
- See also
doOnComplete for a simpler version that doesn't do asynchronous execution
-
def
doOnEarlyStop(cb: () ⇒ Unit): Observable[A]
Executes the given callback when the streaming is stopped due to a downstream Stop signal returned by onNext.
- Definition Classes
- ObservableLike
- See also
doOnEarlyStopEval for a version that allows for asynchronous evaluation by means of Task.
-
def
doOnEarlyStopEval(task: Task[Unit]): Observable[A]
Executes the given task when the streaming is stopped due to a downstream Stop signal returned by onNext.
The given task gets evaluated *before* the upstream receives the Stop event (is back-pressured).
- Definition Classes
- ObservableLike
- See also
doOnEarlyStop for a simpler version that doesn't do asynchronous execution
-
def
doOnError(cb: (Throwable) ⇒ Unit): Observable[A]
Executes the given callback when the stream is interrupted with an error, before the onError event is emitted downstream.
NOTE: protect the code in this callback, because if it throws an exception then the onError event will prefer signaling the original exception, and otherwise the behavior is undefined.
- Definition Classes
- ObservableLike
- See also
doOnTerminate and doOnSubscriptionCancel for handling resource disposal, also see doOnErrorEval for a version that does asynchronous evaluation by means of Task.
-
def
doOnErrorEval(cb: (Throwable) ⇒ Task[Unit]): Observable[A]
Executes the given task when the stream is interrupted with an error, before the onError event is emitted downstream.
NOTE: protect the code in this callback, because if it throws an exception then the onError event will prefer signaling the original exception, and otherwise the behavior is undefined.
- Definition Classes
- ObservableLike
- See also
doOnTerminateEval and doOnSubscriptionCancel for handling resource disposal, also see doOnError for a simpler version that doesn't do asynchronous execution.
-
def
doOnNext(cb: (A) ⇒ Unit): Observable[A]
Executes the given callback for each element generated by the source Observable, useful for doing side-effects.
- returns
a new Observable that executes the specified callback for each element
- Definition Classes
- ObservableLike
- See also
doOnNextEval for a version that allows for asynchronous evaluation by means of Task.
-
def
doOnNextAck(cb: (A, Ack) ⇒ Unit): Observable[A]
Executes the given callback on each acknowledgement received from the downstream subscriber.
This method helps in executing logic after messages get processed, for example when messages are polled from some distributed message queue and an acknowledgement needs to be sent after each message in order to mark it as processed.
- Definition Classes
- ObservableLike
- See also
doOnNextAckEval for a version that allows for asynchronous evaluation by means of Task.
-
def
doOnNextAckEval(cb: (A, Ack) ⇒ Task[Unit]): Observable[A]
Executes the given callback on each acknowledgement received from the downstream subscriber, executing a generated Task and back-pressuring until the task is done.
This method helps in executing logic after messages get processed, for example when messages are polled from some distributed message queue and an acknowledgement needs to be sent after each message in order to mark it as processed.
- Definition Classes
- ObservableLike
- See also
doOnNextAck for a simpler version that doesn't allow asynchronous execution.
-
def
doOnNextEval(cb: (A) ⇒ Task[Unit]): Observable[A]
Evaluates the given callback for each element generated by the source Observable, useful for triggering async side-effects.
- returns
a new Observable that executes the specified callback for each element
- Definition Classes
- ObservableLike
- See also
doOnNext for a simpler version that doesn't allow asynchronous execution.
-
def
doOnStart(cb: (A) ⇒ Unit): Observable[A]
Executes the given callback only for the first element generated by the source Observable, useful for doing a piece of computation only when the stream starts.
- returns
a new Observable that executes the specified callback only for the first element
- Definition Classes
- ObservableLike
-
def
doOnSubscribe(cb: () ⇒ Unit): Observable[A]
Executes the given callback just before the subscription happens.
- Definition Classes
- ObservableLike
- See also
doAfterSubscribe for executing a callback just after a subscription happens.
-
def
doOnSubscriptionCancel(cb: () ⇒ Unit): Observable[A]
Executes the given callback when the connection is being cancelled.
- Definition Classes
- ObservableLike
-
def
doOnTerminate(cb: (Option[Throwable]) ⇒ Unit): Observable[A]
Executes the given callback right before the streaming is ended either with an onComplete or onError event, or when the streaming stops by a downstream Stop being signaled.
It is the equivalent of calling:
This differs from doAfterTerminate in that this happens *before* the onComplete or onError notification.
- Definition Classes
- ObservableLike
- See also
doOnTerminateEval for a version that allows for asynchronous evaluation by means of Task.
-
def
doOnTerminateEval(cb: (Option[Throwable]) ⇒ Task[Unit]): Observable[A]
Evaluates the task generated by the given callback right before the streaming is ended either with an onComplete or onError event, or when the streaming stops by a downstream Stop being signaled.
The callback-generated Task will back-pressure the source when applied for Stop events returned by onNext and thus the upstream source will receive the Stop result only after the task has finished executing.
It is the equivalent of calling:
This differs from doAfterTerminateEval in that this happens *before* the onComplete or onError notification.
- Definition Classes
- ObservableLike
- See also
doOnTerminate for a simpler version that doesn't allow asynchronous execution.
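The order in which the side-effecting hooks above fire can be illustrated with a minimal sketch that records events in a buffer. It assumes Monix 2.x on the classpath (where Task.runAsync returns a CancelableFuture); HooksExample, recordedEvents and log are hypothetical names introduced only for this illustration.

```scala
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.collection.mutable.ListBuffer
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object; not part of Monix.
object HooksExample {
  // Consumes a two-element stream and returns the events seen by the hooks.
  def recordedEvents(): List[String] = {
    val log = ListBuffer.empty[String]
    val task = Observable.fromIterable(List(1, 2))
      // Runs for every element, before it is passed downstream.
      .doOnNext { x => log += s"next $x"; () }
      // Runs right before the final onComplete / onError notification.
      .doOnTerminate { err => log += s"terminate, failed = ${err.isDefined}"; () }
      .toListL
    // Assumption: Monix 2.x, where runAsync returns a CancelableFuture.
    Await.result(task.runAsync, 5.seconds)
    log.toList
  }

  def main(args: Array[String]): Unit =
    recordedEvents().foreach(println)
}
```

Because doOnTerminate runs before the completion signal reaches the consumer, the buffer is fully populated by the time the awaited task finishes.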
-
def
drop(n: Int): Observable[A]
Drops the first n elements (from the start).
- n
the number of elements to drop
- returns
a new Observable that drops the first n elements emitted by the source
- Definition Classes
- ObservableLike
-
def
dropByTimespan(timespan: FiniteDuration): Observable[A]
Creates a new observable that drops the events of the source, only for the specified timespan window.
- timespan
the window of time during which the new observable must drop events emitted by the source
- Definition Classes
- ObservableLike
-
def
dropLast(n: Int): Observable[A]
Drops the last n elements (from the end).
- n
the number of elements to drop
- returns
a new Observable that drops the last n elements emitted by the source
- Definition Classes
- ObservableLike
-
def
dropUntil(trigger: Observable[Any]): Observable[A]
Discard items emitted by the source until a second observable emits an item or completes.
If the trigger observable completes in error, then the resulting observable will also end in error when it notices it (next time an element is emitted by the source).
- trigger
the observable that has to emit an item before the source begins to be mirrored by the resulting observable
- Definition Classes
- ObservableLike
-
def
dropWhile(p: (A) ⇒ Boolean): Observable[A]
Drops the longest prefix of elements that satisfy the given predicate and returns a new observable that emits the rest.
- Definition Classes
- ObservableLike
-
def
dropWhileWithIndex(p: (A, Int) ⇒ Boolean): Observable[A]
Drops the longest prefix of elements that satisfy the given function and returns a new observable that emits the rest. In comparison with dropWhile, this version accepts a function that takes an additional parameter: the zero-based index of the element.
- Definition Classes
- ObservableLike
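The drop family of operators above can be compared side by side with a small sketch. It assumes Monix 2.x on the classpath (where Task.runAsync returns a CancelableFuture); DropExamples and its methods are hypothetical names introduced only for this illustration.

```scala
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object; not part of Monix.
object DropExamples {
  // Helper that blocks for a task's result (assumes the Monix 2.x runAsync).
  private def await[A](task: Task[A]): A =
    Await.result(task.runAsync, 5.seconds)

  private def source: Observable[Int] =
    Observable.fromIterable(List(1, 2, 3, 4, 5))

  // Skips the first two elements: 3, 4, 5 remain.
  def dropFirstTwo: List[Int] = await(source.drop(2).toListL)

  // Skips the last two elements: 1, 2, 3 remain.
  def dropLastTwo: List[Int] = await(source.dropLast(2).toListL)

  // Skips the longest prefix of elements smaller than 3: 3, 4, 5 remain.
  def dropSmallPrefix: List[Int] = await(source.dropWhile(_ < 3).toListL)

  // Same idea, but the predicate also sees the zero-based index.
  def dropByIndex: List[Int] =
    await(source.dropWhileWithIndex((elem, idx) => idx < 2).toListL)

  def main(args: Array[String]): Unit =
    List(dropFirstTwo, dropLastTwo, dropSmallPrefix, dropByIndex).foreach(println)
}
```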
-
def
dump(prefix: String, out: PrintStream = System.out): Observable[A]
Utility that can be used for debugging purposes.
- Definition Classes
- ObservableLike
-
def
echoOnce(timeout: FiniteDuration): Observable[A]
Mirror the source observable as long as the source keeps emitting items, otherwise if timeout passes without the source emitting anything new then the observable will emit the last item.
This is the rough equivalent of:
Observable.merge(source, source.debounce(timeout))
Note: If the source Observable keeps emitting items more frequently than the length of the time window then the resulting observable will mirror the source exactly.
- timeout
the window of silence that must pass in order for the observable to echo the last item
- Definition Classes
- ObservableLike
-
def
echoRepeated(timeout: FiniteDuration): Observable[A]
Mirror the source observable as long as the source keeps emitting items, otherwise if timeout passes without the source emitting anything new then the observable will start emitting the last item repeatedly.
Note: If the source Observable keeps emitting items more frequently than the length of the time window then the resulting observable will mirror the source exactly.
- timeout
the window of silence that must pass in order for the observable to start echoing the last item
- Definition Classes
- ObservableLike
-
def
endWith[B >: A](elems: Seq[B]): Observable[B]
Creates a new Observable that emits the events of the source and then it also emits the given elements (appended to the stream).
- Definition Classes
- ObservableLike
-
def
endWithError(error: Throwable): Observable[A]
Emits the given exception instead of onComplete.
- error
the exception to emit when the source signals onComplete
- returns
a new Observable that emits the given exception when the source signals onComplete
- Definition Classes
- ObservableLike
-
final
def
eq(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
-
def
equals(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
-
def
executeOn(scheduler: Scheduler): Observable[A]
Specify an override for the Scheduler that will be used for subscribing and for observing the source.
Normally the Scheduler gets injected implicitly when doing subscribe, but this operator overrides the injected scheduler for the given source. And if the source is normally using that injected scheduler (given by subscribe), then the effect will be that all processing will now happen on the override.
To put it in other words, in Monix it's usually the consumer and not the producer that specifies the scheduler and this operator allows for a different behavior.
This operator also includes the effects of subscribeOn, meaning that the subscription logic itself will start on the provided scheduler.
IMPORTANT: This operator is a replacement for the observeOn operator from ReactiveX, but does not work in the same way. The observeOn operator forces the signaling to happen on a given Scheduler, but executeOn is more relaxed: usage is not forced, the source just gets injected with a different scheduler and it's up to the source to actually use it. This also means the effects are more far reaching, because the whole chain until the call of this operator is affected.
Alias for Observable.fork(fa, scheduler).
- Definition Classes
- ObservableLike
-
def
executeWithFork: Observable[A]
Mirrors the source observable, but upon subscription ensure that the evaluation forks into a separate (logical) thread.
The execution is managed by the injected scheduler in subscribe().
Alias for Observable.fork(fa).
- Definition Classes
- ObservableLike
-
def
executeWithModel(em: ExecutionModel): Observable[A]
Returns a new observable that will execute the source with a different ExecutionModel.
This allows fine-tuning the options injected by the scheduler locally. Example:
observable.executeWithModel(AlwaysAsyncExecution)
- em
is the ExecutionModel that will be used when evaluating the source.
- Definition Classes
- ObservableLike
-
def
existsF(p: (A) ⇒ Boolean): Observable[Boolean]
Returns an Observable which emits a single value, either true, in case the given predicate holds for at least one item, or false otherwise.
- p
is a function that evaluates the items emitted by the source Observable, returning true if they pass the filter
- returns
an Observable that emits only true or false in case the given predicate holds or not for at least one item
- Definition Classes
- ObservableLike
-
def
existsL(p: (A) ⇒ Boolean): Task[Boolean]
Returns a Task which emits either true, in case the given predicate holds for at least one item, or false otherwise.
- p
is a function that evaluates the items emitted by the source, returning true if they pass the filter
- returns
a task that emits true or false in case the given predicate holds or not for at least one item
-
def
failed: Observable[Throwable]
Returns an observable that emits a single Throwable, in case an error was thrown by the source, otherwise it isn't going to emit anything.
- Definition Classes
- ObservableLike
-
def
filter(p: (A) ⇒ Boolean): Observable[A]
Only emits those items for which the given predicate holds.
- p
a function that evaluates the items emitted by the source, returning true if they pass the filter
- returns
a new observable that emits only those items in the source for which the filter evaluates as true
- Definition Classes
- ObservableLike
-
def
finalize(): Unit
- Attributes
- protected[java.lang]
- Definition Classes
- AnyRef
- Annotations
- @throws( classOf[java.lang.Throwable] )
-
def
findF(p: (A) ⇒ Boolean): Observable[A]
Returns an Observable which only emits the first item for which the predicate holds.
- p
is a function that evaluates the items emitted by the source Observable, returning true if they pass the filter
- returns
an Observable that emits only the first item in the original Observable for which the filter evaluates as true
- Definition Classes
- ObservableLike
-
def
findL(p: (A) ⇒ Boolean): Task[Option[A]]
Returns a task which emits the first item for which the predicate holds.
- p
is a function that evaluates the items emitted by the source observable, returning true if they pass the filter
- returns
a task that emits the first item in the source observable for which the filter evaluates as true
-
def
firstL: Task[A]
Creates a new Task that upon execution will signal the first generated element of the source observable.
In case the stream was empty, then the Task gets completed in error with a NoSuchElementException.
-
def
firstOptionL: Task[Option[A]]
Creates a new Task that upon execution will signal the first generated element of the source observable.
Returns an Option because the source can be empty.
-
def
firstOrElseF[B >: A](default: ⇒ B): Observable[B]
Emits the first element emitted by the source, or otherwise if the source is completed without emitting anything, then the default is emitted.
Alias for headOrElse.
- Definition Classes
- ObservableLike
-
def
firstOrElseL[B >: A](default: ⇒ B): Task[B]
Creates a new Task that upon execution will signal the first generated element of the source observable.
In case the stream was empty, then the given default gets evaluated and emitted.
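The three ways of reading the first element differ only in how they treat an empty source, which a short sketch can make concrete. It assumes Monix 2.x on the classpath (where Task.runAsync returns a CancelableFuture); FirstExamples and its methods are hypothetical names introduced only for this illustration.

```scala
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.{Failure, Try}

// Hypothetical example object; not part of Monix.
object FirstExamples {
  // Helper that blocks for a task's result (assumes the Monix 2.x runAsync).
  private def await[A](task: Task[A]): A =
    Await.result(task.runAsync, 5.seconds)

  // firstOptionL signals None when the source is empty.
  def firstOptionOfEmpty: Option[Int] =
    await(Observable.empty[Int].firstOptionL)

  // firstOrElseL falls back to the (lazily evaluated) default.
  def firstOrDefault: Int =
    await(Observable.empty[Int].firstOrElseL(42))

  // firstL fails the task with a NoSuchElementException on an empty source.
  def firstFailsOnEmpty: Boolean =
    Try(await(Observable.empty[Int].firstL)) match {
      case Failure(_: NoSuchElementException) => true
      case _ => false
    }

  // On a non-empty source, firstL signals the first element.
  def firstOfNonEmpty: Int =
    await(Observable.fromIterable(List(7, 8, 9)).firstL)

  def main(args: Array[String]): Unit =
    println((firstOptionOfEmpty, firstOrDefault, firstFailsOnEmpty, firstOfNonEmpty))
}
```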
-
def
flatMap[B](f: (A) ⇒ Observable[B]): Observable[B]
Applies a function that you supply to each item emitted by the source observable, where that function returns sequences that can be observed, then concatenates those resulting sequences and emits the results of this concatenation.
Alias for concatMap.
The difference between the concat operation and merge is that concat cares about the ordering of sequences (e.g. all items emitted by the first observable in the sequence will come before the elements emitted by the second observable), whereas merge doesn't care about that (elements get emitted as they come). Because of back-pressure applied to observables, concat is safe to use in all contexts, whereas merge requires buffering.
- Definition Classes
- ObservableLike
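The ordering guarantee of the concatenating flatMap described above can be seen in a minimal sketch. It assumes Monix 2.x on the classpath (where Task.runAsync returns a CancelableFuture); FlatMapExample and concatenated are hypothetical names introduced only for this illustration.

```scala
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object; not part of Monix.
object FlatMapExample {
  // Each source item is expanded into a two-element inner observable.
  // Since flatMap concatenates, all items of one inner observable are
  // emitted before the next inner observable is subscribed.
  def concatenated: List[Int] = {
    val task = Observable.fromIterable(List(1, 2, 3))
      .flatMap(x => Observable.fromIterable(List(x, x * 10)))
      .toListL
    // Assumption: Monix 2.x, where runAsync returns a CancelableFuture.
    Await.result(task.runAsync, 5.seconds)
  }

  def main(args: Array[String]): Unit =
    println(concatenated) // List(1, 10, 2, 20, 3, 30)
}
```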
-
def
flatMapDelayErrors[B](f: (A) ⇒ Observable[B]): Observable[B]
Applies a function that you supply to each item emitted by the source observable, where that function returns sequences, then concatenates those resulting sequences and emits the results of this concatenation.
It's an alias for concatMapDelayErrors.
- f
a function that, when applied to an item emitted by the source Observable, returns an Observable
- returns
an Observable that emits the result of applying the transformation function to each item emitted by the source Observable and concatenating the results of the Observables obtained from this transformation.
- Definition Classes
- ObservableLike
-
def
flatMapLatest[B](f: (A) ⇒ Observable[B]): Observable[B]
An alias of switchMap.
Returns a new observable that emits the items emitted by the observable most recently generated by the mapping function.
- Definition Classes
- ObservableLike
-
def
flatScan[R](initial: ⇒ R)(op: (R, A) ⇒ Observable[R]): Observable[R]
Applies a binary operator to a start value and to elements produced by the source observable, going from left to right, producing and concatenating observables along the way.
- Definition Classes
- ObservableLike
-
def
flatScanDelayErrors[R](initial: ⇒ R)(op: (R, A) ⇒ Observable[R]): Observable[R]
Applies a binary operator to a start value and to elements produced by the source observable, going from left to right, producing and concatenating observables along the way.
This version of flatScan delays all errors until onComplete, when it will finally emit a CompositeException. It's the combination between scan and flatMapDelayErrors.
- Definition Classes
- ObservableLike
-
def
flatten[B](implicit ev: <:<[A, Observable[B]]): Observable[B]
Concatenates the sequence of observables emitted by the source into one observable, without any transformation.
You can combine the items emitted by multiple observables so that they act like a single sequence by using this operator.
The difference between the concat operation and merge is that concat cares about the ordering of sequences (e.g. all items emitted by the first observable in the sequence will come before the elements emitted by the second observable), whereas merge doesn't care about that (elements get emitted as they come). Because of back-pressure applied to observables, concat is safe to use in all contexts, whereas merge requires buffering.
Alias for concat.
- returns
an observable that emits items that are the result of flattening the items emitted by the observables emitted by the source
- Definition Classes
- ObservableLike
-
def
flattenDelayErrors[B](implicit ev: <:<[A, Observable[B]]): Observable[B]
Alias for concatDelayErrors.
Concatenates the sequence of observables emitted by the source into one observable, without any transformation.
You can combine the items emitted by multiple observables so that they act like a single sequence by using this operator.
The difference between the concat operation and merge is that concat cares about the ordering of sequences (e.g. all items emitted by the first observable in the sequence will come before the elements emitted by the second observable), whereas merge doesn't care about that (elements get emitted as they come). Because of back-pressure applied to observables, concat is safe to use in all contexts, whereas merge requires buffering.
This version reserves onError notifications until all of the observables complete and only then passes the issued error(s) downstream. Note that the streamed error is a CompositeException, since multiple errors from multiple streams can happen.
- returns
an observable that emits items that are the result of flattening the items emitted by the observables emitted by the source
- Definition Classes
- ObservableLike
-
def
flattenLatest[B](implicit ev: <:<[A, Observable[B]]): Observable[B]
Alias for switch
Convert an observable that emits observables into a single observable that emits the items emitted by the most-recently-emitted of those observables.
- Definition Classes
- ObservableLike
-
def
foldLeftF[R](initial: ⇒ R)(op: (R, A) ⇒ R): Observable[R]
Applies a binary operator to a start value and all elements of this Observable, going left to right and returns a new Observable that emits only one item before onComplete.
- initial
is the initial state, specified as a possibly lazy value; it gets evaluated when the subscription happens and if it triggers an error then the subscriber will get immediately terminated with an error
- op
is an operator that will fold the signals of the source observable, returning the next state
- Definition Classes
- ObservableLike
-
def
foldLeftL[R](initial: ⇒ R)(op: (R, A) ⇒ R): Task[R]
Applies a binary operator to a start value and all elements of the source, going left to right and returns a new Task that upon evaluation will eventually emit the final result.
-
def
foldWhileF[R](initial: ⇒ R)(op: (R, A) ⇒ (Boolean, R)): Observable[R]
Folds the source observable, from start to finish, until the source completes, or until the operator short-circuits the process by returning false.
Note that a call to foldLeftF is equivalent to this function being called with an operator always returning true as the first member of its result.
- initial
is the initial state, specified as a possibly lazy value; it gets evaluated when the subscription happens and if it triggers an error then the subscriber will get immediately terminated with an error
- op
is an operator that will fold the signals of the source observable, returning a new state along with a boolean that should become false in case the folding must be interrupted.
- Definition Classes
- ObservableLike
-
def
foldWhileL[R](initial: ⇒ R)(op: (R, A) ⇒ (Boolean, R)): Task[R]
Folds the source observable, from start to finish, until the source completes, or until the operator short-circuits the process by returning false.
Note that a call to foldLeftL is equivalent to this function being called with an operator always returning true as the first member of its result.
- op
is an operator that will fold the signals of the source observable, returning a new state along with a boolean that should become false in case the folding must be interrupted.
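The difference between a full fold and a short-circuiting fold can be sketched as follows. It assumes Monix 2.x on the classpath (where Task.runAsync returns a CancelableFuture and foldWhileL takes an operator returning a (Boolean, R) pair, as documented above); FoldExamples and its methods are hypothetical names introduced only for this illustration.

```scala
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object; not part of Monix.
object FoldExamples {
  // Helper that blocks for a task's result (assumes the Monix 2.x runAsync).
  private def await[A](task: Task[A]): A =
    Await.result(task.runAsync, 5.seconds)

  private def source: Observable[Int] =
    Observable.fromIterable(List(1, 2, 3, 4, 5))

  // Folds every element: 1 + 2 + 3 + 4 + 5.
  def total: Int = await(source.foldLeftL(0)(_ + _))

  // Keeps adding while the running sum stays at or below 5. The operator
  // returns the new state together with a flag; returning false as the
  // flag stops the fold, and the state returned with it is the result.
  def sumUntilAboveFive: Int =
    await(source.foldWhileL(0) { (acc, x) =>
      val next = acc + x
      (next <= 5, next)
    })

  def main(args: Array[String]): Unit =
    println((total, sumUntilAboveFive))
}
```

In this sketch the short-circuiting fold stops at the third element, once the running sum has passed 5.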
-
def
forAllF(p: (A) ⇒ Boolean): Observable[Boolean]
Returns an Observable that emits a single boolean, either true, in case the given predicate holds for all the items emitted by the source, or false in case at least one item does not satisfy the given predicate.
- p
is a function that evaluates the items emitted by the source Observable, returning true if they pass the filter
- returns
an Observable that emits only true or false in case the given predicate holds or not for all the items
- Definition Classes
- ObservableLike
-
def
forAllL(p: (A) ⇒ Boolean): Task[Boolean]
Returns a Task that emits a single boolean, either true, in case the given predicate holds for all the items emitted by the source, or false in case at least one item does not satisfy the given predicate.
- p
is a function that evaluates the items emitted by the source observable, returning true if they pass the filter
- returns
a task that emits only true or false in case the given predicate holds or not for all the items
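The Task-returning predicate operators (existsL, findL, forAllL) can be tried out with a small sketch. It assumes Monix 2.x on the classpath (where Task.runAsync returns a CancelableFuture); PredicateExamples and its methods are hypothetical names introduced only for this illustration.

```scala
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object; not part of Monix.
object PredicateExamples {
  // Helper that blocks for a task's result (assumes the Monix 2.x runAsync).
  private def await[A](task: Task[A]): A =
    Await.result(task.runAsync, 5.seconds)

  private def source: Observable[Int] =
    Observable.fromIterable(List(1, 2, 3, 4, 5))

  // true, because at least one element is even.
  def anyEven: Boolean = await(source.existsL(_ % 2 == 0))

  // Some(4), the first element greater than 3.
  def firstAboveThree: Option[Int] = await(source.findL(_ > 3))

  // None, because no element is greater than 10.
  def firstAboveTen: Option[Int] = await(source.findL(_ > 10))

  // true, because every element is positive.
  def allPositive: Boolean = await(source.forAllL(_ > 0))

  def main(args: Array[String]): Unit =
    println((anyEven, firstAboveThree, firstAboveTen, allPositive))
}
```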
-
def
foreach(cb: (A) ⇒ Unit)(implicit s: Scheduler): CancelableFuture[Unit]
Subscribes to the source Observable and, for each element emitted by the source, executes the given callback.
-
def
foreachL(cb: (A) ⇒ Unit): Task[Unit]
Creates a new Task that will consume the source observable, executing the given callback for each element.
-
final
def
getClass(): Class[_]
- Definition Classes
- AnyRef → Any
-
def
groupBy[K](keySelector: (A) ⇒ K)(implicit keysBuffer: Synchronous[Nothing] = OverflowStrategy.Unbounded): Observable[GroupedObservable[K, A]]
Groups the items emitted by an Observable according to a specified criterion, and emits these grouped items as GroupedObservables, one GroupedObservable per group.
Note: A GroupedObservable will cache the items it is to emit until such time as it is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those GroupedObservables that do not concern you. Instead, you can signal to them that they may discard their buffers by doing something like source.take(0).
- keySelector
a function that extracts the key for each item
- Definition Classes
- ObservableLike
-
def
hashCode(): Int
- Definition Classes
- AnyRef → Any
-
def
headF: Observable[A]
Only emits the first element emitted by the source observable, after which it's completed immediately.
- Definition Classes
- ObservableLike
-
def
headL: Task[A]
Alias for firstL.
-
def
headOptionL: Task[Option[A]]
Alias for firstOptionL.
-
def
headOrElseF[B >: A](default: ⇒ B): Observable[B]
Emits the first element emitted by the source, or otherwise if the source is completed without emitting anything, then the default is emitted.
- Definition Classes
- ObservableLike
-
def
headOrElseL[B >: A](default: ⇒ B): Task[B]
Alias for firstOrElseL.
-
def
ignoreElements: Observable[Nothing]
Alias for completed. Ignores all items emitted by the source and only calls onComplete or onError.
- returns
an empty sequence that only calls onComplete or onError, based on which one is called by the source Observable
- Definition Classes
- ObservableLike
-
def
interleave[B >: A](other: Observable[B]): Observable[B]
Creates a new observable from this observable and another given observable by interleaving their items into a strictly alternating sequence.
The first item emitted by the new observable is the first item emitted by self, the second is the first item emitted by other, and so forth. When either self or other calls onComplete, the remaining items come directly from the observable that has not completed. When onError is called by either self or other, the new observable calls onError and halts.
See merge for a more relaxed alternative that doesn't emit items in strict alternating sequence.
- other
is an observable that interleaves with the source
- returns
a new observable sequence that alternates emission of the items from both child streams
- Definition Classes
- ObservableLike
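The alternation described above can be sketched with two sources of equal length. This assumes the Monix 2.x API and the global Scheduler; InterleaveExample is a hypothetical name.

```scala
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.collection.mutable.ListBuffer
import scala.concurrent.Await
import scala.concurrent.duration._

object InterleaveExample {
  // Odd numbers come from the source, even numbers from the other
  // observable; interleave alternates strictly between the two.
  def run(): List[Int] = {
    val out = ListBuffer.empty[Int]
    val done = Observable(1, 3, 5)
      .interleave(Observable(2, 4, 6))
      .foreach { x => out += x; () }
    Await.result(done, 5.seconds)
    out.toList
  }
}
```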
-
def
isEmptyF: Observable[Boolean]
Returns an Observable that emits true if the source Observable is empty, otherwise false.
- Definition Classes
- ObservableLike
-
def
isEmptyL: Task[Boolean]
Returns a task that emits true if the source observable is empty, otherwise false.
-
final
def
isInstanceOf[T0]: Boolean
- Definition Classes
- Any
-
def
lastF: Observable[A]
Only emits the last element emitted by the source observable, after which it's completed immediately.
- Definition Classes
- ObservableLike
-
def
lastL: Task[A]
Returns a Task that upon execution will signal the last generated element of the source observable.
In case the stream was empty, the Task completes in error with a NoSuchElementException.
-
def
lastOptionL: Task[Option[A]]
Returns a Task that upon execution will signal the last generated element of the source observable.
Returns an Option because the source can be empty.
-
def
lastOrElseL[B >: A](default: ⇒ B): Task[B]
Creates a new Task that upon execution will signal the last generated element of the source observable.
In case the stream was empty, then the given default gets evaluated and emitted.
-
def
liftByOperator[B](operator: Operator[A, B]): Observable[B]
Transforms the source using the given operator.
- Definition Classes
- Observable → ObservableLike
-
def
map[B](f: (A) ⇒ B): Observable[B]
Returns a new observable that applies the given function to each item emitted by the source and emits the result.
- Definition Classes
- ObservableLike
-
def
mapAsync[B](parallelism: Int)(f: (A) ⇒ Task[B]): Observable[B]
Given a mapping function that maps events to tasks, applies it in parallel on the source, with the specified parallelism indicating the maximum number of tasks that can be executed in parallel.
Similar in spirit to Consumer.loadBalance, but expressed as an operator that executes Task instances in parallel.
Note that when the specified parallelism is 1, it has the same behavior as mapTask.
- parallelism
is the maximum number of tasks that can be executed in parallel, over which the source starts being back-pressured
- f
is the mapping function that produces tasks to execute in parallel, which will eventually produce events for the resulting observable stream
- Definition Classes
- ObservableLike
- See also
mapTask for serial execution
-
def
mapAsync[B](f: (A) ⇒ Task[B]): Observable[B]
Alias for mapTask.
- Definition Classes
- ObservableLike
-
def
mapFuture[B](f: (A) ⇒ Future[B]): Observable[B]
Maps elements from the source using a function that can do asynchronous processing by means of scala.concurrent.Future.
Given a source observable, this function is basically the equivalent of doing:
observable.concatMap(a => Observable.fromFuture(f(a)))
However, prefer this operator to concatMap because it is clearer and has better performance.
- Definition Classes
- ObservableLike
-
def
mapTask[B](f: (A) ⇒ Task[B]): Observable[B]
Maps elements from the source using a function that can do asynchronous processing by means of Task.
Given a source observable, this function is basically the equivalent of doing:
observable.concatMap(a => Observable.fromTask(f(a)))
However, prefer this operator to concatMap because it is clearer and has better performance.
- Definition Classes
- ObservableLike
- See also
mapFuture for the version that can work with scala.concurrent.Future
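As a rough illustration of mapTask, each element is turned into a Task and the results are emitted in source order. The sketch assumes the Monix 2.x API documented here (later versions may name this operator differently) and the global Scheduler; MapTaskExample is a hypothetical name.

```scala
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.collection.mutable.ListBuffer
import scala.concurrent.Await
import scala.concurrent.duration._

object MapTaskExample {
  // Each element is processed by a Task; tasks run one after another,
  // so the output keeps the ordering of the source.
  def run(): List[Int] = {
    val out = ListBuffer.empty[Int]
    val done = Observable(1, 2, 3)
      .mapTask(x => Task(x * 10))
      .foreach { x => out += x; () }
    Await.result(done, 5.seconds)
    out.toList
  }
}
```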
-
def
materialize: Observable[Notification[A]]
Converts the source Observable that emits A into an Observable that emits Notification[A].
- Definition Classes
- ObservableLike
-
def
maxByF[B](f: (A) ⇒ B)(implicit ev: Ordering[B]): Observable[A]
Takes the elements of the source Observable and emits the element that has the maximum key value, where the key is generated by the given function f.
- Definition Classes
- ObservableLike
-
def
maxByL[B](f: (A) ⇒ B)(implicit ev: Ordering[B]): Task[Option[A]]
Takes the elements of the source and emits the element that has the maximum key value, where the key is generated by the given function f.
-
def
maxF[B >: A](implicit ev: Ordering[B]): Observable[B]
Takes the elements of the source Observable and emits the maximum value, after the source has completed.
- Definition Classes
- ObservableLike
-
def
maxL[B >: A](implicit ev: Ordering[B]): Task[Option[B]]
Takes the elements of the source and emits the maximum value, after the source has completed.
-
def
merge[B](implicit ev: <:<[A, Observable[B]], os: OverflowStrategy[B] = OverflowStrategy.Default): Observable[B]
Merges the sequence of Observables emitted by the source into one Observable, without any transformation.
You can combine the items emitted by multiple Observables so that they act like a single Observable by using this operator.
- returns
an Observable that emits items that are the result of flattening the items emitted by the Observables emitted by this.
- Definition Classes
- ObservableLike
- Note
this operation needs to do buffering and by not specifying an OverflowStrategy, the default strategy is being used.
-
def
mergeDelayErrors[B](implicit ev: <:<[A, Observable[B]], os: OverflowStrategy[B] = OverflowStrategy.Default): Observable[B]
Merges the sequence of Observables emitted by the source into one Observable, without any transformation.
You can combine the items emitted by multiple Observables so that they act like a single Observable by using this operator.
This version reserves onError notifications until all of the observables complete and only then passes the issued error(s) downstream. Note that the streamed error is a CompositeException, since multiple errors from multiple streams can happen.
- returns
an Observable that emits items that are the result of flattening the items emitted by the Observables emitted by this.
- Definition Classes
- ObservableLike
- Note
this operation needs to do buffering and by not specifying an OverflowStrategy, the default strategy is being used.
-
def
mergeMap[B](f: (A) ⇒ Observable[B])(implicit os: OverflowStrategy[B] = OverflowStrategy.Default): Observable[B]
Creates a new observable by applying a function that you supply to each item emitted by the source observable, where that function returns an observable, and then merging the resulting observables and emitting the results of this merger.
The difference between this and concatMap is that concatMap cares about the ordering of emitted items (e.g. all items emitted by the first observable in the sequence will come before the elements emitted by the second observable), whereas merge doesn't care about that (elements get emitted as they come). Because of back-pressure applied to observables, the concat operation is safe to use in all contexts, whereas merge requires buffering.
- f
the transformation function
- returns
an observable that emits the result of applying the transformation function to each item emitted by the source observable and merging the results of the observables obtained from this transformation.
- Definition Classes
- ObservableLike
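The ordering difference between concatMap and mergeMap can be sketched as below. This assumes the Monix 2.x API and the global Scheduler; MergeMapExample and its helper collect are hypothetical names. Only the concatMap ordering is relied upon; the merged result is compared after sorting because its ordering is not guaranteed.

```scala
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.collection.mutable.ListBuffer
import scala.concurrent.Await
import scala.concurrent.duration._

object MergeMapExample {
  // Helper: consumes an observable and returns its items as a list.
  private def collect(obs: Observable[Int]): List[Int] = {
    val out = ListBuffer.empty[Int]
    Await.result(obs.foreach { x => out += x; () }, 5.seconds)
    out.toList
  }

  // concatMap: all items of the first child precede those of the second.
  def concatenated(): List[Int] =
    collect(Observable(1, 2).concatMap(x => Observable(x, x * 10)))

  // mergeMap: same items, but emitted as they come (uses a buffer).
  def merged(): List[Int] =
    collect(Observable(1, 2).mergeMap(x => Observable(x, x * 10)))
}
```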
-
def
mergeMapDelayErrors[B](f: (A) ⇒ Observable[B])(implicit os: OverflowStrategy[B] = OverflowStrategy.Default): Observable[B]
Creates a new observable by applying a function that you supply to each item emitted by the source observable, where that function returns an observable, and then merging the resulting observables and emitting the results of this merger.
The difference between this and concatMap is that concatMap cares about the ordering of emitted items (e.g. all items emitted by the first observable in the sequence will come before the elements emitted by the second observable), whereas merge doesn't care about that (elements get emitted as they come). Because of back-pressure applied to observables, the concat operation is safe to use in all contexts, whereas merge requires buffering.
This version reserves onError notifications until all of the observables complete and only then passes the issued error(s) downstream. Note that the streamed error is a CompositeException, since multiple errors from multiple streams can happen.
- f
the transformation function
- returns
an observable that emits the result of applying the transformation function to each item emitted by the source observable and merging the results of the observables obtained from this transformation.
- Definition Classes
- ObservableLike
-
def
minByF[B](f: (A) ⇒ B)(implicit ev: Ordering[B]): Observable[A]
Takes the elements of the source Observable and emits the element that has the minimum key value, where the key is generated by the given function f.
- Definition Classes
- ObservableLike
-
def
minByL[B](f: (A) ⇒ B)(implicit ev: Ordering[B]): Task[Option[A]]
Takes the elements of the source and emits the element that has the minimum key value, where the key is generated by the given function f.
-
def
minF[B >: A](implicit ev: Ordering[B]): Observable[B]
Takes the elements of the source Observable and emits the minimum value, after the source has completed.
- Definition Classes
- ObservableLike
-
def
minL[B >: A](implicit ev: Ordering[B]): Task[Option[B]]
Takes the elements of the source and emits the minimum value, after the source has completed.
-
def
multicast[B >: A, R](pipe: Pipe[B, R])(implicit s: Scheduler): ConnectableObservable[R]
Converts this observable into a multicast observable, useful for turning a cold observable into a hot one (i.e. whose source is shared by all observers).
-
final
def
ne(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
-
def
nonEmptyF: Observable[Boolean]
Returns an Observable that emits false if the source Observable is empty, otherwise true.
- Definition Classes
- ObservableLike
-
def
nonEmptyL: Task[Boolean]
Returns a task that emits false if the source observable is empty, otherwise true.
-
final
def
notify(): Unit
- Definition Classes
- AnyRef
-
final
def
notifyAll(): Unit
- Definition Classes
- AnyRef
-
def
onCancelTriggerError: Observable[A]
If the connection is cancelled then trigger a CancellationException.
A connection can be cancelled with the help of the Cancelable returned on subscribe.
Because the cancellation is effectively concurrent with the signals the Observer receives and because we need to uphold the contract, this operator will effectively synchronize access to onNext, onComplete and onError. It will also watch out for asynchronous Stop events.
In other words, this operator does heavy synchronization and can prove to be inefficient. Avoid using it: the signaled error can interfere with other operators that use cancellation internally, and cancellation in general is a side-effecting operation that should be avoided unless necessary.
- Definition Classes
- ObservableLike
-
def
onErrorFallbackTo[B >: A](that: Observable[B]): Observable[B]
Returns an Observable that mirrors the behavior of the source, unless the source is terminated with an onError, in which case the streaming of events continues with the specified backup sequence.
The created Observable mirrors the behavior of the source in case the source does not end with an error.
NOTE that compared with onErrorResumeNext from Rx.NET, the streaming is not resumed in case the source is terminated normally with an onComplete.
- that
is a backup sequence that's being subscribed in case the source terminates with an error.
- Definition Classes
- ObservableLike
-
def
onErrorHandle[B >: A](f: (Throwable) ⇒ B): Observable[B]
Returns an observable that mirrors the behavior of the source, unless the source is terminated with an onError, in which case the streaming of events falls back to an observable emitting a single element generated by the backup function.
See onErrorRecover for the version that takes a partial function as a parameter.
- f
a function that matches errors with a backup element that is emitted when the source throws an error.
- Definition Classes
- ObservableLike
-
def
onErrorHandleWith[B >: A](f: (Throwable) ⇒ Observable[B]): Observable[B]
Returns an Observable that mirrors the behavior of the source, unless the source is terminated with an onError, in which case the streaming of events continues with the specified backup sequence generated by the given function.
See onErrorRecoverWith for the version that takes a partial function as a parameter.
- f
is a function that matches errors with a backup observable that is subscribed when the source throws an error.
- Definition Classes
- ObservableLike
-
def
onErrorRecover[B >: A](pf: PartialFunction[Throwable, B]): Observable[B]
Returns an observable that mirrors the behavior of the source, unless the source is terminated with an onError, in which case the streaming of events falls back to an observable emitting a single element generated by the backup function.
The created Observable mirrors the behavior of the source in case the source does not end with an error or if the thrown Throwable is not matched.
See onErrorHandle for the version that takes a total function as a parameter.
- pf
a function that matches errors with a backup element that is emitted when the source throws an error.
- Definition Classes
- ObservableLike
-
def
onErrorRecoverWith[B >: A](pf: PartialFunction[Throwable, Observable[B]]): Observable[B]
Returns an Observable that mirrors the behavior of the source, unless the source is terminated with an onError, in which case the streaming of events continues with the specified backup sequence generated by the given function.
The created Observable mirrors the behavior of the source in case the source does not end with an error or if the thrown Throwable is not matched.
See onErrorHandleWith for the version that takes a total function as a parameter.
- pf
is a function that matches errors with a backup observable that is subscribed when the source throws an error.
- Definition Classes
- ObservableLike
-
def
onErrorRestart(maxRetries: Long): Observable[A]
Returns an Observable that mirrors the behavior of the source, unless the source is terminated with an onError, in which case it tries subscribing to the source again in the hope that it will complete without an error.
The number of retries is limited by the specified maxRetries parameter, so for an Observable that always ends in error the total number of subscriptions that will eventually happen is maxRetries + 1.
- Definition Classes
- ObservableLike
-
def
onErrorRestartIf(p: (Throwable) ⇒ Boolean): Observable[A]
Returns an Observable that mirrors the behavior of the source, unless the source is terminated with an onError, in which case it tries subscribing to the source again in the hope that it will complete without an error.
The given predicate establishes if the subscription should be retried or not.
- Definition Classes
- ObservableLike
-
def
onErrorRestartUnlimited: Observable[A]
Returns an Observable that mirrors the behavior of the source, unless the source is terminated with an onError, in which case it tries subscribing to the source again in the hope that it will complete without an error.
NOTE: The number of retries is unlimited, so something like Observable.error(new RuntimeException).onErrorRestartUnlimited will loop forever.
- Definition Classes
- ObservableLike
-
def
pipeThrough[I >: A, B](pipe: Pipe[I, B]): Observable[B]
Given a Pipe, transform the source observable with it.
- Definition Classes
- ObservableLike
-
def
pipeThroughSelector[S >: A, B, R](pipe: Pipe[S, B], f: (Observable[B]) ⇒ Observable[R]): Observable[R]
Returns an observable that emits the results of invoking a specified selector on items emitted by a ConnectableObservable, which shares a single subscription to the underlying sequence.
- pipe
is the Pipe used to transform the source into a multicast (hot) observable that can be shared in the selector function
- f
is a selector function that can use the multicasted source sequence as many times as needed, without causing multiple subscriptions to the source sequence. Observers to the given source will receive all notifications of the source from the time of the subscription forward.
- Definition Classes
- ObservableLike
-
def
publish(implicit s: Scheduler): ConnectableObservable[A]
Converts this observable into a multicast observable, useful for turning a cold observable into a hot one (i.e. whose source is shared by all observers). The underlying subject used is a PublishSubject.
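A rough sketch of sharing one source between two subscribers with publish, assuming the Monix 2.x API and the global Scheduler; PublishExample is a hypothetical name. Both subscribers are registered before connect() is called, so each should observe every element.

```scala
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.collection.mutable.ListBuffer
import scala.concurrent.Await
import scala.concurrent.duration._

object PublishExample {
  // Two observers share a single subscription to the source; the
  // source only starts streaming once connect() is called.
  def run(): (List[Int], List[Int]) = {
    val first = ListBuffer.empty[Int]
    val second = ListBuffer.empty[Int]
    val hot = Observable(1, 2, 3).publish
    val f1 = hot.foreach { x => first += x; () }
    val f2 = hot.foreach { x => second += x; () }
    hot.connect() // starts the shared subscription
    Await.result(f1, 5.seconds)
    Await.result(f2, 5.seconds)
    (first.toList, second.toList)
  }
}
```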
-
def
publishLast(implicit s: Scheduler): ConnectableObservable[A]
Converts this observable into a multicast observable, useful for turning a cold observable into a hot one (i.e. whose source is shared by all observers). The underlying subject used is an AsyncSubject.
-
def
publishSelector[R](f: (Observable[A]) ⇒ Observable[R]): Observable[R]
Returns an observable that emits the results of invoking a specified selector on items emitted by a ConnectableObservable, which shares a single subscription to the underlying sequence.
- f
is a selector function that can use the multicasted source sequence as many times as needed, without causing multiple subscriptions to the source sequence. Observers to the given source will receive all notifications of the source from the time of the subscription forward.
- Definition Classes
- ObservableLike
-
def
reduce[B >: A](op: (B, B) ⇒ B): Observable[B]
Applies a binary operator to all elements of this Observable, going left to right, and returns a new Observable that emits only one item before onComplete.
- Definition Classes
- ObservableLike
-
def
repeat: Observable[A]
Repeats the items emitted by the source continuously. It caches the generated items until onComplete and repeats them forever.
It terminates either on error or if the source is empty.
- Definition Classes
- ObservableLike
-
def
replay(bufferSize: Int)(implicit s: Scheduler): ConnectableObservable[A]
Converts this observable into a multicast observable, useful for turning a cold observable into a hot one (i.e. whose source is shared by all observers). The underlying subject used is a ReplaySubject.
- bufferSize
is the size of the buffer limiting the number of items that can be replayed (on overflow the head starts being dropped)
-
def
replay(implicit s: Scheduler): ConnectableObservable[A]
Converts this observable into a multicast observable, useful for turning a cold observable into a hot one (i.e. whose source is shared by all observers). The underlying subject used is a ReplaySubject.
-
def
restartUntil(p: (A) ⇒ Boolean): Observable[A]
Keeps restarting / resubscribing the source until the predicate returns true for the first emitted element, after which it starts mirroring the source.
- Definition Classes
- ObservableLike
-
def
runAsyncGetFirst(implicit s: Scheduler): CancelableFuture[Option[A]]
Creates a new CancelableFuture that upon execution will signal the first generated element of the source observable. Returns an Option because the source can be empty.
-
def
runAsyncGetLast(implicit s: Scheduler): CancelableFuture[Option[A]]
Creates a new CancelableFuture that upon execution will signal the last generated element of the source observable. Returns an Option because the source can be empty.
-
def
sample(period: FiniteDuration): Observable[A]
Emit the most recent items emitted by the source within periodic time intervals.
Use the sample operator to periodically look at an observable to see what item it has most recently emitted since the previous sampling. Note that if the source observable has emitted no items since the last time it was sampled, the observable that results from the sample operator will emit no item for that sampling period.
- period
the timespan at which sampling occurs
- Definition Classes
- ObservableLike
- See also
sampleRepeated for repeating the last value on silence
sampleBy for fine control
-
def
sampleBy[B](sampler: Observable[B]): Observable[A]
Returns an observable that, when the specified sampler emits an item or completes, emits the most recently emitted item (if any) emitted by the source since the previous emission from the sampler.
Use the sampleBy operator to periodically look at an observable to see what item it has most recently emitted since the previous sampling. Note that if the source observable has emitted no items since the last time it was sampled, the observable that results from the sampleBy operator will emit no item.
- sampler
the observable to use for sampling the source
- Definition Classes
- ObservableLike
- See also
sampleRepeatedBy for repeating the last value on silence
sample for periodic sampling
-
def
sampleRepeated(period: FiniteDuration): Observable[A]
Emit the most recent items emitted by an observable within periodic time intervals. If no new value has been emitted since the last time it was sampled, it signals the last emitted value anyway.
- period
the timespan at which sampling occurs
- Definition Classes
- ObservableLike
- See also
sampleRepeatedBy for fine control
sample for a variant that doesn't repeat the last value on silence
-
def
sampleRepeatedBy[B](sampler: Observable[B]): Observable[A]
Returns an observable that, when the specified sampler observable emits an item or completes, emits the most recently emitted item (if any) emitted by the source Observable since the previous emission from the sampler observable. If no new value has been emitted since the last time it was sampled, it signals the last emitted value anyway.
- sampler
the Observable to use for sampling the source Observable
- Definition Classes
- ObservableLike
- See also
sampleRepeated for a periodic sampling
sampleBy for a variant that doesn't repeat the last value on silence
-
def
scan[R](initial: ⇒ R)(f: (R, A) ⇒ R): Observable[R]
Applies a binary operator to a start value and all elements of this Observable, going left to right and returns a new Observable that emits on each step the result of the applied function.
Similar to foldLeftF, but emits the state on each step. Useful for modeling finite state machines.
- Definition Classes
- ObservableLike
-
def
share(implicit s: Scheduler): Observable[A]
Returns a new Observable that multi-casts (shares) the original Observable.
-
def
startWith[B >: A](elems: Seq[B]): Observable[B]
Creates a new Observable that emits the given elements and then it also emits the events of the source (prepend operation).
- Definition Classes
- ObservableLike
-
def
subscribe(nextFn: (A) ⇒ Future[Ack])(implicit s: Scheduler): Cancelable
Subscribes to the stream.
- returns
a subscription that can be used to cancel the streaming.
- See also
consumeWith for another way of consuming observables
-
def
subscribe()(implicit s: Scheduler): Cancelable
Subscribes to the stream.
- returns
a subscription that can be used to cancel the streaming.
- See also
consumeWith for another way of consuming observables
-
def
subscribe(nextFn: (A) ⇒ Future[Ack], errorFn: (Throwable) ⇒ Unit)(implicit s: Scheduler): Cancelable
Subscribes to the stream.
- returns
a subscription that can be used to cancel the streaming.
- See also
consumeWith for another way of consuming observables
-
def
subscribe(nextFn: (A) ⇒ Future[Ack], errorFn: (Throwable) ⇒ Unit, completedFn: () ⇒ Unit)(implicit s: Scheduler): Cancelable
Subscribes to the stream.
- returns
a subscription that can be used to cancel the streaming.
- See also
consumeWith for another way of consuming observables
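The three-callback overload might be used as in this sketch, assuming the Monix 2.x API, Ack.Continue from monix.execution as the acknowledgement that requests more items, and the global Scheduler. SubscribeExample is a hypothetical name, and the Promise exists only so the demonstration can wait for completion.

```scala
import monix.execution.Ack
import monix.execution.Ack.Continue
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

object SubscribeExample {
  // Subscribes with onNext, onError and onComplete callbacks; each
  // onNext returns Continue to signal readiness for the next item.
  def run(): List[Int] = {
    val received = ListBuffer.empty[Int]
    val finished = Promise[Unit]()
    val cancelable = Observable(1, 2, 3).subscribe(
      (x: Int) => { received += x; Continue: Future[Ack] },
      (ex: Throwable) => { finished.tryFailure(ex); () },
      () => { finished.trySuccess(()); () }
    )
    Await.result(finished.future, 5.seconds)
    cancelable.cancel() // no-op here, the stream has already completed
    received.toList
  }
}
```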
-
def
subscribe(observer: Observer[A])(implicit s: Scheduler): Cancelable
Subscribes to the stream.
- returns
a subscription that can be used to cancel the streaming.
- See also
consumeWith for another way of consuming observables
-
def
subscribe(subscriber: Subscriber[A]): Cancelable
Subscribes to the stream.
- returns
a subscription that can be used to cancel the streaming.
- See also
consumeWith for another way of consuming observables
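The subscribe overloads listed above differ only in which callbacks they accept. The sketch below uses the three-callback variant; it assumes Monix 2.x and the global scheduler, and the Promise used to wait for completion is an illustrative choice rather than anything the library requires.

```scala
import monix.execution.Ack
import monix.execution.Ack.Continue
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// Hypothetical example object, not part of Monix
object SubscribeExample {
  def collect(): List[Int] = {
    val received = ListBuffer.empty[Int]
    val done = Promise[Unit]()

    val cancelable = Observable.fromIterable(1 to 3).subscribe(
      // nextFn: returning Continue asks the source for the next element
      (elem: Int) => { received += elem; Continue: Future[Ack] },
      // errorFn: invoked at most once if the stream fails
      (ex: Throwable) => { done.tryFailure(ex); () },
      // completedFn: invoked at most once on successful completion
      () => { done.trySuccess(()); () }
    )

    Await.result(done.future, 5.seconds)
    cancelable.cancel() // no effect once the stream has completed
    received.toList
  }

  def main(args: Array[String]): Unit =
    println(collect()) // List(1, 2, 3)
}
```

The returned Cancelable is what allows the streaming to be stopped early, for example when subscribing to an infinite source.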
-
def
subscribeOn(scheduler: Scheduler): Observable[A]
Returns a new Observable that uses the specified Scheduler for initiating the subscription.
- Definition Classes
- ObservableLike
-
def
sumF[B >: A](implicit arg0: Numeric[B]): Observable[B]
Given a source that emits numeric values, the sum operator sums up all values and at onComplete it emits the total.
- Definition Classes
- ObservableLike
-
def
sumL[B >: A](implicit B: Numeric[B]): Task[B]
Given a source that emits numeric values, the sum operator sums up all values and returns the result.
-
def
switch[B](implicit ev: <:<[A, Observable[B]]): Observable[B]
Convert an observable that emits observables into a single observable that emits the items emitted by the most-recently-emitted of those observables.
- Definition Classes
- ObservableLike
-
def
switchIfEmpty[B >: A](backup: Observable[B]): Observable[B]
In case the source is empty, switch to the given backup.
- Definition Classes
- ObservableLike
-
def
switchMap[B](f: (A) ⇒ Observable[B]): Observable[B]
Returns a new observable that emits the items emitted by the observable most recently generated by the mapping function.
- Definition Classes
- ObservableLike
-
final
def
synchronized[T0](arg0: ⇒ T0): T0
- Definition Classes
- AnyRef
-
def
tail: Observable[A]
Drops the first element of the source observable, emitting the rest.
- Definition Classes
- ObservableLike
-
def
take(n: Long): Observable[A]
Selects the first n elements (from the start).
- n
the number of elements to take
- returns
a new Observable that emits only the first n elements from the source
- Definition Classes
- ObservableLike
-
def
takeByTimespan(timespan: FiniteDuration): Observable[A]
Creates a new Observable that emits the events of the source only for the specified timespan, after which it completes.
- timespan
the window of time during which the new Observable is allowed to emit the events of the source
- Definition Classes
- ObservableLike
-
def
takeEveryNth(n: Int): Observable[A]
Creates a new Observable that emits every n-th event from the source, dropping intermediary events.
- Definition Classes
- ObservableLike
-
def
takeLast(n: Int): Observable[A]
Creates a new observable that only emits the last n elements emitted by the source.
In case the source triggers an error, then the underlying buffer gets dropped and the error gets emitted immediately.
- Definition Classes
- ObservableLike
-
def
takeUntil(trigger: Observable[Any]): Observable[A]
Creates a new observable that mirrors the source until the given trigger emits either an element or onComplete, after which it is completed.
The resulting observable is completed as soon as trigger emits either an onNext or onComplete. If trigger emits an onError, then the resulting observable is also completed with error.
- trigger
is an observable that will cancel the streaming as soon as it emits an event
- Definition Classes
- ObservableLike
-
def
takeWhile(p: (A) ⇒ Boolean): Observable[A]
Takes the longest prefix of elements that satisfy the given predicate and returns a new Observable that emits those elements.
- Definition Classes
- ObservableLike
-
def
takeWhileNotCanceled(c: BooleanCancelable): Observable[A]
Takes the longest prefix of elements emitted while the given BooleanCancelable is not canceled and returns a new Observable that emits those elements.
- Definition Classes
- ObservableLike
-
def
throttleFirst(interval: FiniteDuration): Observable[A]
Returns an Observable that emits only the first item emitted by the source Observable during sequential time windows of a specified duration.
This differs from throttleLast in that this only tracks passage of time whereas throttleLast ticks at scheduled intervals.
- interval
time to wait before emitting another item after emitting the last item
- Definition Classes
- ObservableLike
-
def
throttleLast(period: FiniteDuration): Observable[A]
Emit the most recent items emitted by the source within periodic time intervals.
Alias for sample.
- period
duration of windows within which the last item emitted by the source Observable will be emitted
- Definition Classes
- ObservableLike
-
def
throttleWithTimeout(timeout: FiniteDuration): Observable[A]
Only emit an item from an observable if a particular timespan has passed without it emitting another item.
Note: If the source observable keeps emitting items more frequently than the length of the time window, then no items will be emitted by the resulting observable.
Alias for debounce.
- timeout
the length of the window of time that must pass after the emission of an item from the source observable in which that observable emits no items in order for the item to be emitted by the resulting observable
- Definition Classes
- ObservableLike
- See also
echoOnce for a similar operator that also mirrors the source observable
-
def
timeoutOnSlowDownstream(timeout: FiniteDuration): Observable[A]
Returns an observable that mirrors the source but that will trigger a DownstreamTimeoutException in case the downstream subscriber takes more than the given timespan to process an onNext message.
Note that this ignores the time it takes for the upstream to send onNext messages. For detecting slow producers see timeoutOnSlowUpstream.
- timeout
maximum duration for onNext.
- Definition Classes
- ObservableLike
-
def
timeoutOnSlowUpstream(timeout: FiniteDuration): Observable[A]
Returns an observable that mirrors the source but applies a timeout for each emitted item by the upstream. If the next item isn't emitted within the specified timeout duration starting from its predecessor, the resulting Observable terminates and notifies observers of a TimeoutException.
Note that this ignores the time it takes to process onNext. If dealing with a slow consumer, see timeoutOnSlowDownstream.
- timeout
maximum duration between emitted items before a timeout occurs (ignoring the time it takes to process onNext)
- Definition Classes
- ObservableLike
-
def
timeoutOnSlowUpstreamTo[B >: A](timeout: FiniteDuration, backup: Observable[B]): Observable[B]
Returns an observable that mirrors the source but applies a timeout for each emitted item by the upstream. If the next item isn't emitted within the specified timeout duration starting from its predecessor, the source is terminated and the downstream gets subscribed to the given backup.
Note that this ignores the time it takes to process onNext. If dealing with a slow consumer, see timeoutOnSlowDownstream.
- timeout
maximum duration between emitted items before a timeout occurs (ignoring the time it takes to process onNext)
- backup
is the alternative data source to subscribe to on timeout
- Definition Classes
- ObservableLike
-
def
toListL: Task[List[A]]
Returns a Task that upon evaluation will collect all items from the source in a Scala List and return this list instead.
WARNING: for infinite streams the process will eventually blow up with an out of memory error.
-
def
toReactivePublisher[B >: A](implicit s: Scheduler): Publisher[B]
Wraps this Observable into an org.reactivestreams.Publisher. See the Reactive Streams protocol that Monix implements.
-
def
toString(): String
- Definition Classes
- AnyRef → Any
-
def
transform[B](transformer: Transformer[A, B]): Observable[B]
Transforms the source using the given transformer function.
- Definition Classes
- Observable → ObservableLike
-
def
unsafeMulticast[B >: A, R](processor: Subject[B, R])(implicit s: Scheduler): ConnectableObservable[R]
Converts this observable into a multicast observable, useful for turning a cold observable into a hot one (i.e. whose source is shared by all observers).
This operator is unsafe because Subject objects are stateful and have to obey the Observer contract, meaning that they shouldn't be subscribed multiple times, so they are error prone. Only use if you know what you're doing, otherwise prefer the safe multicast operator.
-
def
unsafeSubscribeFn(observer: Observer[A])(implicit s: Scheduler): Cancelable
-
final
def
wait(): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws( ... )
-
final
def
wait(arg0: Long, arg1: Int): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws( ... )
-
final
def
wait(arg0: Long): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws( ... )
-
def
whileBusyBuffer[B >: A](overflowStrategy: Synchronous[B]): Observable[B]
While the destination observer is busy, buffers events, applying the given overflowStrategy.
- overflowStrategy
the overflow strategy used for buffering, which specifies what to do in case we're dealing with a slow consumer: should an unbounded buffer be used, should back-pressure be applied, should the pipeline drop newer or older events, should it drop the whole buffer? See OverflowStrategy for more details.
- Definition Classes
- ObservableLike
-
def
whileBusyDropEvents: Observable[A]
While the destination observer is busy, drop the incoming events.
- Definition Classes
- ObservableLike
-
def
whileBusyDropEventsAndSignal[B >: A](onOverflow: (Long) ⇒ B): Observable[B]
While the destination observer is busy, drop the incoming events. When the downstream recovers, we can signal a special event meant to inform the downstream observer how many events were dropped.
- onOverflow
a function used for signaling a special event that informs the consumers that an overflow happened; it receives the number of dropped events as a parameter (see OverflowStrategy.Evicted)
- Definition Classes
- ObservableLike
-
def
withLatestFrom[B, R](other: Observable[B])(f: (A, B) ⇒ R): Observable[R]
Combines the elements emitted by the source with the latest element emitted by another observable.
Similar to combineLatest, but only emits items when the single source emits an item (not when any of the Observables that are passed to the operator do, as combineLatest does).
- other
is an observable that gets paired with the source
- f
is a mapping function over the generated pairs
- Definition Classes
- ObservableLike
-
def
withLatestFrom2[B1, B2, R](o1: Observable[B1], o2: Observable[B2])(f: (A, B1, B2) ⇒ R): Observable[R]
Combines the elements emitted by the source with the latest elements emitted by two observables.
Similar to combineLatest, but only emits items when the single source emits an item (not when any of the Observables that are passed to the operator do, as combineLatest does).
- o1
is the first observable that gets paired with the source
- o2
is the second observable that gets paired with the source
- f
is a mapping function over the generated pairs
- Definition Classes
- ObservableLike
-
def
withLatestFrom3[B1, B2, B3, R](o1: Observable[B1], o2: Observable[B2], o3: Observable[B3])(f: (A, B1, B2, B3) ⇒ R): Observable[R]
Combines the elements emitted by the source with the latest elements emitted by three observables.
Similar to combineLatest, but only emits items when the single source emits an item (not when any of the Observables that are passed to the operator do, as combineLatest does).
- o1
is the first observable that gets paired with the source
- o2
is the second observable that gets paired with the source
- o3
is the third observable that gets paired with the source
- f
is a mapping function over the generated pairs
- Definition Classes
- ObservableLike
-
def
withLatestFrom4[B1, B2, B3, B4, R](o1: Observable[B1], o2: Observable[B2], o3: Observable[B3], o4: Observable[B4])(f: (A, B1, B2, B3, B4) ⇒ R): Observable[R]
Combines the elements emitted by the source with the latest elements emitted by four observables.
Similar to combineLatest, but only emits items when the single source emits an item (not when any of the Observables that are passed to the operator do, as combineLatest does).
- o1
is the first observable that gets paired with the source
- o2
is the second observable that gets paired with the source
- o3
is the third observable that gets paired with the source
- o4
is the fourth observable that gets paired with the source
- f
is a mapping function over the generated pairs
- Definition Classes
- ObservableLike
-
def
withLatestFrom5[B1, B2, B3, B4, B5, R](o1: Observable[B1], o2: Observable[B2], o3: Observable[B3], o4: Observable[B4], o5: Observable[B5])(f: (A, B1, B2, B3, B4, B5) ⇒ R): Observable[R]
Combines the elements emitted by the source with the latest elements emitted by five observables.
Similar to combineLatest, but only emits items when the single source emits an item (not when any of the Observables that are passed to the operator do, as combineLatest does).
- o1
is the first observable that gets paired with the source
- o2
is the second observable that gets paired with the source
- o3
is the third observable that gets paired with the source
- o4
is the fourth observable that gets paired with the source
- o5
is the fifth observable that gets paired with the source
- f
is a mapping function over the generated pairs
- Definition Classes
- ObservableLike
-
def
withLatestFrom6[B1, B2, B3, B4, B5, B6, R](o1: Observable[B1], o2: Observable[B2], o3: Observable[B3], o4: Observable[B4], o5: Observable[B5], o6: Observable[B6])(f: (A, B1, B2, B3, B4, B5, B6) ⇒ R): Observable[R]
Combines the elements emitted by the source with the latest elements emitted by six observables.
Similar to combineLatest, but only emits items when the single source emits an item (not when any of the Observables that are passed to the operator do, as combineLatest does).
- o1
is the first observable that gets paired with the source
- o2
is the second observable that gets paired with the source
- o3
is the third observable that gets paired with the source
- o4
is the fourth observable that gets paired with the source
- o5
is the fifth observable that gets paired with the source
- o6
is the sixth observable that gets paired with the source
- f
is a mapping function over the generated pairs
- Definition Classes
- ObservableLike
-
def
zip[B](other: Observable[B]): Observable[(A, B)]
Creates a new observable from this observable and another given observable by combining their items in pairs in a strict sequence.
So the first item emitted by the new observable will be the tuple of the first items emitted by each of the source observables; the second item emitted by the new observable will be a tuple with the second items emitted by each of those observables; and so forth.
See combineLatest for a more relaxed alternative that doesn't combine items in strict sequence.
- other
is an observable that gets paired with the source
- returns
a new observable sequence that emits the paired items of the source observables
- Definition Classes
- ObservableLike
-
def
zipMap[B, R](other: Observable[B])(f: (A, B) ⇒ R): Observable[R]
Creates a new observable from this observable and another given observable by combining their items in pairs in a strict sequence.
So the first item emitted by the new observable will be the result of the function applied to the first item emitted by each of the source observables; the second item emitted by the new observable will be the result of the function applied to the second item emitted by each of those observables; and so forth.
See combineLatestMap for a more relaxed alternative that doesn't combine items in strict sequence.
- other
is an observable that gets paired with the source
- f
is a mapping function over the generated pairs
- Definition Classes
- ObservableLike
-
def
zipWithIndex: Observable[(A, Long)]
Zips the emitted elements of the source with their indices.
- Definition Classes
- ObservableLike
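The three zip operators above can be compared side by side on small inputs. This is a sketch assuming Monix 2.x and the global scheduler; the object, helper, and value names are hypothetical.

```scala
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object, not part of Monix
object ZipExample {
  private def letters = Observable("a", "b", "c")
  private def numbers = Observable(1, 2, 3)

  // Helper that evaluates an observable into a list (Monix 2.x runAsync)
  private def run[T](o: Observable[T]): List[T] =
    Await.result(o.toListL.runAsync, 5.seconds)

  // Pairs items strictly by position
  def zipped(): List[(String, Int)] = run(letters.zip(numbers))

  // Same pairing, but mapped through a function instead of producing tuples
  def mapped(): List[String] = run(letters.zipMap(numbers)((l, n) => s"$l$n"))

  // Pairs each item with its index in the stream
  def indexed(): List[(String, Long)] = run(letters.zipWithIndex)

  def main(args: Array[String]): Unit = {
    println(zipped())  // List((a,1), (b,2), (c,3))
    println(mapped())  // List(a1, b2, c3)
    println(indexed()) // List((a,0), (b,1), (c,2))
  }
}
```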
Deprecated Value Members
-
def
runWith[R](f: Consumer[A, R]): Task[R]
Deprecated. See consumeWith.
- Annotations
- @deprecated
- Deprecated
(Since version 2.1.0) Renamed to consumeWith
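Code that still calls runWith can be migrated by switching to consumeWith with the same Consumer. A minimal sketch, assuming Monix 2.1.0 or later in the 2.x line and the global scheduler; Consumer.foldLeft is used here as one readily available consumer, and the object name is hypothetical.

```scala
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.{Consumer, Observable}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object, not part of Monix
object ConsumeWithExample {
  // A consumer that folds all Long elements into their sum
  private val sumConsumer: Consumer[Long, Long] =
    Consumer.foldLeft[Long, Long](0L)(_ + _)

  def total(): Long = {
    // consumeWith replaces the deprecated runWith
    val task: Task[Long] = Observable.range(0, 10).consumeWith(sumConsumer)
    // Assumes Monix 2.x: runAsync yields a CancelableFuture
    Await.result(task.runAsync, 5.seconds)
  }

  def main(args: Array[String]): Unit =
    println(total()) // 45
}
```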
This is the API documentation for the Monix library.
Package Overview
monix.execution exposes lower level primitives for dealing with asynchronous execution:
- Atomic types, as alternative to java.util.concurrent.atomic
monix.eval is for dealing with evaluation of results, thus exposing Task and Coeval.
monix.reactive exposes the Observable pattern:
- Observable implementations
monix.types implements type-class shims, to be translated to type-classes provided by libraries such as Cats or Scalaz.
monix.cats is the optional integration with the Cats library, providing translations for the types described in monix.types.
monix.scalaz is the optional integration with the Scalaz library, providing translations for the types described in monix.types.