最近、MongoDBはバージョン3.6以降の新機能であるChangeStreamsをリリースしました。これにより、データに瞬時にアクセスできるため、データの変更を常に最新の状態に保つことができます。今日の世界では、誰もが数時間または数分後に通知を受け取るのではなく、即時通知を望んでいます。一部のアプリケーションでは、更新のたびに、サブスクライブしているすべてのユーザーにリアルタイムの通知をプッシュすることが重要です。 MongoDBは、この機能を導入することで、このプロセスを非常に簡単にしました。この記事では、MongoDBの変更ストリームとそのアプリケーションについていくつかの例を挙げて学習します。
変更ストリームの定義
変更ストリームは、データベースやコレクション、さらにはデプロイメントで発生する変更のリアルタイムストリームに他なりません。たとえば、特定のコレクションで更新(挿入、更新、削除)が発生するたびに、MongoDBは変更されたすべてのデータを使用して変更イベントをトリガーします。
$ changeStream演算子とwatch()メソッドを使用して、他の通常の集計演算子と同じように、任意のコレクションに変更ストリームを定義できます。 MongoCollection.watch()メソッドを使用して変更ストリームを定義することもできます。
例
db.myCollection.watch()
ストリーム機能の変更
-
変更のフィルタリング
変更をフィルタリングして、一部のターゲットデータのみのイベント通知を取得できます。
例:
pipeline = [ { $match: { name: "Bob" } } ]; changeStream = collection.watch(pipeline);
このコードは、名前がBobと等しいレコードの更新のみを確実に取得します。このようにして、変更ストリームをフィルタリングするためのパイプラインを作成できます。
-
変更ストリームの再開
この機能により、障害が発生した場合でもデータが失われることはありません。ストリーム内の各応答には、特定のポイントからストリームを再開するために使用できる再開トークンが含まれています。頻繁に発生するネットワーク障害の場合、mongodbドライバーは、最新の再開トークンを使用してサブスクライバーとの接続を再確立しようとします。ただし、アプリケーションが完全に失敗した場合は、ストリームを再開するためにクライアントが再開トークンを維持する必要があります。
-
注文された変更ストリーム
MongoDBは、グローバル論理クロックを使用して、クラスターのすべてのレプリカとシャードにまたがるすべての変更ストリームイベントを順序付けます。そのため、レシーバーは、コマンドがデータベースに適用されたのと同じ順序で常に通知を受信します。
-
完全なドキュメントを含むイベント
MongoDBは、デフォルトで一致するドキュメントの一部を返します。ただし、変更ストリーム構成を変更して、完全なドキュメントを受け取ることができます。これを行うには、{fullDocument:“ updateLookup”}をウォッチメソッドに渡します。
例:collection = db.collection("myColl") changeStream = collection.watch({ fullDocument: “updateLookup”})
-
耐久性
変更ストリームは、レプリカの大部分にコミットされているデータについてのみ通知します。これにより、イベントが多数決の永続性データによって生成され、メッセージの耐久性が確保されます。
-
セキュリティ/アクセス制御
変更ストリームは非常に安全です。ユーザーは、読み取り権限を持つコレクションでのみ変更ストリームを作成できます。ユーザーの役割に基づいて変更ストリームを作成できます。
変更ストリームの例
この例では、株価がしきい値を超えたときに通知を受け取るために、株価コレクションに変更ストリームを作成します。
-
クラスターをセットアップする
変更ストリームを使用するには、最初にレプリカセットを作成する必要があります。次のコマンドを実行して、単一ノードのレプリカセットを作成します。
mongod --dbpath ./data --replSet “rs”
-
株式コレクションにいくつかのレコードを挿入します
var docs = [ { ticker: "AAPL", price: 210 }, { ticker: "AAPL", price: 260 }, { ticker: "AAPL", price: 245 }, { ticker: "AAPL", price: 255 }, { ticker: "AAPL", price: 270 } ]; db.Stocks.insert(docs)
-
ノード環境のセットアップと依存関係のインストール
mkdir mongo-proj && cd mongo-proj npm init -y npm install mongodb --save
-
変更をサブスクライブする
index.jsファイルを1つ作成し、その中に次のコードを配置します。
const mongo = require("mongodb").MongoClient; mongo.connect("mongodb://localhost:27017/?replicaSet=rs0").then(client => { console.log("Connected to MongoDB server"); // Select DB and Collection const db = client.db("mydb"); const collection = db.collection("Stocks"); pipeline = [ { $match: { "fullDocument.price": { $gte: 250 } } } ]; // Define change stream const changeStream = collection.watch(pipeline); // start listen to changes changeStream.on("change", function(event) { console.log(JSON.stringify(event)); }); });
次に、このファイルを実行します:
node index.js
-
更新を受信するには、dbに新しいレコードを挿入します
db.Stocks.insert({ ticker: “AAPL”, price: 280 })
次に、コンソールを確認します。MongoDBから更新を受け取ります。
応答例:{ "_id":{ "_data":"825C5D51F70000000129295A1004E83608EE8F1B4FBABDCEE73D5BF31FC946645F696400645C5D51F73ACA83479B48DE6E0004"}, "operationType":"insert", "clusterTime":"6655565945622233089", "fullDocument":{ "_id":"5c5d51f73aca83479b48de6e", "ticker":"AAPL", "Price":300 }, "ns":{"db":"mydb","coll":"Stocks"}, "documentKey":{"_id":"5c5d51f73aca83479b48de6e"} }
ここでは、次の操作でoperationTypeパラメーターの値を変更して、コレクション内のさまざまなタイプの変更をリッスンできます。
- 挿入
- 置換(一意のIDを除く)
- 更新
- 削除
- 無効(Mongoが無効なカーソルを返す場合)
その他の変更モードストリーム
コレクションに対してと同じ方法で、データベースおよびデプロイメントに対して変更ストリームを開始できます。この機能はMongoDBバージョン4.0からリリースされました。データベースとデプロイメントに対して変更ストリームを開くためのコマンドは次のとおりです。
Against DB: db.watch()
Against deployment: Mongo.watch()
結論
MongoDB Change Streamsは、フロントエンドとバックエンドの統合をリアルタイムでシームレスに簡素化します。この機能は、PubsubモデルにMongoDBを使用するのに役立つため、KafkaまたはRabbitMQのデプロイを管理する必要がなくなります。アプリケーションでリアルタイムの情報が必要な場合は、MongoDBのこの機能を確認する必要があります。この投稿によって、MongoDBの変更ストリームを開始できることを願っています。