MongoDBの集計操作を使用すると、データレコードを処理してグループ化し、計算結果を返すことができます。 MongoDBは、次の3種類の集計操作をサポートしています。
- 単一目的の集計コマンド
- Map-Reduce
- 集約パイプライン
このMongoDB比較ドキュメントを使用して、ニーズに合ったものを確認できます。
集約パイプライン
集約パイプラインは、データ処理パイプラインを介したデータ集約を提供するMongoDBフレームワークです。つまり、ドキュメントはマルチステップパイプラインを介して送信され、各ステップでドキュメントのフィルタリング、グループ化、その他の変換が行われます。 SQL「GROUPBY…」を提供します。データベース自体で実行されるMongoDBの構造のタイプ。集約ドキュメントは、そのようなパイプラインを作成するための便利な例を提供します。
セカンダリで集計を実行する理由
アグリゲーションパイプラインはリソースを大量に消費する操作です。少し古くなったデータを操作しても問題がない場合は、MongoDBレプリカセットのセカンダリにアグリゲーションジョブをオフロードするのが理にかなっています。これは通常、「バッチ」操作に当てはまります。これは、最新のデータで実行されることを想定していないためです。出力をコレクションに書き込む必要がある場合、MongoDBで書き込み可能なのはプライマリのみであるため、集計ジョブはプライマリでのみ実行されます。
この投稿では、mongoシェルとJavaの両方からセカンダリで集約パイプラインが確実に実行されるようにする方法を紹介します。
MongoDBでMongoShellとJavaからセカンダリでアグリゲーションパイプラインを実行するClickToTweet注:MongoDBが提供するサンプルデータセットを郵便番号集計の例で使用して、例を示します。例の指示に従ってダウンロードできます。
レプリカセットの集約パイプライン
MongoDBシェル
読み取りプリファレンスをセカンダリに設定します mongoシェルから集計ジョブを実行するときにトリックを実行します。人口が1,000万人を超えるすべての州を取得してみましょう(郵便番号の例の最初の集計)。シェルとサーバーの両方でMongoDBバージョン3.2.10が実行されています。
mongo -u admin -p <pwd> --authenticationDatabase admin --host RS-repl0-0/server-1.servers.example.com:27017,server-2.servers.example.com:27017 RS-repl0-0:PRIMARY> use test switched to db test RS-repl0-0:PRIMARY> db.setSlaveOk() // Ok to run commands on a slave RS-repl0-0:PRIMARY> db.getMongo().setReadPref('secondary') // Set read pref RS-repl0-0:PRIMARY> db.getMongo().getReadPrefMode() secondary RS-repl0-0:PRIMARY> db.zips.aggregate( [ ... { $group: { _id: "$state", totalPop: { $sum: "$pop" } } }, ... { $match: { totalPop: { $gte: 10*1000*1000 } } } ... ] ) { "_id" : "CA", "totalPop" : 29754890 } { "_id" : "FL", "totalPop" : 12686644 } { "_id" : "PA", "totalPop" : 11881643 } { "_id" : "NY", "totalPop" : 17990402 } { "_id" : "OH", "totalPop" : 10846517 } { "_id" : "IL", "totalPop" : 11427576 } { "_id" : "TX", "totalPop" : 16984601 }>
セカンダリでMongoDBログ(コマンドに対してロギングが有効になっている)を調べると、集計が実際にセカンダリで実行されたことがわかります。
... 2016-12-05T06:20:14.783+0000 I COMMAND [conn200] command test.zips command: aggregate { aggregate: "zips", pipeline: [ { $group: { _id: "$state", totalPop: { $sum: "$pop" } } }, { $match: { totalPop: { $gte: 10000000.0 } } } ], cursor: {} } keyUpdates:0 writeConflicts:0 numYields:229 reslen:338 locks:{ Global: { acquireCount: { r: 466 } }, Database: { acquire Count: { r: 233 } }, Collection: { acquireCount: { r: 233 } } } protocol:op_command 49ms ...
Java
MongoDB Javaドライバーから、読み取りプリファレンスを再度設定するとうまくいきます。ドライバーバージョン3.2.2を使用した例を次に示します。
public class AggregationChecker { /* * Data and code inspired from: * https://docs.mongodb.com/v3.2/tutorial/aggregation-zip-code-data-set/#return-states-with-populations-above-10-million */ private static final String MONGO_END_POINT = "mongodb://admin:[email protected]:27017,server-2.servers.example.com:27017/admin?replicaSet=RS-repl0-0"; private static final String COL_NAME = "zips"; private static final String DEF_DB = "test"; public AggregationChecker() { } public static void main(String[] args) { AggregationChecker writer = new AggregationChecker(); writer.aggregationJob(); } private void aggregationJob() { printer("Initializing..."); Builder options = MongoClientOptions.builder().readPreference(ReadPreference.secondary()); MongoClientURI uri = new MongoClientURI(MONGO_END_POINT, options); MongoClient client = new MongoClient(uri); try { final DB db = client.getDB(DEF_DB); final DBCollection coll = db.getCollection(COL_NAME); // Avg city pop by state: https://docs.mongodb.com/manual/tutorial/aggregation-zip-code-data-set/#return-average-city-population-by-state Iterable iterable = coll.aggregate( Arrays.asList( new BasicDBObject("$group", new BasicDBObject("_id", new BasicDBObject("state", "$state").append("city", "$city")).append("pop", new BasicDBObject("$sum", "$pop"))), new BasicDBObject("$group", new BasicDBObject("_id", "$_id.state").append("avgCityPop", new BasicDBObject("$avg", "$pop"))))).results(); for (DBObject entry : iterable) { printer(entry.toString()); } } finally { client.close(); } printer("Done..."); } ... }
セカンダリにログオンします:
... 2016-12-01T10:54:18.667+0000 I COMMAND [conn4113] command test.zips command: aggregate { aggregate: "zipcodes", pipeline: [ { $group: { _id: { state: "$state", city: "$city" }, pop: { $sum: "$pop" } } }, { $group: { _id: "$_id.state", avgCityPop: { $avg: "$pop" } } } ] } keyUpdates:0 writeConflicts:0 numYields:229 reslen:2149 locks:{ Global: { acquireCount: { r: 466 } }, Database: { acquireCount: { r: 233 } }, Collection: { acquireCount: { r: 233 } } } protocol:op_query 103ms ...
プライマリで操作は記録されませんでした。
シャードクラスター上の集約パイプライン
集約パイプラインは、シャーディングされたクラスターでサポートされます。詳細な動作は、ドキュメントで説明されています。実装に関しては、アグリゲーションパイプラインを使用する場合、レプリカセットとシャードクラスターの間にほとんど違いはありません。
MongoDBのシャードクラスターにアグリゲーションパイプラインを設定する方法クリックしてツイートMongoDBシェル
シャーディングされたクラスターにデータをインポートする前に、コレクションでシャーディングを有効にします。
mongos> sh.enableSharding("test") mongos> sh.shardCollection("test.zips", { "_id" : "hashed" } )
その後の操作はレプリカセットと同じです:
mongos> db.setSlaveOk() mongos> db.getMongo().setReadPref('secondary') mongos> db.getMongo().getReadPrefMode() secondary mongos> db.zips.aggregate( [ ... { $group: { _id: "$state", totalPop: { $sum: "$pop" } } }, ... { $match: { totalPop: { $gte: 10*1000*1000 } } } ... ] ) { "_id" : "TX", "totalPop" : 16984601 } { "_id" : "PA", "totalPop" : 11881643 } { "_id" : "CA", "totalPop" : 29754890 } { "_id" : "FL", "totalPop" : 12686644 } { "_id" : "NY", "totalPop" : 17990402 } { "_id" : "OH", "totalPop" : 10846517 } { "_id" : "IL", "totalPop" : 11427576 }
セカンダリの1つからのログ:
... 2016-12-02T05:46:24.627+0000 I COMMAND [conn242] command test.zips command: aggregate { aggregate: "zips", pipeline: [ { $group: { _id: "$state", totalPop: { $sum: "$pop" } } } ], fromRouter: true, cursor: { batchSize: 0 } } cursorid:44258973083 keyUpdates:0 writeConflicts:0 numYields:0 reslen:115 locks:{ Global: { acquireCount: { r: 4 } }, Database: { acquireCount: { r: 2 } }, Collection: { acquireCount: { r: 2 } } } protocol:op_query 0ms 2016-12-02T05:46:24.641+0000 I COMMAND [conn131] getmore test.zips query: { aggregate: "zips", pipeline: [ { $group: { _id: "$state", totalPop: { $sum: "$pop" } } } ], fromRouter: true, cursor: { batchSize: 0 } } planSummary: PIPELINE_PROXY cursorid:44258973083 ntoreturn:0 keysExamined:0 docsExamined:0 cursorExhausted:1 keyUpdates:0 writeConflicts:0 numYields:112 nreturned:51 reslen:1601 locks:{ Global: { acquireCount: { r: 230 } }, Database: { acquireCount: { r: 115 } }, Collection: { acquireCount: { r: 115 } } } 13ms ...
Java
レプリカセットに適用できるものと同じコードが、シャードクラスターで正常に機能します。レプリカセットの接続文字列をシャードクラスターの接続文字列に置き換えるだけです。セカンダリからのログは、ジョブが実際にセカンダリで実行されたことを示しています:
... 2016-12-02T05:39:12.339+0000 I COMMAND [conn130] command test.zips command: aggregate { aggregate: "zips", pipeline: [ { $group: { _id: { state: "$state", city: "$city" }, pop: { $sum: "$pop" } } } ], fromRouter: true, cursor: { batchSize: 0 } } cursorid:44228970872 keyUpdates:0 writeConflicts:0 numYields:0 reslen:115 locks:{ Global: { acquireCount: { r: 4 } }, Database: { acquireCount: { r: 2 } }, Collection: { acquireCount: { r: 2 } } } protocol:op_query 0ms 2016-12-02T05:39:12.371+0000 I COMMAND [conn131] getmore test.zips query: { aggregate: "zips", pipeline: [ { $group: { _id: { state: "$state", city: "$city" }, pop: { $sum: "$pop" } } } ], fromRouter: true, cursor: { batchSize: 0 } } planSummary: PIPELINE_PROXY cursorid:44228970872 ntoreturn:0 keysExamined:0 docsExamined:0 cursorExhausted:1 keyUpdates:0 writeConflicts:0 numYields:112 nreturned:12902 reslen:741403 locks:{ Global: { acquireCount: { r: 230 } }, Database: { acquireCount: { r: 115 } }, Collection: { acquireCount: { r: 115 } } } 30ms ...
このコンテンツは役に立ちましたか? @scaledgridioにツイートしてお知らせください。いつものように、ご不明な点がございましたら、以下のコメントでお知らせください。ああ、そして!長期的なMongoDB®ホスティングコストを最大40%節約できるMongoDBホスティング製品をチェックすることを忘れないでください。