Sparkでは、RDD
の関数 s(map
など ここで)シリアル化され、処理のためにエグゼキュータに送信されます。これは、これらの操作に含まれるすべての要素がシリアル化可能であることを意味します。
ここでのRedis接続は、作成されたマシンにバインドされているターゲットDBへのTCP接続を開くため、シリアル化できません。
解決策は、ローカル実行コンテキストでエグゼキュータにこれらの接続を作成することです。それを行う方法はいくつかあります。頭に浮かぶ2つは次のとおりです。
-
rdd.mapPartitions
:パーティション全体を一度に処理できるため、接続の作成コストを償却できます) - シングルトン接続マネージャー:エグゼキューターごとに1回接続を作成します
mapPartitions
必要なのはプログラム構造の小さな変更だけなので、より簡単です:
val perhit = perhitFile.mapPartitions{partition =>
val r = new RedisClient("192.168.1.101", 6379) // create the connection in the context of the mapPartition operation
val res = partition.map{ x =>
...
val refStr = r.hmget(...) // use r to process the local data
}
r.close // take care of resources
res
}
シングルトン接続マネージャーは、接続への遅延参照を保持するオブジェクトを使用してモデル化できます(注:可変参照も機能します)。
object RedisConnection extends Serializable {
lazy val conn: RedisClient = new RedisClient("192.168.1.101", 6379)
}
このオブジェクトは、ワーカーJVMごとに1つの接続をインスタンス化するために使用でき、Serializable
として使用されます。 操作クロージャ内のオブジェクト。
val perhit = perhitFile.map{x =>
val param = f(x)
val refStr = RedisConnection.conn.hmget(...) // use RedisConnection to get a connection to the local data
}
}
シングルトンオブジェクトを使用する利点は、接続がJVMによって1回だけ作成されるため(RDDパーティションごとに1つではなく)、オーバーヘッドが少なくなることです。
いくつかの欠点もあります:
- 接続のクリーンアップには注意が必要です(シャットダウンフック/タイマー)
- 共有リソースのスレッドセーフを確保する必要があります
(*)説明のために提供されたコード。コンパイルもテストもされていません。