何が速くなるかに本当に興味があるので、それらを比較するためのいくつかの可能な方法をテストしました:
- 単純な
executemany
トリックなし。 -
APPEND_VALUES
と同じ ステートメント内のヒント。 union all
別の質問で試したアプローチ。 非常に大きいを生成するため、これは上記よりも遅くなります。 ステートメント(データ自体よりも多くのネットワークを必要とする可能性があります)。次に、DB側で解析する必要があります。これも多くの時間を消費し、すべての利点を無視します(潜在的なサイズ制限については説明しません)。次に、executemany
'10万レコードの単一のステートメントを作成しないようにチャンクでテストするためにそれを編集しました。安全を確保したかったので、ステートメント内で値の連結を使用しませんでした。-
insert all
> 。同じ欠点がありますが、組合はありません。union
と比較してください バージョン。 - JSONでデータをシリアル化し、DB側でを使用して逆シリアル化します。
json_table
。 JSONのオーバーヘッドがほとんどなく、単一の短いステートメントと単一のデータ転送でパフォーマンスが向上する可能性があります。 - 提案された
FORALL
PL/SQLラッパープロシージャ内。executemany
と同じである必要があります 同じことをしますが、データベース側です。データのコレクションへの変換のオーバーヘッド。 - 同じ
FORALL
、ただし、データを渡すための列アプローチを使用する場合:複合型の代わりに列値の単純なリストを渡します。FORALL
よりもはるかに高速である必要があります データをコレクションのタイプにシリアル化する必要がないため、コレクションを使用します。
無料のアカウントでOracleCloudのOracleAutonomousDatabaseを使用しました。各メソッドは、100kレコードの同じ入力データセットを使用してループで10回実行され、各テストの前にテーブルが再作成されました。これは私が得た結果です。ここでの準備時間と実行時間は、それぞれクライアント側のDB呼び出し自体でのデータ変換です。
>>> t = PerfTest(100000)
>>> t.run("exec_many", 10)
Method: exec_many.
Duration, avg: 2.3083874 s
Preparation time, avg: 0.0 s
Execution time, avg: 2.3083874 s
>>> t.run("exec_many_append", 10)
Method: exec_many_append.
Duration, avg: 2.6031369 s
Preparation time, avg: 0.0 s
Execution time, avg: 2.6031369 s
>>> t.run("union_all", 10, 10000)
Method: union_all.
Duration, avg: 27.9444233 s
Preparation time, avg: 0.0408773 s
Execution time, avg: 27.8457551 s
>>> t.run("insert_all", 10, 10000)
Method: insert_all.
Duration, avg: 70.6442494 s
Preparation time, avg: 0.0289269 s
Execution time, avg: 70.5541995 s
>>> t.run("json_table", 10)
Method: json_table.
Duration, avg: 10.4648237 s
Preparation time, avg: 9.7907693 s
Execution time, avg: 0.621006 s
>>> t.run("forall", 10)
Method: forall.
Duration, avg: 5.5622837 s
Preparation time, avg: 1.8972456000000002 s
Execution time, avg: 3.6650380999999994 s
>>> t.run("forall_columnar", 10)
Method: forall_columnar.
Duration, avg: 2.6702698000000002 s
Preparation time, avg: 0.055710800000000005 s
Execution time, avg: 2.6105702 s
>>>
最速の方法は、executemany
だけです。 、それほど驚くことではありません。ここで興味深いのは、APPEND_VALUES
クエリは改善されず、平均してより多くの時間がかかるため、さらに調査が必要です。
FORALL
について :予想どおり、各列の個々の配列は、データの準備がないため、時間がかかりません。それは多かれ少なかれexecutemany
に匹敵します 、しかし、ここではPL/SQLのオーバーヘッドが何らかの役割を果たしていると思います。
私にとってもう1つの興味深い部分は、JSONです。ほとんどの時間はデータベースへのLOBの書き込みとシリアル化に費やされましたが、クエリ自体は非常に高速でした。おそらく、書き込み操作は、chuncsizeまたはLOBデータをselectステートメントに渡す別の方法で改善できますが、私のコードでは、executemany
を使用した非常に単純で単純なアプローチとはほど遠いです。 。
Pythonを使わずにすべきアプローチも考えられます。 外部データのネイティブツールとしては高速ですが、テストはしていません:
以下は、私がテストに使用したコードです。
import cx_Oracle as db
import os, random, json
import datetime as dt
class PerfTest:
def __init__(self, size):
self._con = db.connect(
os.environ["ora_cloud_usr"],
os.environ["ora_cloud_pwd"],
"test_low",
encoding="UTF-8"
)
self._cur = self._con.cursor()
self.inp = [(i, "Test {i}".format(i=i), random.random()) for i in range(size)]
def __del__(self):
if self._con:
self._con.rollback()
self._con.close()
#Create objets
def setup(self):
try:
self._cur.execute("drop table rand")
#print("table dropped")
except:
pass
self._cur.execute("""create table rand(
id int,
str varchar2(100),
val number
)""")
self._cur.execute("""create or replace package pkg_test as
type ts_test is record (
id rand.id%type,
str rand.str%type,
val rand.val%type
);
type tt_test is table of ts_test index by pls_integer;
type tt_ids is table of rand.id%type index by pls_integer;
type tt_strs is table of rand.str%type index by pls_integer;
type tt_vals is table of rand.val%type index by pls_integer;
procedure write_data(p_data in tt_test);
procedure write_data_columnar(
p_ids in tt_ids,
p_strs in tt_strs,
p_vals in tt_vals
);
end;""")
self._cur.execute("""create or replace package body pkg_test as
procedure write_data(p_data in tt_test)
as
begin
forall i in indices of p_data
insert into rand(id, str, val)
values (p_data(i).id, p_data(i).str, p_data(i).val)
;
commit;
end;
procedure write_data_columnar(
p_ids in tt_ids,
p_strs in tt_strs,
p_vals in tt_vals
) as
begin
forall i in indices of p_ids
insert into rand(id, str, val)
values (p_ids(i), p_strs(i), p_vals(i))
;
commit;
end;
end;
""")
def build_union(self, size):
return """insert into rand(id, str, val)
select id, str, val from rand where 1 = 0 union all
""" + """ union all """.join(
["select :{}, :{}, :{} from dual".format(i*3+1, i*3+2, i*3+3)
for i in range(size)]
)
def build_insert_all(self, size):
return """
""".join(
["into rand(id, str, val) values (:{}, :{}, :{})".format(i*3+1, i*3+2, i*3+3)
for i in range(size)]
)
#Test case with executemany
def exec_many(self):
start = dt.datetime.now()
self._cur.executemany("insert into rand(id, str, val) values (:1, :2, :3)", self.inp)
self._con.commit()
return (dt.timedelta(0), dt.datetime.now() - start)
#The same as above but with prepared statement (no parsing)
def exec_many_append(self):
start = dt.datetime.now()
self._cur.executemany("insert /*+APPEND_VALUES*/ into rand(id, str, val) values (:1, :2, :3)", self.inp)
self._con.commit()
return (dt.timedelta(0), dt.datetime.now() - start)
#Union All approach (chunked). Should have large parse time
def union_all(self, size):
##Chunked list of big tuples
start_prepare = dt.datetime.now()
new_inp = [
tuple([item for t in r for item in t])
for r in list(zip(*[iter(self.inp)]*size))
]
new_stmt = self.build_union(size)
dur_prepare = dt.datetime.now() - start_prepare
#Execute unions
start_exec = dt.datetime.now()
self._cur.executemany(new_stmt, new_inp)
dur_exec = dt.datetime.now() - start_exec
##In case the size is not a divisor
remainder = len(self.inp) % size
if remainder > 0 :
start_prepare = dt.datetime.now()
new_stmt = self.build_union(remainder)
new_inp = tuple([
item for t in self.inp[-remainder:] for item in t
])
dur_prepare += dt.datetime.now() - start_prepare
start_exec = dt.datetime.now()
self._cur.execute(new_stmt, new_inp)
dur_exec += dt.datetime.now() - start_exec
self._con.commit()
return (dur_prepare, dur_exec)
#The same as union all, but with no need to union something
def insert_all(self, size):
##Chunked list of big tuples
start_prepare = dt.datetime.now()
new_inp = [
tuple([item for t in r for item in t])
for r in list(zip(*[iter(self.inp)]*size))
]
new_stmt = """insert all
{}
select * from dual"""
dur_prepare = dt.datetime.now() - start_prepare
#Execute
start_exec = dt.datetime.now()
self._cur.executemany(
new_stmt.format(self.build_insert_all(size)),
new_inp
)
dur_exec = dt.datetime.now() - start_exec
##In case the size is not a divisor
remainder = len(self.inp) % size
if remainder > 0 :
start_prepare = dt.datetime.now()
new_inp = tuple([
item for t in self.inp[-remainder:] for item in t
])
dur_prepare += dt.datetime.now() - start_prepare
start_exec = dt.datetime.now()
self._cur.execute(
new_stmt.format(self.build_insert_all(remainder)),
new_inp
)
dur_exec += dt.datetime.now() - start_exec
self._con.commit()
return (dur_prepare, dur_exec)
#Serialize at server side and do deserialization at DB side
def json_table(self):
start_prepare = dt.datetime.now()
new_inp = json.dumps([
{ "id":t[0], "str":t[1], "val":t[2]} for t in self.inp
])
lob_var = self._con.createlob(db.DB_TYPE_CLOB)
lob_var.write(new_inp)
start_exec = dt.datetime.now()
self._cur.execute("""
insert into rand(id, str, val)
select id, str, val
from json_table(
to_clob(:json), '$[*]'
columns
id int,
str varchar2(100),
val number
)
""", json=lob_var)
dur_exec = dt.datetime.now() - start_exec
self._con.commit()
return (start_exec - start_prepare, dur_exec)
#PL/SQL with FORALL
def forall(self):
start_prepare = dt.datetime.now()
collection_type = self._con.gettype("PKG_TEST.TT_TEST")
record_type = self._con.gettype("PKG_TEST.TS_TEST")
def recBuilder(x):
rec = record_type.newobject()
rec.ID = x[0]
rec.STR = x[1]
rec.VAL = x[2]
return rec
inp_collection = collection_type.newobject([
recBuilder(i) for i in self.inp
])
start_exec = dt.datetime.now()
self._cur.callproc("pkg_test.write_data", [inp_collection])
dur_exec = dt.datetime.now() - start_exec
return (start_exec - start_prepare, dur_exec)
#PL/SQL with FORALL and plain collections
def forall_columnar(self):
start_prepare = dt.datetime.now()
ids, strs, vals = map(list, zip(*self.inp))
start_exec = dt.datetime.now()
self._cur.callproc("pkg_test.write_data_columnar", [ids, strs, vals])
dur_exec = dt.datetime.now() - start_exec
return (start_exec - start_prepare, dur_exec)
#Run test
def run(self, method, iterations, *args):
#Cleanup schema
self.setup()
start = dt.datetime.now()
runtime = []
for i in range(iterations):
single_run = getattr(self, method)(*args)
runtime.append(single_run)
dur = dt.datetime.now() - start
dur_prep_total = sum([i.total_seconds() for i, _ in runtime])
dur_exec_total = sum([i.total_seconds() for _, i in runtime])
print("""Method: {meth}.
Duration, avg: {run_dur} s
Preparation time, avg: {prep} s
Execution time, avg: {ex} s""".format(
inp_s=len(self.inp),
meth=method,
run_dur=dur.total_seconds() / iterations,
prep=dur_prep_total / iterations,
ex=dur_exec_total / iterations
))