monifu.reactive.streams
Wraps a SynchronousObserver instance into an org.reactivestreams.Subscriber instance. The resulting subscriber respects the Reactive Streams contract.
Because the wrapped observer is guaranteed to be a SynchronousObserver, whose onNext returns its acknowledgement immediately, no buffering is needed, which keeps the implementation efficient.
To create an instance, SynchronousSubscriberAsReactiveSubscriber must be used:
// uses the default requestCount of 128
val subscriber = SynchronousSubscriberAsReactiveSubscriber(new Observer[Int] {
  private[this] var sum = 0

  def onNext(elem: Int) = {
    sum += elem
    Continue
  }

  def onError(ex: Throwable) = {
    logger.error(ex)
  }

  def onComplete() = {
    logger.info("Stream completed")
  }
})
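To illustrate why a synchronous observer removes the need for buffering, here is a simplified sketch of the idea rather than Monifu's actual implementation. The names Subscription, Ack, SyncObserver and SyncSubscriberSketch are hypothetical stand-ins defined locally so that the code compiles without dependencies. It assumes the wrapper requests requestCount elements at a time and asks for the next batch only once the current one has been consumed. It leaves out parts of the Reactive Streams contract such as null checks and thread-safety.

```scala
// Hypothetical stand-ins for the Reactive Streams and Monifu types,
// defined locally so the sketch compiles without any dependency.
trait Subscription {
  def request(n: Long): Unit
  def cancel(): Unit
}

sealed trait Ack
case object Continue extends Ack
case object Cancel extends Ack

// An observer whose onNext returns its acknowledgement synchronously.
trait SyncObserver[-T] {
  def onNext(elem: T): Ack
  def onError(ex: Throwable): Unit
  def onComplete(): Unit
}

// Requests `requestCount` elements at a time. Because every onNext call
// returns its Ack before the next element can arrive, nothing is queued.
final class SyncSubscriberSketch[T](observer: SyncObserver[T], requestCount: Int) {
  require(requestCount > 0, "requestCount must be strictly positive")

  private var subscription: Subscription = null
  private var expecting = 0   // elements still owed from the current batch
  private var isDone = false

  def onSubscribe(s: Subscription): Unit =
    if (subscription == null && !isDone) {
      subscription = s
      expecting = requestCount
      s.request(requestCount.toLong)
    } else {
      s.cancel() // only one active subscription is accepted
    }

  def onNext(elem: T): Unit =
    if (!isDone) {
      if (expecting > 0) expecting -= 1
      observer.onNext(elem) match {
        case Continue =>
          // Batch consumed: signal demand for the next one.
          if (expecting == 0) {
            expecting = requestCount
            subscription.request(requestCount.toLong)
          }
        case Cancel =>
          isDone = true
          subscription.cancel()
      }
    }

  def onError(ex: Throwable): Unit =
    if (!isDone) { isDone = true; observer.onError(ex) }

  def onComplete(): Unit =
    if (!isDone) { isDone = true; observer.onComplete() }
}
```

In this sketch the publisher is never allowed to send more than requestCount elements ahead of the observer, and each element is handed over as soon as it arrives, so the wrapper holds only a counter and no queue.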