ストリーミングへようこそ。本当に必要なのは、入力を「一度に1チャンク」処理する「イベントストリーム」です。もちろん、理想的には、現在使用している「改行」文字などの一般的な区切り文字によって処理します。
本当に効率的なものについては、MongoDB "BulkAPI"> マシンのメモリやCPUサイクルをすべて消費することなく、ロードを可能な限り高速化するためのインサート。
利用可能なさまざまなソリューションがあるため、推奨していませんが、ここにline-を利用したリストがあります。 input-streamパッケージ 「ラインターミネータ」の部分をシンプルにするため。
「例」のみによるスキーマ定義:
var LineInputStream = require("line-input-stream"),
fs = require("fs"),
async = require("async"),
mongoose = require("mongoose"),
Schema = mongoose.Schema;
var entrySchema = new Schema({},{ strict: false })
var Entry = mongoose.model( "Schema", entrySchema );
var stream = LineInputStream(fs.createReadStream("data.txt",{ flags: "r" }));
stream.setDelimiter("\n");
mongoose.connection.on("open",function(err,conn) {
// lower level method, needs connection
var bulk = Entry.collection.initializeOrderedBulkOp();
var counter = 0;
stream.on("error",function(err) {
console.log(err); // or otherwise deal with it
});
stream.on("line",function(line) {
async.series(
[
function(callback) {
var row = line.split(","); // split the lines on delimiter
var obj = {};
// other manipulation
bulk.insert(obj); // Bulk is okay if you don't need schema
// defaults. Or can just set them.
counter++;
if ( counter % 1000 == 0 ) {
stream.pause();
bulk.execute(function(err,result) {
if (err) callback(err);
// possibly do something with result
bulk = Entry.collection.initializeOrderedBulkOp();
stream.resume();
callback();
});
} else {
callback();
}
}
],
function (err) {
// each iteration is done
}
);
});
stream.on("end",function() {
if ( counter % 1000 != 0 )
bulk.execute(function(err,result) {
if (err) throw err; // or something
// maybe look at result
});
});
});
したがって、一般的に、そこでの「ストリーム」インターフェースは、「一度に1行」を処理するために「入力を分解」します。これにより、すべてを一度に読み込むことができなくなります。
主な部分は、 "Bulk OperationsAPI" です。 MongoDBから。これにより、実際にサーバーに送信する前に、一度に多くの操作を「キューに入れる」ことができます。したがって、この場合、「モジュロ」を使用すると、書き込みは、処理された1000エントリごとにのみ送信されます。 16MBのBSON制限までは実際に何でもできますが、管理しやすくしてください。
一括処理される操作に加えて、async> 図書館。これは実際には必須ではありませんが、これにより、ドキュメントの「モジュロ制限」を超えないようにすることができます。一般的なバッチの「挿入」にはメモリ以外のIOコストはかかりませんが、「実行」呼び出しはIOが処理中であることを意味します。だから私たちはもっと多くのものを待ち行列に入れるのではなく待つのです。
これがそうであるように見える「ストリーム処理」CSVタイプのデータのためにあなたが見つけることができる確かにより良い解決策があります。しかし、一般的に、これにより、CPUサイクルも消費せずにメモリ効率の高い方法でこれを行う方法の概念が得られます。