行を別のRDD要素にマップする場合は、df.map(row => ...)を使用してデータフレームをRDDに変換できます。
例:
val df = Seq(("table1",432),
("table2",567),
("table3",987),
("table1",789)).
toDF("tablename", "Code").toDF()
df.show()
+---------+----+
|tablename|Code|
+---------+----+
| table1| 432|
| table2| 567|
| table3| 987|
| table1| 789|
+---------+----+
val rddDf = df.map(r => (r(0), r(1))).rdd // Type:RDD[(Any,Any)]
OR
val rdd = df.map(r => (r(0).toString, r(1).toString)).rdd //Type: RDD[(String,String)]
AnalysisException:ストリーミングソースを使用したクエリは、writeStream.start()を使用して実行する必要があります。
queryを使用してクエリが終了するのを待つ必要があります。awaitTermination() クエリがアクティブなときにプロセスが終了しないようにするため。