Package

monifu.reactive

streams

Permalink

package streams

Visibility
  1. Public
  2. All

Type Members

  1. final class ReactiveSubscriberAsMonifuSubscriber[T] extends Subscriber[T]

    Permalink

    Wraps a org.reactivestreams.Subscriber instance that respects the Reactive Streams contract into an Observer instance that respect the Observer contract.

  2. final class SingleAssignmentSubscription extends Subscription

    Permalink

    Represents a org.reactivestreams.Subscription that can be assigned only once to another subscription reference.

    Represents a org.reactivestreams.Subscription that can be assigned only once to another subscription reference.

    If the assignment happens after this subscription has been canceled, then on assignment the reference will get canceled too. If the assignment after request(n) has been called on this subscription, then request(n) will get called immediately on the assigned reference as well.

    Useful in case you need a thread-safe forward reference.

  3. final class SubscriberAsReactiveSubscriber[T] extends org.reactivestreams.Subscriber[T]

    Permalink

    Wraps a Observer instance into an org.reactivestreams.Subscriber instance.

    Wraps a Observer instance into an org.reactivestreams.Subscriber instance. The resulting subscriber respects the Reactive Streams contract.

    Given that when emitting Observer.onNext calls, the call may pass asynchronous boundaries, the emitted events need to be buffered. The requestCount constructor parameter also represents the buffer size.

    To create an instance, SubscriberAsReactiveSubscriber must be used:

    // uses the default requestCount of 128
    val subscriber = SubscriberAsReactiveSubscriber(new Observer[Int] {
      private[this] var sum = 0
    
      def onNext(elem: Int) = {
        sum += elem
        Continue
      }
    
      def onError(ex: Throwable) = {
        logger.error(ex)
      }
    
      def onComplete() = {
        logger.info("Stream completed")
      }
    })
  4. final class SynchronousSubscriberAsReactiveSubscriber[T] extends org.reactivestreams.Subscriber[T]

    Permalink

    Wraps a SynchronousObserver instance into a org.reactivestreams.Subscriber instance.

    Wraps a SynchronousObserver instance into a org.reactivestreams.Subscriber instance. The resulting subscriber respects the Reactive Streams contract.

    Given that we can guarantee a SynchronousObserver is used, then no buffering is needed and thus the implementation is very efficient.

    To create an instance, SynchronousSubscriberAsReactiveSubscriber must be used:

    // uses the default requestCount of 128
    val subscriber = SynchronousSubscriberAsReactiveSubscriber(new Observer[Int] {
      private[this] var sum = 0
    
      def onNext(elem: Int) = {
        sum += elem
        Continue
      }
    
      def onError(ex: Throwable) = {
        logger.error(ex)
      }
    
      def onComplete() = {
        logger.info("Stream completed")
      }
    })

Ungrouped