trait ProducerF[F[_], E, A] extends Serializable
A simple interface that models the producer side of a producer-consumer communication channel.
In a producer-consumer communication channel we've got these concerns to take care of:
- back-pressure, which is handled automatically via this interface
- halting the channel with a final event and informing all current and future consumers about it, while stopping future producers from pushing more events
The ProducerF
interface takes care of these concerns via:
- the
F[Boolean]
result, which should returntrue
for as long as the channel wasn't halted, so further events can be pushed; these tasks also block (asynchronously) when internal buffers are full, so back-pressure concerns are handled automatically - halt, being able to close the channel with a final event that will be visible to all current and future consumers
Currently implemented by ConcurrentChannel.
- Source
- ProducerF.scala
- Alphabetic
- By Inheritance
- ProducerF
- Serializable
- AnyRef
- Any
- Hide All
- Show All
- Public
- Protected
Abstract Value Members
- abstract def awaitConsumers(n: Int): F[Boolean]
Awaits for the specified number of consumers to be connected.
Awaits for the specified number of consumers to be connected.
This is an utility to ensure that a certain number of consumers are connected before we start emitting events.
- n
is a number indicating the number of consumers that need to be connected before the returned task completes
- returns
a task that will complete only after the required number of consumers are observed as being connected to the channel
- abstract def halt(e: E): F[Unit]
Closes the communication channel with a message that will be visible to all current and future consumers.
Closes the communication channel with a message that will be visible to all current and future consumers.
Note that if multiple
halt
events happen, then only the first one will be taken into account, all otherhalt
messages are ignored. - abstract def push(a: A): F[Boolean]
Publishes an event on the channel.
Publishes an event on the channel.
Contract:
- in case the internal buffers are full, back-pressures until there's enough space for pushing the message, or until the channel was halted, whichever comes first
- returns
true
in case the message was pushed in the internal buffer orfalse
in case the channel was halted and cannot receive any more events, in which case the producer's loop should terminate
Example:
import cats.implicits._ import cats.effect.Async def range[F[_]](channel: ProducerF[F, Int, Int], from: Int, until: Int) (implicit F: Async[F]): F[Unit] = { if (from < until) { if (from + 1 < until) channel.push(from).flatMap { case true => // keep going range(channel, from + 1, until) case false => // channel was halted by another producer loop, so stopping F.unit } else // we are done, publish the final event channel.halt(from + 1).as(()) } else { F.unit // invalid range } }
- a
is the message to publish
- returns
true
in case the message was published successfully, orfalse
in case the channel was halted by another producer
- abstract def pushMany(seq: Iterable[A]): F[Boolean]
Publishes multiple events on the channel.
Publishes multiple events on the channel.
Contract:
- in case the internal buffers are full, back-pressures until there's enough space and the whole sequence was published, or until the channel was halted, in which case no further messages are allowed for being pushed
- returns
true
in case the whole sequence was pushed in the internal b orfalse
in case the channel was halted and cannot receive any more events, in which case the producer's loop should terminate
Note: implementations may try to push events one by one. This is not an atomic operation. In case of concurrent producers, there's absolutely no guarantee for the order of messages coming from multiple producers. Also in case the channel is halted (and the resulting task returns
false
), the outcome can be that the sequence gets published partially.import cats.implicits._ import cats.effect.Async def range[F[_]](channel: ProducerF[F, Int, Int], from: Int, until: Int) (implicit F: Async[F]): F[Unit] = { if (from < until) { val to = until - 1 channel.pushMany(Range(from, to)).flatMap { case true => channel.halt(to).as(()) // final event case false => // channel was halted by a concurrent producer, so stop F.unit } } else { F.unit // invalid range } }
- seq
is the sequence of messages to publish on the channel
- returns
true
in case the message was published successfully, orfalse
in case the channel was halted by another producer
Concrete Value Members
- final def !=(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- final def ##: Int
- Definition Classes
- AnyRef → Any
- final def ==(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- final def asInstanceOf[T0]: T0
- Definition Classes
- Any
- def clone(): AnyRef
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.CloneNotSupportedException]) @native() @HotSpotIntrinsicCandidate()
- final def eq(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- def equals(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef → Any
- final def getClass(): Class[_ <: AnyRef]
- Definition Classes
- AnyRef → Any
- Annotations
- @native() @HotSpotIntrinsicCandidate()
- def hashCode(): Int
- Definition Classes
- AnyRef → Any
- Annotations
- @native() @HotSpotIntrinsicCandidate()
- final def isInstanceOf[T0]: Boolean
- Definition Classes
- Any
- final def ne(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- final def notify(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native() @HotSpotIntrinsicCandidate()
- final def notifyAll(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native() @HotSpotIntrinsicCandidate()
- final def synchronized[T0](arg0: => T0): T0
- Definition Classes
- AnyRef
- def toString(): String
- Definition Classes
- AnyRef → Any
- final def wait(arg0: Long, arg1: Int): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- final def wait(arg0: Long): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException]) @native()
- final def wait(): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
This is the API documentation for the Monix library.
Package Overview
monix.execution exposes lower level primitives for dealing with asynchronous execution:
Atomic
types, as alternative tojava.util.concurrent.atomic
monix.catnap exposes pure abstractions built on top of the Cats-Effect type classes:
monix.eval is for dealing with evaluation of results, thus exposing Task and Coeval.
monix.reactive exposes the
Observable
pattern:Observable
implementationsmonix.tail exposes Iterant for purely functional pull based streaming:
Batch
andBatchCursor
, the alternatives to Scala'sIterable
andIterator
respectively that we are using within Iterant's encodingYou can control evaluation with type you choose - be it Task, Coeval, cats.effect.IO or your own as long as you provide correct cats-effect or cats typeclass instance.