Trait/Object

monifu.reactive

Observable

Related Docs: object Observable | package reactive

Permalink

trait Observable[+T] extends AnyRef

The Observable interface in the Rx pattern.

Interface

An Observable is characterized by a onSubscribe method that needs to be implemented. In simple terms, an Observable might as well be just a function like:

type Observable[+T] = Subscriber[T] => Unit

In other words an Observable is something that provides a side-effecting function that can connect a Subscriber to a stream of data. A Subscriber is a cross between an Observer and a Scheduler. We need a Scheduler when calling subscribe because that's when the side-effects happen and a context capable of scheduling tasks for asynchronous execution is needed. An Observer on the other hand is the interface implemented by consumer and that receives events according to the Rx grammar.

On onSubscribe, because we need the interesting operators and the polymorphic behavior provided by OOP, the Observable is being described as an interface that has to be implemented:

class MySampleObservable(unit: Int) extends Observable[Int] {
  def onSubscribe(sub: Subscriber[Int]): Unit = {
    implicit val s = sub.scheduler
    // note we must apply back-pressure
    // when calling `onNext`
    sub.onNext(unit).onComplete {
      case Success(Cancel) =>
        () // do nothing
      case Success(Continue) =>
        sub.onComplete()
      case Failure(ex) =>
        sub.onError(ex)
    }
  }
}

Of course, you don't need to inherit from this trait, as you can just use Observable.create, the following example being equivalent to the above:

Observable.create[Int] { sub =>
  implicit val s = sub.scheduler
  // note we must apply back-pressure
  // when calling `onNext`
  sub.onNext(unit).onComplete {
    case Success(Cancel) =>
      () // do nothing
    case Success(Continue) =>
      sub.onComplete()
    case Failure(ex) =>
      sub.onError(ex)
  }
}

The above is describing how to create your own Observables, however Monifu already provides already made utilities in the Observable companion object. For example, to periodically make a request to a web service, you could do it like this:

// just some http client
import play.api.libs.ws._

// triggers an auto-incremented number every second
Observable.intervalAtFixedRate(1.second)
  .flatMap(_ => WS.request(s"http://some.endpoint.com/request").get())

As you might notice, in the above example we are doing Observable!.flatMap on an Observable that emits Future instances. And it works, because Monifu considers Scala's Futures to be just a subset of Observables, see the automatic FutureIsObservable conversion that it defines. Or you could just use Observable.fromFuture for explicit conversions, an Observable builder available amongst others.

Contract

Observables must obey Monifu's contract, this is why if you get away with already built and tested observables, that would be better than implementing your own by means of inheriting the interface or by using create. The contract is this:

On Dealing with the contract

Back-pressure means in essence that the speed with which the data-source produces events is adjusted to the speed with which the consumer consumes.

For example, lets say we want to feed an iterator into an observer, similar to what we are doing in Observer.feed, we might build a loop like this:

/** Transforms any Iterable into an Observable */
def fromIterator[T](iterable: Iterable[T]): Observable[T] =
  Observable.create { sub =>
    implicit val s = sub.scheduler
    loop(sub.observer, iterable.iterator).onComplete {
      case Success(Cancel) =>
        () // do nothing
      case Success(Continue) =>
        sub.onComplete()
      case Failed(ex) =>
        reportError(sub.observer, ex)
    }
  }

private def loop[T](o: Observer[T], iterator: Iterator[T])
  (implicit s: Scheduler): Future[Ack] = {

  try {
    if (iterator.hasNext) {
      val next = iterator.next()
      // signaling event, applying back-pressure
      o.onNext(next).flatMap {
        case Cancel => Cancel
        case Continue =>
          // signal next event (recursive, but async)
          loop(o, iterator)
      }
    }
    else {
      // nothing left to do, and because we are implementing
      // Observer.feed, the final acknowledgement is a `Continue`
      // assuming that the observer hasn't canceled or failed
      Continue
    }
  }
  catch {
    case NonFatal(ex) =>
      reportError(o, ex)
  }
}

private def reportError[T](o: Observer[T], ex: Throwable): Cancel =
  try o.onError(ex) catch {
    case NonFatal(err) =>
      // oops, onError failed, trying to
      // report it somewhere
      s.reportFailure(ex)
      s.reportFailure(err)
      Cancel
  }

There are cases in which the data-source can't be slowed down in response to the demand signaled through back-pressure. For such cases buffering is needed.

For example to "imperatively" build an Observable, we could use channels:

val channel = PublishChannel[Int](OverflowStrategy.DropNew(bufferSize = 100))

// look mum, no back-pressure concerns
channel.pushNext(1)
channel.pushNext(2)
channel.pushNext(3)
channel.pushComplete()

In Monifu a Channel is much like a Subject, meaning that it can be used to construct observables, except that a Channel has a buffer attached and IS NOT an Observer (like the Subject is). In Monifu (compared to Rx implementations) Subjects are subject to back-pressure concerns as well, so they can't be used in an imperative way, like described above, hence the need for Channels.

Or for more serious and lower level jobs, you can simply take an Observer and wrap it into a BufferedSubscriber.

Self Type
Observable[T]
Source
Observable.scala
See also

Channel, which are meant for imperatively building Observables without back-pressure concerns

Subject, which are both Observables and Observers

Cancelable, the type returned by higher level subscribe variants and that can be used to cancel subscriptions

Subscriber, the cross between an Observer and a Scheduler

Scheduler, our enhanced ExecutionContext

Observer, the interface that must be implemented by consumers

Linear Supertypes
Known Subclasses
Ordering
  1. Alphabetic
  2. By Inheritance
Inherited
  1. Observable
  2. AnyRef
  3. Any
  1. Hide All
  2. Show All
Visibility
  1. Public
  2. All

Abstract Value Members

  1. abstract def onSubscribe(subscriber: Subscriber[T]): Unit

    Permalink

    Characteristic function for an Observable instance, that creates the subscription and that eventually starts the streaming of events to the given Observer, being meant to be overridden in custom combinators or in classes implementing Observable.

    Characteristic function for an Observable instance, that creates the subscription and that eventually starts the streaming of events to the given Observer, being meant to be overridden in custom combinators or in classes implementing Observable.

    This function is "unsafe" to call because it does not protect the calls to the given Observer implementation in regards to unexpected exceptions that violate the contract, therefore the given instance must respect its contract and not throw any exceptions when the observable calls onNext, onComplete and onError. If it does, then the behavior is undefined.

    See also

    subscribe.

Concrete Value Members

  1. final def !=(arg0: Any): Boolean

    Permalink
    Definition Classes
    AnyRef → Any
  2. final def ##(): Int

    Permalink
    Definition Classes
    AnyRef → Any
  3. def ++[U >: T](other: ⇒ Observable[U]): Observable[U]

    Permalink

    Concatenates the source Observable with the other Observable, as specified.

    Concatenates the source Observable with the other Observable, as specified.

    Ordering of subscription is preserved, so the second observable starts only after the source observable is completed successfully with an onComplete. On the other hand, the second observable is never subscribed if the source completes with an error.

    Example:
    1. val concat = Observable(1,2,3) ++ Observable(4,5)
      concat.dump("O").subscribe()
      // 0: O-->1
      // 1: O-->2
      // 2: O-->3
      // 3: O-->4
      // 4: O-->5
      // 5: O completed
  4. def +:[U >: T](elem: U): Observable[U]

    Permalink

    Creates a new Observable that emits the given element and then it also emits the events of the source (prepend operation).

    Creates a new Observable that emits the given element and then it also emits the events of the source (prepend operation).

    Example:
    1. val source = 1 +: Observable(2, 3, 4)
      source.dump("O").subscribe()
      // 0: O-->1
      // 1: O-->2
      // 2: O-->3
      // 3: O-->4
      // 4: O completed
  5. def :+[U >: T](elem: U): Observable[U]

    Permalink

    Creates a new Observable that emits the events of the source and then it also emits the given element (appended to the stream).

    Creates a new Observable that emits the events of the source and then it also emits the given element (appended to the stream).

    Example:
    1. val source = Observable(1, 2, 3) :+ 4
      source.dump("O").subscribe()
      // 0: O-->1
      // 1: O-->2
      // 2: O-->3
      // 3: O-->4
      // 4: O completed
  6. final def ==(arg0: Any): Boolean

    Permalink
    Definition Classes
    AnyRef → Any
  7. def ambWith[U >: T](other: Observable[U]): Observable[U]

    Permalink

    Given the source observable and another Observable, emits all of the items from the first of these Observables to emit an item and cancel the other.

  8. def asFuture(implicit s: Scheduler): Future[Option[T]]

    Permalink

    Returns the first generated result as a Future and then cancels the subscription.

  9. final def asInstanceOf[T0]: T0

    Permalink
    Definition Classes
    Any
  10. def asyncBoundary[U >: T](overflowStrategy: Evicted, onOverflow: (Long) ⇒ U): Observable[U]

    Permalink

    Forces a buffered asynchronous boundary.

    Forces a buffered asynchronous boundary.

    Internally it wraps the observer implementation given to onSubscribe into a BufferedSubscriber.

    Normally Monifu's implementation guarantees that events are not emitted concurrently, and that the publisher MUST NOT emit the next event without acknowledgement from the consumer that it may proceed, however for badly behaved publishers, this wrapper provides the guarantee that the downstream Observer given in subscribe will not receive concurrent events.

    WARNING: if the buffer created by this operator is unbounded, it can blow up the process if the data source is pushing events faster than what the observer can consume, as it introduces an asynchronous boundary that eliminates the back-pressure requirements of the data source. Unbounded is the default overflowStrategy, see OverflowStrategy for options.

    overflowStrategy

    - the overflow strategy used for buffering, which specifies what to do in case we're dealing with a slow consumer - should an unbounded buffer be used, should back-pressure be applied, should the pipeline drop newer or older events, should it drop the whole buffer? See OverflowStrategy for more details

    onOverflow

    - a function that is used for signaling a special event used to inform the consumers that an overflow event happened, function that receives the number of dropped events as a parameter (see OverflowStrategy.Evicted)

  11. def asyncBoundary(overflowStrategy: OverflowStrategy): Observable[T]

    Permalink

    Forces a buffered asynchronous boundary.

    Forces a buffered asynchronous boundary.

    Internally it wraps the observer implementation given to onSubscribe into a BufferedSubscriber.

    Normally Monifu's implementation guarantees that events are not emitted concurrently, and that the publisher MUST NOT emit the next event without acknowledgement from the consumer that it may proceed, however for badly behaved publishers, this wrapper provides the guarantee that the downstream Observer given in subscribe will not receive concurrent events.

    WARNING: if the buffer created by this operator is unbounded, it can blow up the process if the data source is pushing events faster than what the observer can consume, as it introduces an asynchronous boundary that eliminates the back-pressure requirements of the data source. Unbounded is the default overflowStrategy, see OverflowStrategy for options.

    overflowStrategy

    - the overflow strategy used for buffering, which specifies what to do in case we're dealing with a slow consumer - should an unbounded buffer be used, should back-pressure be applied, should the pipeline drop newer or older events, should it drop the whole buffer? See OverflowStrategy for more details

  12. def behavior[U >: T](initialValue: U)(implicit s: Scheduler): ConnectableObservable[U]

    Permalink

    Converts this observable into a multicast observable, useful for turning a cold observable into a hot one (i.e.

    Converts this observable into a multicast observable, useful for turning a cold observable into a hot one (i.e. whose source is shared by all observers). The underlying subject used is a BehaviorSubject.

  13. def buffer(timespan: FiniteDuration, maxSize: Int): Observable[Seq[T]]

    Permalink

    Periodically gather items emitted by an Observable into bundles and emit these bundles rather than emitting the items one at a time.

    Periodically gather items emitted by an Observable into bundles and emit these bundles rather than emitting the items one at a time.

    The resulting Observable emits connected, non-overlapping buffers, each of a fixed duration specified by the timespan argument or a maximum size specified by the maxSize argument (whichever is reached first). When the source Observable completes or encounters an error, the resulting Observable emits the current buffer and propagates the notification from the source Observable.

    timespan

    the interval of time at which it should emit the buffered bundle

    maxSize

    is the maximum bundle size

  14. def buffer(timespan: FiniteDuration): Observable[Seq[T]]

    Permalink

    Periodically gather items emitted by an Observable into bundles and emit these bundles rather than emitting the items one at a time.

    Periodically gather items emitted by an Observable into bundles and emit these bundles rather than emitting the items one at a time.

    This version of buffer emits a new bundle of items periodically, every timespan amount of time, containing all items emitted by the source Observable since the previous bundle emission.

    timespan

    the interval of time at which it should emit the buffered bundle

  15. def buffer(count: Int, skip: Int): Observable[Seq[T]]

    Permalink

    Returns an Observable that emits buffers of items it collects from the source Observable.

    Returns an Observable that emits buffers of items it collects from the source Observable. The resulting Observable emits buffers every skip items, each containing count items. When the source Observable completes or encounters an error, the resulting Observable emits the current buffer and propagates the notification from the source Observable.

    There are 3 possibilities:

    1. in case skip == count, then there are no items dropped and no overlap, the call being equivalent to window(count) 2. in case skip < count, then overlap between windows happens, with the number of elements being repeated being count - skip 3. in case skip > count, then skip - count elements start getting dropped between windows

    count

    the maximum size of each buffer before it should be emitted

    skip

    how many items emitted by the source Observable should be skipped before starting a new buffer. Note that when skip and count are equal, this is the same operation as buffer(count)

  16. def buffer(count: Int): Observable[Seq[T]]

    Permalink

    Periodically gather items emitted by an Observable into bundles and emit these bundles rather than emitting the items one at a time.

    Periodically gather items emitted by an Observable into bundles and emit these bundles rather than emitting the items one at a time. This version of buffer is emitting items once the internal buffer has the reached the given count.

    count

    the maximum size of each buffer before it should be emitted

  17. def bufferIntrospective(maxSize: Int): Observable[List[T]]

    Permalink

    Buffers signals while busy, after which it emits the buffered events as a single bundle.

    Buffers signals while busy, after which it emits the buffered events as a single bundle.

    This operator starts applying back-pressure when the underlying buffer's size is exceeded.

  18. def cache(maxCapacity: Int): Observable[T]

    Permalink

    Caches the emissions from the source Observable and replays them in order to any subsequent Subscribers.

    Caches the emissions from the source Observable and replays them in order to any subsequent Subscribers. This method has similar behavior to replay except that this auto-subscribes to the source Observable rather than returning a ConnectableObservable for which you must call connect to activate the subscription.

    When you call cache, it does not yet subscribe to the source Observable and so does not yet begin caching items. This only happens when the first Subscriber calls the resulting Observable's subscribe method.

    maxCapacity

    is the maximum buffer size after which old events start being dropped (according to what happens when using ReplaySubject.createWithSize)

    returns

    an Observable that, when first subscribed to, caches all of its items and notifications for the benefit of subsequent subscribers

  19. def cache: Observable[T]

    Permalink

    Caches the emissions from the source Observable and replays them in order to any subsequent Subscribers.

    Caches the emissions from the source Observable and replays them in order to any subsequent Subscribers. This method has similar behavior to replay except that this auto-subscribes to the source Observable rather than returning a ConnectableObservable for which you must call connect to activate the subscription.

    When you call cache, it does not yet subscribe to the source Observable and so does not yet begin caching items. This only happens when the first Subscriber calls the resulting Observable's subscribe method.

    Note: You sacrifice the ability to cancel the origin when you use the cache operator so be careful not to use this on Observables that emit an infinite or very large number of items that will use up memory.

    returns

    an Observable that, when first subscribed to, caches all of its items and notifications for the benefit of subsequent subscribers

  20. def clone(): AnyRef

    Permalink
    Attributes
    protected[java.lang]
    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  21. def collect[U](pf: PartialFunction[T, U]): Observable[U]

    Permalink

    Returns an Observable by applying the given partial function to the source observable for each element for which the given partial function is defined.

    Returns an Observable by applying the given partial function to the source observable for each element for which the given partial function is defined.

    Useful to be used instead of a filter & map combination.

    pf

    the function that filters and maps the resulting observable

    returns

    an Observable that emits the transformed items by the given partial function

  22. def combineLatest[U](other: Observable[U]): Observable[(T, U)]

    Permalink

    Creates a new Observable from this Observable and another given Observable.

    Creates a new Observable from this Observable and another given Observable.

    This operator behaves in a similar way to zip, but while zip emits items only when all of the zipped source Observables have emitted a previously unzipped item, combine emits an item whenever any of the source Observables emits an item (so long as each of the source Observables has emitted at least one item).

  23. def combineLatestDelayError[U](other: Observable[U]): Observable[(T, U)]

    Permalink

    Creates a new Observable from this Observable and another given Observable.

    Creates a new Observable from this Observable and another given Observable.

    This operator behaves in a similar way to zip, but while zip emits items only when all of the zipped source Observables have emitted a previously unzipped item, combine emits an item whenever any of the source Observables emits an item (so long as each of the source Observables has emitted at least one item).

    This version of combineLatest is reserving onError notifications until all of the combined Observables complete and only then passing it along to the observers.

    See also

    Observable!.combineLatest

  24. def complete: Observable[Nothing]

    Permalink

    Ignores all items emitted by the source Observable and only calls onCompleted or onError.

    Ignores all items emitted by the source Observable and only calls onCompleted or onError.

    returns

    an empty Observable that only calls onCompleted or onError, based on which one is called by the source Observable

  25. def concat[U](implicit ev: <:<[T, Observable[U]]): Observable[U]

    Permalink

    Concatenates the sequence of Observables emitted by the source into one Observable, without any transformation.

    Concatenates the sequence of Observables emitted by the source into one Observable, without any transformation.

    You can combine the items emitted by multiple Observables so that they act like a single Observable by using this method.

    The difference between the concat operation and merge is that concat cares about ordering of emitted items (e.g. all items emitted by the first observable in the sequence will come before the elements emitted by the second observable), whereas merge doesn't care about that (elements get emitted as they come). Because of back-pressure applied to observables, Observable!.concat is safe to use in all contexts, whereas merge requires buffering.

    returns

    an Observable that emits items that are the result of flattening the items emitted by the Observables emitted by this

  26. def concatDelayError[U](implicit ev: <:<[T, Observable[U]]): Observable[U]

    Permalink

    Concatenates the sequence of Observables emitted by the source into one Observable, without any transformation.

    Concatenates the sequence of Observables emitted by the source into one Observable, without any transformation.

    You can combine the items emitted by multiple Observables so that they act like a single Observable by using this method.

    The difference between the concat operation and merge is that concat cares about ordering of emitted items (e.g. all items emitted by the first observable in the sequence will come before the elements emitted by the second observable), whereas merge doesn't care about that (elements get emitted as they come). Because of back-pressure applied to observables, Observable!.concat is safe to use in all contexts, whereas merge requires buffering.

    This version is reserving onError notifications until all of the Observables complete and only then passing the issued errors(s) along to the observers. Note that the streamed error is a CompositeException, since multiple errors from multiple streams can happen.

    returns

    an Observable that emits items that are the result of flattening the items emitted by the Observables emitted by this

  27. def concatMap[U](f: (T) ⇒ Observable[U]): Observable[U]

    Permalink

    Creates a new Observable by applying a function that you supply to each item emitted by the source Observable, where that function returns an Observable, and then concatenating those resulting Observables and emitting the results of this concatenation.

    Creates a new Observable by applying a function that you supply to each item emitted by the source Observable, where that function returns an Observable, and then concatenating those resulting Observables and emitting the results of this concatenation.

    f

    a function that, when applied to an item emitted by the source Observable, returns an Observable

    returns

    an Observable that emits the result of applying the transformation function to each item emitted by the source Observable and concatenating the results of the Observables obtained from this transformation.

  28. def concatMapDelayError[U](f: (T) ⇒ Observable[U]): Observable[U]

    Permalink

    Creates a new Observable by applying a function that you supply to each item emitted by the source Observable, where that function returns an Observable, and then concatenating those resulting Observables and emitting the results of this concatenation.

    Creates a new Observable by applying a function that you supply to each item emitted by the source Observable, where that function returns an Observable, and then concatenating those resulting Observables and emitting the results of this concatenation.

    It's like Observable!.concatMap, except that the created observable is reserving onError notifications until all of the merged Observables complete and only then passing it along to the observers.

    f

    a function that, when applied to an item emitted by the source Observable, returns an Observable

    returns

    an Observable that emits the result of applying the transformation function to each item emitted by the source Observable and concatenating the results of the Observables obtained from this transformation.

  29. def count: Observable[Long]

    Permalink

    Creates a new Observable that emits the total number of onNext events that were emitted by the source.

    Creates a new Observable that emits the total number of onNext events that were emitted by the source.

    Note that this Observable emits only one item after the source is complete. And in case the source emits an error, then only that error will be emitted.

  30. def debounce[U](selector: (T) ⇒ Observable[Any], f: (T) ⇒ Observable[U]): Observable[U]

    Permalink

    Only emit an item from an Observable if a particular timespan has passed without it emitting another item, a timespan indicated by the completion of an observable generated the selector function.

    Only emit an item from an Observable if a particular timespan has passed without it emitting another item, a timespan indicated by the completion of an observable generated the selector function.

    Note: If the source Observable keeps emitting items more frequently than the length of the time window then no items will be emitted by the resulting Observable.

    selector

    function to retrieve a sequence that indicates the throttle duration for each item

    f

    is a function that receives the last element generated by the source, generating an observable to be subscribed when the source is timing out

  31. def debounce(selector: (T) ⇒ Observable[Any]): Observable[T]

    Permalink

    Only emit an item from an Observable if a particular timespan has passed without it emitting another item, a timespan indicated by the completion of an observable generated the selector function.

    Only emit an item from an Observable if a particular timespan has passed without it emitting another item, a timespan indicated by the completion of an observable generated the selector function.

    Note: If the source Observable keeps emitting items more frequently than the length of the time window then no items will be emitted by the resulting Observable.

    selector

    function to retrieve a sequence that indicates the throttle duration for each item

  32. def debounce[U](timeout: FiniteDuration, f: (T) ⇒ Observable[U]): Observable[U]

    Permalink

    Doesn't emit anything until a timeout period passes without the source emitting anything.

    Doesn't emit anything until a timeout period passes without the source emitting anything. When that timeout happens, we subscribe to the observable generated by the given function, an observable that will keep emitting until the source will break the silence by emitting another event.

    Note: If the source Observable keeps emitting items more frequently than the length of the time window, then no items will be emitted by the resulting Observable.

    timeout

    the length of the window of time that must pass after the emission of an item from the source Observable in which that Observable emits no items in order for the item to be emitted by the resulting Observable

    f

    is a function that receives the last element generated by the source, generating an observable to be subscribed when the source is timing out

  33. def debounce(timeout: FiniteDuration): Observable[T]

    Permalink

    Only emit an item from an Observable if a particular timespan has passed without it emitting another item.

    Only emit an item from an Observable if a particular timespan has passed without it emitting another item.

    Note: If the source Observable keeps emitting items more frequently than the length of the time window then no items will be emitted by the resulting Observable.

    timeout

    the length of the window of time that must pass after the emission of an item from the source Observable in which that Observable emits no items in order for the item to be emitted by the resulting Observable

    See also

    echoOnce for a similar operator that also mirrors the source observable

  34. def debounceRepeated(period: FiniteDuration): Observable[T]

    Permalink

    Emits the last item from the source Observable if a particular timespan has passed without it emitting another item, and keeps emitting that item at regular intervals until the source breaks the silence.

    Emits the last item from the source Observable if a particular timespan has passed without it emitting another item, and keeps emitting that item at regular intervals until the source breaks the silence.

    So compared to regular debounce it keeps emitting the last item of the source.

    Note: If the source Observable keeps emitting items more frequently than the length of the time window then no items will be emitted by the resulting Observable.

    period

    the length of the window of time that must pass after the emission of an item from the source Observable in which that Observable emits no items in order for the item to be emitted by the resulting Observable at regular intervals, also determined by period

    See also

    echoRepeated for a similar operator that also mirrors the source observable

  35. def defaultIfEmpty[U >: T](default: U): Observable[U]

    Permalink

    Emit items from the source Observable, or emit a default item if the source Observable completes after emitting no items.

  36. def delay[U](selector: (T) ⇒ Observable[U]): Observable[T]

    Permalink

    Returns an Observable that emits the items emitted by the source Observable shifted forward in time.

    Returns an Observable that emits the items emitted by the source Observable shifted forward in time.

    This variant of delay sets its delay duration on a per-item basis by passing each item from the source Observable into a function that returns an Observable and then monitoring those Observables. When any such Observable emits an item or completes, the Observable returned by delay emits the associated item.

    selector

    - a function that returns an Observable for each item emitted by the source Observable, which is then used to delay the emission of that item by the resulting Observable until the Observable returned from selector emits an item

    returns

    the source Observable shifted in time by the specified delay

    See also

    delay(duration) for the other variant

  37. def delay(duration: FiniteDuration): Observable[T]

    Permalink

    Returns an Observable that emits the items emitted by the source Observable shifted forward in time by a specified delay.

    Returns an Observable that emits the items emitted by the source Observable shifted forward in time by a specified delay.

    Each time the source Observable emits an item, delay starts a timer, and when that timer reaches the given duration, the Observable returned from delay emits the same item.

    NOTE: this delay refers strictly to the time between the onNext event coming from our source and the time it takes the downstream observer to get this event. On the other hand the operator is also applying back-pressure, so on slow observers the actual time passing between two successive events may be higher than the specified duration.

    duration

    - the delay to shift the source by

    returns

    the source Observable shifted in time by the specified delay

  38. def delaySubscription(timespan: FiniteDuration): Observable[T]

    Permalink

    Hold an Observer's subscription request for a specified amount of time before passing it on to the source Observable.

    Hold an Observer's subscription request for a specified amount of time before passing it on to the source Observable.

    timespan

    is the time to wait before the subscription is being initiated.

  39. def delaySubscription[U](trigger: Observable[U]): Observable[T]

    Permalink

    Hold an Observer's subscription request until the given trigger observable either emits an item or completes, before passing it on to the source Observable.

    Hold an Observer's subscription request until the given trigger observable either emits an item or completes, before passing it on to the source Observable.

    If the given trigger completes in error, then the subscription is terminated with onError.

    trigger

    - the observable that must either emit an item or complete in order for the source to be subscribed.

  40. def distinct[U](fn: (T) ⇒ U): Observable[T]

    Permalink

    Given a function that returns a key for each element emitted by the source Observable, suppress duplicates items.

    Given a function that returns a key for each element emitted by the source Observable, suppress duplicates items.

    WARNING: this requires unbounded buffering.

  41. def distinct: Observable[T]

    Permalink

    Suppress the duplicate elements emitted by the source Observable.

    Suppress the duplicate elements emitted by the source Observable.

    WARNING: this requires unbounded buffering.

  42. def distinctUntilChanged[U](fn: (T) ⇒ U): Observable[T]

    Permalink

    Suppress duplicate consecutive items emitted by the source Observable

  43. def distinctUntilChanged: Observable[T]

    Permalink

    Suppress duplicate consecutive items emitted by the source Observable

  44. def doOnCanceled(cb: ⇒ Unit): Observable[T]

    Permalink

    Executes the given callback if the downstream observer has canceled the streaming.

  45. def doOnComplete(cb: ⇒ Unit): Observable[T]

    Permalink

    Executes the given callback when the stream has ended, but before the complete event is emitted.

    Executes the given callback when the stream has ended, but before the complete event is emitted.

    cb

    the callback to execute when the subscription is canceled

  46. def doOnError(cb: (Throwable) ⇒ Unit): Observable[T]

    Permalink

    Executes the given callback when the stream is interrupted with an error, before the onError event is emitted downstream.

    Executes the given callback when the stream is interrupted with an error, before the onError event is emitted downstream.

    NOTE: should protect the code in this callback, because if it throws an exception the onError event will prefer signaling the original exception and otherwise the behavior is undefined.

  47. def doOnStart(cb: (T) ⇒ Unit): Observable[T]

    Permalink

    Executes the given callback only for the first element generated by the source Observable, useful for doing a piece of computation only when the stream started.

    Executes the given callback only for the first element generated by the source Observable, useful for doing a piece of computation only when the stream started.

    returns

    a new Observable that executes the specified callback only for the first element

  48. def doWork(cb: (T) ⇒ Unit): Observable[T]

    Permalink

    Executes the given callback for each element generated by the source Observable, useful for doing side-effects.

    Executes the given callback for each element generated by the source Observable, useful for doing side-effects.

    returns

    a new Observable that executes the specified callback for each element

  49. def drop(n: Int): Observable[T]

    Permalink

    Drops the first n elements (from the start).

    Drops the first n elements (from the start).

    n

    the number of elements to drop

    returns

    a new Observable that drops the first n elements emitted by the source

  50. def dropByTimespan(timespan: FiniteDuration): Observable[T]

    Permalink

    Creates a new Observable that drops the events of the source, only for the specified timestamp window.

    Creates a new Observable that drops the events of the source, only for the specified timestamp window.

    timespan

    the window of time during which the new Observable is must drop the events emitted by the source

  51. def dropWhile(p: (T) ⇒ Boolean): Observable[T]

    Permalink

    Drops the longest prefix of elements that satisfy the given predicate and returns a new Observable that emits the rest.

  52. def dropWhileWithIndex(p: (T, Int) ⇒ Boolean): Observable[T]

    Permalink

    Drops the longest prefix of elements that satisfy the given function and returns a new Observable that emits the rest.

    Drops the longest prefix of elements that satisfy the given function and returns a new Observable that emits the rest. In comparison with dropWhile, this version accepts a function that takes an additional parameter: the zero-based index of the element.

  53. def dump(prefix: String, out: PrintStream = System.out): Observable[T]

    Permalink

    Utility that can be used for debugging purposes.

  54. def echoOnce(timeout: FiniteDuration): Observable[T]

    Permalink

    Mirror the source observable as long as the source keeps emitting items, otherwise if timeout passes without the source emitting anything new then the observable will emit the last item.

    Mirror the source observable as long as the source keeps emitting items, otherwise if timeout passes without the source emitting anything new then the observable will emit the last item.

    This is the rough equivalent of:

    Observable.merge(source, source.debounce(period))

    Note: If the source Observable keeps emitting items more frequently than the length of the time window then the resulting observable will mirror the source exactly.

    timeout

    the window of silence that must pass in order for the observable to echo the last item

  55. def echoRepeated(timeout: FiniteDuration): Observable[T]

    Permalink

    Mirror the source observable as long as the source keeps emitting items, otherwise if timeout passes without the source emitting anything new then the observable will start emitting the last item repeatedly.

    Mirror the source observable as long as the source keeps emitting items, otherwise if timeout passes without the source emitting anything new then the observable will start emitting the last item repeatedly.

    This is the rough equivalent of:

    source.switch { e =>
      e +: Observable.intervalWithFixedDelay(delay, delay)
    }

    Note: If the source Observable keeps emitting items more frequently than the length of the time window then the resulting observable will mirror the source exactly.

    timeout

    the window of silence that must pass in order for the observable to start echoing the last item

  56. def endWith[U >: T](elems: U*): Observable[U]

    Permalink

    Creates a new Observable that emits the events of the source and then it also emits the given elements (appended to the stream).

  57. def endWithError(error: Throwable): Observable[T]

    Permalink

    Emits the given exception instead of onComplete.

    Emits the given exception instead of onComplete.

    error

    the exception to emit onComplete

    returns

    a new Observable that emits an exception onComplete

  58. final def eq(arg0: AnyRef): Boolean

    Permalink
    Definition Classes
    AnyRef
  59. def equals(arg0: Any): Boolean

    Permalink
    Definition Classes
    AnyRef → Any
  60. def error: Observable[Throwable]

    Permalink

    Returns an Observable that emits a single Throwable, in case an error was thrown by the source Observable, otherwise it isn't going to emit anything.

  61. def exists(p: (T) ⇒ Boolean): Observable[Boolean]

    Permalink

    Returns an Observable which emits a single value, either true, in case the given predicate holds for at least one item, or false otherwise.

    Returns an Observable which emits a single value, either true, in case the given predicate holds for at least one item, or false otherwise.

    p

    a function that evaluates the items emitted by the source Observable, returning true if they pass the filter

    returns

    an Observable that emits only true or false in case the given predicate holds or not for at least one item

  62. def filter(p: (T) ⇒ Boolean): Observable[T]

    Permalink

    Returns an Observable which only emits those items for which the given predicate holds.

    Returns an Observable which only emits those items for which the given predicate holds.

    p

    a function that evaluates the items emitted by the source Observable, returning true if they pass the filter

    returns

    an Observable that emits only those items in the original Observable for which the filter evaluates as true

  63. def finalize(): Unit

    Permalink
    Attributes
    protected[java.lang]
    Definition Classes
    AnyRef
    Annotations
    @throws( classOf[java.lang.Throwable] )
  64. def find(p: (T) ⇒ Boolean): Observable[T]

    Permalink

    Returns an Observable which only emits the first item for which the predicate holds.

    Returns an Observable which only emits the first item for which the predicate holds.

    p

    a function that evaluates the items emitted by the source Observable, returning true if they pass the filter

    returns

    an Observable that emits only the first item in the original Observable for which the filter evaluates as true

  65. def firstOrElse[U >: T](default: ⇒ U): Observable[U]

    Permalink

    Emits the first element emitted by the source, or otherwise if the source is completed without emitting anything, then the default is emitted.

    Emits the first element emitted by the source, or otherwise if the source is completed without emitting anything, then the default is emitted.

    Alias for headOrElse.

  66. def flatMap[U](f: (T) ⇒ Observable[U]): Observable[U]

    Permalink

    Creates a new Observable by applying a function that you supply to each item emitted by the source Observable, where that function returns an Observable, and then concatenating those resulting Observables and emitting the results of this concatenation.

    Creates a new Observable by applying a function that you supply to each item emitted by the source Observable, where that function returns an Observable, and then concatenating those resulting Observables and emitting the results of this concatenation.

    f

    a function that, when applied to an item emitted by the source Observable, returns an Observable

    returns

    an Observable that emits the result of applying the transformation function to each item emitted by the source Observable and concatenating the results of the Observables obtained from this transformation.

  67. def flatMapDelayError[U](f: (T) ⇒ Observable[U]): Observable[U]

    Permalink

    Creates a new Observable by applying a function that you supply to each item emitted by the source Observable, where that function returns an Observable, and then concatenating those resulting Observables and emitting the results of this concatenation.

    Creates a new Observable by applying a function that you supply to each item emitted by the source Observable, where that function returns an Observable, and then concatenating those resulting Observables and emitting the results of this concatenation.

    It's an alias for Observable!.concatMapDelayError.

    f

    a function that, when applied to an item emitted by the source Observable, returns an Observable

    returns

    an Observable that emits the result of applying the transformation function to each item emitted by the source Observable and concatenating the results of the Observables obtained from this transformation.

  68. def flatMapLatest[U](f: (T) ⇒ Observable[U]): Observable[U]

    Permalink

    An alias of Observable!.switchMap.

    An alias of Observable!.switchMap.

    Returns a new Observable that emits the items emitted by the Observable most recently generated by the mapping function.

  69. def flatScan[R](initial: R)(op: (R, T) ⇒ Observable[R]): Observable[R]

    Permalink

    Applies a binary operator to a start value and to elements produced by the source observable, going from left to right, producing and concatenating observables along the way.

    Applies a binary operator to a start value and to elements produced by the source observable, going from left to right, producing and concatenating observables along the way.

    It's the combination between scan and monifu.reactive.Observable.flatten.

  70. def flatScanDelayError[R](initial: R)(op: (R, T) ⇒ Observable[R]): Observable[R]

    Permalink

    Applies a binary operator to a start value and to elements produced by the source observable, going from left to right, producing and concatenating observables along the way.

    Applies a binary operator to a start value and to elements produced by the source observable, going from left to right, producing and concatenating observables along the way.

    It's the combination between scan and monifu.reactive.Observable.flattenDelayError.

  71. def flatten[U](implicit ev: <:<[T, Observable[U]]): Observable[U]

    Permalink

    Alias for Observable!.concat.

    Alias for Observable!.concat.

    Concatenates the sequence of Observables emitted by the source into one Observable, without any transformation.

    You can combine the items emitted by multiple Observables so that they act like a single Observable by using this method.

    The difference between the concat operation and merge is that concat cares about ordering of emitted items (e.g. all items emitted by the first observable in the sequence will come before the elements emitted by the second observable), whereas merge doesn't care about that (elements get emitted as they come). Because of back-pressure applied to observables, Observable!.concat is safe to use in all contexts, whereas merge requires buffering.

    returns

    an Observable that emits items that are the result of flattening the items emitted by the Observables emitted by this

  72. def flattenDelayError[U](implicit ev: <:<[T, Observable[U]]): Observable[U]

    Permalink

    Alias for Observable!.concatDelayError.

    Alias for Observable!.concatDelayError.

    Concatenates the sequence of Observables emitted by the source into one Observable, without any transformation.

    You can combine the items emitted by multiple Observables so that they act like a single Observable by using this method.

    The difference between the concat operation and merge is that concat cares about ordering of emitted items (e.g. all items emitted by the first observable in the sequence will come before the elements emitted by the second observable), whereas merge doesn't care about that (elements get emitted as they come). Because of back-pressure applied to observables, Observable!.concat is safe to use in all contexts, whereas merge requires buffering.

    This version is reserving onError notifications until all of the Observables complete and only then passing the issued errors(s) along to the observers. Note that the streamed error is a CompositeException, since multiple errors from multiple streams can happen.

    returns

    an Observable that emits items that are the result of flattening the items emitted by the Observables emitted by this

  73. def flattenLatest[U](implicit ev: <:<[T, Observable[U]]): Observable[U]

    Permalink

    Alias for Observable!.switch

    Alias for Observable!.switch

    Convert an Observable that emits Observables into a single Observable that emits the items emitted by the most-recently-emitted of those Observables.

  74. def foldLeft[R](initial: R)(op: (R, T) ⇒ R): Observable[R]

    Permalink

    Applies a binary operator to a start value and all elements of this Observable, going left to right and returns a new Observable that emits only one item before onComplete.

  75. def forAll(p: (T) ⇒ Boolean): Observable[Boolean]

    Permalink

    Returns an Observable that emits a single boolean, either true, in case the given predicate holds for all the items emitted by the source, or false in case at least one item is not verifying the given predicate.

    Returns an Observable that emits a single boolean, either true, in case the given predicate holds for all the items emitted by the source, or false in case at least one item is not verifying the given predicate.

    p

    a function that evaluates the items emitted by the source Observable, returning true if they pass the filter

    returns

    an Observable that emits only true or false in case the given predicate holds or not for all the items

  76. def foreach(cb: (T) ⇒ Unit)(implicit s: Scheduler): Unit

    Permalink

    Subscribes to the source Observable and foreach element emitted by the source it executes the given callback.

  77. final def getClass(): Class[_]

    Permalink
    Definition Classes
    AnyRef → Any
  78. def groupBy[K](keyBufferSize: Int, keySelector: (T) ⇒ K): Observable[GroupedObservable[K, T]]

    Permalink

    Groups the items emitted by an Observable according to a specified criterion, and emits these grouped items as GroupedObservables, one GroupedObservable per group.

    Groups the items emitted by an Observable according to a specified criterion, and emits these grouped items as GroupedObservables, one GroupedObservable per group.

    A GroupedObservable will cache the items it is to emit until such time as it is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those GroupedObservables that do not concern you. Instead, you can signal to them that they may discard their buffers by doing something like source.take(0).

    This variant of groupBy specifies a keyBufferSize representing the size of the buffer that holds our keys. We cannot block when emitting new GroupedObservable. So by specifying a buffer size, on overflow the resulting observable will terminate with an onError.

    keyBufferSize

    - the buffer size used for buffering keys

    keySelector

    - a function that extracts the key for each item

  79. def groupBy[K](keySelector: (T) ⇒ K): Observable[GroupedObservable[K, T]]

    Permalink

    Groups the items emitted by an Observable according to a specified criterion, and emits these grouped items as GroupedObservables, one GroupedObservable per group.

    Groups the items emitted by an Observable according to a specified criterion, and emits these grouped items as GroupedObservables, one GroupedObservable per group.

    Note: A GroupedObservable will cache the items it is to emit until such time as it is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those GroupedObservables that do not concern you. Instead, you can signal to them that they may discard their buffers by doing something like source.take(0).

    keySelector

    - a function that extracts the key for each item

  80. def hashCode(): Int

    Permalink
    Definition Classes
    AnyRef → Any
  81. def head: Observable[T]

    Permalink

    Only emits the first element emitted by the source observable, after which it's completed immediately.

  82. def headOrElse[B >: T](default: ⇒ B): Observable[B]

    Permalink

    Emits the first element emitted by the source, or otherwise if the source is completed without emitting anything, then the default is emitted.

  83. def ignoreElements: Observable[Nothing]

    Permalink

    Alias for Observable!.complete.

    Alias for Observable!.complete.

    Ignores all items emitted by the source Observable and only calls onCompleted or onError.

    returns

    an empty Observable that only calls onCompleted or onError, based on which one is called by the source Observable

  84. def isEmpty: Observable[Boolean]

    Permalink

    Returns an Observable that emits true if the source Observable is empty, otherwise false.

  85. final def isInstanceOf[T0]: Boolean

    Permalink
    Definition Classes
    Any
  86. def last: Observable[T]

    Permalink

    Only emits the last element emitted by the source observable, after which it's completed immediately.

  87. def lift[U](f: (Observable[T]) ⇒ Observable[U]): Observable[U]

    Permalink

    Given a function that transforms an Observable[T] into an Observable[U], it transforms the source observable into an Observable[U].

  88. def map[U](f: (T) ⇒ U): Observable[U]

    Permalink

    Returns an Observable that applies the given function to each item emitted by an Observable and emits the result.

    Returns an Observable that applies the given function to each item emitted by an Observable and emits the result.

    f

    a function to apply to each item emitted by the Observable

    returns

    an Observable that emits the items from the source Observable, transformed by the given function

  89. def materialize: Observable[Notification[T]]

    Permalink

    Converts the source Observable that emits T into an Observable that emits Notification[T].

    Converts the source Observable that emits T into an Observable that emits Notification[T].

    NOTE: onComplete is still emitted after an onNext(OnComplete) notification however an onError(ex) notification is emitted as an onNext(OnError(ex)) followed by an onComplete.

  90. def max[U >: T](implicit ev: Ordering[U]): Observable[U]

    Permalink

    Takes the elements of the source Observable and emits the maximum value, after the source has completed.

  91. def maxBy[U](f: (T) ⇒ U)(implicit ev: Ordering[U]): Observable[T]

    Permalink

    Takes the elements of the source Observable and emits the element that has the maximum key value, where the key is generated by the given function f.

  92. def merge[U](overflowStrategy: Evicted, onOverflow: (Long) ⇒ U)(implicit ev: <:<[T, Observable[U]]): Observable[U]

    Permalink

    Merges the sequence of Observables emitted by the source into one Observable, without any transformation.

    Merges the sequence of Observables emitted by the source into one Observable, without any transformation.

    You can combine the items emitted by multiple Observables so that they act like a single Observable by using this method.

    overflowStrategy

    - the overflow strategy used for buffering, which specifies what to do in case we're dealing with a slow consumer - should an unbounded buffer be used, should back-pressure be applied, should the pipeline drop newer or older events, should it drop the whole buffer? See OverflowStrategy for more details

    onOverflow

    - a function that is used for signaling a special event used to inform the consumers that an overflow event happened, function that receives the number of dropped events as a parameter (see OverflowStrategy.Evicted)

    returns

    an Observable that emits items that are the result of flattening the items emitted by the Observables emitted by this

  93. def merge[U](overflowStrategy: OverflowStrategy)(implicit ev: <:<[T, Observable[U]]): Observable[U]

    Permalink

    Merges the sequence of Observables emitted by the source into one Observable, without any transformation.

    Merges the sequence of Observables emitted by the source into one Observable, without any transformation.

    You can combine the items emitted by multiple Observables so that they act like a single Observable by using this method.

    overflowStrategy

    - the overflow strategy used for buffering, which specifies what to do in case we're dealing with a slow consumer - should an unbounded buffer be used, should back-pressure be applied, should the pipeline drop newer or older events, should it drop the whole buffer? See OverflowStrategy for more details

    returns

    an Observable that emits items that are the result of flattening the items emitted by the Observables emitted by this

  94. def merge[U](implicit ev: <:<[T, Observable[U]]): Observable[U]

    Permalink

    Merges the sequence of Observables emitted by the source into one Observable, without any transformation.

    Merges the sequence of Observables emitted by the source into one Observable, without any transformation.

    You can combine the items emitted by multiple Observables so that they act like a single Observable by using this method.

    returns

    an Observable that emits items that are the result of flattening the items emitted by the Observables emitted by this

    Note

    this operation needs to do buffering and by not specifying an OverflowStrategy, the default strategy is being used.

  95. def mergeDelayErrors[U](overflowStrategy: Evicted, onOverflow: (Long) ⇒ U)(implicit ev: <:<[T, Observable[U]]): Observable[U]

    Permalink

    Merges the sequence of Observables emitted by the source into one Observable, without any transformation.

    Merges the sequence of Observables emitted by the source into one Observable, without any transformation.

    You can combine the items emitted by multiple Observables so that they act like a single Observable by using this method.

    This version is reserving onError notifications until all of the Observables complete and only then passing the issued errors(s) along to the observers. Note that the streamed error is a CompositeException, since multiple errors from multiple streams can happen.

    overflowStrategy

    - the overflow strategy used for buffering, which specifies what to do in case we're dealing with a slow consumer - should an unbounded buffer be used, should back-pressure be applied, should the pipeline drop newer or older events, should it drop the whole buffer? See OverflowStrategy for more details

    onOverflow

    - a function that is used for signaling a special event used to inform the consumers that an overflow event happened, function that receives the number of dropped events as a parameter (see OverflowStrategy.Evicted)

    returns

    an Observable that emits items that are the result of flattening the items emitted by the Observables emitted by this

  96. def mergeDelayErrors[U](overflowStrategy: OverflowStrategy)(implicit ev: <:<[T, Observable[U]]): Observable[U]

    Permalink

    Merges the sequence of Observables emitted by the source into one Observable, without any transformation.

    Merges the sequence of Observables emitted by the source into one Observable, without any transformation.

    You can combine the items emitted by multiple Observables so that they act like a single Observable by using this method.

    This version is reserving onError notifications until all of the Observables complete and only then passing the issued errors(s) along to the observers. Note that the streamed error is a CompositeException, since multiple errors from multiple streams can happen.

    overflowStrategy

    - the overflow strategy used for buffering, which specifies what to do in case we're dealing with a slow consumer - should an unbounded buffer be used, should back-pressure be applied, should the pipeline drop newer or older events, should it drop the whole buffer? See OverflowStrategy for more details

    returns

    an Observable that emits items that are the result of flattening the items emitted by the Observables emitted by this

  97. def mergeDelayErrors[U](implicit ev: <:<[T, Observable[U]]): Observable[U]

    Permalink

    Merges the sequence of Observables emitted by the source into one Observable, without any transformation.

    Merges the sequence of Observables emitted by the source into one Observable, without any transformation.

    You can combine the items emitted by multiple Observables so that they act like a single Observable by using this method.

    This version is reserving onError notifications until all of the Observables complete and only then passing the issued errors(s) along to the observers. Note that the streamed error is a CompositeException, since multiple errors from multiple streams can happen.

    returns

    an Observable that emits items that are the result of flattening the items emitted by the Observables emitted by this

    Note

    this operation needs to do buffering and by not specifying an OverflowStrategy, the default strategy is being used.

  98. def mergeMap[U](f: (T) ⇒ Observable[U]): Observable[U]

    Permalink

    Creates a new Observable by applying a function that you supply to each item emitted by the source Observable, where that function returns an Observable, and then merging those resulting Observables and emitting the results of this merger.

    Creates a new Observable by applying a function that you supply to each item emitted by the source Observable, where that function returns an Observable, and then merging those resulting Observables and emitting the results of this merger.

    This function is the equivalent of observable.map(f).merge.

    The difference between concat and merge is that concat cares about ordering of emitted items (e.g. all items emitted by the first observable in the sequence will come before the elements emitted by the second observable), whereas merge doesn't care about that (elements get emitted as they come). Because of back-pressure applied to observables, concat is safe to use in all contexts, whereas merge requires buffering.

    f

    - the transformation function

    returns

    an Observable that emits the result of applying the transformation function to each item emitted by the source Observable and merging the results of the Observables obtained from this transformation.

  99. def mergeMapDelayErrors[U](f: (T) ⇒ Observable[U]): Observable[U]

    Permalink

    Creates a new Observable by applying a function that you supply to each item emitted by the source Observable, where that function returns an Observable, and then merging those resulting Observables and emitting the results of this merger.

    Creates a new Observable by applying a function that you supply to each item emitted by the source Observable, where that function returns an Observable, and then merging those resulting Observables and emitting the results of this merger.

    This function is the equivalent of observable.map(f).merge.

    The difference between concat and merge is that concat cares about ordering of emitted items (e.g. all items emitted by the first observable in the sequence will come before the elements emitted by the second observable), whereas merge doesn't care about that (elements get emitted as they come). Because of back-pressure applied to observables, concat is safe to use in all contexts, whereas merge requires buffering.

    This version is reserving onError notifications until all of the Observables complete and only then passing the issued errors(s) along to the observers. Note that the streamed error is a CompositeException, since multiple errors from multiple streams can happen.

    f

    - the transformation function

    returns

    an Observable that emits the result of applying the transformation function to each item emitted by the source Observable and merging the results of the Observables obtained from this transformation.

  100. def min[U >: T](implicit ev: Ordering[U]): Observable[U]

    Permalink

    Takes the elements of the source Observable and emits the minimum value, after the source has completed.

  101. def minBy[U](f: (T) ⇒ U)(implicit ev: Ordering[U]): Observable[T]

    Permalink

    Takes the elements of the source Observable and emits the element that has the minimum key value, where the key is generated by the given function f.

  102. def multicast[U >: T, R](subject: Subject[U, R])(implicit s: Scheduler): ConnectableObservable[R]

    Permalink

    Converts this observable into a multicast observable, useful for turning a cold observable into a hot one (i.e.

    Converts this observable into a multicast observable, useful for turning a cold observable into a hot one (i.e. whose source is shared by all observers).

  103. final def ne(arg0: AnyRef): Boolean

    Permalink
    Definition Classes
    AnyRef
  104. def nonEmpty: Observable[Boolean]

    Permalink

    Returns an Observable that emits false if the source Observable is empty, otherwise true.

  105. final def notify(): Unit

    Permalink
    Definition Classes
    AnyRef
  106. final def notifyAll(): Unit

    Permalink
    Definition Classes
    AnyRef
  107. def onErrorFallbackTo[U >: T](that: ⇒ Observable[U]): Observable[U]

    Permalink

    Returns an Observable that mirrors the behavior of the source, unless the source is terminated with an onError, in which case the streaming of events continues with the specified backup sequence.

    Returns an Observable that mirrors the behavior of the source, unless the source is terminated with an onError, in which case the streaming of events continues with the specified backup sequence.

    The created Observable mirrors the behavior of the source in case the source does not end with an error.

    NOTE that compared with onErrorResumeNext from Rx.NET, the streaming is not resumed in case the source is terminated normally with an onComplete.

    that

    - a backup sequence that's being subscribed in case the source terminates with an error.

  108. def onErrorRecoverWith[U >: T](pf: PartialFunction[Throwable, Observable[U]]): Observable[U]

    Permalink

    Returns an Observable that mirrors the behavior of the source, unless the source is terminated with an onError, in which case the streaming of events continues with the specified backup sequence generated by the given partial function.

    Returns an Observable that mirrors the behavior of the source, unless the source is terminated with an onError, in which case the streaming of events continues with the specified backup sequence generated by the given partial function.

    The created Observable mirrors the behavior of the source in case the source does not end with an error or if the thrown Throwable is not matched.

    NOTE that compared with onErrorResumeNext from Rx.NET, the streaming is not resumed in case the source is terminated normally with an onComplete.

    pf

    - a partial function that matches errors with a backup throwable that is subscribed when the source throws an error.

  109. def onErrorRetry(maxRetries: Long): Observable[T]

    Permalink

    Returns an Observable that mirrors the behavior of the source, unless the source is terminated with an onError, in which case it tries subscribing to the source again in the hope that it will complete without an error.

    Returns an Observable that mirrors the behavior of the source, unless the source is terminated with an onError, in which case it tries subscribing to the source again in the hope that it will complete without an error.

    The number of retries is limited by the specified maxRetries parameter, so for an Observable that always ends in error the total number of subscriptions that will eventually happen is maxRetries + 1.

  110. def onErrorRetryIf(p: (Throwable) ⇒ Boolean): Observable[T]

    Permalink

    Returns an Observable that mirrors the behavior of the source, unless the source is terminated with an onError, in which case it tries subscribing to the source again in the hope that it will complete without an error.

    Returns an Observable that mirrors the behavior of the source, unless the source is terminated with an onError, in which case it tries subscribing to the source again in the hope that it will complete without an error.

    The given predicate establishes if the subscription should be retried or not.

  111. def onErrorRetryUnlimited: Observable[T]

    Permalink

    Returns an Observable that mirrors the behavior of the source, unless the source is terminated with an onError, in which case it tries subscribing to the source again in the hope that it will complete without an error.

    Returns an Observable that mirrors the behavior of the source, unless the source is terminated with an onError, in which case it tries subscribing to the source again in the hope that it will complete without an error.

    NOTE: The number of retries is unlimited, so something like Observable.error(new RuntimeException).onErrorRetryUnlimited will loop forever.

  112. def onSubscribe(observer: Observer[T])(implicit s: Scheduler): Unit

    Permalink

    Subscribes to the stream.

    Subscribes to the stream.

    This function is "unsafe" to call because it does not protect the calls to the given Observer implementation in regards to unexpected exceptions that violate the contract, therefore the given instance must respect its contract and not throw any exceptions when the observable calls onNext, onComplete and onError. If it does, then the behavior is undefined.

    observer

    is an Observer that respects the Monifu Rx contract

    s

    is the Scheduler used for creating the subscription

  113. def publish(implicit s: Scheduler): ConnectableObservable[T]

    Permalink

    Converts this observable into a multicast observable, useful for turning a cold observable into a hot one (i.e.

    Converts this observable into a multicast observable, useful for turning a cold observable into a hot one (i.e. whose source is shared by all observers). The underlying subject used is a PublishSubject.

  114. def publishLast(implicit s: Scheduler): ConnectableObservable[T]

    Permalink

    Converts this observable into a multicast observable, useful for turning a cold observable into a hot one (i.e.

    Converts this observable into a multicast observable, useful for turning a cold observable into a hot one (i.e. whose source is shared by all observers). The underlying subject used is a AsyncSubject.

  115. def reduce[U >: T](op: (U, U) ⇒ U): Observable[U]

    Permalink

    Applies a binary operator to a start value and all elements of this Observable, going left to right and returns a new Observable that emits only one item before onComplete.

  116. def repeat: Observable[T]

    Permalink

    Repeats the items emitted by this Observable continuously.

    Repeats the items emitted by this Observable continuously. It caches the generated items until onComplete and repeats them ad infinitum. On error it terminates.

  117. def replay(bufferSize: Int)(implicit s: Scheduler): ConnectableObservable[T]

    Permalink

    Converts this observable into a multicast observable, useful for turning a cold observable into a hot one (i.e.

    Converts this observable into a multicast observable, useful for turning a cold observable into a hot one (i.e. whose source is shared by all observers). The underlying subject used is a ReplaySubject.

    bufferSize

    is the size of the buffer limiting the number of items that can be replayed (on overflow the head starts being dropped)

  118. def replay(implicit s: Scheduler): ConnectableObservable[T]

    Permalink

    Converts this observable into a multicast observable, useful for turning a cold observable into a hot one (i.e.

    Converts this observable into a multicast observable, useful for turning a cold observable into a hot one (i.e. whose source is shared by all observers). The underlying subject used is a ReplaySubject.

  119. def sample[U](sampler: Observable[U]): Observable[T]

    Permalink

    Returns an Observable that, when the specified sampler Observable emits an item or completes, emits the most recently emitted item (if any) emitted by the source Observable since the previous emission from the sampler Observable.

    Returns an Observable that, when the specified sampler Observable emits an item or completes, emits the most recently emitted item (if any) emitted by the source Observable since the previous emission from the sampler Observable.

    Use the sample() method to periodically look at an Observable to see what item it has most recently emitted since the previous sampling. Note that if the source Observable has emitted no items since the last time it was sampled, the Observable that results from the sample( ) operator will emit no item.

    sampler

    - the Observable to use for sampling the source Observable

  120. def sample(initialDelay: FiniteDuration, delay: FiniteDuration): Observable[T]

    Permalink

    Emit the most recent items emitted by an Observable within periodic time intervals.

    Emit the most recent items emitted by an Observable within periodic time intervals.

    Use the sample() method to periodically look at an Observable to see what item it has most recently emitted since the previous sampling. Note that if the source Observable has emitted no items since the last time it was sampled, the Observable that results from the sample( ) operator will emit no item for that sampling period.

    initialDelay

    the initial delay after which sampling can happen

    delay

    the timespan at which sampling occurs and note that this is not accurate as it is subject to back-pressure concerns - as in if the delay is 1 second and the processing of an event on onNext in the observer takes one second, then the actual sampling delay will be 2 seconds.

  121. def sample(delay: FiniteDuration): Observable[T]

    Permalink

    Emit the most recent items emitted by an Observable within periodic time intervals.

    Emit the most recent items emitted by an Observable within periodic time intervals.

    Use the sample operator to periodically look at an Observable to see what item it has most recently emitted since the previous sampling. Note that if the source Observable has emitted no items since the last time it was sampled, the Observable that results from the sample( ) operator will emit no item for that sampling period.

    delay

    the timespan at which sampling occurs and note that this is not accurate as it is subject to back-pressure concerns - as in if the delay is 1 second and the processing of an event on onNext in the observer takes one second, then the actual sampling delay will be 2 seconds.

  122. def sampleRepeated[U](sampler: Observable[U]): Observable[T]

    Permalink

    Returns an Observable that, when the specified sampler Observable emits an item or completes, emits the most recently emitted item (if any) emitted by the source Observable since the previous emission from the sampler Observable.

    Returns an Observable that, when the specified sampler Observable emits an item or completes, emits the most recently emitted item (if any) emitted by the source Observable since the previous emission from the sampler Observable. If no new value has been emitted since the last time it was sampled, the emit the last emitted value anyway.

    sampler

    - the Observable to use for sampling the source Observable

    See also

    Observable.sample

  123. def sampleRepeated(initialDelay: FiniteDuration, delay: FiniteDuration): Observable[T]

    Permalink

    Emit the most recent items emitted by an Observable within periodic time intervals.

    Emit the most recent items emitted by an Observable within periodic time intervals. If no new value has been emitted since the last time it was sampled, the emit the last emitted value anyway.

    Also see sample.

    initialDelay

    the initial delay after which sampling can happen

    delay

    the timespan at which sampling occurs and note that this is not accurate as it is subject to back-pressure concerns - as in if the delay is 1 second and the processing of an event on onNext in the observer takes one second, then the actual sampling delay will be 2 seconds.

  124. def sampleRepeated(delay: FiniteDuration): Observable[T]

    Permalink

    Emit the most recent items emitted by an Observable within periodic time intervals.

    Emit the most recent items emitted by an Observable within periodic time intervals. If no new value has been emitted since the last time it was sampled, the emit the last emitted value anyway.

    Also see Observable.sample.

    delay

    the timespan at which sampling occurs and note that this is not accurate as it is subject to back-pressure concerns - as in if the delay is 1 second and the processing of an event on onNext in the observer takes one second, then the actual sampling delay will be 2 seconds.

  125. def scan[R](initial: R)(op: (R, T) ⇒ R): Observable[R]

    Permalink

    Applies a binary operator to a start value and all elements of this Observable, going left to right and returns a new Observable that emits on each step the result of the applied function.

    Applies a binary operator to a start value and all elements of this Observable, going left to right and returns a new Observable that emits on each step the result of the applied function.

    Similar to foldLeft, but emits the state on each step. Useful for modeling finite state machines.

  126. def share(implicit s: Scheduler): Observable[T]

    Permalink

    Returns a new Observable that multi-casts (shares) the original Observable.

  127. def startWith[U >: T](elems: U*): Observable[U]

    Permalink

    Creates a new Observable that emits the given elements and then it also emits the events of the source (prepend operation).

  128. def subscribe(nextFn: (T) ⇒ Future[Ack])(implicit s: Scheduler): BooleanCancelable

    Permalink

    Subscribes to the stream.

    Subscribes to the stream.

    returns

    a subscription that can be used to cancel the streaming.

  129. def subscribe()(implicit s: Scheduler): Cancelable

    Permalink

    Subscribes to the stream.

    Subscribes to the stream.

    returns

    a subscription that can be used to cancel the streaming.

  130. def subscribe(nextFn: (T) ⇒ Future[Ack], errorFn: (Throwable) ⇒ Unit)(implicit s: Scheduler): BooleanCancelable

    Permalink

    Subscribes to the stream.

    Subscribes to the stream.

    returns

    a subscription that can be used to cancel the streaming.

  131. def subscribe(nextFn: (T) ⇒ Future[Ack], errorFn: (Throwable) ⇒ Unit, completedFn: () ⇒ Unit)(implicit s: Scheduler): BooleanCancelable

    Permalink

    Subscribes to the stream.

    Subscribes to the stream.

    returns

    a subscription that can be used to cancel the streaming.

  132. def subscribe(observer: Observer[T])(implicit s: Scheduler): BooleanCancelable

    Permalink

    Subscribes to the stream.

    Subscribes to the stream.

    returns

    a subscription that can be used to cancel the streaming.

  133. def subscribe(subscriber: Subscriber[T]): BooleanCancelable

    Permalink

    Subscribes to the stream.

    Subscribes to the stream.

    returns

    a subscription that can be used to cancel the streaming.

  134. def subscribeOn(s: Scheduler): Observable[T]

    Permalink

    Returns a new Observable that uses the specified Scheduler for initiating the subscription.

  135. def sum[U >: T](implicit ev: Numeric[U]): Observable[U]

    Permalink

    Given a source that emits numeric values, the sum operator sums up all values and at onComplete it emits the total.

  136. def switch[U](implicit ev: <:<[T, Observable[U]]): Observable[U]

    Permalink

    Convert an Observable that emits Observables into a single Observable that emits the items emitted by the most-recently-emitted of those Observables.

  137. def switchMap[U](f: (T) ⇒ Observable[U]): Observable[U]

    Permalink

    Returns a new Observable that emits the items emitted by the Observable most recently generated by the mapping function.

  138. final def synchronized[T0](arg0: ⇒ T0): T0

    Permalink
    Definition Classes
    AnyRef
  139. def tail: Observable[T]

    Permalink

    Drops the first element of the source observable, emitting the rest.

  140. def take(timespan: FiniteDuration): Observable[T]

    Permalink

    Creates a new Observable that emits the events of the source, only for the specified timestamp, after which it completes.

    Creates a new Observable that emits the events of the source, only for the specified timestamp, after which it completes.

    timespan

    the window of time during which the new Observable is allowed to emit the events of the source

  141. def take(n: Long): Observable[T]

    Permalink

    Selects the first n elements (from the start).

    Selects the first n elements (from the start).

    n

    the number of elements to take

    returns

    a new Observable that emits only the first n elements from the source

  142. def takeRight(n: Int): Observable[T]

    Permalink

    Creates a new Observable that only emits the last n elements emitted by the source.

  143. def takeWhile(p: (T) ⇒ Boolean): Observable[T]

    Permalink

    Takes longest prefix of elements that satisfy the given predicate and returns a new Observable that emits those elements.

  144. def takeWhileNotCanceled(c: BooleanCancelable): Observable[T]

    Permalink

    Takes longest prefix of elements that satisfy the given predicate and returns a new Observable that emits those elements.

  145. def throttleFirst(interval: FiniteDuration): Observable[T]

    Permalink

    Returns an Observable that emits only the first item emitted by the source Observable during sequential time windows of a specified duration.

    Returns an Observable that emits only the first item emitted by the source Observable during sequential time windows of a specified duration.

    This differs from Observable!.throttleLast in that this only tracks passage of time whereas throttleLast ticks at scheduled intervals.

    interval

    time to wait before emitting another item after emitting the last item

  146. def throttleLast(period: FiniteDuration): Observable[T]

    Permalink

    Returns an Observable that emits only the last item emitted by the source Observable during sequential time windows of a specified duration.

    Returns an Observable that emits only the last item emitted by the source Observable during sequential time windows of a specified duration.

    This differs from Observable!.throttleFirst in that this ticks along at a scheduled interval whereas throttleFirst does not tick, it just tracks passage of time.

    period

    - duration of windows within which the last item emitted by the source Observable will be emitted

  147. def throttleWithTimeout(timeout: FiniteDuration): Observable[T]

    Permalink

    Alias for debounce.

    Alias for debounce.

    Returns an Observable that only emits those items emitted by the source Observable that are not followed by another emitted item within a specified time window.

    Note: If the source Observable keeps emitting items more frequently than the length of the time window then no items will be emitted by the resulting Observable.

    timeout

    - the length of the window of time that must pass after the emission of an item from the source Observable in which that Observable emits no items in order for the item to be emitted by the resulting Observable

  148. def timeout[U >: T](timeout: FiniteDuration, backup: Observable[U]): Observable[U]

    Permalink

    Returns an Observable that mirrors the source Observable but applies a timeout overflowStrategy for each emitted item.

    Returns an Observable that mirrors the source Observable but applies a timeout overflowStrategy for each emitted item. If the next item isn't emitted within the specified timeout duration starting from its predecessor, the resulting Observable begins instead to mirror a backup Observable.

    timeout

    maximum duration between emitted items before a timeout occurs

    backup

    is the backup observable to subscribe to in case of a timeout

  149. def timeout(timeout: FiniteDuration): Observable[T]

    Permalink

    Returns an Observable that mirrors the source Observable but applies a timeout overflowStrategy for each emitted item.

    Returns an Observable that mirrors the source Observable but applies a timeout overflowStrategy for each emitted item. If the next item isn't emitted within the specified timeout duration starting from its predecessor, the resulting Observable terminates and notifies observers of a TimeoutException.

    timeout

    maximum duration between emitted items before a timeout occurs

  150. def toReactive[U >: T](implicit s: Scheduler): Publisher[U]

    Permalink

    Wraps this Observable into a org.reactivestreams.Publisher.

    Wraps this Observable into a org.reactivestreams.Publisher. See the Reactive Streams protocol that Monifu implements.

  151. def toString(): String

    Permalink
    Definition Classes
    AnyRef → Any
  152. final def wait(): Unit

    Permalink
    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  153. final def wait(arg0: Long, arg1: Int): Unit

    Permalink
    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  154. final def wait(arg0: Long): Unit

    Permalink
    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  155. def whileBusyBuffer[U >: T](overflowStrategy: Evicted, onOverflow: (Long) ⇒ U): Observable[U]

    Permalink

    While the destination observer is busy, buffers events, applying the given overflowStrategy.

    While the destination observer is busy, buffers events, applying the given overflowStrategy.

    overflowStrategy

    - the overflow strategy used for buffering, which specifies what to do in case we're dealing with a slow consumer - should an unbounded buffer be used, should back-pressure be applied, should the pipeline drop newer or older events, should it drop the whole buffer? See OverflowStrategy for more details

    onOverflow

    - a function that is used for signaling a special event used to inform the consumers that an overflow event happened, function that receives the number of dropped events as a parameter (see OverflowStrategy.Evicted)

  156. def whileBusyBuffer[U >: T](overflowStrategy: Synchronous): Observable[U]

    Permalink

    While the destination observer is busy, buffers events, applying the given overflowStrategy.

    While the destination observer is busy, buffers events, applying the given overflowStrategy.

    overflowStrategy

    - the overflow strategy used for buffering, which specifies what to do in case we're dealing with a slow consumer - should an unbounded buffer be used, should back-pressure be applied, should the pipeline drop newer or older events, should it drop the whole buffer? See OverflowStrategy for more details

  157. def whileBusyDropEvents[U >: T](onOverflow: (Long) ⇒ U): Observable[U]

    Permalink

    While the destination observer is busy, drop the incoming events.

    While the destination observer is busy, drop the incoming events. When the downstream recovers, we can signal a special event meant to inform the downstream observer how many events where dropped.

    onOverflow

    - a function that is used for signaling a special event used to inform the consumers that an overflow event happened, function that receives the number of dropped events as a parameter (see OverflowStrategy.Evicted)

  158. def whileBusyDropEvents: Observable[T]

    Permalink

    While the destination observer is busy, drop the incoming events.

  159. def window(timespan: FiniteDuration, maxCount: Int): Observable[Observable[T]]

    Permalink

    Periodically subdivide items from an Observable into Observable windows and emit these windows rather than emitting the items one at a time.

    Periodically subdivide items from an Observable into Observable windows and emit these windows rather than emitting the items one at a time.

    The resulting Observable emits connected, non-overlapping windows, each of a fixed duration specified by the timespan argument. When the source Observable completes or encounters an error, the resulting Observable emits the current window and propagates the notification from the source Observable.

    timespan

    the interval of time at which it should complete the current window and emit a new one

    maxCount

    the maximum size of each window

  160. def window(timespan: FiniteDuration): Observable[Observable[T]]

    Permalink

    Periodically subdivide items from an Observable into Observable windows and emit these windows rather than emitting the items one at a time.

    Periodically subdivide items from an Observable into Observable windows and emit these windows rather than emitting the items one at a time.

    The resulting Observable emits connected, non-overlapping windows, each of a fixed duration specified by the timespan argument. When the source Observable completes or encounters an error, the resulting Observable emits the current window and propagates the notification from the source Observable.

    timespan

    the interval of time at which it should complete the current window and emit a new one

  161. def window(count: Int, skip: Int): Observable[Observable[T]]

    Permalink

    Returns an Observable that emits windows of items it collects from the source Observable.

    Returns an Observable that emits windows of items it collects from the source Observable. The resulting Observable emits windows every skip items, each containing no more than count items. When the source Observable completes or encounters an error, the resulting Observable emits the current window and propagates the notification from the source Observable.

    There are 3 possibilities:

    1. in case skip == count, then there are no items dropped and no overlap, the call being equivalent to window(count) 2. in case skip < count, then overlap between windows happens, with the number of elements being repeated being count - skip 3. in case skip > count, then skip - count elements start getting dropped between windows

    count

    - the maximum size of each window before it should be emitted

    skip

    - how many items need to be skipped before starting a new window

  162. def window(count: Int): Observable[Observable[T]]

    Permalink

    Periodically subdivide items from an Observable into Observable windows and emit these windows rather than emitting the items one at a time.

    Periodically subdivide items from an Observable into Observable windows and emit these windows rather than emitting the items one at a time.

    This variant of window opens its first window immediately. It closes the currently open window and immediately opens a new one whenever the current window has emitted count items. It will also close the currently open window if it receives an onCompleted or onError notification from the source Observable. This variant of window emits a series of non-overlapping windows whose collective emissions correspond one-to-one with those of the source Observable.

    count

    the bundle size

  163. def zip[U](other: Observable[U]): Observable[(T, U)]

    Permalink

    Creates a new Observable from this Observable and another given Observable, by emitting elements combined in pairs.

    Creates a new Observable from this Observable and another given Observable, by emitting elements combined in pairs. If one of the Observable emits fewer events than the other, then the rest of the unpaired events are ignored.

  164. def zipWithIndex: Observable[(T, Long)]

    Permalink

    Zips the emitted elements of the source with their indices.

Inherited from AnyRef

Inherited from Any

Ungrouped