ReQL command: changes
Command syntax
stream.changes([options]) → stream
singleSelection.changes([options]) → stream
Description
Turn a query into a changefeed, an infinite stream of objects representing changes to the query’s results as they occur. A changefeed may return changes to a table or an individual document (a “point” changefeed). Commands such as filter or map may be used before the changes command to transform or filter the output, and many commands that operate on sequences can be chained after changes.
There are six optional arguments to changes.
- squash: Controls how change notifications are batched. Acceptable values are True, False, and a numeric value:
  - True: When multiple changes to the same document occur before a batch of notifications is sent, the changes are “squashed” into one change. The client receives a notification that will bring it fully up to date with the server.
  - False: All changes will be sent to the client verbatim. This is the default.
  - n: A numeric value (floating point). Similar to True, but the server will wait n seconds to respond in order to squash as many changes together as possible, reducing network traffic. The first batch will always be returned immediately.
- changefeed_queue_size: the number of changes the server will buffer between client reads before it starts dropping changes and generates an error (default: 100,000).
- include_initial: if True, the changefeed stream will begin with the current contents of the table or selection being monitored. These initial results will have new_val fields, but no old_val fields. The initial results may be intermixed with actual changes, as long as an initial result for the changed document has already been given. If an initial result for a document has been sent and a change is made to that document that would move it to the unsent part of the result set (e.g., a changefeed monitors the top 100 posters, the first 50 have been sent, and poster 48 has become poster 52), an “uninitial” notification will be sent, with an old_val field but no new_val field.
- include_states: if True, the changefeed stream will include special status documents consisting of the field state and a string indicating a change in the feed’s state. These documents can occur at any point in the feed between the notification documents described below. If include_states is False (the default), the status documents will not be sent.
- include_offsets: if True, a changefeed stream on an order_by.limit changefeed will include old_offset and new_offset fields in status documents that include old_val and new_val. This allows applications to maintain ordered lists of the stream’s result set. If old_offset is set and not None, the element at old_offset is being deleted; if new_offset is set and not None, then new_val is being inserted at new_offset. Setting include_offsets to True on a changefeed that does not support it will raise an error.
- include_types: if True, every result on a changefeed will include a type field with a string that indicates the kind of change the result represents: add, remove, change, initial, uninitial, state. Defaults to False.
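The effect of squash=True can be pictured with a small client-side model. This is only a sketch of the described behaviour, not the server's actual algorithm: the squash_batch function is hypothetical, and it assumes every document has an id primary key and that a batch is an ordinary list of notifications.

```python
def squash_batch(changes, key="id"):
    """Merge all changes to the same document in one batch into a single notification.

    Illustrative model only; `key` is assumed to be the primary key field.
    """
    merged = {}  # primary key -> combined notification (dicts keep insertion order)
    for change in changes:
        # A deleted document has new_val None, so fall back to old_val for the key.
        doc = change["new_val"] if change["new_val"] is not None else change["old_val"]
        pk = doc[key]
        if pk in merged:
            # Keep the earliest old_val, overwrite with the latest new_val.
            merged[pk]["new_val"] = change["new_val"]
        else:
            merged[pk] = {"old_val": change["old_val"], "new_val": change["new_val"]}
    return list(merged.values())


batch = [
    {"old_val": None, "new_val": {"id": 1}},
    {"old_val": {"id": 1}, "new_val": {"id": 1, "player1": "Bob"}},
    {"old_val": {"id": 2, "score": 3}, "new_val": {"id": 2, "score": 5}},
]
squashed = squash_batch(batch)
```

Here the two notifications for document 1 collapse into one whose old_val is None and whose new_val is the latest version, which is enough to bring a client up to date.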
There are currently two states:
- {"state": "initializing"} indicates the following documents represent initial values on the feed rather than changes. This will be the first document of a feed that returns initial values.
- {"state": "ready"} indicates the following documents represent changes. This will be the first document of a feed that does not return initial values; otherwise, it will indicate the initial values have all been sent.
Starting with RethinkDB 2.2, state documents will only be sent if the include_states option is True, even on point changefeeds. Initial values will only be sent if include_initial is True. If include_states is True and include_initial is False, the first document on the feed will be {'state': 'ready'}.
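A client that enables include_states usually wants to know which state the feed was in when each notification arrived. The following is a minimal sketch, assuming the feed is any iterable of dictionaries; the track_state helper and the sample documents are hypothetical and need no server connection.

```python
def track_state(docs):
    """Yield (state, doc) pairs, where state is the most recent feed state seen."""
    state = None  # unknown until the first state document arrives
    for doc in docs:
        if "state" in doc:
            state = doc["state"]  # status document: remember it, do not pass it on
            continue
        yield state, doc


sample_feed = [
    {"state": "initializing"},
    {"new_val": {"id": 1, "score": 12}},
    {"state": "ready"},
    {"old_val": {"id": 1, "score": 12}, "new_val": {"id": 1, "score": 14}},
]
tagged = list(track_state(sample_feed))
```

Because track_state is a generator, it can wrap a feed that never ends and still hand over each notification as soon as it arrives.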
If the table becomes unavailable, the changefeed will be disconnected, and a runtime exception will be thrown by the driver.
Changefeed notifications take the form of a two-field object:
{
    "old_val": <document before change>,
    "new_val": <document after change>
}
When include_types is True, there will be three fields:
{
    "old_val": <document before change>,
    "new_val": <document after change>,
    "type": <result type>
}
When a document is deleted, new_val will be None; when a document is inserted, old_val will be None.
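When include_types is not enabled, the kind of result can often be inferred from which fields are present. The sketch below is one possible approach, assuming notifications arrive as plain dictionaries; classify_change is a hypothetical helper, and its return values simply mirror the type strings listed for include_types.

```python
def classify_change(doc):
    """Infer the kind of changefeed result from the fields it carries."""
    if "state" in doc:
        return "state"
    if "old_val" not in doc:
        return "initial"    # initial results have new_val but no old_val field
    if "new_val" not in doc:
        return "uninitial"  # uninitial results have old_val but no new_val field
    if doc["old_val"] is None:
        return "add"        # document was inserted
    if doc["new_val"] is None:
        return "remove"     # document was deleted
    return "change"


kinds = [
    classify_change({"old_val": None, "new_val": {"id": 1}}),
    classify_change({"old_val": {"id": 1}, "new_val": {"id": 1, "player1": "Bob"}}),
    classify_change({"old_val": {"id": 1, "player1": "Bob"}, "new_val": None}),
]
```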
Certain document transformation commands can be chained before changefeeds. For more information, read the discussion of changefeeds in the “Query language” documentation.
Note: Changefeeds ignore the read_mode flag to run, and always behave as if it is set to single (i.e., the values they return are in memory on the primary replica, but have not necessarily been written to disk yet). For more details read Consistency guarantees.
The server will buffer up to 100,000 elements. If the buffer limit is hit, early changes will be discarded, and the client will receive an object of the form {"error": "Changefeed cache over array size limit, skipped X elements."} where X is the number of elements skipped.
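A consumer may want to detect this overflow object and find out how many changes were lost. The following is a rough sketch that assumes the object reaches the client as a dictionary with the message format quoted above; skipped_elements is a hypothetical helper, not part of the driver.

```python
import re

# Matches the count in "... skipped X elements."
_SKIPPED = re.compile(r"skipped (\d+) elements")


def skipped_elements(doc):
    """Return the number of dropped changes reported by an overflow object, else None."""
    message = doc.get("error")
    if message is None:
        return None  # an ordinary notification, not an error object
    match = _SKIPPED.search(message)
    return int(match.group(1)) if match else None


overflow = {"error": "Changefeed cache over array size limit, skipped 42 elements."}
lost = skipped_elements(overflow)
```

A non-None result means local state derived from the feed may be stale, so an application might choose to reload the data it is tracking.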
Commands that operate on streams (such as filter or map) can usually be chained after changes. However, since the stream produced by changes has no ending, commands that need to consume the entire stream before returning (such as reduce or count) cannot.
Example: Subscribe to the changes on a table.
Start monitoring the changefeed in one client:
for change in r.table('games').changes().run(conn):
    print(change)
As these queries are performed in a second client, the first client would receive and print the following objects:
> r.table('games').insert({'id': 1}).run(conn)
{'old_val': None, 'new_val': {'id': 1}}
> r.table('games').get(1).update({'player1': 'Bob'}).run(conn)
{'old_val': {'id': 1}, 'new_val': {'id': 1, 'player1': 'Bob'}}
> r.table('games').get(1).replace({'id': 1, 'player1': 'Bob', 'player2': 'Alice'}).run(conn)
{'old_val': {'id': 1, 'player1': 'Bob'},
'new_val': {'id': 1, 'player1': 'Bob', 'player2': 'Alice'}}
> r.table('games').get(1).delete().run(conn)
{'old_val': {'id': 1, 'player1': 'Bob', 'player2': 'Alice'}, 'new_val': None}
> r.table_drop('games').run(conn)
ReqlRuntimeError: Changefeed aborted (table unavailable)
Example: Return all the changes that increase a player’s score.
r.table('test').changes().filter(
    r.row['new_val']['score'] > r.row['old_val']['score']
).run(conn)
Example: Return all the changes to a specific player’s score that increase it past 10.
r.table('test').get(1).changes().filter(r.row['new_val']['score'].gt(10)).run(conn)
Example: Return all the inserts on a table.
r.table('test').changes().filter(r.row['old_val'].eq(None)).run(conn)
Example: Return all the changes to game 1, with state notifications and initial values.
r.table('games').get(1).changes(include_initial=True, include_states=True).run(conn)
# result returned on changefeed
{"state": "initializing"}
{"new_val": {"id": 1, "score": 12, "arena": "Hobbiton Field"}}
{"state": "ready"}
{
    "old_val": {"id": 1, "score": 12, "arena": "Hobbiton Field"},
    "new_val": {"id": 1, "score": 14, "arena": "Hobbiton Field"}
}
{
    "old_val": {"id": 1, "score": 14, "arena": "Hobbiton Field"},
    "new_val": {"id": 1, "score": 17, "arena": "Hobbiton Field", "winner": "Frodo"}
}
Example: Return all the changes to the top 10 games. This assumes the presence of a score secondary index on the games table.
r.table('games').order_by(index=r.desc('score')).limit(10).changes().run(conn)
Example: Maintain the state of an array based on a changefeed.
my_array = []
for change in r.table('data').changes(include_initial=True, include_offsets=True).run(conn):
    # delete item at old_offset before inserting at new_offset
    if change.get('old_offset') is not None:
        my_array.pop(change['old_offset'])
    if change.get('new_offset') is not None:
        my_array.insert(change['new_offset'], change['new_val'])
(This is a simplistic implementation, and in production you should use an asynchronous event model defined with set_loop_type. For a more sophisticated example, see the applyChange function in Horizon’s client/src/ast.js source; it’s written in JavaScript, but the principles apply to all languages.)
© RethinkDB contributors
Licensed under the Creative Commons Attribution-ShareAlike 3.0 Unported License.
https://rethinkdb.com/api/python/changes/