ReQL command: changes
Command syntax
stream.changes() → stream singleSelection.changes() → stream
Description
Turn a query into a changefeed, an infinite stream of objects representing changes to the query’s results as they occur. A changefeed may return changes to a table or an individual document (a “point” changefeed). Commands such as filter
or map
may be used before the changes
command to transform or filter the output, and many commands that operate on sequences can be chained after changes
.
You may specify one of six optional arguments via optArg.
-
squash
: Controls how change notifications are batched. Acceptable values aretrue
,false
and a numeric value:-
true
: When multiple changes to the same document occur before a batch of notifications is sent, the changes are “squashed” into one change. The client receives a notification that will bring it fully up to date with the server. -
false
: All changes will be sent to the client verbatim. This is the default. -
n
: A numeric value (floating point). Similar totrue
, but the server will waitn
seconds to respond in order to squash as many changes together as possible, reducing network traffic. The first batch will always be returned immediately.
-
-
changefeed_queue_size
: the number of changes the server will buffer between client reads before it starts dropping changes and generates an error (default: 100,000). -
include_initial
: iftrue
, the changefeed stream will begin with the current contents of the table or selection being monitored. These initial results will havenew_val
fields, but noold_val
fields. The initial results may be intermixed with actual changes, as long as an initial result for the changed document has already been given. If an initial result for a document has been sent and a change is made to that document that would move it to the unsent part of the result set (e.g., a changefeed monitors the top 100 posters, the first 50 have been sent, and poster 48 has become poster 52), an “uninitial” notification will be sent, with anold_val
field but nonew_val
field. -
include_states
: iftrue
, the changefeed stream will include special status documents consisting of the fieldstate
and a string indicating a change in the feed’s state. These documents can occur at any point in the feed between the notification documents described below. Ifinclude_states
isfalse
(the default), the status documents will not be sent. -
include_offsets
: iftrue
, a changefeed stream on anorderBy.limit
changefeed will includeold_offset
andnew_offset
fields in status documents that includeold_val
andnew_val
. This allows applications to maintain ordered lists of the stream’s result set. Ifold_offset
is set and notnull
, the element atold_offset
is being deleted; ifnew_offset
is set and notnull
, thennew_val
is being inserted atnew_offset
. Settinginclude_offsets
totrue
on a changefeed that does not support it will raise an error. -
include_types
: iftrue
, every result on a changefeed will include atype
field with a string that indicates the kind of change the result represents:add
,remove
,change
,initial
,uninitial
,state
. Defaults tofalse
.
There are currently two states:
-
{state: 'initializing'}
indicates the following documents represent initial values on the feed rather than changes. This will be the first document of a feed that returns initial values. -
{state: 'ready'}
indicates the following documents represent changes. This will be the first document of a feed that does not return initial values; otherwise, it will indicate the initial values have all been sent.
If the table becomes unavailable, the changefeed will be disconnected, and a runtime exception will be thrown by the driver.
Changefeed notifications take the form of a two-field object:
{
"old_val": <document before change>,
"new_val": <document after change>
}
When include_types
is true
, there will be three fields:
{
"old_val": <document before change>,
"new_val": <document after change>,
"type": <result type>
}
When a document is deleted, new_val
will be null
; when a document is inserted, old_val
will be null
.
Certain document transformation commands can be chained before changefeeds. For more information, read the discussion of changefeeds in the “Query language” documentation.
Note: Changefeeds ignore the
read_mode
flag torun
, and always behave as if it is set tosingle
(i.e., the values they return are in memory on the primary replica, but have not necessarily been written to disk yet). For more details read Consistency guarantees.
The server will buffer up to changefeed_queue_size
elements (default 100,000). If the buffer limit is hit, early changes will be discarded, and the client will receive an object of the form {error: "Changefeed cache over array size limit, skipped X elements."}
where X
is the number of elements skipped.
Commands that operate on streams (such as filter or map) can usually be chained after changes
. However, since the stream produced by changes
has no ending, commands that need to consume the entire stream before returning (such as reduce or count) cannot.
Example: Subscribe to the changes on a table.
Start monitoring the changefeed in one client:
Result<Object> changes = r.table("games").changes().run(conn);
for (Object change : changes) {
System.out.println(change);
}
As these queries are performed in a second client, the first client would receive and print the following objects:
r.table("games").insert(r.hashMap("id", 1)).run(conn);
{"old_val": null, "new_val": {"id": 1}}
r.table("games").get(1).update(r.hashMap("player1", "Bob")).run(conn);
{"old_val": {"id": 1}, "new_val": {"id": 1, "player1": "Bob"}}
r.table("games").get(1).replace(
r.hashMap("id", 1).with("player1", "Bob").with("player2", "Alice")
).run(conn);
{"old_val": {"id": 1, "player1": "Bob"},
"new_val": {"id": 1, "player1": "Bob", "player2": "Alice"}}
r.table("games").get(1).delete().run(conn);
{"old_val": {"id": 1, "player1": "Bob", "player2": "Alice"}, "new_val": null}
r.tableDrop("games").run(conn);
ReqlRuntimeError: Changefeed aborted (table unavailable)
Example: Return all the changes that increase a player’s score.
r.table("test").changes().filter(
row -> row.g("new_val").g("score").gt(row.g("old_val").g("score"))
).run(conn);
Example: Return all the changes to a specific player’s score that increase it past 10.
r.table("test").get(1).filter(row -> row.g("score").gt(10)).changes().run(conn);
Example: Return all the inserts on a table.
r.table("test").changes().filter(
row -> row.g("old_val").eq(null)
).run(conn);
Example: Return all the changes to game 1, with state notifications and initial values.
r.table("games").get(1).changes()
.optArg("include_initial", true).optArg("include_states", true).run(conn);
Result returned on changefeed:
{"state": "initializing"}
{"new_val": {"id": 1, "score": 12, "arena": "Hobbiton Field"}}
{"state": "ready"}
{
"old_val": {"id": 1, "score": 12, "arena": "Hobbiton Field"},
"new_val": {"id": 1, "score": 14, "arena": "Hobbiton Field"}
}
{
"old_val": {"id": 1, "score": 14, "arena": "Hobbiton Field"},
"new_val": {"id": 1, "score": 17, "arena": "Hobbiton Field", "winner": "Frodo"}
}
Example: Return all the changes to the top 10 games. This assumes the presence of a score
secondary index on the games
table.
r.table("games").orderBy().optArg("index", r.desc("score"))
.limit(10).changes().run(conn);
Example: Maintain the state of a list based on a changefeed.
Result<Object> changes = r.table("data").changes()
.optArg("include_initial", true)
.optArg("include_offsets", true)
.run((conn);
for (Object change : changes) {
// Delete item at old_offset before inserting at new_offset
if (change.old_offset != null) {
myList.remove(change.old_offset);
}
if (change.new_offset != null) {
myList.add(change.new_offset, change_new.val);
}
};
(This is a simplistic implementation. For a more sophisticated example, see the applyChange
function in Horizon’s client/src/ast.js source; it’s written in JavaScript, but the principles apply to all languages.)
© RethinkDB contributors
Licensed under the Creative Commons Attribution-ShareAlike 3.0 Unported License.
https://rethinkdb.com/api/java/changes/