package execution
- Source
- package.scala
- Alphabetic
- By Inheritance
- execution
- AnyRef
- Any
- Hide All
- Show All
- Public
- All
Package Members
- package annotations
- package atomic
A small toolkit of classes that support compare-and-swap semantics for safe mutation of variables.
A small toolkit of classes that support compare-and-swap semantics for safe mutation of variables.
On top of the JVM, this means dealing with lock-free thread-safe programming. Also works on top of Javascript, with Scala.js, for API compatibility purposes and because it's a useful way to box a value.
The backbone of Atomic references is this method:
def compareAndSet(expect: T, update: T): Boolean
This method atomically sets a variable to the
update
value if it currently holds theexpect
value, reportingtrue
on success orfalse
on failure. The classes in this package also contain methods to get and unconditionally set values.Building a reference is easy with the provided constructor, which will automatically return the most specific type needed (in the following sample, that's an
AtomicDouble
, inheriting fromAtomicNumber[A]
):val atomicNumber = Atomic(12.2) atomicNumber.incrementAndGet() // => 13.2
These also provide useful helpers for atomically mutating of values (i.e.
transform
,transformAndGet
,getAndTransform
, etc...) or of numbers of any kind (incrementAndGet
,getAndAdd
, etc...). - package cancelables
Cancelables represent asynchronous units of work or other things scheduled for execution and whose execution can be canceled.
Cancelables represent asynchronous units of work or other things scheduled for execution and whose execution can be canceled.
One use-case is the scheduling done by monix.execution.Scheduler, in which the scheduling methods return a
Cancelable
, allowing the canceling of the scheduling.Example:
val s = ConcurrentScheduler() val task = s.scheduleRepeated(10.seconds, 50.seconds, { doSomething() }) // later, cancels the scheduling ... task.cancel()
- package exceptions
- package misc
- package rstreams
Package exposing utilities for working with the Reactive Streams specification.
- package schedulers
Type Members
- sealed abstract class Ack extends Future[Ack] with Serializable
Represents an acknowledgement of processing that a consumer sends back upstream.
Represents an acknowledgement of processing that a consumer sends back upstream. Useful to implement back-pressure.
- final class AsyncQueue[A] extends AnyRef
A high-performance, back-pressured, asynchronous queue implementation.
A high-performance, back-pressured, asynchronous queue implementation.
This is the impure, future-enabled version of monix.catnap.ConcurrentQueue.
Example
import monix.execution.Scheduler.Implicits.global val queue = AsyncQueue(capacity = 32) def producer(n: Int): CancelableFuture[Unit] = queue.offer(n).flatMap { _ => if (n >= 0) producer(n - 1) else CancelableFuture.unit } def consumer(index: Int): CancelableFuture[Unit] = queue.poll().flatMap { a => println(s"Worker $$index: $$a") }
Back-Pressuring and the Polling Model
The initialized queue can be limited to a maximum buffer size, a size that could be rounded to a power of 2, so you can't rely on it to be precise. Such a bounded queue can be initialized via AsyncQueue.bounded. Also see BufferCapacity, the configuration parameter that can be passed in the AsyncQueue.withConfig builder.
On offer, when the queue is full, the implementation back-pressures until the queue has room again in its internal buffer, the future being completed when the value was pushed successfully. Similarly poll awaits the queue to have items in it. This works for both bounded and unbounded queues.
For both
offer
andpoll
, in case awaiting a result happens, the implementation does so asynchronously, without any threads being blocked.Currently the implementation is optimized for speed. In a producer-consumer pipeline the best performance is achieved if the producer(s) and the consumer(s) do not contend for the same resources. This is why when doing asynchronous waiting for the queue to be empty or full, the implementation does so by repeatedly retrying the operation, with asynchronous boundaries and delays, until it succeeds. Fairness is ensured by the implementation.
Multi-threading Scenario
This queue support a ChannelType configuration, for fine tuning depending on the needed multi-threading scenario — and this can yield better performance:
- ChannelType.MPMC: multi-producer, multi-consumer
- ChannelType.MPSC: multi-producer, single-consumer
- ChannelType.SPMC: single-producer, multi-consumer
- ChannelType.SPSC: single-producer, single-consumer
The default is
MPMC
, because that's the safest scenario.import monix.execution.ChannelType.MPSC val queue = AsyncQueue( capacity = 64, channelType = MPSC )
WARNING: default is
MPMC
, however any other scenario implies a relaxation of the internal synchronization between threads.This means that using the wrong scenario can lead to severe concurrency bugs. If you're not sure what multi-threading scenario you have, then just stick with the default
MPMC
. - final class AsyncSemaphore extends GenericSemaphore[Cancelable]
The
AsyncSemaphore
is an asynchronous semaphore implementation that limits the parallelism onFuture
execution.The
AsyncSemaphore
is an asynchronous semaphore implementation that limits the parallelism onFuture
execution.The following example instantiates a semaphore with a maximum parallelism of 10:
val semaphore = AsyncSemaphore(maxParallelism = 10) def makeRequest(r: HttpRequest): Future[HttpResponse] = ??? // For such a task no more than 10 requests // are allowed to be executed in parallel. val future = semaphore.greenLight(() => makeRequest(???))
- final class AsyncVar[A] extends GenericVar[A, Cancelable]
Asynchronous mutable location, that is either empty or contains a value of type
A
.Asynchronous mutable location, that is either empty or contains a value of type
A
.It has these fundamental atomic operations:
- put which fills the var if empty, or waits (asynchronously) otherwise until the var is empty again (with the putByCallback overload)
- tryPut which fills the var if empty, returning
true
if it succeeded, or returning immediatelyfalse
in case the var was full and thus the operation failed - take which empties the var if full, returning the contained value, or waits (asynchronously) otherwise until there is a value to pull (with the takeByCallback overload)
- tryTake which empties the var if full, returning the
contained value immediately as
Some(a)
, or otherwise returningNone
in case the var was empty and thus the operation failed - read which reads the var if full, but without taking it from the interval var, or waits (asynchronously) until there is a value to read
- tryRead tries reading the var without modifying it in
any way; if full then returns
Some(a)
, orNone
if empty
The
AsyncVar
is appropriate for building synchronization primitives and performing simple inter-thread communications. If it helps, it's similar with aBlockingQueue(capacity = 1)
, except that it doesn't block any threads, all waiting being callback-based.Given its asynchronous, non-blocking nature, it can be used on top of Javascript as well.
This is inspired by Control.Concurrent.MVar from Haskell, except that the implementation is made to work with plain Scala futures (and is thus impure).
- sealed abstract class BufferCapacity extends Product with Serializable
Describes the capacity of internal buffers.
Describes the capacity of internal buffers.
For abstractions that use an internal buffer, like AsyncQueue, this type provides the info required to build the internal buffer.
- abstract class Callback[-E, -A] extends (Either[E, A]) => Unit
Represents a callback that should be called asynchronously with the result of a computation.
Represents a callback that should be called asynchronously with the result of a computation.
This is an
Either[E, A] => Unit
with an OOP interface that avoids extra boxing, along with overloads ofapply
.The
onSuccess
method should be called only once, with the successful result, whereasonError
should be called if the result is an error.Obviously
Callback
describes unsafe side-effects, a fact that is highlighted by the usage ofUnit
as the return type. Obviously callbacks are unsafe to use in pure code, but are necessary for describing asynchronous processes.THREAD-SAFETY: callback implementations are NOT thread-safe by contract, this depends on the implementation. Callbacks can be made easily thread-safe via wrapping with:
NOTE that callbacks injected in the Task async builders (e.g. Task.async) are thread-safe.
- trait Cancelable extends Serializable
Represents a one-time idempotent action that can be used to cancel async computations, or to release resources that active data sources are holding.
Represents a one-time idempotent action that can be used to cancel async computations, or to release resources that active data sources are holding.
It is equivalent to
java.io.Closeable
, but without the I/O focus, or toIDisposable
in Microsoft .NET, or toakka.actor.Cancellable
. - sealed abstract class CancelableFuture[+A] extends Future[A] with Cancelable
Represents an asynchronous computation that can be canceled as long as it isn't complete.
- final class CancelableFutureCatsInstances extends Monad[CancelableFuture] with StackSafeMonad[CancelableFuture] with CoflatMap[CancelableFuture] with MonadError[CancelableFuture, Throwable]
Implementation of Cats type classes for the CancelableFuture data type.
- sealed abstract class CancelablePromise[A] extends Promise[A]
CancelablePromise
is a scala.concurrent.Promise implementation that allows listeners to unsubscribe from receiving future results.CancelablePromise
is a scala.concurrent.Promise implementation that allows listeners to unsubscribe from receiving future results.It does so by:
- adding a low-level subscribe method, that allows for callbacks to be subscribed
- returning CancelableFuture in its future method implementation, allowing created future objects to unsubscribe (being the high-level subscribe that should be preferred for most usage)
Being able to unsubscribe listeners helps with avoiding memory leaks in case of listeners or futures that are being timed-out due to promises that take a long time to complete.
- sealed abstract class ChannelType extends Serializable
An enumeration of all types
- sealed abstract class ExecutionModel extends Product with Serializable
Specification for run-loops, imposed by the
Scheduler
.Specification for run-loops, imposed by the
Scheduler
.When executing tasks, a run-loop can always execute tasks asynchronously (by forking logical threads), or it can always execute them synchronously (same thread and call-stack, by using an internal trampoline), or it can do a mixed mode that executes tasks in batches before forking.
The specification is considered a recommendation for how run loops should behave, but ultimately it's up to the client to choose the best execution model. This can be related to recursive loops or to events pushed into consumers.
- final class Features extends AnyVal with Serializable
Features
describes a set of features described via bitwise operators applied to ints, but made type safe.Features
describes a set of features described via bitwise operators applied to ints, but made type safe.This is like a special purpose
BitSet
.Currently used to describe the features of Scheduler.
- trait Scheduler extends ExecutionContext with UncaughtExceptionReporter with Executor
A Scheduler is an
scala.concurrent.ExecutionContext
that additionally can schedule the execution of units of work to run with a delay or periodically.A Scheduler is an
scala.concurrent.ExecutionContext
that additionally can schedule the execution of units of work to run with a delay or periodically.- Annotations
- @implicitNotFound("Cannot find an implicit Scheduler, either " +
"import monix.execution.Scheduler.Implicits.global or use a custom one")
- trait UncaughtExceptionReporter extends Serializable
An exception reporter is a function that logs an uncaught error.
An exception reporter is a function that logs an uncaught error.
Usually taken as an implicit when executing computations that could fail, but that must not blow up the call-stack, like asynchronous tasks.
A default implicit is provided that simply logs the error on STDERR.
- Annotations
- @implicitNotFound("No ExceptionReporter was found in context for " +
"reporting uncaught errors, either build one yourself or use " +
"an implicit Scheduler (schedulers are ExceptionReporters)")
Value Members
- implicit def cancelableFutureCatsInstances(implicit ec: ExecutionContext): CancelableFutureCatsInstances
Returns the associated Cats type class instances for the CancelableFuture data type.
Returns the associated Cats type class instances for the CancelableFuture data type.
- ec
is the ExecutionContext needed in order to create the needed type class instances, since future transformations rely on an
ExecutionContext
passed explicitly (by means of an implicit parameter) on each operation
- implicit def contravariantCallback[E]: Contravariant[[β$0$]Callback[E, β$0$]]
Contravariant type class instance of Callback for Cats.
- object Ack extends Serializable
- object AsyncQueue
- object AsyncSemaphore extends Serializable
- object AsyncVar
- object BufferCapacity extends Serializable
- object Callback
- object Cancelable extends Serializable
- object CancelableFuture extends CancelableFutureForPlatform with Serializable
- object CancelableFutureCatsInstances extends Serializable
- object CancelablePromise
- object ChannelType extends Serializable
- object ExecutionModel extends Serializable
- object Features extends Serializable
- object FutureUtils extends FutureUtilsForPlatform
Utilities for Scala's standard
concurrent.Future
. - object Scheduler extends SchedulerCompanionImpl with Serializable
- object UncaughtExceptionReporter extends Serializable
- object compat
This is the API documentation for the Monix library.
Package Overview
monix.execution exposes lower level primitives for dealing with asynchronous execution:
Atomic
types, as alternative tojava.util.concurrent.atomic
monix.catnap exposes pure abstractions built on top of the Cats-Effect type classes:
monix.eval is for dealing with evaluation of results, thus exposing Task and Coeval.
monix.reactive exposes the
Observable
pattern:Observable
implementationsmonix.tail exposes Iterant for purely functional pull based streaming:
Batch
andBatchCursor
, the alternatives to Scala'sIterable
andIterator
respectively that we are using within Iterant's encodingYou can control evaluation with type you choose - be it Task, Coeval, cats.effect.IO or your own as long as you provide correct cats-effect or cats typeclass instance.