From
convert various other objects and data types into Observables

When you work with Observables, it can be more convenient if all of the data you mean to work with can be represented as Observables, rather than as a mixture of Observables and other types. This allows you to use a single set of operators to govern the entire lifespan of the data stream.
Iterables, for example, can be thought of as a sort of synchronous Observable; Futures, as a sort of Observable that always emits only a single item. By explicitly converting such objects to Observables, you allow them to interact as peers with other Observables.
For this reason, most ReactiveX implementations have methods that allow you to convert certain language-specific objects and data structures into Observables.
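To make the idea concrete, here is a minimal sketch in plain JavaScript of how an Iterable might be wrapped so that it pushes its items to an observer. The name fromIterableSketch and the three-callback subscribe signature are assumptions made for illustration; this is not the implementation used by any ReactiveX library.

```javascript
// Hypothetical helper, not part of any Rx library: wraps an iterable in an
// object whose subscribe() pushes each item to the observer callbacks.
function fromIterableSketch(iterable) {
  return {
    subscribe: function (onNext, onError, onCompleted) {
      try {
        for (const item of iterable) {
          onNext(item); // one notification per item, in iteration order
        }
      } catch (err) {
        onError(err); // a failure while iterating ends the sequence
        return;
      }
      onCompleted(); // signal that the iterable is exhausted
    }
  };
}

const received = [];
fromIterableSketch(new Set(['a', 'b', 'c'])).subscribe(
  function (x) { received.push('Next: ' + x); },
  function (err) { received.push('Error: ' + err); },
  function () { received.push('Completed'); }
);
console.log(received.join('\n'));
```

Once the Iterable is behind a subscribe method, code that consumes it no longer needs to know whether the items came from a collection, a Future, or an event source.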
See Also
- Just
- Start
- 101 Rx Samples: Observation Operators
- RxJava Tutorial 03: Observable from, just, & create methods
Language-Specific Information
RxGroovy decode from fromAction fromCallable fromFunc0 fromRunnable runAsync

In RxGroovy, the from operator can convert a Future, an Iterable, or an Array. In the case of an Iterable or an Array, the resulting Observable will emit each item contained in the Iterable or Array.
In the case of a Future, it will emit the single result of the get call. You may optionally pass the version of from that accepts a Future two additional parameters: a timeout span and the units of time that span is denominated in. The resulting Observable will terminate with an error if that span of time passes before the Future responds with a value.
By default, from does not operate on any particular Scheduler. However, the variant that converts a Future accepts a Scheduler as an optional second parameter, and it will use that Scheduler to govern the Future.
- Javadoc: from(array)
- Javadoc: from(Iterable)
- Javadoc: from(Future)
- Javadoc: from(Future,Scheduler)
- Javadoc: from(Future,timeout,timeUnit)

In addition, in the RxJavaAsyncUtil package, you have available to you the following operators that convert actions, callables, functions, and runnables into Observables that emit the results of those things:
- fromAction
- fromCallable
- fromFunc0
- fromRunnable
See the Start operator for more information about those operators.

Note that there is also a from operator that is a method of the optional StringObservable class. It converts a stream of characters or a Reader into an Observable that emits byte arrays or Strings.
In the separate RxJavaAsyncUtil package, which is not included by default with RxGroovy, there is also a runAsync function. Pass runAsync an Action and a Scheduler, and it will return a StoppableObservable that uses the specified Action to generate items that it emits.
The Action accepts an Observer and a Subscription. It uses the Subscription to check for the isUnsubscribed condition, upon which it will stop emitting items. You can also manually stop a StoppableObservable at any time by calling its unsubscribe method (which will also unsubscribe the Subscription you have associated with the StoppableObservable).
Because runAsync immediately invokes the Action and begins emitting the items, it is possible that some items may be lost in the interval between when you establish the StoppableObservable with this method and when your Observer is ready to receive items. If this is a problem, you can use the variant of runAsync that also accepts a Subject and pass a ReplaySubject with which you can retrieve the otherwise-missing items.

The StringObservable class, which is not a default part of RxGroovy, also includes the decode operator, which converts a stream of byte arrays holding multibyte-encoded characters into an Observable that emits Strings, correctly handling any character whose bytes are split across two consecutive arrays.
RxJava 1.x decode from fromAction fromCallable fromFunc0 fromRunnable runAsync

In RxJava, the from operator can convert a Future, an Iterable, or an Array. In the case of an Iterable or an Array, the resulting Observable will emit each item contained in the Iterable or Array.
Sample Code
    Integer[] items = { 0, 1, 2, 3, 4, 5 };
    // Typed as Observable<Integer> so the callbacks below need no unchecked casts
    Observable<Integer> myObservable = Observable.from(items);
    myObservable.subscribe(
        new Action1<Integer>() {
            @Override
            public void call(Integer item) {
                System.out.println(item);
            }
        },
        new Action1<Throwable>() {
            @Override
            public void call(Throwable error) {
                System.out.println("Error encountered: " + error.getMessage());
            }
        },
        new Action0() {
            @Override
            public void call() {
                System.out.println("Sequence complete");
            }
        }
    );

    0
    1
    2
    3
    4
    5
    Sequence complete
In the case of a Future, it will emit the single result of the get call. You may optionally pass the version of from that accepts a Future two additional parameters: a timeout span and the units of time that span is denominated in. The resulting Observable will terminate with an error if that span of time passes before the Future responds with a value.
By default, from does not operate on any particular Scheduler. However, the variant that converts a Future accepts a Scheduler as an optional second parameter, and it will use that Scheduler to govern the Future.
- Javadoc: from(array)
- Javadoc: from(Iterable)
- Javadoc: from(Future)
- Javadoc: from(Future,Scheduler)
- Javadoc: from(Future,timeout,timeUnit)
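The timeout behavior described above can be sketched in plain JavaScript. JavaScript has no Future type, so a Promise stands in for it here; fromFutureSketch, its timeoutMs parameter, and the error message are all assumptions for illustration and are not part of RxJava.

```javascript
// Hypothetical sketch, not the RxJava API: emit the single result of a
// Promise (standing in for a Future), or fail if it takes too long.
function fromFutureSketch(promise, timeoutMs) {
  return {
    subscribe: function (onNext, onError, onCompleted) {
      let settled = false;
      const timer = setTimeout(function () {
        if (settled) { return; }
        settled = true;
        onError(new Error('timed out after ' + timeoutMs + ' ms'));
      }, timeoutMs);
      promise.then(function (value) {
        if (settled) { return; } // the timeout already fired
        settled = true;
        clearTimeout(timer);
        onNext(value);           // the single item
        onCompleted();
      }, function (reason) {
        if (settled) { return; }
        settled = true;
        clearTimeout(timer);
        onError(reason);
      });
    }
  };
}

const futureLog = [];
function watch(label, promise, timeoutMs) {
  fromFutureSketch(promise, timeoutMs).subscribe(
    function (x) { futureLog.push(label + ': ' + x); },
    function (e) { futureLog.push(label + ': ' + e.message); },
    function () {}
  );
}

// This promise resolves after 200 ms, well past the 20 ms limit below.
const slow = new Promise(function (resolve) { setTimeout(resolve, 200, 'late'); });
watch('fast', Promise.resolve('ok'), 50);
watch('slow', slow, 20);
setTimeout(function () { console.log(futureLog.join('\n')); }, 100);
```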

In addition, in the RxJavaAsyncUtil package, you have available to you the following operators that convert actions, callables, functions, and runnables into Observables that emit the results of those things:
- fromAction
- fromCallable
- fromFunc0
- fromRunnable
See the Start operator for more information about those operators.

Note that there is also a from operator that is a method of the optional StringObservable class. It converts a stream of characters or a Reader into an Observable that emits byte arrays or Strings.
In the separate RxJavaAsyncUtil package, which is not included by default with RxJava, there is also a runAsync function. Pass runAsync an Action and a Scheduler, and it will return a StoppableObservable that uses the specified Action to generate items that it emits.
The Action accepts an Observer and a Subscription. It uses the Subscription to check for the isUnsubscribed condition, upon which it will stop emitting items. You can also manually stop a StoppableObservable at any time by calling its unsubscribe method (which will also unsubscribe the Subscription you have associated with the StoppableObservable).
Because runAsync immediately invokes the Action and begins emitting the items, it is possible that some items may be lost in the interval between when you establish the StoppableObservable with this method and when your Observer is ready to receive items. If this is a problem, you can use the variant of runAsync that also accepts a Subject and pass a ReplaySubject with which you can retrieve the otherwise-missing items.
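The interplay between an eagerly started Action, the isUnsubscribed check, and a replaying Subject can be sketched as follows. This is a language-neutral illustration written in plain JavaScript: createReplayBuffer and runImmediately are hypothetical names, the sketch runs synchronously rather than on a Scheduler, and none of it is the RxJavaAsyncUtil API.

```javascript
// Hypothetical stand-in for a ReplaySubject: records every item and replays
// the recorded history to anyone who subscribes later.
function createReplayBuffer() {
  const history = [];
  const observers = [];
  return {
    onNext: function (item) {
      history.push(item);
      observers.forEach(function (notify) { notify(item); });
    },
    subscribe: function (onNext) {
      history.forEach(function (item) { onNext(item); }); // replay missed items
      observers.push(onNext);
    }
  };
}

// Hypothetical stand-in for runAsync: invokes the action right away, before
// any observer exists, and returns a handle that can stop it.
function runImmediately(action, subject) {
  const subscription = {
    unsubscribed: false,
    isUnsubscribed: function () { return this.unsubscribed; },
    unsubscribe: function () { this.unsubscribed = true; }
  };
  action(subject, subscription);
  return subscription;
}

const replay = createReplayBuffer();
const handle = runImmediately(function (observer, subscription) {
  // The action checks isUnsubscribed before emitting each item.
  for (let i = 1; i <= 5 && !subscription.isUnsubscribed(); i++) {
    observer.onNext(i);
    if (i === 3) { subscription.unsubscribe(); } // stop early, for illustration
  }
}, replay);

// This observer arrives after the action has finished, yet sees every item.
const lateItems = [];
replay.subscribe(function (item) { lateItems.push(item); });
console.log(lateItems);
```

Without the replay buffer, the late observer in this sketch would receive nothing, because all three items were emitted before it subscribed.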

The StringObservable class, which is not a default part of RxJava, also includes the decode operator, which converts a stream of byte arrays holding multibyte-encoded characters into an Observable that emits Strings, correctly handling any character whose bytes are split across two consecutive arrays.
RxJS from fromCallback fromEvent fromEventPattern fromNodeCallback fromPromise of ofArrayChanges ofObjectChanges ofWithScheduler pairs
There are several specialized From variants in RxJS:

In RxJS, the from operator converts an array-like or iterable object into an Observable that emits the items in that array or iterable. A String, in this context, is treated as an array of characters.
This operator also takes three additional, optional parameters:
- a transforming function that takes an item from the array or iterable as input and produces an item to be emitted by the resulting Observable as output
- a second argument to pass into the transforming function as additional context information
- a Scheduler on which this operator should operate
Sample Code
    // Array-like object (arguments) to Observable
    function f() {
      return Rx.Observable.from(arguments);
    }
    f(1, 2, 3).subscribe(
      function (x) { console.log('Next: ' + x); },
      function (err) { console.log('Error: ' + err); },
      function () { console.log('Completed'); });

    Next: 1
    Next: 2
    Next: 3
    Completed

    // Any iterable object...
    // Set
    var s = new Set(['foo', window]);
    Rx.Observable.from(s).subscribe(
      function (x) { console.log('Next: ' + x); },
      function (err) { console.log('Error: ' + err); },
      function () { console.log('Completed'); });

    Next: foo
    Next: window
    Completed

    // Map
    var m = new Map([[1, 2], [2, 4], [4, 8]]);
    Rx.Observable.from(m).subscribe(
      function (x) { console.log('Next: ' + x); },
      function (err) { console.log('Error: ' + err); },
      function () { console.log('Completed'); });

    Next: [1, 2]
    Next: [2, 4]
    Next: [4, 8]
    Completed

    // String
    Rx.Observable.from("foo").subscribe(
      function (x) { console.log('Next: ' + x); },
      function (err) { console.log('Error: ' + err); },
      function () { console.log('Completed'); });

    Next: f
    Next: o
    Next: o
    Completed

    // Using a function as the map function to manipulate the elements
    Rx.Observable.from([1, 2, 3], function (x) { return x + x; }).subscribe(
      function (x) { console.log('Next: ' + x); },
      function (err) { console.log('Error: ' + err); },
      function () { console.log('Completed'); });

    Next: 2
    Next: 4
    Next: 6
    Completed

    // Generate a sequence of numbers
    Rx.Observable.from({length: 5}, function (v, k) { return k; }).subscribe(
      function (x) { console.log('Next: ' + x); },
      function (err) { console.log('Error: ' + err); },
      function () { console.log('Completed'); });

    Next: 0
    Next: 1
    Next: 2
    Next: 3
    Next: 4
    Completed
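The samples above show the transforming function but not the second optional parameter, the context passed to that function. The following library-free sketch shows how the two might fit together. fromSketch is a hypothetical helper, not the RxJS implementation; it leans on the standard Array.from, which accepts the same (source, mapFn, thisArg) arguments, and it converts the source eagerly, which a real implementation need not do.

```javascript
// Hypothetical helper: an array-like or iterable source, an optional mapping
// function, and an optional context that becomes `this` inside that function.
function fromSketch(source, mapFn, thisArg) {
  // Array.from handles iterables and array-likes such as { length: 3 }.
  const items = Array.from(source, mapFn, thisArg);
  return {
    subscribe: function (onNext, onError, onCompleted) {
      items.forEach(function (item) { onNext(item); });
      onCompleted();
    }
  };
}

const scale = { factor: 10 };
const out = [];
fromSketch({ length: 3 }, function (v, k) { return k * this.factor; }, scale)
  .subscribe(
    function (x) { out.push(x); },
    function (err) { out.push('Error: ' + err); },
    function () { out.push('Completed'); });
console.log(out.join('\n'));
```

The mapping function must be a regular function rather than an arrow function for the context argument to take effect, since arrow functions do not bind their own this.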
from is found in the following distributions:
- rx.js
- rx.all.js
- rx.all.compat.js
- rx.compat.js
- rx.lite.js
- rx.lite.compat.js

The fromCallback operator takes a function whose last parameter is a callback and returns a wrapped version of that function. Calling the wrapped function with the remaining arguments produces an Observable that emits the value passed to the callback as its single emission.
This operator also takes two additional, optional parameters:
- a context in which to call the function (its this value)
- a transforming function that takes the arguments passed to the callback as input and returns an item to be emitted by the resulting Observable
Sample Code
    var fs = require('fs'),
        Rx = require('rx');
    // Wrap fs.exists
    var exists = Rx.Observable.fromCallback(fs.exists);
    // Check if file.txt exists
    var source = exists('file.txt');
    var subscription = source.subscribe(
      function (x) { console.log('Next: ' + x); },
      function (err) { console.log('Error: ' + err); },
      function () { console.log('Completed'); });

    Next: true
    Completed
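To show what such a wrapper does without depending on RxJS or the file system, here is a minimal sketch. fromCallbackSketch and isEven are hypothetical names, and the sketch simplifies matters by invoking the wrapped function on every subscription; it should not be read as the RxJS source.

```javascript
// Hypothetical sketch of the idea behind fromCallback: turn a function whose
// last parameter is a callback into one that returns an observable-like object.
function fromCallbackSketch(func, context, selector) {
  return function () {
    const args = Array.prototype.slice.call(arguments);
    return {
      subscribe: function (onNext, onError, onCompleted) {
        function handler() {
          const results = Array.prototype.slice.call(arguments);
          let value;
          try {
            // With a selector, emit its result; otherwise emit the callback
            // argument (or all of them, as an array, when there are several).
            value = selector ? selector.apply(null, results)
              : (results.length <= 1 ? results[0] : results);
          } catch (err) {
            onError(err);
            return;
          }
          onNext(value);
          onCompleted();
        }
        // Call the original function, appending our handler as its callback.
        func.apply(context, args.concat(handler));
      }
    };
  };
}

// A stand-in for a callback-style API such as fs.exists.
function isEven(n, callback) { callback(n % 2 === 0); }

const log = [];
fromCallbackSketch(isEven)(4).subscribe(
  function (x) { log.push('Next: ' + x); },
  function (err) { log.push('Error: ' + err); },
  function () { log.push('Completed'); });
console.log(log.join('\n'));
```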
fromCallback is found in the following distributions:
- rx.all.js
- rx.all.compat.js
- rx.async.js (requires rx.binding.js and either rx.js or rx.compat.js)
- rx.async.compat.js (requires rx.binding.js and either rx.js or rx.compat.js)
- rx.lite.js
- rx.lite.compat.js
There is also a fromNodeCallback operator, which is specialized for the types of callback functions found in Node.js.
This operator takes three additional, optional parameters:
- a Scheduler on which you want to run the Node.js callback
- a parameter to give to the callback function
- a transforming function that takes the return value of the callback function as input and returns an item to be emitted by the resulting Observable
Sample Code
    var fs = require('fs'),
        Rx = require('rx');
    // Wrap fs.rename
    var rename = Rx.Observable.fromNodeCallback(fs.rename);
    // Rename file which returns no parameters except an error
    var source = rename('file1.txt', 'file2.txt');
    var subscription = source.subscribe(
      function () { console.log('Next: success!'); },
      function (err) { console.log('Error: ' + err); },
      function () { console.log('Completed'); });

    Next: success!
    Completed
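Node.js callbacks conventionally receive an error (or null) as their first argument, which is what distinguishes this operator from fromCallback. The sketch below illustrates that mapping without touching the file system. fromNodeCallbackSketch and divide are hypothetical names, and the Scheduler and transforming-function parameters are left out for brevity.

```javascript
// Hypothetical sketch: route the error-first argument of a Node-style
// callback to onError and the result to onNext followed by onCompleted.
function fromNodeCallbackSketch(func, context) {
  return function () {
    const args = Array.prototype.slice.call(arguments);
    return {
      subscribe: function (onNext, onError, onCompleted) {
        function handler(err, result) {
          if (err) {
            onError(err); // a non-null first argument means failure
            return;
          }
          onNext(result);
          onCompleted();
        }
        func.apply(context, args.concat(handler));
      }
    };
  };
}

// A stand-in for a Node-style API such as fs.rename.
function divide(a, b, callback) {
  if (b === 0) {
    callback(new Error('division by zero'));
    return;
  }
  callback(null, a / b);
}

const divideObservable = fromNodeCallbackSketch(divide);
const nodeLog = [];
function record(source) {
  source.subscribe(
    function (x) { nodeLog.push('Next: ' + x); },
    function (err) { nodeLog.push('Error: ' + err.message); },
    function () { nodeLog.push('Completed'); });
}
record(divideObservable(6, 3));
record(divideObservable(1, 0));
console.log(nodeLog.join('\n'));
```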
fromNodeCallback is found in the following distributions:
- rx.async.js (requires rx.binding.js and either rx.js or rx.compat.js)
- rx.async.compat.js (requires rx.binding.js and either rx.js or rx.compat.js)
- rx.lite.js
- rx.lite.compat.js

The fromEvent operator takes an “element” and an event name as parameters, and it then listens for events of that name taking place on that element. It returns an Observable that emits those events. An “element” may be a simple DOM element, or a NodeList, jQuery element, Zepto Element, Angular element, Ember.js element, or EventEmitter.
This operator also takes an optional third parameter: a function that accepts the arguments from the event handler as parameters and returns an item to be emitted by the resulting Observable in place of the event.
Sample Code
    // using a jQuery element
    var input = $('#input');
    var source = Rx.Observable.fromEvent(input, 'click');
    var subscription = source.subscribe(
      function (x) { console.log('Next: Clicked!'); },
      function (err) { console.log('Error: ' + err); },
      function () { console.log('Completed'); });
    input.trigger('click');

    Next: Clicked!

    // using a Node.js EventEmitter and the optional third parameter
    var EventEmitter = require('events').EventEmitter,
        Rx = require('rx');
    var eventEmitter = new EventEmitter();
    var source = Rx.Observable.fromEvent(
      eventEmitter,
      'data',
      function (first, second) { return { foo: first, bar: second }; });
    var subscription = source.subscribe(
      function (x) { console.log('Next: foo - ' + x.foo + ', bar - ' + x.bar); },
      function (err) { console.log('Error: ' + err); },
      function () { console.log('Completed'); });
    eventEmitter.emit('data', 'baz', 'quux');

    Next: foo - baz, bar - quux
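Neither sample above completes, because an event source has no natural end; the listener stays attached until the subscription is disposed. The following sketch, which uses only Node's built-in events module, illustrates that lifecycle. fromEventSketch and the dispose method on its return value are assumptions made for this illustration, not the RxJS implementation.

```javascript
const EventEmitter = require('events').EventEmitter;

// Hypothetical sketch: attach a listener on subscribe, and hand back an
// object that can detach it again.
function fromEventSketch(emitter, eventName, selector) {
  return {
    subscribe: function (onNext) {
      function listener() {
        const args = Array.prototype.slice.call(arguments);
        // With a selector, emit its result in place of the raw event.
        onNext(selector ? selector.apply(null, args) : args[0]);
      }
      emitter.on(eventName, listener);
      return {
        dispose: function () { emitter.removeListener(eventName, listener); }
      };
    }
  };
}

const emitter = new EventEmitter();
const events = [];
const sub = fromEventSketch(emitter, 'data', function (first, second) {
  return { foo: first, bar: second };
}).subscribe(function (x) { events.push(x.foo + '/' + x.bar); });

emitter.emit('data', 'baz', 'quux');   // delivered to the observer
sub.dispose();
emitter.emit('data', 'late', 'event'); // ignored: the listener was removed
console.log(events);
```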
fromEvent is found in the following distributions:
- rx.async.js (requires rx.binding.js and either rx.js or rx.compat.js)
- rx.async.compat.js (requires rx.binding.js and either rx.js or rx.compat.js)
- rx.lite.js
- rx.lite.compat.js
The fromEventPattern operator is similar, except that instead of taking an element and an event name as parameters, it takes two functions as parameters. The first function attaches an event listener to a variety of events on a variety of elements; the second function removes this set of listeners. In this way you can establish a single Observable that emits items representing a variety of events and a variety of target elements.
Sample Code
    var input = $('#input');
    var source = Rx.Observable.fromEventPattern(
      function add (h) { input.bind('click', h); },
      function remove (h) { input.unbind('click', h); }
    );
    var subscription = source.subscribe(
      function (x) { console.log('Next: Clicked!'); },
      function (err) { console.log('Error: ' + err); },
      function () { console.log('Completed'); });
    input.trigger('click');

    Next: Clicked!
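The sample above attaches to a single element, so it does not show the "variety of target elements" case. The sketch below does, using a hand-rolled event source so that it runs without a DOM. fromEventPatternSketch, the buttons object, and fire are all hypothetical and exist only for this illustration.

```javascript
// Hypothetical sketch: the caller supplies the attach and detach logic, so
// one observable can cover several event sources at once.
function fromEventPatternSketch(addHandler, removeHandler) {
  return {
    subscribe: function (onNext) {
      function handler(event) { onNext(event); }
      addHandler(handler);
      return { dispose: function () { removeHandler(handler); } };
    }
  };
}

// A stand-in event source: two "elements", each with its own listener list.
const buttons = { ok: [], cancel: [] };
function fire(name) {
  buttons[name].slice().forEach(function (h) { h(name + ' clicked'); });
}

const clicks = [];
const patternSub = fromEventPatternSketch(
  function add(h) {
    buttons.ok.push(h);
    buttons.cancel.push(h);
  },
  function remove(h) {
    buttons.ok = buttons.ok.filter(function (x) { return x !== h; });
    buttons.cancel = buttons.cancel.filter(function (x) { return x !== h; });
  }
).subscribe(function (e) { clicks.push(e); });

fire('ok');
fire('cancel');
patternSub.dispose();
fire('ok'); // no longer observed
console.log(clicks);
```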

The of operator accepts a number of items as parameters, and returns an Observable that emits each of these parameters, in order, as its emitted sequence.
Sample Code
    var source = Rx.Observable.of(1, 2, 3);
    var subscription = source.subscribe(
      function (x) { console.log('Next: ' + x); },
      function (err) { console.log('Error: ' + err); },
      function () { console.log('Completed'); });

    Next: 1
    Next: 2
    Next: 3
    Completed
of is found in the following distributions:
- rx.js
- rx.all.js
- rx.all.compat.js
- rx.compat.js
- rx.lite.js
- rx.lite.compat.js
A variant of this operator, called ofWithScheduler, takes a Scheduler as its first parameter and operates the resulting Observable on this Scheduler.
There is also a fromPromise operator that converts a Promise into an Observable, converting its resolve calls into onNext notifications and its reject calls into onError notifications.
fromPromise is found in the following distributions:
- rx.async.js (requires rx.binding.js and either rx.js or rx.compat.js)
- rx.async.compat.js (requires rx.binding.js and either rx.js or rx.compat.js)
- rx.lite.js
- rx.lite.compat.js
Sample Code
    var promise = new RSVP.Promise(function (resolve, reject) {
      resolve(42);
    });
    var source = Rx.Observable.fromPromise(promise);
    var subscription = source.subscribe(
      function (x) { console.log('Next: ' + x); },
      function (e) { console.log('Error: ' + e); },
      function ( ) { console.log('Completed'); });

    Next: 42
    Completed

    var promise = new RSVP.Promise(function (resolve, reject) {
      reject(new Error('reason'));
    });
    var source = Rx.Observable.fromPromise(promise);
    var subscription = source.subscribe(
      function (x) { console.log('Next: ' + x); },
      function (e) { console.log('Error: ' + e); },
      function ( ) { console.log('Completed'); });

    Error: Error: reason
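The mapping from resolve and reject to onNext and onError can be sketched with a native Promise and no libraries. fromPromiseSketch and observe are hypothetical names; the sketch only illustrates the mapping and is not the RxJS implementation.

```javascript
// Hypothetical sketch: fulfilment becomes onNext followed by onCompleted,
// and rejection becomes onError.
function fromPromiseSketch(promise) {
  return {
    subscribe: function (onNext, onError, onCompleted) {
      promise.then(
        function (value) {
          onNext(value);
          onCompleted();
        },
        function (reason) { onError(reason); }
      );
    }
  };
}

const promiseLog = [];
function observe(label, promise) {
  fromPromiseSketch(promise).subscribe(
    function (x) { promiseLog.push(label + ' Next: ' + x); },
    function (e) { promiseLog.push(label + ' Error: ' + e.message); },
    function () { promiseLog.push(label + ' Completed'); });
}

observe('resolved', Promise.resolve(42));
observe('rejected', Promise.reject(new Error('reason')));

// Promise callbacks run asynchronously, so print the log on a later tick.
setTimeout(function () { console.log(promiseLog.join('\n')); }, 0);
```

Note that a fulfilled Promise yields two notifications (the value and then completion), whereas a rejected one yields only the error.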
There is also an ofArrayChanges operator that monitors an Array with the Array.observe method, and returns an Observable that emits any changes that take place in the array. This operator is found only in the rx.all.js distribution.
Sample Code
    var arr = [1, 2, 3];
    var source = Rx.Observable.ofArrayChanges(arr);
    var subscription = source.subscribe(
      function (x) { console.log('Next: ' + x); },
      function (e) { console.log('Error: ' + e); },
      function ( ) { console.log('Completed'); });
    arr.push(4);

    Next: {type: "splice", object: Array[4], index: 3, removed: Array[0], addedCount: 1}
A similar operator is ofObjectChanges. It returns an Observable that emits any changes made to a particular object, as reported by its Object.observe method. It is also found only in the rx.all.js distribution.
Sample Code
    var obj = {x: 1};
    var source = Rx.Observable.ofObjectChanges(obj);
    var subscription = source.subscribe(
      function (x) { console.log('Next: ' + x); },
      function (e) { console.log('Error: ' + e); },
      function ( ) { console.log('Completed'); });
    obj.x = 42;

    Next: {type: "update", object: Object, name: "x", oldValue: 1}
There is also a pairs operator. This operator accepts an Object, and returns an Observable that emits, as key/value pairs, the attributes of that object.
Sample Code
    var obj = { foo: 42, bar: 56, baz: 78 };
    var source = Rx.Observable.pairs(obj);
    var subscription = source.subscribe(
      function (x) { console.log('Next: ' + x); },
      function (e) { console.log('Error: ' + e); },
      function ( ) { console.log('Completed'); });

    Next: ['foo', 42]
    Next: ['bar', 56]
    Next: ['baz', 78]
    Completed
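As a rough, library-free illustration of what pairs produces, the sketch below walks an object's own enumerable keys and emits a two-element array for each. pairsSketch is a hypothetical name, and using Object.keys to choose which attributes to emit is an assumption of this sketch rather than a statement about RxJS internals.

```javascript
// Hypothetical sketch: emit one [key, value] array per own enumerable
// property, then complete.
function pairsSketch(obj) {
  return {
    subscribe: function (onNext, onError, onCompleted) {
      Object.keys(obj).forEach(function (key) {
        onNext([key, obj[key]]);
      });
      onCompleted();
    }
  };
}

const pairLog = [];
pairsSketch({ foo: 42, bar: 56 }).subscribe(
  function (pair) { pairLog.push(pair[0] + '=' + pair[1]); },
  function (e) { pairLog.push('Error: ' + e); },
  function () { pairLog.push('Completed'); });
console.log(pairLog.join('\n'));
```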
pairs is found in the following distributions:
- rx.js
- rx.all.js
- rx.all.compat.js
- rx.compat.js
- rx.lite.js
- rx.lite.compat.js
RxPHP fromArray fromIterator asObservable fromPromise
RxPHP implements this operator as fromArray.
Converts an array to an observable sequence.
Sample Code
    //from https://github.com/ReactiveX/RxPHP/blob/master/demo/fromArray/fromArray.php
    $source = \Rx\Observable::fromArray([1, 2, 3, 4]);
    $subscription = $source->subscribe($stdoutObserver);
    //Next value: 1
    //Next value: 2
    //Next value: 3
    //Next value: 4
    //Complete!

    Next value: 1
    Next value: 2
    Next value: 3
    Next value: 4
    Complete!
RxPHP also has an operator fromIterator.
Converts an Iterator into an observable sequence.
Sample Code
    //from https://github.com/ReactiveX/RxPHP/blob/master/demo/iterator/iterator.php
    $generator = function () {
        for ($i = 1; $i <= 3; $i++) {
            yield $i;
        }
        return 4;
    };
    $source = Rx\Observable::fromIterator($generator());
    $source->subscribe($stdoutObserver);

    Next value: 1
    Next value: 2
    Next value: 3
    Next value: 4
    Complete!
RxPHP also has an operator asObservable.
Hides the identity of an observable sequence.
Sample Code
    //from https://github.com/ReactiveX/RxPHP/blob/master/demo/asObservable/asObservable.php
    // Create subject
    $subject = new \Rx\Subject\AsyncSubject();
    // Send a value
    $subject->onNext(42);
    $subject->onCompleted();
    // Hide its type
    $source = $subject->asObservable();
    $source->subscribe($stdoutObserver);

    Next value: 42
    Complete!
RxPHP also has an operator fromPromise.
Converts a promise into an observable.
Sample Code
    //from https://github.com/ReactiveX/RxPHP/blob/master/demo/promise/fromPromise.php
    $promise = \React\Promise\resolve(42);
    $source = \Rx\Observable::fromPromise($promise);
    $subscription = $source->subscribe($stdoutObserver);

    Next value: 42
    Complete!
RxSwift from toObservable
In Swift, this is implemented using the Observable.from class method.
Each element of the array is produced as an emission. The difference between this method and Observable.just is that the latter emits the whole array as one emission.
Sample Code
    let numbers = [1, 2, 3, 4, 5]
    let source = Observable.from(numbers)
    source.subscribe {
        print($0)
    }

    next(1)
    next(2)
    next(3)
    next(4)
    next(5)
    completed
© ReactiveX contributors
Licensed under the Apache License 2.0.
http://reactivex.io/documentation/operators/from.html