問題はコードにあります。読み込もうとしているテーブルを上書きするため、Sparkが実際にアクセスする前に、すべてのデータを効果的に消去します。
Sparkは怠惰であることを忘れないでください。 Dataset
を作成するとき Sparkは必要なメタデータをフェッチしますが、データをロードしません。したがって、元のコンテンツを保持するマジックキャッシュはありません。データは実際に必要なときにロードされます。これがwrite
を実行するときです アクションを実行し、書き込みを開始すると、フェッチするデータはなくなります。
必要なものは次のようなものです:
Dataset
を作成します 。-
必要な変換を適用し、中間のMySQLテーブルにデータを書き込みます。
-
TRUNCATE
元の入力とINSERT INTO ... SELECT
中間テーブルまたはDROP
から 元のテーブルとRENAME
中間テーブル。
代替の、しかしあまり好ましくないアプローチは、次のようになります:
Dataset
を作成します 。- 必要な変換を適用し、永続的なSparkテーブルにデータを書き込みます(
df.write.saveAsTable(...)
または同等のもの) -
TRUNCATE
元の入力。 - データを読み戻して保存します(
spark.table(...).write.jdbc(...)
) - Sparkテーブルを削除します。
Sparkのcache
を使用することを強調することはできません。 /persist
行く方法ではありません。保守的なStorageLevel
でも (MEMORY_AND_DISK_2
/ MEMORY_AND_DISK_SER_2
)キャッシュされたデータが失われ(ノード障害)、サイレントな正しさのエラーが発生する可能性があります。