スカラ :
必要なのが一意の番号だけの場合は、zipWithUniqueId
を使用できます DataFrameを再作成します。最初にいくつかのインポートとダミーデータ:
import sqlContext.implicits._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, LongType}
val df = sc.parallelize(Seq(
("a", -1.0), ("b", -2.0), ("c", -3.0))).toDF("foo", "bar")
さらに使用するためにスキーマを抽出します:
val schema = df.schema
IDフィールドの追加:
val rows = df.rdd.zipWithUniqueId.map{
case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)}
DataFrameの作成:
val dfWithPK = sqlContext.createDataFrame(
rows, StructType(StructField("id", LongType, false) +: schema.fields))
Pythonでも同じです :
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, LongType
row = Row("foo", "bar")
row_with_index = Row(*["id"] + df.columns)
df = sc.parallelize([row("a", -1.0), row("b", -2.0), row("c", -3.0)]).toDF()
def make_row(columns):
def _make_row(row, uid):
row_dict = row.asDict()
return row_with_index(*[uid] + [row_dict.get(c) for c in columns])
return _make_row
f = make_row(df.columns)
df_with_pk = (df.rdd
.zipWithUniqueId()
.map(lambda x: f(*x))
.toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))
連続番号が必要な場合は、zipWithUniqueId
を置き換えることができます zipWithIndex
を使用 しかし、それは少し高価です。
DataFrame
を使用して直接 API :
(ほぼ同じ構文のユニバーサルScala、Python、Java、R)
以前、monotonicallyIncreasingId
を見逃していました 連続した番号を必要としない限り、問題なく機能する関数:
import org.apache.spark.sql.functions.monotonicallyIncreasingId
df.withColumn("id", monotonicallyIncreasingId).show()
// +---+----+-----------+
// |foo| bar| id|
// +---+----+-----------+
// | a|-1.0|17179869184|
// | b|-2.0|42949672960|
// | c|-3.0|60129542144|
// +---+----+-----------+
monotonicallyIncreasingId
は便利ですが 非決定論的です。 IDは実行ごとに異なる可能性があるだけでなく、後続の操作にフィルターが含まれている場合、追加のトリックなしでは行を識別するために使用できません。
注 :
rowNumber
を使用することもできます ウィンドウ関数:
from pyspark.sql.window import Window
from pyspark.sql.functions import rowNumber
w = Window().orderBy()
df.withColumn("id", rowNumber().over(w)).show()
残念ながら:
警告ウィンドウ:ウィンドウ操作用にパーティションが定義されていません!すべてのデータを単一のパーティションに移動すると、パフォーマンスが大幅に低下する可能性があります。
したがって、データを分割して一意性を確保する自然な方法がない限り、現時点では特に有用ではありません。