Subscribe

operate upon the emissions and notifications from an Observable

The Subscribe operator is the glue that connects an observer to an Observable. In order for an observer to see the items being emitted by an Observable, or to receive error or completed notifications from the Observable, it must first subscribe to that Observable with this operator.

A typical implementation of the Subscribe operator may accept one to three methods (which then constitute the observer), or it may accept an object (sometimes called an Observer or Subscriber) that implements the interface which includes those three methods:

onNext
An Observable calls this method whenever the Observable emits an item. This method takes as a parameter the item emitted by the Observable.
onError
An Observable calls this method to indicate that it has failed to generate the expected data or has encountered some other error. This stops the Observable and it will not make further calls to onNext or onCompleted. The onError method takes as its parameter an indication of what caused the error (sometimes an object like an Exception or Throwable, other times a simple string, depending on the implementation).
onCompleted
An Observable calls this method after it has called onNext for the final time, if it has not encountered any errors.
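
The contract described above can be illustrated with a small, library-free sketch. It is not the implementation used by any Rx library; `createObservable` and its `producer` argument are hypothetical names introduced only for this example. The sketch shows a subscribe method that accepts either up to three functions or an observer object, and that stops delivering notifications once onError or onCompleted has been called.

```javascript
// Hypothetical helper (not part of any Rx library): builds an
// Observable-like object from a function that pushes notifications.
function createObservable(producer) {
  return {
    subscribe: function (onNextOrObserver, onError, onCompleted) {
      // Accept either an observer object or up to three functions.
      var observer =
        (typeof onNextOrObserver === 'object' && onNextOrObserver !== null)
          ? onNextOrObserver
          : { onNext: onNextOrObserver, onError: onError, onCompleted: onCompleted };
      var stopped = false;
      producer({
        onNext: function (item) {
          if (!stopped && observer.onNext) { observer.onNext(item); }
        },
        onError: function (err) {
          if (stopped) { return; }
          stopped = true; // no further notifications after an error
          if (observer.onError) { observer.onError(err); }
        },
        onCompleted: function () {
          if (stopped) { return; }
          stopped = true; // no further notifications after completion
          if (observer.onCompleted) { observer.onCompleted(); }
        }
      });
    }
  };
}

var log = [];
var source = createObservable(function (observer) {
  observer.onNext(1);
  observer.onNext(2);
  observer.onError('boom');
  observer.onNext(3);      // ignored: the sequence has already failed
  observer.onCompleted();  // ignored for the same reason
});

source.subscribe(
  function (x) { log.push('Next: ' + x); },
  function (err) { log.push('Error: ' + err); },
  function () { log.push('Completed'); });

console.log(log.join('\n'));
// Next: 1
// Next: 2
// Error: boom
```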

An Observable is called a “cold” Observable if it does not begin to emit items until an observer has subscribed to it. An Observable is called a “hot” Observable if it may begin emitting items at any time; a subscriber may then begin observing the sequence at some point after it has started, and will miss any items emitted before the subscription.
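
The difference can be sketched without any Rx library. The helpers `coldRange` and `hotSource` below are hypothetical and exist only for illustration: the cold source replays its whole sequence for each subscriber, while the hot source delivers only what is emitted after a subscriber has attached.

```javascript
// Cold (hypothetical helper): the sequence is produced per subscription,
// so every subscriber sees all items from the start.
function coldRange(start, count) {
  return {
    subscribe: function (onNext) {
      for (var i = start; i < start + count; i++) { onNext(i); }
    }
  };
}

// Hot (hypothetical helper): items are pushed whether or not
// anyone is listening at the time.
function hotSource() {
  var listeners = [];
  return {
    emit: function (item) {
      listeners.forEach(function (fn) { fn(item); });
    },
    subscribe: function (onNext) { listeners.push(onNext); }
  };
}

var coldFirst = [];
var coldLate = [];
var cold = coldRange(0, 3);
cold.subscribe(function (x) { coldFirst.push(x); });
cold.subscribe(function (x) { coldLate.push(x); }); // still sees every item

var hotSeen = [];
var hot = hotSource();
hot.emit('a'); // nobody has subscribed yet, so this item is missed
hot.subscribe(function (x) { hotSeen.push(x); });
hot.emit('b');
hot.emit('c');

console.log(coldFirst); // [ 0, 1, 2 ]
console.log(coldLate);  // [ 0, 1, 2 ]
console.log(hotSeen);   // [ 'b', 'c' ]
```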

Language-Specific Information

RxGroovy implements several variants of subscribe.

If you pass it no parameters, it will trigger a subscription to the underlying Observable, but will ignore its emissions and notifications. This will activate a cold Observable.

You can also pass it between one and three functions; these will be interpreted as follows:

  1. onNext
  2. onNext and onError
  3. onNext, onError, and onCompleted

Finally, you can pass it an object that implements either the Observer or the Subscriber interface. The Observer interface consists of the three previously described “on” methods. The Subscriber interface includes these as well, and adds a number of methods that facilitate reactive pull backpressure and that permit the Subscriber to unsubscribe from an Observable before it completes.

The call to subscribe returns an object that implements the Subscription interface. This interface includes the unsubscribe method that you can call at any time to sever the subscription that subscribe established between the Observable and the observer (or the methods that stand in for the observer).
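
The role of the returned object can be shown with a language-neutral sketch. It is written in JavaScript to match the sample code elsewhere on this page and is not RxGroovy code; `createSource` is a hypothetical helper, and only the `unsubscribe` name is taken from the description above.

```javascript
// Hypothetical source whose subscribe returns a Subscription-like object.
function createSource() {
  var listeners = [];
  return {
    emit: function (item) {
      // Iterate over a copy so that unsubscribing during delivery is safe.
      listeners.slice().forEach(function (fn) { fn(item); });
    },
    subscribe: function (onNext) {
      listeners.push(onNext);
      return {
        unsubscribe: function () {
          var index = listeners.indexOf(onNext);
          if (index !== -1) { listeners.splice(index, 1); }
        }
      };
    }
  };
}

var received = [];
var source = createSource();
var subscription = source.subscribe(function (x) { received.push(x); });

source.emit(1);
source.emit(2);
subscription.unsubscribe(); // sever the link between source and observer
source.emit(3);             // no longer delivered

console.log(received); // [ 1, 2 ]
```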

The forEach operators are simpler versions of subscribe. You can pass them between one and three functions, which will be interpreted as follows:

  1. onNext
  2. onNext and onError
  3. onNext, onError, and onCompleted

Unlike subscribe, forEach does not return an object with which you can cancel the subscription. Nor do you have the option of passing a parameter that has this capability. So you should only use this operator if you definitely need to operate on all of the emissions and notifications from the Observable.

forEach

There is also a BlockingObservable method called forEach that is somewhat similar. In order to use it, you must first convert your source Observable into a BlockingObservable by means of either the BlockingObservable.from method or the Observable.toBlocking operator.

BlockingObservable.forEach takes a single function as its parameter, and this function behaves much like an onNext function in the subscription to an ordinary Observable. The forEach operator itself blocks until the BlockingObservable completes, and it is by unblocking, rather than by calling a callback function, that it indicates that it is complete. If it encounters an error it will throw a RuntimeException (rather than calling an analogue to the onError callback).

RxJava implements several variants of subscribe.

If you pass it no parameters, it will trigger a subscription to the underlying Observable, but will ignore its emissions and notifications. This will activate a cold Observable.

You can also pass it between one and three functions; these will be interpreted as follows:

  1. onNext
  2. onNext and onError
  3. onNext, onError, and onCompleted

Finally, you can pass it an object that implements either the Observer or the Subscriber interface. The Observer interface consists of the three previously described “on” methods. The Subscriber interface includes these as well, and adds a number of methods that facilitate reactive pull backpressure and that permit the Subscriber to unsubscribe from an Observable before it completes.

The call to subscribe returns an object that implements the Subscription interface. This interface includes the unsubscribe method that you can call at any time to sever the subscription that subscribe established between the Observable and the observer (or the methods that stand in for the observer).

The forEach operators are simpler versions of subscribe. You can pass them between one and three functions, which will be interpreted as follows:

  1. onNext
  2. onNext and onError
  3. onNext, onError, and onCompleted

Unlike subscribe, forEach does not return an object with which you can cancel the subscription. Nor do you have the option of passing a parameter that has this capability. So you should only use this operator if you definitely need to operate on all of the emissions and notifications from the Observable.

forEach

There is also a BlockingObservable method called forEach that is somewhat similar. In order to use it, you must first convert your source Observable into a BlockingObservable by means of either the BlockingObservable.from method or the Observable.toBlocking operator.

BlockingObservable.forEach takes a single function as its parameter, and this function behaves much like an onNext function in the subscription to an ordinary Observable. The forEach operator itself blocks until the BlockingObservable completes, and it is by unblocking, rather than by calling a callback function, that it indicates that it is complete. If it encounters an error it will throw a RuntimeException (rather than calling an analogue to the onError callback).

In RxJS, you can subscribe to an Observable in two ways:

  1. subscribe a single function to either the onNext, the onCompleted, or onError notifications from an Observable, with subscribeOnNext, subscribeOnCompleted, or subscribeOnError respectively
  2. subscribe by passing zero to three individual functions, or an object that implements those three functions, into either the subscribe or forEach operator (those operators behave identically).

Sample Code

var source = Rx.Observable.range(0, 3);

var subscription = source.subscribeOnNext(
  function (x) {
    console.log('Next: %s', x);
  });

Next: 0
Next: 1
Next: 2

var source = Rx.Observable.range(0, 3);

var subscription = source.subscribeOnCompleted(
  function () {
    console.log('Completed');
  });

Completed

var source = Rx.Observable.throw(new Error());

var subscription = source.subscribeOnError(
  function (err) {
    console.log('Error: %s', err);
  });

Error: Error

var observer = Rx.Observer.create(
  function (x) { console.log('Next: %s', x); },
  function (err) { console.log('Error: %s', err); },
  function () { console.log('Completed'); });

var source = Rx.Observable.range(0, 3);

var subscription = source.subscribe(observer);

Next: 0
Next: 1
Next: 2
Completed

var source = Rx.Observable.range(0, 3);

var subscription = source.subscribe(
  function (x) { console.log('Next: %s', x); },
  function (err) { console.log('Error: %s', err); },
  function () { console.log('Completed'); });

Next: 0
Next: 1
Next: 2
Completed

The functions described in this section are all found in each of the following distributions:

  • rx.js
  • rx.all.js
  • rx.all.compat.js
  • rx.compat.js
  • rx.lite.js
  • rx.lite.compat.js

RxPHP implements this operator as subscribe.

© ReactiveX contributors
Licensed under the Apache License 2.0.
http://reactivex.io/documentation/operators/subscribe.html