trait ConsumerF[F[_], E, A] extends Serializable
A simple interface that models the consumer side of a producer-consumer communication channel.
Currently exposed by ConcurrentChannel.consume.
- F
is effect type used for processing tasks asynchronously
- E
is the type for the completion event
- A
is the type for the stream of events being consumed
- Source
- ConsumerF.scala
- Alphabetic
- By Inheritance
- ConsumerF
- Serializable
- AnyRef
- Any
- Hide All
- Show All
- Public
- Protected
Abstract Value Members
- abstract def pull: F[Either[E, A]]
Pulls one message from the communication channel, when it becomes available.
Pulls one message from the communication channel, when it becomes available.
Example:
import cats.implicits._ import cats.effect.Async def sum[F[_]](channel: ConsumerF[F, Int, Int], acc: Long = 0) (implicit F: Async[F]): F[Long] = { channel.pull.flatMap { case Left(e) => F.pure(acc + e) case Right(i) => sum(channel, acc + i) } }
- returns
either
Left(e)
, if the channel was closed with a finale
completion event, orRight(a)
, representing a message that was pulled from the channel
- abstract def pullMany(minLength: Int, maxLength: Int): F[Either[E, Seq[A]]]
Pulls a whole batch of messages from the channel, at least one, the returned sequence being no larger than the specified
maxLength
.Pulls a whole batch of messages from the channel, at least one, the returned sequence being no larger than the specified
maxLength
.import cats.implicits._ import cats.effect.Async def sum[F[_]](channel: ConsumerF[F, Int, Int], acc: Long = 0) (implicit F: Async[F]): F[Long] = { channel.pullMany(1, 16).flatMap { case Left(e) => F.pure(acc + e) case Right(seq) => sum(channel, acc + seq.sum) } }
- minLength
is the minimum size of the returned sequence; for as long as the channel isn't halted, the returned task will back-pressure until the required number of events have been collected
- maxLength
is the maximum size of the returned sequence; for fairness purposes (e.g. multiple workers consuming from the same
ConsumerF
), a smaller value is recommended, or otherwiseInt.MaxValue
can be used- returns
either
Left(e)
, if the channel was closed with a finale
completion event, orRight(seq)
, representing a non-empty sequence of messages pulled from the channel, but that is no larger thanmaxLength
This is the API documentation for the Monix library.
Package Overview
monix.execution exposes lower level primitives for dealing with asynchronous execution:
Atomic
types, as alternative tojava.util.concurrent.atomic
monix.catnap exposes pure abstractions built on top of the Cats-Effect type classes:
monix.eval is for dealing with evaluation of results, thus exposing Task and Coeval.
monix.reactive exposes the
Observable
pattern:Observable
implementationsmonix.tail exposes Iterant for purely functional pull based streaming:
Batch
andBatchCursor
, the alternatives to Scala'sIterable
andIterator
respectively that we are using within Iterant's encodingYou can control evaluation with type you choose - be it Task, Coeval, cats.effect.IO or your own as long as you provide correct cats-effect or cats typeclass instance.