RDDの計算は、クラスター全体に分散されます。 RDD内からRDD操作クロージャの外部で作成された変数を更新することはできません。これらは基本的に2つの異なる場所にあります。変数はSparkドライバーで作成され、ワーカーでアクセスされるため、読み取り専用として扱う必要があります。
Sparkは、この場合に使用できる分散型カミュレーターをサポートしています。 Spark Cummulators
別のオプション(私が好むもの)は、RDDのストリームを目的のデータ形式に変換し、foreachRDD
を使用することです。 それをセカンダリストレージに永続化する方法。これは、問題に取り組むためのより機能的な方法です。おおまかに次のようになります:
val filteredStream = twitterStream.filter(entry => filters.exists(term => entry.getText.getStatus.contains(term)))
val filteredStreamWithTs = filteredStream.map(x => ((DateTime.now.toString(), x)))
filteredStreamWithTs.foreachRdd(rdd => // write to Mongo)