object Task extends TaskInstancesLevel1 with Serializable
- Alphabetic
- By Inheritance
- Task
- Serializable
- TaskInstancesLevel1
- TaskInstancesLevel0
- TaskParallelNewtype
- TaskContextShift
- TaskTimers
- TaskClocks
- Companion
- AnyRef
- Any
- Hide All
- Show All
- Public
- All
Type Members
- abstract class AsyncBuilder[CancelationToken] extends AnyRef
The
AsyncBuilder
is a type used by the Task.create builder, in order to change its behavior based on the type of the cancelation token.The
AsyncBuilder
is a type used by the Task.create builder, in order to change its behavior based on the type of the cancelation token.In combination with the Partially-Applied Type technique, this ends up providing a polymorphic Task.create that can support multiple cancelation tokens optimally, i.e. without implicit conversions and that can be optimized depending on the
CancelToken
used - for example ifUnit
is returned, then the yielded task will not be cancelable and the internal implementation will not have to worry about managing it, thus increasing performance. - implicit final class DeprecatedExtensions[+A] extends AnyVal with Extensions[A]
Deprecated operations, described as extension methods.
- final case class Options(autoCancelableRunLoops: Boolean, localContextPropagation: Boolean) extends Product with Serializable
Set of options for customizing the task's behavior.
Set of options for customizing the task's behavior.
See Task.defaultOptions for the default
Options
instance used by Task.runAsync or Task.runToFuture.- autoCancelableRunLoops
should be set to
true
in case you wantflatMap
driven loops to be auto-cancelable. Defaults totrue
.- localContextPropagation
should be set to
true
in case you want the Local variables to be propagated on async boundaries. Defaults tofalse
.
- type Par[+A] = TaskParallelNewtype.Par.Type[A]
Newtype encoding for a
Task
data type that has a cats.Applicative capable of doing parallel processing inap
andmap2
, needed for implementingcats.Parallel
.Newtype encoding for a
Task
data type that has a cats.Applicative capable of doing parallel processing inap
andmap2
, needed for implementingcats.Parallel
.Helpers are provided for converting back and forth in
Par.apply
for wrapping anyTask
value andPar.unwrap
for unwrapping.The encoding is based on the "newtypes" project by Alexander Konovalov, chosen because it's devoid of boxing issues and a good choice until opaque types will land in Scala.
- Definition Classes
- TaskParallelNewtype
Value Members
- final def !=(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- final def ##(): Int
- Definition Classes
- AnyRef → Any
- final def ==(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- def apply[A](a: => A): Task[A]
Lifts the given thunk in the
Task
context, processing it synchronously when the task gets evaluated.Lifts the given thunk in the
Task
context, processing it synchronously when the task gets evaluated.This is an alias for:
val thunk = () => 42 Task.eval(thunk())
WARN: behavior of
Task.apply
has changed since 3.0.0-RC2. Before the change (during Monix 2.x series), this operation was forcing a fork, being equivalent to the new Task.evalAsync.Switch to Task.evalAsync if you wish the old behavior, or combine Task.eval with Task.executeAsync.
- final def asInstanceOf[T0]: T0
- Definition Classes
- Any
- def async[A](register: (Callback[Throwable, A]) => Unit): Task[A]
Create a non-cancelable
Task
from an asynchronous computation, which takes the form of a function with which we can register a callback to execute upon completion.Create a non-cancelable
Task
from an asynchronous computation, which takes the form of a function with which we can register a callback to execute upon completion.This operation is the implementation for
cats.effect.Async
and is thus yielding non-cancelable tasks, being the simplified version of Task.cancelable. This can be used to translate from a callback-based API to pureTask
values that cannot be canceled.See the the documentation for cats.effect.Async.
For example, in case we wouldn't have Task.deferFuture already defined, we could do this:
import scala.concurrent.{Future, ExecutionContext} import scala.util._ def deferFuture[A](f: => Future[A])(implicit ec: ExecutionContext): Task[A] = Task.async { cb => // N.B. we could do `f.onComplete(cb)` directly ;-) f.onComplete { case Success(a) => cb.onSuccess(a) case Failure(e) => cb.onError(e) } }
Note that this function needs an explicit
ExecutionContext
in order to triggerFuture#complete
, however Monix'sTask
can inject a Scheduler for you, thus allowing you to get rid of these pesky execution contexts being passed around explicitly. See Task.async0.CONTRACT for
register
:- the provided function is executed when the
Task
will be evaluated (viarunAsync
or when its turn comes in theflatMap
chain, not before) - the injected Callback can be called at most once, either with a successful result, or with an error; calling it more than once is a contract violation
- the injected callback is thread-safe and in case it gets called multiple times it will throw a monix.execution.exceptions.CallbackCalledMultipleTimesException; also see Callback.tryOnSuccess and Callback.tryOnError
- See also
Task.async0 for a variant that also injects a Scheduler into the provided callback, useful for forking, or delaying tasks or managing async boundaries
Task.cancelable and Task.cancelable0 for creating cancelable tasks
Task.create for the builder that does it all
- the provided function is executed when the
- def async0[A](register: (Scheduler, Callback[Throwable, A]) => Unit): Task[A]
Create a non-cancelable
Task
from an asynchronous computation, which takes the form of a function with which we can register a callback to execute upon completion, a function that also injects a Scheduler for managing async boundaries.Create a non-cancelable
Task
from an asynchronous computation, which takes the form of a function with which we can register a callback to execute upon completion, a function that also injects a Scheduler for managing async boundaries.This operation is the implementation for
cats.effect.Async
and is thus yielding non-cancelable tasks, being the simplified version of Task.cancelable0. It can be used to translate from a callback-based API to pureTask
values that cannot be canceled.See the the documentation for cats.effect.Async.
For example, in case we wouldn't have Task.deferFuture already defined, we could do this:
import scala.concurrent.Future import scala.util._ def deferFuture[A](f: => Future[A]): Task[A] = Task.async0 { (scheduler, cb) => // We are being given an ExecutionContext ;-) implicit val ec = scheduler // N.B. we could do `f.onComplete(cb)` directly ;-) f.onComplete { case Success(a) => cb.onSuccess(a) case Failure(e) => cb.onError(e) } }
Note that this function doesn't need an implicit
ExecutionContext
. Compared with usage of Task.async, this function injects a Scheduler for us to use for managing async boundaries.CONTRACT for
register
:- the provided function is executed when the
Task
will be evaluated (viarunAsync
or when its turn comes in theflatMap
chain, not before) - the injected monix.execution.Callback can be called at most once, either with a successful result, or with an error; calling it more than once is a contract violation
- the injected callback is thread-safe and in case it gets called multiple times it will throw a monix.execution.exceptions.CallbackCalledMultipleTimesException; also see Callback.tryOnSuccess and Callback.tryOnError
NOTES on the naming:
async
comes fromcats.effect.Async#async
- the
0
suffix is about overloading the simpler Task.async builder
- See also
Task.async for a simpler variant that doesn't inject a
Scheduler
, in case you don't need oneTask.cancelable and Task.cancelable0 for creating cancelable tasks
Task.create for the builder that does it all
- the provided function is executed when the
- def asyncF[A](register: (Callback[Throwable, A]) => Task[Unit]): Task[A]
Suspends an asynchronous side effect in
Task
, this being a variant of async that takes a pure registration function.Suspends an asynchronous side effect in
Task
, this being a variant of async that takes a pure registration function.Implements
cats.effect.Async.asyncF
.The difference versus async is that this variant can suspend side-effects via the provided function parameter. It's more relevant in polymorphic code making use of the
cats.effect.Async
type class, as it alleviates the need forcats.effect.Effect
.Contract for the returned
Task[Unit]
in the provided function:- can be asynchronous
- can be cancelable, in which case it hooks into IO's cancelation mechanism such that the resulting task is cancelable
- it should not end in error, because the provided callback
is the only way to signal the final result and it can only
be called once, so invoking it twice would be a contract
violation; so on errors thrown in
Task
, the task can become non-terminating, with the error being printed via Scheduler.reportFailure
- See also
Task.async and Task.async0 for a simpler variants
Task.cancelable and Task.cancelable0 for creating cancelable tasks
- val cancelBoundary: Task[Unit]
Returns a cancelable boundary — a
Task
that checks for the cancellation status of the run-loop and does not allow for the bind continuation to keep executing in case cancellation happened.Returns a cancelable boundary — a
Task
that checks for the cancellation status of the run-loop and does not allow for the bind continuation to keep executing in case cancellation happened.This operation is very similar to
Task.shift
, as it can be dropped inflatMap
chains in order to make loops cancelable.Example:
import cats.syntax.all._ def fib(n: Int, a: Long, b: Long): Task[Long] = Task.suspend { if (n <= 0) Task.pure(a) else { val next = fib(n - 1, b, a + b) // Every 100-th cycle, check cancellation status if (n % 100 == 0) Task.cancelBoundary *> next else next } }
NOTE: that by default
Task
is configured to be auto-cancelable (see Task.Options), so this isn't strictly needed, unless you want to fine tune the cancelation boundaries. - def cancelable[A](register: (Callback[Throwable, A]) => CancelToken[Task]): Task[A]
Create a cancelable
Task
from an asynchronous computation that can be canceled, taking the form of a function with which we can register a callback to execute upon completion.Create a cancelable
Task
from an asynchronous computation that can be canceled, taking the form of a function with which we can register a callback to execute upon completion.This operation is the implementation for
cats.effect.Concurrent#cancelable
and is thus yielding cancelable tasks. It can be used to translate from a callback-based API to pureTask
values that can be canceled.See the the documentation for cats.effect.Concurrent.
For example, in case we wouldn't have Task.delayExecution already defined and we wanted to delay evaluation using a Java ScheduledExecutorService (no need for that because we've got Scheduler, but lets say for didactic purposes):
import java.util.concurrent.ScheduledExecutorService import scala.concurrent.ExecutionContext import scala.concurrent.duration._ import scala.util.control.NonFatal def delayed[A](sc: ScheduledExecutorService, timespan: FiniteDuration) (thunk: => A) (implicit ec: ExecutionContext): Task[A] = { Task.cancelable { cb => val future = sc.schedule(new Runnable { // scheduling delay def run() = ec.execute(new Runnable { // scheduling thunk execution def run() = try cb.onSuccess(thunk) catch { case NonFatal(e) => cb.onError(e) } }) }, timespan.length, timespan.unit) // Returning the cancelation token that is able to cancel the // scheduling in case the active computation hasn't finished yet Task(future.cancel(false)) } }
Note in this sample we are passing an implicit
ExecutionContext
in order to do the actual processing, theScheduledExecutorService
being in charge just of scheduling. We don't need to do that, asTask
affords to have a Scheduler injected instead via Task.cancelable0.CONTRACT for
register
:- the provided function is executed when the
Task
will be evaluated (viarunAsync
or when its turn comes in theflatMap
chain, not before) - the injected Callback can be called at most once, either with a successful result, or with an error; calling it more than once is a contract violation
- the injected callback is thread-safe and in case it gets called multiple times it will throw a monix.execution.exceptions.CallbackCalledMultipleTimesException; also see Callback.tryOnSuccess and Callback.tryOnError
- register
is a function that will be called when this
Task
is executed, receiving a callback as a parameter, a callback that the user is supposed to call in order to signal the desired outcome of thisTask
. This function also receives a Scheduler that can be used for managing asynchronous boundaries, a scheduler being nothing more than an evolvedExecutionContext
.
- See also
Task.cancelable0 for the version that also injects a Scheduler in that callback
Task.async0 and Task.async for the simpler versions of this builder that create non-cancelable tasks from callback-based APIs
Task.create for the builder that does it all
- the provided function is executed when the
- def cancelable0[A](register: (Scheduler, Callback[Throwable, A]) => CancelToken[Task]): Task[A]
Create a cancelable
Task
from an asynchronous computation, which takes the form of a function with which we can register a callback to execute upon completion, a function that also injects a Scheduler for managing async boundaries.Create a cancelable
Task
from an asynchronous computation, which takes the form of a function with which we can register a callback to execute upon completion, a function that also injects a Scheduler for managing async boundaries.This operation is the implementation for
cats.effect.Concurrent#cancelable
and is thus yielding cancelable tasks. It can be used to translate from a callback-based API to pureTask
values that can be canceled.See the the documentation for cats.effect.Concurrent.
For example, in case we wouldn't have Task.delayExecution already defined and we wanted to delay evaluation using a Java ScheduledExecutorService (no need for that because we've got Scheduler, but lets say for didactic purposes):
import java.util.concurrent.ScheduledExecutorService import scala.concurrent.duration._ import scala.util.control.NonFatal def delayed1[A](sc: ScheduledExecutorService, timespan: FiniteDuration) (thunk: => A): Task[A] = { Task.cancelable0 { (scheduler, cb) => val future = sc.schedule(new Runnable { // scheduling delay def run = scheduler.execute(new Runnable { // scheduling thunk execution def run() = try cb.onSuccess(thunk) catch { case NonFatal(e) => cb.onError(e) } }) }, timespan.length, timespan.unit) // Returning the cancel token that is able to cancel the // scheduling in case the active computation hasn't finished yet Task(future.cancel(false)) } }
As can be seen, the passed function needs to pass a Cancelable in order to specify cancelation logic.
This is a sample given for didactic purposes. Our
cancelable0
is being injected a Scheduler and it is perfectly capable of doing such delayed execution without help from Java's standard library:def delayed2[A](timespan: FiniteDuration)(thunk: => A): Task[A] = Task.cancelable0 { (scheduler, cb) => // N.B. this already returns the Cancelable that we need! val cancelable = scheduler.scheduleOnce(timespan) { try cb.onSuccess(thunk) catch { case NonFatal(e) => cb.onError(e) } } // `scheduleOnce` above returns a Cancelable, which // has to be converted into a Task[Unit] Task(cancelable.cancel()) }
CONTRACT for
register
:- the provided function is executed when the
Task
will be evaluated (viarunAsync
or when its turn comes in theflatMap
chain, not before) - the injected Callback can be called at most once, either with a successful result, or with an error; calling it more than once is a contract violation
- the injected callback is thread-safe and in case it gets called multiple times it will throw a monix.execution.exceptions.CallbackCalledMultipleTimesException; also see Callback.tryOnSuccess and Callback.tryOnError
NOTES on the naming:
cancelable
comes fromcats.effect.Concurrent#cancelable
- the
0
suffix is about overloading the simpler Task.cancelable builder
- register
is a function that will be called when this
Task
is executed, receiving a callback as a parameter, a callback that the user is supposed to call in order to signal the desired outcome of thisTask
. This function also receives a Scheduler that can be used for managing asynchronous boundaries, a scheduler being nothing more than an evolvedExecutionContext
.
- See also
Task.cancelable for the simpler variant that doesn't inject the
Scheduler
in that callbackTask.async0 and Task.async for the simpler versions of this builder that create non-cancelable tasks from callback-based APIs
Task.create for the builder that does it all
- the provided function is executed when the
- implicit def catsAsync: CatsConcurrentForTask
Global instance for
cats.effect.Async
and forcats.effect.Concurrent
.Global instance for
cats.effect.Async
and forcats.effect.Concurrent
.Implied are also
cats.CoflatMap
,cats.Applicative
,cats.Monad
,cats.MonadError
andcats.effect.Sync
.As trivia, it's named "catsAsync" and not "catsConcurrent" because it represents the
cats.effect.Async
lineage, up untilcats.effect.Effect
, which imposes extra restrictions, in our case the need for aScheduler
to be in scope (see Task.catsEffect). So by naming the lineage, not the concrete sub-type implemented, we avoid breaking compatibility whenever a new type class (that we can implement) gets added into Cats.Seek more info about Cats, the standard library for FP, at:
- Definition Classes
- TaskInstancesLevel1
- implicit def catsEffect(implicit s: Scheduler, opts: Options = Task.defaultOptions): CatsConcurrentEffectForTask
Global instance for
cats.effect.Effect
and forcats.effect.ConcurrentEffect
.Global instance for
cats.effect.Effect
and forcats.effect.ConcurrentEffect
.Implied are
cats.CoflatMap
,cats.Applicative
,cats.Monad
,cats.MonadError
,cats.effect.Sync
andcats.effect.Async
.Note this is different from Task.catsAsync because we need an implicit Scheduler in scope in order to trigger the execution of a
Task
. It's also lower priority in order to not trigger conflicts, becauseEffect <: Async
andConcurrentEffect <: Concurrent with Effect
.As trivia, it's named "catsEffect" and not "catsConcurrentEffect" because it represents the
cats.effect.Effect
lineage, as in the minimum that this value will support in the future. So by naming the lineage, not the concrete sub-type implemented, we avoid breaking compatibility whenever a new type class (that we can implement) gets added into Cats.Seek more info about Cats, the standard library for FP, at:
- s
is a Scheduler that needs to be available in scope
- Definition Classes
- TaskInstancesLevel0
- implicit def catsMonoid[A](implicit A: Monoid[A]): Monoid[Task[A]]
Given an
A
type that has acats.Monoid[A]
implementation, then this provides the evidence thatTask[A]
also has aMonoid[ Task[A] ]
implementation.Given an
A
type that has acats.Monoid[A]
implementation, then this provides the evidence thatTask[A]
also has aMonoid[ Task[A] ]
implementation.- Definition Classes
- TaskInstancesLevel1
- implicit def catsParallel: CatsParallelForTask
Global instance for
cats.Parallel
.Global instance for
cats.Parallel
.The
Parallel
type class is useful for processing things in parallel in a generic way, usable with Cats' utils and syntax:import cats.syntax.all._ import scala.concurrent.duration._ val taskA = Task.sleep(1.seconds).map(_ => "a") val taskB = Task.sleep(2.seconds).map(_ => "b") val taskC = Task.sleep(3.seconds).map(_ => "c") // Returns "abc" after 3 seconds (taskA, taskB, taskC).parMapN { (a, b, c) => a + b + c }
Seek more info about Cats, the standard library for FP, at:
- Definition Classes
- TaskInstancesLevel1
- implicit def catsSemigroup[A](implicit A: Semigroup[A]): Semigroup[Task[A]]
Given an
A
type that has acats.Semigroup[A]
implementation, then this provides the evidence thatTask[A]
also has aSemigroup[ Task[A] ]
implementation.Given an
A
type that has acats.Semigroup[A]
implementation, then this provides the evidence thatTask[A]
also has aSemigroup[ Task[A] ]
implementation.This has a lower-level priority than Task.catsMonoid in order to avoid conflicts.
- Definition Classes
- TaskInstancesLevel0
- def clock(s: Scheduler): Clock[Task]
Builds a
cats.effect.Clock
instance, given a Scheduler reference.Builds a
cats.effect.Clock
instance, given a Scheduler reference.- Definition Classes
- TaskClocks
- val clock: Clock[Task]
Default, pure, globally visible
cats.effect.Clock
implementation that defers the evaluation toTask
's default Scheduler (that's being injected in Task.runToFuture).Default, pure, globally visible
cats.effect.Clock
implementation that defers the evaluation toTask
's default Scheduler (that's being injected in Task.runToFuture).- Definition Classes
- TaskClocks
- def clone(): AnyRef
- Attributes
- protected[java.lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.CloneNotSupportedException]) @native()
- def coeval[A](value: Coeval[A]): Task[A]
- def contextShift(s: Scheduler): ContextShift[Task]
Builds a
cats.effect.ContextShift
instance, given a Scheduler reference.Builds a
cats.effect.ContextShift
instance, given a Scheduler reference.- Definition Classes
- TaskContextShift
- implicit val contextShift: ContextShift[Task]
Default, pure, globally visible
cats.effect.ContextShift
implementation that shifts the evaluation toTask
's default Scheduler (that's being injected in Task.runToFuture).Default, pure, globally visible
cats.effect.ContextShift
implementation that shifts the evaluation toTask
's default Scheduler (that's being injected in Task.runToFuture).- Definition Classes
- TaskContextShift
- def create[A]: CreatePartiallyApplied[A]
Polymorphic
Task
builder that is able to describe asynchronous tasks depending on the type of the given callback.Polymorphic
Task
builder that is able to describe asynchronous tasks depending on the type of the given callback.Note that this function uses the Partially-Applied Type technique.
Calling
create
with a callback that returnsUnit
is equivalent with Task.async0:Task.async0(f) <-> Task.create(f)
Example:
import scala.concurrent.Future def deferFuture[A](f: => Future[A]): Task[A] = Task.create { (scheduler, cb) => f.onComplete(cb(_))(scheduler) }
We could return a Cancelable reference and thus make a cancelable task. Example:
import monix.execution.Cancelable import scala.concurrent.duration.FiniteDuration import scala.util.Try def delayResult1[A](timespan: FiniteDuration)(thunk: => A): Task[A] = Task.create { (scheduler, cb) => val c = scheduler.scheduleOnce(timespan)(cb(Try(thunk))) // We can simply return `c`, but doing this for didactic purposes! Cancelable(() => c.cancel()) }
Passed function can also return
IO[Unit]
as a task that describes a cancelation action:import cats.effect.IO def delayResult2[A](timespan: FiniteDuration)(thunk: => A): Task[A] = Task.create { (scheduler, cb) => val c = scheduler.scheduleOnce(timespan)(cb(Try(thunk))) // We can simply return `c`, but doing this for didactic purposes! IO(c.cancel()) }
Passed function can also return
Task[Unit]
as a task that describes a cancelation action, thus for anf
that can be passed to Task.cancelable0, and this equivalence holds:Task.cancelable(f) <-> Task.create(f)
def delayResult3[A](timespan: FiniteDuration)(thunk: => A): Task[A] = Task.create { (scheduler, cb) => val c = scheduler.scheduleOnce(timespan)(cb(Try(thunk))) // We can simply return `c`, but doing this for didactic purposes! Task(c.cancel()) }
Passed function can also return
Coeval[Unit]
as a task that describes a cancelation action:def delayResult4[A](timespan: FiniteDuration)(thunk: => A): Task[A] = Task.create { (scheduler, cb) => val c = scheduler.scheduleOnce(timespan)(cb(Try(thunk))) // We can simply return `c`, but doing this for didactic purposes! Coeval(c.cancel()) }
The supported types for the cancelation tokens are:
Unit
, yielding non-cancelable tasks- Cancelable, the Monix standard
- Task[Unit]
- Coeval[Unit]
cats.effect.IO[Unit]
, see IO docs
Support for more might be added in the future.
- val defaultOptions: Options
Default Options to use for Task evaluation, thus:
Default Options to use for Task evaluation, thus:
autoCancelableRunLoops
istrue
by defaultlocalContextPropagation
isfalse
by default
On top of the JVM the default can be overridden by setting the following system properties:
monix.environment.autoCancelableRunLoops
(false
,no
or0
for disabling)monix.environment.localContextPropagation
(true
,yes
or1
for enabling)
- See also
- def defer[A](fa: => Task[A]): Task[A]
Promote a non-strict value representing a Task to a Task of the same type.
- def deferAction[A](f: (Scheduler) => Task[A]): Task[A]
Defers the creation of a
Task
by using the provided function, which has the ability to inject a needed Scheduler.Defers the creation of a
Task
by using the provided function, which has the ability to inject a needed Scheduler.Example:
import scala.concurrent.duration.MILLISECONDS def measureLatency[A](source: Task[A]): Task[(A, Long)] = Task.deferAction { implicit s => // We have our Scheduler, which can inject time, we // can use it for side-effectful operations val start = s.clockRealTime(MILLISECONDS) source.map { a => val finish = s.clockRealTime(MILLISECONDS) (a, finish - start) } }
- f
is the function that's going to be called when the resulting
Task
gets evaluated
- def deferFuture[A](fa: => Future[A]): Task[A]
Promote a non-strict Scala
Future
to aTask
of the same type.Promote a non-strict Scala
Future
to aTask
of the same type.The equivalent of doing:
import scala.concurrent.Future def mkFuture = Future.successful(27) Task.defer(Task.fromFuture(mkFuture))
- def deferFutureAction[A](f: (Scheduler) => Future[A]): Task[A]
Wraps calls that generate
Future
results into Task, provided a callback with an injected Scheduler to act as the necessaryExecutionContext
.Wraps calls that generate
Future
results into Task, provided a callback with an injected Scheduler to act as the necessaryExecutionContext
.This builder helps with wrapping
Future
-enabled APIs that need an implicitExecutionContext
to work. Consider this example:import scala.concurrent.{ExecutionContext, Future} def sumFuture(list: Seq[Int])(implicit ec: ExecutionContext): Future[Int] = Future(list.sum)
We'd like to wrap this function into one that returns a lazy
Task
that evaluates this sum every time it is called, because that's how tasks work best. However in order to invoke this function anExecutionContext
is needed:def sumTask(list: Seq[Int])(implicit ec: ExecutionContext): Task[Int] = Task.deferFuture(sumFuture(list))
But this is not only superfluous, but against the best practices of using
Task
. The difference is thatTask
takes a Scheduler (inheriting fromExecutionContext
) only when runAsync happens. But withdeferFutureAction
we get to have an injectedScheduler
in the passed callback:def sumTask2(list: Seq[Int]): Task[Int] = Task.deferFutureAction { implicit scheduler => sumFuture(list) }
- f
is the function that's going to be executed when the task gets evaluated, generating the wrapped
Future
- def delay[A](a: => A): Task[A]
Alias for eval.
- final def eq(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- def equals(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef → Any
- def eval[A](a: => A): Task[A]
Promote a non-strict value, a thunk, to a
Task
, catching exceptions in the process.Promote a non-strict value, a thunk, to a
Task
, catching exceptions in the process.Note that since
Task
is not memoized or strict, this will recompute the value each time theTask
is executed, behaving like a function.- a
is the thunk to process on evaluation
- def evalAsync[A](a: => A): Task[A]
Lifts a non-strict value, a thunk, to a
Task
that will trigger a logical fork before evaluation.Lifts a non-strict value, a thunk, to a
Task
that will trigger a logical fork before evaluation.Like eval, but the provided
thunk
will not be evaluated immediately. Equivalence:Task.evalAsync(a) <-> Task.eval(a).executeAsync
- a
is the thunk to process on evaluation
- def evalOnce[A](a: => A): Task[A]
Promote a non-strict value to a Task that is memoized on the first evaluation, the result being then available on subsequent evaluations.
- def finalize(): Unit
- Attributes
- protected[java.lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.Throwable])
- def from[F[_], A](fa: F[A])(implicit F: TaskLike[F]): Task[A]
Converts to Task from any
F[_]
for which there exists a TaskLike implementation. - def fromCancelablePromise[A](p: CancelablePromise[A]): Task[A]
Wraps a monix.execution.CancelablePromise into
Task
. - def fromConcurrentEffect[F[_], A](fa: F[A])(implicit F: ConcurrentEffect[F]): Task[A]
Builds a Task instance out of any data type that implements Concurrent and ConcurrentEffect.
Builds a Task instance out of any data type that implements Concurrent and ConcurrentEffect.
Example:
import cats.effect._ import cats.syntax.all._ import monix.execution.Scheduler.Implicits.global import scala.concurrent.duration._ implicit val timer = IO.timer(global) val io = IO.sleep(5.seconds) *> IO(println("Hello!")) // Resulting task is cancelable val task: Task[Unit] = Task.fromEffect(io)
Cancellation / finalization behavior is carried over, so the resulting task can be safely cancelled.
- F
is the
cats.effect.Effect
type class instance necessary for converting toTask
; this instance can also be acats.effect.Concurrent
, in which case the resultingTask
value is cancelable if the source is
- See also
Task.liftToConcurrent for its dual
Task.fromEffect for a version that works with simpler, non-cancelable
Async
data typesTask.from for a more generic version that works with any TaskLike data type
- def fromEffect[F[_], A](fa: F[A])(implicit F: Effect[F]): Task[A]
Builds a Task instance out of any data type that implements Async and Effect.
Builds a Task instance out of any data type that implements Async and Effect.
Example:
import cats.effect._ val io = IO(println("Hello!")) val task: Task[Unit] = Task.fromEffect(io)
WARNING: the resulting task might not carry the source's cancelation behavior if the source is cancelable! This is implicit in the usage of
Effect
.- F
is the
cats.effect.Effect
type class instance necessary for converting toTask
; this instance can also be acats.effect.Concurrent
, in which case the resultingTask
value is cancelable if the source is
- See also
Task.fromConcurrentEffect for a version that can use Concurrent for converting cancelable tasks.
Task.from for a more generic version that works with any TaskLike data type
Task.liftToAsync for its dual
- def fromEither[E, A](f: (E) => Throwable)(a: Either[E, A]): Task[A]
Builds a Task instance out of a Scala
Either
. - def fromEither[E <: Throwable, A](a: Either[E, A]): Task[A]
Builds a Task instance out of a Scala
Either
. - def fromFuture[A](f: Future[A]): Task[A]
Converts the given Scala
Future
into aTask
.Converts the given Scala
Future
into aTask
. There is an async boundary inserted at the end to guarantee that we stay on the main Scheduler.NOTE: if you want to defer the creation of the future, use in combination with defer.
- def fromFutureLike[F[_], A](tfa: Task[F[A]])(implicit F: FutureLift[Task, F]): Task[A]
Converts any Future-like data-type via monix.catnap.FutureLift.
- def fromTry[A](a: Try[A]): Task[A]
Builds a Task instance out of a Scala
Try
. - def gather[A, M[X] <: Iterable[X]](in: M[Task[A]])(implicit bf: BuildFrom[M[Task[A]], A, M[A]]): Task[M[A]]
Executes the given sequence of tasks in parallel, non-deterministically gathering their results, returning a task that will signal the sequence of results once all tasks are finished.
Executes the given sequence of tasks in parallel, non-deterministically gathering their results, returning a task that will signal the sequence of results once all tasks are finished.
This function is the nondeterministic analogue of
sequence
and should behave identically tosequence
so long as there is no interaction between the effects being gathered. However, unlikesequence
, which decides on a total order of effects, the effects in agather
are unordered with respect to each other, the tasks being execute in parallel, not in sequence.Although the effects are unordered, we ensure the order of results matches the order of the input sequence. Also see gatherUnordered for the more efficient alternative.
Example:
val tasks = List(Task(1 + 1), Task(2 + 2), Task(3 + 3)) // Yields 2, 4, 6 Task.gather(tasks)
ADVICE: In a real life scenario the tasks should be expensive in order to warrant parallel execution. Parallelism doesn't magically speed up the code - it's usually fine for I/O-bound tasks, however for CPU-bound tasks it can make things worse. Performance improvements need to be verified.
NOTE: the tasks get forked automatically so there's no need to force asynchronous execution for immediate tasks, parallelism being guaranteed when multi-threading is available!
All specified tasks get evaluated in parallel, regardless of their execution model (Task.eval vs Task.evalAsync doesn't matter). Also the implementation tries to be smart about detecting forked tasks so it can eliminate extraneous forks for the very obvious cases.
- See also
gatherN for a version that limits parallelism.
- def gatherN[A](parallelism: Int)(in: Iterable[Task[A]]): Task[List[A]]
Executes the given sequence of tasks in parallel, non-deterministically gathering their results, returning a task that will signal the sequence of results once all tasks are finished.
Executes the given sequence of tasks in parallel, non-deterministically gathering their results, returning a task that will signal the sequence of results once all tasks are finished.
Implementation ensure there are at most
n
(=parallelism
parameter) tasks running concurrently and the results are returned in order.Example:
import scala.concurrent.duration._ val tasks = List( Task(1 + 1).delayExecution(1.second), Task(2 + 2).delayExecution(2.second), Task(3 + 3).delayExecution(3.second), Task(4 + 4).delayExecution(4.second) ) // Yields 2, 4, 6, 8 after around 6 seconds Task.gatherN(2)(tasks)
ADVICE: In a real life scenario the tasks should be expensive in order to warrant parallel execution. Parallelism doesn't magically speed up the code - it's usually fine for I/O-bound tasks, however for CPU-bound tasks it can make things worse. Performance improvements need to be verified.
NOTE: the tasks get forked automatically so there's no need to force asynchronous execution for immediate tasks, parallelism being guaranteed when multi-threading is available!
All specified tasks get evaluated in parallel, regardless of their execution model (Task.eval vs Task.evalAsync doesn't matter). Also the implementation tries to be smart about detecting forked tasks so it can eliminate extraneous forks for the very obvious cases.
- See also
gather for a version that does not limit parallelism.
- def gatherUnordered[A](in: Iterable[Task[A]]): Task[List[A]]
Processes the given collection of tasks in parallel and nondeterministically gather the results without keeping the original ordering of the given tasks.
Processes the given collection of tasks in parallel and nondeterministically gather the results without keeping the original ordering of the given tasks.
This function is similar to gather, but neither the effects nor the results will be ordered. Useful when you don't need ordering because:
- it has non-blocking behavior (but not wait-free)
- it can be more efficient (compared with gather), but not necessarily (if you care about performance, then test)
Example:
val tasks = List(Task(1 + 1), Task(2 + 2), Task(3 + 3)) // Yields 2, 4, 6 (but order is NOT guaranteed) Task.gatherUnordered(tasks)
ADVICE: In a real life scenario the tasks should be expensive in order to warrant parallel execution. Parallelism doesn't magically speed up the code - it's usually fine for I/O-bound tasks, however for CPU-bound tasks it can make things worse. Performance improvements need to be verified.
NOTE: the tasks get forked automatically so there's no need to force asynchronous execution for immediate tasks, parallelism being guaranteed when multi-threading is available!
All specified tasks get evaluated in parallel, regardless of their execution model (Task.eval vs Task.evalAsync doesn't matter). Also the implementation tries to be smart about detecting forked tasks so it can eliminate extraneous forks for the very obvious cases.
- in
is a list of tasks to execute
- final def getClass(): Class[_ <: AnyRef]
- Definition Classes
- AnyRef → Any
- Annotations
- @native()
- def hashCode(): Int
- Definition Classes
- AnyRef → Any
- Annotations
- @native()
- final def isInstanceOf[T0]: Boolean
- Definition Classes
- Any
- def liftFrom[F[_]](implicit F: TaskLike[F]): ~>[F, Task]
Returns a
F ~> Coeval
(FunctionK
) for transforming any supported data-type into Task.Returns a
F ~> Coeval
(FunctionK
) for transforming any supported data-type into Task.Useful for
mapK
transformations, for example when working withResource
orIterant
:import cats.effect._ import monix.eval._ import java.io._ def open(file: File) = Resource[IO, InputStream](IO { val in = new FileInputStream(file) (in, IO(in.close())) }) // Lifting to a Resource of Task val res: Resource[Task, InputStream] = open(new File("sample")).mapK(Task.liftFrom[IO])
- def liftFromConcurrentEffect[F[_]](implicit F: ConcurrentEffect[F]): ~>[F, Task]
Returns a
F ~> Coeval
(FunctionK
) for transforming any supported data-type, that implementsConcurrentEffect
, into Task.Returns a
F ~> Coeval
(FunctionK
) for transforming any supported data-type, that implementsConcurrentEffect
, into Task.Useful for
mapK
transformations, for example when working withResource
orIterant
.This is the less generic liftFrom operation, supplied in order order to force the usage of ConcurrentEffect for where it matters.
- def liftFromEffect[F[_]](implicit F: Effect[F]): ~>[F, Task]
Returns a
F ~> Coeval
(FunctionK
) for transforming any supported data-type, that implementsEffect
, into Task.Returns a
F ~> Coeval
(FunctionK
) for transforming any supported data-type, that implementsEffect
, into Task.Useful for
mapK
transformations, for example when working withResource
orIterant
.This is the less generic liftFrom operation, supplied in order order to force the usage of Effect for where it matters.
- def liftTo[F[_]](implicit F: TaskLift[F]): ~>[Task, F]
Generates
cats.FunctionK
values for converting fromTask
to supporting types (for which we have a TaskLift instance).Generates
cats.FunctionK
values for converting fromTask
to supporting types (for which we have a TaskLift instance).See https://typelevel.org/cats/datatypes/functionk.html.
import cats.effect._ import monix.eval._ import java.io._ // Needed for converting from Task to something else, because we need // ConcurrentEffect[Task] capabilities, also provided by TaskApp import monix.execution.Scheduler.Implicits.global def open(file: File) = Resource[Task, InputStream](Task { val in = new FileInputStream(file) (in, Task(in.close())) }) // Lifting to a Resource of IO val res: Resource[IO, InputStream] = open(new File("sample")).mapK(Task.liftTo[IO]) // This was needed in order to process the resource // with a Task, instead of a Coeval res.use { in => IO { in.read() } }
- def liftToAsync[F[_]](implicit F: cats.effect.Async[F], eff: Effect[Task]): ~>[Task, F]
Generates
cats.FunctionK
values for converting fromTask
to supporting types (for which we have acats.effect.Async
) instance.Generates
cats.FunctionK
values for converting fromTask
to supporting types (for which we have acats.effect.Async
) instance.See https://typelevel.org/cats/datatypes/functionk.html.
Prefer to use liftTo, this alternative is provided in order to force the usage of
cats.effect.Async
, since TaskLift is lawless. - def liftToConcurrent[F[_]](implicit F: Concurrent[F], eff: ConcurrentEffect[Task]): ~>[Task, F]
Generates
cats.FunctionK
values for converting fromTask
to supporting types (for which we have acats.effect.Concurrent
) instance.Generates
cats.FunctionK
values for converting fromTask
to supporting types (for which we have acats.effect.Concurrent
) instance.See https://typelevel.org/cats/datatypes/functionk.html.
Prefer to use liftTo, this alternative is provided in order to force the usage of
cats.effect.Concurrent
, since TaskLift is lawless. - def map2[A1, A2, R](fa1: Task[A1], fa2: Task[A2])(f: (A1, A2) => R): Task[R]
Pairs 2
Task
values, applying the given mapping function.Pairs 2
Task
values, applying the given mapping function.Returns a new
Task
reference that completes with the result of mapping that function to their successful results, or in failure in case either of them fails.This is a specialized Task.sequence operation and as such the tasks are evaluated in order, one after another, the operation being described in terms of .flatMap.
val fa1 = Task(1) val fa2 = Task(2) // Yields Success(3) Task.map2(fa1, fa2) { (a, b) => a + b } // Yields Failure(e), because the second arg is a failure Task.map2(fa1, Task.raiseError[Int](new RuntimeException("boo"))) { (a, b) => a + b }
See Task.parMap2 for parallel processing.
- def map3[A1, A2, A3, R](fa1: Task[A1], fa2: Task[A2], fa3: Task[A3])(f: (A1, A2, A3) => R): Task[R]
Pairs 3
Task
values, applying the given mapping function.Pairs 3
Task
values, applying the given mapping function.Returns a new
Task
reference that completes with the result of mapping that function to their successful results, or in failure in case either of them fails.This is a specialized Task.sequence operation and as such the tasks are evaluated in order, one after another, the operation being described in terms of .flatMap.
val fa1 = Task(1) val fa2 = Task(2) val fa3 = Task(3) // Yields Success(6) Task.map3(fa1, fa2, fa3) { (a, b, c) => a + b + c } // Yields Failure(e), because the second arg is a failure Task.map3(fa1, Task.raiseError[Int](new RuntimeException("boo")), fa3) { (a, b, c) => a + b + c }
See Task.parMap3 for parallel processing.
- def map4[A1, A2, A3, A4, R](fa1: Task[A1], fa2: Task[A2], fa3: Task[A3], fa4: Task[A4])(f: (A1, A2, A3, A4) => R): Task[R]
Pairs 4
Task
values, applying the given mapping function.Pairs 4
Task
values, applying the given mapping function.Returns a new
Task
reference that completes with the result of mapping that function to their successful results, or in failure in case either of them fails.This is a specialized Task.sequence operation and as such the tasks are evaluated in order, one after another, the operation being described in terms of .flatMap.
val fa1 = Task(1) val fa2 = Task(2) val fa3 = Task(3) val fa4 = Task(4) // Yields Success(10) Task.map4(fa1, fa2, fa3, fa4) { (a, b, c, d) => a + b + c + d } // Yields Failure(e), because the second arg is a failure Task.map4(fa1, Task.raiseError[Int](new RuntimeException("boo")), fa3, fa4) { (a, b, c, d) => a + b + c + d }
See Task.parMap4 for parallel processing.
- def map5[A1, A2, A3, A4, A5, R](fa1: Task[A1], fa2: Task[A2], fa3: Task[A3], fa4: Task[A4], fa5: Task[A5])(f: (A1, A2, A3, A4, A5) => R): Task[R]
Pairs 5
Task
values, applying the given mapping function.Pairs 5
Task
values, applying the given mapping function.Returns a new
Task
reference that completes with the result of mapping that function to their successful results, or in failure in case either of them fails.This is a specialized Task.sequence operation and as such the tasks are evaluated in order, one after another, the operation being described in terms of .flatMap.
val fa1 = Task(1) val fa2 = Task(2) val fa3 = Task(3) val fa4 = Task(4) val fa5 = Task(5) // Yields Success(15) Task.map5(fa1, fa2, fa3, fa4, fa5) { (a, b, c, d, e) => a + b + c + d + e } // Yields Failure(e), because the second arg is a failure Task.map5(fa1, Task.raiseError[Int](new RuntimeException("boo")), fa3, fa4, fa5) { (a, b, c, d, e) => a + b + c + d + e }
See Task.parMap5 for parallel processing.
- def map6[A1, A2, A3, A4, A5, A6, R](fa1: Task[A1], fa2: Task[A2], fa3: Task[A3], fa4: Task[A4], fa5: Task[A5], fa6: Task[A6])(f: (A1, A2, A3, A4, A5, A6) => R): Task[R]
Pairs 6
Task
values, applying the given mapping function.Pairs 6
Task
values, applying the given mapping function.Returns a new
Task
reference that completes with the result of mapping that function to their successful results, or in failure in case either of them fails.This is a specialized Task.sequence operation and as such the tasks are evaluated in order, one after another, the operation being described in terms of .flatMap.
val fa1 = Task(1) val fa2 = Task(2) val fa3 = Task(3) val fa4 = Task(4) val fa5 = Task(5) val fa6 = Task(6) // Yields Success(21) Task.map6(fa1, fa2, fa3, fa4, fa5, fa6) { (a, b, c, d, e, f) => a + b + c + d + e + f } // Yields Failure(e), because the second arg is a failure Task.map6(fa1, Task.raiseError[Int](new RuntimeException("boo")), fa3, fa4, fa5, fa6) { (a, b, c, d, e, f) => a + b + c + d + e + f }
See Task.parMap6 for parallel processing.
- def mapBoth[A1, A2, R](fa1: Task[A1], fa2: Task[A2])(f: (A1, A2) => R): Task[R]
Yields a task that on evaluation will process the given tasks in parallel, then apply the given mapping function on their results.
Yields a task that on evaluation will process the given tasks in parallel, then apply the given mapping function on their results.
Example:
val task1 = Task(1 + 1) val task2 = Task(2 + 2) // Yields 6 Task.mapBoth(task1, task2)((a, b) => a + b)
ADVICE: In a real life scenario the tasks should be expensive in order to warrant parallel execution. Parallelism doesn't magically speed up the code - it's usually fine for I/O-bound tasks, however for CPU-bound tasks it can make things worse. Performance improvements need to be verified.
NOTE: the tasks get forked automatically so there's no need to force asynchronous execution for immediate tasks, parallelism being guaranteed when multi-threading is available!
All specified tasks get evaluated in parallel, regardless of their execution model (Task.eval vs Task.evalAsync doesn't matter). Also the implementation tries to be smart about detecting forked tasks so it can eliminate extraneous forks for the very obvious cases.
- final def ne(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- def never[A]: Task[A]
A Task instance that upon evaluation will never complete.
- final def notify(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native()
- final def notifyAll(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native()
- def now[A](a: A): Task[A]
Returns a
Task
that on execution is always successful, emitting the given strict value. - def parMap2[A1, A2, R](fa1: Task[A1], fa2: Task[A2])(f: (A1, A2) => R): Task[R]
Pairs 2
Task
values, applying the given mapping function, ordering the results, but not the side effects, the evaluation being done in parallel.Pairs 2
Task
values, applying the given mapping function, ordering the results, but not the side effects, the evaluation being done in parallel.This is a specialized Task.gather operation and as such the tasks are evaluated in parallel, ordering the results. In case one of the tasks fails, then all other tasks get cancelled and the final result will be a failure.
val fa1 = Task(1) val fa2 = Task(2) // Yields Success(3) Task.parMap2(fa1, fa2) { (a, b) => a + b } // Yields Failure(e), because the second arg is a failure Task.parMap2(fa1, Task.raiseError[Int](new RuntimeException("boo"))) { (a, b) => a + b }
ADVICE: In a real life scenario the tasks should be expensive in order to warrant parallel execution. Parallelism doesn't magically speed up the code - it's usually fine for I/O-bound tasks, however for CPU-bound tasks it can make things worse. Performance improvements need to be verified.
NOTE: the tasks get forked automatically so there's no need to force asynchronous execution for immediate tasks, parallelism being guaranteed when multi-threading is available!
All specified tasks get evaluated in parallel, regardless of their execution model (Task.eval vs Task.evalAsync doesn't matter). Also the implementation tries to be smart about detecting forked tasks so it can eliminate extraneous forks for the very obvious cases.
See Task.map2 for sequential processing.
- def parMap3[A1, A2, A3, R](fa1: Task[A1], fa2: Task[A2], fa3: Task[A3])(f: (A1, A2, A3) => R): Task[R]
Pairs 3
Task
values, applying the given mapping function, ordering the results, but not the side effects, the evaluation being done in parallel.Pairs 3
Task
values, applying the given mapping function, ordering the results, but not the side effects, the evaluation being done in parallel.This is a specialized Task.gather operation and as such the tasks are evaluated in parallel, ordering the results. In case one of the tasks fails, then all other tasks get cancelled and the final result will be a failure.
val fa1 = Task(1) val fa2 = Task(2) val fa3 = Task(3) // Yields Success(6) Task.parMap3(fa1, fa2, fa3) { (a, b, c) => a + b + c } // Yields Failure(e), because the second arg is a failure Task.parMap3(fa1, Task.raiseError[Int](new RuntimeException("boo")), fa3) { (a, b, c) => a + b + c }
ADVICE: In a real life scenario the tasks should be expensive in order to warrant parallel execution. Parallelism doesn't magically speed up the code - it's usually fine for I/O-bound tasks, however for CPU-bound tasks it can make things worse. Performance improvements need to be verified.
NOTE: the tasks get forked automatically so there's no need to force asynchronous execution for immediate tasks, parallelism being guaranteed when multi-threading is available!
All specified tasks get evaluated in parallel, regardless of their execution model (Task.eval vs Task.evalAsync doesn't matter). Also the implementation tries to be smart about detecting forked tasks so it can eliminate extraneous forks for the very obvious cases.
See Task.map3 for sequential processing.
- def parMap4[A1, A2, A3, A4, R](fa1: Task[A1], fa2: Task[A2], fa3: Task[A3], fa4: Task[A4])(f: (A1, A2, A3, A4) => R): Task[R]
Pairs 4
Task
values, applying the given mapping function, ordering the results, but not the side effects, the evaluation being done in parallel if the tasks are async.Pairs 4
Task
values, applying the given mapping function, ordering the results, but not the side effects, the evaluation being done in parallel if the tasks are async.This is a specialized Task.gather operation and as such the tasks are evaluated in parallel, ordering the results. In case one of the tasks fails, then all other tasks get cancelled and the final result will be a failure.
val fa1 = Task(1) val fa2 = Task(2) val fa3 = Task(3) val fa4 = Task(4) // Yields Success(10) Task.parMap4(fa1, fa2, fa3, fa4) { (a, b, c, d) => a + b + c + d } // Yields Failure(e), because the second arg is a failure Task.parMap4(fa1, Task.raiseError[Int](new RuntimeException("boo")), fa3, fa4) { (a, b, c, d) => a + b + c + d }
ADVICE: In a real life scenario the tasks should be expensive in order to warrant parallel execution. Parallelism doesn't magically speed up the code - it's usually fine for I/O-bound tasks, however for CPU-bound tasks it can make things worse. Performance improvements need to be verified.
NOTE: the tasks get forked automatically so there's no need to force asynchronous execution for immediate tasks, parallelism being guaranteed when multi-threading is available!
All specified tasks get evaluated in parallel, regardless of their execution model (Task.eval vs Task.evalAsync doesn't matter). Also the implementation tries to be smart about detecting forked tasks so it can eliminate extraneous forks for the very obvious cases.
See Task.map4 for sequential processing.
- def parMap5[A1, A2, A3, A4, A5, R](fa1: Task[A1], fa2: Task[A2], fa3: Task[A3], fa4: Task[A4], fa5: Task[A5])(f: (A1, A2, A3, A4, A5) => R): Task[R]
Pairs 5
Task
values, applying the given mapping function, ordering the results, but not the side effects, the evaluation being done in parallel if the tasks are async.Pairs 5
Task
values, applying the given mapping function, ordering the results, but not the side effects, the evaluation being done in parallel if the tasks are async.This is a specialized Task.gather operation and as such the tasks are evaluated in parallel, ordering the results. In case one of the tasks fails, then all other tasks get cancelled and the final result will be a failure.
val fa1 = Task(1) val fa2 = Task(2) val fa3 = Task(3) val fa4 = Task(4) val fa5 = Task(5) // Yields Success(15) Task.parMap5(fa1, fa2, fa3, fa4, fa5) { (a, b, c, d, e) => a + b + c + d + e } // Yields Failure(e), because the second arg is a failure Task.parMap5(fa1, Task.raiseError[Int](new RuntimeException("boo")), fa3, fa4, fa5) { (a, b, c, d, e) => a + b + c + d + e }
ADVICE: In a real life scenario the tasks should be expensive in order to warrant parallel execution. Parallelism doesn't magically speed up the code - it's usually fine for I/O-bound tasks, however for CPU-bound tasks it can make things worse. Performance improvements need to be verified.
NOTE: the tasks get forked automatically so there's no need to force asynchronous execution for immediate tasks, parallelism being guaranteed when multi-threading is available!
All specified tasks get evaluated in parallel, regardless of their execution model (Task.eval vs Task.evalAsync doesn't matter). Also the implementation tries to be smart about detecting forked tasks so it can eliminate extraneous forks for the very obvious cases.
See Task.map5 for sequential processing.
- def parMap6[A1, A2, A3, A4, A5, A6, R](fa1: Task[A1], fa2: Task[A2], fa3: Task[A3], fa4: Task[A4], fa5: Task[A5], fa6: Task[A6])(f: (A1, A2, A3, A4, A5, A6) => R): Task[R]
Pairs 6
Task
values, applying the given mapping function, ordering the results, but not the side effects, the evaluation being done in parallel if the tasks are async.Pairs 6
Task
values, applying the given mapping function, ordering the results, but not the side effects, the evaluation being done in parallel if the tasks are async.This is a specialized Task.gather operation and as such the tasks are evaluated in parallel, ordering the results. In case one of the tasks fails, then all other tasks get cancelled and the final result will be a failure.
val fa1 = Task(1) val fa2 = Task(2) val fa3 = Task(3) val fa4 = Task(4) val fa5 = Task(5) val fa6 = Task(6) // Yields Success(21) Task.parMap6(fa1, fa2, fa3, fa4, fa5, fa6) { (a, b, c, d, e, f) => a + b + c + d + e + f } // Yields Failure(e), because the second arg is a failure Task.parMap6(fa1, Task.raiseError[Int](new RuntimeException("boo")), fa3, fa4, fa5, fa6) { (a, b, c, d, e, f) => a + b + c + d + e + f }
ADVICE: In a real life scenario the tasks should be expensive in order to warrant parallel execution. Parallelism doesn't magically speed up the code - it's usually fine for I/O-bound tasks, however for CPU-bound tasks it can make things worse. Performance improvements need to be verified.
NOTE: the tasks get forked automatically so there's no need to force asynchronous execution for immediate tasks, parallelism being guaranteed when multi-threading is available!
All specified tasks get evaluated in parallel, regardless of their execution model (Task.eval vs Task.evalAsync doesn't matter). Also the implementation tries to be smart about detecting forked tasks so it can eliminate extraneous forks for the very obvious cases.
See Task.map6 for sequential processing.
- def parZip2[A1, A2, R](fa1: Task[A1], fa2: Task[A2]): Task[(A1, A2)]
- def parZip3[A1, A2, A3](fa1: Task[A1], fa2: Task[A2], fa3: Task[A3]): Task[(A1, A2, A3)]
- def parZip4[A1, A2, A3, A4](fa1: Task[A1], fa2: Task[A2], fa3: Task[A3], fa4: Task[A4]): Task[(A1, A2, A3, A4)]
- def parZip5[A1, A2, A3, A4, A5](fa1: Task[A1], fa2: Task[A2], fa3: Task[A3], fa4: Task[A4], fa5: Task[A5]): Task[(A1, A2, A3, A4, A5)]
- def parZip6[A1, A2, A3, A4, A5, A6](fa1: Task[A1], fa2: Task[A2], fa3: Task[A3], fa4: Task[A4], fa5: Task[A5], fa6: Task[A6]): Task[(A1, A2, A3, A4, A5, A6)]
- def pure[A](a: A): Task[A]
Lifts a value into the task context.
Lifts a value into the task context. Alias for now.
- def race[A, B](fa: Task[A], fb: Task[B]): Task[Either[A, B]]
Run two
Task
actions concurrently, and return the first to finish, either in success or error.Run two
Task
actions concurrently, and return the first to finish, either in success or error. The loser of the race is cancelled.The two tasks are executed in parallel, the winner being the first that signals a result.
As an example, this would be equivalent with Task.timeout:
import scala.concurrent.duration._ import scala.concurrent.TimeoutException // some long running task val myTask = Task(42) val timeoutError = Task .raiseError(new TimeoutException) .delayExecution(5.seconds) Task.race(myTask, timeoutError)
Similarly Task.timeoutTo is expressed in terms of
race
.NOTE: the tasks get forked automatically so there's no need to force asynchronous execution for immediate tasks, parallelism being guaranteed when multi-threading is available!
All specified tasks get evaluated in parallel, regardless of their execution model (Task.eval vs Task.evalAsync doesn't matter). Also the implementation tries to be smart about detecting forked tasks so it can eliminate extraneous forks for the very obvious cases.
- def raceMany[A](tasks: Iterable[Task[A]]): Task[A]
Runs multiple
Task
actions concurrently, returning the first to finish, either in success or error.Runs multiple
Task
actions concurrently, returning the first to finish, either in success or error. All losers of the race get cancelled.The tasks get executed in parallel, the winner being the first that signals a result.
import scala.concurrent.duration._ val list: List[Task[Int]] = List(1, 2, 3).map(i => Task.sleep(i.seconds).map(_ => i)) val winner: Task[Int] = Task.raceMany(list)
NOTE: the tasks get forked automatically so there's no need to force asynchronous execution for immediate tasks, parallelism being guaranteed when multi-threading is available!
All specified tasks get evaluated in parallel, regardless of their execution model (Task.eval vs Task.evalAsync doesn't matter). Also the implementation tries to be smart about detecting forked tasks so it can eliminate extraneous forks for the very obvious cases.
- def racePair[A, B](fa: Task[A], fb: Task[B]): Task[Either[(A, Fiber[B]), (Fiber[A], B)]]
Run two
Task
actions concurrently, and returns a pair containing both the winner's successful value and the loser represented as a still-unfinished task.Run two
Task
actions concurrently, and returns a pair containing both the winner's successful value and the loser represented as a still-unfinished task.If the first task completes in error, then the result will complete in error, the other task being cancelled.
On usage the user has the option of cancelling the losing task, this being equivalent with plain race:
import scala.concurrent.duration._ val ta = Task.sleep(2.seconds).map(_ => "a") val tb = Task.sleep(3.seconds).map(_ => "b") // `tb` is going to be cancelled as it returns 1 second after `ta` Task.racePair(ta, tb).flatMap { case Left((a, taskB)) => taskB.cancel.map(_ => a) case Right((taskA, b)) => taskA.cancel.map(_ => b) }
NOTE: the tasks get forked automatically so there's no need to force asynchronous execution for immediate tasks, parallelism being guaranteed when multi-threading is available!
All specified tasks get evaluated in parallel, regardless of their execution model (Task.eval vs Task.evalAsync doesn't matter). Also the implementation tries to be smart about detecting forked tasks so it can eliminate extraneous forks for the very obvious cases.
- def raiseError[A](ex: Throwable): Task[A]
Returns a task that on execution is always finishing in error emitting the specified exception.
- val readOptions: Task[Options]
Returns the current Task.Options configuration, which determine the task's run-loop behavior.
Returns the current Task.Options configuration, which determine the task's run-loop behavior.
- See also
- def sequence[A, M[X] <: Iterable[X]](in: M[Task[A]])(implicit bf: BuildFrom[M[Task[A]], A, M[A]]): Task[M[A]]
Given a
Iterable
of tasks, transforms it to a task signaling the collection, executing the tasks one by one and gathering their results in the same collection.Given a
Iterable
of tasks, transforms it to a task signaling the collection, executing the tasks one by one and gathering their results in the same collection.This operation will execute the tasks one by one, in order, which means that both effects and results will be ordered. See gather and gatherUnordered for unordered results or effects, and thus potential of running in parallel.
It's a simple version of traverse.
- def shift(ec: ExecutionContext): Task[Unit]
Asynchronous boundary described as an effectful
Task
that can be used inflatMap
chains to "shift" the continuation of the run-loop to another call stack or thread, managed by the given execution context.Asynchronous boundary described as an effectful
Task
that can be used inflatMap
chains to "shift" the continuation of the run-loop to another call stack or thread, managed by the given execution context.This is the equivalent of
IO.shift
.For example we can introduce an asynchronous boundary in the
flatMap
chain before a certain task, this being literally the implementation of executeAsync:val task = Task.eval(35) Task.shift.flatMap(_ => task)
And this can also be described with
*>
from Cats:import cats.syntax.all._ Task.shift *> task
Or we can specify an asynchronous boundary after the evaluation of a certain task, this being literally the implementation of .asyncBoundary:
task.flatMap(a => Task.shift.map(_ => a))
And again we can also describe this with
<*
from Cats:task <* Task.shift
- val shift: Task[Unit]
Asynchronous boundary described as an effectful
Task
that can be used inflatMap
chains to "shift" the continuation of the run-loop to another thread or call stack, managed by the default Scheduler.Asynchronous boundary described as an effectful
Task
that can be used inflatMap
chains to "shift" the continuation of the run-loop to another thread or call stack, managed by the default Scheduler.This is the equivalent of
IO.shift
, except that Monix'sTask
gets executed with an injectedScheduler
in Task.runAsync or in Task.runToFuture and that's going to be theScheduler
responsible for the "shift".For example we can introduce an asynchronous boundary in the
flatMap
chain before a certain task, this being literally the implementation of executeAsync:val task = Task.eval(35) Task.shift.flatMap(_ => task)
And this can also be described with
*>
from Cats:import cats.syntax.all._ Task.shift *> task
Or we can specify an asynchronous boundary after the evaluation of a certain task, this being literally the implementation of .asyncBoundary:
task.flatMap(a => Task.shift.map(_ => a))
And again we can also describe this with
<*
from Cats:task <* Task.shift
- def sleep(timespan: FiniteDuration): Task[Unit]
Creates a new
Task
that will sleep for the given duration, emitting a tick when that time span is over.Creates a new
Task
that will sleep for the given duration, emitting a tick when that time span is over.As an example on evaluation this will print "Hello!" after 3 seconds:
import scala.concurrent.duration._ Task.sleep(3.seconds).flatMap { _ => Task.eval(println("Hello!")) }
See Task.delayExecution for this operation described as a method on
Task
references or Task.delayResult for the helper that triggers the evaluation of the source on time, but then delays the result. - def suspend[A](fa: => Task[A]): Task[A]
Alias for defer.
- final def synchronized[T0](arg0: => T0): T0
- Definition Classes
- AnyRef
- def tailRecM[A, B](a: A)(f: (A) => Task[Either[A, B]]): Task[B]
Keeps calling
f
until it returns aRight
result.Keeps calling
f
until it returns aRight
result.Based on Phil Freeman's Stack Safety for Free.
- def timer(s: Scheduler): Timer[Task]
Builds a
cats.effect.Timer
instance, given a Scheduler reference.Builds a
cats.effect.Timer
instance, given a Scheduler reference.- Definition Classes
- TaskTimers
- implicit val timer: Timer[Task]
Default, pure, globally visible
cats.effect.Timer
implementation that defers the evaluation toTask
's default Scheduler (that's being injected in Task.runToFuture).Default, pure, globally visible
cats.effect.Timer
implementation that defers the evaluation toTask
's default Scheduler (that's being injected in Task.runToFuture).- Definition Classes
- TaskTimers
- def toString(): String
- Definition Classes
- AnyRef → Any
- def traverse[A, B, M[X] <: Iterable[X]](in: M[A])(f: (A) => Task[B])(implicit bf: BuildFrom[M[A], B, M[B]]): Task[M[B]]
Given a
Iterable[A]
and a functionA => Task[B]
, sequentially apply the function to each element of the collection and gather their results in the same collection.Given a
Iterable[A]
and a functionA => Task[B]
, sequentially apply the function to each element of the collection and gather their results in the same collection.It's a generalized version of sequence.
- val unit: Task[Unit]
A
Task[Unit]
provided for convenience. - final def wait(): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- final def wait(arg0: Long, arg1: Int): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- final def wait(arg0: Long): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException]) @native()
- def wander[A, B, M[X] <: Iterable[X]](in: M[A])(f: (A) => Task[B])(implicit bf: BuildFrom[M[A], B, M[B]]): Task[M[B]]
Given a
Iterable[A]
and a functionA => Task[B]
, nondeterministically apply the function to each element of the collection and return a task that will signal a collection of the results once all tasks are finished.Given a
Iterable[A]
and a functionA => Task[B]
, nondeterministically apply the function to each element of the collection and return a task that will signal a collection of the results once all tasks are finished.This function is the nondeterministic analogue of
traverse
and should behave identically totraverse
so long as there is no interaction between the effects being gathered. However, unliketraverse
, which decides on a total order of effects, the effects in awander
are unordered with respect to each other.Although the effects are unordered, we ensure the order of results matches the order of the input sequence. Also see wanderUnordered for the more efficient alternative.
It's a generalized version of gather.
ADVICE: In a real life scenario the tasks should be expensive in order to warrant parallel execution. Parallelism doesn't magically speed up the code - it's usually fine for I/O-bound tasks, however for CPU-bound tasks it can make things worse. Performance improvements need to be verified.
NOTE: the tasks get forked automatically so there's no need to force asynchronous execution for immediate tasks, parallelism being guaranteed when multi-threading is available!
All specified tasks get evaluated in parallel, regardless of their execution model (Task.eval vs Task.evalAsync doesn't matter). Also the implementation tries to be smart about detecting forked tasks so it can eliminate extraneous forks for the very obvious cases.
- See also
wanderN for a version that limits parallelism.
- def wanderN[A, B](parallelism: Int)(in: Iterable[A])(f: (A) => Task[B]): Task[List[B]]
Given a
Iterable[A]
and a functionA => Task[B]
, nondeterministically apply the function to each element of the collection and return a task that will signal a collection of the results once all tasks are finished.Given a
Iterable[A]
and a functionA => Task[B]
, nondeterministically apply the function to each element of the collection and return a task that will signal a collection of the results once all tasks are finished.Implementation ensure there are at most
n
(=parallelism
parameter) tasks running concurrently and the results are returned in order.Example:
import scala.concurrent.duration._ val numbers = List(1, 2, 3, 4) // Yields 2, 4, 6, 8 after around 6 seconds Task.wanderN(2)(numbers)(n => Task(n + n).delayExecution(n.second))
ADVICE: In a real life scenario the tasks should be expensive in order to warrant parallel execution. Parallelism doesn't magically speed up the code - it's usually fine for I/O-bound tasks, however for CPU-bound tasks it can make things worse. Performance improvements need to be verified.
NOTE: the tasks get forked automatically so there's no need to force asynchronous execution for immediate tasks, parallelism being guaranteed when multi-threading is available!
All specified tasks get evaluated in parallel, regardless of their execution model (Task.eval vs Task.evalAsync doesn't matter). Also the implementation tries to be smart about detecting forked tasks so it can eliminate extraneous forks for the very obvious cases.
- See also
wander for a version that does not limit parallelism.
- def wanderUnordered[A, B, M[X] <: Iterable[X]](in: M[A])(f: (A) => Task[B]): Task[List[B]]
Given a
Iterable[A]
and a functionA => Task[B]
, nondeterministically apply the function to each element of the collection without keeping the original ordering of the results.Given a
Iterable[A]
and a functionA => Task[B]
, nondeterministically apply the function to each element of the collection without keeping the original ordering of the results.This function is similar to wander, but neither the effects nor the results will be ordered. Useful when you don't need ordering because:
- it has non-blocking behavior (but not wait-free)
- it can be more efficient (compared with wander), but not necessarily (if you care about performance, then test)
It's a generalized version of gatherUnordered.
ADVICE: In a real life scenario the tasks should be expensive in order to warrant parallel execution. Parallelism doesn't magically speed up the code - it's usually fine for I/O-bound tasks, however for CPU-bound tasks it can make things worse. Performance improvements need to be verified.
NOTE: the tasks get forked automatically so there's no need to force asynchronous execution for immediate tasks, parallelism being guaranteed when multi-threading is available!
All specified tasks get evaluated in parallel, regardless of their execution model (Task.eval vs Task.evalAsync doesn't matter). Also the implementation tries to be smart about detecting forked tasks so it can eliminate extraneous forks for the very obvious cases.
- object AsyncBuilder extends AsyncBuilder0
- object Par extends Newtype1[Task]
Newtype encoding, see the Task.Par type alias for more details.
Newtype encoding, see the Task.Par type alias for more details.
- Definition Classes
- TaskParallelNewtype
Deprecated Value Members
- def fork[A](fa: Task[A], s: Scheduler): Task[A]
DEPRECATED — please use .executeOn.
DEPRECATED — please use .executeOn.
The reason for the deprecation is the repurposing of the word "fork".
- Definition Classes
- Companion
- Annotations
- @deprecated
- Deprecated
(Since version 3.0.0) Please use Task!.executeOn
- def fork[A](fa: Task[A]): Task[A]
DEPRECATED — please use .executeAsync.
DEPRECATED — please use .executeAsync.
The reason for the deprecation is the repurposing of the word "fork".
- Definition Classes
- Companion
- Annotations
- @deprecated
- Deprecated
(Since version 3.0.0) Please use Task!.executeAsync
- def fromEval[A](a: cats.Eval[A]): Task[A]
DEPRECATED — please use Task.from.
DEPRECATED — please use Task.from.
- Definition Classes
- Companion
- Annotations
- @deprecated
- Deprecated
(Since version 3.0.0) Please use Task.from
- def fromIO[A](ioa: IO[A]): Task[A]
DEPRECATED — please use Task.from.
DEPRECATED — please use Task.from.
- Definition Classes
- Companion
- Annotations
- @deprecated
- Deprecated
(Since version 3.0.0) Please use Task.from
- def zip2[A1, A2, R](fa1: Task[A1], fa2: Task[A2]): Task[(A1, A2)]
DEPRECATED — renamed to Task.parZip2.
DEPRECATED — renamed to Task.parZip2.
- Definition Classes
- Companion
- Annotations
- @deprecated
- Deprecated
(Since version 3.0.0-RC2) Renamed to Task.parZip2
- def zip3[A1, A2, A3](fa1: Task[A1], fa2: Task[A2], fa3: Task[A3]): Task[(A1, A2, A3)]
DEPRECATED — renamed to Task.parZip3.
DEPRECATED — renamed to Task.parZip3.
- Definition Classes
- Companion
- Annotations
- @deprecated
- Deprecated
(Since version 3.0.0-RC2) Renamed to Task.parZip3
- def zip4[A1, A2, A3, A4](fa1: Task[A1], fa2: Task[A2], fa3: Task[A3], fa4: Task[A4]): Task[(A1, A2, A3, A4)]
DEPRECATED — renamed to Task.parZip4.
DEPRECATED — renamed to Task.parZip4.
- Definition Classes
- Companion
- Annotations
- @deprecated
- Deprecated
(Since version 3.0.0-RC2) Renamed to Task.parZip4
- def zip5[A1, A2, A3, A4, A5](fa1: Task[A1], fa2: Task[A2], fa3: Task[A3], fa4: Task[A4], fa5: Task[A5]): Task[(A1, A2, A3, A4, A5)]
DEPRECATED — renamed to Task.parZip5.
DEPRECATED — renamed to Task.parZip5.
- Definition Classes
- Companion
- Annotations
- @deprecated
- Deprecated
(Since version 3.0.0-RC2) Renamed to Task.parZip5
- def zip6[A1, A2, A3, A4, A5, A6](fa1: Task[A1], fa2: Task[A2], fa3: Task[A3], fa4: Task[A4], fa5: Task[A5], fa6: Task[A6]): Task[(A1, A2, A3, A4, A5, A6)]
DEPRECATED — renamed to Task.parZip6.
DEPRECATED — renamed to Task.parZip6.
- Definition Classes
- Companion
- Annotations
- @deprecated
- Deprecated
(Since version 3.0.0-RC2) Renamed to Task.parZip6
This is the API documentation for the Monix library.
Package Overview
monix.execution exposes lower level primitives for dealing with asynchronous execution:
Atomic
types, as alternative tojava.util.concurrent.atomic
monix.catnap exposes pure abstractions built on top of the Cats-Effect type classes:
monix.eval is for dealing with evaluation of results, thus exposing Task and Coeval.
monix.reactive exposes the
Observable
pattern:Observable
implementationsmonix.tail exposes Iterant for purely functional pull based streaming:
Batch
andBatchCursor
, the alternatives to Scala'sIterable
andIterator
respectively that we are using within Iterant's encodingYou can control evaluation with type you choose - be it Task, Coeval, cats.effect.IO or your own as long as you provide correct cats-effect or cats typeclass instance.