前の2つのパートでは、サブスクリプションベースのビジネスのライブデータベースモデルと、レポートに使用できるデータウェアハウス(DWH)について説明しました。これらが連携する必要があることは明らかですが、これら2つのモデルの間に関連性はありませんでした。今日は、その次のステップに進み、ライブデータベースからDWHにデータを転送するためのコードを記述します。
データモデル
コードに飛び込む前に、使用する2つのモデルを思い出してみましょう。 1つは、リアルタイムデータの保存に使用するトランザクションデータモデルです。サブスクリプションベースのビジネスを運営していることを考慮して、顧客とサブスクリプションの詳細、顧客の注文、および注文ステータスを保存する必要があります。
支払いの追跡や履歴データ(特に顧客データとサブスクリプションデータの変更)の保存など、このモデルに追加できることは本当にたくさんあります。ただし、ETL(抽出、変換、および読み込み)プロセスを強調するために、このモデルをできるだけ単純に保ちたいと思います。
トランザクションデータモデルをレポートデータベースとして使用すると、場合によっては機能する可能性がありますが、すべての場合に機能するとは限りません。すでに述べましたが、繰り返す価値があります。レポートタスクをリアルタイムプロセスから分離する場合は、ある種のレポートデータベースを作成する必要があります。データウェアハウスは1つのソリューションです。
私たちのDWHは、4つのファクトテーブルを中心としています。最初の2つは、毎日のレベルで顧客とサブスクリプションの数を追跡します。残りの2つは、配達の数とこれらの配達に含まれる製品を追跡します。
私の想定では、ETLプロセスを1日1回実行します。まず、ディメンションテーブルに新しい値を入力します(必要な場合)。その後、ファクトテーブルにデータを入力します。
不必要な繰り返しを避けるために、最初の2つのディメンションテーブルと最初の2つのファクトテーブルにデータを入力するコードのみを示します。残りのテーブルは、非常によく似たコードを使用して入力できます。自分でコードを書き留めておくことをお勧めします。何か新しいことを学ぶには、それを試す以外に良い方法はありません。
アイデア:ディメンションテーブル
一般的な考え方は、DWH(ディメンションテーブルとファクトテーブル)にデータを入力するために定期的に使用できるストアドプロシージャを作成することです。これらの手順は、同じサーバー上の2つのデータベース間でデータを転送します。これは、これらのプロシージャ内の一部のクエリが両方のデータベースのテーブルを使用することを意味します。これは予想されることです。 DWHの状態をライブDBと比較し、ライブDBで何が起こっているかに応じてDWHに変更を加える必要があります。
DWHには4つのディメンションテーブルがあります:dim_time
、dim_city
、dim_product
、およびdim_delivery_status
。
時間ディメンションは、前の日付を追加することによって入力されます。主な前提は、営業終了後、この手順を毎日実行することです。
都市と製品のディメンションは、city
およびproduct
ライブデータベースの辞書。これらのディクショナリに何かを追加すると、次のDWH更新時に新しい値がディメンションテーブルに追加されます。
最後のディメンションテーブルはdim_delivery_status
テーブル。デフォルト値が3つしかないため、更新されません。配達は輸送中、キャンセル、または配達されています。
アイデア:ファクトテーブル
ファクトテーブルへの入力は、実際には実際の仕事です。ライブデータベースのディクショナリにはタイムスタンプ属性が含まれていませんが、操作の結果としてデータが挿入されたテーブルには含まれています。 time_inserted
およびtime_updated
、データモデル内。
繰り返しになりますが、DWHインポートは1日1回正常に実行されると想定しています。これにより、データを毎日のレベルで集約できます。アクティブな顧客とキャンセルされた顧客とサブスクリプションの数、およびその日の配信と配信された製品の数をカウントします。
COB(営業終了)の後に挿入手順を実行すると、ライブモデルが適切に機能します。それでも、柔軟性を高めたい場合は、モデルにいくつかの変更を加える必要があります。そのような変更の1つは、顧客またはサブスクリプションに関連するデータが変更された正確な瞬間を追跡するための個別の履歴テーブルを持つことです。現在の組織では、変更が行われたことはわかりますが、この変更の前に変更があったかどうかはわかりません(たとえば、顧客が昨日キャンセルし、深夜にアカウントを再開し、今日再びキャンセルした場合など) 。
ディメンションテーブルの作成
前述のように、DWHインポートを1日に1回だけ実行することを前提としています。そうでない場合は、ディメンションテーブルとファクトテーブルから新しく挿入されたデータを削除するための追加のコードが必要になります。ディメンションテーブルの場合、これは指定された日付の削除に限定されます。
まず、指定された日付がdim_time
テーブル。そうでない場合は、テーブルに新しい行を追加します。もしそうなら、私たちは何もする必要はありません。ほとんどの場合、すべての日付は最初の実稼働展開中に挿入されます。ただし、この例は教育目的で使用します。
dim_city
およびdim_product
ディメンション、city
およびproduct
テーブル。以前に挿入された値はファクトテーブルで参照される可能性があるため、削除は行いません。ソフト削除を使用できます。例:オンとオフを切り替えることができる「アクティブ」フラグがあります。
最後のテーブルでは、dim_delivery_status
、常に同じ3つの値が含まれるため、何もしません。
以下のコードは、ディメンションテーブルにデータを入力するプロシージャを作成しますdim_time
およびdim_city
。
時間ディメンションについては、昨日の日付を追加します。 ETLプロセスは深夜0時過ぎに開始されると想定しています。そのディメンションがすでに存在するかどうかを確認し、存在しない場合は、テーブルに新しい日付を追加します。
都市のディメンションについては、LEFT JOINを使用して、ライブデータベースとDWHデータベースのデータを結合し、欠落している行を特定します。次に、不足しているデータのみをディメンションテーブルに追加します。データが変更されたかどうかを確認する方法はいくつかあります。このプロセスは、変更データキャプチャまたはCDCと呼ばれます。一般的な方法は、更新されたタイムスタンプまたはバージョンをチェックすることです。追加の方法はいくつかありますが、この記事の範囲外です。
ここで、MySQL構文を使用して記述された
DROP PROCEDURE IF EXISTS p_update_dimensions// CREATE PROCEDURE p_update_dimensions () BEGIN SET @time_exists = 0; SET @time_date = DATE_ADD(DATE(NOW()), INTERVAL -1 DAY); -- procedure populates dimension tables with new values -- dim_time SET @time_exists = (SELECT COUNT(*) FROM subscription_dwh.dim_time dim_time WHERE dim_time.time_date = @time_date); IF (@time_exists = 0) THEN INSERT INTO subscription_dwh.`dim_time`(`time_date`, `time_year`, `time_month`, `time_week`, `time_weekday`, `ts`) SELECT @time_date AS time_date, YEAR(@time_date) AS time_year, MONTH(@time_date) AS time_month, WEEK(@time_date) AS time_week, WEEKDAY(@time_date) AS time_weekday, NOW() AS ts; END IF; -- dim_city INSERT INTO subscription_dwh.`dim_city`(`city_name`, `postal_code`, `country_name`, `ts`) SELECT city_live.city_name, city_live.postal_code, country_live.country_name, Now() FROM subscription_live.city city_live INNER JOIN subscription_live.country country_live ON city_live.country_id = country_live.id LEFT JOIN subscription_dwh.dim_city city_dwh ON city_live.city_name = city_dwh.city_name AND city_live.postal_code = city_dwh.postal_code AND country_live.country_name = city_dwh.country_name WHERE city_dwh.id IS NULL; END// -- CALL p_update_dimensions ()
このプロシージャの実行-コメント付きプロシージャ CALLを使用して実行します --新しい日付と欠落しているすべての都市をディメンションテーブルに挿入します。独自のコードを追加して、残りの2つのディメンションテーブルに新しい値を入力してみてください。
データウェアハウスのETLプロセス
データウェアハウジングの背後にある主なアイデアは、必要な形式で集約されたデータを含めることです。もちろん、倉庫の建設を始める前に、そのフォーマットを知っておく必要があります。計画どおりにすべてを実行した場合、DWHが提供するすべてのメリットを享受できます。主な利点は、クエリを実行するときのパフォーマンスが向上することです。私たちのクエリは、より少ないレコードで機能し(集約されているため)、(ライブデータベースではなく)レポートデータベースで実行されます。
ただし、クエリを実行する前に、データベースにファクトを保存する必要があります。それを行う方法は、後でデータをどのように処理する必要があるかによって異なります。 DWHの構築を開始する前に全体像がよくわからない場合は、すぐに問題が発生する可能性があります。すぐに。
このプロセスの名前はETLです。E=抽出、T =変換、L=ロード。データを取得し、DWH構造に合わせて変換し、DWHにロードします。正確には、使用する実際のプロセスはELT:抽出、ロード、変換です。ストアドプロシージャを使用しているため、データを抽出してロードし、ニーズに合わせて変換します。 ETLとELTは少し異なりますが、これらの用語は同じ意味で使用される場合があることを知っておくとよいでしょう。
ファクトテーブルへの入力
ファクトテーブルにデータを入力することが、私たちが本当にここにいる理由です。今日は、fact_customer_subscribed
テーブルとfact_subscription_status
テーブル。残りの2つのファクトテーブルは、宿題として試すことができます。
ファクトテーブルの入力に進む前に、ディメンションテーブルに新しい値が入力されていると想定する必要があります。ファクトテーブルへの入力は同じパターンに従います。構造が同じなので、両方を一緒に説明します。
データは、時間と都市の2つのディメンションでグループ化されています。時間ディメンションは昨日に設定され、関連するレコードのIDはdim_time
日付を比較して表を作成します(両方のクエリの最後の内部結合)。
dim_city
ディメンションテーブルでUNIQUEの組み合わせを形成するすべての属性(都市名、郵便番号、国名)を結合することによって抽出されます。
このクエリでは、CASEを使用して値をテストしてから、それらを合計します。アクティブなお客様と非アクティブなお客様については、日付をテストしていません。ただし、これらのフィールドには現状のままの値を選択しました。新規およびキャンセルされたアカウントについて、更新された時間をテストしました。
DROP PROCEDURE IF EXISTS p_update_facts// CREATE PROCEDURE p_update_facts () BEGIN SET @time_date = DATE_ADD(DATE(NOW()), INTERVAL -1 DAY); -- procedure populates fact tables with new values -- fact_customer_subscribed INSERT INTO `fact_customer_subscribed`(`dim_city_id`, `dim_time_id`, `total_active`, `total_inactive`, `daily_new`, `daily_canceled`, `ts`) SELECT city_dwh.id AS dim_ctiy_id, time_dwh.id AS dim_time_id, SUM(CASE WHEN customer_live.active = 1 THEN 1 ELSE 0 END) AS total_active, SUM(CASE WHEN customer_live.active = 0 THEN 1 ELSE 0 END) AS total_inactive, SUM(CASE WHEN customer_live.active = 1 AND DATE(customer_live.time_updated) = @time_date THEN 1 ELSE 0 END) AS daily_new, SUM(CASE WHEN customer_live.active = 0 AND DATE(customer_live.time_updated) = @time_date THEN 1 ELSE 0 END) AS daily_canceled, MIN(NOW()) AS ts FROM subscription_live.`customer` customer_live INNER JOIN subscription_live.`city` city_live ON customer_live.city_id = city_live.id INNER JOIN subscription_live.`country` country_live ON city_live.country_id = country_live.id INNER JOIN subscription_dwh.dim_city city_dwh ON city_live.city_name = city_dwh.city_name AND city_live.postal_code = city_dwh.postal_code AND country_live.country_name = city_dwh.country_name INNER JOIN subscription_dwh.dim_time time_dwh ON time_dwh.time_date = @time_date GROUP BY city_dwh.id, time_dwh.id; -- fact_subscription_status INSERT INTO `fact_subscription_status`(`dim_city_id`, `dim_time_id`, `total_active`, `total_inactive`, `daily_new`, `daily_canceled`, `ts`) SELECT city_dwh.id AS dim_ctiy_id, time_dwh.id AS dim_time_id, SUM(CASE WHEN subscription_live.active = 1 THEN 1 ELSE 0 END) AS total_active, SUM(CASE WHEN subscription_live.active = 0 THEN 1 ELSE 0 END) AS total_inactive, SUM(CASE WHEN subscription_live.active = 1 AND DATE(subscription_live.time_updated) = @time_date THEN 1 ELSE 0 END) AS daily_new, SUM(CASE WHEN subscription_live.active = 0 AND DATE(subscription_live.time_updated) = @time_date THEN 1 ELSE 0 END) AS daily_canceled, MIN(NOW()) AS ts FROM subscription_live.`customer` customer_live INNER JOIN subscription_live.`subscription` subscription_live ON subscription_live.customer_id = customer_live.id INNER JOIN subscription_live.`city` city_live ON customer_live.city_id = city_live.id INNER JOIN subscription_live.`country` country_live ON city_live.country_id = country_live.id INNER JOIN subscription_dwh.dim_city city_dwh ON city_live.city_name = city_dwh.city_name AND city_live.postal_code = city_dwh.postal_code AND country_live.country_name = city_dwh.country_name INNER JOIN subscription_dwh.dim_time time_dwh ON time_dwh.time_date = @time_date GROUP BY city_dwh.id, time_dwh.id; END// -- CALL p_update_facts ()
もう一度、最後の行をコメントアウトしました。コメントを削除すると、この行を使用してプロシージャを呼び出し、新しい値を挿入できます。既存の古い値を削除していないことに注意してください。その日付と都市の値がすでにある場合、この手順は機能しません。これは、挿入前に削除を実行することで解決できます。
DWHの残りのファクトテーブルにデータを入力する必要があることを忘れないでください。自分で試してみることをお勧めします!
私が絶対にお勧めするもう1つのことは、プロセス全体をトランザクション内に配置することです。これにより、すべての挿入が成功するか、何も挿入されないことが保証されます。これは、データが部分的に挿入されないようにする場合に非常に重要です。ディメンションとファクトを挿入するための複数の手順があり、それらの一部が機能し、他の手順が失敗した場合。
どう思いますか?
今日は、ELT / ETLプロセスを実行し、ライブデータベースからデータウェアハウスにデータを読み込む方法を見てきました。私たちが示したプロセスはかなり単純化されていますが、データをE(抽出)し、適切な形式にT(変換)し、最後にDWHにL(oad)するために必要なすべての要素が含まれています。どう思いますか?以下のコメントであなたの経験を教えてください。