MongoDB 3.6 Change Stream Example with NoSQLBooster

By Qinghai | January 15, 2018

MongoDB 3.6 has a new change notification API, called a “change stream.” Change streams allow applications to access real-time data changes without the complexity and risk of tailing the oplog. Applications can use change streams to subscribe to all data changes on a collection and immediately react to them.

This post is a step-by-step tutorial on how to open and configure change streams with NoSQLBooster.

Change streams are available for replica sets or sharded clusters with replica set shards. You cannot open a change stream against a standalone mongod. The examples below assume that you are connected to a MongoDB 3.6 replica set and have selected a database. If you want to deploy a replica set in a development or test environment, please refer to this link.
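
Whether the current connection qualifies can be checked before opening a stream. The sketch below uses a hypothetical helper, supportsChangeStreams, which is not part of NoSQLBooster or MongoDB. It inspects the document returned by the shell's db.isMaster() command, on the assumption that a replica set member reports a setName field and a mongos router reports msg: "isdbgrid". It does not verify that the shards behind a mongos are themselves replica sets.

```javascript
// Hypothetical helper: decides from an isMaster() result whether the
// deployment looks able to serve change streams.
function supportsChangeStreams(isMasterResult) {
    // Replica set members report the name of their set.
    const isReplicaSetMember = typeof isMasterResult.setName === "string";
    // A mongos router identifies itself with msg: "isdbgrid".
    const isMongos = isMasterResult.msg === "isdbgrid";
    return isReplicaSetMember || isMongos;
}

// `db` exists only inside a shell tab, so the call is guarded.
if (typeof db !== "undefined") {
    console.log(supportsChangeStreams(db.isMaster()));
}
```

A result of false suggests a standalone mongod, against which the watch call in the following sections is expected to fail.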


Prepare Demo Data

Insert the following demo data into MongoDB. Open a shell tab (Ctrl-T) and execute the following script to create the unicorns collection.

use test

db.unicorns.insert([
    { "name" : "Horny", "weight" : 600 },
    { "name" : "Aurora", "weight" : 450 },
    { "name" : "Unicrom", "weight" : 984 },
    { "name" : "Roooooodles", "weight" : 590 },
    { "name" : "Solnara", "weight" : 550 }
])

Set Up a Watch on the Collection

Here is the most important part. Let's set up a watch on the collection so that we can listen for the operations that occur in it, such as insert, delete, replace, update, and invalidate.

Right-click the collection node in the connection tree and select the "Watch Collection" item, like this:

Watch Collection Menu Item

NoSQLBooster also provides a code snippet for watching a collection. Open a shell tab (Ctrl-T), type "db.unicorns.wa", and select the "watch" snippet (not the method) in the auto-complete list, like this:

Watch Collection Code Snippet

NoSQLBooster will generate code and run it immediately. Let's press Command+F2 to stop it and explore the generated code.

db.unicorns.watch([{
    $match: {
        operationType: "update", // insert|update|replace|delete|invalidate
    }
}],
{
    fullDocument: "updateLookup", // default|updateLookup
})
.on("change", (data) => {
    // print each change event as it arrives
    console.log(tojson(data))
});

// keep the script running so that change events continue to be delivered
while (true) {
    sleep(1000)
}

In the example above, we subscribe to the change stream's "change" event, which is delivered asynchronously.

The pipeline contains a single $match stage that passes only operations whose operationType is update. If you want to listen for all operations, comment out this line.
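
Between a single operation type and all operations there is a middle ground: matching a chosen set of types. The sketch below assumes the standard $in query operator inside $match. The helper name buildOperationFilter is hypothetical and only wraps the pipeline construction.

```javascript
// Hypothetical helper: builds a change stream pipeline that keeps only
// the listed operation types.
function buildOperationFilter(operationTypes) {
    return [{ $match: { operationType: { $in: operationTypes } } }];
}

const pipeline = buildOperationFilter(["insert", "update", "delete"]);

// `db` and `tojson` exist only inside a shell tab, so the call is guarded.
if (typeof db !== "undefined") {
    db.unicorns.watch(pipeline).on("change", (data) => {
        console.log(tojson(data));
    });
}
```

As with the generated snippet, the script still needs something to keep it running after watch is called, such as the while loop shown earlier.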

The fullDocument option is set to "updateLookup", which means that every update notification includes a fullDocument field containing the current version of the document affected by the update. The other value is default, which indicates that you don’t need any extra data. Omitting the fullDocument option is equivalent to specifying default.
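
The practical difference between the two values shows up in the handler. The following sketch uses a hypothetical helper, documentView, that returns the full document when the event carries one and otherwise falls back to the document key plus the changed fields. The field names documentKey and updateDescription.updatedFields are the ones that appear in the sample output later in this post.

```javascript
// Hypothetical helper: returns the data available for display from an
// update event under either fullDocument setting.
function documentView(event) {
    if (event.fullDocument) {
        // "updateLookup": the whole current document is attached.
        return event.fullDocument;
    }
    // "default": only the key and the changed fields are available.
    const description = event.updateDescription || {};
    return Object.assign({}, event.documentKey, description.updatedFields || {});
}
```

Inside the "change" handler, console.log(tojson(documentView(data))) would then print a comparable document in both modes, with unchanged fields such as name missing when the option is default.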

OK, let's press "Command+Enter" or "F5" to run the code.

Simulate Data Update

Let's update some records. Open a shell tab (Ctrl-T) and execute the following script, which keeps assigning a random weight to one document per second.

while (true) {
    db.unicorns.find({}).forEach((it) => {
        db.unicorns.update({_id: it._id}, {$set: {weight: _.random(500, 1000)}})

        sleep(1000);
    })
}

Run it and go back to the "watch collection" tab. As time passes, each document in the unicorns collection gets updated, and every update appears on the screen.
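
The update script runs until it is stopped by hand. For a short demo, a bounded variant may be more convenient. The sketch below is one possible way to write it: the helpers randomWeight and simulateUpdates are hypothetical, and plain Math.random is used in place of _.random so that the functions do not depend on lodash.

```javascript
// Hypothetical helper: random integer between min and max, inclusive.
function randomWeight(min, max) {
    return Math.floor(Math.random() * (max - min + 1)) + min;
}

// Hypothetical helper: runs a fixed number of passes over the collection,
// updating one document per pause, and returns the number of updates issued.
function simulateUpdates(collection, rounds, pause) {
    let updates = 0;
    for (let round = 0; round < rounds; round++) {
        collection.find({}).forEach((it) => {
            collection.update({ _id: it._id }, { $set: { weight: randomWeight(500, 1000) } });
            updates++;
            pause(1000);
        });
    }
    return updates;
}

// `db` and `sleep` exist only inside a shell tab, so the call is guarded.
if (typeof db !== "undefined") {
    simulateUpdates(db.unicorns, 3, sleep);
}
```

With the five demo documents, three rounds would issue fifteen updates and then return, so the tab does not need to be stopped manually.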

Watch Collection

Here is an example of the output.

{
    "_id" : {
        "_data" : BinData(0,"glpcHXoAAAABRmRfaWQAZFpcClATiDecpJhQLwBaEAQNyDr9S2FNdpXxKqqHY5WcBA==")
    },
    "operationType" : "update",
    "fullDocument" : {
        "_id" : ObjectId("5a5c0a501388379ca498502f"),
        "name" : "Solnara",
        "weight" : 751
    },
    "ns" : {
        "db" : "test",
        "coll" : "unicorns"
    },
    "documentKey" : {
        "_id" : ObjectId("5a5c0a501388379ca498502f")
    },
    "updateDescription" : {
        "updatedFields" : {
            "weight" : 751
        },
        "removedFields" : [ ]
    }
}
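
Printing the whole event is verbose when updates arrive every second. As a sketch, the hypothetical helper below condenses an event of the shape shown above into a single line, using only the ns, operationType, and updateDescription fields. It assumes that events without a namespace or an update description (for example, invalidate) should still produce a line rather than an error.

```javascript
// Hypothetical helper: one-line summary of a change event.
function summarizeChange(event) {
    const namespace = event.ns ? event.ns.db + "." + event.ns.coll : "(no namespace)";
    const description = event.updateDescription || {};
    const changed = Object.keys(description.updatedFields || {});
    const removed = description.removedFields || [];
    return namespace + " " + event.operationType +
        " changed=[" + changed.join(",") + "]" +
        " removed=[" + removed.join(",") + "]";
}
```

In the "change" handler, console.log(summarizeChange(data)) can be used in place of console.log(tojson(data)).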

Thank you!

Please visit our feedback page or click the “Feedback” button in the app. Feel free to suggest improvements to our product or service. Users can discuss your suggestion and vote for and against it. We’ll look at it too.