sql >> データベース >  >> RDS >> Sqlserver

SQLServerで変更データキャプチャを使用してインクリメンタルロードを実装する

    この記事は、データ統合に対処しなければならないことが多い人にとって興味深いものになります。

    はじめに

    ユーザーが常にデータを変更(更新または削除)するデータベースがあると想定します。おそらく、このデータベースは、テーブル構造の変更を許可しない大規模なアプリケーションによって使用されます。タスクは、このデータベースから別のサーバー上の別のデータベースにデータを時々ロードすることです。この問題に取り組む最も簡単な方法は、ターゲットデータベースを事前にクリーンアップして、ソースデータベースからターゲットデータベースに新しいデータをロードすることです。この方法は、データの読み込み時間が許容範囲内であり、事前に設定された期限を超えない限り使用できます。データのロードに数日かかる場合はどうなりますか?さらに、不安定な通信チャネルは、データのロードが停止して再開する状況につながります。これらの障害に直面した場合は、「データの再読み込み」アルゴリズムの1つを検討することをお勧めします。これは、最新のロードがロードされてからデータ変更のみが発生したことを意味します。

    CDC

    SQL Server 2008で、MicrosoftはChange Data Capture(CDC)と呼ばれるデータ追跡メカニズムを導入しました。大まかに言えば、このメカニズムの目的は、任意のデータベーステーブルに対してCDCを有効にすると、元のテーブルと同じ名前のシステムテーブルが同じデータベースに作成されることです(スキーマは次のようになります:プレフィックスとしての「cdc」と古いスキーマ名に「_」を加え、最後に「_CT」を付けます。たとえば、元のテーブルはdbo.Exampleであるため、システムテーブルの名前はcdc.dbo_Example_CTになります。変更されたすべてのデータが保存されます。

    実際、CDCをさらに深く掘り下げるために、例を考えてみましょう。ただし、最初に、CDCを使用するSQLエージェントがSQLServerテストインスタンスで機能することを確認してください。

    さらに、データベースとテストテーブルを作成し、このテーブルにデータを入力して、このテーブルのCDCを有効にするスクリプトを検討します。

    タスクを理解して簡素化するために、ソースデータベースとターゲットデータベースを異なるサーバーに分散せずに、1つのSQLServerインスタンスを使用します。

    use master
    go
    -- create a source database
    if not exists (select * from sys.databases where name = 'db_src_cdc')
    	create database db_src_cdc
    go
    use db_src_cdc
    go
    -- enable CDC if it is disabled
    if not exists (select * from sys.databases where name = db_name() and is_cdc_enabled=1)
    	exec sys.sp_cdc_enable_db
    go
    -- create a role for tables with CDC
    if not exists(select * from sys.sysusers where name = 'CDC_Reader' and issqlrole=1)
    	create role CDC_Reader
    go
    -- create a table
    if object_id('dbo.Example','U') is null
    	create table dbo.Example
    	(
    		ID int identity constraint PK_Example primary key,
    		Title varchar(200) not null
    	)
    go
    -- populate the table
    insert dbo.Example (Title) values
    ('One'),('Two'),('Three'),('Four'),('Five');
    go
    -- enable CDC for the table
    if not exists (select * from sys.tables where is_tracked_by_cdc = 1 and name = 'Example')
    	exec sys.sp_cdc_enable_table
    		@source_schema = 'dbo',
    		@source_name = 'Example',
    		@role_name = 'CDC_Reader'
    go
    -- populate the table with some data. We will change or delete something
    update dbo.Example
    set Title = reverse(Title)
    where ID in (2,3,4);
    
    delete from dbo.Example where ID in (1,2);
    
    set identity_insert dbo.Example on;
    insert dbo.Example (ID, Title) values
    (1,'One'),(6,'Six');
    set identity_insert dbo.Example off;
    go

    ここで、dbo.Exampleテーブルとcdc.dbo_Example_CTテーブルでこのスクリプトを実行した後の状態を見てみましょう(CDCは非同期であることに注意してください。データはテーブルに入力され、一定期間後に変更追跡が保存されます。 。

    select * from dbo.Example;
    ID Title
    ---- ----------------------
       1 One
       3 eerhT 
       4 ruoF 
       5 Five 
       6 Six
    select
    	row_number() over
    		(
    			partition by ID
    			order by
    				__$start_lsn desc,
    				__$seqval desc
    		) as __$rn,
    	*
    from cdc.dbo_Example_CT;
    __$rn __$start_lsn           __$end_lsn  __$seqval              __$operation __$update_mask    ID Title
    ------ ---------------------- ----------- ---------------------- ------------ ---------------- --- -----------
         1 0x0000003A000000580005 NULL        0x0000003A000000580003            2 0x03               1 One
         2 0x0000003A000000560006 NULL        0x0000003A000000560002            1 0x03               1 One
         1 0x0000003A000000560006 NULL        0x0000003A000000560005            1 0x03               2 owT
         2 0x0000003A000000540005 NULL        0x0000003A000000540002            3 0x02               2 Two
         3 0x0000003A000000540005 NULL        0x0000003A000000540002            4 0x02               2 owT
         1 0x0000003A000000540005 NULL        0x0000003A000000540003            3 0x02               3 Three
         2 0x0000003A000000540005 NULL        0x0000003A000000540003            4 0x02               3 eerhT
         1 0x0000003A000000540005 NULL        0x0000003A000000540004            3 0x02               4 Four
         2 0x0000003A000000540005 NULL        0x0000003A000000540004            4 0x02               4 ruoF
         1 0x0000003A000000580005 NULL        0x0000003A000000580004            2 0x03

    変更追跡が格納されているテーブル構造を詳細に検討してください。 __$start_lsnフィールドと__$seqvalフィールドは、それぞれLSN(データベース内のログシーケンス番号)とトランザクション内のトランザクション番号です。これらのフィールドには重要な特性があります。つまり、LSNが高いレコードが後で実行されることを確認できます。このプロパティにより、クエリ内の各レコードの最新の状態を簡単に取得し、条件で選択をフィルタリングできます。ここで、__ $ rn=1です。

    __ $ operationフィールドには、トランザクションコードが含まれています:

    • 1 –レコードが削除されます
    • 2 –レコードが挿入されます
    • 3、4 –レコードが更新されます。更新前の古いデータは3、新しいデータは4です。

    プレフィックス«__$»のサービスフィールドに加えて、元のテーブルのフィールドは完全に複製されます。この情報は、増分ロードに進むのに十分です。

    データをロードするためのデータベースのセットアップ

    テストターゲットデータベースに、データがロードされるテーブルと、ロードログに関するデータを格納するための追加のテーブルを作成します。

    use master
    go
    -- create a target database 
    if not exists (select * from sys.databases where name = 'db_dst_cdc')
    	create database db_dst_cdc
    go
    use db_dst_cdc
    go
    -- create a table
    if object_id('dbo.Example','U') is null
    	create table dbo.Example
    	(
    		ID int constraint PK_Example primary key,
    		Title varchar(200) not null
    	)
    go
    -- create a table to store the load log
    if object_id('dbo.log_cdc','U') is null
    	create table dbo.log_cdc
    	(
    		table_name nvarchar(512) not null,
    		dt datetime not null default getdate(),
    		lsn binary(10) not null default(0x0),
    		constraint pk_log_cdc primary key (table_name,dt desc)
    	)
    go

    LOG_CDCテーブルのフィールドに注意を向けたいと思います:

    • TABLE_NAMEは、ロードされたテーブルに関する情報を格納します(将来、異なるデータベースから、または異なるサーバーからでも、複数のテーブルをロードできます。テーブルの形式は「SERVER_NAME.DB_NAME.SCHEMA_NAME.TABLE_NAME」です
    • DTは、ロード日時のフィールドであり、増分ロードではオプションです。ただし、読み込みの監査には役立ちます。
    • LSN –テーブルがロードされた後、必要に応じて、次のロードを開始する場所に関する情報を保存する必要があります。したがって、ロードするたびに、最新の(最大)__$start_lsnをこの列に追加します。

    データ読み込みのアルゴリズム

    上記のように、クエリを使用すると、ウィンドウ関数を使用してテーブルの最新の状態を取得できます。最新のロードのLSNがわかっている場合、次にロードするときに、少なくとも1つの完全な前のロードがあった場合、その変更が保存されたLSNよりも大きいすべてのデータをソースからフィルタリングできます。

    with incr_Example as
    (
    	select
    		row_number() over
    			(
    				partition by ID
    				order by
    					__$start_lsn desc,
    					__$seqval desc
    			) as __$rn,
    		*
    	from db_src_cdc.cdc.dbo_Example_CT
    	where
    		__$operation <> 3 and
    		__$start_lsn > @lsn
    )
    select * from incr_Example

    次に、ロードLSNが保存されていない場合、完全なロードのすべてのレコードを取得できます。

    with incr_Example as
    (
    	select
    		row_number() over
    			(
    				partition by ID
    				order by
    					__$start_lsn desc,
    					__$seqval desc
    			) as __$rn,
    		*
    	from db_src_cdc.cdc.dbo_Example_CT
    	where
    		__$operation <> 3 and
    		__$start_lsn > @lsn
    )
    , full_Example as
    (
    	select *
    	from db_src_cdc.dbo.Example
    	where @lsn is null
    )
    select
    	ID, Title, __$operation
    from incr_Example
    where __$rn = 1
    union all
    select
    	ID, Title, 2 as __$operation
    from full_Example

    したがって、@ LSN値に応じて、このクエリは、ステータスが削除されたかどうかに関係なく、すべての最新の変更(暫定的な変更をバイパス)、またはステータス2(新しいレコード)を追加した元のテーブルのすべてのデータを表示します–このフィールド2つの選択を統合するためにのみ使用されます。このクエリを使用すると、MERGEコマンド(SQL 2008バージョン以降)を使用して、フルロードまたはリロードを簡単に実装できます。

    代替プロセスを作成する可能性のあるボトルネックを回避し、異なるテーブルから一致するデータをロードするために(将来、複数のテーブルをロードし、場合によってはそれらの間に関係関係がある可能性があります)、ソースデータベースでDBスナップショットを使用することをお勧めします(別のSQL2008機能)。

    ロードの全文は次のとおりです。

    [expand title =” Code”]

    /*
    	Algorithm of data loading
    */
    -- create a database snapshot
    if exists (select * from sys.databases where name = 'db_src_cdc_ss' )
    	drop database db_src_cdc_ss;
    
    declare
    	@query nvarchar(max);
    
    select
    	@query = N'create database db_src_cdc_ss on ( name = N'''+name+
    		''', filename = N'''+[filename]+'.ss'' ) as snapshot of db_src_cdc'
    from db_src_cdc.sys.sysfiles where groupid = 1;
    
    exec ( @query );
    
    -- read LSN from the previous load
    declare @lsn binary(10) =
    	(select max(lsn) from db_dst_cdc.dbo.log_cdc
    	where table_name = 'localhost.db_src_cdc.dbo.Example');
    
    -- clear a table before the complete load
    if @lsn is null truncate table db_dst_cdc.dbo.Example;
    
    -- load process
    with incr_Example as
    (
    	select
    		row_number() over
    			(
    				partition by ID
    				order by
    					__$start_lsn desc,
    					__$seqval desc
    			) as __$rn,
    		*
    	from db_src_cdc_ss.cdc.dbo_Example_CT
    	where
    		__$operation <> 3 and
    		__$start_lsn > @lsn
    )
    , full_Example as
    (
    	select *
    	from db_src_cdc_ss.dbo.Example
    	where @lsn is null
    )
    , cte_Example as
    (
    	select
    		ID, Title, __$operation
    	from incr_Example
    	where __$rn = 1
    	union all
    	select
    		ID, Title, 2 as __$operation
    	from full_Example
    )
    merge db_dst_cdc.dbo.Example as trg using cte_Example as src on trg.ID=src.ID
    when matched and __$operation = 1 then delete
    when matched and __$operation <> 1 then update set trg.Title = src.Title
    when not matched by target and __$operation <> 1 then insert (ID, Title) values (src.ID, src.Title);
    
    -- mark the end of the load process and the latest LSN
    insert db_dst_cdc.dbo.log_cdc (table_name, lsn)
    values ('localhost.db_src_cdc.dbo.Example', isnull((select max(__$start_lsn) from db_src_cdc_ss.cdc.dbo_Example_CT),0))
    
    -- delete the database snapshot
    if exists (select * from sys.databases where name = 'db_src_cdc_ss' )
    	drop database db_src_cdc_ss

    [/エキスパンド]


    1. PostgreSQLの関数名は大文字と小文字を区別しませんか?

    2. SQL Serverで特定の日付より後のすべての日付をクエリするにはどうすればよいですか?

    3. ETLプロセスでのPythonとMySQLの使用:SQLAlchemy

    4. 12c廃止された機能