Monix: Version 2.3.0, New Features, Bug Fixes
Version 2.3.0 is out now.
It is a major release that remains binary and source compatible with 2.2.x.
In summary, we’ve got a replacement for Scala’s NonFatal, a new Task.deferAction builder for deferring tasks with an injected scheduler, Task and Coeval run-loop optimizations, the Observable.observeOn operation by popular request, a Consumer.transformInput for transforming consumers by transforming their input stream, fixes for the reactive streams integration and more.
See details below.
New Features
Replaces Scala’s NonFatal
Issue #349 replaces scala.util.control.NonFatal with monix.execution.misc.NonFatal.
This is because Scala’s NonFatal implementation, in addition to VirtualMachineError, considers these exceptions as being fatal:
- ThreadDeath and InterruptedException (related to managing thread interruption)
- LinkageError (e.g. classpath problems)
- ControlThrowable (generated by Scala for flow control)
Unfortunately, when exceptions are filtered with Scala’s NonFatal, these errors slip past the error handlers, which means that we cannot rely on resource handling logic being executed.
In reality, we should only care about VirtualMachineError, which includes OutOfMemoryError and StackOverflowError.
And that’s what monix.execution.misc.NonFatal does.
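The difference in behavior can be sketched with the standard library alone. The LenientNonFatal object below is a hypothetical stand-in written for illustration, not the actual Monix source; it only assumes the rule stated above, namely that VirtualMachineError and its subclasses are the only fatal errors:

```scala
import scala.util.control.{NonFatal => ScalaNonFatal}

// Hypothetical stand-in for monix.execution.misc.NonFatal (an assumption,
// not the library's code): only VirtualMachineError counts as fatal.
object LenientNonFatal {
  def apply(t: Throwable): Boolean = t match {
    case _: VirtualMachineError => false // OutOfMemoryError, StackOverflowError, ...
    case _ => true
  }

  // Enables `case LenientNonFatal(e) => ...` in pattern matches.
  def unapply(t: Throwable): Option[Throwable] =
    if (apply(t)) Some(t) else None
}

object NonFatalDemo {
  def main(args: Array[String]): Unit = {
    val interrupted = new InterruptedException("interrupted")
    println(ScalaNonFatal(interrupted))              // false: treated as fatal by Scala
    println(LenientNonFatal(interrupted))            // true: can be caught and handled
    println(LenientNonFatal(new StackOverflowError)) // false: still fatal
  }
}
```

With an extractor like this, a handler written as case LenientNonFatal(e) still gets a chance to release resources when a thread is interrupted.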
Task.deferAction
Issue #347 adds a Task.deferAction builder.
When building a Task from a Future, we currently have a Task.deferFutureAction utility that can inject the default Scheduler, which is useful for delaying execution, or for injecting the current time.
Task.deferAction is like Task.defer, except that it injects the default Scheduler in that logic:
def measureLatency[A](fa: Task[A]): Task[(A, Long)] =
  Task.deferAction { scheduler =>
    val start = scheduler.currentTimeMillis()
    fa.map { a =>
      val finish = scheduler.currentTimeMillis()
      (a, finish - start)
    }
  }
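As a rough usage sketch, the helper can be run against a deliberately delayed task. The MeasureLatencyDemo object, the 100 millisecond delay and the use of the global scheduler are assumptions made for this example:

```scala
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.Await
import scala.concurrent.duration._

object MeasureLatencyDemo {
  // Same helper as above: the start time is read when the task is
  // evaluated, not when measureLatency is called.
  def measureLatency[A](fa: Task[A]): Task[(A, Long)] =
    Task.deferAction { scheduler =>
      val start = scheduler.currentTimeMillis()
      fa.map { a =>
        val finish = scheduler.currentTimeMillis()
        (a, finish - start)
      }
    }

  // Hypothetical helper: runs a delayed task and blocks for its result.
  def run(): (Int, Long) = {
    val slowTask = Task.eval(21 * 2).delayExecution(100.millis)
    Await.result(measureLatency(slowTask).runAsync, 5.seconds)
  }

  def main(args: Array[String]): Unit = {
    val (value, elapsedMillis) = run()
    println(s"value=$value, elapsed=${elapsedMillis}ms")
  }
}
```

Because the clock is read from the injected scheduler, the same code should also work with a test scheduler that simulates time, which is the main reason to prefer it over calling System.currentTimeMillis directly.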
Task / Coeval Run-Loop Optimization for Attempt / Materialize
PR #353 optimizes the Task and Coeval run-loop after seeing that loops such as this don’t perform so well:
(0 until 10000).foldLeft(Task.eval(())) { (acc, _) =>
  acc.materialize.map(_ => ())
}
This is part of the work we did for the new cats-effect.
You can see the discussion in cats-effect#42.
The new implementation in Monix is fast and light on heap usage. This change also introduces the transformWith operator (along with the regular transform):
task.transformWith(a => Task.now(Success(a)), e => Task.now(Failure(e)))
Introduce Observable.observeOn
Issue #339 introduces the observeOn operator, which allows specifying a separate Scheduler to use when consuming events from the source:
observable.executeOn(io).observeOn(computation)
This is based on the same logic as asyncBoundary. Compared with Rx.NET, it introduces a buffer and an async consumer run-loop that polls that buffer. This keeps throughput and latency good enough. And given that it uses a buffer, an overload allows for specifying an overflow strategy.
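A minimal sketch of the overload with an explicit overflow strategy follows. The scheduler names, the buffer size of 256 and the exact parameter order of the overload (scheduler first, strategy second) are assumptions made for this example:

```scala
import monix.execution.Scheduler
import monix.reactive.{Observable, OverflowStrategy}
import scala.concurrent.Await
import scala.concurrent.duration._

object ObserveOnDemo {
  // Hypothetical scheduler names; any two Scheduler instances would do.
  val io: Scheduler = Scheduler.io(name = "demo-io")
  val computation: Scheduler =
    Scheduler.computation(parallelism = 2, name = "demo-computation")

  def sumOnComputation(): Long = {
    val task = Observable.range(0, 1000)
      .executeOn(io) // the source is subscribed to on the io scheduler
      // Events are consumed on the computation scheduler; when the buffer
      // is full, the source is back-pressured instead of dropping events.
      .observeOn(computation, OverflowStrategy.BackPressure(bufferSize = 256))
      .sumL
    Await.result(task.runAsync(computation), 10.seconds)
  }
}
```

BackPressure is the conservative choice here because it never loses events; a dropping strategy would trade completeness for lower latency on a slow consumer.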
Introduce Consumer.transformInput
Issue #348 introduces a new method on Consumer called transformInput, which takes a mapping function that works on Observable:
val source = Consumer.foldLeft[Long, Long](0L)(_ + _)
val Number = """^(\d+)$""".r
val transformed: Consumer[String, Long] =
  source.transformInput[String] { strings =>
    strings.collect { case Number(num) => num.toLong }
  }
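To see the transformed consumer in action, it can be applied to a stream of strings with consumeWith. The sample input and the TransformInputDemo wrapper are assumptions made for this sketch:

```scala
import monix.execution.Scheduler.Implicits.global
import monix.reactive.{Consumer, Observable}
import scala.concurrent.Await
import scala.concurrent.duration._

object TransformInputDemo {
  // Consumer that sums a stream of Long values.
  val source = Consumer.foldLeft[Long, Long](0L)(_ + _)
  val Number = """^(\d+)$""".r

  // Same consumer, now accepting strings: non-numeric input is filtered
  // out and the rest is parsed before reaching the fold.
  val transformed: Consumer[String, Long] =
    source.transformInput[String] { strings =>
      strings.collect { case Number(num) => num.toLong }
    }

  def total(): Long = {
    // Hypothetical sample input: "oops" does not match and is skipped.
    val input = Observable.fromIterable(List("10", "oops", "32"))
    Await.result(input.consumeWith(transformed).runAsync, 5.seconds)
  }
}
```

The parsing lives with the consumer rather than with each source, so any Observable of strings can reuse it unchanged.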
Full List of Changes
- Issue #340: Optimization of TaskSemaphore
- Issue #349: Replace usage of scala.util.control.NonFatal in handling fatal exceptions with monix.execution.misc.NonFatal
- Issue #347: Add Task.deferAction builder
- Issue #339: Add Observable.observeOn method
- Issue #338: Cancelable refs built with Cancelable.collection should use Cancelable.cancelAll in their implementation
- Issue #350: Change BackPressure buffer implementation to be more fair and ensure that it doesn’t lose events
- Issue #353: Refactor Coeval / Task run-loop to introduce optimized attempt / materialize implementations and add transform / transformWith methods making use of this
- Issue #355: Add Coeval.run method
- Issue #356: Add Coeval#attempt and Task#attempt methods
- Issue #358: Deprecate materializeAttempt and dematerializeAttempt on Task and Coeval
- Issue #359: Rename Coeval.Attempt#isFailure to Coeval.Attempt#isError
- Issue #348: Add Consumer#transformInput method
- Issue #352 / PR #361: No back-pressure when converting from org.reactivestreams.Publisher to Observable
- Issue #362: Replace [T] generic param with [A], as a convention, everywhere
- PR #341, PR #344, PR #346, Commit 9357ba, etc: Update dependencies (Scalaz 7.2.11, Scala 2.11.11, Scala 2.12.2, Scala.js 0.6.16)
- Issue #354: Enable Mima and Unidoc error reporting in Travis build
- PR #351: Specify that Monix is now a Typelevel project with full membership
Upgrading
To use the new version, include this in your build.sbt (and use %%% for Scala.js):
libraryDependencies += "io.monix" %% "monix" % "2.3.0"
The other projects from the @Monix organization have also been upgraded to depend on this new version.
shade, the Scala Memcached client:
libraryDependencies += "io.monix" %% "shade" % "1.9.5"
monix-kafka, the Apache Kafka integration:
// For Kafka 8
libraryDependencies += "io.monix" %% "monix-kafka-8" % "0.14"
// For Kafka 9
libraryDependencies += "io.monix" %% "monix-kafka-9" % "0.14"
// For Kafka 10
libraryDependencies += "io.monix" %% "monix-kafka-10" % "0.14"
Enjoy!