Buffer

periodically gather items emitted by an Observable into bundles and emit these bundles rather than emitting the items one at a time
Buffer

The Buffer operator transforms an Observable that emits items into an Observable that emits buffered collections of those items. There are a number of variants in the various language-specific implementations of Buffer that differ in how they choose which items go in which buffers.

Note that if the source Observable issues an onError notification, Buffer will pass on this notification immediately without first emitting the buffer it is in the process of assembling, even if that buffer contains items that were emitted by the source Observable before it issued the error notification.

The Window operator is similar to Buffer but collects items into separate Observables rather than into data structures before reemitting them.

See Also

Language-Specific Information

RxCpp implements two variants of Buffer:

buffer(count)

buffer(count)

buffer(count) emits non-overlapping buffers in the form of vectors, each of which contains at most count items from the source Observable (the final emitted vector may have fewer than count items).

buffer(count, skip)

buffer(count,skip)

buffer(count, skip) creates a new buffer starting with the first emitted item from the source Observable, and every skip items thereafter, and fills each buffer with count items: the initial item and count-1 subsequent ones. It emits these buffers as vectors. Depending on the values of count and skip these buffers may overlap (multiple buffers may contain the same item), or they may have gaps (where items emitted by the source Observable are not represented in any buffer).

In RxGroovy there are several variants of Buffer:

buffer(count)

buffer(count)

buffer(count) emits non-overlapping buffers in the form of Lists, each of which contains at most count items from the source Observable (the final emitted List may have fewer than count items).

buffer(count, skip)

buffer(count,skip)

buffer(count, skip) creates a new buffer starting with the first emitted item from the source Observable, and every skip items thereafter, and fills each buffer with count items: the initial item and count-1 subsequent ones. It emits these buffers as Lists. Depending on the values of count and skip these buffers may overlap (multiple buffers may contain the same item), or they may have gaps (where items emitted by the source Observable are not represented in any buffer).

buffer(bufferClosingSelector)

buffer(bufferClosingSelector)

When it subscribes to the source Observable, buffer(bufferClosingSelector) begins to collect its emissions into a List, and it also calls bufferClosingSelector to generate a second Observable. When this second Observable emits an TClosing object, buffer emits the current List and repeats this process: beginning a new List and calling bufferClosingSelector to create a new Observable to monitor. It will do this until the source Observable terminates.

buffer(boundary[, initialCapacity])

buffer(boundary)

buffer(boundary) monitors an Observable, boundary. Each time that Observable emits an item, it creates a new List to begin collecting items emitted by the source Observable and emits the previous List.

buffer(bufferOpenings, bufferClosingSelector)

buffer(bufferOpenings,bufferClosingSelector)

buffer(bufferOpenings, bufferClosingSelector) monitors an Observable, bufferOpenings, that emits BufferOpening objects. Each time it observes such an emitted item, it creates a new List to begin collecting items emitted by the source Observable and it passes the bufferOpenings Observable into the closingSelector function. That function returns an Observable. buffer monitors that Observable and when it detects an emitted item from it, it closes the List and emits it as its own emission.

buffer(timespan, unit[, scheduler])

buffer(timespan,unit)

buffer(timespan, unit) emits a new List of items periodically, every timespan amount of time, containing all items emitted by the source Observable since the previous bundle emission or, in the case of the first bundle, since the subscription to the source Observable. There is also a version of this variant of the operator that takes a Scheduler as a parameter and uses it to govern the timespan; by default this variant uses the computation Scheduler.

buffer(timespan, unit, count[, scheduler])

buffer(timespan,unit,count)

buffer(timespan, unit, count) emits a new List of items for every count items emitted by the source Observable, or, if timespan has elapsed since its last bundle emission, it emits a bundle of however many items the source Observable has emitted in that span, even if this is fewer than count. There is also a version of this variant of the operator that takes a Scheduler as a parameter and uses it to govern the timespan; by default this variant uses the computation scheduler.

buffer(timespan, timeshift, unit[, scheduler])

buffer(timespan,timeshift,unit)

buffer(timespan, timeshift, unit) creates a new List of items every timeshift period of time, and fills this bundle with every item emitted by the source Observable from that time until timespan time has passed since the bundle’s creation, before emitting this List as its own emission. If timespan is longer than timeshift, the emitted bundles will represent time periods that overlap and so they may contain duplicate items. There is also a version of this variant of the operator that takes a Scheduler as a parameter and uses it to govern the timespan; by default this variant uses the computation scheduler.

You can use the Buffer operator to implement backpressure (that is, to cope with an Observable that may produce items too quickly for its observer to consume).

Buffer as a backpressure strategy

Buffer can reduce a sequence of many items to a sequence of fewer buffers-of-items, making them more manageable. You could, for example, close and emit a buffer of items from a bursty Observable periodically, at a regular interval of time.

Sample Code

Observable<List<Integer>> burstyBuffered = bursty.buffer(500, TimeUnit.MILLISECONDS);
Buffer as a backpressure strategy

Or you could get fancy, and collect items in buffers during the bursty periods and emit them at the end of each burst, by using the Debounce operator to emit a buffer closing indicator to the buffer operator.

Sample Code

// we have to multicast the original bursty Observable so we can use it
// both as our source and as the source for our buffer closing selector:
Observable<Integer> burstyMulticast = bursty.publish().refCount();
// burstyDebounced will be our buffer closing selector:
Observable<Integer> burstyDebounced = burstyMulticast.debounce(10, TimeUnit.MILLISECONDS);
// and this, finally, is the Observable of buffers we're interested in:
Observable<List<Integer>> burstyBuffered = burstyMulticast.buffer(burstyDebounced);

In RxJava there are several variants of Buffer:

buffer(count)

buffer(count)

buffer(count) emits non-overlapping buffers in the form of Lists, each of which contains at most count items from the source Observable (the final emitted List may have fewer than count items).

buffer(count, skip)

buffer(count,skip)

buffer(count, skip) creates a new buffer starting with the first emitted item from the source Observable, and every skip items thereafter, and fills each buffer with count items: the initial item and count-1 subsequent ones. It emits these buffers as Lists. Depending on the values of count and skip these buffers may overlap (multiple buffers may contain the same item), or they may have gaps (where items emitted by the source Observable are not represented in any buffer).

buffer(bufferClosingSelector)

buffer(bufferClosingSelector)

When it subscribes to the source Observable, buffer(bufferClosingSelector) begins to collect its emissions into a List, and it also calls bufferClosingSelector to generate a second Observable. When this second Observable emits an TClosing object, buffer emits the current List and repeats this process: beginning a new List and calling bufferClosingSelector to create a new Observable to monitor. It will do this until the source Observable terminates.

buffer(boundary)

buffer(boundary)

buffer(boundary) monitors an Observable, boundary. Each time that Observable emits an item, it creates a new List to begin collecting items emitted by the source Observable and emits the previous List.

buffer(bufferOpenings, bufferClosingSelector)

buffer(bufferOpenings,bufferClosingSelector)

buffer(bufferOpenings, bufferClosingSelector) monitors an Observable, bufferOpenings, that emits BufferOpening objects. Each time it observes such an emitted item, it creates a new List to begin collecting items emitted by the source Observable and it passes the bufferOpenings Observable into the closingSelector function. That function returns an Observable. buffer monitors that Observable and when it detects an emitted item from it, it closes the List and emits it as its own emission.

buffer(timespan, unit[, scheduler])

buffer(timespan,unit)

buffer(timespan, unit) emits a new List of items periodically, every timespan amount of time, containing all items emitted by the source Observable since the previous bundle emission or, in the case of the first bundle, since the subscription to the source Observable. There is also a version of this variant of the operator that takes a Scheduler as a parameter and uses it to govern the timespan; by default this variant uses the computation scheduler.

buffer(timespan, unit, count[, scheduler])

buffer(timespan,unit,count)

buffer(timespan, unit, count) emits a new List of items for every count items emitted by the source Observable, or, if timespan has elapsed since its last bundle emission, it emits a bundle of however many items the source Observable has emitted in that span, even if this is fewer than count. There is also a version of this variant of the operator that takes a Scheduler as a parameter and uses it to govern the timespan; by default this variant uses the computation scheduler.

buffer(timespan, timeshift, unit[, scheduler])

buffer(timespan,timeshift,unit)

buffer(timespan, timeshift, unit) creates a new List of items every timeshift period of time, and fills this bundle with every item emitted by the source Observable from that time until timespan time has passed since the bundle’s creation, before emitting this List as its own emission. If timespan is longer than timeshift, the emitted bundles will represent time periods that overlap and so they may contain duplicate items. There is also a version of this variant of the operator that takes a Scheduler as a parameter and uses it to govern the timespan; by default this variant uses the computation scheduler.

You can use the Buffer operator to implement backpressure (that is, to cope with an Observable that may produce items too quickly for its observer to consume).

Buffer as a backpressure strategy

Buffer can reduce a sequence of many items to a sequence of fewer buffers-of-items, making them more manageable. You could, for example, close and emit a buffer of items from a bursty Observable periodically, at a regular interval of time.

Sample Code

Observable<List<Integer>> burstyBuffered = bursty.buffer(500, TimeUnit.MILLISECONDS);
Buffer as a backpressure strategy

Or you could get fancy, and collect items in buffers during the bursty periods and emit them at the end of each burst, by using the Debounce operator to emit a buffer closing indicator to the buffer operator.

Sample Code

// we have to multicast the original bursty Observable so we can use it
// both as our source and as the source for our buffer closing selector:
Observable<Integer> burstyMulticast = bursty.publish().refCount();
// burstyDebounced will be our buffer closing selector:
Observable<Integer> burstyDebounced = burstyMulticast.debounce(10, TimeUnit.MILLISECONDS);
// and this, finally, is the Observable of buffers we're interested in:
Observable<List<Integer>> burstyBuffered = burstyMulticast.buffer(burstyDebounced);

See Also

RxJS has four Buffer operators — buffer, bufferWithCount, bufferWithTime, and bufferWithTimeOrCount — each of which has variants that have different ways of governing which source Observable items are emitted as part of which buffers.

buffer(bufferBoundaries)

buffer(bufferBoundaries)

buffer(bufferBoundaries) monitors an Observable, bufferBoundaries. Each time that Observable emits an item, it creates a new collection to begin collecting items emitted by the source Observable and emits the previous collection.

buffer(bufferClosingSelector)

buffer(bufferClosingSelector)

When it subscribes to the source Observable, buffer(bufferClosingSelector) begins to collect its emissions into a collection, and it also calls bufferClosingSelector to generate a second Observable. When this second Observable emits an item, buffer emits the current collection and repeats this process: beginning a new collection and calling bufferClosingSelector to create a new Observable to monitor. It will do this until the source Observable terminates.

buffer(bufferOpenings,bufferClosingSelector)

buffer(bufferOpenings,bufferClosingSelector)

buffer(bufferOpenings, bufferClosingSelector) monitors an Observable, bufferOpenings, that emits BufferOpening objects. Each time it observes such an emitted item, it creates a new collection to begin collecting items emitted by the source Observable and it passes the bufferOpenings Observable into the bufferClosingSelector function. That function returns an Observable. buffer monitors that Observable and when it detects an emitted item from it, it emits the current collection and begins a new one.

buffer is found in each of the following distributions:

  • rx.all.js
  • rx.all.compat.js
  • rx.coincidence.js

buffer requires one of the following distributions:

  • rx.js
  • rx.compat.js
  • rx.lite.js
  • rx.lite.compat.js

bufferWithCount(count)

bufferWithCount(count)

bufferWithCount(count) emits non-overlapping buffers, each of which contains at most count items from the source Observable (the final emitted buffer may contain fewer than count items).

bufferWithCount(count, skip)

bufferWithCount(count,skip)

bufferWithCount(count, skip) creates a new buffer starting with the first emitted item from the source Observable, and a new one for every skip items thereafter, and fills each buffer with count items: the initial item and count-1 subsequent ones, emitting each buffer when it is complete. Depending on the values of count and skip these buffers may overlap (multiple buffers may contain the same item), or they may have gaps (where items emitted by the source Observable are not represented in any buffer).

bufferWithCount is found in each of the following distributions:

  • rx.js
  • rx.compat.js
  • rx.all.js
  • rx.all.compat.js
  • rx.lite.extras.js

bufferWithTime(timeSpan)

bufferWithTime(timeSpan)

bufferWithTime(timeSpan) emits a new collection of items periodically, every timeSpan milliseconds, containing all items emitted by the source Observable since the previous bundle emission or, in the case of the first bundle, since the subscription to the source Observable. There is also a version of this variant of the operator that takes a Scheduler as a parameter and uses it to govern the timespan; by default this variant uses the timeout scheduler.

bufferWithTime(timeSpan, timeShift)

bufferWithTime(timeSpan,timeShift)

bufferWithTime(timeSpan, timeShift) creates a new collection of items every timeShift milliseconds, and fills this bundle with every item emitted by the source Observable from that time until timeSpan milliseconds has passed since the collection’s creation, before emitting this collection as its own emission. If timeSpan is longer than timeShift, the emitted bundles will represent time periods that overlap and so they may contain duplicate items. There is also a version of this variant of the operator that takes a Scheduler as a parameter and uses it to govern the timespan; by default this variant uses the timeout scheduler.

bufferWithTimeOrCount(timeSpan, count)

bufferWithTimeOrCount(timeSpan,count)

bufferWithTimeOrCount(timeSpan, count) emits a new collection of items for every count items emitted by the source Observable, or, if timeSpan milliseconds have elapsed since its last collection emission, it emits a collection of however many items the source Observable has emitted in that span, even if this is fewer than count. There is also a version of this variant of the operator that takes a Scheduler as a parameter and uses it to govern the timespan; by default this variant uses the timeout scheduler.

bufferWithTime and bufferWithTimeOrCount are found in each of the following distributions:

  • rx.all.js
  • rx.all.compat.js
  • rx.time.js

bufferWithTime and bufferWithTimeOrCount require one of the following distributions:

  • rx.time.js requires rx.js or rx.compat.js
  • otherwise: rx.lite.js or rx.lite.compat.js

In RxKotlin there are several variants of Buffer:

buffer(count)

buffer(count)

buffer(count) emits non-overlapping buffers in the form of Lists, each of which contains at most count items from the source Observable (the final emitted List may have fewer than count items).

buffer(count, skip)

buffer(count,skip)

buffer(count, skip) creates a new buffer starting with the first emitted item from the source Observable, and every skip items thereafter, and fills each buffer with count items: the initial item and count-1 subsequent ones. It emits these buffers as Lists. Depending on the values of count and skip these buffers may overlap (multiple buffers may contain the same item), or they may have gaps (where items emitted by the source Observable are not represented in any buffer).

buffer(bufferClosingSelector)

buffer(bufferClosingSelector)

When it subscribes to the source Observable, buffer(bufferClosingSelector) begins to collect its emissions into a List, and it also calls bufferClosingSelector to generate a second Observable. When this second Observable emits an TClosing object, buffer emits the current List and repeats this process: beginning a new List and calling bufferClosingSelector to create a new Observable to monitor. It will do this until the source Observable terminates.

buffer(boundary)

buffer(boundary)

buffer(boundary) monitors an Observable, boundary. Each time that Observable emits an item, it creates a new List to begin collecting items emitted by the source Observable and emits the previous List.

buffer(bufferOpenings, bufferClosingSelector)

buffer(bufferOpenings,bufferClosingSelector)

buffer(bufferOpenings, bufferClosingSelector) monitors an Observable, bufferOpenings, that emits BufferOpening objects. Each time it observes such an emitted item, it creates a new List to begin collecting items emitted by the source Observable and it passes the bufferOpenings Observable into the closingSelector function. That function returns an Observable. buffer monitors that Observable and when it detects an emitted item from it, it closes the List and emits it as its own emission.

buffer(timespan, unit[, scheduler])

buffer(timespan,unit)

buffer(timespan, unit) emits a new List of items periodically, every timespan amount of time, containing all items emitted by the source Observable since the previous bundle emission or, in the case of the first bundle, since the subscription to the source Observable. There is also a version of this variant of the operator that takes a Scheduler as a parameter and uses it to govern the timespan; by default this variant uses the computation scheduler.

buffer(timespan, unit, count[, scheduler])

buffer(timespan,unit,count)

buffer(timespan, unit, count) emits a new List of items for every count items emitted by the source Observable, or, if timespan has elapsed since its last bundle emission, it emits a bundle of however many items the source Observable has emitted in that span, even if this is fewer than count. There is also a version of this variant of the operator that takes a Scheduler as a parameter and uses it to govern the timespan; by default this variant uses the computation scheduler.

buffer(timespan, timeshift, unit[, scheduler])

buffer(timespan,timeshift,unit)

buffer(timespan, timeshift, unit) creates a new List of items every timeshift period of time, and fills this bundle with every item emitted by the source Observable from that time until timespan time has passed since the bundle’s creation, before emitting this List as its own emission. If timespan is longer than timeshift, the emitted bundles will represent time periods that overlap and so they may contain duplicate items. There is also a version of this variant of the operator that takes a Scheduler as a parameter and uses it to govern the timespan; by default this variant uses the computation scheduler.

In Rx.NET there are several variants of Buffer. For each variety you can either pass in the source Observable as the first parameter, or you can call it as an instance method of the source Observable (in which case you can omit that parameter):

Buffer(count)

Buffer(count)

Buffer(count) emits non-overlapping buffers in the form of ILists, each of which contains at most count items from the source Observable (the final emitted IList may have fewer than count items).

Buffer(count, skip)

Buffer(count,skip)

Buffer(count, skip) creates a new buffer starting with the first emitted item from the source Observable, and every skip items thereafter, and fills each buffer with count items: the initial item and count-1 subsequent ones. It emits these buffers as ILists. Depending on the values of count and skip these buffers may overlap (multiple buffers may contain the same item), or they may have gaps (where items emitted by the source Observable are not represented in any buffer).

Buffer(bufferClosingSelector)

Buffer(bufferClosingSelector)

When it subscribes to the source Observable, Buffer(bufferClosingSelector) begins to collect its emissions into an IList, and it also calls bufferClosingSelector to generate a second Observable. When this second Observable emits an TBufferClosing object, Buffer emits the current IList and repeats this process: beginning a new IList and calling bufferClosingSelector to create a new Observable to monitor. It will do this until the source Observable terminates.

Buffer(bufferOpenings,bufferClosingSelector)

Buffer(bufferOpenings,bufferClosingSelector)

Buffer(bufferOpenings, bufferClosingSelector) monitors an Observable, BufferOpenings, that emits TBufferOpening objects. Each time it observes such an emitted item, it creates a new IList to begin collecting items emitted by the source Observable and it passes the TBufferOpening object into the bufferClosingSelector function. That function returns an Observable. Buffer monitors that Observable and when it detects an emitted item from it, it closes the IList and emits it as its own emission.

Buffer(timeSpan)

Buffer(timeSpan)

Buffer(timeSpan) emits a new IList of items periodically, every timeSpan amount of time, containing all items emitted by the source Observable since the previous bundle emission or, in the case of the first list, since the subscription to the source Observable. There is also a version of this variant of the operator that takes an IScheduler as a parameter and uses it to govern the timespan.

Buffer(timeSpan, count)

Buffer(timeSpan,count)

Buffer(timeSpan, count) emits a new IList of items for every count items emitted by the source Observable, or, if timeSpan has elapsed since its last list emission, it emits a list of however many items the source Observable has emitted in that span, even if this is fewer than count. There is also a version of this variant of the operator that takes an IScheduler as a parameter and uses it to govern the timespan.

Buffer(timeSpan, timeShift)

Buffer(timeSpan,timeShift)

Buffer(timeSpan, timeShift) creates a new IList of items every timeShift period of time, and fills this list with every item emitted by the source Observable from that time until timeSpan time has passed since the list’s creation, before emitting this IList as its own emission. If timeSpan is longer than timeShift, the emitted lists will represent time periods that overlap and so they may contain duplicate items. There is also a version of this variant of the operator that takes an IScheduler as a parameter and uses it to govern the timespan.

RxPHP implements this operator as bufferWithCount.

Projects each element of an observable sequence into zero or more buffers which are produced based on element count information.

Sample Code

//from https://github.com/ReactiveX/RxPHP/blob/master/demo/bufferWithCount/bufferWithCount.php

$source = Rx\Observable::range(1, 6)
    ->bufferWithCount(2)
    ->subscribe($stdoutObserver);
Next value: [1,2]
Next value: [3,4]
Next value: [5,6]
Complete!
//from https://github.com/ReactiveX/RxPHP/blob/master/demo/bufferWithCount/bufferWithCountAndSkip.php

$source = Rx\Observable::range(1, 6)
    ->bufferWithCount(2, 1)
    ->subscribe($stdoutObserver);
Next value: [1,2]
Next value: [2,3]
Next value: [3,4]
Next value: [4,5]
Next value: [5,6]
Next value: [6]
Complete!

RxPY has several Buffer variants: buffer, buffer_with_count, buffer_with_time, and buffer_with_time_or_count. For each of these variants there are optional parameters that change the behavior of the operator. As always in RxPY, when an operator may take more than one optional parameter, be sure to name the parameter in the parameter list when you call the operator so as to avoid ambiguity.

buffer(buffer_openings)

buffer(buffer_openings)

buffer(buffer_openings=boundaryObservable) monitors an Observable, buffer_openings. Each time that Observable emits an item, it creates a new array to begin collecting items emitted by the source Observable and emits the previous array.

buffer(closing_selector)

buffer(closing_selector)

buffer(closing_selector=closingSelector) begins collecting items emitted by the source Observable immediately upon subscription, and also calls the closing_selector function to generate a second Observable. It monitors this new Observable and, when it completes or emits an item, it emits the current array, begins a new array to collect items from the source Observable, and calls closing_selector again to generate a new Observable to monitor in order to determine when to emit the new array. It repeats this process until the source Observable terminates, whereupon it emits the final array.

buffer(closing_selector,buffer_closing_selector)

buffer(closing_selector=openingSelector, buffer_closing_selector=closingSelector) begins by calling closing_selector to get an Observable. It monitors this Observable, and, whenever it emits an item, buffer creates a new array, begins to collect items subsequently emitted by the source Observable into this array, and calls buffer_closing_selector to get a new Observable to govern the closing of that array. When this new Observable emits an item or terminates, buffer closes and emits the array that the Observable governs.

buffer_with_count(count)

buffer_with_count(count)

buffer_with_count(count) emits non-overlapping buffers in the form of arrays, each of which contains at most count items from the source Observable (the final emitted array may have fewer than count items).

buffer_with_count(count, skip)

buffer_with_count(count,skip)

buffer_with_count(count, skip=skip) creates a new buffer starting with the first emitted item from the source Observable, and every skip items thereafter, and fills each buffer with count items: the initial item and count-1 subsequent ones. It emits these buffers as arrays. Depending on the values of count and skip these buffers may overlap (multiple buffers may contain the same item), or they may have gaps (where items emitted by the source Observable are not represented in any buffer).

buffer_with_time(timespan)

buffer_with_time(timespan)

buffer_with_time(timespan) emits a new array of items periodically, every timespan milliseconds, containing all items emitted by the source Observable since the previous bundle emission or, in the case of the first bundle, since the subscription to the source Observable. There is also a version of this variant of the operator that takes a scheduler parameter and uses it to govern the timespan; by default this variant uses the timeout scheduler.

buffer_with_time(timespan, timeshift)

buffer_with_time(timespan,timeshift)

buffer(timespan, timeshift=timeshift) creates a new array of items every timeshift milliseconds, and fills this array with every item emitted by the source Observable from that time until timespan milliseconds have passed since the array’s creation, before emitting this array as its own emission. If timespan is longer than timeshift, the emitted arrays will represent time periods that overlap and so they may contain duplicate items. There is also a version of this variant of the operator that takes a scheduler parameter and uses it to govern the timespan; by default this variant uses the timeout scheduler.

buffer_with_time_or_count(timespan, count)

buffer_with_time_or_count(timespan,count)

buffer_with_time_or_count(timespan, count) emits a new array of items for every count items emitted by the source Observable, or, if timespan milliseconds have elapsed since its last bundle emission, it emits an array of however many items the source Observable has emitted in that span, even if this is fewer than count. There is also a version of this variant of the operator that takes a scheduler parameter and uses it to govern the timespan; by default this variant uses the timeout scheduler.

Rx.rb has three variants of the Buffer operator:

buffer_with_count(count)

buffer_with_count(count)

buffer_with_count(count) emits non-overlapping buffers in the form of arrays, each of which contains at most count items from the source Observable (the final emitted array may have fewer than count items).

buffer_with_count(count,skip)

buffer_with_count(count,skip)

buffer_with_count(count, skip=skip) creates a new buffer starting with the first emitted item from the source Observable, and every skip items thereafter, and fills each buffer with count items: the initial item and count-1 subsequent ones. It emits these buffers as arrays. Depending on the values of count and skip these buffers may overlap (multiple buffers may contain the same item), or they may have gaps (where items emitted by the source Observable are not represented in any buffer).

buffer_with_time(timespan)

buffer_with_time(timespan)

buffer_with_time(timespan) emits a new array of items periodically, every timespan milliseconds, containing all items emitted by the source Observable since the previous bundle emission or, in the case of the first bundle, since the subscription to the source Observable.

RxScala has two varieties of BufferslidingBuffer and tumblingBuffer — each of which has variants with different ways of assembling the buffers they emit:

slidingBuffer(count, skip)

slidingBuffer(count,skip)

slidingBuffer(count, skip) creates a new buffer starting with the first emitted item from the source Observable, and every skip items thereafter, and fills each buffer with count items: the initial item and count-1 subsequent ones. It emits these buffers as Seqs. Depending on the values of count and skip these buffers may overlap (multiple buffers may contain the same item), or they may have gaps (where items emitted by the source Observable are not represented in any buffer).

slidingBuffer(timespan, timeshift)

slidingBuffer(timespan,timeshift)

slidingBuffer(timespan, timeshift) creates a new Seq of items every timeshift (a Duration), and fills this buffer with every item emitted by the source Observable from that time until timespan (also a Duration) has passed since the buffer’s creation, before emitting this Seq as its own emission. If timespan is longer than timeshift, the emitted arrays will represent time periods that overlap and so they may contain duplicate items. There is also a version of this variant of the operator that takes a Scheduler as a parameter and uses it to govern the timespan.

slidingBuffer(openings, closings)

slidingBuffer(openings,closings)

slidingBuffer(openings,closings) monitors the openings Observable, and, whenever it emits an Opening item, slidingBuffer creates a new Seq, begins to collect items subsequently emitted by the source Observable into this buffer, and calls closings to get a new Observable to govern the closing of that buffer. When this new Observable emits an item or terminates, slidingBuffer closes and emits the Seqthat the Observable governs.

tumblingBuffer(count)

tumblingBuffer(count)

tumblingBuffer(count) emits non-overlapping buffers in the form of Seqs, each of which contains at most count items from the source Observable (the final emitted buffer may have fewer than count items).

tumblingBuffer(boundary)

tumblingBuffer(boundary)

tumblingBuffer(boundary) monitors an Observable, boundary. Each time that Observable emits an item, it creates a new Seq to begin collecting items emitted by the source Observable and emits the previous Seq. This variant of the operator has an optional second parameter, initialCapacity with which you can indicate the expected size of these buffers so as to make memory allocation more efficient.

tumblingBuffer(timespan)

tumblingBuffer(timespan)

tumblingBuffer(timespan) emits a new Seq of items periodically, every timespan (a Duration), containing all items emitted by the source Observable since the previous bundle emission or, in the case of the first bundle, since the subscription to the source Observable. This variant of the operator has an optional second parameter, scheduler, with which you can set the Scheduler that you want to govern the timespan calculation.

tumblingBuffer(timespan, count)

tumblingBuffer(timespan,count)

tumblingBuffer(timespan, count) emits a new Seq of items for every count items emitted by the source Observable, or, if timespan (a Duration) has elapsed since its last bundle emission, it emits a Seq containing however many items the source Observable emitted in that span, even if this is fewer than count. This variant of the operator has an optional third parameter, scheduler, with which you can set the Scheduler that you want to govern the timespan calculation.

© ReactiveX contributors
Licensed under the Apache License 2.0.
http://reactivex.io/documentation/operators/buffer.html