sql >> データベース >  >> RDS >> PostgreSQL

ApacheSparkの主キー

    スカラ

    必要なのが一意の番号だけの場合は、zipWithUniqueIdを使用できます DataFrameを再作成します。最初にいくつかのインポートとダミーデータ:

    import sqlContext.implicits._
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StructType, StructField, LongType}
    
    val df = sc.parallelize(Seq(
        ("a", -1.0), ("b", -2.0), ("c", -3.0))).toDF("foo", "bar")
    

    さらに使用するためにスキーマを抽出します:

    val schema = df.schema
    

    IDフィールドの追加:

    val rows = df.rdd.zipWithUniqueId.map{
       case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)}
    

    DataFrameの作成:

    val dfWithPK = sqlContext.createDataFrame(
      rows, StructType(StructField("id", LongType, false) +: schema.fields))
    

    Pythonでも同じです :

    from pyspark.sql import Row
    from pyspark.sql.types import StructField, StructType, LongType
    
    row = Row("foo", "bar")
    row_with_index = Row(*["id"] + df.columns)
    
    df = sc.parallelize([row("a", -1.0), row("b", -2.0), row("c", -3.0)]).toDF()
    
    def make_row(columns):
        def _make_row(row, uid):
            row_dict = row.asDict()
            return row_with_index(*[uid] + [row_dict.get(c) for c in columns])
        return _make_row
    
    f = make_row(df.columns)
    
    df_with_pk = (df.rdd
        .zipWithUniqueId()
        .map(lambda x: f(*x))
        .toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))
    

    連続番号が必要な場合は、zipWithUniqueIdを置き換えることができます zipWithIndexを使用 しかし、それは少し高価です。

    DataFrameを使用して直接 API

    (ほぼ同じ構文のユニバーサルScala、Python、Java、R)

    以前、monotonicallyIncreasingIdを見逃していました 連続した番号を必要としない限り、問題なく機能する関数:

    import org.apache.spark.sql.functions.monotonicallyIncreasingId
    
    df.withColumn("id", monotonicallyIncreasingId).show()
    // +---+----+-----------+
    // |foo| bar|         id|
    // +---+----+-----------+
    // |  a|-1.0|17179869184|
    // |  b|-2.0|42949672960|
    // |  c|-3.0|60129542144|
    // +---+----+-----------+
    

    monotonicallyIncreasingIdは便利ですが 非決定論的です。 IDは実行ごとに異なる可能性があるだけでなく、後続の操作にフィルターが含まれている場合、追加のトリックなしでは行を識別するために使用できません。

    rowNumberを使用することもできます ウィンドウ関数:

    from pyspark.sql.window import Window
    from pyspark.sql.functions import rowNumber
    
    w = Window().orderBy()
    df.withColumn("id", rowNumber().over(w)).show()
    

    残念ながら:

    警告ウィンドウ:ウィンドウ操作用にパーティションが定義されていません!すべてのデータを単一のパーティションに移動すると、パフォーマンスが大幅に低下する可能性があります。

    したがって、データを分割して一意性を確保する自然な方法がない限り、現時点では特に有用ではありません。



    1. 'ANY'を含むPostgreSQLクエリが機能していません

    2. 知っておくべきSQLServerシノニムに関する7つの事実

    3. データベーススキーム、自動インクリメント

    4. SQLServerのサブクエリを使用してクエリを更新する