monix.catnap

ProducerF

trait ProducerF[F[_], E, A] extends Serializable

A simple interface that models the producer side of a producer-consumer communication channel.

In a producer-consumer communication channel we've got these concerns to take care of:

  • back-pressure, which is handled automatically via this interface
  • halting the channel with a final event and informing all current and future consumers about it, while stopping future producers from pushing more events

The ProducerF interface takes care of these concerns via:

  • the F[Boolean] result of push and pushMany, which is true for as long as the channel has not been halted, meaning further events can be pushed; these tasks also block (asynchronously) while the internal buffers are full, so back-pressure is handled automatically
  • halt, being able to close the channel with a final event that will be visible to all current and future consumers

Currently implemented by ConcurrentChannel.

Source
ProducerF.scala
Linear Supertypes
Serializable, AnyRef, Any

Abstract Value Members

  1. abstract def awaitConsumers(n: Int): F[Boolean]

    Waits until the specified number of consumers are connected.

    This is a utility for ensuring that a certain number of consumers are connected before we start emitting events.

    n

    is the number of consumers that need to be connected before the returned task completes

    returns

    a task that will complete only after the required number of consumers are observed as being connected to the channel
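
    As an illustration of the use case described above, the following is a minimal sketch that waits for consumers before publishing. The helper name publishWhenReady and its parameters are hypothetical. The meaning of the Boolean returned by awaitConsumers is not spelled out in this entry; the sketch assumes that false means the channel was halted while waiting, so treat that branch as an assumption.

```scala
import cats.Monad
import cats.implicits._
import monix.catnap.ProducerF

object AwaitThenPublish {
  // Hypothetical helper, not part of the library: waits for `consumers`
  // subscribers, publishes `events`, then closes the channel with `last`.
  // Returns true only if every step succeeded.
  def publishWhenReady[F[_], E, A](
    channel: ProducerF[F, E, A],
    consumers: Int,
    events: List[A],
    last: E
  )(implicit F: Monad[F]): F[Boolean] =
    channel.awaitConsumers(consumers).flatMap {
      case true =>
        channel.pushMany(events).flatMap {
          case true  => channel.halt(last).as(true) // all events were accepted
          case false => F.pure(false)               // halted by someone else
        }
      case false =>
        // Assumption: `false` signals that the channel was halted while waiting
        F.pure(false)
    }
}
```

    Only a Monad constraint is needed here, because the sketch just sequences the tasks that the interface already returns.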

  2. abstract def halt(e: E): F[Unit]

    Closes the communication channel with a message that will be visible to all current and future consumers.

    Note that if multiple halt events happen, only the first one is taken into account; all subsequent halt messages are ignored.
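
    Because only the first halt counts, a producer can close the channel unconditionally once it is finished, without checking whether another producer got there first. The following is a minimal sketch under stated assumptions: the helper publishAndClose is hypothetical, and using Option[Throwable] as the final event type E (None for normal completion, Some(error) for failure) is just one possible encoding, not something the interface prescribes.

```scala
import cats.MonadError
import cats.implicits._
import monix.catnap.ProducerF

object HaltWithOutcome {
  // Hypothetical helper, not part of the library: evaluates `events`,
  // publishes the result and closes the channel with the outcome.
  def publishAndClose[F[_], A](
    channel: ProducerF[F, Option[Throwable], A],
    events: F[List[A]]
  )(implicit F: MonadError[F, Throwable]): F[Unit] =
    events.attempt.flatMap {
      case Right(list) =>
        // If pushMany yields false the channel is already halted, in which
        // case this halt is simply ignored (the first halt wins).
        channel.pushMany(list) >> channel.halt(None)
      case Left(error) =>
        channel.halt(Some(error))
    }
}
```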

  3. abstract def push(a: A): F[Boolean]

    Publishes an event on the channel.

    Contract:

    • in case the internal buffers are full, back-pressures until there's enough space for pushing the message, or until the channel was halted, whichever comes first
    • returns true in case the message was pushed in the internal buffer or false in case the channel was halted and cannot receive any more events, in which case the producer's loop should terminate

    Example:

    import cats.implicits._
    import cats.effect.Async
    
    def range[F[_]](channel: ProducerF[F, Int, Int], from: Int, until: Int)
      (implicit F: Async[F]): F[Unit] = {
    
      if (from < until) {
        if (from + 1 < until)
          channel.push(from).flatMap {
            case true =>
              // keep going
              range(channel, from + 1, until)
            case false =>
              // channel was halted by another producer loop, so stopping
              F.unit
          }
        else // we are done, publish the final event
          channel.halt(from + 1).as(())
      } else {
        F.unit // invalid range
      }
    }
    a

    is the message to publish

    returns

    true in case the message was published successfully, or false in case the channel was halted by another producer

  4. abstract def pushMany(seq: Iterable[A]): F[Boolean]

    Publishes multiple events on the channel.

    Contract:

    • in case the internal buffers are full, back-pressures until there's enough space and the whole sequence was published, or until the channel was halted, in which case no further messages can be pushed
    • returns true in case the whole sequence was pushed in the internal buffer or false in case the channel was halted and cannot receive any more events, in which case the producer's loop should terminate

    Note: implementations may try to push events one by one. This is not an atomic operation. In case of concurrent producers, there's absolutely no guarantee for the order of messages coming from multiple producers. Also in case the channel is halted (and the resulting task returns false), the outcome can be that the sequence gets published partially.

    import cats.implicits._
    import cats.effect.Async
    
    def range[F[_]](channel: ProducerF[F, Int, Int], from: Int, until: Int)
      (implicit F: Async[F]): F[Unit] = {
    
      if (from < until) {
        val to = until - 1
        channel.pushMany(Range(from, to)).flatMap {
          case true =>
            channel.halt(to).as(()) // final event
          case false =>
            // channel was halted by a concurrent producer, so stop
            F.unit
        }
      } else {
        F.unit // invalid range
      }
    }
    seq

    is the sequence of messages to publish on the channel

    returns

    true in case the whole sequence was published successfully, or false in case the channel was halted by another producer

Concrete Value Members

  1. final def !=(arg0: Any): Boolean
    Definition Classes
    AnyRef → Any
  2. final def ##(): Int
    Definition Classes
    AnyRef → Any
  3. final def ==(arg0: Any): Boolean
    Definition Classes
    AnyRef → Any
  4. final def asInstanceOf[T0]: T0
    Definition Classes
    Any
  5. def clone(): AnyRef
    Attributes
    protected[java.lang]
    Definition Classes
    AnyRef
    Annotations
    @throws(classOf[java.lang.CloneNotSupportedException]) @native()
  6. final def eq(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef
  7. def equals(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef → Any
  8. def finalize(): Unit
    Attributes
    protected[java.lang]
    Definition Classes
    AnyRef
    Annotations
    @throws(classOf[java.lang.Throwable])
  9. final def getClass(): Class[_ <: AnyRef]
    Definition Classes
    AnyRef → Any
    Annotations
    @native()
  10. def hashCode(): Int
    Definition Classes
    AnyRef → Any
    Annotations
    @native()
  11. final def isInstanceOf[T0]: Boolean
    Definition Classes
    Any
  12. final def ne(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef
  13. final def notify(): Unit
    Definition Classes
    AnyRef
    Annotations
    @native()
  14. final def notifyAll(): Unit
    Definition Classes
    AnyRef
    Annotations
    @native()
  15. final def synchronized[T0](arg0: => T0): T0
    Definition Classes
    AnyRef
  16. def toString(): String
    Definition Classes
    AnyRef → Any
  17. final def wait(): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws(classOf[java.lang.InterruptedException])
  18. final def wait(arg0: Long, arg1: Int): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws(classOf[java.lang.InterruptedException])
  19. final def wait(arg0: Long): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws(classOf[java.lang.InterruptedException]) @native()
