このシリーズの前の2つの記事では、PythonとSQLAlchemyを使用してETLプロセスを実行する方法について説明しました。今日も同じことをしますが、今回はPythonとSQLAlchemyをSQLコマンドなしで使用します テキスト形式で。これにより、接続しているデータベースエンジンに関係なく、SQLAlchemyを使用できるようになります。それでは、始めましょう。
今日は、PythonとSQLAlchemyを使用してETLプロセスを実行する方法について説明します。運用データベースから毎日のデータを抽出して変換し、データウェアハウスにロードするスクリプトを作成します。
これはシリーズの3番目の記事です。最初の2つの記事(ETLプロセスとSQLAlchemyでのPythonとMySQLの使用)をまだ読んでいない場合は、続行する前に読むことを強くお勧めします。
このシリーズ全体は、データウェアハウスシリーズの続きです:
- DWHの作成、パート1:サブスクリプションビジネスデータモデル
- DWHの作成、パート2:サブスクリプションビジネスデータモデル
- データウェアハウスの作成、パート3:サブスクリプションビジネスデータモデル
では、今日のトピックから始めましょう。まず、データモデルを見てみましょう。
データモデル
運用(ライブ)データベースデータモデル
DWHデータモデル
これらは、使用する2つのデータモデルです。データウェアハウス(DWH)の詳細については、次の記事をご覧ください。
- スタースキーマ
- スノーフレークスキーマ
- スタースキーマとスノーフレークスキーマ
なぜSQLAlchemyなのか
SQLAlchemyの背後にある全体的な考え方は、データベースをインポートした後、関連するデータベースエンジンに固有のSQLコードは必要ないということです。代わりに、オブジェクトをSQLAlchemyにインポートし、ステートメントにSQLAlchemy構文を使用できます。これにより、接続しているデータベースエンジンに関係なく、同じ言語を使用できるようになります。ここでの主な利点は、開発者が異なるデータベースエンジン間の違いに注意を払う必要がないことです。別のデータベースエンジンに移行した場合、SQLAlchemyプログラムは(わずかな変更を加えて)まったく同じように機能します。
一時ストレージとの通信や異なるデータベース間の通信には、SQLAlchemyコマンドとPythonリストのみを使用することにしました。この決定の背後にある主な理由は、1)Pythonリストがよく知られていること、および2)Pythonのスキルがなくてもコードが読めることです。
これは、SQLAlchemyが完璧だと言っているのではありません。これには特定の制限がありますが、これについては後で説明します。とりあえず、以下のコードを見てみましょう:
スクリプトの実行と結果
これは、スクリプトを呼び出すために使用されるPythonコマンドです。スクリプトは、運用データベース内のデータをチェックし、値をDWHと比較して、新しい値をインポートします。この例では、2つのディメンションテーブルと1つのファクトテーブルの値を更新しています。スクリプトは適切な出力を返します。スクリプト全体は、1日に複数回実行できるように作成されています。その日の「古い」データを削除し、新しいデータに置き換えます。
スクリプト全体を上から分析してみましょう。
SQLAlchemyのインポート
最初に行う必要があるのは、スクリプトで使用するモジュールをインポートすることです。通常、スクリプトを作成するときにモジュールをインポートします。ほとんどの場合、最初に必要なモジュールが正確にわからないでしょう。
from datetime import date # import SQLAlchemy from sqlalchemy import create_engine, select, MetaData, Table, and_, func, case
Pythonのdatetime
をインポートしました モジュール。日付を処理するクラスを提供します。
次に、sqlalchemy
があります モジュール。モジュール全体をインポートするのではなく、必要なものだけをインポートします。SQLAlchemy(create_engine
に固有のものもあります) 、MetaData
、Table
)、一部のSQLステートメント部分(select
、and_
、case
)、およびfunc
、 count()などの関数を使用できるようにします およびsum() 。
データベースへの接続
サーバー上の2つのデータベースに接続する必要があります。必要に応じて、さまざまなサーバーからより多くのデータベース(MySQL、SQL Server、またはその他)に接続できます。この場合、両方のデータベースはMySQLデータベースであり、ローカルマシンに保存されています。
# connect to databases engine_live = sqlalchemy.create_engine('mysql+pymysql://: @localhost:3306/subscription_live') connection_live = engine_live.connect() engine_dwh = sqlalchemy.create_engine('mysql+pymysql:// : @localhost:3306/subscription_dwh') connection_dwh = engine_dwh.connect() metadata = MetaData(bind=None)
2つのエンジンと2つの接続を作成しました。前回の記事でこのビットについてすでに説明したので、ここでは詳しく説明しません。
dim_timeの更新 寸法
目標:テーブルにまだ挿入されていない場合は、昨日の日付を挿入します。
スクリプトでは、2つのディメンションテーブルを新しい値で更新します。残りは同じパターンに従うので、これについては1回だけ説明します。ほぼ同じコードをあと数回書き留める必要はありません。
アイデアはとてもシンプルです。常にスクリプトを実行して、昨日の新しいデータを挿入します。したがって、その日付がディメンションテーブルに挿入されているかどうかを確認する必要があります。すでにそこにある場合は、何もしません。そうでない場合は、追加します。 dim_time
を更新するコードを見てみましょう テーブル。
まず、日付が存在するかどうかを確認します。存在しない場合は追加します。昨日の日付を変数に保存することから始めます。 Pythonでは、次のようにします。
yesterday = date.fromordinal(date.today().toordinal()-1) yesterday_str = str(yesterday)
最初の行は現在の日付を取得して数値に変換し、その値から1を引いて、その数値を日付に戻します(昨日=今日– 1 )。 2行目は、日付をテキスト形式で保存します。
次に、日付がすでにデータベースにあるかどうかをテストします。
table_dim_time = Table('dim_time', metadata, autoload = True, autoload_with = engine_dwh) stmt = select([table_dim_time]).where(table_dim_time.columns.time_date == yesterday_str) result = connection_dwh.execute(stmt).fetchall() date_exists = len(result)
テーブルを読み込んだ後、時刻/日付の値が昨日と等しいディメンションテーブルからすべての行を返すクエリを実行します。結果には、0(テーブルにそのような日付がない)または1行(日付がすでにテーブルにある)が含まれる可能性があります。
日付がまだテーブルにない場合は、insert()コマンドを使用して日付を追加します:
if date_exists == 0: print("New value added.") stmt = table_dim_time.insert().values(time_date=yesterday, time_year=yesterday.year, time_month=yesterday.month, time_week=yesterday.isocalendar()[1], time_weekday=yesterday.weekday()) connection_dwh.execute(stmt) else: print("No new values.")
ここで指摘したい新しいことの1つは、の使用法です。 .year
、.month
、.isocalendar()[1]
、および.weekday
日付パーツを取得します。
dim_cityの更新 寸法
目標:新しい都市がある場合は挿入します(つまり、ライブデータベースの都市のリストをDWHの都市のリストと比較し、不足している都市を追加します)。
dim_time
を更新しています 寸法はかなり単純でした。日付がテーブルにあるかどうかをテストし、まだない場合は挿入しました。 DWHデータベースの値をテストするために、Python変数を使用しました(昨日 )。そのプロセスをもう一度使用しますが、今回はリストを使用します。
異なるデータベースのテーブルを単一のSQLAlchemyクエリに組み合わせる簡単な方法はないため、このシリーズのパート1で概説したアプローチを使用することはできません。したがって、これら2つのデータベース間の通信に必要な値を格納するためのオブジェクトが必要になります。リストは一般的であり、機能するため、リストを使用することにしました。
まず、country
およびcity
ライブデータベースから関連オブジェクトへのテーブル。
# dim_city print("\nUpdating... dim_city") table_city = Table('city', metadata, autoload = True, autoload_with = engine_live) table_country = Table('country', metadata, autoload = True, autoload_with = engine_live) table_dim_city = Table('dim_city', metadata, autoload = True, autoload_with = engine_dwh)
次に、dim_city
DWHからリストへのテーブル:
# load whole dwh table in the list stmt = select([table_dim_city]); table_dim_city_list = connection_dwh.execute(stmt).fetchall()
次に、ライブデータベースの値に対して同じことを行います。テーブルを結合しますcountry
およびcity
したがって、このリストに必要なすべてのデータがあります:
# load all live values in the list stmt = select([table_city.columns.city_name, table_city.columns.postal_code, table_country.columns.country_name])\ .select_from(table_city\ .join(table_country)) table_city_list = connection_live.execute(stmt).fetchall()
次に、ライブデータベースのデータを含むリストをループします。レコードごとに、値を比較します(city_name
、postal_code
、およびcountry_name
)。そのような値が見つからない場合は、dim_city
に新しいレコードを追加します テーブル。
# loop through live_db table # for each record test if it is missing in the dwh table new_values_added = 0 for city in table_city_list: id = -1; for dim_city in table_dim_city_list: if city[0] == dim_city[1] and city[1] == dim_city[2] and city[2] == dim_city[3]: id = dim_city[0] if id == -1: stmt = table_dim_city.insert().values(city_name=city[0], postal_code=city[1], country_name=city[2]) connection_dwh.execute(stmt) new_values_added = 1 if new_values_added == 0: print("No new values.") else: print("New value(s) added.")
値がすでにDWHにあるかどうかを判断するために、一意である必要がある属性の組み合わせをテストしました。 (ライブデータベースの主キーは、ここではあまり役に立ちません。)同様のコードを使用して、他の辞書を更新できます。これは最も優れたソリューションではありませんが、それでもかなりエレガントなソリューションです。そして、それは私たちが必要とすることを正確に実行します。
fact_customer_subscribedの更新 表
目標:昨日の日付の古いデータがある場合は、最初にそれを削除します。前の手順で何かを削除したかどうかに関係なく、昨日のデータをDWHに追加します。
すべてのディメンションテーブルを更新した後、ファクトテーブルを更新する必要があります。スクリプトでは、1つのファクトテーブルのみを更新します。理由は前のセクションと同じです。他のテーブルの更新は同じパターンに従うため、ほとんどの場合、コードを繰り返します。
ファクトテーブルに値を挿入する前に、ディメンションテーブルから関連するキーの値を知る必要があります。そのために、ディメンションをリストに再度読み込み、ライブデータベースの値と比較します。
最初に行うことは、顧客とfact_customer_subscribed
を読み込むことです テーブルをオブジェクトに:
# fact_customer_subscribed print("\nUpdating... fact_customer_subscribed") table_customer = Table('customer', metadata, autoload = True, autoload_with = engine_live) table_fact_customer_subscribed = Table('fact_customer_subscribed', metadata, autoload = True, autoload_with = engine_dwh)
次に、関連する時間ディメンションのキーを見つける必要があります。常に昨日のデータを挿入しているため、dim_time
でその日付を検索します テーブルを作成し、そのIDを使用します。クエリは1行を返し、IDは最初の位置にあります(インデックスは0から始まるため、result[0][0]
):
# find key for the dim_time dimension stmt = select([table_dim_time]).where(table_dim_time.columns.time_date == yesterday) result = connection_dwh.execute(stmt).fetchall() dim_time_id = result[0][0]
その間、関連するすべてのレコードをファクトテーブルから削除します:
# delete any existing data in the fact table for that time dimension value stmt = table_fact_customer_subscribed.delete().where(table_fact_customer_subscribed.columns.dim_time_id == dim_time_id) connection_dwh.execute(stmt)
これで、時間ディメンションのIDがdim_time_id
に保存されました。 変数。時間ディメンション値は1つしか持てないため、これは簡単でした。ストーリーは都市の次元によって異なります。まず、すべてを読み込みます 必要な値–都市を一意に表す値(IDではない)と集計値:
# prepare data for insert stmt = select([table_city.columns.city_name, table_city.columns.postal_code, table_country.columns.country_name, func.sum(case([(table_customer.columns.active == 1, 1)], else_=0)).label('total_active'), func.sum(case([(table_customer.columns.active == 0, 1)], else_=0)).label('total_inactive'), func.sum(case([(and_(table_customer.columns.active == 1, func.date(table_customer.columns.time_updated) == yesterday), 1)], else_=0)).label('daily_new'), func.sum(case([(and_(table_customer.columns.active == 0, func.date(table_customer.columns.time_updated) == yesterday), 1)], else_=0)).label('daily_canceled')])\ .select_from(table_customer\ .join(table_city)\ .join(table_country))\ .group_by(table_city.columns.city_name, table_city.columns.postal_code, table_country.columns.country_name)
上記のクエリについて強調したいことがいくつかあります:
-
func.sum(...)
SUM(...)です 「標準SQL」から。 -
case(...)
構文はand_
を使用します 条件の前であり、条件の間ではありません。 -
.label(...)
SQLASエイリアスのように機能します。 -
\
を使用しています 次の行に移動して、クエリの読みやすさを向上させます。 (私を信じてください、それはスラッシュなしではほとんど読めません–私はそれを試しました:)) -
.group_by(...)
SQLのGROUPBYの役割を果たします。
次に、前のクエリを使用して返されたすべてのレコードをループします。レコードごとに、都市を一意に定義する値を比較します(city_name
、postal_code
、country_name
)DWH dim_city
から作成されたリストに格納されている値 テーブル。 3つの値がすべて一致する場合は、リストのIDを保存し、新しいデータを挿入するときに使用します。このように、すべてのレコードについて、両方のディメンションのIDがあります:
# loop through all new records # use time dimension # for each record find key for city dimension # insert row new_values = connection_live.execute(stmt).fetchall() for new_value in new_values: dim_city_id = -1; for dim_city in table_dim_city_list: if new_value[0] == dim_city[1] and new_value[1] == dim_city[2] and new_value[2] == dim_city[3]: dim_city_id = dim_city[0] if dim_city_id > 0: stmt_insert = table_fact_customer_subscribed.insert().values(dim_city_id=dim_city_id, dim_time_id=dim_time_id, total_active=new_value[3], total_inactive=new_value[4], daily_new=new_value[5], daily_canceled=new_value[6]) connection_dwh.execute(stmt_insert) dim_city_id = -1 print("Completed.")
以上です。 DWHを更新しました。すべてのディメンションテーブルとファクトテーブルを更新すると、スクリプトははるかに長くなります。ファクトテーブルがより多くのディメンションテーブルに関連付けられている場合も、複雑さが増します。その場合、 forが必要になります 各ディメンションテーブルのループ。
これは機能しません!
このスクリプトを書いたとき、私は非常にがっかりしましたが、このようなものは機能しないことがわかりました:
stmt = select([table_city.columns.city_name])\ .select_from(table_city\ .outerjoin(table_dim_city, table_city.columns.city_name == table_dim_city.columns.city_name))\ .where(table_dim_city.columns.id.is_(None))
この例では、2つの異なるデータベースのテーブルを使用しようとしています。 2つの別々の接続を確立した場合、最初の接続は別の接続のテーブルを「認識」しません。データベースではなくサーバーに直接接続すると、テーブルを読み込むことができなくなります。
これが変更されるまで(できればすぐに)、2つのデータベース間で通信するために、ある種の構造(たとえば、今日行ったこと)を使用する必要があります。 1つのクエリを2つのリストに置き換え、 for をネストする必要があるため、コードが複雑になります。 ループします。
SQLAlchemyとPythonについての考えを共有する
これがこのシリーズの最後の記事でした。しかし、誰が知っていますか?今後の記事で別のアプローチを試すかもしれませんので、ご期待ください。それまでの間、データベースと組み合わせたSQLAlchemyとPythonについての考えを共有してください。この記事には何が欠けていると思いますか?何を追加しますか?以下のコメントで教えてください。
この記事で使用した完全なスクリプトは、ここからダウンロードできます。
そして、この記事シリーズを推薦してくれたDirk J Bosman(@dirkjobosman)に特に感謝します。