package execution

Type Members

  1. sealed abstract class Ack extends Future[Ack] with Serializable

    Represents an acknowledgement of processing that a consumer sends back upstream. Useful to implement back-pressure.
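
    As a minimal sketch of how an acknowledgement drives back-pressure, the hypothetical consumer callback below (the name onNext and the limit parameter are illustrative, not taken from this page) answers with Ack.Continue to request more elements and with Ack.Stop to tell upstream to stop sending:

    ```scala
    import monix.execution.Ack
    import monix.execution.Ack.{Continue, Stop}
    import scala.concurrent.Future

    object AckSketch {
      // Hypothetical consumer callback: keeps asking for elements while
      // they are below `limit`, then signals upstream to stop.
      // Since Ack extends Future[Ack], both values fit the return type.
      def onNext(elem: Int, limit: Int): Future[Ack] =
        if (elem < limit) Continue else Stop
    }
    ```

    Because the result is a Future[Ack], a slower consumer could also return a future that completes later, which delays the producer without blocking a thread.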

  2. final class AsyncQueue[A] extends AnyRef

    A high-performance, back-pressured, asynchronous queue implementation.

    This is the impure, future-enabled version of monix.catnap.ConcurrentQueue.

    Example

    import monix.execution.{AsyncQueue, CancelableFuture}
    import monix.execution.Scheduler.Implicits.global
    
    val queue = AsyncQueue[Int](capacity = 32)
    
    // Offers n, then counts down, stopping after the first negative value
    def producer(n: Int): CancelableFuture[Unit] =
      queue.offer(n).flatMap { _ =>
        if (n >= 0) producer(n - 1)
        else CancelableFuture.unit
      }
    
    // Polls a single item and prints it
    def consumer(index: Int): CancelableFuture[Unit] =
      queue.poll().map { a =>
        println(s"Worker $index: $a")
      }

    Back-Pressuring and the Polling Model

    The queue can be limited to a maximum buffer size. That size may be rounded to a power of 2, so you can't rely on it to be precise. Such a bounded queue can be initialized via AsyncQueue.bounded. Also see BufferCapacity, the configuration parameter that can be passed in the AsyncQueue.custom builder.

    On offer, when the queue is full, the implementation back-pressures until the queue has room again in its internal buffer, and the returned future completes once the value has been pushed successfully. Similarly, poll waits until the queue has items in it. This works for both bounded and unbounded queues.

    For both offer and poll, in case awaiting a result happens, the implementation does so asynchronously, without any threads being blocked.

    Currently the implementation is optimized for speed. In a producer-consumer pipeline the best performance is achieved if the producer(s) and the consumer(s) do not contend for the same resources. This is why, when waiting asynchronously on an empty or full queue, the implementation repeatedly retries the operation, with asynchronous boundaries and delays, until it succeeds. Fairness is ensured by the implementation.
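
    The offer and poll behaviour described above can be sketched as a round trip through a small bounded queue. This assumes the AsyncQueue.bounded builder mentioned earlier takes a capacity and an implicit Scheduler. The Await call is there only to observe the result in a demo on the JVM, since normal usage would stay asynchronous:

    ```scala
    import monix.execution.AsyncQueue
    import monix.execution.Scheduler.Implicits.global
    import scala.concurrent.Await
    import scala.concurrent.duration._

    object QueueSketch {
      def run(): Int = {
        // The capacity may be rounded to a power of 2 internally
        val queue = AsyncQueue.bounded[Int](2)

        // offer completes once the value is in the buffer,
        // poll completes once a value is available
        val roundTrip = queue.offer(1).flatMap(_ => queue.poll())

        // Blocking here is for demonstration purposes only
        Await.result(roundTrip, 5.seconds)
      }
    }
    ```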

    Multi-threading Scenario

    This queue supports a ChannelType configuration, for fine-tuning depending on the needed multi-threading scenario, which can yield better performance.

    The default is MPMC, because that's the safest scenario.

    import monix.execution.ChannelType.MPSC
    
    val queue = AsyncQueue(
      capacity = 64,
      channelType = MPSC
    )

    WARNING: the default is MPMC; any other scenario implies a relaxation of the internal synchronization between threads.

    This means that using the wrong scenario can lead to severe concurrency bugs. If you're not sure what multi-threading scenario you have, then just stick with the default MPMC.

  3. final class AsyncSemaphore extends GenericSemaphore[Cancelable]

    The AsyncSemaphore is an asynchronous semaphore implementation that limits the parallelism on Future execution.

    The following example instantiates a semaphore with a maximum parallelism of 10:

    val semaphore = AsyncSemaphore(maxParallelism = 10)
    
    def makeRequest(r: HttpRequest): Future[HttpResponse] = ???
    
    // For such a task no more than 10 requests
    // are allowed to be executed in parallel.
    val future = semaphore.greenLight(() => makeRequest(???))

  4. final class AsyncVar[A] extends GenericVar[A, Cancelable]

    Asynchronous mutable location, that is either empty or contains a value of type A.

    It has these fundamental atomic operations:

    • put which fills the var if empty, or waits (asynchronously) otherwise until the var is empty again (with the putByCallback overload)
    • tryPut which fills the var if empty, returning true if it succeeded, or returning immediately false in case the var was full and thus the operation failed
    • take which empties the var if full, returning the contained value, or waits (asynchronously) otherwise until there is a value to pull (with the takeByCallback overload)
    • tryTake which empties the var if full, returning the contained value immediately as Some(a), or otherwise returning None in case the var was empty and thus the operation failed
    • read which reads the var if full, but without taking it from the internal var, or waits (asynchronously) until there is a value to read
    • tryRead tries reading the var without modifying it in any way; if full then returns Some(a), or None if empty

    The AsyncVar is appropriate for building synchronization primitives and performing simple inter-thread communications. If it helps, it's similar to a BlockingQueue(capacity = 1), except that it doesn't block any threads, all waiting being callback-based.

    Given its asynchronous, non-blocking nature, it can be used on top of JavaScript as well.

    This is inspired by Control.Concurrent.MVar from Haskell, except that the implementation is made to work with plain Scala futures (and is thus impure).
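
    The immediate (try*) operations listed above can be sketched as follows. The AsyncVar.empty builder is an assumption about the companion object, which this page lists without detailing:

    ```scala
    import monix.execution.AsyncVar

    object AsyncVarSketch {
      // Assumption: `AsyncVar.empty` builds a var with no initial value
      val v = AsyncVar.empty[Int]()

      val firstPut: Boolean = v.tryPut(1)       // var was empty, so it is filled
      val secondPut: Boolean = v.tryPut(2)      // var is full, so this fails
      val peeked: Option[Int] = v.tryRead()     // reads without emptying
      val taken: Option[Int] = v.tryTake()      // empties the var
      val takenAgain: Option[Int] = v.tryTake() // nothing left to take
    }
    ```

    The put, take and read variants follow the same rules, but return futures that wait asynchronously instead of failing immediately.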

  5. sealed abstract class BufferCapacity extends Product with Serializable

    Describes the capacity of internal buffers.

    For abstractions that use an internal buffer, like AsyncQueue, this type provides the info required to build the internal buffer.

  6. abstract class Callback[-E, -A] extends (Either[E, A]) ⇒ Unit

    Represents a callback that should be called asynchronously with the result of a computation.

    This is an Either[E, A] => Unit with an OOP interface that avoids extra boxing, along with overloads of apply.

    Either onSuccess or onError should be called, and only once: onSuccess with the successful result, or onError if the result is an error.

    Callback describes unsafe side-effects, a fact that is highlighted by the usage of Unit as the return type. Callbacks are therefore unsafe to use in pure code, but are necessary for describing asynchronous processes.
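
    A minimal sketch of implementing the OOP interface described above, assuming onSuccess and onError are the two abstract methods. The last variable is a hypothetical stand-in for whatever side effect a real callback would perform:

    ```scala
    import monix.execution.Callback

    object CallbackSketch {
      // Hypothetical sink recording the most recent signal
      @volatile var last: String = ""

      val cb: Callback[Throwable, Int] = new Callback[Throwable, Int] {
        def onSuccess(value: Int): Unit =
          last = s"ok: $value"
        def onError(e: Throwable): Unit =
          last = s"failed: ${e.getMessage}"
      }

      // Signals a successful result and returns what was recorded
      def run(): String = {
        cb.onSuccess(42)
        last
      }
    }
    ```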

  7. trait Cancelable extends Serializable

    Represents a one-time idempotent action that can be used to cancel async computations, or to release resources that active data sources are holding.

    It is equivalent to java.io.Closeable, but without the I/O focus, or to IDisposable in Microsoft .NET, or to akka.actor.Cancellable.
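
    The idempotence can be illustrated with a small sketch that builds a Cancelable from a function through the companion's apply. The released counter is a hypothetical stand-in for a real resource:

    ```scala
    import monix.execution.Cancelable

    object CancelableSketch {
      // Hypothetical counter standing in for a resource release
      var released = 0

      val resource: Cancelable = Cancelable(() => released += 1)

      def run(): Int = {
        resource.cancel()
        resource.cancel() // repeated calls have no further effect
        released
      }
    }
    ```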

  8. sealed abstract class CancelableFuture[+A] extends Future[A] with Cancelable

    Represents an asynchronous computation that can be canceled as long as it isn't complete.
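
    As a hedged sketch, a plain Future can be paired with a Cancelable through the companion's apply. The promise below never completes, so the computation is still cancelable when cancel is called:

    ```scala
    import monix.execution.{Cancelable, CancelableFuture}
    import scala.concurrent.Promise

    object CancelableFutureSketch {
      def run(): Boolean = {
        var canceled = false
        val promise = Promise[Int]() // never completed in this sketch

        // Pairs the underlying future with the logic that cancels it
        val future = CancelableFuture(
          promise.future,
          Cancelable(() => canceled = true)
        )

        future.cancel()
        canceled
      }
    }
    ```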

  9. sealed abstract class CancelablePromise[A] extends Promise[A]

    CancelablePromise is a scala.concurrent.Promise implementation that allows listeners to unsubscribe from receiving future results.

    It does so by:

    • adding a low-level subscribe method, that allows for callbacks to be subscribed
    • returning CancelableFuture in its future method implementation, allowing created future objects to unsubscribe (being the high-level subscribe that should be preferred for most usage)

    Being able to unsubscribe listeners helps with avoiding memory leaks in case of listeners or futures that are being timed-out due to promises that take a long time to complete.
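
    A rough sketch of the high-level usage, assuming the companion offers a no-argument-style apply for building an empty promise. One listener is unsubscribed by canceling its future, while another still receives the result (Await is used only to observe it in a JVM demo):

    ```scala
    import monix.execution.CancelablePromise
    import scala.concurrent.Await
    import scala.concurrent.duration._

    object CancelablePromiseSketch {
      def run(): Int = {
        // Assumption: `CancelablePromise[A]()` builds an empty promise
        val promise = CancelablePromise[Int]()

        // This listener loses interest and unsubscribes
        val abandoned = promise.future
        abandoned.cancel()

        // This listener stays subscribed
        val kept = promise.future
        promise.success(1)

        // Blocking here is for demonstration purposes only
        Await.result(kept, 5.seconds)
      }
    }
    ```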

    See also

    subscribe

    future

  10. sealed abstract class ChannelType extends Serializable

    An enumeration of the supported multi-threading scenarios (producer and consumer combinations), such as MPMC and MPSC.

  11. sealed abstract class ExecutionModel extends Product with Serializable

    Specification for run-loops, imposed by the Scheduler.

    When executing tasks, a run-loop can always execute tasks asynchronously (by forking logical threads), or it can always execute them synchronously (same thread and call-stack, by using an internal trampoline), or it can do a mixed mode that executes tasks in batches before forking.

    The specification is considered a recommendation for how run loops should behave, but ultimately it's up to the client to choose the best execution model. This can be related to recursive loops or to events pushed into consumers.
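
    The three modes described above correspond to values in the ExecutionModel companion. The sketch below lists them and derives a scheduler that recommends synchronous execution. The batch size of 256 is an arbitrary illustrative value:

    ```scala
    import monix.execution.{ExecutionModel, Scheduler}
    import monix.execution.ExecutionModel.{
      AlwaysAsyncExecution, BatchedExecution, SynchronousExecution
    }

    object ExecutionModelSketch {
      val models: List[ExecutionModel] = List(
        AlwaysAsyncExecution,  // always fork
        SynchronousExecution,  // same thread, via a trampoline
        BatchedExecution(256)  // mixed mode: batches, then fork
      )

      // A copy of the global scheduler with a different recommendation
      val scheduler: Scheduler =
        Scheduler.global.withExecutionModel(SynchronousExecution)
    }
    ```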

  12. trait Scheduler extends ExecutionContext with UncaughtExceptionReporter with Executor

    A Scheduler is a scala.concurrent.ExecutionContext that additionally can schedule the execution of units of work to run with a delay or periodically.
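
    A short sketch of delayed and periodic scheduling on the global scheduler, using the FiniteDuration-based scheduleOnce and scheduleAtFixedRate helpers. The delays are arbitrary, and the latch exists only so the demo can observe that the delayed task ran:

    ```scala
    import monix.execution.{Cancelable, Scheduler}
    import java.util.concurrent.{CountDownLatch, TimeUnit}
    import scala.concurrent.duration._

    object SchedulerSketch {
      def run(): Boolean = {
        val scheduler: Scheduler = Scheduler.global
        val done = new CountDownLatch(1)

        // One-off task, executed after a 100 ms delay
        scheduler.scheduleOnce(100.millis) { done.countDown() }

        // Periodic task; the returned Cancelable stops further runs
        val ticks: Cancelable =
          scheduler.scheduleAtFixedRate(0.millis, 50.millis) { () }
        ticks.cancel()

        // True if the delayed task ran within 5 seconds
        done.await(5, TimeUnit.SECONDS)
      }
    }
    ```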

    Annotations
    @implicitNotFound( ... )

  13. trait UncaughtExceptionReporter extends Serializable

    An exception reporter is a function that logs an uncaught error.

    Usually taken as an implicit when executing computations that could fail, but that must not blow up the call-stack, like asynchronous tasks.

    A default implicit is provided that simply logs the error on STDERR.
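
    A custom reporter can be sketched by wrapping a function with the companion's apply. The lastMessage variable is a hypothetical stand-in for a real logger:

    ```scala
    import monix.execution.UncaughtExceptionReporter

    object ReporterSketch {
      // Hypothetical sink standing in for a logging backend
      @volatile var lastMessage: String = ""

      val reporter: UncaughtExceptionReporter =
        UncaughtExceptionReporter(e => lastMessage = e.getMessage)

      def run(): String = {
        reporter.reportFailure(new RuntimeException("boom"))
        lastMessage
      }
    }
    ```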

    Annotations
    @implicitNotFound( ... )

Value Members

  1. object Ack extends Serializable
  2. object AsyncQueue
  3. object AsyncSemaphore extends Serializable
  4. object AsyncVar
  5. object BufferCapacity extends Serializable
  6. object Callback
  7. object Cancelable extends Serializable
  8. object CancelableFuture extends CancelableFutureForPlatform with Serializable
  9. object CancelablePromise
  10. object ChannelType extends Serializable
  11. object ExecutionModel extends Serializable
  12. object FutureUtils extends FutureUtilsForPlatform

    Utilities for Scala's standard concurrent.Future.
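
    As one hedged example of these utilities, the sketch below applies FutureUtils.timeout to a future that never completes and checks that it fails with a TimeoutException. The durations are arbitrary, and Await is used only to observe the outcome in a JVM demo:

    ```scala
    import monix.execution.FutureUtils
    import monix.execution.Scheduler.Implicits.global
    import scala.concurrent.{Await, Promise, TimeoutException}
    import scala.concurrent.duration._
    import scala.util.{Failure, Try}

    object FutureUtilsSketch {
      def run(): Boolean = {
        val never = Promise[Int]().future // never completed

        // Fails the result if `never` is not done within 100 ms
        val guarded = FutureUtils.timeout(never, 100.millis)

        // Blocking here is for demonstration purposes only
        Try(Await.result(guarded, 5.seconds)) match {
          case Failure(_: TimeoutException) => true
          case _ => false
        }
      }
    }
    ```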

  13. object Scheduler extends SchedulerCompanionImpl with Serializable
  14. object UncaughtExceptionReporter extends Serializable

    See UncaughtExceptionReporter.
