sql >> データベース >  >> NoSQL >> MongoDB

MongoDB変更ストリームを使用したリアルタイムデータストリーミング

    最近、MongoDBはバージョン3.6以降の新機能であるChangeStreamsをリリースしました。これにより、データに瞬時にアクセスできるため、データの変更を常に最新の状態に保つことができます。今日の世界では、誰もが数時間または数分後に通知を受け取るのではなく、即時通知を望んでいます。一部のアプリケーションでは、更新のたびに、サブスクライブしているすべてのユーザーにリアルタイムの通知をプッシュすることが重要です。 MongoDBは、この機能を導入することで、このプロセスを非常に簡単にしました。この記事では、MongoDBの変更ストリームとそのアプリケーションについていくつかの例を挙げて学習します。

    変更ストリームの定義

    変更ストリームは、データベースやコレクション、さらにはデプロイメントで発生する変更のリアルタイムストリームに他なりません。たとえば、特定のコレクションで更新(挿入、更新、削除)が発生するたびに、MongoDBは変更されたすべてのデータを使用して変更イベントをトリガーします。

    $ changeStream演算子とwatch()メソッドを使用して、他の通常の集計演算子と同じように、任意のコレクションに変更ストリームを定義できます。 MongoCollection.watch()メソッドを使用して変更ストリームを定義することもできます。

    db.myCollection.watch()

    ストリーム機能の変更

    • 変更のフィルタリング

      変更をフィルタリングして、一部のターゲットデータのみのイベント通知を取得できます。

      例:

      pipeline = [
         {
           $match: { name: "Bob" }
         } ];
      changeStream = collection.watch(pipeline);

      このコードは、名前がBobと等しいレコードの更新のみを確実に取得します。このようにして、変更ストリームをフィルタリングするためのパイプラインを作成できます。

    • 変更ストリームの再開

      この機能により、障害が発生した場合でもデータが失われることはありません。ストリーム内の各応答には、特定のポイントからストリームを再開するために使用できる再開トークンが含まれています。頻繁に発生するネットワーク障害の場合、mongodbドライバーは、最新の再開トークンを使用してサブスクライバーとの接続を再確立しようとします。ただし、アプリケーションが完全に失敗した場合は、ストリームを再開するためにクライアントが再開トークンを維持する必要があります。

    • 注文された変更ストリーム

      MongoDBは、グローバル論理クロックを使用して、クラスターのすべてのレプリカとシャードにまたがるすべての変更ストリームイベントを順序付けます。そのため、レシーバーは、コマンドがデータベースに適用されたのと同じ順序で常に通知を受信します。

    • 完全なドキュメントを含むイベント

      MongoDBは、デフォルトで一致するドキュメントの一部を返します。ただし、変更ストリーム構成を変更して、完全なドキュメントを受け取ることができます。これを行うには、{fullDocument:“ updateLookup”}をウォッチメソッドに渡します。
      例:

      collection = db.collection("myColl")
      changeStream = collection.watch({ fullDocument: “updateLookup”})
    • 耐久性

      変更ストリームは、レプリカの大部分にコミットされているデータについてのみ通知します。これにより、イベントが多数決の永続性データによって生成され、メッセージの耐久性が確保されます。

    • セキュリティ/アクセス制御

      変更ストリームは非常に安全です。ユーザーは、読み取り権限を持つコレクションでのみ変更ストリームを作成できます。ユーザーの役割に基づいて変更ストリームを作成できます。

    SomeninesがMongoDBDBAになる-MongoDBを本番環境に導入MongoDBDownloadを無料でデプロイ、監視、管理、スケーリングするために知っておくべきことを学びましょう

    変更ストリームの例

    この例では、株価がしきい値を超えたときに通知を受け取るために、株価コレクションに変更ストリームを作成します。

    • クラスターをセットアップする

      変更ストリームを使用するには、最初にレプリカセットを作成する必要があります。次のコマンドを実行して、単一ノードのレプリカセットを作成します。

      mongod --dbpath ./data --replSet “rs”
    • 株式コレクションにいくつかのレコードを挿入します

      var docs = [
       { ticker: "AAPL", price: 210 },
       { ticker: "AAPL", price: 260 },
       { ticker: "AAPL", price: 245 },
       { ticker: "AAPL", price: 255 },
       { ticker: "AAPL", price: 270 }
      ];
      db.Stocks.insert(docs)
    • ノード環境のセットアップと依存関係のインストール

      mkdir mongo-proj && cd mongo-proj
      npm init -y
      npm install mongodb --save
    • 変更をサブスクライブする

      index.jsファイルを1つ作成し、その中に次のコードを配置します。

      const mongo = require("mongodb").MongoClient;
      mongo.connect("mongodb://localhost:27017/?replicaSet=rs0").then(client => {
       console.log("Connected to MongoDB server");
       // Select DB and Collection
       const db = client.db("mydb");
       const collection = db.collection("Stocks");
       pipeline = [
         {
           $match: { "fullDocument.price": { $gte: 250 } }
         }
       ];
       // Define change stream
       const changeStream = collection.watch(pipeline);
       // start listen to changes
       changeStream.on("change", function(event) {
         console.log(JSON.stringify(event));
       });
      });

      次に、このファイルを実行します:

      node index.js
    • 更新を受信するには、dbに新しいレコードを挿入します

      db.Stocks.insert({ ticker: “AAPL”, price: 280 })

      次に、コンソールを確認します。MongoDBから更新を受け取ります。
      応答例:

      {
      "_id":{
      "_data":"825C5D51F70000000129295A1004E83608EE8F1B4FBABDCEE73D5BF31FC946645F696400645C5D51F73ACA83479B48DE6E0004"},
      "operationType":"insert",
      "clusterTime":"6655565945622233089",
      "fullDocument":{
      "_id":"5c5d51f73aca83479b48de6e",
      "ticker":"AAPL",
      "Price":300
      },
      "ns":{"db":"mydb","coll":"Stocks"},
      "documentKey":{"_id":"5c5d51f73aca83479b48de6e"}
      }

    ここでは、次の操作でoperationTypeパラメーターの値を変更して、コレクション内のさまざまなタイプの変更をリッスンできます。

    • 挿入
    • 置換(一意のIDを除く)
    • 更新
    • 削除
    • 無効(Mongoが無効なカーソルを返す場合)

    その他の変更モードストリーム

    コレクションに対してと同じ方法で、データベースおよびデプロイメントに対して変更ストリームを開始できます。この機能はMongoDBバージョン4.0からリリースされました。データベースとデプロイメントに対して変更ストリームを開くためのコマンドは次のとおりです。

    Against DB: db.watch()
    Against deployment: Mongo.watch()

    結論

    MongoDB Change Streamsは、フロントエンドとバックエンドの統合をリアルタイムでシームレスに簡素化します。この機能は、PubsubモデルにMongoDBを使用するのに役立つため、KafkaまたはRabbitMQのデプロイを管理する必要がなくなります。アプリケーションでリアルタイムの情報が必要な場合は、MongoDBのこの機能を確認する必要があります。この投稿によって、MongoDBの変更ストリームを開始できることを願っています。


    1. MongoDBコンポジットキー

    2. 2つのmongodbコレクションを比較する方法は?

    3. MongoDBの公式C#ドライバーを使用して「ID」で1つの「ドキュメント」を削除するにはどうすればよいですか?

    4. Booksleeve接続を開く/閉じる頻度はどれくらいですか?