object Observable extends Serializable
Type Members
-
class
TypeClassInstances extends Instance[Observable] with Instance[Observable] with Instance[Observable, Throwable] with Instance[Observable] with Instance[Observable] with Instance[Observable] with Instance[Observable]
Type-class instances for Observable.
Value Members
-
final
def
!=(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
-
final
def
##(): Int
- Definition Classes
- AnyRef → Any
-
final
def
==(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
-
def
apply[A](elems: A*): Observable[A]
Given a sequence of elements, builds an observable from it.
-
final
def
asInstanceOf[T0]: T0
- Definition Classes
- Any
-
def
clone(): AnyRef
- Attributes
- protected[java.lang]
- Definition Classes
- AnyRef
- Annotations
- @native() @throws( ... )
-
def
coeval[A](value: Coeval[A]): Observable[A]
Transforms a non-strict Coeval value into an Observable that emits a single element.
-
def
combineLatest2[A1, A2](oa1: Observable[A1], oa2: Observable[A2]): Observable[(A1, A2)]
Creates a combined observable from 2 source observables.
This operator behaves in a similar way to zip2, but while zip emits items only when all of the zipped source observables have emitted a previously unzipped item, combineLatest emits an item whenever any of the source observables emits an item (so long as each of the source observables has emitted at least one item).
-
def
combineLatest3[A1, A2, A3](oa1: Observable[A1], oa2: Observable[A2], oa3: Observable[A3]): Observable[(A1, A2, A3)]
Creates a combined observable from 3 source observables.
This operator behaves in a similar way to zip3, but while zip emits items only when all of the zipped source observables have emitted a previously unzipped item, combineLatest emits an item whenever any of the source observables emits an item (so long as each of the source observables has emitted at least one item).
-
def
combineLatest4[A1, A2, A3, A4](oa1: Observable[A1], oa2: Observable[A2], oa3: Observable[A3], oa4: Observable[A4]): Observable[(A1, A2, A3, A4)]
Creates a combined observable from 4 source observables.
This operator behaves in a similar way to zip4, but while zip emits items only when all of the zipped source observables have emitted a previously unzipped item, combineLatest emits an item whenever any of the source observables emits an item (so long as each of the source observables has emitted at least one item).
-
def
combineLatest5[A1, A2, A3, A4, A5](oa1: Observable[A1], oa2: Observable[A2], oa3: Observable[A3], oa4: Observable[A4], oa5: Observable[A5]): Observable[(A1, A2, A3, A4, A5)]
Creates a combined observable from 5 source observables.
This operator behaves in a similar way to zip5, but while zip emits items only when all of the zipped source observables have emitted a previously unzipped item, combineLatest emits an item whenever any of the source observables emits an item (so long as each of the source observables has emitted at least one item).
-
def
combineLatest6[A1, A2, A3, A4, A5, A6](oa1: Observable[A1], oa2: Observable[A2], oa3: Observable[A3], oa4: Observable[A4], oa5: Observable[A5], oa6: Observable[A6]): Observable[(A1, A2, A3, A4, A5, A6)]
Creates a combined observable from 6 source observables.
This operator behaves in a similar way to zip6, but while zip emits items only when all of the zipped source observables have emitted a previously unzipped item, combineLatest emits an item whenever any of the source observables emits an item (so long as each of the source observables has emitted at least one item).
-
def
combineLatestList[A](sources: Observable[A]*): Observable[Seq[A]]
Given a sequence of observables, combines them together (using combineLatest), returning a new observable that emits sequences.
-
def
combineLatestMap2[A1, A2, R](oa1: Observable[A1], oa2: Observable[A2])(f: (A1, A2) ⇒ R): Observable[R]
Creates a combined observable from 2 source observables.
This operator behaves in a similar way to zipMap2, but while zip emits items only when all of the zipped source observables have emitted a previously unzipped item, combineLatest emits an item whenever any of the source observables emits an item (so long as each of the source observables has emitted at least one item).
-
def
combineLatestMap3[A1, A2, A3, R](a1: Observable[A1], a2: Observable[A2], a3: Observable[A3])(f: (A1, A2, A3) ⇒ R): Observable[R]
Creates a combined observable from 3 source observables.
This operator behaves in a similar way to zipMap3, but while zip emits items only when all of the zipped source observables have emitted a previously unzipped item, combineLatest emits an item whenever any of the source observables emits an item (so long as each of the source observables has emitted at least one item).
-
def
combineLatestMap4[A1, A2, A3, A4, R](a1: Observable[A1], a2: Observable[A2], a3: Observable[A3], a4: Observable[A4])(f: (A1, A2, A3, A4) ⇒ R): Observable[R]
Creates a combined observable from 4 source observables.
This operator behaves in a similar way to zipMap4, but while zip emits items only when all of the zipped source observables have emitted a previously unzipped item, combineLatest emits an item whenever any of the source observables emits an item (so long as each of the source observables has emitted at least one item).
-
def
combineLatestMap5[A1, A2, A3, A4, A5, R](a1: Observable[A1], a2: Observable[A2], a3: Observable[A3], a4: Observable[A4], a5: Observable[A5])(f: (A1, A2, A3, A4, A5) ⇒ R): Observable[R]
Creates a combined observable from 5 source observables.
This operator behaves in a similar way to zipMap5, but while zip emits items only when all of the zipped source observables have emitted a previously unzipped item, combineLatest emits an item whenever any of the source observables emits an item (so long as each of the source observables has emitted at least one item).
-
def
combineLatestMap6[A1, A2, A3, A4, A5, A6, R](a1: Observable[A1], a2: Observable[A2], a3: Observable[A3], a4: Observable[A4], a5: Observable[A5], a6: Observable[A6])(f: (A1, A2, A3, A4, A5, A6) ⇒ R): Observable[R]
Creates a combined observable from 6 source observables.
This operator behaves in a similar way to zipMap6, but while zip emits items only when all of the zipped source observables have emitted a previously unzipped item, combineLatest emits an item whenever any of the source observables emits an item (so long as each of the source observables has emitted at least one item).
-
def
concat[A](sources: Observable[A]*): Observable[A]
Concatenates the given list of observables into a single observable.
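As a rough illustration of concat, the sketch below joins two small observables. It assumes Monix 2.x on the classpath; `toListL`, `runAsync` and the global Scheduler are not described on this page and are assumptions about that version, and `run` is a hypothetical helper.

```scala
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.concurrent.Await
import scala.concurrent.duration._

object ConcatExample {
  // Hypothetical helper: collects every emitted element, blocking for up to 5 seconds.
  def run[A](o: Observable[A]): List[A] =
    Await.result(o.toListL.runAsync, 5.seconds)

  // The second source is only subscribed to after the first one completes.
  val joined: List[Int] =
    run(Observable.concat(Observable(1, 2), Observable(3, 4)))
  // joined == List(1, 2, 3, 4)
}
```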
-
def
concatDelayError[A](sources: Observable[A]*): Observable[A]
Concatenates the given list of observables into a single observable. Delays errors until the end.
-
def
cons[A](head: A, tail: Observable[A]): Observable[A]
Builds a new observable from a strict head and a lazily evaluated tail.
-
def
create[A](overflowStrategy: Synchronous[A])(f: (Sync[A]) ⇒ Cancelable): Observable[A]
Creates an observable from a function that receives a concurrent and safe Subscriber.Sync.
This builder represents the safe way of building observables from data-sources that cannot be back-pressured.
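A minimal sketch of create, assuming Monix 2.x, where `OverflowStrategy.Unbounded` is one of the synchronous overflow strategies and `Cancelable.empty` is a no-op cancelable. Neither name appears on this page, so treat both as assumptions, along with `toListL` and `runAsync`.

```scala
import monix.execution.Cancelable
import monix.execution.Scheduler.Implicits.global
import monix.reactive.{Observable, OverflowStrategy}
import scala.concurrent.Await
import scala.concurrent.duration._

object CreateExample {
  // The function receives a Subscriber.Sync, so events can be pushed
  // without waiting for acknowledgements; the buffer absorbs the overflow.
  val source: Observable[Int] =
    Observable.create[Int](OverflowStrategy.Unbounded) { subscriber =>
      subscriber.onNext(1)
      subscriber.onNext(2)
      subscriber.onComplete()
      Cancelable.empty // nothing to release in this toy data source
    }

  val received: List[Int] = Await.result(source.toListL.runAsync, 5.seconds)
  // received == List(1, 2)
}
```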
-
def
defer[A](fa: ⇒ Observable[A]): Observable[A]
Returns a new observable that creates a sequence from the given factory on each subscription.
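The per-subscription behavior of defer can be sketched as follows, assuming Monix 2.x (`toListL`, `runAsync` and the global Scheduler are assumptions; `run` and the counter are hypothetical).

```scala
import java.util.concurrent.atomic.AtomicInteger
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.concurrent.Await
import scala.concurrent.duration._

object DeferExample {
  // Hypothetical helper: collects every emitted element, blocking for up to 5 seconds.
  def run[A](o: Observable[A]): List[A] =
    Await.result(o.toListL.runAsync, 5.seconds)

  private val subscriptions = new AtomicInteger(0)

  // The factory expression is evaluated again for every subscription.
  val source: Observable[Int] =
    Observable.defer(Observable.now(subscriptions.incrementAndGet()))

  val first: List[Int] = run(source)  // List(1)
  val second: List[Int] = run(source) // List(2)
}
```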
-
def
delay[A](a: ⇒ A): Observable[A]
Alias for eval.
-
def
empty[A]: Observable[A]
Creates an observable that doesn't emit anything, but immediately calls onComplete instead.
-
final
def
eq(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
-
def
equals(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
-
def
eval[A](a: ⇒ A): Observable[A]
Given a non-strict value, converts it into an Observable that, upon subscription, evaluates the expression and emits a single element.
-
def
evalDelayed[A](delay: FiniteDuration, a: ⇒ A): Observable[A]
Lifts a non-strict value into an observable that emits a single element, but upon subscription delays its evaluation by the specified timespan.
-
def
evalOnce[A](f: ⇒ A): Observable[A]
Given a non-strict value, converts it into an Observable that emits a single element and that memoizes the value for subsequent invocations.
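The difference between eval and evalOnce shows up when the same observable is subscribed to twice. The sketch below assumes Monix 2.x (`toListL`, `runAsync` and the global Scheduler are assumptions; `run` and the counters are hypothetical).

```scala
import java.util.concurrent.atomic.AtomicInteger
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.concurrent.Await
import scala.concurrent.duration._

object EvalExample {
  // Hypothetical helper: collects every emitted element, blocking for up to 5 seconds.
  def run[A](o: Observable[A]): List[A] =
    Await.result(o.toListL.runAsync, 5.seconds)

  private val evalCalls = new AtomicInteger(0)
  private val onceCalls = new AtomicInteger(0)

  val viaEval: Observable[Int] = Observable.eval(evalCalls.incrementAndGet())
  val viaEvalOnce: Observable[Int] = Observable.evalOnce(onceCalls.incrementAndGet())

  // eval re-evaluates the expression on each subscription.
  val evalResults = (run(viaEval), run(viaEval))         // (List(1), List(2))
  // evalOnce memoizes the first result.
  val onceResults = (run(viaEvalOnce), run(viaEvalOnce)) // (List(1), List(1))
}
```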
-
def
finalize(): Unit
- Attributes
- protected[java.lang]
- Definition Classes
- AnyRef
- Annotations
- @throws( classOf[java.lang.Throwable] )
-
def
firstStartedOf[A](source: Observable[A]*): Observable[A]
Given a list of source Observables, emits all of the items from the first of these Observables to emit an item or to complete, and cancels the rest.
-
def
flatten[A](sources: Observable[A]*): Observable[A]
Concatenates the given list of observables into a single observable.
-
def
flattenDelayError[A](sources: Observable[A]*): Observable[A]
Concatenates the given list of observables into a single observable. Delays errors until the end.
-
def
fork[A](fa: Observable[A], scheduler: Scheduler): Observable[A]
Mirrors the given source Observable, but upon subscription ensures that evaluation forks into a separate (logical) thread, managed by the given scheduler, which will also be used for subsequent asynchronous execution, overriding the default.
The given Scheduler will be used for evaluation of the source Observable, effectively overriding the Scheduler that's passed in subscribe(). Thus you can evaluate an observable on a separate thread-pool, which is useful, for example, when doing I/O.
- fa
is the source observable that will evaluate asynchronously on the specified scheduler
- scheduler
is the scheduler to use for evaluation
-
def
fork[A](fa: Observable[A]): Observable[A]
Mirrors the given source Observable, but upon subscription ensures that evaluation forks into a separate (logical) thread.
The Scheduler used will be the one that is injected in subscribe().
- fa
is the observable that will get subscribed asynchronously
-
def
fromAsyncStateAction[S, A](f: (S) ⇒ Task[(A, S)])(initialState: ⇒ S): Observable[A]
Given an initial state and a generator function that produces the next state and the next element in the sequence, creates an observable that keeps generating elements produced by our generator function.
-
def
fromCharsReader(in: Reader, chunkSize: Int): Observable[Array[Char]]
Converts a java.io.Reader into an observable that will emit Array[Char] elements.
WARNING: reading from a reader is a destructive process. Therefore only a single subscriber is supported, the result being a single-subscriber observable. If multiple subscribers are attempted, all subscribers, except for the first one, will be terminated with an APIContractViolationException.
Therefore, if you need a factory of data sources, from a cold source such as a java.io.File from which you can open as many file handles as you want, you can use Observable.defer to build such a factory. Or you can share the resulting observable by converting it into a ConnectableObservable by means of multicast.
- in
is the Reader to convert into an observable
- chunkSize
is the maximum length of the emitted arrays of chars. It's also used when reading from the reader.
-
def
fromCharsReader(in: Reader): Observable[Array[Char]]
Converts a java.io.Reader into an observable that will emit Array[Char] elements.
WARNING: reading from a reader is a destructive process. Therefore only a single subscriber is supported, the result being a single-subscriber observable. If multiple subscribers are attempted, all subscribers, except for the first one, will be terminated with an APIContractViolationException.
Therefore, if you need a factory of data sources, from a cold source such as a java.io.File from which you can open as many file handles as you want, you can use Observable.defer to build such a factory. Or you can share the resulting observable by converting it into a ConnectableObservable by means of multicast.
- in
is the Reader to convert into an observable
-
def
fromFuture[A](factory: ⇒ Future[A]): Observable[A]
Converts a Scala Future provided into an Observable.
If the created instance is a CancelableFuture, then it will be used for the returned Cancelable on subscribe.
-
def
fromInputStream(in: InputStream, chunkSize: Int): Observable[Array[Byte]]
Converts a java.io.InputStream into an observable that will emit Array[Byte] elements.
WARNING: reading from the input stream is a destructive process. Therefore only a single subscriber is supported, the result being a single-subscriber observable. If multiple subscribers are attempted, all subscribers, except for the first one, will be terminated with an APIContractViolationException.
Therefore, if you need a factory of data sources, from a cold source such as a java.io.File from which you can open as many file handles as you want, you can use Observable.defer to build such a factory. Or you can share the resulting observable by converting it into a ConnectableObservable by means of multicast.
- in
is the InputStream to convert into an observable
- chunkSize
is the maximum length of the emitted arrays of bytes. It's also used when reading from the input stream.
-
def
fromInputStream(in: InputStream): Observable[Array[Byte]]
Converts a java.io.InputStream into an observable that will emit Array[Byte] elements.
WARNING: reading from the input stream is a destructive process. Therefore only a single subscriber is supported, the result being a single-subscriber observable. If multiple subscribers are attempted, all subscribers, except for the first one, will be terminated with an APIContractViolationException.
Therefore, if you need a factory of data sources, from a cold source such as a java.io.File from which you can open as many file handles as you want, you can use Observable.defer to build such a factory. Or you can share the resulting observable by converting it into a ConnectableObservable by means of multicast.
- in
is the InputStream to convert into an observable
-
def
fromIterable[A](iterable: Iterable[A]): Observable[A]
Converts any Iterable into an Observable.
-
def
fromIterator[A](iterator: Iterator[A], onFinish: () ⇒ Unit): Observable[A]
Converts any Iterator into an observable.
WARNING: reading from an Iterator is a destructive process. Therefore only a single subscriber is supported, the result being a single-subscriber observable. If multiple subscribers are attempted, all subscribers, except for the first one, will be terminated with an APIContractViolationException.
Therefore, if you need a factory of data sources, from a cold source from which you can open as many iterators as you want, you can use Observable.defer to build such a factory. Or you can share the resulting observable by converting it into a ConnectableObservable by means of multicast.
This variant of fromIterator takes an onFinish callback that will be called when the streaming is finished, either with onComplete, onError, when the downstream signals a Stop or when the subscription gets canceled.
This onFinish callback is guaranteed to be called only once.
Useful for controlling resource deallocation (e.g. closing file handles).
- iterator
to transform into an observable
- onFinish
a callback that will be called for resource deallocation whenever the iterator is complete, or when the stream is canceled
-
def
fromIterator[A](iterator: Iterator[A]): Observable[A]
Converts any Iterator into an observable.
WARNING: reading from an Iterator is a destructive process. Therefore only a single subscriber is supported, the result being a single-subscriber observable. If multiple subscribers are attempted, all subscribers, except for the first one, will be terminated with an APIContractViolationException.
Therefore, if you need a factory of data sources, from a cold source from which you can open as many iterators as you want, you can use Observable.defer to build such a factory. Or you can share the resulting observable by converting it into a ConnectableObservable by means of multicast.
- iterator
to transform into an observable
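The factory pattern mentioned in the warning above can be sketched by wrapping fromIterator in Observable.defer, so that each subscription gets a fresh iterator. This assumes Monix 2.x (`toListL`, `runAsync` and the global Scheduler are assumptions; `run` and `data` are hypothetical).

```scala
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.concurrent.Await
import scala.concurrent.duration._

object IteratorFactoryExample {
  // Hypothetical helper: collects every emitted element, blocking for up to 5 seconds.
  def run[A](o: Observable[A]): List[A] =
    Await.result(o.toListL.runAsync, 5.seconds)

  private val data = List(1, 2, 3)

  // A new Iterator is opened on every subscription, so the resulting
  // observable can be subscribed to more than once.
  val factory: Observable[Int] =
    Observable.defer(Observable.fromIterator(data.iterator))

  val first: List[Int] = run(factory)  // List(1, 2, 3)
  val second: List[Int] = run(factory) // List(1, 2, 3)
}
```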
-
def
fromLinesReader(in: BufferedReader): Observable[String]
Converts a java.io.BufferedReader into an observable that will emit String text lines from the input.
Note that according to the specification of BufferedReader, a line is considered to be terminated by any one of a line feed (\n), a carriage return (\r), or a carriage return followed immediately by a linefeed.
WARNING: reading from a reader is a destructive process. Therefore only a single subscriber is supported, the result being a single-subscriber observable. If multiple subscribers are attempted, all subscribers, except for the first one, will be terminated with an APIContractViolationException.
Therefore, if you need a factory of data sources, from a cold source such as a java.io.File from which you can open as many file handles as you want, you can use Observable.defer to build such a factory. Or you can share the resulting observable by converting it into a ConnectableObservable by means of multicast.
- in
is the BufferedReader to convert into an observable
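A small sketch of fromLinesReader over an in-memory reader, assuming Monix 2.x (`toListL`, `runAsync` and the global Scheduler are assumptions). The input mixes \n and \r\n terminators to show the line splitting described above.

```scala
import java.io.{BufferedReader, StringReader}
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.concurrent.Await
import scala.concurrent.duration._

object LinesReaderExample {
  // Single-subscriber source: the reader is consumed by the first subscription.
  private val reader = new BufferedReader(new StringReader("alpha\nbeta\r\ngamma"))

  val lines: List[String] =
    Await.result(Observable.fromLinesReader(reader).toListL.runAsync, 5.seconds)
  // lines == List("alpha", "beta", "gamma")
}
```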
-
def
fromReactivePublisher[A](publisher: Publisher[A], requestCount: Int): Observable[A]
Given an org.reactivestreams.Publisher, converts it into a Monix / Rx Observable.
See the Reactive Streams protocol that Monix implements.
- publisher
is the org.reactivestreams.Publisher reference to wrap into an Observable
- requestCount
a strictly positive number, representing the size of the buffer used and the number of elements requested on each cycle when communicating demand, compliant with the reactive streams specification. If Int.MaxValue is given, then no back-pressuring logic will be applied (e.g. an unbounded buffer is used and the source has a license to stream as many events as it wants).
- See also
Observable.toReactive for converting an Observable to a reactive publisher.
-
def
fromReactivePublisher[A](publisher: Publisher[A]): Observable[A]
Given an org.reactivestreams.Publisher, converts it into a Monix / Rx Observable.
See the Reactive Streams protocol that Monix implements.
- publisher
is the org.reactivestreams.Publisher reference to wrap into an Observable
- See also
Observable.toReactive for converting an Observable to a reactive publisher.
-
def
fromStateAction[S, A](f: (S) ⇒ (A, S))(initialState: ⇒ S): Observable[A]
Given an initial state and a generator function that produces the next state and the next element in the sequence, creates an observable that keeps generating elements produced by our generator function.
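As an illustration of the state-driven generator, the sketch below produces Fibonacci numbers with fromStateAction. It assumes Monix 2.x (`take`, `toListL`, `runAsync` and the global Scheduler are assumptions; `step` is a hypothetical name).

```scala
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.concurrent.Await
import scala.concurrent.duration._

object StateActionExample {
  // State is the pair (current, next); each step emits `current`
  // and moves the state forward to (next, current + next).
  private val step: ((Long, Long)) => (Long, (Long, Long)) = {
    case (a, b) => (a, (b, a + b))
  }

  val fibonacci: Observable[Long] =
    Observable.fromStateAction(step)((0L, 1L))

  // The generator never completes on its own, so the stream is cut with take.
  val firstEight: List[Long] =
    Await.result(fibonacci.take(8).toListL.runAsync, 5.seconds)
  // firstEight == List(0, 1, 1, 2, 3, 5, 8, 13)
}
```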
-
def
fromTask[A](task: Task[A]): Observable[A]
Converts any Task into an Observable.
-
final
def
getClass(): Class[_]
- Definition Classes
- AnyRef → Any
- Annotations
- @native()
-
def
hashCode(): Int
- Definition Classes
- AnyRef → Any
- Annotations
- @native()
-
def
interleave2[A](oa1: Observable[A], oa2: Observable[A]): Observable[A]
Creates a new observable from the two given observables by interleaving their items into a strictly alternating sequence.
So the first item emitted by the new observable will be the item emitted by oa1, the second item will be emitted by oa2, and so forth; when either oa1 or oa2 calls onComplete, the items will then come directly from the observable that has not completed; when onError is called by either oa1 or oa2, the new observable will call onError and halt.
See merge for a more relaxed alternative that doesn't emit items in strict alternating sequence.
-
def
interval(delay: FiniteDuration): Observable[Long]
Creates an Observable that emits auto-incremented natural numbers (longs) spaced by a given time interval. Starts from 0 with no delay, after which it emits incremented numbers spaced by the given delay. The given delay acts as a fixed delay between successive events.
- delay
the delay between 2 successive events
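A short sketch of interval, assuming Monix 2.x (`take`, `toListL`, `runAsync` and the global Scheduler are assumptions). The 10 millisecond delay is arbitrary.

```scala
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.concurrent.Await
import scala.concurrent.duration._

object IntervalExample {
  // Emits 0 immediately, then 1 and 2, each after a further 10 ms delay.
  // The stream is infinite, so it is cut with take.
  val ticks: List[Long] =
    Await.result(Observable.interval(10.millis).take(3).toListL.runAsync, 5.seconds)
  // ticks == List(0, 1, 2)
}
```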
-
def
intervalAtFixedRate(initialDelay: FiniteDuration, period: FiniteDuration): Observable[Long]
Creates an Observable that emits auto-incremented natural numbers (longs) at a fixed rate, as given by the specified period. The time it takes to process an onNext event gets subtracted from the specified period and thus the created observable tries to emit events spaced by the given time interval, regardless of how long the processing of onNext takes.
This version of intervalAtFixedRate allows specifying an initialDelay before events start being emitted.
- initialDelay
is the initial delay before emitting the first event
- period
the period between 2 successive onNext events
-
def
intervalAtFixedRate(period: FiniteDuration): Observable[Long]
Creates an Observable that emits auto-incremented natural numbers (longs) at a fixed rate, as given by the specified period. The time it takes to process an onNext event gets subtracted from the specified period and thus the created observable tries to emit events spaced by the given time interval, regardless of how long the processing of onNext takes.
- period
the period between 2 successive onNext events
-
def
intervalWithFixedDelay(initialDelay: FiniteDuration, delay: FiniteDuration): Observable[Long]
Creates an Observable that emits auto-incremented natural numbers (longs) spaced by a given time interval. Starts from 0 after the given initialDelay, after which it emits incremented numbers spaced by the given delay. The given delay acts as a fixed delay between successive events.
- initialDelay
is the delay to wait before emitting the first event
- delay
the time to wait between 2 successive events
-
def
intervalWithFixedDelay(delay: FiniteDuration): Observable[Long]
Creates an Observable that emits auto-incremented natural numbers (longs) spaced by a given time interval. Starts from 0 with no delay, after which it emits incremented numbers spaced by the given delay. The given delay acts as a fixed delay between successive events.
- delay
the delay between 2 successive events
-
final
def
isInstanceOf[T0]: Boolean
- Definition Classes
- Any
-
def
merge[A](sources: Observable[A]*)(implicit os: OverflowStrategy[A] = OverflowStrategy.Default): Observable[A]
Merges the given list of observables into a single observable.
-
def
mergeDelayError[A](sources: Observable[A]*)(implicit os: OverflowStrategy[A] = OverflowStrategy.Default): Observable[A]
Merges the given list of observables into a single observable. Delays errors until the end.
-
def
multicast[A](multicast: MulticastStrategy[A], overflow: Synchronous[A])(implicit s: Scheduler): (Sync[A], Observable[A])
Creates an input channel and an output observable pair for building a multicast data-source.
Useful for building multicast observables from data-sources that cannot be back-pressured.
Prefer Observable.create when possible.
- multicast
is the multicast strategy to use (e.g. publish, behavior, replay, async)
- overflow
is the overflow strategy for the buffer that gets placed in front (since this will be a hot data-source that cannot be back-pressured)
-
def
multicast[A](multicast: MulticastStrategy[A])(implicit s: Scheduler): (Sync[A], Observable[A])
Creates an input channel and an output observable pair for building a multicast data-source.
Useful for building multicast observables from data-sources that cannot be back-pressured.
Prefer Observable.create when possible.
- multicast
is the multicast strategy to use (e.g. publish, behavior, replay, async)
-
final
def
ne(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
-
def
never: Observable[Nothing]
Creates an Observable that doesn't emit anything and that never completes.
-
final
def
notify(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native()
-
final
def
notifyAll(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native()
-
def
now[A](elem: A): Observable[A]
Returns an Observable that on execution emits the given strict value.
-
def
pure[A](elem: A): Observable[A]
Lifts an element into the Observable context.
Alias for now.
-
def
raiseError(ex: Throwable): Observable[Nothing]
Creates an Observable that emits an error.
-
def
range(from: Long, until: Long, step: Long = 1L): Observable[Long]
Creates an Observable that emits items in the given range.
- from
the range start
- until
the range end
- step
increment step, either positive or negative
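A brief sketch of range with a positive and a negative step, assuming Monix 2.x (`toListL`, `runAsync` and the global Scheduler are assumptions; `run` is a hypothetical helper). In both cases the until bound is treated as exclusive.

```scala
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.concurrent.Await
import scala.concurrent.duration._

object RangeExample {
  // Hypothetical helper: collects every emitted element, blocking for up to 5 seconds.
  def run[A](o: Observable[A]): List[A] =
    Await.result(o.toListL.runAsync, 5.seconds)

  val ascending: List[Long] = run(Observable.range(0, 10, 2))  // List(0, 2, 4, 6, 8)
  val descending: List[Long] = run(Observable.range(5, 0, -1)) // List(5, 4, 3, 2, 1)
}
```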
-
def
repeat[A](elems: A*): Observable[A]
Creates an Observable that emits the given elements repeatedly, without end.
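A small sketch of repeat with more than one element, assuming Monix 2.x (`take`, `toListL`, `runAsync` and the global Scheduler are assumptions). The whole sequence is cycled, and take bounds the otherwise infinite stream.

```scala
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.concurrent.Await
import scala.concurrent.duration._

object RepeatExample {
  val cycled: List[Int] =
    Await.result(Observable.repeat(1, 2).take(5).toListL.runAsync, 5.seconds)
  // cycled == List(1, 2, 1, 2, 1)
}
```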
-
def
repeatEval[A](task: ⇒ A): Observable[A]
Repeats the execution of the given task, emitting the results indefinitely.
-
def
suspend[A](fa: ⇒ Observable[A]): Observable[A]
Alias for defer.
-
def
switch[A](sources: Observable[A]*): Observable[A]
Given a sequence of observables, builds an observable that emits the elements of the most recently emitted observable.
-
final
def
synchronized[T0](arg0: ⇒ T0): T0
- Definition Classes
- AnyRef
-
def
tailRecM[A, B](a: A)(f: (A) ⇒ Observable[Either[A, B]]): Observable[B]
Keeps calling f and concatenating the resulting observables for each scala.util.Left event emitted by the source, pushing every scala.util.Right[B] event downstream.
Based on Phil Freeman's Stack Safety for Free.
It helps to wrap your head around it if you think of it as being equivalent to this inefficient and unsafe implementation (for Observable):
def tailRecM[A, B](a: A)(f: (A) => Observable[Either[A, B]]): Observable[B] =
  f(a).flatMap {
    case Right(b) => pure(b)
    case Left(nextA) => tailRecM(nextA)(f)
  }
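A usage sketch for tailRecM, assuming Monix 2.x and the equivalence described above (`toListL`, `runAsync` and the global Scheduler are assumptions). Each step emits the current number as a Right and, while below 3, a Left that triggers the next step.

```scala
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.concurrent.Await
import scala.concurrent.duration._

object TailRecMExample {
  val counted: Observable[Int] =
    Observable.tailRecM[Int, Int](0) { n =>
      if (n < 3) Observable[Either[Int, Int]](Right(n), Left(n + 1)) // emit n, then continue with n + 1
      else Observable[Either[Int, Int]](Right(n))                    // emit n and stop
    }

  val result: List[Int] = Await.result(counted.toListL.runAsync, 5.seconds)
  // result == List(0, 1, 2, 3)
}
```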
-
def
timerRepeated[A](initialDelay: FiniteDuration, period: FiniteDuration, unit: A): Observable[A]
Creates an Observable that repeatedly emits the given item (the unit parameter), until the underlying Observer cancels.
-
def
toReactive[A](source: Observable[A])(implicit s: Scheduler): Publisher[A]
Wraps the given source Observable into an org.reactivestreams.Publisher. See the Reactive Streams protocol that Monix implements.
-
def
toString(): String
- Definition Classes
- AnyRef → Any
-
implicit
val
typeClassInstances: TypeClassInstances
Implicit type-class instances for Observable.
-
def
unsafeCreate[A](f: (Subscriber[A]) ⇒ Cancelable): Observable[A]
Given a subscribe function, lifts it into an Observable.
This function is unsafe to use because users have to know and apply the Monix communication contract, related to thread-safety, communicating demand (back-pressure) and error handling.
Only use if you know what you're doing. Otherwise prefer create.
-
final
def
wait(): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws( ... )
-
final
def
wait(arg0: Long, arg1: Int): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws( ... )
-
final
def
wait(arg0: Long): Unit
- Definition Classes
- AnyRef
- Annotations
- @native() @throws( ... )
-
def
zip2[A1, A2](oa1: Observable[A1], oa2: Observable[A2]): Observable[(A1, A2)]
Creates a new observable from two observable sequences by combining their items in pairs in a strict sequence.
So the first item emitted by the new observable will be the pair of the first items emitted by each of the source observables; the second item emitted by the new observable will be the pair of the second items emitted by each of those observables; and so forth.
See combineLatestMap2 for a more relaxed alternative that doesn't combine items in strict sequence.
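The strict pairing of zip2 can be sketched with two sources of different lengths, assuming Monix 2.x (`toListL`, `runAsync` and the global Scheduler are assumptions). The result stops once the shorter source runs out of items to pair.

```scala
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.concurrent.Await
import scala.concurrent.duration._

object Zip2Example {
  val pairs: List[(Int, String)] =
    Await.result(
      Observable.zip2(Observable(1, 2, 3), Observable("a", "b")).toListL.runAsync,
      5.seconds)
  // pairs == List((1, "a"), (2, "b"))
}
```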
-
def
zip3[A1, A2, A3](oa1: Observable[A1], oa2: Observable[A2], oa3: Observable[A3]): Observable[(A1, A2, A3)]
Creates a new observable from three observable sequences by combining their items in tuples in a strict sequence.
So the first item emitted by the new observable will be the tuple of the first items emitted by each of the source observables; the second item emitted by the new observable will be the tuple of the second items emitted by each of those observables; and so forth.
See combineLatestMap3 for a more relaxed alternative that doesn't combine items in strict sequence.
-
def
zip4[A1, A2, A3, A4](oa1: Observable[A1], oa2: Observable[A2], oa3: Observable[A3], oa4: Observable[A4]): Observable[(A1, A2, A3, A4)]
Creates a new observable from four observable sequences by combining their items in tuples in a strict sequence.
So the first item emitted by the new observable will be the tuple of the first items emitted by each of the source observables; the second item emitted by the new observable will be the tuple of the second items emitted by each of those observables; and so forth.
See combineLatestMap4 for a more relaxed alternative that doesn't combine items in strict sequence.
-
def
zip5[A1, A2, A3, A4, A5](oa1: Observable[A1], oa2: Observable[A2], oa3: Observable[A3], oa4: Observable[A4], oa5: Observable[A5]): Observable[(A1, A2, A3, A4, A5)]
Creates a new observable from five observable sequences by combining their items into tuples in a strict sequence.
So the first item emitted by the new observable will be the tuple of the first items emitted by each of the source observables; the second item emitted by the new observable will be the tuple of the second items emitted by each of those observables; and so forth.
See combineLatestMap5 for a more relaxed alternative that doesn't combine items in strict sequence.
-
def
zip6[A1, A2, A3, A4, A5, A6](oa1: Observable[A1], oa2: Observable[A2], oa3: Observable[A3], oa4: Observable[A4], oa5: Observable[A5], oa6: Observable[A6]): Observable[(A1, A2, A3, A4, A5, A6)]
Creates a new observable from six observable sequences by combining their items into tuples in a strict sequence.
So the first item emitted by the new observable will be the tuple of the first items emitted by each of the source observables; the second item emitted by the new observable will be the tuple of the second items emitted by each of those observables; and so forth.
See combineLatestMap6 for a more relaxed alternative that doesn't combine items in strict sequence.
-
def
zipList[A](sources: Observable[A]*): Observable[Seq[A]]
Given a sequence of observables, zips them together, returning a new observable that emits sequences: the n-th emitted sequence holds the n-th item of each source, in the order the sources were passed.
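A short sketch of `zipList` with three same-typed sources may help here. It assumes a Monix 2.x-era API (`Task#runAsync` returning a future, replaced by `runToFuture` on Monix 3.x); the object name `ZipListSketch` and its `rows` helper are hypothetical.

```scala
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical wrapper object, not part of Monix.
object ZipListSketch {
  def rows(): List[List[Int]] = {
    val units    = Observable(1, 2)
    val tens     = Observable(10, 20)
    val hundreds = Observable(100, 200)

    // Each emitted Seq holds one item from every source, taken in lock-step.
    val zipped: Observable[Seq[Int]] = Observable.zipList(units, tens, hundreds)

    // Assumption: Monix 2.x, where runAsync yields a CancelableFuture.
    val future = zipped.map(_.toList).toListL.runAsync
    Await.result(future, 5.seconds)
  }

  def main(args: Array[String]): Unit =
    println(rows())
}
```

Unlike the fixed-arity `zip2` to `zip6` variants, `zipList` requires all sources to share one element type, since the result is a `Seq[A]` rather than a tuple.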
-
def
zipMap2[A1, A2, R](oa1: Observable[A1], oa2: Observable[A2])(f: (A1, A2) ⇒ R): Observable[R]
Creates a new observable from two observable sequences by combining their items in pairs in a strict sequence.
So the first item emitted by the new observable will be the result of the function applied to the first items emitted by each of the source observables; the second item emitted by the new observable will be the result of the function applied to the second items emitted by each of those observables; and so forth.
See combineLatestMap2 for a more relaxed alternative that doesn't combine items in strict sequence.
- f
is the mapping function applied over the generated pairs
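To illustrate how `f` is applied to each zipped pair, here is a minimal sketch that multiplies corresponding items of two observables. It assumes a Monix 2.x-era API (`Task#runAsync` returning a future, replaced by `runToFuture` on Monix 3.x); the object name `ZipMap2Sketch`, the `totals` helper and the sample values are hypothetical.

```scala
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical wrapper object, not part of Monix.
object ZipMap2Sketch {
  def totals(): List[Int] = {
    val prices     = Observable(2, 3, 4)
    val quantities = Observable(10, 20, 30)

    // f receives the n-th price together with the n-th quantity.
    val lineTotals: Observable[Int] =
      Observable.zipMap2(prices, quantities)((price, qty) => price * qty)

    // Assumption: Monix 2.x, where runAsync yields a CancelableFuture.
    val future = lineTotals.toListL.runAsync
    Await.result(future, 5.seconds)
  }

  def main(args: Array[String]): Unit =
    println(totals())
}
```

The same result could be obtained with `zip2` followed by `map`, but `zipMap2` avoids building the intermediate tuple.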
-
def
zipMap3[A1, A2, A3, R](oa1: Observable[A1], oa2: Observable[A2], oa3: Observable[A3])(f: (A1, A2, A3) ⇒ R): Observable[R]
Creates a new observable from three observable sequences by applying a function to their items, combined in a strict sequence.
So the first item emitted by the new observable will be the result of the function applied to the first items emitted by each of the source observables; the second item emitted by the new observable will be the result of the function applied to the second items emitted by each of those observables; and so forth.
See combineLatestMap3 for a more relaxed alternative that doesn't combine items in strict sequence.
- f
is the mapping function applied to each group of three zipped items
-
def
zipMap4[A1, A2, A3, A4, R](oa1: Observable[A1], oa2: Observable[A2], oa3: Observable[A3], oa4: Observable[A4])(f: (A1, A2, A3, A4) ⇒ R): Observable[R]
Creates a new observable from four observable sequences by applying a function to their items, combined in a strict sequence.
So the first item emitted by the new observable will be the result of the function applied to the first items emitted by each of the source observables; the second item emitted by the new observable will be the result of the function applied to the second items emitted by each of those observables; and so forth.
See combineLatestMap4 for a more relaxed alternative that doesn't combine items in strict sequence.
- f
is the mapping function applied to each group of four zipped items
-
def
zipMap5[A1, A2, A3, A4, A5, R](oa1: Observable[A1], oa2: Observable[A2], oa3: Observable[A3], oa4: Observable[A4], oa5: Observable[A5])(f: (A1, A2, A3, A4, A5) ⇒ R): Observable[R]
Creates a new observable from five observable sequences by applying a function to their items, combined in a strict sequence.
So the first item emitted by the new observable will be the result of the function applied to the first items emitted by each of the source observables; the second item emitted by the new observable will be the result of the function applied to the second items emitted by each of those observables; and so forth.
See combineLatestMap5 for a more relaxed alternative that doesn't combine items in strict sequence.
- f
is the mapping function applied to each group of five zipped items
-
def
zipMap6[A1, A2, A3, A4, A5, A6, R](oa1: Observable[A1], oa2: Observable[A2], oa3: Observable[A3], oa4: Observable[A4], oa5: Observable[A5], oa6: Observable[A6])(f: (A1, A2, A3, A4, A5, A6) ⇒ R): Observable[R]
Creates a new observable from six observable sequences by applying a function to their items, combined in a strict sequence.
So the first item emitted by the new observable will be the result of the function applied to the first items emitted by each of the source observables; the second item emitted by the new observable will be the result of the function applied to the second items emitted by each of those observables; and so forth.
See combineLatestMap6 for a more relaxed alternative that doesn't combine items in strict sequence.
- f
is the mapping function applied to each group of six zipped items
This is the API documentation for the Monix library.
Package Overview
- monix.execution exposes lower level primitives for dealing with asynchronous execution:
  - Atomic types, as alternative to java.util.concurrent.atomic
- monix.eval is for dealing with evaluation of results, thus exposing Task and Coeval.
- monix.reactive exposes the Observable pattern:
  - Observable implementations
- monix.types implements type-class shims, to be translated to type-classes provided by libraries such as Cats or Scalaz.
- monix.cats is the optional integration with the Cats library, providing translations for the types described in monix.types.
- monix.scalaz is the optional integration with the Scalaz library, providing translations for the types described in monix.types.