Take
emit only the first n items emitted by an Observable
You can emit only the first n items emitted by an Observable and then complete while ignoring the remainder, by modifying the Observable with the Take operator.
See Also
- First
- Skip
- SkipLast
- SkipUntil
- SkipWhile
- TakeLast
- TakeUntil
- TakeWhile
- Introduction to Rx: Skip and Take
- RxMarbles:
take
Language-Specific Information
RxGroovy limit take
In RxGroovy, this operator is implemented as take.
If you use the take(n) operator (or its synonym, limit(n)) on an Observable, and that Observable emits fewer than n items before completing, the new, take-modified Observable will not throw an exception or invoke onError, but will merely emit this same fewer number of items before it completes.
Sample Code
numbers = Observable.from([1, 2, 3, 4, 5, 6, 7, 8]);
numbers.take(3).subscribe(
{ println(it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence complete"); } // onCompleted
); 1 2 3 Sequence complete
This variant of take does not by default operate on any particular Scheduler.
- Javadoc:
take(int)
There is also a variant of take that takes a temporal duration rather than a quantity of items. It results in an Observable that emits only those items that are emitted during that initial duration of the source Observable’s lifespan. You set this duration by passing in a length of time and the time units this length is denominated in as parameters to take.
This variant of take by default operates on the computation Scheduler, but you may also pass in a Scheduler of your choosing as an optional third parameter.
- Javadoc:
take(long,TimeUnit) - Javadoc:
take(long,TimeUnit,Scheduler)
RxJava 1․x limit take
In RxJava, this operator is implemented as take.
If you use the take(n) operator (or its synonym, limit(n)) on an Observable, and that Observable emits fewer than n items before completing, the new, take-modified Observable will not throw an exception or invoke onError, but will merely emit this same fewer number of items before it completes.
Sample Code
Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
.take(4)
.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}
@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
}); Next: 1 Next: 2 Next: 3 Next: 4 Sequence complete.
This variant of take does not by default operate on any particular Scheduler.
- Javadoc:
take(int)
There is also a variant of take that takes a temporal duration rather than a quantity of items. It results in an Observable that emits only those items that are emitted during that initial duration of the source Observable’s lifespan. You set this duration by passing in a length of time and the time units this length is denominated in as parameters to take.
This variant of take by default operates on the computation Scheduler, but you may also pass in a Scheduler of your choosing as an optional third parameter.
- Javadoc:
take(long,TimeUnit) - Javadoc:
take(long,TimeUnit,Scheduler)
RxJS take takeUntilWithTime
RxJS implements the take operator.
Sample Code
var source = Rx.Observable.range(0, 5)
.take(3);
var subscription = source.subscribe(
function (x) { console.log('Next: ' + x); },
function (err) { console.log('Error: ' + err); },
function () { console.log('Completed'); }); Next: 0 Next: 1 Next: 2 Completed
For the special case of take(0) you can also pass as a second parameter a Scheduler that take will use to immediately schedule a call to onCompleted.
take is found in each of the following distributions:
rx.jsrx.all.jsrx.all.compat.jsrx.compat.jsrx.lite.jsrx.lite.compat.js
RxJS also implements a takeUntilWithTime operator, which is like take except that rather than taking a particular quantity of items, it takes all of the items that are emitted during an initial period of time. You establish this period of by passing in a parameter to takeUntilWithTime, in either of these formats:
- a number
- mirrors items from the source Observable until this many milliseconds have passed since the Observable was subscribed to
- a
Date - mirrors items from the source Observable until this absolute time
You may also, optionally, pass in a Scheduler as a second parameter, and the timer will operate on that Scheduler (takeUntilWithTime uses the timeout Scheduler by default).
Sample Code
var source = Rx.Observable.timer(0, 1000)
.takeUntilWithTime(5000);
var subscription = source.subscribe(
function (x) { console.log('Next: ' + x); },
function (err) { console.log('Error: ' + err); },
function () { console.log('Completed'); }); Next: 0 Next: 1 Next: 2 Next: 3 Next: 4 Completed
takeUntilWithTime is found in each of the following distributions:
rx.all.jsrx.all.compat.js-
rx.time.js(requiresrx.jsorrx.compat.js) rx.lite.jsrx.lite.compat.js
RxPHP take takeUntil
RxPHP implements this operator as take.
Returns a specified number of contiguous elements from the start of an observable sequence
Sample Code
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/take/take.php
$observable = Rx\Observable::fromArray([21, 42, 63]);
$observable
->take(2)
->subscribe($stdoutObserver); Next value: 21 Next value: 42 Complete!
RxPHP also has an operator takeUntil.
Returns the values from the source observable sequence until the other observable sequence produces a value.
Sample Code
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/take/takeUntil.php
$source = \Rx\Observable::interval(105)
->takeUntil(\Rx\Observable::timer(1000));
$subscription = $source->subscribe($stdoutObserver); Next value: 0 Next value: 1 Next value: 2 Next value: 3 Next value: 4 Next value: 5 Next value: 6 Next value: 7 Next value: 8 Complete!
© ReactiveX contributors
Licensed under the Apache License 2.0.
http://reactivex.io/documentation/operators/take.html