Represents the acknowledgement of processing that a consumer
sends back upstream on Observer.onNext.
A channel is meant for imperative style feeding of events.
Used by Observable.materialize.
The Observable interface in the Rx pattern.
An Observable is characterized by an onSubscribe method that needs
to be implemented. In simple terms, an Observable might as well be
just a function like:
type Observable[+T] = Subscriber[T] => Unit
In other words, an Observable is something that provides a
side-effecting function that can connect a Subscriber to a
stream of data. A Subscriber is a cross between an Observer
and a Scheduler. We need a Scheduler when calling subscribe
because that's when the side effects happen and a context capable
of scheduling tasks for asynchronous execution is needed. An Observer,
on the other hand, is the interface implemented by the consumer,
which receives events according to the Rx grammar.
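As a rough illustration of the function view described above, the following self-contained sketch models an Observable as a plain function from a Subscriber to Unit. The Observer, Subscriber, Recorder, and single definitions are simplified, hypothetical stand-ins and not Monifu's actual types; back-pressure (the acknowledgement returned by onNext) is deliberately left out.

```scala
import scala.concurrent.ExecutionContext
import scala.collection.mutable.ListBuffer

object ObservableAsFunction {
  // Hypothetical, simplified consumer interface (no acknowledgements).
  trait Observer[T] {
    def onNext(elem: T): Unit
    def onError(ex: Throwable): Unit
    def onComplete(): Unit
  }

  // A Subscriber pairs an Observer with a scheduling context.
  final case class Subscriber[T](observer: Observer[T], scheduler: ExecutionContext)

  // The "Observable as a function" view from the text.
  type Observable[T] = Subscriber[T] => Unit

  // An observable that emits a single value and then completes.
  def single[T](value: T): Observable[T] = sub => {
    sub.observer.onNext(value)
    sub.observer.onComplete()
  }

  // Records every event it receives, so the side effects are visible.
  final class Recorder[T] extends Observer[T] {
    val events = ListBuffer.empty[String]
    def onNext(elem: T): Unit = events += s"onNext($elem)"
    def onError(ex: Throwable): Unit = events += s"onError(${ex.getMessage})"
    def onComplete(): Unit = events += "onComplete"
  }
}
```

Nothing is emitted until the function is applied to a Subscriber, for example `single(42)(Subscriber(new Recorder[Int], ExecutionContext.global))`; that application plays the role of subscribing.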
As for onSubscribe: because we need the interesting operators and
the polymorphic behavior provided by OOP, the Observable is
described as an interface that has to be implemented:
class MySampleObservable(unit: Int) extends Observable[Int] {
  def onSubscribe(sub: Subscriber[Int]): Unit = {
    implicit val s = sub.scheduler
    // note we must apply back-pressure
    // when calling `onNext`
    sub.onNext(unit).onComplete {
      case Success(Cancel) =>
        () // do nothing
      case Success(Continue) =>
        sub.onComplete()
      case Failure(ex) =>
        sub.onError(ex)
    }
  }
}
Of course, you don't need to inherit from this trait, since you can just use Observable.create. The following example is equivalent to the one above:
Observable.create[Int] { sub =>
  implicit val s = sub.scheduler
  // note we must apply back-pressure
  // when calling `onNext`
  sub.onNext(unit).onComplete {
    case Success(Cancel) =>
      () // do nothing
    case Success(Continue) =>
      sub.onComplete()
    case Failure(ex) =>
      sub.onError(ex)
  }
}
The above describes how to create your own Observables; however, Monifu already provides ready-made utilities in the Observable companion object. For example, to periodically make a request to a web service, you could do it like this:
// just some http client
import play.api.libs.ws._
// needed for the `1.second` syntax
import scala.concurrent.duration._
// triggers an auto-incremented number every second
Observable.intervalAtFixedRate(1.second)
  .flatMap(_ => WS.request(s"http://some.endpoint.com/request").get())
As you might notice, in the above example we are calling
Observable.flatMap with a function that returns Future
instances. This works because Monifu considers Scala's Futures
to be just a subset of Observables; see the automatic
FutureIsObservable conversion that it defines. Alternatively, you
could use Observable.fromFuture for explicit conversions, one of
the Observable builders available.
Observables must obey Monifu's contract. This is why, if you can get away with already built and tested observables, that is better than implementing your own by inheriting the interface or by using create. The contract is this:
- The onSubscribe method MUST NOT throw exceptions; any unforeseen errors that happen in user code must be emitted to the observers and the streaming closed.
- Events must follow the grammar onNext* (onComplete | onError):
  - a data source can emit zero or more onNext events
  - when a stream is closed, it is closed with either onComplete or onError (not both)
- A new onNext event must happen only after the previous onNext completed with a Continue.
- Streaming must stop once an onNext event is signaling a Cancel.
- onComplete and onError must happen only after the previous onNext was completed with a Continue acknowledgement.
- The first onNext event can be sent directly, since there are no previous events.
- If there are no previous onNext events, then streams can be closed with onComplete and onError directly.
- [...] onError happens such that its delivery is prioritized.
Back-pressure means in essence that the speed with which the data source produces events is adjusted to the speed with which the consumer consumes them.
For example, let's say we want to feed an iterator into an observer, similar to what we are doing in Observer.feed. We might build a loop like this:
/** Transforms any Iterable into an Observable */
def fromIterator[T](iterable: Iterable[T]): Observable[T] =
  Observable.create { sub =>
    implicit val s = sub.scheduler
    loop(sub.observer, iterable.iterator).onComplete {
      case Success(Cancel) =>
        () // do nothing
      case Success(Continue) =>
        sub.onComplete()
      case Failure(ex) =>
        reportError(sub.observer, ex)
    }
  }
private def loop[T](o: Observer[T], iterator: Iterator[T])
  (implicit s: Scheduler): Future[Ack] = {
  try {
    if (iterator.hasNext) {
      val next = iterator.next()
      // signaling event, applying back-pressure
      o.onNext(next).flatMap {
        case Cancel => Cancel
        case Continue =>
          // signal next event (recursive, but async)
          loop(o, iterator)
      }
    }
    else {
      // nothing left to do, and because we are implementing
      // Observer.feed, the final acknowledgement is a `Continue`
      // assuming that the observer hasn't canceled or failed
      Continue
    }
  }
  catch {
    case NonFatal(ex) =>
      reportError(o, ex)
  }
}
// the scheduler is passed implicitly, since it is needed for
// reporting failures; the result is always `Cancel`
private def reportError[T](o: Observer[T], ex: Throwable)
  (implicit s: Scheduler): Cancel = {
  try o.onError(ex) catch {
    case NonFatal(err) =>
      // oops, onError failed, trying to
      // report it somewhere
      s.reportFailure(ex)
      s.reportFailure(err)
  }
  Cancel
}
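The loop above relies on Monifu's own types. For readers who want to experiment without the library, here is a minimal, self-contained sketch of the same back-pressure idea using only the Scala standard library. The Ack, Observer, and feed definitions are simplified assumptions made for this illustration and do not reproduce Monifu's API; completion and error signaling are omitted.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object BackPressureSketch {
  // Hypothetical acknowledgement type, modeled on Continue / Cancel.
  sealed trait Ack
  case object Continue extends Ack
  case object Cancel extends Ack

  // Simplified observer: onNext returns a future acknowledgement.
  trait Observer[T] {
    def onNext(elem: T): Future[Ack]
  }

  // Emits the next element only after the previous one has been
  // acknowledged with Continue; stops as soon as Cancel is received.
  def feed[T](o: Observer[T], it: Iterator[T])
    (implicit ec: ExecutionContext): Future[Ack] =
    if (!it.hasNext) Future.successful[Ack](Continue)
    else o.onNext(it.next()).flatMap {
      case Continue => feed(o, it) // recursive, but asynchronous
      case Cancel => Future.successful[Ack](Cancel)
    }
}
```

Because each step is scheduled from inside flatMap, the recursion does not grow the call stack, and a slow consumer naturally slows the producer down: the iterator is not advanced until the previous acknowledgement arrives.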
There are cases in which the data-source can't be slowed down in response to the demand signaled through back-pressure. For such cases buffering is needed.
For example, to "imperatively" build an Observable, we could use channels:
val channel = PublishChannel[Int](OverflowStrategy.DropNew(bufferSize = 100))
// look mum, no back-pressure concerns
channel.pushNext(1)
channel.pushNext(2)
channel.pushNext(3)
channel.pushComplete()
In Monifu, a Channel is much like a Subject, meaning that it can be
used to construct observables, except that a Channel has a buffer
attached and IS NOT an Observer (like the Subject is). In Monifu
(compared to Rx implementations), Subjects are subject to
back-pressure concerns as well, so they can't be used in an imperative way,
as described above, hence the need for Channels.
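To make the role of the attached buffer more concrete, here is a minimal sketch of a bounded buffer that drops incoming elements once it is full, in the spirit of the DropNew strategy used in the channel example. DropNewBuffer is a hypothetical helper written for this illustration; it is not part of Monifu and says nothing about how Monifu implements its buffers.

```scala
import scala.collection.mutable

// Hypothetical helper: a bounded FIFO buffer that silently drops
// incoming elements once it is full ("drop new").
// Not thread-safe; a real implementation would need synchronization.
final class DropNewBuffer[T](capacity: Int) {
  require(capacity > 0, "capacity must be positive")
  private[this] val queue = mutable.Queue.empty[T]

  /** Returns true if the element was buffered, false if it was dropped. */
  def offer(elem: T): Boolean =
    if (queue.size >= capacity) false
    else { queue.enqueue(elem); true }

  /** Removes and returns the oldest buffered element, if any. */
  def poll(): Option[T] =
    if (queue.isEmpty) None else Some(queue.dequeue())

  def size: Int = queue.size
}
```

The producer calls offer without waiting for anyone, which is what makes the imperative style possible; the price is that elements pushed while the buffer is full are lost.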
Or for more serious and lower level jobs, you can simply take an
Observer
and wrap it into a
BufferedSubscriber.
See also:
- Channel, which is meant for imperatively building Observables without back-pressure concerns
- Subject, which is both an Observable and an Observer
- Cancelable, the type returned by higher-level subscribe variants, which can be used to cancel subscriptions
- Subscriber, the cross between an Observer and a Scheduler
- Scheduler, our enhanced ExecutionContext
- Observer, the interface that must be implemented by consumers
The Observer from the Rx pattern is the trio of callbacks that get subscribed to an Observable for receiving events.
The events received must follow the Rx grammar, which is: onNext * (onComplete | onError)?
That means an Observer can receive zero or more onNext events, with the
stream ending in at most one onComplete or onError (just one, not both).
After onComplete or onError, a well-behaved Observable implementation
shouldn't send any more onNext events.
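The grammar can be expressed as a small check over a recorded sequence of events. The sketch below is only an illustration: the Event hierarchy and the isValid function are hypothetical names introduced here, not part of the library.

```scala
object RxGrammar {
  // Hypothetical event model used only for this check.
  sealed trait Event
  final case class OnNext(value: Any) extends Event
  case object OnComplete extends Event
  final case class OnError(ex: Throwable) extends Event

  // True when the sequence matches: onNext* (onComplete | onError)?
  def isValid(events: Seq[Event]): Boolean = {
    // Everything before the last element must be an onNext; the last
    // element (if any) may be any of the three event kinds.
    events.dropRight(1).forall(_.isInstanceOf[OnNext])
  }
}
```

For instance, a sequence of two onNext events followed by onComplete is valid, while any sequence with an onNext after onComplete is not.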
Represents the overflow strategy chosen for actions that need buffering, instructing the pipeline what to do when the buffer is full.
For the available policies, see:
- Unbounded
- OverflowTriggering
- BackPressured
Used in BufferedSubscriber to implement buffering when concurrent actions are needed, such as in Channels or in Observable.merge.
A Subject
is a sort of bridge or proxy that acts both as an
Observer and as an Observable and that must respect the contract of both.
Because it is an Observer, it can subscribe to an Observable, and
because it is an Observable, it can pass through the items it observes
by re-emitting them, and it can also emit new items.
Useful to build multicast Observables or reusable processing pipelines.
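The multicast use case can be sketched with a deliberately simplified subject. Everything below is an assumption made for illustration: the Observer trait is synchronous and has no acknowledgements, and SimpleSubject is a hypothetical class, so unlike real Monifu Subjects it ignores back-pressure entirely.

```scala
import scala.collection.mutable.ListBuffer

object SubjectSketch {
  // Simplified, synchronous stand-in for the Observer interface.
  trait Observer[T] {
    def onNext(elem: T): Unit
    def onComplete(): Unit
  }

  // Hypothetical subject: it is an Observer (it can be fed events) and
  // it can be subscribed to, forwarding each event to every subscriber.
  final class SimpleSubject[T] extends Observer[T] {
    private[this] val subscribers = ListBuffer.empty[Observer[T]]
    private[this] var isDone = false

    def subscribe(o: Observer[T]): Unit =
      if (isDone) o.onComplete() else subscribers += o

    def onNext(elem: T): Unit =
      if (!isDone) subscribers.foreach(_.onNext(elem))

    def onComplete(): Unit =
      if (!isDone) {
        isDone = true
        subscribers.foreach(_.onComplete())
        subscribers.clear()
      }
  }
}
```

A subscriber that joins late only sees the events emitted after it subscribed, and events pushed after onComplete are ignored.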
A Subscriber
value is a named tuple of an observer and a scheduler,
whose usage is in Observable.create.
Observable.subscribe takes both an Observer and a Scheduler as
parameters, and the purpose of a Subscriber is to group the two
conveniently in Observable.create.
A Subscriber
value is basically an address that the data source needs
in order to send events.
A channel is meant for imperative style feeding of events.
When emitting events, one doesn't need to follow the back-pressure contract. On the other hand the grammar must still be respected:
(pushNext)* (pushComplete | pushError)
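One way to respect this grammar is to ignore anything pushed after the first terminal event. The following sketch shows that guard in isolation; GuardedChannel is a hypothetical class that merely logs what it accepts, and it is not how Monifu's channels are implemented.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical sketch: enforces (pushNext)* (pushComplete | pushError)
// by ignoring anything pushed after the first terminal event.
final class GuardedChannel[T] {
  private[this] var isDone = false
  private[this] val log = ListBuffer.empty[String]

  def pushNext(elem: T): Unit = synchronized {
    if (!isDone) log += s"next($elem)"
  }

  def pushComplete(): Unit = synchronized {
    if (!isDone) { isDone = true; log += "complete" }
  }

  def pushError(ex: Throwable): Unit = synchronized {
    if (!isDone) { isDone = true; log += s"error(${ex.getMessage})" }
  }

  // Snapshot of the events that were accepted, in order.
  def events: List[String] = synchronized(log.toList)
}
```

With this guard in place, a caller that pushes a value after pushComplete does not corrupt the stream; the late event is simply discarded.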