final class ConcurrentQueue[F[_], A] extends Serializable
A high-performance, back-pressured, generic concurrent queue implementation.
This is the pure and generic version of monix.execution.AsyncQueue.
Example
    import cats.implicits._
    import cats.effect._
    import monix.catnap.{ConcurrentQueue, SchedulerEffect}
    import monix.execution.Scheduler.global

    // For being able to do IO.start
    implicit val cs = SchedulerEffect.contextShift[IO](global)
    // We need a `Timer` for this to work
    implicit val timer = SchedulerEffect.timer[IO](global)

    // Polls the queue forever, printing each received value
    def consumer(queue: ConcurrentQueue[IO, Int], index: Int): IO[Unit] =
      queue.poll.flatMap { a =>
        println(s"Worker $index: $a")
        consumer(queue, index)
      }

    for {
      queue     <- ConcurrentQueue[IO].bounded[Int](capacity = 32)
      consumer1 <- consumer(queue, 1).start
      consumer2 <- consumer(queue, 2).start
      // Pushing some samples
      _         <- queue.offer(1)
      _         <- queue.offer(2)
      _         <- queue.offer(3)
      // Stopping the consumer loops
      _         <- consumer1.cancel
      _         <- consumer2.cancel
    } yield ()
Back-Pressuring and the Polling Model
The queue can be limited to a maximum buffer size. That size may be rounded to a power of 2, so you can't rely on it being exact. Such a bounded queue can be initialized via ConcurrentQueue.bounded. Also see BufferCapacity, the configuration parameter that can be passed to the ConcurrentQueue.withConfig builder.
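To illustrate why the requested capacity may not be exact, here is a minimal sketch of power-of-2 rounding. It assumes the size is rounded up to the next power of 2; the text above only says the size "could be rounded", so the direction of rounding and the helper name nextPowerOfTwo are assumptions, not the library's code.

```scala
// Hypothetical helper, not part of Monix: rounds a requested capacity
// up to the nearest power of 2 (assumed rounding direction).
def nextPowerOfTwo(n: Int): Int = {
  require(n > 0 && n <= (1 << 30), "capacity must be in [1, 2^30]")
  if (n == 1) 1 else Integer.highestOneBit(n - 1) << 1
}

val requested = 100
val effective = nextPowerOfTwo(requested) // 128 under this assumption
```

Under this assumption a queue requested with capacity 100 would accept up to 128 items before offer starts back-pressuring, which is why code should not depend on the exact bound.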
On offer, when the queue is full, the implementation back-pressures until the queue has room again in its internal buffer; the task completes once the value has been pushed successfully. Similarly, poll waits until the queue has items in it. This works for both bounded and unbounded queues.
For both offer and poll, whenever a result has to be awaited, the implementation does so asynchronously, without blocking any threads.
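The idea of awaiting without blocking a thread can be sketched with a toy model built on scala.concurrent.Promise. This is not the Monix implementation: ToyAsyncQueue is a hypothetical, unbounded, lock-based class that only shows how poll can return immediately with a handle that a later offer completes.

```scala
import scala.collection.immutable.Queue
import scala.concurrent.{Future, Promise}

// Toy model only (hypothetical, not Monix code): unbounded, guarded by a lock.
final class ToyAsyncQueue[A] {
  private var items   = Queue.empty[A]
  private var waiters = Queue.empty[Promise[A]]

  // Hands the value to a waiting consumer if there is one, otherwise buffers it.
  def offer(a: A): Unit = {
    val waiter = synchronized {
      if (waiters.nonEmpty) {
        val (p, rest) = waiters.dequeue
        waiters = rest
        Some(p)
      } else {
        items = items.enqueue(a)
        None
      }
    }
    // Complete the promise outside the lock
    waiter.foreach(_.success(a))
  }

  // Returns immediately; the Future completes once a value is available.
  def poll(): Future[A] = synchronized {
    if (items.nonEmpty) {
      val (a, rest) = items.dequeue
      items = rest
      Future.successful(a)
    } else {
      val p = Promise[A]()
      waiters = waiters.enqueue(p)
      p.future
    }
  }
}

val toy     = new ToyAsyncQueue[Int]
val pending = toy.poll() // queue is empty: no thread is parked, we just hold a Future
toy.offer(42)            // completes the pending Future
```

The calling thread is free to do other work between poll() and the matching offer; the wait exists only as a registered callback, which is the property the paragraph above describes.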
Multi-threading Scenario
This queue supports a ChannelType configuration for fine-tuning to the multi-threading scenario in use, which can yield better performance:
- MPMC: multi-producer, multi-consumer
- MPSC: multi-producer, single-consumer
- SPMC: single-producer, multi-consumer
- SPSC: single-producer, single-consumer
The default is MPMC, because that's the safest scenario.

    import monix.execution.ChannelType.MPSC
    import monix.execution.BufferCapacity.Bounded

    val queue = ConcurrentQueue[IO].withConfig[Int](
      capacity = Bounded(128),
      channelType = MPSC
    )
WARNING: the default is MPMC; however, any other scenario implies a relaxation of the internal synchronization between threads. This means that using the wrong scenario can lead to severe concurrency bugs. If you're not sure what multi-threading scenario you have, then just stick with the default MPMC.
- Source
- ConcurrentQueue.scala
Value Members
- final def !=(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- final def ##(): Int
- Definition Classes
- AnyRef → Any
- final def ==(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- final def asInstanceOf[T0]: T0
- Definition Classes
- Any
- def clear: F[Unit]
Removes all items from the queue.
Called from the consumer thread, subject to the restrictions appropriate to the implementation indicated by ChannelType.
WARNING: the clear operation should be done on the consumer side, so it must be called from the same thread(s) that call poll.
- def clone(): AnyRef
- Attributes
- protected[java.lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.CloneNotSupportedException]) @native()
- def drain(minLength: Int, maxLength: Int): F[Seq[A]]
Fetches multiple elements from the queue, if available.
This operation back-pressures until the minLength requirement is achieved.
- minLength
specifies the minimum length of the returned sequence; the operation back-pressures until this length is satisfied
- maxLength
is the capacity of the used buffer, being the max length of the returned sequence
- returns
a task yielding a sequence of length between minLength and maxLength; it can also be cancelled, interrupting the wait
- final def eq(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- def equals(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef → Any
- def finalize(): Unit
- Attributes
- protected[java.lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.Throwable])
- final def getClass(): Class[_ <: AnyRef]
- Definition Classes
- AnyRef → Any
- Annotations
- @native()
- def hashCode(): Int
- Definition Classes
- AnyRef → Any
- Annotations
- @native()
- final def isInstanceOf[T0]: Boolean
- Definition Classes
- Any
- final def ne(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- final def notify(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native()
- final def notifyAll(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native()
- def offer(a: A): F[Unit]
Pushes a value in the queue, or if the queue is full, then repeats the operation until it succeeds.
- returns
a task that, when evaluated, completes once the value has been successfully pushed in the queue, waiting for room if the queue is full
- def offerMany(seq: Iterable[A]): F[Unit]
Pushes multiple values in the queue. Back-pressures if the queue is full.
- returns
a task that completes once the push has succeeded; it can also be cancelled, interrupting the wait
- def poll: F[A]
Fetches a value from the queue, or if the queue is empty it awaits asynchronously until a value is made available.
- returns
a task that, when evaluated, completes with a value from the queue, waiting until such a value is available
- final def synchronized[T0](arg0: => T0): T0
- Definition Classes
- AnyRef
- def toString(): String
- Definition Classes
- AnyRef → Any
- def tryOffer(a: A): F[Boolean]
Try pushing a value to the queue.
The protocol is unsafe because using the "try*" methods implies an understanding of concurrency; otherwise the code can be very fragile and buggy.
- a
is the value pushed in the queue
- returns
true if the operation succeeded, or false if the queue is full and cannot accept any more elements
- Annotations
- @UnsafeProtocol()
- def tryPoll: F[Option[A]]
Try pulling a value out of the queue.
The protocol is unsafe because using the "try*" methods implies an understanding of concurrency; otherwise the code can be very fragile and buggy.
- returns
Some(a) in case a value was successfully retrieved from the queue, or None in case the queue is empty
- Annotations
- @UnsafeProtocol()
- final def wait(): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- final def wait(arg0: Long, arg1: Int): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- final def wait(arg0: Long): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException]) @native()
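As a rough illustration of the queue-specific members listed above (offerMany, drain, clear, tryPoll, tryOffer), the following sketch chains them in a single IO program. It assumes Monix 3.x on Cats-Effect 2, matching the ContextShift-based example at the top of this page; the value name program and the sample values are made up for the illustration, and everything runs from one logical consumer so the ChannelType restrictions on clear are respected.

```scala
import cats.effect.{ContextShift, IO}
import monix.catnap.{ConcurrentQueue, SchedulerEffect}
import monix.execution.Scheduler.global

// Required by the queue builders (assumes Cats-Effect 2)
implicit val cs: ContextShift[IO] = SchedulerEffect.contextShift[IO](global)

val program: IO[(Seq[Int], Option[Int], Boolean)] =
  for {
    queue  <- ConcurrentQueue[IO].bounded[Int](capacity = 32)
    // Push several values at once; back-pressures if the queue is full
    _      <- queue.offerMany(List(1, 2, 3))
    // Waits for at least 1 element, returns at most 2
    batch  <- queue.drain(minLength = 1, maxLength = 2)
    // Drop whatever is left in the buffer
    _      <- queue.clear
    // Non-waiting poll: None when the queue is empty
    empty  <- queue.tryPoll
    // Non-waiting offer: false when the queue is full
    pushed <- queue.tryOffer(4)
  } yield (batch, empty, pushed)
```

Nothing happens until program is evaluated, for example with program.unsafeRunSync() in a test or at the edge of an application. The try* variants never wait, so their results must be checked by the caller, which is the reason they are annotated with @UnsafeProtocol.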
This is the API documentation for the Monix library.
Package Overview
monix.execution exposes lower-level primitives for dealing with asynchronous execution:
- Atomic types, as an alternative to java.util.concurrent.atomic
monix.catnap exposes pure abstractions built on top of the Cats-Effect type classes.
monix.eval is for dealing with evaluation of results, thus exposing Task and Coeval.
monix.reactive exposes the Observable pattern:
- Observable implementations
monix.tail exposes Iterant for purely functional pull-based streaming:
- Batch and BatchCursor, the alternatives to Scala's Iterable and Iterator respectively, which are used within Iterant's encoding
You can control evaluation with the type you choose, be it Task, Coeval, cats.effect.IO or your own, as long as you provide the correct cats-effect or cats type class instance.