Parallel Processing

You are viewing the documentation for the older Monix 2.x series.
For the latest version: see here!

Monix provides multiple ways for achieving parallelism, depending on use-case.

The samples in this document are copy/paste-able, but to get the imports out of the way:

// On evaluation a Scheduler is needed
import monix.execution.Scheduler.Implicits.global
// For Task
import monix.eval._
// For Observable
import monix.reactive._

Parallelism with Task #

We can do parallel execution in batches, that does deterministic (ordered) signaling of results with the help of Task.

The Naive Way #

The following example uses Task.gather, which does parallel processing while preserving result ordering, but in order to ensure that parallel processing actually happens, the tasks need to be effectively asynchronous, which for simple functions need to fork threads, hence the usage of Task.apply, although remember that you can apply Task.fork to any task.

val items = 0 until 1000

// The list of all tasks needed for execution
val tasks = items.map(i => Task(i * 2))
// Processing in parallel
val aggregate = Task.gather(tasks).map(_.toList)

// Evaluation:
aggregate.foreach(println)
//=> List(0, 2, 4, 6, 8, 10, 12, 14, 16,...

If ordering of results does not matter, you can also use Task.gatherUnordered instead of gather, which might yield better results, given its non-blocking execution.

Imposing a Parallelism Limit #

The Task.gather builder, as exemplified above, will potentially execute all given tasks in parallel, the problem being that this can lead to inefficiency. For example we might be doing HTTP requests and starting 10000 HTTP requests in parallel is not necessarily wise as it can choke the server on the other end.

To solve this we can split the workload in batches of parallel tasks that are then sequenced:

val items = 0 until 1000
// The list of all tasks needed for execution
val tasks = items.map(i => Task(i * 2))
// Building batches of 10 tasks to execute in parallel:
val batches = tasks.sliding(10,10).map(b => Task.gather(b))
// Sequencing batches, then flattening the final result
val aggregate = Task.sequence(batches).map(_.flatten.toList)

// Evaluation:
aggregate.foreach(println)
//=> List(0, 2, 4, 6, 8, 10, 12, 14, 16,...

Note how this strategy is difficult to achieve with Scala’s Future because even though we have Future.sequence, its behavior is strict and is thus not able to differentiate well between sequencing and parallelism, this behavior being controlled by passing a lazy or a strict sequence to Future.sequence, which is obviously error-prone.

Batched Observables #

We can also combine this with Observable.flatMap for doing requests in batches:

import monix.eval._
import monix.reactive._

// The `bufferIntrospective` will do buffering, up to a certain
// `bufferSize`, for as long as the downstream is busy and then
// stream a whole sequence of all buffered events at once
val source = Observable.range(0,1000).bufferIntrospective(256)

// Processing in batches, powered by `Task`
val batched = source.flatMap { items =>
  // The list of all tasks needed for execution
  val tasks = items.map(i => Task(i * 2))
  // Building batches of 10 tasks to execute in parallel:
  val batches = tasks.sliding(10,10).map(b => Task.gather(b))
  // Sequencing batches, then flattening the final result
  val aggregate = Task.sequence(batches).map(_.flatten)
  // Converting into an observable, needed for flatMap
  Observable.fromTask(aggregate)
    .flatMap(i => Observable.fromIterator(i))
}

// Evaluation:
batched.toListL.foreach(println)
//=> List(0, 2, 4, 6, 8, 10, 12, 14, 16,...

Note the use of bufferIntrospective, which buffers incoming events while the downstream is busy, after which it emits the buffer as a single bundle. The bufferTumbling operator can be a more deterministic alternative.

Observable.mapAsync #

Another way to achieve parallelism is to use the Observable.mapAsync operator:

val source = Observable.range(0,1000)
// The parallelism factor needs to be specified
val processed = source.mapAsync(parallelism = 10) { i =>
  Task(i * 2)
}

// Evaluation:
processed.toListL.foreach(println)
//=> List(2, 10, 0, 4, 8, 6, 12...

Compared with using Task.gather as exemplified above, this operator does not maintain ordering of results as signaled by the source.

This leads to a more efficient execution, because the source doesn’t get back-pressured for as long as there’s at least one worker active, whereas with the batched execution strategy exemplified above we can have inefficiencies due to a single async task that takes too long to complete.

Observable.mergeMap #

If Observable.mapAsync works with Task, then Observable.mergeMap works by merging Observable instances.

val source = Observable.range(0,1000)
// The parallelism factor needs to be specified
val processed = source.mergeMap { i =>
  Observable.fork(Observable.eval(i * 2))
}

// Evaluation:
processed.toListL.foreach(println)
//=> List(0, 4, 6, 2, 8, 10, 12, 14...

Note that mergeMap is similar with concatMap (aliased by flatMap in Monix), except that the observable streams emitted by the source get subscribed in parallel and thus the result is non-deterministic.

Note that this mergeMap call, as exemplified above, does not have an optional parallelism parameter, which means that if the source is chatty, we can end up with a lot of observables subscribed in parallel. The issue is that the mergeMap operator is not meant for actual processing in parallel, but for joining active, concurrent streams.

Consumer.loadBalancer #

We can apply a mapAsync like operation on the consumer side, as exemplified in the Consumer tutorial, by means of a load-balanced consumer, being able to do a final aggregate of the results of all workers:

import monix.eval._
import monix.reactive._

// A consumer that folds over the elements of the stream,
// producing a sum as a result
val sumConsumer = Consumer.foldLeft[Long,Long](0L)(_ + _)

// For processing sums in parallel, useless of course, but can become 
// really helpful for logic sprinkled with I/O bound stuff
val loadBalancer = {
  Consumer
    .loadBalance(parallelism=10, sumConsumer)
    .map(_.sum)
}

val observable: Observable[Long] = Observable.range(0, 100000)
// Our consumer turns our observable into a Task processing sums, w00t!
val task: Task[Long] = observable.consumeWith(loadBalancer)

// Consume the whole stream and get the result
task.runAsync.foreach(println)
//=> 4999950000

Read the Consumer document for more details.