sql >> データベース >  >> NoSQL >> MongoDB

マングーススキーマを使用してCSVをインポートする

    headersを取得することで、fast-csvでそれを行うことができます 解析された行を「オブジェクト」として返すスキーマ定義から。実際にいくつかの不一致があるので、修正でマークしました:

    const fs = require('mz/fs');
    const csv = require('fast-csv');
    
    const { Schema } = mongoose = require('mongoose');
    
    const uri = 'mongodb://localhost/test';
    
    mongoose.Promise = global.Promise;
    mongoose.set('debug', true);
    
    const rankSchema = new Schema({
      serverid: Number,
      resetid: Number,
      rank: Number,
      name: String,
      land: String,         // <-- You have this as Number but it's a string
      networth: Number,
      tag: String,
      stuff: String,        // the empty field in the csv
      gov: String,
      gdi: Number,
      protection: Number,
      vacation: Number,
      alive: Number,
      deleted: Number
    });
    
    const Rank = mongoose.model('Rank', rankSchema);
    
    const log = data => console.log(JSON.stringify(data, undefined, 2));
    
    (async function() {
    
      try {
        const conn = await mongoose.connect(uri);
    
        await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));
    
        let headers = Object.keys(Rank.schema.paths)
          .filter(k => ['_id','__v'].indexOf(k) === -1);
    
        console.log(headers);
    
        await new Promise((resolve,reject) => {
    
          let buffer = [],
              counter = 0;
    
          let stream = fs.createReadStream('input.csv')
            .pipe(csv({ headers }))
            .on("error", reject)
            .on("data", async doc => {
              stream.pause();
              buffer.push(doc);
              counter++;
              log(doc);
              try {
                if ( counter > 10000 ) {
                  await Rank.insertMany(buffer);
                  buffer = [];
                  counter = 0;
                }
              } catch(e) {
                stream.destroy(e);
              }
    
              stream.resume();
    
            })
            .on("end", async () => {
              try {
                if ( counter > 0 ) {
                  await Rank.insertMany(buffer);
                  buffer = [];
                  counter = 0;
                  resolve();
                }
              } catch(e) {
                stream.destroy(e);
              }
            });
    
        });
    
    
      } catch(e) {
        console.error(e)
      } finally {
        process.exit()
      }
    
    
    })()
    

    スキーマが実際に提供されたCSVに一致している限り、問題はありません。これらは私が見ることができる修正ですが、実際のフィールド名を別の方法で配置する必要がある場合は、調整する必要があります。しかし、基本的にはNumberがありました Stringがある位置に 本質的には、CSVの空白のフィールドであると私が推測している追加のフィールドです。

    一般的なことは、スキーマからフィールド名の配列を取得し、それをcsvパーサーインスタンスを作成するときにオプションに渡すことです。

    let headers = Object.keys(Rank.schema.paths)
      .filter(k => ['_id','__v'].indexOf(k) === -1);
    
    let stream = fs.createReadStream('input.csv')
      .pipe(csv({ headers }))
    

    実際にそれを行うと、配列の代わりに「オブジェクト」が返されます:

    {
      "serverid": "9",
      "resetid": "1557",
      "rank": "358",
      "name": "286",
      "land": "Mutantville",
      "networth": "4368",
      "tag": "2358026",
      "stuff": "",
      "gov": "M",
      "gdi": "0",
      "protection": "0",
      "vacation": "0",
      "alive": "1",
      "deleted": "0"
    }
    

    Mongooseはスキーマに従って値をキャストするため、「タイプ」について心配する必要はありません。

    残りはdataのハンドラー内で発生します イベント。効率を最大化するために、insertMany()を使用しています。 10,000行ごとに1回だけデータベースに書き込みます。それが実際にサーバーとプロセスにどのように送られるかはMongoDBのバージョンによって異なりますが、メモリ使用量と書き込みの「トレードオフ」の観点から、単一のコレクションにインポートするフィールドの平均数に基づいて、10,000はかなり合理的です。合理的なネットワーク要求。必要に応じて数値を小さくしてください。

    重要な部分は、これらの呼び出しをasyncとしてマークすることです。 関数とawait insertMany()の結果 続行する前に。また、pause()する必要があります ストリームとresume() そうしないと、各アイテムでbufferを上書きするリスクがあります。 実際に送信される前に挿入するドキュメントの数。 pause() およびresume() パイプに「背圧」をかけるために必要です。そうしないと、アイテムは「出てきて」dataを起動し続けます。 イベント。

    当然、10,000エントリの制御では、バッファを空にして残りのドキュメントをサーバーに送信するために、各反復とストリーム完了の両方でチェックする必要があります。

    dataの「すべての」反復の両方でサーバーへの非同期要求を実行したくないので、これは本当にやりたいことです。 イベントまたは基本的に各リクエストの完了を待たずに。 「非常に小さいファイル」についてはチェックしないで済みますが、実際の負荷では、まだ完了していない「飛行中」の非同期呼び出しのために、コールスタックを超えることが確実です。

    参考までに-package.json 使用済み。 mz 近代化されたPromiseであるため、オプションです。 私が単に使用することに慣れている標準ノードの「組み込み」ライブラリの有効なライブラリ。もちろん、コードはfsと完全に互換性があります。 モジュール。

    {
      "description": "",
      "main": "index.js",
      "dependencies": {
        "fast-csv": "^2.4.1",
        "mongoose": "^5.1.1",
        "mz": "^2.7.0"
      },
      "keywords": [],
      "author": "",
      "license": "ISC"
    }
    

    実際、Node v8.9.x以降では、AsyncIteratorを実装することで、これをはるかに簡単にすることもできます。 stream-to-iteratorを介して モジュール。まだIterator<Promise<T>>にあります モードですが、ノードv10.xが安定したLTSになるまで実行する必要があります:

    const fs = require('mz/fs');
    const csv = require('fast-csv');
    const streamToIterator = require('stream-to-iterator');
    
    const { Schema } = mongoose = require('mongoose');
    
    const uri = 'mongodb://localhost/test';
    
    mongoose.Promise = global.Promise;
    mongoose.set('debug', true);
    
    const rankSchema = new Schema({
      serverid: Number,
      resetid: Number,
      rank: Number,
      name: String,
      land: String,
      networth: Number,
      tag: String,
      stuff: String,        // the empty field
      gov: String,
      gdi: Number,
      protection: Number,
      vacation: Number,
      alive: Number,
      deleted: Number
    });
    
    const Rank = mongoose.model('Rank', rankSchema);
    
    const log = data => console.log(JSON.stringify(data, undefined, 2));
    
    (async function() {
    
      try {
        const conn = await mongoose.connect(uri);
    
        await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));
    
        let headers = Object.keys(Rank.schema.paths)
          .filter(k => ['_id','__v'].indexOf(k) === -1);
    
        //console.log(headers);
    
        let stream = fs.createReadStream('input.csv')
          .pipe(csv({ headers }));
    
        const iterator = await streamToIterator(stream).init();
    
        let buffer = [],
            counter = 0;
    
        for ( let docPromise of iterator ) {
          let doc = await docPromise;
          buffer.push(doc);
          counter++;
    
          if ( counter > 10000 ) {
            await Rank.insertMany(buffer);
            buffer = [];
            counter = 0;
          }
        }
    
        if ( counter > 0 ) {
          await Rank.insertMany(buffer);
          buffer = [];
          counter = 0;
        }
    
      } catch(e) {
        console.error(e)
      } finally {
        process.exit()
      }
    
    })()
    

    基本的に、ストリームの「イベント」の処理と一時停止および再開はすべて、単純なforに置き換えられます。 ループ:

    const iterator = await streamToIterator(stream).init();
    
    for ( let docPromise of iterator ) {
      let doc = await docPromise;
      // ... The things in the loop
    }
    

    簡単!これは、後のノード実装でfor..await..ofを使用してクリーンアップされます。 それがより安定したとき。ただし、上記は指定されたバージョン以降で正常に実行されます。



    1. Javaでシリアル化されたRedisキーを修正する方法

    2. 16mbサイズを超えるドキュメントのMongoDB回避策?

    3. Pythonでセロリタスクバックエンドを設定する際の問題

    4. MurmurHash-それは何ですか?