package schedulers
- Alphabetic
- Public
- Protected
Type Members
- final class AsyncScheduler extends ReferenceScheduler with BatchingScheduler
An
AsyncScheduler
schedules tasks to happen in the future with the givenScheduledExecutorService
and the tasks themselves are executed on the givenExecutionContext
. - trait BatchingScheduler extends Scheduler
Adds trampoline execution capabilities to schedulers, when inherited.
Adds trampoline execution capabilities to schedulers, when inherited.
When it receives TrampolinedRunnable instances, it switches to a trampolined mode where all incoming TrampolinedRunnable are executed on the current thread.
This is useful for light-weight callbacks. The idea is borrowed from the implementation of
scala.concurrent.Future
. Currently used as an optimization byTask
in processing its internal callbacks. - final class CanBlock extends AnyRef
Marker for blocking operations that need to be disallowed on top of JavaScript engines, or other platforms that don't support the blocking of threads.
Marker for blocking operations that need to be disallowed on top of JavaScript engines, or other platforms that don't support the blocking of threads.
As sample, lets implement a low-level blocking operation; but kids, don't do this at home, since this is error prone and you already have Scala's
Await.result
, this sample being shown for pedagogical purposes:import monix.execution.schedulers.CanBlock import java.util.concurrent.CountDownLatch import scala.concurrent.{ExecutionContext, Future} import scala.util.Try def block[A](fa: Future[A]) (implicit ec: ExecutionContext, permit: CanBlock): Try[A] = { var result = Option.empty[Try[A]] val latch = new CountDownLatch(1) fa.onComplete { r => result = r latch.countDown() } latch.await() result.get }
And then for JavaScript engines (Scala.js) you could describe the same function, with the same signature, but without any implementation, since this operation isn't supported:
def block[A](fa: Future[A]) (implicit ec: ExecutionContext, permit: CanBlock): Try[A] = throw new UnsupportedOperationException("Cannot block threads on top of JavaScript")
Now in usage, when the caller is invoking
block
as described, it will work without issues on top of the JVM, but when compiled with Scala.js it will trigger a message like this:[error] Playground.scala:30:8: Blocking operations aren't supported [error] on top of JavaScript, because it cannot block threads! [error] Please use asynchronous API calls. [error] block(Future(1)) [error] ^
- Annotations
- @implicitNotFound("For blocking operations on the JVM, there should be an implicit " +
"available by default, or import monix.execution.schedulers.CanBlock.permit.")
- abstract class ExecutorScheduler extends SchedulerService with ReferenceScheduler with BatchingScheduler
An ExecutorScheduler is a class for building a SchedulerService out of a Java
ExecutorService
. - trait ReferenceScheduler extends Scheduler
Helper for building a Scheduler.
Helper for building a Scheduler.
You can inherit from this class and provided a correct scheduleOnce you'll get Scheduler.scheduleWithFixedDelay(initialDelay:Long* and Scheduler.scheduleAtFixedRate(initialDelay:Long* for free.
- trait SchedulerService extends Scheduler
A Scheduler type that provides methods for managing termination.
A Scheduler type that provides methods for managing termination.
A
SchedulerService
can be shut down, which will cause it to reject new tasks. Theshutdown
method allows previously submitted tasks to execute before terminating. TheawaitTermination
method allows waiting on all active tasks to finish.Upon termination, an executor has no tasks actively executing, no tasks awaiting execution, and no new tasks can be submitted. An unused
SchedulerService
should be shut down to allow reclamation of its resources. - final class ShiftedRunnable extends Runnable
Runnable that defers the execution of the given reference with an
executeAsync
.Runnable that defers the execution of the given reference with an
executeAsync
.This is useful for example when implementing
scheduleOnce
, to introduce a boundary between the scheduling and the execution, otherwise risk executing the runnable on the wrong thread-pool. - final case class StartAsyncBatchRunnable(start: TrampolinedRunnable, s: Scheduler) extends Runnable with Serializable with Product
Forces a real asynchronous boundary before executing the given TrampolinedRunnable.
Forces a real asynchronous boundary before executing the given TrampolinedRunnable.
Sometimes you want to execute multiple TrampolinedRunnable instances as a batch, with the functionality provided by schedulers implementing BatchingScheduler, however you might need the very first execution to force an asynchronous boundary.
- start
is the TrampolinedRunnable instance that will get executed and that is supposed to trigger the execution of other trampolined runnables
- s
is the scheduler that gets used for execution.
- final class TestScheduler extends ReferenceScheduler with BatchingScheduler
Scheduler and a provider of
cats.effect.Timer
instances, that can simulate async boundaries and time passage, useful for testing purposes.Scheduler and a provider of
cats.effect.Timer
instances, that can simulate async boundaries and time passage, useful for testing purposes.Usage for simulating an
ExecutionContext
:implicit val ec = TestScheduler() ec.execute(new Runnable { def run() = println("task1") }) ex.execute(new Runnable { def run() = { println("outer") ec.execute(new Runnable { def run() = println("inner") }) } }) // Nothing executes until `tick` gets called ec.tick() // Testing the resulting state assert(ec.state.tasks.isEmpty) assert(ec.state.lastReportedError == null)
TestScheduler
can also simulate the passage of time:val ctx = TestScheduler() val f = Task(1 + 1).delayExecution(10.seconds).runAsync // This only triggers immediate execution, so nothing happens yet ctx.tick() assert(f.value == None) // Simulating the passage of 5 seconds, nothing happens yet ctx.tick(5.seconds) assert(f.value == None) // Simulating another 5 seconds, now we're done! assert(f.value == Some(Success(2)))
We are also able to build a
cats.effect.Timer
from anyScheduler
and for any data type:val ctx = TestScheduler() val timer: Timer[IO] = ctx.timer[IO]
We can now simulate time passage for
cats.effect.IO
as well:val io = timer.sleep(10.seconds) *> IO(1 + 1) val f = io.unsafeToFuture() // This invariant holds true, because our IO is async assert(f.value == None) // Not yet completed, because this does not simulate time passing: ctx.tick() assert(f.value == None) // Simulating time passing: ctx.tick(10.seconds) assert(f.value == Some(Success(2))
Simulating time makes this pretty useful for testing race conditions:
val timeoutError = new TimeoutException val timeout = Task.raiseError[Int](timeoutError) .delayExecution(10.seconds) val pair = (Task.never, timeout).parMapN(_ + _) // Not yet ctx.tick() assert(f.value == None) // Not yet ctx.tick(5.seconds) assert(f.value == None) // Good to go: ctx.tick(5.seconds) assert(f.value == Some(Failure(timeoutError)))
- final class TracingRunnable extends Runnable
Wraps a
Runnable
into one that restores the given Local.Context upon execution ofrun()
.Wraps a
Runnable
into one that restores the given Local.Context upon execution ofrun()
.Used by TracingScheduler.
- final class TracingScheduler extends Base
The
TracingScheduler
is a Scheduler implementation that wraps anotherScheduler
reference, but that propagates the Local.Context on async execution. - final class TracingSchedulerService extends Base with SchedulerService
The
TracingScheduler
is a Scheduler implementation that wraps another SchedulerService reference, with the purpose of propagating the Local.Context on async execution. - final class TrampolineExecutionContext extends ExecutionContextExecutor
A
scala.concurrentExecutionContext
implementation that executes runnables immediately, on the current thread, by means of a trampoline implementation.A
scala.concurrentExecutionContext
implementation that executes runnables immediately, on the current thread, by means of a trampoline implementation.Can be used in some cases to keep the asynchronous execution on the current thread, as an optimization, but be warned, you have to know what you're doing.
The
TrampolineExecutionContext
keeps a reference to anotherunderlying
context, to which it defers for:- reporting errors
- deferring the rest of the queue in problematic situations
Deferring the rest of the queue happens:
- in case we have a runnable throwing an exception, the rest
of the tasks get re-scheduled for execution by using
the
underlying
context - in case we have a runnable triggering a Scala
blocking
context, the rest of the tasks get re-scheduled for execution on theunderlying
context to prevent any deadlocks
Thus this implementation is compatible with the
scala.concurrent.BlockContext
, detectingblocking
blocks and reacting by forking the rest of the queue to prevent deadlocks. - final class TrampolineScheduler extends Scheduler
A Scheduler implementation that executes runnables immediately, on the current thread, by means of a trampoline implementation.
A Scheduler implementation that executes runnables immediately, on the current thread, by means of a trampoline implementation.
Can be used in some cases to keep the asynchronous execution on the current thread, as an optimization, but be warned, you have to know what you're doing.
The
TrampolineScheduler
keeps a reference to anotherunderlying
scheduler, to which it defers for:- reporting errors
- time-delayed execution
- deferring the rest of the queue in problematic situations
Deferring the rest of the queue happens:
- in case we have a runnable throwing an exception, the rest
of the tasks get re-scheduled for execution by using
the
underlying
scheduler - in case we have a runnable triggering a Scala
blocking
context, the rest of the tasks get re-scheduled for execution on theunderlying
scheduler to prevent any deadlocks
Thus this implementation is compatible with the
scala.concurrent.BlockContext
, detectingblocking
blocks and reacting by forking the rest of the queue to prevent deadlocks. - trait TrampolinedRunnable extends Runnable
A marker for callbacks that can be batched and executed locally (on the current thread) by means of a trampoline (if the execution context / scheduler allows it).
A marker for callbacks that can be batched and executed locally (on the current thread) by means of a trampoline (if the execution context / scheduler allows it).
Idea was taken from the
scala.concurrent.Future
implementation. Credit should be given where due.DO NOT use unless you know what you're doing.
- Annotations
- @FunctionalInterface()
Value Members
- object AsyncScheduler extends Serializable
- object CanBlock
- object ExecutorScheduler extends Serializable
- object ReferenceScheduler extends Serializable
- object SchedulerService extends Serializable
- object TestScheduler extends Serializable
- object TracingScheduler extends Serializable
- object TracingSchedulerService extends Serializable
- object TrampolineExecutionContext
- object TrampolineScheduler extends Serializable
This is the API documentation for the Monix library.
Package Overview
monix.execution exposes lower level primitives for dealing with asynchronous execution:
Atomic
types, as alternative tojava.util.concurrent.atomic
monix.catnap exposes pure abstractions built on top of the Cats-Effect type classes:
monix.eval is for dealing with evaluation of results, thus exposing Task and Coeval.
monix.reactive exposes the
Observable
pattern:Observable
implementationsmonix.tail exposes Iterant for purely functional pull based streaming:
Batch
andBatchCursor
, the alternatives to Scala'sIterable
andIterator
respectively that we are using within Iterant's encodingYou can control evaluation with type you choose - be it Task, Coeval, cats.effect.IO or your own as long as you provide correct cats-effect or cats typeclass instance.