package catnap

Type Members

  1. trait CancelableF[F[_]] extends AnyRef

    Represents a pure data structure that describes an effectful, idempotent action that can be used to cancel async computations, or to release resources.

    This is the pure, higher-kinded equivalent of monix.execution.Cancelable and can be used in combination with data types meant for managing effects, like Task, Coeval or cats.effect.IO.

    Note: the F suffix comes from this data type being abstracted over F[_].

  2. final class CircuitBreaker[F[_]] extends AnyRef

    The CircuitBreaker is used to provide stability and prevent cascading failures in distributed systems.

    As an example, consider a web application that interacts with a remote third-party web service. Suppose the third party has oversold its capacity and its database melts down under load, failing in such a way that it takes a very long time to hand an error back to the third-party web service. Calls to that service then fail only after a long delay. Back in our web application, users notice that their form submissions take much longer and seem to hang. The users do what they know, which is to hit the refresh button, adding more requests to those already running. This eventually brings the web application down through resource exhaustion, affecting all users, even those who are not using functionality that depends on the third-party web service.

    Introducing circuit breakers on the web service call makes requests fail fast, letting users know that something is wrong and that they need not refresh their request. This also confines the failure to those users whose functionality depends on the third party; other users are no longer affected, as there is no resource exhaustion. Circuit breakers also let savvy developers mark the portions of the site that use the functionality as unavailable, or show cached content as appropriate while the breaker is open.

    How It Works

    The circuit breaker models a concurrent state machine that can be in any of these 3 states:

    1. Closed: During normal operations or when the CircuitBreaker starts
      • Exceptions increment the failures counter
      • Successes reset the failure count to zero
      • When the failures counter reaches the maxFailures count, the breaker is tripped into the Open state
    2. Open: The circuit breaker rejects all tasks with an ExecutionRejectedException
      • All tasks fail fast with ExecutionRejectedException
      • After the configured resetTimeout, the circuit breaker enters the HalfOpen state, allowing one task to go through for testing the connection
    3. HalfOpen: The circuit breaker has already allowed a task to go through, as a reset attempt, in order to test the connection
      • The first task attempted after the resetTimeout has expired is allowed through without failing fast, just before the circuit breaker transitions into the HalfOpen state
      • All other tasks attempted in HalfOpen fail fast with an exception, just as in the Open state
      • If the test task succeeds, the breaker is reset back to the Closed state, with the resetTimeout and the failures count also reset to their initial values
      • If the test task fails, the breaker is tripped again into the Open state (the resetTimeout is multiplied by the exponential backoff factor)
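
    The transitions listed above can be sketched as a pure function over a small state type. This is a simplified model for illustration only, not the Monix implementation: the State, Event and Config names, the integer backoff factor and the ResetTimeoutExpired event are all assumptions made here.

```scala
import scala.concurrent.duration._

object BreakerSketch {
  // Hypothetical, simplified model of the three states described above
  sealed trait State
  final case class Closed(failures: Int) extends State
  final case class Open(resetTimeout: FiniteDuration) extends State
  final case class HalfOpen(resetTimeout: FiniteDuration) extends State

  // Hypothetical events driving the transitions
  sealed trait Event
  case object TaskSucceeded extends Event
  case object TaskFailed extends Event
  case object ResetTimeoutExpired extends Event

  // Assumption: an integer backoff factor, to keep the arithmetic simple
  final case class Config(
    maxFailures: Int,
    resetTimeout: FiniteDuration,
    backoffFactor: Long,
    maxResetTimeout: FiniteDuration
  )

  def step(cfg: Config)(state: State, event: Event): State =
    (state, event) match {
      // Closed: successes reset the counter, failures increment it
      case (Closed(_), TaskSucceeded) => Closed(0)
      case (Closed(n), TaskFailed) =>
        if (n + 1 >= cfg.maxFailures) Open(cfg.resetTimeout) else Closed(n + 1)
      // Open: once the timeout expires, one test task is let through
      case (Open(t), ResetTimeoutExpired) => HalfOpen(t)
      // HalfOpen: the outcome of the test task decides the next state
      case (HalfOpen(_), TaskSucceeded) => Closed(0)
      case (HalfOpen(t), TaskFailed) =>
        val next = t * cfg.backoffFactor
        Open(if (next > cfg.maxResetTimeout) cfg.maxResetTimeout else next)
      // Anything else (e.g. rejected tasks while Open) leaves the state as is
      case (other, _) => other
    }

  val sample: Config = Config(2, 10.seconds, 2L, 10.minutes)
}
```

    With the sample configuration, two consecutive failures in Closed trip the breaker into Open, and a failed test task in HalfOpen reopens it with the timeout doubled.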


    import monix.catnap._
    import scala.concurrent.duration._
    // Using cats.effect.IO for this sample, but you can use any effect
    // type that integrates with Cats-Effect, including monix.eval.Task:
    import cats.effect.{Clock, IO}
    implicit val clock = Clock.create[IO]
    // Using the "unsafe" builder for didactic purposes, but prefer
    // the safe "apply" builder:
    val circuitBreaker = CircuitBreaker[IO].unsafe(
      maxFailures = 5,
      resetTimeout = 10.seconds
    )
    val problematic = IO {
      val nr = util.Random.nextInt()
      if (nr % 2 == 0) nr else
        throw new RuntimeException("dummy")
    }
    val task = circuitBreaker.protect(problematic)

    When attempting to close the circuit breaker and resume normal operations, we can also apply an exponential backoff for repeated failed attempts, like so:

    val exponential = CircuitBreaker[IO].of(
      maxFailures = 5,
      resetTimeout = 10.seconds,
      exponentialBackoffFactor = 2,
      maxResetTimeout = 10.minutes
    )

    In this sample we attempt to reconnect after 10 seconds, then after 20, 40 and so on, a delay that keeps increasing up to a configurable maximum of 10 minutes.
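
    The arithmetic behind that schedule can be checked with a few lines of plain Scala. The resetTimeouts helper below is hypothetical and not part of Monix; it only reproduces the doubling and the cap described above, assuming an integer factor.

```scala
import scala.concurrent.duration._

object BackoffSchedule {
  // Hypothetical helper: the successive reset timeouts, each one the previous
  // value multiplied by `factor`, capped at `max`
  def resetTimeouts(
    initial: FiniteDuration,
    factor: Long,
    max: FiniteDuration
  ): Iterator[FiniteDuration] =
    Iterator.iterate(initial) { current =>
      val next = current * factor
      if (next > max) max else next
    }

  def main(args: Array[String]): Unit =
    // 10s, 20s, 40s, 80s, 160s, 320s, then capped at 10 minutes
    resetTimeouts(10.seconds, 2, 10.minutes).take(8).foreach(println)
}
```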

    Sync versus Async

    The CircuitBreaker works with both Sync and Async type class instances.

    If the F[_] type used implements Async, then the CircuitBreaker gains the ability to wait for it to be closed, via awaitClose.

    Retrying Tasks

    Generally, it's best to retry tasks with an exponential back-off strategy:

    import cats.implicits._
    import cats.effect._
    import monix.execution.exceptions.ExecutionRejectedException
    def protectWithRetry[F[_], A](task: F[A], cb: CircuitBreaker[F], delay: FiniteDuration)
      (implicit F: Async[F], timer: Timer[F]): F[A] = {
      cb.protect(task).recoverWith {
        case _: ExecutionRejectedException =>
          // Sleep, then retry
          timer.sleep(delay).flatMap(_ => protectWithRetry(task, cb, delay * 2))
      }
    }

    An alternative is to wait for the precise moment at which the CircuitBreaker is closed again, which you can do via the awaitClose method:

    def protectWithRetry2[F[_], A](task: F[A], cb: CircuitBreaker[F])
      (implicit F: Async[F]): F[A] = {
      cb.protect(task).recoverWith {
        case _: ExecutionRejectedException =>
          // Waiting for the CircuitBreaker to close, then retry
          cb.awaitClose.flatMap(_ => protectWithRetry2(task, cb))
      }
    }

    Be careful and plan ahead when doing this, because you might end up with the "thundering herd problem".


    This Monix data type was inspired by the availability of Akka's Circuit Breaker.

  3. final class ConcurrentQueue[F[_], A] extends Serializable

    A high-performance, back-pressured, generic concurrent queue implementation.

    This is the pure and generic version of monix.execution.AsyncQueue.


    import cats.implicits._
    import cats.effect._
    import monix.execution.Scheduler.global
    // For being able to do IO.start
    implicit val cs = global.contextShift[IO]
    // We need a `Timer` for this to work
    implicit val timer = global.timer[IO]
    def consumer(queue: ConcurrentQueue[IO, Int], index: Int): IO[Unit] =
      queue.poll.flatMap { a =>
        println(s"Worker $index: $a")
        consumer(queue, index)
      }
    for {
      queue     <- ConcurrentQueue[IO].bounded[Int](capacity = 32)
      consumer1 <- consumer(queue, 1).start
      consumer2 <- consumer(queue, 2).start
      // Pushing some samples
      _         <- queue.offer(1)
      _         <- queue.offer(2)
      _         <- queue.offer(3)
      // Stopping the consumer loops
      _         <- consumer1.cancel
      _         <- consumer2.cancel
    } yield ()

    Back-Pressuring and the Polling Model

    The initialized queue can be limited to a maximum buffer size, a size that could be rounded to a power of 2, so you can't rely on it to be precise. Such a bounded queue can be initialized via ConcurrentQueue.bounded. Also see BufferCapacity, the configuration parameter that can be passed in the ConcurrentQueue.custom builder.
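
    To see why the requested capacity should not be treated as exact, here is a sketch of what rounding up to a power of 2 could look like. The nextPowerOf2 helper is hypothetical and is not claimed to be the rounding that the queue actually performs.

```scala
object CapacityRounding {
  // Hypothetical helper: the smallest power of 2 that is >= n,
  // defined here for 1 <= n <= 2^30
  def nextPowerOf2(n: Int): Int = {
    require(n >= 1 && n <= (1 << 30), "n must be between 1 and 2^30")
    if (n == 1) 1 else Integer.highestOneBit(n - 1) << 1
  }

  def main(args: Array[String]): Unit =
    // A requested capacity of 100 would become 128 under this rounding
    println(nextPowerOf2(100))
}
```

    Under this assumption a queue requested with a capacity of 100 could hold up to 128 items, while a capacity of 32 would stay at 32.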

    On offer, when the queue is full, the implementation back-pressures until the queue has room again in its internal buffer, the returned effect completing once the value has been pushed successfully. Similarly, poll waits for the queue to have items in it. This works for both bounded and unbounded queues.

    For both offer and poll, in case awaiting a result happens, the implementation does so asynchronously, without any threads being blocked.

    Currently the implementation is optimized for speed. In a producer-consumer pipeline the best performance is achieved if the producer(s) and the consumer(s) do not contend for the same resources. This is why when doing asynchronous waiting for the queue to be empty or full, the implementation does so by repeatedly retrying the operation, with asynchronous boundaries and delays, until it succeeds. Fairness is ensured by the implementation.

    Multi-threading Scenario

    This queue supports a ChannelType configuration for fine-tuning, depending on the needed multi-threading scenario, which can yield better performance:

    • MPMC: multi-producer, multi-consumer
    • MPSC: multi-producer, single-consumer
    • SPMC: single-producer, multi-consumer
    • SPSC: single-producer, single-consumer

    The default is MPMC, because that's the safest scenario.

    import monix.execution.ChannelType.MPSC
    import monix.execution.BufferCapacity.Bounded
    val queue = ConcurrentQueue[IO].custom[Int](
      capacity = Bounded(128),
      channelType = MPSC
    )

    WARNING: default is MPMC, however any other scenario implies a relaxation of the internal synchronization between threads.

    This means that using the wrong scenario can lead to severe concurrency bugs. If you're not sure what multi-threading scenario you have, then just stick with the default MPMC.

  4. trait FutureLift[F[_], Future[_]] extends AnyRef

    A type class for conversions from scala.concurrent.Future or other Future-like data type (e.g. Java's CompletableFuture).

    N.B. to use its syntax, you can import monix.catnap.syntax:

    import monix.catnap.syntax._
    import scala.concurrent.Future
    // Used here only for Future.apply as the ExecutionContext
    import monix.execution.Scheduler.Implicits.global
    // Can use any data type implementing Async or Concurrent
    import cats.effect.IO
    val io = IO(Future(1 + 1)).futureLift

    IO provides its own IO.fromFuture, of course; however, FutureLift is generic and works with CancelableFuture as well.

    import monix.execution.{CancelableFuture, Scheduler, FutureUtils}
    import scala.concurrent.Promise
    import scala.concurrent.duration._
    import scala.util.Try
    def delayed[A](event: => A)(implicit s: Scheduler): CancelableFuture[A] = {
      val p = Promise[A]()
      val c = s.scheduleOnce(1.second) { p.complete(Try(event)) }
      CancelableFuture(p.future, c)
    }
    // The result will be cancelable:
    val sum: IO[Int] = IO(delayed(1 + 1)).futureLift

  5. final class MVar[F[_], A] extends cats.effect.concurrent.MVar[F, A]

    A mutable location, that is either empty or contains a value of type A.

    It has the following fundamental atomic operations:

    • put, which fills the var if empty, or blocks (asynchronously) until the var is empty again
    • tryPut, which fills the var if empty and returns true if successful
    • take, which empties the var if full, returning the contained value, or blocks (asynchronously) otherwise until there is a value to pull
    • tryTake, which empties the var if full, returning Some(a), or returns None if empty
    • read, which reads the current value without touching it, assuming there is one, or otherwise waits until a value is made available via put
    • tryRead, which returns Some(a) if full, without modifying the var, or else returns None
    • isEmpty, which returns true if currently empty

    The MVar is appropriate for building synchronization primitives and performing simple inter-thread communication. If it helps, it's similar to a BlockingQueue(capacity = 1), except that it is pure and doesn't block any threads, all waiting being done asynchronously.
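
    The BlockingQueue comparison can be made concrete with the JDK's ArrayBlockingQueue and its non-blocking methods. This is only an analogy for the tryPut, tryRead and tryTake semantics listed above: unlike MVar it is impure, and the mapping between the method names is an assumption made for illustration.

```scala
import java.util.concurrent.ArrayBlockingQueue

object MVarAnalogy {
  // Runs the non-blocking operations of a one-slot queue and collects the results
  def demo(): (Boolean, Boolean, Option[String], Option[String], Option[String], Boolean) = {
    val slot = new ArrayBlockingQueue[String](1)
    val firstPut  = slot.offer("a")        // like tryPut on an empty var: succeeds
    val secondPut = slot.offer("b")        // like tryPut on a full var: rejected
    val peeked    = Option(slot.peek())    // like tryRead: value stays in place
    val taken     = Option(slot.poll())    // like tryTake on a full var
    val takenNext = Option(slot.poll())    // like tryTake on an empty var
    (firstPut, secondPut, peeked, taken, takenNext, slot.isEmpty)
  }

  def main(args: Array[String]): Unit =
    println(demo())
}
```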

    Given its asynchronous, non-blocking nature, it can be used on top of JavaScript as well.

    N.B. this is a reimplementation of the interface exposed in Cats-Effect, see: cats.effect.concurrent.MVar

    Inspired by Control.Concurrent.MVar from Haskell.

  6. sealed trait OrElse[+A, +B] extends AnyRef

    A type class for prioritized implicit search.

    Useful for specifying type class instance alternatives. Examples:

    • Async[F] OrElse Sync[F]
    • Concurrent[F] OrElse Async[F]

    Inspired by the implementations in Shapeless and Algebra.
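
    The general technique behind prioritized implicit search can be sketched in plain Scala by placing the fallback instance in a parent trait of the companion object, so that the compiler prefers the primary instance whenever both are available. The Prefer, Primary, Secondary, Fast and Slow names below are hypothetical stand-ins and do not mirror the actual definition of OrElse.

```scala
object PrioritizedImplicits {
  // Hypothetical stand-in for a prioritized choice between two instances
  sealed trait Prefer[A, B]
  final case class Primary[A, B](value: A) extends Prefer[A, B]
  final case class Secondary[A, B](value: B) extends Prefer[A, B]

  // Implicits inherited from a parent trait have lower priority
  trait LowPriority {
    implicit def secondary[A, B](implicit b: B): Prefer[A, B] = Secondary(b)
  }
  object Prefer extends LowPriority {
    implicit def primary[A, B](implicit a: A): Prefer[A, B] = Primary(a)
  }

  // Hypothetical "type class instances" to choose between
  final case class Fast(name: String)
  final case class Slow(name: String)

  def pick(implicit ev: Prefer[Fast, Slow]): String = ev match {
    case Primary(fast)   => fast.name
    case Secondary(slow) => slow.name
  }

  // Both instances are in scope: the primary one wins
  def withBoth: String = {
    implicit val fast: Fast = Fast("fast")
    implicit val slow: Slow = Slow("slow")
    pick
  }

  // Only the fallback is in scope: the secondary one is used
  def withSlowOnly: String = {
    implicit val slow: Slow = Slow("slow")
    pick
  }
}
```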

  7. final class Semaphore[F[_]] extends cats.effect.concurrent.Semaphore[F]

    The Semaphore is an asynchronous semaphore implementation that limits the parallelism on task execution.

    The following example instantiates a semaphore with a maximum parallelism of 10:

    import cats.implicits._
    import cats.effect.IO
    // Needed for ContextShift[IO]
    import monix.execution.Scheduler
    implicit val cs = IO.contextShift(Scheduler.global)
    // Dummies for didactic purposes
    case class HttpRequest()
    case class HttpResponse()
    def makeRequest(r: HttpRequest): IO[HttpResponse] = IO(???)
    for {
      semaphore <- Semaphore[IO](provisioned = 10)
      tasks = for (_ <- 0 until 1000) yield {
        semaphore.withPermit(makeRequest(HttpRequest()))
      }
      // Execute in parallel; note that due to the `semaphore`
      // no more than 10 tasks will be allowed to execute in parallel
      _ <- tasks.toList.parSequence
    } yield ()


    Semaphore now implements cats.effect.concurrent.Semaphore, deprecating the old Monix TaskSemaphore.

    The changes to the interface and some implementation details are inspired by the implementation in Cats-Effect, which was ported from FS2.

Value Members

  1. object CancelableF
  2. object CircuitBreaker extends CircuitBreakerDocs
  3. object ConcurrentQueue extends Serializable
  4. object FutureLift extends FutureLiftForPlatform
  5. object MVar
  6. object OrElse extends OrElse0
  7. object Semaphore
  8. object syntax