私はmongodbの専門家ではありませんが、これまでに見た例に基づいて、これは私が試すパターンです。
データ以外のイベントの抑制が主な関心事であると思われるため、データ以外のイベントは省略しました。
var cursor = db.collection('mycollection').find({});
const cursorNext = new Rx.BehaviourSubject('next'); // signal first batch then wait
const nextBatch = () => {
if(cursor.hasNext()) {
cursorNext.next('next');
}
});
cursorNext
.switchMap(() => // wait for cursorNext to signal
Rx.Observable.fromPromise(cursor.next()) // get a single doc
.repeat() // get another
.takeWhile(() => cursor.hasNext() ) // stop taking if out of data
.take(batchSize) // until full batch
.toArray() // combine into a single emit
)
.map(docsBatch => {
// do something with the batch
// return docsBatch or modified doscBatch
})
... // other operators?
.subscribe(x => {
...
nextBatch();
});
私はmongodbなしでこのRxフローのテストをまとめようとしていますが、それまでの間、これはあなたにいくつかのアイデアを与えるかもしれません。