final class ConcurrentChannel[F[_], E, A] extends ProducerF[F, E, A] with ChannelF[F, E, A]
ConcurrentChannel
can be used to model complex producer-consumer communication channels.
It exposes these fundamental operations:
- push for pushing single events to consumers (producer side)
- pushMany for pushing event sequences to consumers (producer side)
- halt for pushing the final completion event to all consumers (producer side)
- consume for creating a ConsumerF value that can consume the incoming events from the channel
Example
import cats.implicits._ import cats.effect._ import monix.execution.Scheduler.global // For being able to do IO.start implicit val cs = SchedulerEffect.contextShift[IO](global) // We need a `Timer` for this to work implicit val timer = SchedulerEffect.timer[IO](global) // Completion event sealed trait Complete object Complete extends Complete def logLines(consumer: ConsumerF[IO, Complete, String], index: Int): IO[Unit] = consumer.pull.flatMap { case Right(message) => IO(println("Worker $$index: $$message")) // continue loop .flatMap(_ => logLines(consumer, index)) case Left(Complete) => IO(println("Worker $$index is done!")) } for { channel <- ConcurrentChannel[IO].of[Complete, String] // Workers 1 & 2, sharing the load between them task_1_2 = channel.consume.use { ref => (logLines(ref, 1), logLines(ref, 2)).parSequence_ } consumers_1_2 <- task_1_2.start // fiber // Workers 3 & 4, receiving the same events as workers 1 & 2, // but sharing the load between them task_3_4 = channel.consume.use { ref => (logLines(ref, 3), logLines(ref, 4)).parSequence_ } consumers_3_4 <- task_3_4.start // fiber // Pushing some samples _ <- channel.push("Hello, ") _ <- channel.push("World!") // Signal there are no more events _ <- channel.halt(Complete) // Await for the completion of the consumers _ <- consumers_1_2.join _ <- consumers_3_4.join } yield ()
Unicasting vs Broadcasting vs Multicasting
Unicasting: A communication channel between one producer and one ConsumerF. Multiple workers can share the load of processing incoming events. For example in case we want to have 8 workers running in parallel, you can create one ConsumerF, via consume and then use it for multiple workers. Internally this setup uses a single queue and whatever workers you have will share it.
Broadcasting: the same events can be sent to multiple consumers,
thus duplicating the load, as a broadcasting setup can be created
by creating and consuming from multiple ConsumerF via multiple calls
to consume. Internally each ConsumerF
gets its own queue and hence
messages are duplicated.
Multicasting: multiple producers can push events at the same time, provided the channel's type is configured as a MultiProducer.
Back-Pressuring and the Polling Model
When consumers get created via consume, a buffer gets created and assigned per consumer.
Depending on what the BufferCapacity is configured to be, the initialized consumer can work with a maximum buffer size, a size that could be rounded to a power of 2, so you can't rely on it to be precise. See consumeWithConfig for customizing this buffer on a per-consumer basis, or the ConcurrentChannel.withConfig builder for setting the default used in consume.
On push, when the queue is full, the implementation back-pressures until the channel has room again in its internal buffer(s), the task being completed when the value was pushed successfully. Similarly ConsumerF.pull (returned by consume) awaits the channel to have items in it. This works for both bounded and unbounded channels.
For both push
and pull
, in case awaiting a result happens, the
implementation does so asynchronously, without any threads being blocked.
Multi-threading Scenario
This channel supports the fine-tuning of the concurrency scenario via ChannelType.ProducerSide (see ConcurrentChannel.withConfig) and the ChannelType.ConsumerSide that can be specified per consumer (see consumeWithConfig).
The default is set to MultiProducer and MultiConsumer, which is always the safe choice, however these can be customized for better performance.
These scenarios are available:
- MPMC: multi-producer, multi-consumer, when MultiProducer is selected on the channel's creation and MultiConsumer is selected when consuming; this is the safe scenario and should be used as the default, especially when in doubt
- MPSC: multi-producer, single-consumer, when MultiProducer is selected on the channel's creation and SingleConsumer is selected when consuming; this scenario should be selected when there are multiple producers, but a single worker that consumes data sequentially (per ConsumerF); note that this means a single worker per ConsumerF instance, but you can still have multiple ConsumerF instances created, , because each ConsumerF gets its own buffer anyway
- SPMC: single-producer, multi-consumer, when SingleProducer is selected on the channel's creation and MultiConsumer is selected when consuming; this scenario should be selected when there are multiple workers processing data in parallel (e.g. pulling from the same ConsumerF), but a single producer that pushes data on the channel sequentially
- SPSC: single-producer, single-consumer, when SingleProducer is selected on the channel's creation and SingleConsumer is selected when consuming; this scenario should be selected when there is a single producer that pushes data on the channel sequentially and a single worker per ConsumerF instance that pulls data from the channel sequentially; note you can still have multiple ConsumerF instances running in parallel, because each ConsumerF gets its own buffer anyway
The default is MPMC
, because that's the safest scenario.
import cats.implicits._ import cats.effect.IO import monix.execution.ChannelType.{SingleProducer, SingleConsumer} import monix.execution.BufferCapacity.Bounded val channel = ConcurrentChannel[IO].withConfig[Int, Int]( producerType = SingleProducer ) val consumerConfig = ConsumerF.Config( consumerType = Some(SingleConsumer) ) for { producer <- channel consumer1 = producer.consumeWithConfig(consumerConfig) consumer2 = producer.consumeWithConfig(consumerConfig) fiber1 <- consumer1.use { ref => ref.pull }.start fiber2 <- consumer2.use { ref => ref.pull }.start _ <- producer.push(1) value1 <- fiber1.join value2 <- fiber2.join } yield { (value1, value2) }
Note that in this example, even if we used SingleConsumer
as the type
passed in consumeWithConfig, we can still consume from two ConsumerF
instances at the same time, because each one gets its own internal buffer.
But you cannot have multiple workers per ConsumerF in this scenario,
because this would break the internal synchronization / visibility
guarantees.
WARNING: default is MPMC
, however any other scenario implies
a relaxation of the internal synchronization between threads.
This means that using the wrong scenario can lead to severe
concurrency bugs. If you're not sure what multi-threading scenario you
have, then just stick with the default MPMC
.
Credits
Inspired by Haskell's Control.Concurrent.ConcurrentChannel, but note that this isn't a straight port — e.g. the Monix implementation has a cleaner, non-leaky interface, is back-pressured and allows for termination (via halt), which changes its semantics significantly.
- Source
- ConcurrentChannel.scala
- Alphabetic
- By Inheritance
- ConcurrentChannel
- ChannelF
- ProducerF
- Serializable
- AnyRef
- Any
- Hide All
- Show All
- Public
- Protected
Value Members
- final def !=(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- final def ##(): Int
- Definition Classes
- AnyRef → Any
- final def ==(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- final def asInstanceOf[T0]: T0
- Definition Classes
- Any
- def awaitConsumers(n: Int): F[Boolean]
Awaits for the specified number of consumers to be connected.
Awaits for the specified number of consumers to be connected.
This is an utility to ensure that a certain number of consumers are connected before we start emitting events.
- n
is a number indicating the number of consumers that need to be connected before the returned task completes
- returns
a task that will complete only after the required number of consumers are observed as being connected to the channel
- Definition Classes
- ConcurrentChannel → ProducerF
- def clone(): AnyRef
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.CloneNotSupportedException]) @native()
- def consume: Resource[F, ConsumerF[F, E, A]]
Create a ConsumerF value that can be used to consume events from the channel.
Create a ConsumerF value that can be used to consume events from the channel.
Note in case multiple consumers are created, all of them will see the events being pushed, so a broadcasting setup is possible. Also multiple workers can consumer from the same
ConsumerF
value, to share the load.The returned value is a Resource, because a consumer can be unsubscribed from the channel, with its internal buffer being garbage collected.
- Definition Classes
- ConcurrentChannel → ChannelF
- See also
consumeWithConfig for fine tuning the internal buffer of the created consumer
- def consumeWithConfig(config: Config): Resource[F, ConsumerF[F, E, A]]
Version of consume that allows for fine tuning the underlying buffer used.
Version of consume that allows for fine tuning the underlying buffer used.
- config
is configuration for the created buffer, see ConsumerF.Config for details
- Definition Classes
- ConcurrentChannel → ChannelF
- Annotations
- @UnsafeProtocol()
- final def eq(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- def equals(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef → Any
- def finalize(): Unit
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.Throwable])
- final def getClass(): Class[_ <: AnyRef]
- Definition Classes
- AnyRef → Any
- Annotations
- @native()
- def halt(e: E): F[Unit]
Stops the channel and sends a halt event to all current and future consumers.
Stops the channel and sends a halt event to all current and future consumers.
Consumers will receive a
Left(e)
event after halt is observed.Note that if multiple
halt
events happen, then only the first one will be taken into account, all otherhalt
messages are ignored.- Definition Classes
- ConcurrentChannel → ProducerF
- def hashCode(): Int
- Definition Classes
- AnyRef → Any
- Annotations
- @native()
- final def isInstanceOf[T0]: Boolean
- Definition Classes
- Any
- final def ne(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- final def notify(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native()
- final def notifyAll(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native()
- def push(a: A): F[Boolean]
Publishes an event on the channel.
Publishes an event on the channel.
If the internal buffer is full, it asynchronously waits until the operation succeeds, or until the channel is halted.
If the channel has been halted (via halt), then nothing gets published, the function eventually returning a
false
value, which signals that no more values can be published on the channel.Example
import cats.implicits._ import cats.effect.Sync sealed trait Complete object Complete extends Complete def range[F[_]](from: Int, until: Int, increment: Int = 1) (channel: ConcurrentChannel[F, Complete, Int]) (implicit F: Sync[F]): F[Unit] = { if (from != until) channel.push(from).flatMap { case true => range(from + increment, until, increment)(channel) case false => F.unit // we need to stop } else // we're done, close the channel channel.halt(Complete) }
- a
is the message to publish
- returns
a boolean that is
true
if the value was pushed on the internal queue and the producer can push more values, orfalse
if the channel is halted and cannot receive any more events
- Definition Classes
- ConcurrentChannel → ProducerF
- def pushMany(seq: Iterable[A]): F[Boolean]
Publishes multiple events on the channel.
Publishes multiple events on the channel.
If the channel has been halted (via halt), then the publishing is interrupted, the function returning a
false
value signalling that the channel was halted and can no longer receive any more events.Example
import cats.implicits._ import cats.effect.Sync sealed trait Complete object Complete extends Complete def range[F[_]](from: Int, until: Int, increment: Int = 1) (channel: ConcurrentChannel[F, Complete, Int]) (implicit F: Sync[F]): F[Unit] = { channel.pushMany(Range(from, until, increment)).flatMap { case true => channel.halt(Complete) case false => F.unit // was already halted, do nothing else } }
- seq
is the sequence of messages to publish on the channel
- returns
a boolean that is
true
if all the values were pushed on the internal queue and the producer can push more values, orfalse
if the channel is halted and cannot receive any more events
- Definition Classes
- ConcurrentChannel → ProducerF
- final def synchronized[T0](arg0: => T0): T0
- Definition Classes
- AnyRef
- def toString(): String
- Definition Classes
- AnyRef → Any
- final def wait(): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- final def wait(arg0: Long, arg1: Int): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- final def wait(arg0: Long): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException]) @native()
This is the API documentation for the Monix library.
Package Overview
monix.execution exposes lower level primitives for dealing with asynchronous execution:
Atomic
types, as alternative tojava.util.concurrent.atomic
monix.catnap exposes pure abstractions built on top of the Cats-Effect type classes:
monix.eval is for dealing with evaluation of results, thus exposing Task and Coeval.
monix.reactive exposes the
Observable
pattern:Observable
implementationsmonix.tail exposes Iterant for purely functional pull based streaming:
Batch
andBatchCursor
, the alternatives to Scala'sIterable
andIterator
respectively that we are using within Iterant's encodingYou can control evaluation with type you choose - be it Task, Coeval, cats.effect.IO or your own as long as you provide correct cats-effect or cats typeclass instance.