headers
を取得することで、fast-csvでそれを行うことができます 解析された行を「オブジェクト」として返すスキーマ定義から。実際にいくつかの不一致があるので、修正でマークしました:
const fs = require('mz/fs');
const csv = require('fast-csv');
const { Schema } = mongoose = require('mongoose');
const uri = 'mongodb://localhost/test';
mongoose.Promise = global.Promise;
mongoose.set('debug', true);
const rankSchema = new Schema({
serverid: Number,
resetid: Number,
rank: Number,
name: String,
land: String, // <-- You have this as Number but it's a string
networth: Number,
tag: String,
stuff: String, // the empty field in the csv
gov: String,
gdi: Number,
protection: Number,
vacation: Number,
alive: Number,
deleted: Number
});
const Rank = mongoose.model('Rank', rankSchema);
const log = data => console.log(JSON.stringify(data, undefined, 2));
(async function() {
try {
const conn = await mongoose.connect(uri);
await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));
let headers = Object.keys(Rank.schema.paths)
.filter(k => ['_id','__v'].indexOf(k) === -1);
console.log(headers);
await new Promise((resolve,reject) => {
let buffer = [],
counter = 0;
let stream = fs.createReadStream('input.csv')
.pipe(csv({ headers }))
.on("error", reject)
.on("data", async doc => {
stream.pause();
buffer.push(doc);
counter++;
log(doc);
try {
if ( counter > 10000 ) {
await Rank.insertMany(buffer);
buffer = [];
counter = 0;
}
} catch(e) {
stream.destroy(e);
}
stream.resume();
})
.on("end", async () => {
try {
if ( counter > 0 ) {
await Rank.insertMany(buffer);
buffer = [];
counter = 0;
resolve();
}
} catch(e) {
stream.destroy(e);
}
});
});
} catch(e) {
console.error(e)
} finally {
process.exit()
}
})()
スキーマが実際に提供されたCSVに一致している限り、問題はありません。これらは私が見ることができる修正ですが、実際のフィールド名を別の方法で配置する必要がある場合は、調整する必要があります。しかし、基本的にはNumber
がありました String
がある位置に 本質的には、CSVの空白のフィールドであると私が推測している追加のフィールドです。
一般的なことは、スキーマからフィールド名の配列を取得し、それをcsvパーサーインスタンスを作成するときにオプションに渡すことです。
let headers = Object.keys(Rank.schema.paths)
.filter(k => ['_id','__v'].indexOf(k) === -1);
let stream = fs.createReadStream('input.csv')
.pipe(csv({ headers }))
実際にそれを行うと、配列の代わりに「オブジェクト」が返されます:
{
"serverid": "9",
"resetid": "1557",
"rank": "358",
"name": "286",
"land": "Mutantville",
"networth": "4368",
"tag": "2358026",
"stuff": "",
"gov": "M",
"gdi": "0",
"protection": "0",
"vacation": "0",
"alive": "1",
"deleted": "0"
}
Mongooseはスキーマに従って値をキャストするため、「タイプ」について心配する必要はありません。
残りはdata
のハンドラー内で発生します イベント。効率を最大化するために、insertMany()
を使用しています。 10,000行ごとに1回だけデータベースに書き込みます。それが実際にサーバーとプロセスにどのように送られるかはMongoDBのバージョンによって異なりますが、メモリ使用量と書き込みの「トレードオフ」の観点から、単一のコレクションにインポートするフィールドの平均数に基づいて、10,000はかなり合理的です。合理的なネットワーク要求。必要に応じて数値を小さくしてください。
重要な部分は、これらの呼び出しをasync
としてマークすることです。 関数とawait
insertMany()
の結果 続行する前に。また、pause()
する必要があります ストリームとresume()
そうしないと、各アイテムでbuffer
を上書きするリスクがあります。 実際に送信される前に挿入するドキュメントの数。 pause()
およびresume()
パイプに「背圧」をかけるために必要です。そうしないと、アイテムは「出てきて」data
を起動し続けます。 イベント。
当然、10,000エントリの制御では、バッファを空にして残りのドキュメントをサーバーに送信するために、各反復とストリーム完了の両方でチェックする必要があります。
data
の「すべての」反復の両方でサーバーへの非同期要求を実行したくないので、これは本当にやりたいことです。 イベントまたは基本的に各リクエストの完了を待たずに。 「非常に小さいファイル」についてはチェックしないで済みますが、実際の負荷では、まだ完了していない「飛行中」の非同期呼び出しのために、コールスタックを超えることが確実です。
参考までに-package.json
使用済み。 mz
近代化されたPromise
であるため、オプションです。 私が単に使用することに慣れている標準ノードの「組み込み」ライブラリの有効なライブラリ。もちろん、コードはfs
と完全に互換性があります。 モジュール。
{
"description": "",
"main": "index.js",
"dependencies": {
"fast-csv": "^2.4.1",
"mongoose": "^5.1.1",
"mz": "^2.7.0"
},
"keywords": [],
"author": "",
"license": "ISC"
}
実際、Node v8.9.x以降では、AsyncIterator
を実装することで、これをはるかに簡単にすることもできます。 stream-to-iterator
を介して モジュール。まだIterator<Promise<T>>
にあります モードですが、ノードv10.xが安定したLTSになるまで実行する必要があります:
const fs = require('mz/fs');
const csv = require('fast-csv');
const streamToIterator = require('stream-to-iterator');
const { Schema } = mongoose = require('mongoose');
const uri = 'mongodb://localhost/test';
mongoose.Promise = global.Promise;
mongoose.set('debug', true);
const rankSchema = new Schema({
serverid: Number,
resetid: Number,
rank: Number,
name: String,
land: String,
networth: Number,
tag: String,
stuff: String, // the empty field
gov: String,
gdi: Number,
protection: Number,
vacation: Number,
alive: Number,
deleted: Number
});
const Rank = mongoose.model('Rank', rankSchema);
const log = data => console.log(JSON.stringify(data, undefined, 2));
(async function() {
try {
const conn = await mongoose.connect(uri);
await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));
let headers = Object.keys(Rank.schema.paths)
.filter(k => ['_id','__v'].indexOf(k) === -1);
//console.log(headers);
let stream = fs.createReadStream('input.csv')
.pipe(csv({ headers }));
const iterator = await streamToIterator(stream).init();
let buffer = [],
counter = 0;
for ( let docPromise of iterator ) {
let doc = await docPromise;
buffer.push(doc);
counter++;
if ( counter > 10000 ) {
await Rank.insertMany(buffer);
buffer = [];
counter = 0;
}
}
if ( counter > 0 ) {
await Rank.insertMany(buffer);
buffer = [];
counter = 0;
}
} catch(e) {
console.error(e)
} finally {
process.exit()
}
})()
基本的に、ストリームの「イベント」の処理と一時停止および再開はすべて、単純なfor
に置き換えられます。 ループ:
const iterator = await streamToIterator(stream).init();
for ( let docPromise of iterator ) {
let doc = await docPromise;
// ... The things in the loop
}
簡単!これは、後のノード実装でfor..await..of
を使用してクリーンアップされます。 それがより安定したとき。ただし、上記は指定されたバージョン以降で正常に実行されます。