MongoDB 3.6已经GA有一段时间,网络上对于该版本新特性的详细介绍文章比较少为此借机会对部分新特性做一个相对详细的介绍。基于早期MongoDB版本实现如跨平台数据同步、消息通知、ETL及oplog备份等服务时大多依赖于 Tailable Cursors 的方式。当然这样的实现一来相对复杂同时也存在着一些风险(如不同版本oplog兼容性及过滤特定操作类型等)。
Change streams(暂且叫变更流)的出现不仅为业务提供了实时获取数据库数据变化的简易接口,同时又避免了原来使用tail oplog 的复杂和风险性。下面我们来看看如何来正确使用 Change stream 。
PS:丰富的一线技术、多元化的表现形式,尽在“HULK一线技术杂谈”,点关注哦!
Change stream
使用条件限制
只用于 replica sets 和 sharded clusters ,单节点因为没有oplog故不支持。
复制协议必须是pv1 存储引擎必须是 WiredTiger
驱动实现接口
MongoDB Shell 接口说明
MongoDB 3.6 版本只实现了集合粒度的 change stream 具体方法如下:
db.collection.watch(pipeline, options)
该方法实际上是在集合collection上开启一个change stream的游标。
测试用例(mongo shell环境+副本集primary节点):
1创建一个简单 Change Stream 游标并进行循环迭代
创建一个简单 Change Stream 游标并进行循环迭代
// 在test库的test集合上创建一个名为watchCursor 的change stream 游标
watchCursor = db.getSiblingDB("test").test.watch();
// 对游标watchCursor进行循环迭代(其中当游标关闭或游标迭代没有文档时isExhausted()返回true)
while (!watchCursor.isExhausted()){
if (watchCursor.hasNext()){
a=watchCursor.next();
printjson(a);
}
}
// 开启另一个会话在test库下的test集合执行update操作
db.test.update({x:100},{$set:{age:80}},{upsert:true});
输出结果及详细说明如下:
{
"_id" : { // 表示更新操作的token 值(映射至对应操作的oplog)
"_data":BinData(0,"glsn32QAAAABRmRfaWQAZFsn3vA
7Q4yjQzA+1wBaEAQwkZh988FJS5yreqLRyy/wBA==")
},
"operationType" : "update", // 捕获的具体操作类型
// 输出更新后整个文档的详细信息
// 前提条件是在创建ChangeStream游标是指定了fullDocument : "updateLookup"
"fullDocument" : {
"_id" : ObjectId("5b27e2453b438ca343304236"),
"x" : 100,
"age" : 80,
"name" : "li"
},
"ns" : {
"db" : "test",// 对应的库名
"coll" : "test"// 对应的集合
},
"documentKey" : {
// 操作对应记录的_id,如果是分片集合此处还会输出对应的分片key
"_id" : ObjectId("5b27def03b438ca343303ed7")
},
"updateDescription" : { // 描述了操作后记录影响的具体增量信息
"updatedFields" : { // 增量操作(这里是update)所影响的字段
"age" : 80 // 增量操作(这里是更新后)具体字段的值
},
"removedFields" : [ ] //该字段描述了update操作后被删除的字段信息
}
}
2创建一个只匹配 insert 操作类型的 Change Stream 游标
watchCursor=db.getSiblingDB("test").test.watch(
[
{ $match : {"operationType" : "insert" } }// 只匹配insert 操作的变更
]
);
游标创建后通过对游标进行迭代,只能获取test集合上insert操作类型的信息。其他支持的操作类型update、delete、replaceOne 及输出信息详细说明可参见:Change Events -https://docs.mongodb.com/manual/reference/change-events/
3ChangeStream 的”断线恢复”功能
ChangeStream还支持”断线恢复”功能即当游标因为意外情况关闭后可以通过之前的token信息进行恢复(前提条件是token对应的oplog没有被覆盖),具体使用如下:
var resumeToken={
// 该token 信息可是是之前任意有效操作的输出
"_data" : BinData(0,"glsn32QAAAABRmRfaWQAZFsn3vA7Q
4yjQzA+1wBaEAQwkZh988FJS5yreqLRyy/wBA==")};
var resumedWatchCursor=db.getSiblingDB("test").test.watch(
[],
{ resumeAfter : resumeToken } // 指定对应的token之后开始恢复游标
);
其他的使用场景,读者自行测试即可。
注意事项
1.尝试在单节点(非副本集节点)上创建ChangeStream游标会报如下错误:
command failed: {
"ok" : 0,
"errmsg" : "The $changeStream stage is only supported on replica sets",
"code" : 40573,
"codeName" : "Location40573"
}
2. ChangeStream 只发布持久化到大多数(majority-committed)节点的数据变化通知
3.要想在集合上创建ChangeStream游标用户必须对集合具有读权限
4.对于分片集合带有multi:true 的更新操作可能会导致发布孤立文档的变更消息
5.对于如创建索引的操作游标迭代时直接忽略该操作但是如果 dropDatabase 或对集合进行 rename、drop 操作则会触发游标退出并输出如下信息:
{
"_id" : {
"_data":BinData(0,"glsn6TQAAAABFFoQBI7fTw7hk0LHgHqi0QTIvq0E")
},
"operationType" : "invalidate" // 表示无效或叫非法操作
}
6. 当 ChangeStream 游标因特定操作导致退出后,Mongo Shell 下不会自动恢复,而对于3.6版本系列的各语言驱动则会尝试一次自动恢复。
7. 当对应的 token 信息对应的 oplog 不存在然后尝试恢复ChangeStream 游标时不会报错但尝试对集合进行数据操作后会报如下错:
getMore command failed:{
"operationTime" : Timestamp(1528994552, 1),
"ok" : 0,
"errmsg" : "resume of change stream was not possible, as the resume token was not found
….
}
MongoDB 4.0 的变化
因为4.0版本需要支持集群及库级别的ChangeStream 故会增加如下的pipeline 命令行语法:
// 集群粒度 对应MongoDB Shell Mongo.watch()
{
aggregate: 1
pipeline: [{$changeStream: {allChangesForCluster: true, ...}}, ...],
...
}
// 库粒度 对应MongoDB Shell db.watch()
{
aggregate: 1
pipeline: [{$changeStream: {...}}, ...],
...
}
另外,4.0版本在游标恢复时增加了一个 startAtOperationTime(表示操作时间)参数该参数指定从哪个操作的时间点开始恢复游标,可以通过事件的输出clusterTime 字段获得(其实对应了oplog里的操作时间),值得注意的是该参数不能和resumeAfter同时使用。
再则,4.0版本为了支持多文档事务在事件输出文档中增加了另外两个参数
txnNumber 和
lsid 分别表示事务号及会话ID ,需要注意的是同一个会话内事务ID从0开始自增。
ChangeStream 的介绍都到此为止,因为时间和精力有限难免有些错误还请及时反馈,祝各位玩得开心。
参考链接:
- https://docs.mongodb.com/manual/changeStreams/
- https://docs.mongodb.com/manual/reference/method/db.collection.watch/#db.collection.watch
- https://docs.mongodb.com/manual/administration/change-streams-production-recommendations/
- https://docs.mongodb.com/manual/reference/change-events/
- https://docs.mongodb.com/manual/reference/method/cursor.isExhausted/#cursor.isExhausted
- https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst