エラーによると、すでに文字列があります(df.selectExpr("CAST(value AS STRING)")
)、したがって、RowイベントをString
として取得してみてください。 、Array[Byte]
ではありません
変更することから始めます
val valueStr = new String(record.getAs[Array[Byte]]("value"))
val valueStr = record.getAs[String]("value")
Sparkコードを実行するクラスターがすでにある可能性があることは理解していますが、それでもを調べることをお勧めします。 KafkaConnectMongoシンクコネクタ そのため、Sparkコードで独自のMongoライターを記述して維持する必要はありません。
または、Sparkデータセットをmongoに直接書き込むこともできます