これはまだ公開されていませんが、Alpakkaのマスターブランチでは、 MongoSource.apply
タイプパラメータを取ります:
object MongoSource {
def apply[T](query: Observable[T]): Source[T, NotUsed] =
Source.fromPublisher(ObservableToPublisher(query))
}
したがって、Alpakkaの今後の0.18リリースでは、次のことができるようになります。
val source: Source[TodoMongo, NotUsed] = MongoSource[TodoMongo](todoCollection.find())
sourceに注意してください ここでは、todoCollection.find()を想定しています Observable[TodoMongo]を返します;必要に応じてタイプを調整します。
それまでの間、上記のコードを手動で追加するだけで済みます。例:
package akka.stream.alpakka.mongodb.scaladsl
import akka.NotUsed
import akka.stream.alpakka.mongodb.ObservableToPublisher
import akka.stream.scaladsl.Source
import org.mongodb.scala.Observable
object MyMongoSource {
def apply[T](query: Observable[T]): Source[T, NotUsed] =
Source.fromPublisher(ObservableToPublisher(query))
}
MyMongoSourceに注意してください akka.stream.alpakka.mongodb.scaladslに存在するように定義されています パッケージ(MongoSourceなど )、 ObservableToPublisher
パッケージプライベートクラスです。 MyMongoSourceを使用します MongoSourceを使用するのと同じ方法で :
val source: Source[TodoMongo, NotUsed] = MyMongoSource[TodoMongo](todoCollection.find())