MVar
An MVar is a mutable location that can be empty or contain a value, asynchronously blocking reads when empty and blocking writes when full.
Introduction #
Use-cases:
- As synchronized, thread-safe mutable variables
- As channels, with take and put acting as “receive” and “send”
- As a binary semaphore, with take and put acting as “acquire” and “release”
It has these fundamental, atomic operations:
- put, which fills the var if empty, or blocks (asynchronously) until the var is empty again
- tryPut, which fills the var if empty; returns true if successful
- take, which empties the var if full, returning the contained value, or blocks (asynchronously) until there is a value to pull
- tryTake, which empties the var if full, returning the contained value as Some(a), or returns None if empty
- read, which reads the current value without modifying it, assuming there is one, or otherwise waits until a value is made available via put
- tryRead, which returns Some(a) if full, without modifying the var, or else returns None
- isEmpty, which returns true if the var is currently empty
In this context “asynchronous blocking” means that we are not blocking any threads. Instead the implementation uses callbacks to notify clients when the operation has finished (notifications exposed by means of Task), and it thus works on top of JavaScript as well.
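For example, a minimal sketch exercising these operations via Monix’s Task (it reuses the MVar[Task] builders shown later on this page; the printed tuple is just for illustration) might look like this:

import monix.catnap.MVar
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global

val demo: Task[(Boolean, Boolean, Int, Int, Option[Int])] =
  for {
    mvar  <- MVar[Task].empty[Int]()
    empty <- mvar.isEmpty   // true, nothing stored yet
    _     <- mvar.put(1)    // fills the var
    ok    <- mvar.tryPut(2) // false, the var is already full
    a     <- mvar.read      // 1, leaves the value in place
    b     <- mvar.take      // 1, the var is empty again
    c     <- mvar.tryTake   // None, nothing left to pull
  } yield (empty, ok, a, b, c)

// Evaluate
demo.runToFuture.foreach(println)
//=> (true,false,1,1,None)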
Inspiration #
This data type is inspired by Control.Concurrent.MVar from Haskell, introduced in the paper Concurrent Haskell, by Simon Peyton Jones, Andrew Gordon and Sigbjorn Finne, though some details of their implementation are changed (in particular, a put on a full MVar used to error, but now merely blocks).
Appropriate for building synchronization primitives and performing simple inter-thread communication, it’s the equivalent of a BlockingQueue(capacity = 1), except that there’s no actual thread blocking involved and it is powered by Task.
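For comparison, the thread-blocking JVM analogue alluded to above would look roughly like this (a plain java.util.concurrent sketch, unrelated to Monix):

import java.util.concurrent.ArrayBlockingQueue

// A bounded queue of capacity 1: put blocks the calling thread when full,
// take blocks the calling thread when empty
val queue = new ArrayBlockingQueue[Int](1)
queue.put(1)
val value = queue.take()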
Cats-Effect #
MVar is generic, being built to abstract over the effect type via the Cats-Effect type classes, meaning you can use it with Monix’s Task just as well as with cats.effect.IO or any data type implementing Async or Concurrent.
Note that MVar is already described in cats.effect.concurrent.MVar and Monix’s implementation does in fact implement that interface. MVar will remain in Monix as well because:
- it shares implementation with monix.execution.AsyncVar, the Future-enabled alternative
- we can use our Atomic implementations
- at this point Monix’s MVar has some fixes that have to wait for the next version of Cats-Effect to be merged upstream
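To illustrate that genericity, here is a small, hypothetical swap helper written against an abstract F[_] with a Concurrent constraint; Task or cats.effect.IO can be substituted for F:

import cats.effect.Concurrent
import cats.implicits._
import monix.catnap.MVar

// Replaces the current value with `a`, returning the old one;
// works for any F[_] with a Concurrent instance (Task, IO, ...)
def swap[F[_]: Concurrent, A](mvar: MVar[F, A], a: A): F[A] =
  for {
    old <- mvar.take
    _   <- mvar.put(a)
  } yield old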
Use-case: Synchronized Mutable Variables #
import monix.execution.CancelableFuture
import monix.catnap.MVar
import monix.eval.Task
// Needed for the runToFuture evaluation below
import monix.execution.Scheduler.Implicits.global

def sum(state: MVar[Task, Int], list: List[Int]): Task[Int] =
  list match {
    case Nil => state.take
    case x :: xs =>
      state.take.flatMap { current =>
        state.put(current + x).flatMap(_ => sum(state, xs))
      }
  }
val task =
  for {
    state <- MVar[Task].of(0)
    r <- sum(state, (0 until 100).toList)
  } yield r

// Evaluate
task.runToFuture.foreach(println)
//=> 4950
This sample isn’t very useful, except to show how MVar can be used as a variable. The take and put operations are atomic.
The take call will (asynchronously) block if there isn’t a value available, whereas the call to put blocks if the MVar already has a value in it waiting to be consumed.
Obviously, between the call to take and the call to put, concurrent logic could update the same variable. While the two operations are atomic by themselves, a combination of them isn’t atomic (i.e. atomic operations don’t compose), therefore if we want this sample to be safe, we need extra synchronization.
Use-case: Asynchronous Lock (Binary Semaphore, Mutex) #
The take operation can act as “acquire” and put can act as the “release”. Let’s do it:
final class MLock(mvar: MVar[Task, Unit]) {
  def acquire: Task[Unit] =
    mvar.take

  def release: Task[Unit] =
    mvar.put(())

  def greenLight[A](fa: Task[A]): Task[A] =
    for {
      _ <- acquire
      a <- fa.doOnCancel(release)
      _ <- release
    } yield a
}

object MLock {
  /** Builder. */
  def apply(): Task[MLock] =
    MVar[Task].of(()).map(v => new MLock(v))
}
And now we can apply synchronization to the previous example:
val task =
  for {
    lock <- MLock()
    state <- MVar[Task].of(0)
    task = sum(state, (0 until 100).toList)
    r <- lock.greenLight(task)
  } yield r

// Evaluate
task.runToFuture.foreach(println)
//=> 4950
Use-case: Producer/Consumer Channel #
An obvious use-case is to model a simple producer-consumer channel.
Say that you have a producer that needs to push events, but we also need some back-pressure: we need to wait for the consumer to consume the last event before being able to generate a new one.
// Signaling option, because we need to detect completion
type Channel[A] = MVar[Task, Option[A]]

def producer(ch: Channel[Int], list: List[Int]): Task[Unit] =
  list match {
    case Nil =>
      ch.put(None) // we are done!
    case head :: tail =>
      // next please
      ch.put(Some(head)).flatMap(_ => producer(ch, tail))
  }

def consumer(ch: Channel[Int], sum: Long): Task[Long] =
  ch.take.flatMap {
    case Some(x) =>
      // next please
      consumer(ch, sum + x)
    case None =>
      Task.now(sum) // we are done!
  }
val count = 100000

val sumTask =
  for {
    channel <- MVar[Task].empty[Option[Int]]()
    producerTask = producer(channel, (0 until count).toList).executeAsync
    consumerTask = consumer(channel, 0L).executeAsync
    // Ensure they run in parallel, not really necessary, just for kicks
    sum <- Task.parMap2(producerTask, consumerTask)((_, sum) => sum)
  } yield sum

// Evaluate
sumTask.runToFuture.foreach(println)
//=> 4999950000
Running this will work as expected. Our producer pushes values into our MVar and our consumer will consume all of those values.