sql >> データベース >  >> RDS >> PostgreSQL

psycopg2でバイナリCOPYテーブルFROMを使用します

    これは、Python3のCOPYFROMに相当するバイナリです:

    from io import BytesIO
    from struct import pack
    import psycopg2
    
    # Two rows of data; "id" is not in the upstream data source
    # Columns: node, ts, val1, val2
    data = [(23253, 342, -15.336734, 2494627.949375),
            (23256, 348, 43.23524, 2494827.949375)]
    
    conn = psycopg2.connect("dbname=mydb user=postgres")
    curs = conn.cursor()
    
    # Determine starting value for sequence
    curs.execute("SELECT nextval('num_data_id_seq')")
    id_seq = curs.fetchone()[0]
    
    # Make a binary file object for COPY FROM
    cpy = BytesIO()
    # 11-byte signature, no flags, no header extension
    cpy.write(pack('!11sii', b'PGCOPY\n\377\r\n\0', 0, 0))
    
    # Columns: id, node, ts, val1, val2
    # Zip: (column position, format, size)
    row_format = list(zip(range(-1, 4),
                          ('i', 'i', 'h', 'f', 'd'),
                          ( 4,   4,   2,   4,   8 )))
    for row in data:
        # Number of columns/fields (always 5)
        cpy.write(pack('!h', 5))
        for col, fmt, size in row_format:
            value = (id_seq if col == -1 else row[col])
            cpy.write(pack('!i' + fmt, size, value))
        id_seq += 1  # manually increment sequence outside of database
    
    # File trailer
    cpy.write(pack('!h', -1))
    
    # Copy data to database
    cpy.seek(0)
    curs.copy_expert("COPY num_data FROM STDIN WITH BINARY", cpy)
    
    # Update sequence on database
    curs.execute("SELECT setval('num_data_id_seq', %s, false)", (id_seq,))
    conn.commit()
    

    更新

    COPY用のファイルを書くための上記のアプローチを書き直しました。 Pythonの私のデータはNumPy配列にあるので、これらを使用するのは理にかなっています。 dataの例を次に示します。 100万行、7列の場合:

    import psycopg2
    import numpy as np
    from struct import pack
    from io import BytesIO
    from datetime import datetime
    
    conn = psycopg2.connect("dbname=mydb user=postgres")
    curs = conn.cursor()
    
    # NumPy record array
    shape = (7, 2000, 500)
    print('Generating data with %i rows, %i columns' % (shape[1]*shape[2], shape[0]))
    
    dtype = ([('id', 'i4'), ('node', 'i4'), ('ts', 'i2')] +
             [('s' + str(x), 'f4') for x in range(shape[0])])
    data = np.empty(shape[1]*shape[2], dtype)
    data['id'] = np.arange(shape[1]*shape[2]) + 1
    data['node'] = np.tile(np.arange(shape[1]) + 1, shape[2])
    data['ts'] = np.repeat(np.arange(shape[2]) + 1, shape[1])
    data['s0'] = np.random.rand(shape[1]*shape[2]) * 100
    prv = 's0'
    for nxt in data.dtype.names[4:]:
        data[nxt] = data[prv] + np.random.rand(shape[1]*shape[2]) * 10
        prv = nxt
    

    私のデータベースには、次のような2つのテーブルがあります。

    CREATE TABLE num_data_binary
    (
      id integer PRIMARY KEY,
      node integer NOT NULL,
      ts smallint NOT NULL,
      s0 real,
      s1 real,
      s2 real,
      s3 real,
      s4 real,
      s5 real,
      s6 real
    ) WITH (OIDS=FALSE);
    

    num_data_textという名前の別の同様のテーブル 。

    NumPyレコード配列の情報を使用してCOPY(テキスト形式とバイナリ形式の両方)用のデータを準備するための簡単なヘルパー関数を次に示します。

    def prepare_text(dat):
        cpy = BytesIO()
        for row in dat:
            cpy.write('\t'.join([repr(x) for x in row]) + '\n')
        return(cpy)
    
    def prepare_binary(dat):
        pgcopy_dtype = [('num_fields','>i2')]
        for field, dtype in dat.dtype.descr:
            pgcopy_dtype += [(field + '_length', '>i4'),
                             (field, dtype.replace('<', '>'))]
        pgcopy = np.empty(dat.shape, pgcopy_dtype)
        pgcopy['num_fields'] = len(dat.dtype)
        for i in range(len(dat.dtype)):
            field = dat.dtype.names[i]
            pgcopy[field + '_length'] = dat.dtype[i].alignment
            pgcopy[field] = dat[field]
        cpy = BytesIO()
        cpy.write(pack('!11sii', b'PGCOPY\n\377\r\n\0', 0, 0))
        cpy.write(pgcopy.tostring())  # all rows
        cpy.write(pack('!h', -1))  # file trailer
        return(cpy)
    

    これは、ヘルパー関数を使用して2つのCOPY形式の方法をベンチマークする方法です。

    def time_pgcopy(dat, table, binary):
        print('Processing copy object for ' + table)
        tstart = datetime.now()
        if binary:
            cpy = prepare_binary(dat)
        else:  # text
            cpy = prepare_text(dat)
        tendw = datetime.now()
        print('Copy object prepared in ' + str(tendw - tstart) + '; ' +
              str(cpy.tell()) + ' bytes; transfering to database')
        cpy.seek(0)
        if binary:
            curs.copy_expert('COPY ' + table + ' FROM STDIN WITH BINARY', cpy)
        else:  # text
            curs.copy_from(cpy, table)
        conn.commit()
        tend = datetime.now()
        print('Database copy time: ' + str(tend - tendw))
        print('        Total time: ' + str(tend - tstart))
        return
    
    time_pgcopy(data, 'num_data_text', binary=False)
    time_pgcopy(data, 'num_data_binary', binary=True)
    

    これが最後の2つのtime_pgcopyからの出力です コマンド:

    Processing copy object for num_data_text
    Copy object prepared in 0:01:15.288695; 84355016 bytes; transfering to database
    Database copy time: 0:00:37.929166
            Total time: 0:01:53.217861
    Processing copy object for num_data_binary
    Copy object prepared in 0:00:01.296143; 80000021 bytes; transfering to database
    Database copy time: 0:00:23.325952
            Total time: 0:00:24.622095
    

    したがって、NumPy→ファイルとファイル→データベースの両方のステップは、バイナリアプローチの方がはるかに高速です。明らかな違いは、PythonがCOPYファイルを準備する方法です。これはテキストに対して非常に低速です。一般的に、バイナリ形式は、このスキーマのテキスト形式として2/3の時間でデータベースに読み込まれます。

    最後に、データベース内の両方のテーブルの値を比較して、数値が異なるかどうかを確認しました。行の約1.46%は、列s0に対して異なる値を持っています 、この割合はs6で6.17%に増加します (おそらく私が使用したランダムな方法に関連しています)。すべての70M32ビットフロート値間のゼロ以外の絶対差は、9.3132257e-010から7.6293945e-006の範囲です。テキストとバイナリの読み込み方法のこれらの小さな違いは、テキスト形式の方法に必要なfloat→text→float変換による精度の低下によるものです。




    1. OracleINTERSECTオペレーターの説明

    2. DEFAULT NULLのMySQL列-文体の選択ですか、それともそうですか?

    3. これらのメトリックを使用してSQLServerのパフォーマンスを測定していますか?

    4. OracleでSQL実行プランを確認するにはどうすればよいですか?