Monix: Version 2.3.0, New Features, Bug Fixes

Version 2.3.0 is out now.

It is a major release that remains binary and source compatible with 2.2.x.

In summary, we’ve got a replacement for Scala’s NonFatal, a new Task.deferAction builder for deferring tasks with an injected scheduler, Task and Coeval run-loop optimizations, the Observable.observeOn operation by popular request, a Consumer.transformInput for transforming consumers by transforming their input stream, fixes for the reactive streams integration, and more. See details below.

New Features

Replaces Scala’s NonFatal

Issue #349 replaces scala.util.control.NonFatal with monix.execution.misc.NonFatal.

This is because Scala’s NonFatal implementation treats the following exceptions as fatal, in addition to VirtualMachineError:

  • ThreadDeath, InterruptedException (related to managing interrupting threads)
  • LinkageError (e.g. classpath problems)
  • ControlThrowable (generated by Scala for flow control)

Unfortunately, filtering exceptions with Scala’s NonFatal means that handlers never see these exceptions, so we cannot rely on resource handling logic to run when they are thrown. In reality, we should only treat VirtualMachineError as fatal, which includes OutOfMemoryError and StackOverflowError. And that’s what monix.execution.misc.NonFatal does.
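The difference can be seen with the standard library alone. What follows is a minimal sketch, not the Monix source: OnlyVmErrorsFatal is a hypothetical extractor written for illustration that applies the rule described above, compared against scala.util.control.NonFatal.

```scala
import scala.util.control.NonFatal

object NonFatalSketch {
  // Hypothetical extractor (illustration only): nothing but
  // VirtualMachineError is treated as fatal
  object OnlyVmErrorsFatal {
    def apply(e: Throwable): Boolean = e match {
      case _: VirtualMachineError => false
      case _ => true
    }

    def unapply(e: Throwable): Option[Throwable] =
      if (apply(e)) Some(e) else None
  }

  val interrupted = new InterruptedException("interrupted")

  // Scala's NonFatal refuses to match, so a `case NonFatal(e)` handler is skipped
  val matchedByScala: Boolean = NonFatal(interrupted) // false

  // The sketch matches, so a handler built on it would still run
  val matchedBySketch: Boolean = OnlyVmErrorsFatal(interrupted) // true

  // StackOverflowError is a VirtualMachineError, so it stays fatal
  val vmErrorMatched: Boolean = OnlyVmErrorsFatal(new StackOverflowError()) // false
}
```

A `catch { case OnlyVmErrorsFatal(e) => ... }` clause would therefore get the chance to release resources on an InterruptedException, whereas the same clause written with Scala’s NonFatal would let it pass straight through.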

Task.deferAction

Issue #347 adds a Task.deferAction builder.

When building a Task from a Future, we currently have a Task.deferFutureAction utility that can inject the default Scheduler, which is useful for delaying execution, or for injecting the current time.

Task.deferAction is like Task.defer, except that it injects the default Scheduler into the deferred logic:

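import monix.eval.Task

// Pairs the result of `fa` with the elapsed time in milliseconds
def measureLatency[A](fa: Task[A]): Task[(A, Long)] =
  Task.deferAction { scheduler =>
    // The clock is read from the injected Scheduler each time the task runs
    val start = scheduler.currentTimeMillis()
    fa.map { a =>
      val finish = scheduler.currentTimeMillis()
      (a, finish - start)
    }
  }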
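To show how such a task might be run, here is a small usage sketch. It assumes the Monix 2.x runAsync, which takes an implicit Scheduler and returns a future, and it uses the global scheduler; the work task and the object name are hypothetical.

```scala
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.Await
import scala.concurrent.duration._

object MeasureLatencyExample {
  def measureLatency[A](fa: Task[A]): Task[(A, Long)] =
    Task.deferAction { scheduler =>
      val start = scheduler.currentTimeMillis()
      fa.map { a =>
        (a, scheduler.currentTimeMillis() - start)
      }
    }

  // Hypothetical workload: an evaluation delayed by 100 milliseconds
  val work: Task[Int] = Task.eval(21 * 2).delayExecution(100.millis)

  // Blocks for the result; acceptable in a demo, best avoided in production code
  def run(): (Int, Long) =
    Await.result(measureLatency(work).runAsync, 5.seconds)
}
```

Because the start timestamp is taken inside deferAction, every run of the resulting task measures its own latency, rather than the time elapsed since the task was built.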

Task / Coeval Run-Loop Optimization for Attempt / Materialize

PR #353 optimizes the Task and Coeval run-loop after seeing that loops such as this don’t perform so well:

(0 until 10000).foldLeft(Task.eval(())) { (acc, _) =>
  acc.materialize.map(_ => ())
}

This is part of the work we did for the new cats-effect. You can see the discussion in cats-effect#42.

The new implementation in Monix performs well and is easy on heap usage. This release also introduces the transformWith operator (along with the regular transform):

task.transformWith(a => Task.now(Success(a)), e => Task.now(Failure(e)))
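The two operators differ in what the handlers return. The sketch below assumes the signatures transform(A => R, Throwable => R) and transformWith(A => Task[R], Throwable => Task[R]); parsePort and the other names are hypothetical and exist only for illustration.

```scala
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.Await
import scala.concurrent.duration._

object TransformExample {
  // Hypothetical task that fails for non-numeric input
  def parsePort(raw: String): Task[Int] = Task.eval(raw.toInt)

  // transform: both handlers return plain values
  def describe(raw: String): Task[String] =
    parsePort(raw).transform(
      port => s"port $port",
      error => s"invalid port: ${error.getClass.getSimpleName}"
    )

  // transformWith: both handlers return a new Task
  def orDefault(raw: String): Task[Int] =
    parsePort(raw).transformWith(
      port => Task.now(port),
      _ => Task.now(8080)
    )

  // Demo helper that blocks for the result
  def await[A](task: Task[A]): A =
    Await.result(task.runAsync, 5.seconds)
}
```

Both success and failure are handled in a single step, which is what lets the run-loop avoid the intermediate materialize followed by map.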

Introduce Observable.observeOn

Issue #339 introduces the observeOn operator, which allows specifying a separate Scheduler to use when consuming events from the source:

observable.executeOn(io).observeOn(computation)

This is based on the same logic as asyncBoundary. Compared with Rx.NET, it introduces a buffer and an asynchronous consumer run-loop that polls that buffer, which keeps throughput and latency good enough. And because it uses a buffer, an overload allows specifying an overflow strategy.
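As a sketch of the overload mentioned above, the example below assumes it takes the OverflowStrategy as a second argument after the Scheduler. The scheduler names, the parallelism, and the buffer size of 256 are illustrative choices, not recommendations.

```scala
import monix.execution.Scheduler
import monix.reactive.{Observable, OverflowStrategy}
import scala.concurrent.Await
import scala.concurrent.duration._

object ObserveOnExample {
  // Hypothetical schedulers: one for producing events, one for consuming them
  val io: Scheduler = Scheduler.io(name = "example-io")
  val computation: Scheduler =
    Scheduler.computation(parallelism = 2, name = "example-cpu")

  val doubled: Observable[Long] =
    Observable.range(0, 1000)
      .executeOn(io)
      // Back-pressure the source once 256 events are waiting in the buffer
      .observeOn(computation, OverflowStrategy.BackPressure(256))
      .map(_ * 2)

  // Demo helper that blocks for the sum of all emitted values
  def run(): Long =
    Await.result(doubled.foldLeftL(0L)(_ + _).runAsync(computation), 10.seconds)
}
```

With BackPressure no events are dropped; the source is slowed down instead whenever the consumer falls behind.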

Introduce Consumer.transformInput

Issue #348 introduces a new method on Consumer called transformInput, which takes a mapping function that works on Observable:

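import monix.reactive.Consumer

// Sums all the Long values it receives
val source = Consumer.foldLeft[Long, Long](0L)(_ + _)

// Matches strings made up only of digits
val Number = """^(\d+)$""".r

// Accepts strings instead, keeping only the numeric ones
val transformed: Consumer[String, Long] =
  source.transformInput[String] { strings =>
    strings.collect { case Number(num) => num.toLong }
  }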
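To see the transformed consumer at work, it can be applied to a stream of strings with consumeWith. This is a usage sketch with hypothetical sample input, run on the global scheduler.

```scala
import monix.execution.Scheduler.Implicits.global
import monix.reactive.{Consumer, Observable}
import scala.concurrent.Await
import scala.concurrent.duration._

object TransformInputExample {
  val source = Consumer.foldLeft[Long, Long](0L)(_ + _)

  val Number = """^(\d+)$""".r

  val transformed: Consumer[String, Long] =
    source.transformInput[String] { strings =>
      strings.collect { case Number(num) => num.toLong }
    }

  // Hypothetical input: non-numeric entries are dropped by `collect`
  val input: Observable[String] = Observable("10", "abc", "32", "4x")

  // Demo helper that blocks for the result
  def run(): Long =
    Await.result(input.consumeWith(transformed).runAsync, 5.seconds)
}
```

Here run() yields 42, since only "10" and "32" match the pattern and reach the underlying sum.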

Full List of Changes

  • Issue #340: Optimization of TaskSemaphore
  • Issue #349: Replace usage of scala.util.control.NonFatal in handling fatal exceptions with monix.execution.misc.NonFatal
  • Issue #347: Add Task.deferAction builder
  • Issue #339: Add Observable.observeOn method
  • Issue #338: Cancelable refs built with Cancelable.collection should use Cancelable.cancelAll in their implementation
  • Issue #350: Change BackPressure buffer implementation to be more fair and ensure that it doesn’t lose events
  • Issue #353: Refactor Coeval / Task run-loop to introduce optimized attempt / materialize implementations and add transform / transformWith methods making use of this
  • Issue #355: Add Coeval.run method
  • Issue #356: Add Coeval#attempt and Task#attempt methods
  • Issue #358: Deprecate materializeAttempt and dematerializeAttempt on Task and Coeval
  • Issue #359: Rename Coeval.Attempt#isFailure to Coeval.Attempt#isError
  • Issue #348: Add Consumer#transformInput method
  • Issue #352 / PR #361: Fix the lack of back-pressure when converting from org.reactivestreams.Publisher to Observable
  • Issue #362: Replace the [T] generic param with [A], as a convention, everywhere
  • PR #341, PR #344, PR #346, Commit 9357ba, etc: Update dependencies (Scalaz 7.2.11, Scala 2.11.11, Scala 2.12.2, Scala.js 0.6.16)
  • Issue #354: Enable Mima and Unidoc error reporting in Travis build
  • PR #351: Specify that Monix is now a Typelevel project with full membership

Upgrading

To use the new version, include this in your build.sbt (and use %%% for Scala.js):

libraryDependencies += "io.monix" %% "monix" % "2.3.0"
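For Scala.js, the same line with the %%% operator would look as follows. This is a sketch that assumes the sbt-scalajs plugin is already enabled in the build:

```scala
// build.sbt of a Scala.js project (assumes the sbt-scalajs plugin is enabled)
libraryDependencies += "io.monix" %%% "monix" % "2.3.0"
```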

The other projects from the @Monix organization have also been upgraded to depend on this new version.

shade, the Scala Memcached client:

libraryDependencies += "io.monix" %% "shade" % "1.9.5"

monix-kafka, the Apache Kafka integration:

// For Kafka 0.8.x
libraryDependencies += "io.monix" %% "monix-kafka-8" % "0.14"

// For Kafka 0.9.x
libraryDependencies += "io.monix" %% "monix-kafka-9" % "0.14"

// For Kafka 0.10.x
libraryDependencies += "io.monix" %% "monix-kafka-10" % "0.14"

Enjoy!