この記事は、データ統合に対処しなければならないことが多い人にとって興味深いものになります。
はじめに
ユーザーが常にデータを変更(更新または削除)するデータベースがあると想定します。おそらく、このデータベースは、テーブル構造の変更を許可しない大規模なアプリケーションによって使用されます。タスクは、このデータベースから別のサーバー上の別のデータベースにデータを時々ロードすることです。この問題に取り組む最も簡単な方法は、ターゲットデータベースを事前にクリーンアップして、ソースデータベースからターゲットデータベースに新しいデータをロードすることです。この方法は、データの読み込み時間が許容範囲内であり、事前に設定された期限を超えない限り使用できます。データのロードに数日かかる場合はどうなりますか?さらに、不安定な通信チャネルは、データのロードが停止して再開する状況につながります。これらの障害に直面した場合は、「データの再読み込み」アルゴリズムの1つを検討することをお勧めします。これは、最新のロードがロードされてからデータ変更のみが発生したことを意味します。
CDC
SQL Server 2008で、MicrosoftはChange Data Capture(CDC)と呼ばれるデータ追跡メカニズムを導入しました。大まかに言えば、このメカニズムの目的は、任意のデータベーステーブルに対してCDCを有効にすると、元のテーブルと同じ名前のシステムテーブルが同じデータベースに作成されることです(スキーマは次のようになります:プレフィックスとしての「cdc」と古いスキーマ名に「_」を加え、最後に「_CT」を付けます。たとえば、元のテーブルはdbo.Exampleであるため、システムテーブルの名前はcdc.dbo_Example_CTになります。変更されたすべてのデータが保存されます。
実際、CDCをさらに深く掘り下げるために、例を考えてみましょう。ただし、最初に、CDCを使用するSQLエージェントがSQLServerテストインスタンスで機能することを確認してください。
さらに、データベースとテストテーブルを作成し、このテーブルにデータを入力して、このテーブルのCDCを有効にするスクリプトを検討します。
タスクを理解して簡素化するために、ソースデータベースとターゲットデータベースを異なるサーバーに分散せずに、1つのSQLServerインスタンスを使用します。
use master
go
-- create a source database
if not exists (select * from sys.databases where name = 'db_src_cdc')
create database db_src_cdc
go
use db_src_cdc
go
-- enable CDC if it is disabled
if not exists (select * from sys.databases where name = db_name() and is_cdc_enabled=1)
exec sys.sp_cdc_enable_db
go
-- create a role for tables with CDC
if not exists(select * from sys.sysusers where name = 'CDC_Reader' and issqlrole=1)
create role CDC_Reader
go
-- create a table
if object_id('dbo.Example','U') is null
create table dbo.Example
(
ID int identity constraint PK_Example primary key,
Title varchar(200) not null
)
go
-- populate the table
insert dbo.Example (Title) values
('One'),('Two'),('Three'),('Four'),('Five');
go
-- enable CDC for the table
if not exists (select * from sys.tables where is_tracked_by_cdc = 1 and name = 'Example')
exec sys.sp_cdc_enable_table
@source_schema = 'dbo',
@source_name = 'Example',
@role_name = 'CDC_Reader'
go
-- populate the table with some data. We will change or delete something
update dbo.Example
set Title = reverse(Title)
where ID in (2,3,4);
delete from dbo.Example where ID in (1,2);
set identity_insert dbo.Example on;
insert dbo.Example (ID, Title) values
(1,'One'),(6,'Six');
set identity_insert dbo.Example off;
go ここで、dbo.Exampleテーブルとcdc.dbo_Example_CTテーブルでこのスクリプトを実行した後の状態を見てみましょう(CDCは非同期であることに注意してください。データはテーブルに入力され、一定期間後に変更追跡が保存されます。 。
select * from dbo.Example;
ID Title ---- ---------------------- 1 One 3 eerhT 4 ruoF 5 Five 6 Six
select row_number() over ( partition by ID order by __$start_lsn desc, __$seqval desc ) as __$rn, * from cdc.dbo_Example_CT;
__$rn __$start_lsn __$end_lsn __$seqval __$operation __$update_mask ID Title
------ ---------------------- ----------- ---------------------- ------------ ---------------- --- -----------
1 0x0000003A000000580005 NULL 0x0000003A000000580003 2 0x03 1 One
2 0x0000003A000000560006 NULL 0x0000003A000000560002 1 0x03 1 One
1 0x0000003A000000560006 NULL 0x0000003A000000560005 1 0x03 2 owT
2 0x0000003A000000540005 NULL 0x0000003A000000540002 3 0x02 2 Two
3 0x0000003A000000540005 NULL 0x0000003A000000540002 4 0x02 2 owT
1 0x0000003A000000540005 NULL 0x0000003A000000540003 3 0x02 3 Three
2 0x0000003A000000540005 NULL 0x0000003A000000540003 4 0x02 3 eerhT
1 0x0000003A000000540005 NULL 0x0000003A000000540004 3 0x02 4 Four
2 0x0000003A000000540005 NULL 0x0000003A000000540004 4 0x02 4 ruoF
1 0x0000003A000000580005 NULL 0x0000003A000000580004 2 0x03 変更追跡が格納されているテーブル構造を詳細に検討してください。 __$start_lsnフィールドと__$seqvalフィールドは、それぞれLSN(データベース内のログシーケンス番号)とトランザクション内のトランザクション番号です。これらのフィールドには重要な特性があります。つまり、LSNが高いレコードが後で実行されることを確認できます。このプロパティにより、クエリ内の各レコードの最新の状態を簡単に取得し、条件で選択をフィルタリングできます。ここで、__ $ rn=1です。
__ $ operationフィールドには、トランザクションコードが含まれています:
- 1 –レコードが削除されます
- 2 –レコードが挿入されます
- 3、4 –レコードが更新されます。更新前の古いデータは3、新しいデータは4です。
プレフィックス«__$»のサービスフィールドに加えて、元のテーブルのフィールドは完全に複製されます。この情報は、増分ロードに進むのに十分です。
データをロードするためのデータベースのセットアップ
テストターゲットデータベースに、データがロードされるテーブルと、ロードログに関するデータを格納するための追加のテーブルを作成します。
use master
go
-- create a target database
if not exists (select * from sys.databases where name = 'db_dst_cdc')
create database db_dst_cdc
go
use db_dst_cdc
go
-- create a table
if object_id('dbo.Example','U') is null
create table dbo.Example
(
ID int constraint PK_Example primary key,
Title varchar(200) not null
)
go
-- create a table to store the load log
if object_id('dbo.log_cdc','U') is null
create table dbo.log_cdc
(
table_name nvarchar(512) not null,
dt datetime not null default getdate(),
lsn binary(10) not null default(0x0),
constraint pk_log_cdc primary key (table_name,dt desc)
)
go LOG_CDCテーブルのフィールドに注意を向けたいと思います:
- TABLE_NAMEは、ロードされたテーブルに関する情報を格納します(将来、異なるデータベースから、または異なるサーバーからでも、複数のテーブルをロードできます。テーブルの形式は「SERVER_NAME.DB_NAME.SCHEMA_NAME.TABLE_NAME」です
- DTは、ロード日時のフィールドであり、増分ロードではオプションです。ただし、読み込みの監査には役立ちます。
- LSN –テーブルがロードされた後、必要に応じて、次のロードを開始する場所に関する情報を保存する必要があります。したがって、ロードするたびに、最新の(最大)__$start_lsnをこの列に追加します。
データ読み込みのアルゴリズム
上記のように、クエリを使用すると、ウィンドウ関数を使用してテーブルの最新の状態を取得できます。最新のロードのLSNがわかっている場合、次にロードするときに、少なくとも1つの完全な前のロードがあった場合、その変更が保存されたLSNよりも大きいすべてのデータをソースからフィルタリングできます。
with incr_Example as ( select row_number() over ( partition by ID order by __$start_lsn desc, __$seqval desc ) as __$rn, * from db_src_cdc.cdc.dbo_Example_CT where __$operation <> 3 and __$start_lsn > @lsn ) select * from incr_Example
次に、ロードLSNが保存されていない場合、完全なロードのすべてのレコードを取得できます。
with incr_Example as ( select row_number() over ( partition by ID order by __$start_lsn desc, __$seqval desc ) as __$rn, * from db_src_cdc.cdc.dbo_Example_CT where __$operation <> 3 and __$start_lsn > @lsn ) , full_Example as ( select * from db_src_cdc.dbo.Example where @lsn is null ) select ID, Title, __$operation from incr_Example where __$rn = 1 union all select ID, Title, 2 as __$operation from full_Example
したがって、@ LSN値に応じて、このクエリは、ステータスが削除されたかどうかに関係なく、すべての最新の変更(暫定的な変更をバイパス)、またはステータス2(新しいレコード)を追加した元のテーブルのすべてのデータを表示します–このフィールド2つの選択を統合するためにのみ使用されます。このクエリを使用すると、MERGEコマンド(SQL 2008バージョン以降)を使用して、フルロードまたはリロードを簡単に実装できます。
代替プロセスを作成する可能性のあるボトルネックを回避し、異なるテーブルから一致するデータをロードするために(将来、複数のテーブルをロードし、場合によってはそれらの間に関係関係がある可能性があります)、ソースデータベースでDBスナップショットを使用することをお勧めします(別のSQL2008機能)。
ロードの全文は次のとおりです。
[expand title =” Code”]
/*
Algorithm of data loading
*/
-- create a database snapshot
if exists (select * from sys.databases where name = 'db_src_cdc_ss' )
drop database db_src_cdc_ss;
declare
@query nvarchar(max);
select
@query = N'create database db_src_cdc_ss on ( name = N'''+name+
''', filename = N'''+[filename]+'.ss'' ) as snapshot of db_src_cdc'
from db_src_cdc.sys.sysfiles where groupid = 1;
exec ( @query );
-- read LSN from the previous load
declare @lsn binary(10) =
(select max(lsn) from db_dst_cdc.dbo.log_cdc
where table_name = 'localhost.db_src_cdc.dbo.Example');
-- clear a table before the complete load
if @lsn is null truncate table db_dst_cdc.dbo.Example;
-- load process
with incr_Example as
(
select
row_number() over
(
partition by ID
order by
__$start_lsn desc,
__$seqval desc
) as __$rn,
*
from db_src_cdc_ss.cdc.dbo_Example_CT
where
__$operation <> 3 and
__$start_lsn > @lsn
)
, full_Example as
(
select *
from db_src_cdc_ss.dbo.Example
where @lsn is null
)
, cte_Example as
(
select
ID, Title, __$operation
from incr_Example
where __$rn = 1
union all
select
ID, Title, 2 as __$operation
from full_Example
)
merge db_dst_cdc.dbo.Example as trg using cte_Example as src on trg.ID=src.ID
when matched and __$operation = 1 then delete
when matched and __$operation <> 1 then update set trg.Title = src.Title
when not matched by target and __$operation <> 1 then insert (ID, Title) values (src.ID, src.Title);
-- mark the end of the load process and the latest LSN
insert db_dst_cdc.dbo.log_cdc (table_name, lsn)
values ('localhost.db_src_cdc.dbo.Example', isnull((select max(__$start_lsn) from db_src_cdc_ss.cdc.dbo_Example_CT),0))
-- delete the database snapshot
if exists (select * from sys.databases where name = 'db_src_cdc_ss' )
drop database db_src_cdc_ss [/エキスパンド]