Packages

final class ConcurrentChannel[F[_], E, A] extends ProducerF[F, E, A] with ChannelF[F, E, A]

ConcurrentChannel can be used to model complex producer-consumer communication channels.

It exposes these fundamental operations:

  • push for pushing single events to consumers (producer side)
  • pushMany for pushing event sequences to consumers (producer side)
  • halt for pushing the final completion event to all consumers (producer side)
  • consume for creating a ConsumerF value that can consume the incoming events from the channel

Example

import cats.implicits._
import cats.effect._
import monix.execution.Scheduler.global

// For being able to do IO.start
implicit val cs = SchedulerEffect.contextShift[IO](global)
// We need a `Timer` for this to work
implicit val timer = SchedulerEffect.timer[IO](global)

// Completion event
sealed trait Complete
object Complete extends Complete

def logLines(consumer: ConsumerF[IO, Complete, String], index: Int): IO[Unit] =
  consumer.pull.flatMap {
    case Right(message) =>
      IO(println("Worker $$index: $$message"))
        // continue loop
        .flatMap(_ => logLines(consumer, index))
    case Left(Complete) =>
      IO(println("Worker $$index is done!"))
  }

for {
  channel <- ConcurrentChannel[IO].of[Complete, String]
  // Workers 1 & 2, sharing the load between them
  task_1_2 = channel.consume.use { ref =>
    (logLines(ref, 1), logLines(ref, 2)).parSequence_
  }
  consumers_1_2 <- task_1_2.start // fiber
  // Workers 3 & 4, receiving the same events as workers 1 & 2,
  // but sharing the load between them
  task_3_4 = channel.consume.use { ref =>
    (logLines(ref, 3), logLines(ref, 4)).parSequence_
  }
  consumers_3_4 <- task_3_4.start // fiber
  // Pushing some samples
  _ <- channel.push("Hello, ")
  _ <- channel.push("World!")
  // Signal there are no more events
  _ <- channel.halt(Complete)
  // Await for the completion of the consumers
  _ <- consumers_1_2.join
  _ <- consumers_3_4.join
} yield ()

Unicasting vs Broadcasting vs Multicasting

Unicasting: A communication channel between one producer and one ConsumerF. Multiple workers can share the load of processing incoming events. For example in case we want to have 8 workers running in parallel, you can create one ConsumerF, via consume and then use it for multiple workers. Internally this setup uses a single queue and whatever workers you have will share it.

Broadcasting: the same events can be sent to multiple consumers, thus duplicating the load, as a broadcasting setup can be created by creating and consuming from multiple ConsumerF via multiple calls to consume. Internally each ConsumerF gets its own queue and hence messages are duplicated.

Multicasting: multiple producers can push events at the same time, provided the channel's type is configured as a MultiProducer.

Back-Pressuring and the Polling Model

When consumers get created via consume, a buffer gets created and assigned per consumer.

Depending on what the BufferCapacity is configured to be, the initialized consumer can work with a maximum buffer size, a size that could be rounded to a power of 2, so you can't rely on it to be precise. See consumeWithConfig for customizing this buffer on a per-consumer basis, or the ConcurrentChannel.withConfig builder for setting the default used in consume.

On push, when the queue is full, the implementation back-pressures until the channel has room again in its internal buffer(s), the task being completed when the value was pushed successfully. Similarly ConsumerF.pull (returned by consume) awaits the channel to have items in it. This works for both bounded and unbounded channels.

For both push and pull, in case awaiting a result happens, the implementation does so asynchronously, without any threads being blocked.

Multi-threading Scenario

This channel supports the fine-tuning of the concurrency scenario via ChannelType.ProducerSide (see ConcurrentChannel.withConfig) and the ChannelType.ConsumerSide that can be specified per consumer (see consumeWithConfig).

The default is set to MultiProducer and MultiConsumer, which is always the safe choice, however these can be customized for better performance.

These scenarios are available:

  • MPMC: multi-producer, multi-consumer, when MultiProducer is selected on the channel's creation and MultiConsumer is selected when consuming; this is the safe scenario and should be used as the default, especially when in doubt
  • MPSC: multi-producer, single-consumer, when MultiProducer is selected on the channel's creation and SingleConsumer is selected when consuming; this scenario should be selected when there are multiple producers, but a single worker that consumes data sequentially (per ConsumerF); note that this means a single worker per ConsumerF instance, but you can still have multiple ConsumerF instances created, , because each ConsumerF gets its own buffer anyway
  • SPMC: single-producer, multi-consumer, when SingleProducer is selected on the channel's creation and MultiConsumer is selected when consuming; this scenario should be selected when there are multiple workers processing data in parallel (e.g. pulling from the same ConsumerF), but a single producer that pushes data on the channel sequentially
  • SPSC: single-producer, single-consumer, when SingleProducer is selected on the channel's creation and SingleConsumer is selected when consuming; this scenario should be selected when there is a single producer that pushes data on the channel sequentially and a single worker per ConsumerF instance that pulls data from the channel sequentially; note you can still have multiple ConsumerF instances running in parallel, because each ConsumerF gets its own buffer anyway

The default is MPMC, because that's the safest scenario.

import cats.implicits._
import cats.effect.IO
import monix.execution.ChannelType.{SingleProducer, SingleConsumer}
import monix.execution.BufferCapacity.Bounded

val channel = ConcurrentChannel[IO].withConfig[Int, Int](
  producerType = SingleProducer
)

val consumerConfig = ConsumerF.Config(
  consumerType = Some(SingleConsumer)
)

for {
  producer  <- channel
  consumer1 =  producer.consumeWithConfig(consumerConfig)
  consumer2 =  producer.consumeWithConfig(consumerConfig)
  fiber1    <- consumer1.use { ref => ref.pull }.start
  fiber2    <- consumer2.use { ref => ref.pull }.start
  _         <- producer.push(1)
  value1    <- fiber1.join
  value2    <- fiber2.join
} yield {
  (value1, value2)
}

Note that in this example, even if we used SingleConsumer as the type passed in consumeWithConfig, we can still consume from two ConsumerF instances at the same time, because each one gets its own internal buffer. But you cannot have multiple workers per ConsumerF in this scenario, because this would break the internal synchronization / visibility guarantees.

WARNING: default is MPMC, however any other scenario implies a relaxation of the internal synchronization between threads.

This means that using the wrong scenario can lead to severe concurrency bugs. If you're not sure what multi-threading scenario you have, then just stick with the default MPMC.

Credits

Inspired by Haskell's Control.Concurrent.ConcurrentChannel, but note that this isn't a straight port — e.g. the Monix implementation has a cleaner, non-leaky interface, is back-pressured and allows for termination (via halt), which changes its semantics significantly.

Source
ConcurrentChannel.scala
Linear Supertypes
ChannelF[F, E, A], ProducerF[F, E, A], Serializable, AnyRef, Any
Ordering
  1. Alphabetic
  2. By Inheritance
Inherited
  1. ConcurrentChannel
  2. ChannelF
  3. ProducerF
  4. Serializable
  5. AnyRef
  6. Any
  1. Hide All
  2. Show All
Visibility
  1. Public
  2. Protected

Value Members

  1. final def !=(arg0: Any): Boolean
    Definition Classes
    AnyRef → Any
  2. final def ##(): Int
    Definition Classes
    AnyRef → Any
  3. final def ==(arg0: Any): Boolean
    Definition Classes
    AnyRef → Any
  4. final def asInstanceOf[T0]: T0
    Definition Classes
    Any
  5. def awaitConsumers(n: Int): F[Boolean]

    Awaits for the specified number of consumers to be connected.

    Awaits for the specified number of consumers to be connected.

    This is an utility to ensure that a certain number of consumers are connected before we start emitting events.

    n

    is a number indicating the number of consumers that need to be connected before the returned task completes

    returns

    a task that will complete only after the required number of consumers are observed as being connected to the channel

    Definition Classes
    ConcurrentChannelProducerF
  6. def clone(): AnyRef
    Attributes
    protected[lang]
    Definition Classes
    AnyRef
    Annotations
    @throws(classOf[java.lang.CloneNotSupportedException]) @native() @HotSpotIntrinsicCandidate()
  7. def consume: Resource[F, ConsumerF[F, E, A]]

    Create a ConsumerF value that can be used to consume events from the channel.

    Create a ConsumerF value that can be used to consume events from the channel.

    Note in case multiple consumers are created, all of them will see the events being pushed, so a broadcasting setup is possible. Also multiple workers can consumer from the same ConsumerF value, to share the load.

    The returned value is a Resource, because a consumer can be unsubscribed from the channel, with its internal buffer being garbage collected.

    Definition Classes
    ConcurrentChannelChannelF
    See also

    consumeWithConfig for fine tuning the internal buffer of the created consumer

  8. def consumeWithConfig(config: Config): Resource[F, ConsumerF[F, E, A]]

    Version of consume that allows for fine tuning the underlying buffer used.

    Version of consume that allows for fine tuning the underlying buffer used.

    config

    is configuration for the created buffer, see ConsumerF.Config for details

    Definition Classes
    ConcurrentChannelChannelF
    Annotations
    @UnsafeProtocol()
  9. final def eq(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef
  10. def equals(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef → Any
  11. final def getClass(): Class[_ <: AnyRef]
    Definition Classes
    AnyRef → Any
    Annotations
    @native() @HotSpotIntrinsicCandidate()
  12. def halt(e: E): F[Unit]

    Stops the channel and sends a halt event to all current and future consumers.

    Stops the channel and sends a halt event to all current and future consumers.

    Consumers will receive a Left(e) event after halt is observed.

    Note that if multiple halt events happen, then only the first one will be taken into account, all other halt messages are ignored.

    Definition Classes
    ConcurrentChannelProducerF
  13. def hashCode(): Int
    Definition Classes
    AnyRef → Any
    Annotations
    @native() @HotSpotIntrinsicCandidate()
  14. final def isInstanceOf[T0]: Boolean
    Definition Classes
    Any
  15. final def ne(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef
  16. final def notify(): Unit
    Definition Classes
    AnyRef
    Annotations
    @native() @HotSpotIntrinsicCandidate()
  17. final def notifyAll(): Unit
    Definition Classes
    AnyRef
    Annotations
    @native() @HotSpotIntrinsicCandidate()
  18. def push(a: A): F[Boolean]

    Publishes an event on the channel.

    Publishes an event on the channel.

    If the internal buffer is full, it asynchronously waits until the operation succeeds, or until the channel is halted.

    If the channel has been halted (via halt), then nothing gets published, the function eventually returning a false value, which signals that no more values can be published on the channel.

    Example

    import cats.implicits._
    import cats.effect.Sync
    
    sealed trait Complete
    object Complete extends Complete
    
    def range[F[_]](from: Int, until: Int, increment: Int = 1)
      (channel: ConcurrentChannel[F, Complete, Int])
      (implicit F: Sync[F]): F[Unit] = {
    
      if (from != until)
        channel.push(from).flatMap {
          case true =>
            range(from + increment, until, increment)(channel)
          case false =>
            F.unit // we need to stop
        }
      else // we're done, close the channel
        channel.halt(Complete)
    }
    a

    is the message to publish

    returns

    a boolean that is true if the value was pushed on the internal queue and the producer can push more values, or false if the channel is halted and cannot receive any more events

    Definition Classes
    ConcurrentChannelProducerF
  19. def pushMany(seq: Iterable[A]): F[Boolean]

    Publishes multiple events on the channel.

    Publishes multiple events on the channel.

    If the channel has been halted (via halt), then the publishing is interrupted, the function returning a false value signalling that the channel was halted and can no longer receive any more events.

    Example

    import cats.implicits._
    import cats.effect.Sync
    
    sealed trait Complete
    object Complete extends Complete
    
    def range[F[_]](from: Int, until: Int, increment: Int = 1)
      (channel: ConcurrentChannel[F, Complete, Int])
      (implicit F: Sync[F]): F[Unit] = {
    
      channel.pushMany(Range(from, until, increment)).flatMap {
        case true =>
          channel.halt(Complete)
        case false =>
          F.unit // was already halted, do nothing else
      }
    }
    seq

    is the sequence of messages to publish on the channel

    returns

    a boolean that is true if all the values were pushed on the internal queue and the producer can push more values, or false if the channel is halted and cannot receive any more events

    Definition Classes
    ConcurrentChannelProducerF
  20. final def synchronized[T0](arg0: => T0): T0
    Definition Classes
    AnyRef
  21. def toString(): String
    Definition Classes
    AnyRef → Any
  22. final def wait(arg0: Long, arg1: Int): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws(classOf[java.lang.InterruptedException])
  23. final def wait(arg0: Long): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws(classOf[java.lang.InterruptedException]) @native()
  24. final def wait(): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws(classOf[java.lang.InterruptedException])

Deprecated Value Members

  1. def finalize(): Unit
    Attributes
    protected[lang]
    Definition Classes
    AnyRef
    Annotations
    @throws(classOf[java.lang.Throwable]) @Deprecated @deprecated
    Deprecated

    (Since version ) see corresponding Javadoc for more information.

Inherited from ChannelF[F, E, A]

Inherited from ProducerF[F, E, A]

Inherited from Serializable

Inherited from AnyRef

Inherited from Any

Ungrouped