Wraps a SynchronousObserver instance into a
org.reactivestreams.Subscriber
instance.
Wraps a SynchronousObserver instance into a
org.reactivestreams.Subscriber
instance. The resulting
subscriber respects the Reactive Streams
contract.
Given that we can guarantee a SynchronousObserver is used, then no buffering is needed and thus the implementation is very efficient.
To create an instance, SynchronousSubscriberAsReactiveSubscriber.apply must be used:
// uses the default requestCount of 128 val subscriber = SynchronousSubscriberAsReactiveSubscriber(new Observer[Int] { private[this] var sum = 0 def onNext(elem: Int) = { sum += elem Continue } def onError(ex: Throwable) = { logger.error(ex) } def onComplete() = { logger.info("Stream completed") } })
the observer instance that will get wrapped into a
org.reactivestreams.Subscriber
, along with the
used scheduler
the parameter passed to Subscription.request