sql >> データベース >  >> RDS >> PostgreSQL

データフレーム/CSVからPostgreSQLデータベースへのバルクデータの挿入または更新

    この特定のケースでは、 copy_expert() 。これは、raw_connection() 。ソースデータがCSVファイルの場合、この場合はパンダはまったく必要ありません。一時ステージングテーブルを作成することから始め、データを一時テーブルにコピーし、競合処理を使用して宛先テーブルに挿入します。

    conn = engine.raw_connection()
    
    try:
        with conn.cursor() as cur:
            cur.execute("""CREATE TEMPORARY TABLE TEST_STAGING ( LIKE TEST_TABLE )
                           ON COMMIT DROP""")
    
            with open("your_source.csv") as data:
                cur.copy_expert("""COPY TEST_STAGING ( itemid, title, street, pincode )
                                   FROM STDIN WITH CSV""", data)
    
            cur.execute("""INSERT INTO TEST_TABLE ( itemid, title, street, pincode )
                           SELECT itemid, title, street, pincode
                           FROM TEST_STAGING
                           ON CONFLICT ( itemid )
                           DO UPDATE SET title = EXCLUDED.title
                                       , street = EXCLUDED.street
                                       , pincode = EXCLUDED.pincode""")
    
    except:
        conn.rollback()
        raise
    
    else:
        conn.commit()
    
    finally:
        conn.close()
    

    一方、ソースデータがDataFrameの場合 、引き続きCOPYを使用できます 関数をmethod= to to_sql() 。この関数は、上記のすべてのロジックを非表示にすることもできます。

    import csv
    
    from io import StringIO
    from psycopg2 import sql
    
    def psql_upsert_copy(table, conn, keys, data_iter):
        dbapi_conn = conn.connection
    
        buf = StringIO()
        writer = csv.writer(buf)
        writer.writerows(data_iter)
        buf.seek(0)
    
        if table.schema:
            table_name = sql.SQL("{}.{}").format(
                sql.Identifier(table.schema), sql.Identifier(table.name))
        else:
            table_name = sql.Identifier(table.name)
    
        tmp_table_name = sql.Identifier(table.name + "_staging")
        columns = sql.SQL(", ").join(map(sql.Identifier, keys))
    
        with dbapi_conn.cursor() as cur:
            # Create the staging table
            stmt = "CREATE TEMPORARY TABLE {} ( LIKE {} ) ON COMMIT DROP"
            stmt = sql.SQL(stmt).format(tmp_table_name, table_name)
            cur.execute(stmt)
    
            # Populate the staging table
            stmt = "COPY {} ( {} ) FROM STDIN WITH CSV"
            stmt = sql.SQL(stmt).format(tmp_table_name, columns)
            cur.copy_expert(stmt, buf)
    
            # Upsert from the staging table to the destination. First find
            # out what the primary key columns are.
            stmt = """
                   SELECT kcu.column_name
                   FROM information_schema.table_constraints tco
                   JOIN information_schema.key_column_usage kcu 
                   ON kcu.constraint_name = tco.constraint_name
                   AND kcu.constraint_schema = tco.constraint_schema
                   WHERE tco.constraint_type = 'PRIMARY KEY'
                   AND tco.table_name = %s
                   """
            args = (table.name,)
    
            if table.schema:
                stmt += "AND tco.table_schema = %s"
                args += (table.schema,)
    
            cur.execute(stmt, args)
            pk_columns = {row[0] for row in cur.fetchall()}
            # Separate "data" columns from (primary) key columns
            data_columns = [k for k in keys if k not in pk_columns]
            # Build conflict_target
            pk_columns = sql.SQL(", ").join(map(sql.Identifier, pk_columns))
    
            set_ = sql.SQL(", ").join([
                sql.SQL("{} = EXCLUDED.{}").format(k, k)
                for k in map(sql.Identifier, data_columns)])
    
            stmt = """
                   INSERT INTO {} ( {} )
                   SELECT {}
                   FROM {}
                   ON CONFLICT ( {} )
                   DO UPDATE SET {}
                   """
    
            stmt = sql.SQL(stmt).format(
                table_name, columns, columns, tmp_table_name, pk_columns, set_)
            cur.execute(stmt)
    

    次に、新しいDataFrameを挿入します 使用

    df.to_sql("test_table", engine,
              method=psql_upsert_copy,
              index=False,
              if_exists="append")
    

    この方法を使用すると、ローカルデータベースを使用するこのマシンで最大1,000,000行をアップサーティングするのに約16秒かかりました。




    1. PHP / MySQL:過去*全*週のエントリを取得しています

    2. 絵文字(UTF8 4バイト文字)をMySQL<5.5に挿入する方法

    3. Oracleクエリの実行時間

    4. MariaDB JSON_LENGTH()の説明