前回の記事では、SQLServerでServiceBrokerを使用して非同期処理を実装することの利点について、長いタスクの分離処理に存在する他の方法よりも優れていることについて説明しました。この記事では、単一のデータベースで基本的なService Broker構成用に構成する必要のあるすべてのコンポーネントと、ブローカーサービス間の会話管理に関する重要な考慮事項について説明します。開始するには、データベースを作成し、ServiceBrokerで使用できるようにデータベースを有効にする必要があります。
CREATE DATABASE AsyncProcessingDemo; GO IF (SELECT is_broker_enabled FROM sys.databases WHERE name = N'AsyncProcessingDemo') = 0 BEGIN ALTER DATABASE AsyncProcessingDemo SET ENABLE_BROKER; END GO USE AsyncProcessingDemo; GO
ブローカーコンポーネントの構成
データベースに作成する必要のある基本的なオブジェクトは、メッセージのメッセージタイプ、サービス間でのメッセージの送信方法を定義するコントラクト、キューとイニシエーターサービス、およびキューとターゲットサービスです。 Service Brokerのオンラインの多くの例は、Service Brokerのメッセージタイプ、コントラクト、およびサービスの複雑なオブジェクト命名を示しています。ただし、名前を複雑にする必要はなく、任意のオブジェクトに単純なオブジェクト名を使用できます。
メッセージの場合、リクエストのメッセージタイプを作成する必要があります。これはAsyncRequestと呼ばれます。 、および結果のメッセージタイプ。これはAsyncResultと呼ばれます。 。どちらも、ブローカーサービスによって正しく形成されたものとして検証されるXMLを使用して、サービスに必要なデータを送受信します。
-- Create the message types CREATE MESSAGE TYPE [AsyncRequest] VALIDATION = WELL_FORMED_XML; CREATE MESSAGE TYPE [AsyncResult] VALIDATION = WELL_FORMED_XML;
コントラクトは、AsyncRequestが指定されています 開始サービスによってターゲットサービスに送信され、ターゲットサービスはAsyncResultを返します。 開始サービスにメッセージを返します。コントラクトでは、イニシエーターとターゲットに複数のメッセージタイプを指定することも、特定の処理で必要な場合は、特定のメッセージタイプを任意のサービスから送信することもできます。
-- Create the contract CREATE CONTRACT [AsyncContract] ( [AsyncRequest] SENT BY INITIATOR, [AsyncResult] SENT BY TARGET );を作成します
サービスごとに、サービスが受信したメッセージのストレージを提供するキューを作成する必要があります。リクエストが送信されるターゲットサービスは、AsyncContractを指定して作成する必要があります メッセージをサービスに送信できるようにします。この場合、サービスの名前はProcessingServiceです。 ProcessingQueueに作成されます データベース内。開始サービスでは、コントラクトを指定する必要がないため、開始された会話に応答するメッセージのみを受信できます。
-- Create the processing queue and service - specify the contract to allow sending to the service CREATE QUEUE ProcessingQueue; CREATE SERVICE [ProcessingService] ON QUEUE ProcessingQueue ([AsyncContract]); -- Create the request queue and service CREATE QUEUE RequestQueue; CREATE SERVICE [RequestService] ON QUEUE RequestQueue;
処理のためのメッセージの送信
前回の記事で説明したように、ブローカーサービスに新しいメッセージを送信するためのラッパーストアドプロシージャを実装することを好みます。これにより、必要に応じてパフォーマンスを拡張するために一度変更できます。この手順は、新しい会話を作成し、メッセージをProcessingServiceに送信するための単純なラッパーです。 。
-- Create the wrapper procedure for sending messages
CREATE PROCEDURE dbo.SendBrokerMessage
@FromService SYSNAME,
@ToService SYSNAME,
@Contract SYSNAME,
@MessageType SYSNAME,
@MessageBody XML
AS
BEGIN
SET NOCOUNT ON;
DECLARE @conversation_handle UNIQUEIDENTIFIER;
BEGIN TRANSACTION;
BEGIN DIALOG CONVERSATION @conversation_handle
FROM SERVICE @FromService
TO SERVICE @ToService
ON CONTRACT @Contract
WITH ENCRYPTION = OFF;
SEND ON CONVERSATION @conversation_handle
MESSAGE TYPE @MessageType(@MessageBody);
COMMIT TRANSACTION;
END
GO
ラッパーストアドプロシージャを使用して、テストメッセージをProcessingServiceに送信できるようになりました。 ブローカーサービスが正しく設定されていることを検証します。
-- Send a request EXECUTE dbo.SendBrokerMessage @FromService = N'RequestService', @ToService = N'ProcessingService', @Contract = N'AsyncContract', @MessageType = N'AsyncRequest', @MessageBody = N'<AsyncRequest><AccountNumber>12345</AccountNumber></AsyncRequest>'; -- Check for message on processing queue SELECT CAST(message_body AS XML) FROM ProcessingQueue; GO
メッセージの処理
ProcessingQueueからのメッセージを手動で処理することもできますが 、メッセージがProcessingServiceに送信されるときに、メッセージが自動的に処理されるようにする必要があります。 。これを行うには、アクティブ化ストアドプロシージャを作成する必要があります。これをテストし、後でキューにバインドして、キューのアクティブ化時の処理を自動化します。メッセージを処理するには、RECEIVEする必要があります トランザクション内のキューからのメッセージ、およびメッセージのメッセージタイプと会話ハンドル。メッセージタイプは、適切なロジックが処理中のメッセージに適用されることを保証し、会話ハンドルにより、メッセージが処理されたときに応答を開始サービスに送り返すことができます。
RECEIVE コマンドを使用すると、同じ会話ハンドルまたはグループ内の1つまたは複数のメッセージを1つのトランザクションで処理できます。複数のメッセージを処理するには、テーブル変数を使用する必要があります。単一のメッセージ処理を実行するには、ローカル変数を使用できます。以下のアクティベーション手順では、キューから1つのメッセージを取得し、メッセージタイプをチェックして、それがAsyncRequestであるかどうかを判断します。 メッセージを送信し、受信したメッセージ情報に基づいて長時間実行プロセスを実行します。ループ内でメッセージを受信しない場合、ループを終了して実行を終了する前に、別のメッセージがキューに入るまで最大5000ミリ秒(5秒)待機します。メッセージを処理した後、AsyncResultを作成します メッセージを送信し、メッセージの受信元と同じ会話ハンドルでイニシエーターに送り返します。このプロシージャは、メッセージタイプもチェックして、EndDialogかどうかを判断します。 またはError 会話を終了してクリーンアップするためのメッセージを受信しました。
-- Create processing procedure for processing queue
CREATE PROCEDURE dbo.ProcessingQueueActivation
AS
BEGIN
SET NOCOUNT ON;
DECLARE @conversation_handle UNIQUEIDENTIFIER;
DECLARE @message_body XML;
DECLARE @message_type_name sysname;
WHILE (1=1)
BEGIN
BEGIN TRANSACTION;
WAITFOR
(
RECEIVE TOP (1)
@conversation_handle = conversation_handle,
@message_body = CAST(message_body AS XML),
@message_type_name = message_type_name
FROM ProcessingQueue
), TIMEOUT 5000;
IF (@@ROWCOUNT = 0)
BEGIN
ROLLBACK TRANSACTION;
BREAK;
END
IF @message_type_name = N'AsyncRequest'
BEGIN
-- Handle complex long processing here
-- For demonstration we'll pull the account number and send a reply back only
DECLARE @AccountNumber INT = @message_body.value('(AsyncRequest/AccountNumber)[1]', 'INT');
-- Build reply message and send back
DECLARE @reply_message_body XML = N'
' + CAST(@AccountNumber AS NVARCHAR(11)) + '
';
SEND ON CONVERSATION @conversation_handle
MESSAGE TYPE [AsyncResult] (@reply_message_body);
END
-- If end dialog message, end the dialog
ELSE IF @message_type_name = N'https://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
BEGIN
END CONVERSATION @conversation_handle;
END
-- If error message, log and end conversation
ELSE IF @message_type_name = N'https://schemas.microsoft.com/SQL/ServiceBroker/Error'
BEGIN
-- Log the error code and perform any required handling here
-- End the conversation for the error
END CONVERSATION @conversation_handle;
END
COMMIT TRANSACTION;
END
END
GO
RequestQueue 送信されたメッセージも処理する必要があるため、AsyncResultを処理するための追加の手順 ProcessingQueueActivationプロシージャによって返されるメッセージを作成する必要があります。 AsnycResultメッセージはすべての処理作業が完了したことを意味することがわかっているため、そのメッセージを処理すると会話を終了できます。これにより、EndDialogメッセージがProcessingServiceに送信され、そのアクティベーション手順によって処理されて、会話はすべてを片付け、火事を避け、会話が適切に終了したときに発生する問題を忘れます。
-- Create procedure for processing replies to the request queue
CREATE PROCEDURE dbo.RequestQueueActivation
AS
BEGIN
SET NOCOUNT ON;
DECLARE @conversation_handle UNIQUEIDENTIFIER;
DECLARE @message_body XML;
DECLARE @message_type_name sysname;
WHILE (1=1)
BEGIN
BEGIN TRANSACTION;
WAITFOR
(
RECEIVE TOP (1)
@conversation_handle = conversation_handle,
@message_body = CAST(message_body AS XML),
@message_type_name = message_type_name
FROM RequestQueue
), TIMEOUT 5000;
IF (@@ROWCOUNT = 0)
BEGIN
ROLLBACK TRANSACTION;
BREAK;
END
IF @message_type_name = N'AsyncResult'
BEGIN
-- If necessary handle the reply message here
DECLARE @AccountNumber INT = @message_body.value('(AsyncResult/AccountNumber)[1]', 'INT');
-- Since this is all the work being done, end the conversation to send the EndDialog message
END CONVERSATION @conversation_handle;
END
-- If end dialog message, end the dialog
ELSE IF @message_type_name = N'https://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
BEGIN
END CONVERSATION @conversation_handle;
END
-- If error message, log and end conversation
ELSE IF @message_type_name = N'https://schemas.microsoft.com/SQL/ServiceBroker/Error'
BEGIN
END CONVERSATION @conversation_handle;
END
COMMIT TRANSACTION;
END
END
GO 手順のテスト
サービスのキュー処理を自動化する前に、アクティベーション手順をテストして、メッセージが適切に処理されることを確認し、適切に処理されないエラーが発生した場合にキューが無効にならないようにすることが重要です。 ProcessingQueueにはすでにメッセージがあるため ProcessingQueueActivation プロシージャを実行して、そのメッセージを処理できます。 WAITFOR メッセージがキューからすぐに処理されたとしても、プロシージャが終了するのに5秒かかります。メッセージを処理した後、RequestQueueにクエリを実行することで、プロシージャが正しく機能したことを確認できます。 AsyncResultかどうかを確認する メッセージが存在し、RequestQueueActivationであることを確認できます プロシージャは実行することで正しく機能します。
-- Process the message from the processing queue EXECUTE dbo.ProcessingQueueActivation; GO -- Check for reply message on request queue SELECT CAST(message_body AS XML) FROM RequestQueue; GO -- Process the message from the request queue EXECUTE dbo.RequestQueueActivation; GO
処理の自動化
この時点で、すべてのコンポーネントが完了し、処理が完全に自動化されています。残っているのは、アクティベーションプロシージャを適切なキューにバインドし、別のテストメッセージを送信して、それが処理され、その後キューに何も残っていないことを検証することだけです。
-- Alter the processing queue to specify internal activation
ALTER QUEUE ProcessingQueue
WITH ACTIVATION
(
STATUS = ON,
PROCEDURE_NAME = dbo.ProcessingQueueActivation,
MAX_QUEUE_READERS = 10,
EXECUTE AS SELF
);
GO
-- Alter the request queue to specify internal activation
ALTER QUEUE RequestQueue
WITH ACTIVATION
(
STATUS = ON,
PROCEDURE_NAME = dbo.RequestQueueActivation,
MAX_QUEUE_READERS = 10,
EXECUTE AS SELF
);
GO
-- Test automated activation
-- Send a request
EXECUTE dbo.SendBrokerMessage
@FromService = N'RequestService',
@ToService = N'ProcessingService',
@Contract = N'AsyncContract',
@MessageType = N'AsyncRequest',
@MessageBody = N'<AsyncRequest><AccountNumber>12345</AccountNumber></AsyncRequest>';
-- Check for message on processing queue
-- nothing is there because it was automatically processed
SELECT CAST(message_body AS XML) FROM ProcessingQueue;
GO
-- Check for reply message on request queue
-- nothing is there because it was automatically processed
SELECT CAST(message_body AS XML) FROM RequestQueue;
GO 概要
SQL Server Service Brokerの自動非同期処理の基本コンポーネントは、単一のデータベースセットアップで構成して、長時間実行されるタスクの分離処理を可能にすることができます。これは、エンドユーザーのアプリケーションとの対話から処理を切り離すことにより、エンドユーザーのエクスペリエンスからアプリケーションのパフォーマンスを向上させるための強力なツールになります。