Monix: Version 2.3.0, New Features, Bug Fixes

Version 2.3.0 is out now.

It is a major release that remains binary and source compatible with 2.2.x.

In summary we’ve got a replacement for Scala’s NonFatal, a new Task.deferAction builder for defering tasks with an injected scheduler, Task and Coeval run-loop optimizations, the Observable.observeOn operation after popular request, a Consumer.transformInput for transforming consumers by transforming their input stream, fixes for the reactive streams integration and more. See details below.

New Features

Replaces Scala’s NonFatal

Issue #349 replaces scala.util.control.NonFatal with monix.execution.misc.NonFatal.

This is because Scala’s NonFatal implementation considers these exceptions as being fatal:

And unfortunately by filtering exceptions with Scala’s NonFatal, this means that we cannot rely on resource handling logic. In reality, we should only care about VirtualMachineError, which includes OutOfMemoryError and StackOverflowError. And that’s what monix.execution.misc.NonFatal does.


Issue #347 adds a Task.deferAction builder.

When building a Task from a Future, we currently have a Task.deferFutureAction utility that can inject the default Scheduler, which is useful for delaying execution, or for injecting the current time.

Task.deferAction is like Task.defer, except that it injects the default Scheduler in that logic:

def measureLatency[A](fa: Task[A]): Task[(A, Long)] =
  Task.deferAction { scheduler =>
    val start = scheduler.currentTimeMillis() { a => 
      val finish = scheduler.currentTimeMillis()
      (a, finish - start)

Task / Coeval Run-Loop Optimization for Attempt / Materialize

PR #353 optimizes the Task and Coeval run-loop after seeing that loops such as this don’t perform so well:

(0 until 10000).foldLeft(Task.eval(())) { (acc, _) => => ())

This is part of the work we did for the new cats-effect. You can see the discussion in cats-effect#42.

The new implementation in Monix has high performance and is easy on the heap usage. Also introduced the transformWith operator (along with regular transform):

task.transformWith(a => Success(a), e => Failure(e))

Introduce Observable.observeOn

Issue #339 introduces the observeOn operator, that allows specifying a separate Scheduler to use when consuming events from the source:


This is based on the same logic as asyncBoundary. Compared with Rx.NET it introduces a buffer and an async consumer run-loop polling that buffer between the producer and the consumer(s). This keeps throughput and latency good enough. And given that it uses a buffer, an overload allows for specifying an overflow strategy.

Introduce Consumer.transformInput

Issue #348 introduces a new method on Consumer called transformInput, that takes a mapping function that works on Observable:

val source = Consumer.foldLeft[Long, Long](0L)(_ + _)

val Number = """^(\d+)$""".r

val transformed: Consumer[String, Long] = 
  source.transformInput[String] { strings => 
    strings.collect { case Number(num) => num.toLong }

Full List of Changes


To use the new version, include this in your build.sbt (and use %%% for Scala.js):

libraryDependencies += "io.monix" %% "monix" % "2.3.0"

The other projects from the @Monix organization have also been upgraded to depend on this new version.

shade, the Scala Memcached client:

dependencies += "io.monix" %% "shade" % "1.9.5"

monix-kafka, the Apache Kafka integration:

// For Kafka 8
libraryDependencies += "io.monix" %% "monix-kafka-8" % "0.14"

// For Kafka 9
libraryDependencies += "io.monix" %% "monix-kafka-9" % "0.14"

// For Kafka 10
libraryDependencies += "io.monix" %% "monix-kafka-10" % "0.14"