これはまだ公開されていませんが、Alpakkaのマスターブランチでは、 MongoSource.apply
タイプパラメータを取ります:
object MongoSource {
def apply[T](query: Observable[T]): Source[T, NotUsed] =
Source.fromPublisher(ObservableToPublisher(query))
}
したがって、Alpakkaの今後の0.18リリースでは、次のことができるようになります。
val source: Source[TodoMongo, NotUsed] = MongoSource[TodoMongo](todoCollection.find())
source
に注意してください ここでは、todoCollection.find()
を想定しています Observable[TodoMongo]
を返します;必要に応じてタイプを調整します。
それまでの間、上記のコードを手動で追加するだけで済みます。例:
package akka.stream.alpakka.mongodb.scaladsl
import akka.NotUsed
import akka.stream.alpakka.mongodb.ObservableToPublisher
import akka.stream.scaladsl.Source
import org.mongodb.scala.Observable
object MyMongoSource {
def apply[T](query: Observable[T]): Source[T, NotUsed] =
Source.fromPublisher(ObservableToPublisher(query))
}
MyMongoSource
に注意してください akka.stream.alpakka.mongodb.scaladsl
に存在するように定義されています パッケージ(MongoSource
など )、 ObservableToPublisher
パッケージプライベートクラスです。 MyMongoSource
を使用します MongoSource
を使用するのと同じ方法で :
val source: Source[TodoMongo, NotUsed] = MyMongoSource[TodoMongo](todoCollection.find())