シリアルインサート
最も簡単な方法は、挿入 を実行することです。 <内code> Sink.foreach 。
スキーマコード生成 を使用したと仮定します さらに、テーブルの名前が「NumberTable」であると仮定します
//Tables file was auto-generated by the schema code generation
import Tables.{Numbertable, NumbertableRow}
val numberTableDB = Database forConfig "NumberTableConfig"
挿入を行う関数を書くことができます
def insertIntoDb(num : Int) =
numberTableDB run (Numbertable += NumbertableRow(num))
そして、その関数はシンクに配置できます
val insertSink = Sink[Int] foreach insertIntoDb
Source(0 to 100) runWith insertSink
バッチインサート
一度にN個のインサートをバッチ処理することで、シンク手法をさらに拡張できます。
def batchInsertIntoDb(nums : Seq[Int]) =
numberTableDB run (Numbertable ++= nums.map(NumbertableRow.apply))
val batchInsertSink = Sink[Seq[Int]] foreach batchInsertIntoDb
このバッチ処理されたシンクは、 Flow
によって供給できます。 バッチグループ化を行います:
val batchSize = 10
Source(0 to 100).via(Flow[Int].grouped(batchSize))
.runWith(batchInsertSink)