Wraps an Observer instance into an
org.reactivestreams.Subscriber
instance. The resulting subscriber respects the
Reactive Streams contract.
Because Observer.onNext calls may cross asynchronous boundaries,
the emitted events need to be buffered.
The requestCount constructor parameter therefore
also determines the buffer size.
To create an instance, SubscriberAsReactiveSubscriber.apply must be used:
// uses the default requestCount of 128
val subscriber = SubscriberAsReactiveSubscriber(new Observer[Int] {
  private[this] var sum = 0

  def onNext(elem: Int) = {
    sum += elem
    Continue
  }

  def onError(ex: Throwable) = {
    logger.error(ex)
  }

  def onComplete() = {
    logger.info("Stream completed")
  }
})
The subscriber instance that will get wrapped into an
org.reactivestreams.Subscriber.
The value passed to each Subscription.request call (requestCount),
which is also the buffer size; it must be strictly positive.
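
The link between requestCount and the buffer size can be illustrated with a small, library-free sketch. It is not the actual implementation: the Subscription trait below is a hypothetical stand-in for org.reactivestreams.Subscription, and BatchingSubscriber and Demo are illustrative names. The sketch assumes that demand is requested in batches of requestCount and renewed only once a batch has been fully consumed, so no more than requestCount elements are ever outstanding.

```scala
import scala.collection.mutable

// Hypothetical stand-in for org.reactivestreams.Subscription,
// reduced to the single method this sketch needs.
trait Subscription {
  def request(n: Long): Unit
}

// Illustrative wrapper (an assumption, not the library's code): it requests
// `requestCount` elements up front and again after each full batch.
final class BatchingSubscriber[A](onElem: A => Unit, requestCount: Int) {
  require(requestCount > 0, "requestCount must be strictly positive")

  private var subscription: Subscription = null
  private var remaining = 0 // elements still expected from the current batch

  def onSubscribe(s: Subscription): Unit = {
    subscription = s
    remaining = requestCount
    s.request(requestCount.toLong) // initial demand equals the buffer size
  }

  def onNext(elem: A): Unit = {
    onElem(elem)
    remaining -= 1
    if (remaining == 0) {
      // The batch is consumed, so it is safe to ask for the next one.
      remaining = requestCount
      subscription.request(requestCount.toLong)
    }
  }
}

object Demo {
  // Emits 1..total synchronously while demand is available and returns
  // the recorded request sizes together with the sum of received elements.
  def run(total: Int, requestCount: Int): (Vector[Long], Int) = {
    val requests = mutable.ArrayBuffer.empty[Long]
    var demand = 0L
    var sum = 0

    val subscriber = new BatchingSubscriber[Int](elem => sum += elem, requestCount)
    subscriber.onSubscribe(new Subscription {
      def request(n: Long): Unit = {
        requests += n
        demand += n
      }
    })

    var i = 1
    while (i <= total && demand > 0) {
      demand -= 1
      subscriber.onNext(i)
      i += 1
    }
    (requests.toVector, sum)
  }
}
```

Under these assumptions, Demo.run(10, 4) records three requests of 4 elements each. Since at most 4 elements are in flight at any time, a buffer of size requestCount would be sufficient to hold them.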