Map-reduceは、MongoDBがサポートする集計操作の中でおそらく最も用途が広いものです。
Map-Reduceは、大量のデータを並行して処理および集約するためにGoogleで開発された人気のあるプログラミングモデルです。 Map-Reduceの詳細な説明はこの記事の範囲外ですが、基本的には複数ステップの集計プロセスです。最も重要な2つのステップは、マップステージ(各ドキュメントを処理して結果を出力する)とリデュースステージ(マップステージ中に出力された結果を照合する)です。
MongoDBは、Map-Reduce、集計パイプライン、単一目的の集計コマンドの3種類の集計操作をサポートしています。このMongoDB比較ドキュメントを使用して、ニーズに合ったものを確認できます。https://scalegrid.io/blog/mongodb-performance-running-mongodb-map-reduce-operations-on-secondaries/
前回の投稿では、例を挙げて、セカンダリでアグリゲーションパイプラインを実行する方法を説明しました。この投稿では、MongoDBセカンダリレプリカでのMap-Reduceジョブの実行について説明します。
MongoDBMap-Reduce
MongoDBは、データベースサーバーでのMap-Reduceジョブの実行をサポートしています。これにより、集約パイプラインを介して簡単に実行できない複雑な集約タスクを柔軟に作成できます。 MongoDBを使用すると、カスタムマップを記述し、Mongoシェルまたはその他のクライアントを介してデータベースに渡すことができるJavascriptの関数を減らすことができます。大規模で絶えず増大するデータセットでは、増分Map-Reduceジョブを実行して、毎回古いデータを処理しないようにすることも検討できます。
歴史的に、mapとreduceメソッドはシングルスレッドコンテキストで実行されていました。ただし、その制限はバージョン2.4で削除されました。
セカンダリでMap-Reduceジョブを実行する理由
他の集計ジョブと同様に、Map-Reduceもリソースを大量に消費する「バッチ」ジョブであるため、読み取り専用レプリカでの実行に適しています。その際の注意点は次のとおりです。
1)少し古いデータを使用しても問題ありません。または、書き込みの問題を微調整して、レプリカが常にプライマリと同期するようにすることもできます。この2番目のオプションは、書き込みパフォーマンスに影響を与えることが許容できることを前提としています。
2)Map-Reduceジョブの出力は、データベース内の別のコレクションに書き込まれるのではなく、アプリケーションに返される必要があります(つまり、データベースへの書き込みはありません)。
mongoシェルとJavaドライバーの両方から、例を使用してこれを行う方法を見てみましょう。
レプリカセットでのMap-Reduce
データセット
説明のために、かなり単純なデータセットを使用します。小売業者からの毎日のトランザクションレコードダンプです。サンプルエントリは次のようになります:
RS-replica-0:PRIMARY> use test switched to db test RS-replica-0:PRIMARY> show tables txns RS-replica-0:PRIMARY> db.txns.findOne() { "_id" : ObjectId("584a3b71cdc1cb061957289b"), "custid" : "cust_66", "txnval" : 100, "items" : [{"sku": sku1", "qty": 1, "pr": 100}, ...], ... }
この例では、特定の顧客のその日の総支出を計算します。したがって、スキーマを考えると、mapメソッドとreduceメソッドは次のようになります。
var mapFunction = function() { emit(this.custid, this.txnval); } // Emit the custid and txn value from each record var reduceFunction = function(key, values) { return Array.sum(values); } // Sum all the txn values for a given custid
スキーマが確立されたら、Map-Reduceの動作を見てみましょう。
MongoDBシェル
Map-Reduceジョブがセカンダリで確実に実行されるようにするには、読み取りプリファレンスをセカンダリに設定する必要があります。 。上で述べたように、Map-Reduceをセカンダリで実行するには、結果の出力がインラインである必要があります。 (実際、セカンダリで許可される唯一のアウトバリューです)。それがどのように機能するか見てみましょう。
$ mongo -u admin -p pwd --authenticationDatabase admin --host RS-replica-0/server-1.servers.example.com:27017,server-2.servers.example.com:27017 MongoDB shell version: 3.2.10 connecting to: RS-replica-0/server-1.servers.example.com:27017,server-2.servers.example.com:27017/test 2016-12-09T08:15:19.347+0000 I NETWORK [thread1] Starting new replica set monitor for server-1.servers.example.com:27017,server-2.servers.example.com:27017 2016-12-09T08:15:19.349+0000 I NETWORK [ReplicaSetMonitorWatcher] starting RS-replica-0:PRIMARY> db.setSlaveOk() RS-replica-0:PRIMARY> db.getMongo().setReadPref('secondary') RS-replica-0:PRIMARY> db.getMongo().getReadPrefMode() secondary RS-replica-0:PRIMARY> var mapFunc = function() { emit(this.custid, this.txnval); } RS-replica-0:PRIMARY> var reduceFunc = function(key, values) { return Array.sum(values); } RS-replica-0:PRIMARY> db.txns.mapReduce(mapFunc, reduceFunc, {out: { inline: 1 }}) { "results" : [ { "_id" : "cust_0", "value" : 72734 }, { "_id" : "cust_1", "value" : 67737 }, ... ] "timeMillis" : 215, "counts" : { "input" : 10000, "emit" : 10000, "reduce" : 909, "output" : 101 }, "ok" : 1 }
セカンダリのログを確認すると、ジョブが実際にセカンダリで実行されたことが確認されます。
... 2016-12-09T08:17:24.842+0000 D COMMAND [conn344] mr ns: test.txns 2016-12-09T08:17:24.843+0000 I COMMAND [conn344] command test.$cmd command: listCollections { listCollections: 1, filter: { name: "txns" }, cursor: {} } keyUpdates:0 writeConflicts:0 numYields:0 reslen:150 locks:{ Global: { acquireCount: { r: 4 } }, Database: { acquireCount: { r: 1, R: 1 } }, Collection: { acquireCount: { r: 1 } } } protocol:op_query 0ms 2016-12-09T08:17:24.865+0000 I COMMAND [conn344] query test.system.js planSummary: EOF ntoreturn:0 ntoskip:0 keysExamined:0 docsExamined:0 cursorExhausted:1 keyUpdates:0 writeConflicts:0 numYields:0 nreturned:0 reslen:20 locks:{ Global: { acquireCount: { r: 6 } }, Database: { acquireCount: { r: 2, R: 1 } }, Collection: { acquireCount: { r: 2 } } } 0ms 2016-12-09T08:17:25.063+0000 I COMMAND [conn344] command test.txns command: mapReduce { mapreduce: "txns", map: function () { emit(this.custid, this.txnval); }, reduce: function (key, values) { return Array.sum(values); }, out: { inline: 1.0 } } planSummary: COUNT keyUpdates:0 writeConflicts:0 numYields:78 reslen:4233 locks:{ Global: { acquireCount: { r: 366 } }, Database: { acquireCount: { r: 3, R: 180 } }, Collection: { acquireCount: { r: 3 } } } protocol:op_command 220ms ...
Java
それでは、Javaアプリケーションから読み取ったレプリカに対してMap-Reduceジョブを実行してみましょう。 MongoDB Javaドライバーでは、読み取りプリファレンスを設定するとうまくいきます。出力はデフォルトでインラインであるため、追加のパラメーターを渡す必要はありません。ドライバーバージョン3.2.2を使用した例を次に示します。
public class MapReduceExample { private static final String MONGO_END_POINT = "mongodb://admin:[email protected]:27017,server-2.servers.example.com:27017/admin?replicaSet=RS-replica-0"; private static final String COL_NAME = "txns"; private static final String DEF_DB = "test"; public MapReduceExample() { } public static void main(String[] args) { MapReduceExample writer = new MapReduceExample(); writer.mapReduce(); } public static final String mapfunction = "function() { emit(this.custid, this.txnval); }"; public static final String reducefunction = "function(key, values) { return Array.sum(values); }"; private void mapReduce() { printer("Initializing..."); Builder options = MongoClientOptions.builder().readPreference(ReadPreference.secondary()); MongoClientURI uri = new MongoClientURI(MONGO_END_POINT, options); MongoClient client = new MongoClient(uri); MongoDatabase database = client.getDatabase(DEF_DB); MongoCollection collection = database.getCollection(COL_NAME); MapReduceIterable iterable = collection.mapReduce(mapfunction, reducefunction); // inline by default MongoCursor cursor = iterable.iterator(); while (cursor.hasNext()) { Document result = cursor.next(); printer("Customer: " + result.getString("_id") + ", Total Txn value: " + result.getDouble("value")); } printer("Done..."); } ... }
ログから明らかなように、ジョブはセカンダリで実行されました:
... 2016-12-09T08:32:31.419+0000 D COMMAND [conn371] mr ns: test.txns 2016-12-09T08:32:31.420+0000 I COMMAND [conn371] command test.$cmd command: listCollections { listCollections: 1, filter: { name: "txns" }, cursor: {} } keyUpdates:0 writeConflicts:0 numYields:0 reslen:150 locks:{ Global: { acquireCount: { r: 4 } }, Database: { acquireCount: { r: 1, R: 1 } }, Collection: { acquireCount: { r: 1 } } } protocol:op_query 0ms 2016-12-09T08:32:31.444+0000 I COMMAND [conn371] query test.system.js planSummary: EOF ntoreturn:0 ntoskip:0 keysExamined:0 docsExamined:0 cursorExhausted:1 keyUpdates:0 writeConflicts:0 numYields:0 nreturned:0 reslen:20 locks:{ Global: { acquireCount: { r: 6 } }, Database: { acquireCount: { r: 2, R: 1 } }, Collection: { acquireCount: { r: 2 } } } 0ms 2016-12-09T08:32:31.890+0000 I COMMAND [conn371] command test.txns command: mapReduce { mapreduce: "txns", map: function() { emit(this.custid, this.txnval); }, reduce: function(key, values) { return Array.sum(values); }, out: { inline: 1 }, query: null, sort: null, finalize: null, scope: null, verbose: true } planSummary: COUNT keyUpdates:0 writeConflicts:0 numYields:156 reslen:4331 locks:{ Global: { acquireCount: { r: 722 } }, Database: { acquireCount: { r: 3, R: 358 } }, Collection: { acquireCount: { r: 3 } } } protocol:op_query 470ms ...
MongoDBMap-シャードクラスターでReduce
MongoDBは、シャーディングされたコレクションがMap-Reduceジョブの入力である場合と出力である場合の両方で、シャーディングされたクラスターでMap-Reduceをサポートします。 ただし、MongoDBは現在、シャーディングされたクラスターのセカンダリでのmap-reduceジョブの実行をサポートしていません。 したがって、outオプションであっても インラインに設定されています 、Map-Reduceジョブは、常にシャーディングされたクラスターのプライマリで実行されます。この問題は、このJIRAバグを通じて追跡されています。
シャードクラスターでMap-Reduceジョブを実行する構文は、レプリカセットの構文と同じです。したがって、上記のセクションで提供された例が当てはまります。上記のJavaの例をシャーディングされたクラスターで実行すると、コマンドがそこで実行されたことを示すログメッセージがプライマリに表示されます。
... 2016-11-24T08:46:30.828+0000 I COMMAND [conn357] command test.$cmd command: mapreduce.shardedfinish { mapreduce.shardedfinish: { mapreduce: "txns", map: function() { emit(this.custid, this.txnval); }, reduce: function(key, values) { return Array.sum(values); }, out: { in line: 1 }, query: null, sort: null, finalize: null, scope: null, verbose: true, $queryOptions: { $readPreference: { mode: "secondary" } } }, inputDB: "test", shardedOutputCollection: "tmp.mrs.txns_1479977190_0", shards: { Shard-0/primary.shard0.example.com:27017,secondary.shard0.example.com:27017: { result: "tmp.mrs.txns_1479977190_0", timeMillis: 123, timing: { mapTime: 51, emitLoop: 116, reduceTime: 9, mode: "mixed", total: 123 }, counts: { input: 9474, emit: 9474, reduce: 909, output: 101 }, ok: 1.0, $gleS tats: { lastOpTime: Timestamp 1479977190000|103, electionId: ObjectId('7fffffff0000000000000001') } }, Shard-1/primary.shard1.example.com:27017,secondary.shard1.example.com:27017: { result: "tmp.mrs.txns_1479977190_0", timeMillis: 71, timing: { mapTime: 8, emitLoop: 63, reduceTime: 4, mode: "mixed", total: 71 }, counts: { input: 1526, emit: 1526, reduce: 197, output: 101 }, ok: 1.0, $gleStats: { lastOpTime: Timestamp 1479977190000|103, electionId: ObjectId('7fffffff0000000000000001') } } }, shardCounts: { Sha rd-0/primary.shard0.example.com:27017,secondary.shard0.example.com:27017: { input: 9474, emit: 9474, reduce: 909, output: 101 }, Shard-1/primary.shard1.example.com:27017,secondary.shard1.example.com:27017: { inpu t: 1526, emit: 1526, reduce: 197, output: 101 } }, counts: { emit: 11000, input: 11000, output: 202, reduce: 1106 } } keyUpdates:0 writeConflicts:0 numYields:0 reslen:4368 locks:{ Global: { acquireCount: { r: 2 } }, Database: { acquireCount: { r: 1 } }, Collection: { acqu ireCount: { r: 1 } } } protocol:op_command 115ms 2016-11-24T08:46:30.830+0000 I COMMAND [conn46] CMD: drop test.tmp.mrs.txns_1479977190_0 ...
豊富な機能リストについては、MongoDB製品ページにアクセスしてください。