sql >> データベース >  >> RDS >> Oracle

FORALLを使用して配列からデータを挿入する方法はありますか?

    何が速くなるかに本当に興味があるので、それらを比較するためのいくつかの可能な方法をテストしました:

    • 単純なexecutemany トリックなし。
    • APPEND_VALUESと同じ ステートメント内のヒント。
    • union all 別の質問で試したアプローチ。 非常に大きいを生成するため、これは上記よりも遅くなります。 ステートメント(データ自体よりも多くのネットワークを必要とする可能性があります)。次に、DB側で解析する必要があります。これも多くの時間を消費し、すべての利点を無視します(潜在的なサイズ制限については説明しません)。次に、executemany '10万レコードの単一のステートメントを作成しないようにチャンクでテストするためにそれを編集しました。安全を確保したかったので、ステートメント内で値の連結を使用しませんでした。
    • insert all> 。同じ欠点がありますが、組合はありません。 unionと比較してください バージョン。
    • JSONでデータをシリアル化し、DB側でを使用して逆シリアル化します。 json_table 。 JSONのオーバーヘッドがほとんどなく、単一の短いステートメントと単一のデータ転送でパフォーマンスが向上する可能性があります。
    • 提案された FORALL PL/SQLラッパープロシージャ内。 executemanyと同じである必要があります 同じことをしますが、データベース側です。データのコレクションへの変換のオーバーヘッド。
    • 同じFORALL 、ただし、データを渡すための列アプローチを使用する場合:複合型の代わりに列値の単純なリストを渡します。 FORALLよりもはるかに高速である必要があります データをコレクションのタイプにシリアル化する必要がないため、コレクションを使用します。

    無料のアカウントでOracleCloudのOracleAutonomousDatabaseを使用しました。各メソッドは、100kレコードの同じ入力データセットを使用してループで10回実行され、各テストの前にテーブルが再作成されました。これは私が得た結果です。ここでの準備時間と実行時間は、それぞれクライアント側のDB呼び出し自体でのデータ変換です。

    >>> t = PerfTest(100000)
    >>> t.run("exec_many", 10)
    Method:  exec_many.
        Duration, avg: 2.3083874 s
        Preparation time, avg: 0.0 s
        Execution time, avg: 2.3083874 s
    >>> t.run("exec_many_append", 10)
    Method: exec_many_append.
        Duration, avg: 2.6031369 s
        Preparation time, avg: 0.0 s
        Execution time, avg: 2.6031369 s
    >>> t.run("union_all", 10, 10000)
    Method:  union_all.
        Duration, avg: 27.9444233 s
        Preparation time, avg: 0.0408773 s
        Execution time, avg: 27.8457551 s
    >>> t.run("insert_all", 10, 10000)
    Method: insert_all.
        Duration, avg: 70.6442494 s
        Preparation time, avg: 0.0289269 s
        Execution time, avg: 70.5541995 s
    >>> t.run("json_table", 10)
    Method: json_table.
        Duration, avg: 10.4648237 s
        Preparation time, avg: 9.7907693 s
        Execution time, avg: 0.621006 s
    >>> t.run("forall", 10)
    Method:     forall.
        Duration, avg: 5.5622837 s
        Preparation time, avg: 1.8972456000000002 s
        Execution time, avg: 3.6650380999999994 s
    >>> t.run("forall_columnar", 10)
    Method: forall_columnar.
        Duration, avg: 2.6702698000000002 s
        Preparation time, avg: 0.055710800000000005 s
        Execution time, avg: 2.6105702 s
    >>> 
    

    最速の方法は、executemanyだけです。 、それほど驚くことではありません。ここで興味深いのは、APPEND_VALUES クエリは改善されず、平均してより多くの時間がかかるため、さらに調査が必要です。

    FORALLについて :予想どおり、各列の個々の配列は、データの準備がないため、時間がかかりません。それは多かれ少なかれexecutemanyに匹敵します 、しかし、ここではPL/SQLのオーバーヘッドが何らかの役割を果たしていると思います。

    私にとってもう1つの興味深い部分は、JSONです。ほとんどの時間はデータベースへのLOBの書き込みとシリアル化に費やされましたが、クエリ自体は非常に高速でした。おそらく、書き込み操作は、chuncsizeまたはLOBデータをselectステートメントに渡す別の方法で改善できますが、私のコードでは、executemanyを使用した非常に単純で単純なアプローチとはほど遠いです。 。

    Pythonを使わずにすべきアプローチも考えられます。 外部データのネイティブツールとしては高速ですが、テストはしていません:

    以下は、私がテストに使用したコードです。

    import cx_Oracle as db
    import os, random, json
    import datetime as dt
    
    
    class PerfTest:
      
      def __init__(self, size):
        self._con = db.connect(
          os.environ["ora_cloud_usr"],
          os.environ["ora_cloud_pwd"],
          "test_low",
          encoding="UTF-8"
        )
        self._cur = self._con.cursor()
        self.inp = [(i, "Test {i}".format(i=i), random.random()) for i in range(size)]
      
      def __del__(self):
        if self._con:
          self._con.rollback()
          self._con.close()
     
    #Create objets
      def setup(self):
        try:
          self._cur.execute("drop table rand")
          #print("table dropped")
        except:
          pass
      
        self._cur.execute("""create table rand(
          id int,
          str varchar2(100),
          val number
        )""")
        
        self._cur.execute("""create or replace package pkg_test as
      type ts_test is record (
        id rand.id%type,
        str rand.str%type,
        val rand.val%type
      );
      type tt_test is table of ts_test index by pls_integer;
      
      type tt_ids is table of rand.id%type index by pls_integer;
      type tt_strs is table of rand.str%type index by pls_integer;
      type tt_vals is table of rand.val%type index by pls_integer;
      
      procedure write_data(p_data in tt_test);
      procedure write_data_columnar(
        p_ids in tt_ids,
        p_strs in tt_strs,
        p_vals in tt_vals
      );
    
    end;""")
        self._cur.execute("""create or replace package body pkg_test as
      procedure write_data(p_data in tt_test)
      as
      begin
        forall i in indices of p_data
          insert into rand(id, str, val)
          values (p_data(i).id, p_data(i).str, p_data(i).val)
        ;
        
        commit;
    
      end;
      
      procedure write_data_columnar(
        p_ids in tt_ids,
        p_strs in tt_strs,
        p_vals in tt_vals
      ) as
      begin
        forall i in indices of p_ids
          insert into rand(id, str, val)
          values (p_ids(i), p_strs(i), p_vals(i))
        ;
        
        commit;
        
      end;
    
    end;
    """)
    
     
      def build_union(self, size):
          return """insert into rand(id, str, val)
        select id, str, val from rand where 1 = 0 union all
        """ + """ union all """.join(
          ["select :{}, :{}, :{} from dual".format(i*3+1, i*3+2, i*3+3)
            for i in range(size)]
        )
     
     
      def build_insert_all(self, size):
          return """
          """.join(
          ["into rand(id, str, val) values (:{}, :{}, :{})".format(i*3+1, i*3+2, i*3+3)
            for i in range(size)]
        )
    
    
    #Test case with executemany
      def exec_many(self):
        start = dt.datetime.now()
        self._cur.executemany("insert into rand(id, str, val) values (:1, :2, :3)", self.inp)
        self._con.commit()
        
        return (dt.timedelta(0), dt.datetime.now() - start)
     
     
    #The same as above but with prepared statement (no parsing)
      def exec_many_append(self):
        start = dt.datetime.now()
        self._cur.executemany("insert /*+APPEND_VALUES*/ into rand(id, str, val) values (:1, :2, :3)", self.inp)
        self._con.commit()
        
        return (dt.timedelta(0), dt.datetime.now() - start)
    
    
    #Union All approach (chunked). Should have large parse time
      def union_all(self, size):
    ##Chunked list of big tuples
        start_prepare = dt.datetime.now()
        new_inp = [
          tuple([item for t in r for item in t])
          for r in list(zip(*[iter(self.inp)]*size))
        ]
        new_stmt = self.build_union(size)
        
        dur_prepare = dt.datetime.now() - start_prepare
        
        #Execute unions
        start_exec = dt.datetime.now()
        self._cur.executemany(new_stmt, new_inp)
        dur_exec = dt.datetime.now() - start_exec
    
    ##In case the size is not a divisor
        remainder = len(self.inp) % size
        if remainder > 0 :
          start_prepare = dt.datetime.now()
          new_stmt = self.build_union(remainder)
          new_inp = tuple([
            item for t in self.inp[-remainder:] for item in t
          ])
          dur_prepare += dt.datetime.now() - start_prepare
          
          start_exec = dt.datetime.now()
          self._cur.execute(new_stmt, new_inp)
          dur_exec += dt.datetime.now() - start_exec
    
        self._con.commit()
        
        return (dur_prepare, dur_exec)
    
    
    #The same as union all, but with no need to union something
      def insert_all(self, size):
    ##Chunked list of big tuples
        start_prepare = dt.datetime.now()
        new_inp = [
          tuple([item for t in r for item in t])
          for r in list(zip(*[iter(self.inp)]*size))
        ]
        new_stmt = """insert all
        {}
        select * from dual"""
        dur_prepare = dt.datetime.now() - start_prepare
        
        #Execute
        start_exec = dt.datetime.now()
        self._cur.executemany(
          new_stmt.format(self.build_insert_all(size)),
          new_inp
        )
        dur_exec = dt.datetime.now() - start_exec
    
    ##In case the size is not a divisor
        remainder = len(self.inp) % size
        if remainder > 0 :
          start_prepare = dt.datetime.now()
          new_inp = tuple([
            item for t in self.inp[-remainder:] for item in t
          ])
          dur_prepare += dt.datetime.now() - start_prepare
          
          start_exec = dt.datetime.now()
          self._cur.execute(
            new_stmt.format(self.build_insert_all(remainder)),
            new_inp
          )
          dur_exec += dt.datetime.now() - start_exec
    
        self._con.commit()
        
        return (dur_prepare, dur_exec)
    
        
    #Serialize at server side and do deserialization at DB side
      def json_table(self):
        start_prepare = dt.datetime.now()
        new_inp = json.dumps([
          { "id":t[0], "str":t[1], "val":t[2]} for t in self.inp
        ])
        
        lob_var = self._con.createlob(db.DB_TYPE_CLOB)
        lob_var.write(new_inp)
        
        start_exec = dt.datetime.now()
        self._cur.execute("""
        insert into rand(id, str, val)
        select id, str, val
        from json_table(
          to_clob(:json), '$[*]'
          columns
            id int,
            str varchar2(100),
            val number
        )
        """, json=lob_var)
        dur_exec = dt.datetime.now() - start_exec
        
        self._con.commit()
        
        return (start_exec - start_prepare, dur_exec)
    
    
    #PL/SQL with FORALL
      def forall(self):
        start_prepare = dt.datetime.now()
        collection_type = self._con.gettype("PKG_TEST.TT_TEST")
        record_type = self._con.gettype("PKG_TEST.TS_TEST")
        
        def recBuilder(x):
          rec = record_type.newobject()
          rec.ID = x[0]
          rec.STR = x[1]
          rec.VAL = x[2]
          
          return rec
    
        inp_collection = collection_type.newobject([
          recBuilder(i) for i in self.inp
        ])
        
        start_exec = dt.datetime.now()
        self._cur.callproc("pkg_test.write_data", [inp_collection])
        dur_exec = dt.datetime.now() - start_exec
        
        return (start_exec - start_prepare, dur_exec)
    
    
    #PL/SQL with FORALL and plain collections
      def forall_columnar(self):
        start_prepare = dt.datetime.now()
        ids, strs, vals = map(list, zip(*self.inp))
        start_exec = dt.datetime.now()
        self._cur.callproc("pkg_test.write_data_columnar", [ids, strs, vals])
        dur_exec = dt.datetime.now() - start_exec
        
        return (start_exec - start_prepare, dur_exec)
    
      
    #Run test
      def run(self, method, iterations, *args):
        #Cleanup schema
        self.setup()
    
        start = dt.datetime.now()
        runtime = []
        for i in range(iterations):
          single_run = getattr(self, method)(*args)
          runtime.append(single_run)
        
        dur = dt.datetime.now() - start
        dur_prep_total = sum([i.total_seconds() for i, _ in runtime])
        dur_exec_total = sum([i.total_seconds() for _, i in runtime])
        
        print("""Method: {meth}.
        Duration, avg: {run_dur} s
        Preparation time, avg: {prep} s
        Execution time, avg: {ex} s""".format(
          inp_s=len(self.inp),
          meth=method,
          run_dur=dur.total_seconds() / iterations,
          prep=dur_prep_total / iterations,
          ex=dur_exec_total / iterations
        ))
    
    



    1. MySQLデータを使用した無限スクロール

    2. SQLServer2017にアップグレードする理由

    3. カーソルでフィールド値を取得する

    4. MySQLの日付形式-日付の挿入が難しい