object Observable extends ObservableDeprecatedBuilders with Serializable
- Alphabetic
- By Inheritance
- Observable
- Serializable
- ObservableDeprecatedBuilders
- AnyRef
- Any
- Hide All
- Show All
- Public
- Protected
Type Members
- class CatsInstances extends Bracket[Observable, Throwable] with Alternative[Observable] with CoflatMap[Observable] with FunctorFilter[Observable] with TaskLift[Observable]
Cats instances for Observable.
- implicit final class DeprecatedExtensions[+A] extends AnyVal with ObservableDeprecatedMethods[A]
Exposes extension methods for deprecated Observable methods.
- type Operator[-I, +O] = (Subscriber[O]) => Subscriber[I]
An
Operator
is a function for transforming observers, that can be used for lifting observables.An
Operator
is a function for transforming observers, that can be used for lifting observables.
Value Members
- final def !=(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- final def ##(): Int
- Definition Classes
- AnyRef → Any
- final def ==(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- def apply[A](elems: A*): Observable[A]
Given a sequence of elements, builds an observable from it.
- final def asInstanceOf[T0]: T0
- Definition Classes
- Any
- implicit val catsInstances: CatsInstances
Implicit type class instances for Observable.
- def clone(): AnyRef
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.CloneNotSupportedException]) @native()
- def coeval[A](value: Coeval[A]): Observable[A]
Transforms a non-strict Coeval value into an
Observable
that emits a single element. - def combineLatest2[A1, A2](oa1: Observable[A1], oa2: Observable[A2]): Observable[(A1, A2)]
Creates a combined observable from 2 source observables.
Creates a combined observable from 2 source observables.
This operator behaves in a similar way to zip2, but while
zip
emits items only when all of the zipped source observables have emitted a previously unzipped item,combine
emits an item whenever any of the source Observables emits an item (so long as each of the source Observables has emitted at least one item).Visual Example
stream1: 1 - - 2 - - 3 - 4 - - stream2: 1 - - 2 - 3 - - - - 4 result: (1, 1), (2, 2), (2, 3), (3, 3), (4, 3), (4, 4)
- def combineLatest3[A1, A2, A3](oa1: Observable[A1], oa2: Observable[A2], oa3: Observable[A3]): Observable[(A1, A2, A3)]
Creates a combined observable from 3 source observables.
Creates a combined observable from 3 source observables.
This operator behaves in a similar way to zip3, but while
zip
emits items only when all of the zipped source observables have emitted a previously unzipped item,combine
emits an item whenever any of the source Observables emits an item (so long as each of the source Observables has emitted at least one item). - def combineLatest4[A1, A2, A3, A4](oa1: Observable[A1], oa2: Observable[A2], oa3: Observable[A3], oa4: Observable[A4]): Observable[(A1, A2, A3, A4)]
Creates a combined observable from 4 source observables.
Creates a combined observable from 4 source observables.
This operator behaves in a similar way to zip4, but while
zip
emits items only when all of the zipped source observables have emitted a previously unzipped item,combine
emits an item whenever any of the source Observables emits an item (so long as each of the source Observables has emitted at least one item). - def combineLatest5[A1, A2, A3, A4, A5](oa1: Observable[A1], oa2: Observable[A2], oa3: Observable[A3], oa4: Observable[A4], oa5: Observable[A5]): Observable[(A1, A2, A3, A4, A5)]
Creates a combined observable from 5 source observables.
Creates a combined observable from 5 source observables.
This operator behaves in a similar way to zip5, but while
zip
emits items only when all of the zipped source observables have emitted a previously unzipped item,combine
emits an item whenever any of the source Observables emits an item (so long as each of the source Observables has emitted at least one item). - def combineLatest6[A1, A2, A3, A4, A5, A6](oa1: Observable[A1], oa2: Observable[A2], oa3: Observable[A3], oa4: Observable[A4], oa5: Observable[A5], oa6: Observable[A6]): Observable[(A1, A2, A3, A4, A5, A6)]
Creates a combined observable from 6 source observables.
Creates a combined observable from 6 source observables.
This operator behaves in a similar way to zip6, but while
zip
emits items only when all of the zipped source observables have emitted a previously unzipped item,combine
emits an item whenever any of the source Observables emits an item (so long as each of the source Observables has emitted at least one item). - def combineLatestList[A](sources: Observable[A]*): Observable[Seq[A]]
Given an observable sequence, it combines them together returning a new observable that generates sequences.
- def combineLatestMap2[A1, A2, R](oa1: Observable[A1], oa2: Observable[A2])(f: (A1, A2) => R): Observable[R]
Creates a combined observable from 2 source observables.
Creates a combined observable from 2 source observables.
This operator behaves in a similar way to zipMap2, but while
zip
emits items only when all of the zipped source observables have emitted a previously unzipped item,combine
emits an item whenever any of the source Observables emits an item (so long as each of the source Observables has emitted at least one item).Visual Example
stream1: 1 - - 2 - - 3 - 4 - - stream2: 1 - - 2 - 3 - - - - 4 result: (1, 1), (2, 2), (2, 3), (3, 3), (4, 3), (4, 4)
- def combineLatestMap3[A1, A2, A3, R](a1: Observable[A1], a2: Observable[A2], a3: Observable[A3])(f: (A1, A2, A3) => R): Observable[R]
Creates a combined observable from 3 source observables.
Creates a combined observable from 3 source observables.
This operator behaves in a similar way to zipMap3, but while
zip
emits items only when all of the zipped source observables have emitted a previously unzipped item,combine
emits an item whenever any of the source Observables emits an item (so long as each of the source Observables has emitted at least one item). - def combineLatestMap4[A1, A2, A3, A4, R](a1: Observable[A1], a2: Observable[A2], a3: Observable[A3], a4: Observable[A4])(f: (A1, A2, A3, A4) => R): Observable[R]
Creates a combined observable from 4 source observables.
Creates a combined observable from 4 source observables.
This operator behaves in a similar way to zipMap4, but while
zip
emits items only when all of the zipped source observables have emitted a previously unzipped item,combine
emits an item whenever any of the source Observables emits an item (so long as each of the source Observables has emitted at least one item). - def combineLatestMap5[A1, A2, A3, A4, A5, R](a1: Observable[A1], a2: Observable[A2], a3: Observable[A3], a4: Observable[A4], a5: Observable[A5])(f: (A1, A2, A3, A4, A5) => R): Observable[R]
Creates a combined observable from 5 source observables.
Creates a combined observable from 5 source observables.
This operator behaves in a similar way to zipMap5, but while
zip
emits items only when all of the zipped source observables have emitted a previously unzipped item,combine
emits an item whenever any of the source Observables emits an item (so long as each of the source Observables has emitted at least one item). - def combineLatestMap6[A1, A2, A3, A4, A5, A6, R](a1: Observable[A1], a2: Observable[A2], a3: Observable[A3], a4: Observable[A4], a5: Observable[A5], a6: Observable[A6])(f: (A1, A2, A3, A4, A5, A6) => R): Observable[R]
Creates a combined observable from 6 source observables.
Creates a combined observable from 6 source observables.
This operator behaves in a similar way to zipMap6, but while
zip
emits items only when all of the zipped source observables have emitted a previously unzipped item,combine
emits an item whenever any of the source Observables emits an item (so long as each of the source Observables has emitted at least one item). - def cons[A](head: A, tail: Observable[A]): Observable[A]
Builds a new observable from a strict
head
and a lazily evaluated tail. - def create[A](overflowStrategy: Synchronous[A], producerType: ProducerSide = MultiProducer)(f: (Sync[A]) => Cancelable): Observable[A]
Creates an observable from a function that receives a concurrent and safe Subscriber.Sync.
Creates an observable from a function that receives a concurrent and safe Subscriber.Sync.
This builder represents the safe way of building observables from data-sources that cannot be back-pressured.
- overflowStrategy
is the overflow strategy that specifies the type of the underlying buffer (unbounded, that overflows the head, etc). This parameter can only specify a "synchronous" strategy, so no back-pressuring allowed.
- producerType
(UNSAFE) is the producer type and can be
MultiProducer
orSingleProducer
, specified as an optimization option; if you don't know what you're doing, stick toMultiProducer
, which says that multiple producers can push events at the same time, which is the default
- def defer[A](fa: => Observable[A]): Observable[A]
Returns a new observable that creates a sequence from the given factory on each subscription.
- def delay[A](a: => A): Observable[A]
Alias for eval.
- def empty[A]: Observable[A]
Creates an observable that doesn't emit anything, but immediately calls
onComplete
instead. - final def eq(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- def equals(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef → Any
- def eval[A](a: => A): Observable[A]
Given a non-strict value, converts it into an Observable that upon subscription, evaluates the expression and emits a single element.
- def evalDelayed[A](delay: FiniteDuration, a: => A): Observable[A]
Lifts a non-strict value into an observable that emits a single element, but upon subscription delay its evaluation by the specified timespan
- def evalOnce[A](f: => A): Observable[A]
Given a non-strict value, converts it into an Observable that emits a single element and that memoizes the value for subsequent invocations.
- def finalize(): Unit
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.Throwable])
- def firstStartedOf[A](source: Observable[A]*): Observable[A]
Given a list of source Observables, emits all of the items from the first of these Observables to emit an item or to complete, and cancel the rest.
Given a list of source Observables, emits all of the items from the first of these Observables to emit an item or to complete, and cancel the rest.
Visual Example
stream1: - - 1 1 1 - 1 - 1 - - stream2: - - - - - 2 2 2 2 2 2 result: - - 1 1 1 - 1 - 1 - -
- def from[F[_], A](fa: F[A])(implicit F: ObservableLike[F]): Observable[A]
Converts to Observable from any
F[_]
that has an ObservableLike instance.Converts to Observable from any
F[_]
that has an ObservableLike instance.Supported types includes, but is not necessarily limited to:
- def fromAsyncStateAction[S, A](f: (S) => Task[(A, S)])(seed: => S): Observable[A]
Given an initial state and a generator function that produces the next state and the next element in the sequence, creates an observable that keeps generating elements produced by our generator function.
- def fromAsyncStateActionF[F[_], S, A](f: (S) => F[(A, S)])(seed: => S)(implicit F: TaskLike[F]): Observable[A]
Version of fromAsyncStateAction that can work with generic
F[_]
tasks, anything that's supported via monix.eval.TaskLike conversions.Version of fromAsyncStateAction that can work with generic
F[_]
tasks, anything that's supported via monix.eval.TaskLike conversions.So you can work among others with:
cats.effect.IO
monix.eval.Coeval
scala.concurrent.Future
- ...
- See also
fromAsyncStateAction for a version specialized for Task
- def fromCharsReader(in: Task[Reader], chunkSize: Int = 4096): Observable[Array[Char]]
Safely converts a
java.io.Reader
into an observable that will emitArray[Char]
elements.Safely converts a
java.io.Reader
into an observable that will emitArray[Char]
elements.Compared with fromCharsReaderUnsafe, this version:
- is referentially transparent, the input being a "generator" powered by monix.eval.Task
- automatically forks execution on subscription to ensure that the current thread isn't blocked by the ensuing blocking I/O
- ensures that the input stream is closed on completion, failure or cancellation
- in
is the
Task[Reader]
generator to convert into an observable- chunkSize
is the maximum length of the emitted arrays of chars
- def fromCharsReaderF[F[_]](in: F[Reader], chunkSize: Int = 4096)(implicit F: TaskLike[F]): Observable[Array[Char]]
Version of fromCharsReader that can work with generic
F[_]
tasks, anything that's supported via monix.eval.TaskLike conversions.Version of fromCharsReader that can work with generic
F[_]
tasks, anything that's supported via monix.eval.TaskLike conversions.So you can work among others with:
cats.effect.IO
monix.eval.Coeval
scala.concurrent.Future
- ...
- def fromCharsReaderUnsafe(in: Reader, chunkSize: Int = 4096): Observable[Array[Char]]
Converts a
java.io.Reader
into an observable that will emitArray[Char]
elements.Converts a
java.io.Reader
into an observable that will emitArray[Char]
elements.UNSAFE WARNING: this is an unsafe function, because reading from a reader is a destructive process, also violating referential transparency. Therefore only a single subscriber is supported, the result being a single-subscriber observable. If multiple subscribers are attempted, all subscribers, except for the first one, will be terminated with a APIContractViolationException.
UNSAFE PROTOCOL: the created Observable does not close the given
Reader
. Usually it's the producer of a resource that needs to deallocate the resource.This operation will start processing on the current thread (on
subscribe()
), so in order to not block, it might be better to also do an executeAsync, or you may want to use the AlwaysAsyncExecution model, which can be configured perScheduler
, see Scheduler.withExecutionModel, or perObservable
, see Observable.executeWithModel.- in
is the
Reader
to convert into an observable- chunkSize
is the maximum length of the emitted arrays of chars
- Annotations
- @UnsafeProtocol() @UnsafeBecauseImpure()
- See also
fromCharsReader for the safe version
- def fromEither[E, A](f: (E) => Throwable)(a: Either[E, A]): Observable[A]
Builds a Observable instance out of a Scala
Either
. - def fromEither[E <: Throwable, A](a: Either[E, A]): Observable[A]
Builds an
Observable
instance out of a ScalaEither
. - def fromFuture[A](factory: => Future[A]): Observable[A]
Converts a Scala
Future
provided into an Observable.Converts a Scala
Future
provided into an Observable.If the created instance is a CancelableFuture, then it will be used for the returned Cancelable on
subscribe
. - def fromInputStream(in: Task[InputStream], chunkSize: Int = 4096): Observable[Array[Byte]]
Safely converts a
java.io.InputStream
into an observable that will emitArray[Byte]
elements.Safely converts a
java.io.InputStream
into an observable that will emitArray[Byte]
elements.Compared with fromInputStreamUnsafe, this version:
- is referentially transparent, the input being a "generator" powered by monix.eval.Task
- automatically forks execution on subscription to ensure that the current thread isn't blocked by the ensuing blocking I/O
- ensures that the input stream is closed on completion, failure or cancellation
- in
is the
Task[InputStream]
generator to convert into an observable- chunkSize
is the maximum length of the emitted arrays of bytes
- def fromInputStreamF[F[_]](in: F[InputStream], chunkSize: Int = 4096)(implicit F: TaskLike[F]): Observable[Array[Byte]]
Version of fromInputStream that can work with generic
F[_]
tasks, anything that's supported via monix.eval.TaskLike conversions.Version of fromInputStream that can work with generic
F[_]
tasks, anything that's supported via monix.eval.TaskLike conversions.So you can work among others with:
cats.effect.IO
monix.eval.Coeval
scala.concurrent.Future
- ...
- def fromInputStreamUnsafe(in: InputStream, chunkSize: Int = 4096): Observable[Array[Byte]]
Converts a
java.io.InputStream
into an observable that will emitArray[Byte]
elements.Converts a
java.io.InputStream
into an observable that will emitArray[Byte]
elements.UNSAFE WARNING: this is an unsafe function, because reading from an input stream is a destructive process, also violating referential transparency. Therefore only a single subscriber is supported, the result being a single-subscriber observable. If multiple subscribers are attempted, all subscribers, except for the first one, will be terminated with a APIContractViolationException.
UNSAFE PROTOCOL: the created Observable does not close the given
InputStream
. Usually it's the producer of a resource that needs to deallocate the resource.This operation will start processing on the current thread (on
subscribe()
), so in order to not block, it might be better to also do an executeAsync, or you may want to use the AlwaysAsyncExecution model, which can be configured perScheduler
, see Scheduler.withExecutionModel, or perObservable
, see Observable.executeWithModel.- in
is the
InputStream
to convert into an observable- chunkSize
is the maximum length of the emitted arrays of bytes
- Annotations
- @UnsafeProtocol() @UnsafeBecauseImpure()
- See also
fromInputStream for the safe version
- def fromIterable[A](iterable: Iterable[A]): Observable[A]
Converts any
Iterable
into an Observable. - def fromIterator[A](resource: Resource[Task, Iterator[A]]): Observable[A]
Wraps a scala.Iterator into an
Observable
in the context of a cats.effect.Resource, which allows for specifying a finalizer.Wraps a scala.Iterator into an
Observable
in the context of a cats.effect.Resource, which allows for specifying a finalizer.- See also
fromIterator(task) for a version that uses Task for suspending side effects
fromIteratorUnsafe for the unsafe version that can wrap an iterator directly
- def fromIterator[A](task: Task[Iterator[A]]): Observable[A]
Wraps a scala.Iterator into an
Observable
.Wraps a scala.Iterator into an
Observable
.This function uses Task in order to suspend the creation of the
Iterator
, because reading from anIterator
is a destructive process. TheTask
is being used as a "factory", in pace of scala.Iterable.Example:
import monix.eval.Task Observable.fromIterator(Task(Iterator.from(1)))
- See also
fromIterator(Resource) for a version that uses
cats.effect.Resource
fromIteratorUnsafe for the unsafe version that can wrap an iterator directly
- def fromIteratorF[F[_], A](iteratorF: F[Iterator[A]])(implicit F: TaskLike[F]): Observable[A]
Version of fromIterator that can work with generic
F[_]
tasks, anything that's supported via monix.eval.TaskLike conversions.Version of fromIterator that can work with generic
F[_]
tasks, anything that's supported via monix.eval.TaskLike conversions.So you can work among others with:
cats.effect.IO
monix.eval.Coeval
scala.concurrent.Future
- ...
- def fromIteratorUnsafe[A](iterator: Iterator[A]): Observable[A]
Converts any
Iterator
into an observable.Converts any
Iterator
into an observable.UNSAFE WARNING: reading from an
Iterator
is a destructive process. Therefore only a single subscriber is supported, the result being a single-subscriber observable. If multiple subscribers are attempted, all subscribers, except for the first one, will be terminated with a APIContractViolationException.- iterator
to transform into an observable
- Annotations
- @UnsafeProtocol() @UnsafeBecauseImpure()
- See also
fromIterator(task) or fromIterator(resource) for safe alternatives
- def fromLinesReader(in: Task[BufferedReader]): Observable[String]
Safely converts a
java.io.BufferedReader
into an observable that will emitString
elements corresponding to text lines from the input.Safely converts a
java.io.BufferedReader
into an observable that will emitString
elements corresponding to text lines from the input.According to the specification of
BufferedReader
, a line is considered to be terminated by any one of a line feed (\n
), a carriage return (\r
), or a carriage return followed immediately by a linefeed.Compared with fromLinesReaderUnsafe, this version:
- is referentially transparent, the input being a "generator" powered by monix.eval.Task
- automatically forks execution on subscription to ensure that the current thread isn't blocked by the ensuing blocking I/O
- ensures that the input stream is closed on completion, failure or cancellation
- in
is the
Task[BufferedReader]
generator to convert into an observable
- def fromLinesReaderF[F[_]](in: F[BufferedReader])(implicit F: TaskLike[F]): Observable[String]
Version of fromLinesReader that can work with generic
F[_]
tasks, anything that's supported via monix.eval.TaskLike conversions.Version of fromLinesReader that can work with generic
F[_]
tasks, anything that's supported via monix.eval.TaskLike conversions.So you can work among others with:
cats.effect.IO
monix.eval.Coeval
scala.concurrent.Future
- ...
- def fromLinesReaderUnsafe(in: BufferedReader): Observable[String]
Converts a
java.io.BufferedReader
into an observable that will emitString
text lines from the input.Converts a
java.io.BufferedReader
into an observable that will emitString
text lines from the input.According to the specification of
BufferedReader
, a line is considered to be terminated by any one of a line feed (\n
), a carriage return (\r
), or a carriage return followed immediately by a linefeed.UNSAFE WARNING: this is an unsafe function, because reading from a reader is a destructive process, also violating referential transparency. Therefore only a single subscriber is supported, the result being a single-subscriber observable. If multiple subscribers are attempted, all subscribers, except for the first one, will be terminated with a APIContractViolationException.
UNSAFE PROTOCOL: the created Observable does not close the given
Reader
. Usually it's the producer of a resource that needs to deallocate the resource.- in
is the
Reader
to convert into an observable
- Annotations
- @UnsafeProtocol() @UnsafeBecauseImpure()
- See also
fromLinesReader for the safe version
- def fromReactivePublisher[A](publisher: Publisher[A], requestCount: Int): Observable[A]
Given a
org.reactivestreams.Publisher
, converts it into a Monix / Rx Observable.Given a
org.reactivestreams.Publisher
, converts it into a Monix / Rx Observable.See the Reactive Streams protocol that Monix implements.
- publisher
is the
org.reactivestreams.Publisher
reference to wrap into an Observable- requestCount
a strictly positive number, representing the size of the buffer used and the number of elements requested on each cycle when communicating demand, compliant with the reactive streams specification. If
Int.MaxValue
is given, then no back-pressuring logic will be applied (e.g. an unbounded buffer is used and the source has a license to stream as many events as it wants).
- See also
Observable.toReactive for converting an
Observable
to a reactive publisher.
- def fromReactivePublisher[A](publisher: Publisher[A]): Observable[A]
Given a
org.reactivestreams.Publisher
, converts it into a Monix / Rx Observable.Given a
org.reactivestreams.Publisher
, converts it into a Monix / Rx Observable.See the Reactive Streams protocol that Monix implements.
- publisher
is the
org.reactivestreams.Publisher
reference to wrap into an Observable
- See also
Observable.toReactive for converting an
Observable
to a reactive publisher.
- def fromResource[F[_], A](resource: Resource[F, A])(implicit F: TaskLike[F]): Observable[A]
Transforms any
cats.effect.Resource
into an Observable.Transforms any
cats.effect.Resource
into an Observable.See the documentation for Resource.
import cats.effect.Resource import monix.eval.Task import java.io._ def openFileAsResource(file: File): Resource[Task, FileInputStream] = Resource.make(Task(new FileInputStream(file)))(h => Task(h.close())) def openFileAsStream(file: File): Observable[FileInputStream] = Observable.fromResource(openFileAsResource(file))
This example would be equivalent with usage of Observable.resource:
def openFileAsResource2(file: File): Observable[FileInputStream] = { Observable.resource(Task(new FileInputStream(file)))(h => Task(h.close())) }
This means that
flatMap
is safe to use:def readBytes(file: File): Observable[Array[Byte]] = openFileAsStream(file).flatMap { in => Observable.fromInputStreamUnsafe(in) }
- def fromStateAction[S, A](f: (S) => (A, S))(seed: => S): Observable[A]
Given an initial state and a generator function that produces the next state and the next element in the sequence, creates an observable that keeps generating elements produced by our generator function.
- def fromTask[A](task: Task[A]): Observable[A]
Converts any Task into an Observable.
Converts any Task into an Observable.
import monix.eval.Task val task = Task.eval("Hello!") Observable.fromTask(task)
- def fromTaskLike[F[_], A](fa: F[A])(implicit F: TaskLike[F]): Observable[A]
Converts generic
F[_]
effects toObservable
.Converts generic
F[_]
effects toObservable
.Currently supported data types:
- monix.eval.Task
- monix.eval.Coeval
- scala.concurrent.Future
- cats.effect.IO
- any cats.effect.Effect
- any cats.effect.ConcurrentEffect
Sample:
import cats.implicits._ import cats.effect.IO import scala.concurrent.duration._ import monix.execution.Scheduler.global import monix.catnap.SchedulerEffect // Needed for IO.sleep implicit val timer = SchedulerEffect.timerLiftIO[IO](global) val task = IO.sleep(5.seconds) *> IO(println("Hello!")) Observable.fromTaskLike(task)
- def fromTry[A](a: Try[A]): Observable[A]
Converts a Scala
Try
into anObservable
.Converts a Scala
Try
into anObservable
.import scala.util.Try val value = Try(1) Observable.fromTry(value)
- final def getClass(): Class[_ <: AnyRef]
- Definition Classes
- AnyRef → Any
- Annotations
- @native()
- def hashCode(): Int
- Definition Classes
- AnyRef → Any
- Annotations
- @native()
- def interleave2[A](oa1: Observable[A], oa2: Observable[A]): Observable[A]
Creates a new observable from this observable and another given observable by interleaving their items into a strictly alternating sequence.
Creates a new observable from this observable and another given observable by interleaving their items into a strictly alternating sequence.
So the first item emitted by the new observable will be the item emitted by
self
, the second item will be emitted by the other observable, and so forth; when eitherself
orother
callsonCompletes
, the items will then be directly coming from the observable that has not completed; whenonError
is called by eitherself
orother
, the new observable will callonError
and halt.See merge for a more relaxed alternative that doesn't emit items in strict alternating sequence.
- def interval(delay: FiniteDuration): Observable[Long]
Creates an Observable that emits auto-incremented natural numbers (longs) spaced by a given time interval.
Creates an Observable that emits auto-incremented natural numbers (longs) spaced by a given time interval. Starts from 0 with no delay, after which it emits incremented numbers spaced by the
delay
of time. The givendelay
of time acts as a fixed delay between successive events.- delay
the delay between 2 successive events
- def intervalAtFixedRate(initialDelay: FiniteDuration, period: FiniteDuration): Observable[Long]
Creates an Observable that emits auto-incremented natural numbers (longs) at a fixed rate, as given by the specified
period
.Creates an Observable that emits auto-incremented natural numbers (longs) at a fixed rate, as given by the specified
period
. The time it takes to process anonNext
event gets subtracted from the specifiedperiod
and thus the created observable tries to emit events spaced by the given time interval, regardless of how long the processing ofonNext
takes.This version of the
intervalAtFixedRate
allows specifying aninitialDelay
before events start being emitted.- initialDelay
is the initial delay before emitting the first event
- period
the period between 2 successive
onNext
events
- def intervalAtFixedRate(period: FiniteDuration): Observable[Long]
Creates an Observable that emits auto-incremented natural numbers (longs) at a fixed rate, as given by the specified
period
.Creates an Observable that emits auto-incremented natural numbers (longs) at a fixed rate, as given by the specified
period
. The time it takes to process anonNext
event gets subtracted from the specifiedperiod
and thus the created observable tries to emit events spaced by the given time interval, regardless of how long the processing ofonNext
takes.- period
the period between 2 successive
onNext
events
- def intervalWithFixedDelay(delay: FiniteDuration): Observable[Long]
Creates an Observable that emits auto-incremented natural numbers (longs) spaced by a given time interval.
Creates an Observable that emits auto-incremented natural numbers (longs) spaced by a given time interval. Starts from 0 with no delay, after which it emits incremented numbers spaced by the
delay
of time. The givendelay
of time acts as a fixed delay between successive events.- delay
the delay between 2 successive events
- def intervalWithFixedDelay(initialDelay: FiniteDuration, delay: FiniteDuration): Observable[Long]
Creates an Observable that emits auto-incremented natural numbers (longs) spaced by a given time interval.
Creates an Observable that emits auto-incremented natural numbers (longs) spaced by a given time interval. Starts from 0 with
initialDelay
, after which it emits incremented numbers spaced by thedelay
of time. The givendelay
of time acts as a fixed delay between successive events.This version of the
intervalWithFixedDelay
allows specifying aninitialDelay
before events start being emitted.- initialDelay
is the delay to wait before emitting the first event
- delay
the time to wait between 2 successive events
- final def isInstanceOf[T0]: Boolean
- Definition Classes
- Any
- def liftFrom[F[_]](implicit F: ObservableLike[F]): ~>[F, Observable]
Returns a
F ~> Coeval
(FunctionK
) for transforming any supported data-type into Observable. - def multicast[A](multicast: MulticastStrategy[A], overflow: Synchronous[A])(implicit s: Scheduler): (Sync[A], Observable[A])
Creates an input channel and an output observable pair for building a multicast data-source.
Creates an input channel and an output observable pair for building a multicast data-source.
Useful for building multicast observables from data-sources that cannot be back-pressured.
Prefer Observable.create when possible.
- multicast
is the multicast strategy to use (e.g. publish, behavior, reply, async)
- overflow
is the overflow strategy for the buffer that gets placed in front (since this will be a hot data-source that cannot be back-pressured)
- def multicast[A](multicast: MulticastStrategy[A])(implicit s: Scheduler): (Sync[A], Observable[A])
Creates an input channel and an output observable pair for building a multicast data-source.
Creates an input channel and an output observable pair for building a multicast data-source.
Useful for building multicast observables from data-sources that cannot be back-pressured.
Prefer Observable.create when possible.
- multicast
is the multicast strategy to use (e.g. publish, behavior, reply, async)
- final def ne(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- def never[A]: Observable[A]
Creates an Observable that doesn't emit anything and that never completes.
- final def notify(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native()
- final def notifyAll(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native()
- def now[A](elem: A): Observable[A]
Returns an
Observable
that on execution emits the given strict value. - implicit val observableNonEmptyParallel: Aux[Observable, observables.CombineObservable.Type]
cats.NonEmptyParallel instance for Observable.
- def pure[A](elem: A): Observable[A]
Lifts an element into the
Observable
context.Lifts an element into the
Observable
context.Alias for now.
- def raiseError[A](ex: Throwable): Observable[A]
Creates an Observable that emits an error.
- def range(from: Long, until: Long, step: Long = 1L): Observable[Long]
Creates an Observable that emits items in the given range.
Creates an Observable that emits items in the given range.
- from
the range start
- until
the range end
- step
increment step, either positive or negative
- def repeat[A](elems: A*): Observable[A]
Creates an Observable that continuously emits the given item repeatedly.
- def repeatEval[A](task: => A): Observable[A]
Repeats the execution of the given
task
, emitting the results indefinitely. - def repeatEvalF[F[_], A](fa: F[A])(implicit F: TaskLike[F]): Observable[A]
Repeats the evaluation of given effectful value, emitting the results indefinitely.
- def resource[A](acquire: Task[A])(release: (A) => Task[Unit]): Observable[A]
Creates a
Observable
that depends on resource allocated by a monadic value, ensuring the resource is released.Creates a
Observable
that depends on resource allocated by a monadic value, ensuring the resource is released.Typical use-cases are working with files or network sockets
Example
import monix.eval.Task import java.io.PrintWriter val printer = Observable.resource { Task(new PrintWriter("./lines.txt")) } { writer => Task(writer.close()) } // Safely use the resource, because the release is // scheduled to happen afterwards val writeLines = printer.flatMap { writer => Observable .fromIterator(Task(Iterator.from(1))) .mapEval(i => Task { writer.println(s"Line #\$i") }) } // Write 100 numbered lines to the file, closing the writer // when finished (after `runAsync`): writeLines.take(100).completedL
- acquire
resource to acquire at the start of the stream
- release
function that releases the acquired resource
- def resourceCase[A](acquire: Task[A])(release: (A, ExitCase[Throwable]) => Task[Unit]): Observable[A]
Creates a stream that depends on resource allocated by a monadic value, ensuring the resource is released.
Creates a stream that depends on resource allocated by a monadic value, ensuring the resource is released.
Typical use-cases are working with files or network sockets
Example
import cats.effect.ExitCase import monix.eval.Task import java.io.PrintWriter val printer = Observable.resourceCase { Task(new PrintWriter("./lines.txt")) } { case (writer, ExitCase.Canceled | ExitCase.Completed) => Task(writer.close()) case (writer, ExitCase.Error(e)) => Task { println(e.getMessage); writer.close() } } // Safely use the resource, because the release is // scheduled to happen afterwards val writeLines = printer.flatMap { writer => Observable .fromIterator(Task(Iterator.from(1))) .mapEval(i => Task { writer.println(s"Line #\$i") }) } // Write 100 numbered lines to the file, closing the writer // when finished (after `runAsync`): writeLines.take(100).completedL
- acquire
an effect that acquires an expensive resource
- release
function that releases the acquired resource
- def resourceCaseF[F[_], A](acquire: F[A])(release: (A, ExitCase[Throwable]) => F[Unit])(implicit F: TaskLike[F]): Observable[A]
Version of resourceCase that can work with generic
F[_]
tasks, anything that's supported via monix.eval.TaskLike conversions.Version of resourceCase that can work with generic
F[_]
tasks, anything that's supported via monix.eval.TaskLike conversions.So you can work among others with:
cats.effect.IO
monix.eval.Coeval
scala.concurrent.Future
- ...
- def resourceF[F[_], A](acquire: F[A])(release: (A) => F[Unit])(implicit F: TaskLike[F]): Observable[A]
Version of resource that can work with generic
F[_]
tasks, anything that's supported via monix.eval.TaskLike conversions.Version of resource that can work with generic
F[_]
tasks, anything that's supported via monix.eval.TaskLike conversions.So you can work among others with:
cats.effect.IO
monix.eval.Coeval
scala.concurrent.Future
- ...
- def suspend[A](fa: => Observable[A]): Observable[A]
Alias for defer.
- final def synchronized[T0](arg0: => T0): T0
- Definition Classes
- AnyRef
- def tailRecM[A, B](a: A)(f: (A) => Observable[Either[A, B]]): Observable[B]
Keeps calling
f
and concatenating the resulting observables for eachscala.util.Left
event emitted by the source, concatenating the resulting observables and pushing everyscala.util.Right[B]
events downstream.Keeps calling
f
and concatenating the resulting observables for eachscala.util.Left
event emitted by the source, concatenating the resulting observables and pushing everyscala.util.Right[B]
events downstream.Based on Phil Freeman's Stack Safety for Free.
It helps to wrap your head around it if you think of it as being equivalent to this inefficient and unsafe implementation (for
Observable
):// Don't do this kind of recursion, because `flatMap` can throw // stack overflow errors: def tailRecM[A, B](a: A)(f: (A) => Observable[Either[A, B]]): Observable[B] = f(a).flatMap { case Right(b) => Observable.pure(b) case Left(nextA) => tailRecM(nextA)(f) }
- def timerRepeated[A](initialDelay: FiniteDuration, period: FiniteDuration, unit: A): Observable[A]
Create an Observable that repeatedly emits the given
item
, until the underlying Observer cancels. - def toReactive[A](source: Observable[A])(implicit s: Scheduler): Publisher[A]
Wraps this Observable into a
org.reactivestreams.Publisher
.Wraps this Observable into a
org.reactivestreams.Publisher
. See the Reactive Streams protocol that Monix implements. - def toString(): String
- Definition Classes
- AnyRef → Any
- def unfold[S, A](seed: => S)(f: (S) => Option[(A, S)]): Observable[A]
Given an initial state and a generator function that produces the next state and the next element in the sequence, creates an observable that keeps generating elements produced by our generator function until
None
is returned.Given an initial state and a generator function that produces the next state and the next element in the sequence, creates an observable that keeps generating elements produced by our generator function until
None
is returned.Observable.unfold(0)(i => if (i < 10) Some((i, i + 1)) else None).toListL result: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Example: - def unfoldEval[S, A](seed: => S)(f: (S) => Task[Option[(A, S)]]): Observable[A]
Given an initial state and a generator function that produces the next state and the next element in the sequence, creates an observable that keeps generating elements produced by our generator function until
None
is returned.Given an initial state and a generator function that produces the next state and the next element in the sequence, creates an observable that keeps generating elements produced by our generator function until
None
is returned.Observable.unfoldEval(0)(i => if (i < 10) Task.now(Some((i, i + 1))) else Task.now(None)).toListL result: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Example: - def unfoldEvalF[F[_], S, A](seed: => S)(f: (S) => F[Option[(A, S)]])(implicit F: TaskLike[F]): Observable[A]
Version of unfoldEval that can work with generic
F[_]
tasks, anything that's supported via monix.eval.TaskLike conversions.Version of unfoldEval that can work with generic
F[_]
tasks, anything that's supported via monix.eval.TaskLike conversions.So you can work among others with:
cats.effect.IO
monix.eval.Coeval
scala.concurrent.Future
- ...
- See also
unfoldEval for a version specialized for Task
- val unit: Observable[Unit]
Reusable value for an
Observable[Unit]
that emits a single event, the implementation forcats.effect.Applicative.unit
. - def unsafeCreate[A](f: (Subscriber[A]) => Cancelable): Observable[A]
Given a subscribe function, lifts it into an Observable.
Given a subscribe function, lifts it into an Observable.
This function is unsafe to use because users have to know and apply the Monix communication contract, related to thread-safety, communicating demand (back-pressure) and error handling.
Only use if you know what you're doing. Otherwise prefer create.
- final def wait(): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- final def wait(arg0: Long, arg1: Int): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- final def wait(arg0: Long): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException]) @native()
- def zip2[A1, A2](oa1: Observable[A1], oa2: Observable[A2]): Observable[(A1, A2)]
Creates a new observable from two observable sequences by combining their items in pairs in a strict sequence.
Creates a new observable from two observable sequences by combining their items in pairs in a strict sequence.
So the first item emitted by the new observable will be the result of the function applied to the first items emitted by each of the source observables; the second item emitted by the new observable will be the result of the function applied to the second items emitted by each of those observables; and so forth.
Visual Example
stream1: 1 - - 2 - - 3 - 4 - - stream2: 1 - - 2 - 3 - - - - 4 result: (1, 1), (2, 2), (3, 3), (4, 4)
See combineLatestMap2 for a more relaxed alternative that doesn't combine items in strict sequence.
- def zip3[A1, A2, A3](oa1: Observable[A1], oa2: Observable[A2], oa3: Observable[A3]): Observable[(A1, A2, A3)]
Creates a new observable from three observable sequences by combining their items in pairs in a strict sequence.
Creates a new observable from three observable sequences by combining their items in pairs in a strict sequence.
So the first item emitted by the new observable will be the result of the function applied to the first items emitted by each of the source observables; the second item emitted by the new observable will be the result of the function applied to the second items emitted by each of those observables; and so forth.
See combineLatestMap3 for a more relaxed alternative that doesn't combine items in strict sequence.
- def zip4[A1, A2, A3, A4](oa1: Observable[A1], oa2: Observable[A2], oa3: Observable[A3], oa4: Observable[A4]): Observable[(A1, A2, A3, A4)]
Creates a new observable from four observable sequences by combining their items in pairs in a strict sequence.
Creates a new observable from four observable sequences by combining their items in pairs in a strict sequence.
So the first item emitted by the new observable will be the result of the function applied to the first items emitted by each of the source observables; the second item emitted by the new observable will be the result of the function applied to the second items emitted by each of those observables; and so forth.
See combineLatestMap4 for a more relaxed alternative that doesn't combine items in strict sequence.
- def zip5[A1, A2, A3, A4, A5](oa1: Observable[A1], oa2: Observable[A2], oa3: Observable[A3], oa4: Observable[A4], oa5: Observable[A5]): Observable[(A1, A2, A3, A4, A5)]
Creates a new observable from five observable sequences by combining their items in pairs in a strict sequence.
Creates a new observable from five observable sequences by combining their items in pairs in a strict sequence.
So the first item emitted by the new observable will be the result of the function applied to the first items emitted by each of the source observables; the second item emitted by the new observable will be the result of the function applied to the second items emitted by each of those observables; and so forth.
See combineLatestMap5 for a more relaxed alternative that doesn't combine items in strict sequence.
- def zip6[A1, A2, A3, A4, A5, A6](oa1: Observable[A1], oa2: Observable[A2], oa3: Observable[A3], oa4: Observable[A4], oa5: Observable[A5], oa6: Observable[A6]): Observable[(A1, A2, A3, A4, A5, A6)]
Creates a new observable from five observable sequences by combining their items in pairs in a strict sequence.
Creates a new observable from five observable sequences by combining their items in pairs in a strict sequence.
So the first item emitted by the new observable will be the result of the function applied to the first items emitted by each of the source observables; the second item emitted by the new observable will be the result of the function applied to the second items emitted by each of those observables; and so forth.
See combineLatestMap5 for a more relaxed alternative that doesn't combine items in strict sequence.
- def zipList[A](sources: Observable[A]*): Observable[Seq[A]]
Given an observable sequence, it zips them together returning a new observable that generates sequences.
- def zipMap2[A1, A2, R](oa1: Observable[A1], oa2: Observable[A2])(f: (A1, A2) => R): Observable[R]
Creates a new observable from two observable sequences by combining their items in pairs in a strict sequence.
Creates a new observable from two observable sequences by combining their items in pairs in a strict sequence.
So the first item emitted by the new observable will be the result of the function applied to the first items emitted by each of the source observables; the second item emitted by the new observable will be the result of the function applied to the second items emitted by each of those observables; and so forth.
Visual Example
stream1: 1 - - 2 - - 3 - 4 - - stream2: 1 - - 2 - 3 - - - - 4 result: (1, 1), (2, 2), (3, 3), (4, 4)
See combineLatestMap2 for a more relaxed alternative that doesn't combine items in strict sequence.
- f
is the mapping function applied over the generated pairs
- def zipMap3[A1, A2, A3, R](oa1: Observable[A1], oa2: Observable[A2], oa3: Observable[A3])(f: (A1, A2, A3) => R): Observable[R]
Creates a new observable from three observable sequences by combining their items in pairs in a strict sequence.
Creates a new observable from three observable sequences by combining their items in pairs in a strict sequence.
So the first item emitted by the new observable will be the result of the function applied to the first items emitted by each of the source observables; the second item emitted by the new observable will be the result of the function applied to the second items emitted by each of those observables; and so forth.
See combineLatestMap3 for a more relaxed alternative that doesn't combine items in strict sequence.
- f
is the mapping function applied over the generated pairs
- def zipMap4[A1, A2, A3, A4, R](oa1: Observable[A1], oa2: Observable[A2], oa3: Observable[A3], oa4: Observable[A4])(f: (A1, A2, A3, A4) => R): Observable[R]
Creates a new observable from four observable sequences by combining their items in pairs in a strict sequence.
Creates a new observable from four observable sequences by combining their items in pairs in a strict sequence.
So the first item emitted by the new observable will be the result of the function applied to the first items emitted by each of the source observables; the second item emitted by the new observable will be the result of the function applied to the second items emitted by each of those observables; and so forth.
See combineLatestMap4 for a more relaxed alternative that doesn't combine items in strict sequence.
- f
is the mapping function applied over the generated pairs
- def zipMap5[A1, A2, A3, A4, A5, R](oa1: Observable[A1], oa2: Observable[A2], oa3: Observable[A3], oa4: Observable[A4], oa5: Observable[A5])(f: (A1, A2, A3, A4, A5) => R): Observable[R]
Creates a new observable from five observable sequences by combining their items in pairs in a strict sequence.
Creates a new observable from five observable sequences by combining their items in pairs in a strict sequence.
So the first item emitted by the new observable will be the result of the function applied to the first items emitted by each of the source observables; the second item emitted by the new observable will be the result of the function applied to the second items emitted by each of those observables; and so forth.
See combineLatestMap5 for a more relaxed alternative that doesn't combine items in strict sequence.
- f
is the mapping function applied over the generated pairs
- def zipMap6[A1, A2, A3, A4, A5, A6, R](oa1: Observable[A1], oa2: Observable[A2], oa3: Observable[A3], oa4: Observable[A4], oa5: Observable[A5], oa6: Observable[A6])(f: (A1, A2, A3, A4, A5, A6) => R): Observable[R]
Creates a new observable from five observable sequences by combining their items in pairs in a strict sequence.
Creates a new observable from five observable sequences by combining their items in pairs in a strict sequence.
So the first item emitted by the new observable will be the result of the function applied to the first items emitted by each of the source observables; the second item emitted by the new observable will be the result of the function applied to the second items emitted by each of those observables; and so forth.
See combineLatestMap5 for a more relaxed alternative that doesn't combine items in strict sequence.
- f
is the mapping function applied over the generated pairs
Deprecated Value Members
- def concat[A](sources: Observable[A]*): Observable[A]
DEPRECATED — please switch to the concat method.
DEPRECATED — please switch to the concat method.
This deprecation was made because there's no point in having this function described both as a method and as a companion object function. In general in API design we either have both for all functions, or we have to choose.
Switch to:
Observable(list).concat
- Definition Classes
- ObservableDeprecatedBuilders
- Annotations
- @deprecated
- Deprecated
(Since version 3.0.0) Switch to Observable(list).concat
- def concatDelayError[A](sources: Observable[A]*): Observable[A]
DEPRECATED — please switch to the concatDelayErrors method.
DEPRECATED — please switch to the concatDelayErrors method.
This deprecation was made because there's no point in having this function described both as a method and as a companion object function. In general in API design we either have both for all functions, or we have to choose.
Switch to:
Observable(list).concatDelayErrors
- Definition Classes
- ObservableDeprecatedBuilders
- Annotations
- @deprecated
- Deprecated
(Since version 3.0.0) Switch to Observable(list).concatDelayErrors
- def flatten[A](sources: Observable[A]*): Observable[A]
DEPRECATED — please switch to the flatten method.
DEPRECATED — please switch to the flatten method.
This deprecation was made because there's no point in having this function described both as a method and as a companion object function. In general in API design we either have both for all functions, or we have to choose.
Switch to:
Observable(list).flatten
- Definition Classes
- ObservableDeprecatedBuilders
- Annotations
- @deprecated
- Deprecated
(Since version 3.0.0) Switch to Observable(list).flatten
- def flattenDelayError[A](sources: Observable[A]*): Observable[A]
DEPRECATED — please switch to the flattenDelayErrors method.
DEPRECATED — please switch to the flattenDelayErrors method.
This deprecation was made because there's no point in having this function described both as a method and as a companion object function. In general in API design we either have both for all functions, or we have to choose.
Switch to:
Observable(list).flattenDelayErrors
- Definition Classes
- ObservableDeprecatedBuilders
- Annotations
- @deprecated
- Deprecated
(Since version 3.0.0) Switch to Observable(list).flattenDelayErrors
- def fork[A](fa: Observable[A], scheduler: Scheduler): Observable[A]
DEPRECATED — please use .executeOn.
DEPRECATED — please use .executeOn.
The reason for the deprecation is the repurposing of the word "fork" in Task.
- Definition Classes
- ObservableDeprecatedBuilders
- Annotations
- @deprecated
- Deprecated
(Since version 3.0.0) Please use Observable!.executeOn
- def fork[A](fa: Observable[A]): Observable[A]
DEPRECATED — please use .executeAsync.
DEPRECATED — please use .executeAsync.
The reason for the deprecation is the repurposing of the word "fork" in Task.
- Definition Classes
- ObservableDeprecatedBuilders
- Annotations
- @deprecated
- Deprecated
(Since version 3.0.0) Please use Observable!.executeAsync
- def fromEval[A](fa: Eval[A]): Observable[A]
DEPRECATED — switch to Observable.from.
DEPRECATED — switch to Observable.from.
- Definition Classes
- ObservableDeprecatedBuilders
- Annotations
- @deprecated
- Deprecated
(Since version 3.0.0) Switch to Observable.from
- def fromIO[A](fa: IO[A]): Observable[A]
DEPRECATED — switch to Observable.from.
DEPRECATED — switch to Observable.from.
- Definition Classes
- ObservableDeprecatedBuilders
- Annotations
- @deprecated
- Deprecated
(Since version 3.0.0) Switch to Observable.from
- def merge[A](sources: Observable[A]*)(implicit os: OverflowStrategy[A] = OverflowStrategy.Default): Observable[A]
DEPRECATED — please switch to the merge method.
DEPRECATED — please switch to the merge method.
This deprecation was made because there's no point in having this function described both as a method and as a companion object function. In general in API design we either have both for all functions, or we have to choose.
Switch to:
Observable(list).merge
- Definition Classes
- ObservableDeprecatedBuilders
- Annotations
- @deprecated
- Deprecated
(Since version 3.0.0) Switch to Observable(list).merge
- def mergeDelayError[A](sources: Observable[A]*)(implicit os: OverflowStrategy[A] = OverflowStrategy.Default): Observable[A]
DEPRECATED — please switch to the merge method.
DEPRECATED — please switch to the merge method.
This deprecation was made because there's no point in having this function described both as a method and as a companion object function. In general in API design we either have both for all functions, or we have to choose.
Switch to:
Observable(list).merge
- Definition Classes
- ObservableDeprecatedBuilders
- Annotations
- @deprecated
- Deprecated
(Since version 3.0.0) Switch to Observable(list).merge
- def switch[A](sources: Observable[A]*): Observable[A]
DEPRECATED — please switch to the switch method.
DEPRECATED — please switch to the switch method.
This deprecation was made because there's no point in having this function described both as a method and as a companion object function. In general in API design we either have both for all functions, or we have to choose.
Switch to:
Observable(list).switch
- Definition Classes
- ObservableDeprecatedBuilders
- Annotations
- @deprecated
- Deprecated
(Since version 3.0.0) Switch to Observable(list).switch
This is the API documentation for the Monix library.
Package Overview
monix.execution exposes lower level primitives for dealing with asynchronous execution:
Atomic
types, as alternative tojava.util.concurrent.atomic
monix.catnap exposes pure abstractions built on top of the Cats-Effect type classes:
monix.eval is for dealing with evaluation of results, thus exposing Task and Coeval.
monix.reactive exposes the
Observable
pattern:Observable
implementationsmonix.tail exposes Iterant for purely functional pull based streaming:
Batch
andBatchCursor
, the alternatives to Scala'sIterable
andIterator
respectively that we are using within Iterant's encodingYou can control evaluation with type you choose - be it Task, Coeval, cats.effect.IO or your own as long as you provide correct cats-effect or cats typeclass instance.