前回の記事では、SQLServerでServiceBrokerを使用して非同期処理を実装することの利点について、長いタスクの分離処理に存在する他の方法よりも優れていることについて説明しました。この記事では、単一のデータベースで基本的なService Broker構成用に構成する必要のあるすべてのコンポーネントと、ブローカーサービス間の会話管理に関する重要な考慮事項について説明します。開始するには、データベースを作成し、ServiceBrokerで使用できるようにデータベースを有効にする必要があります。
CREATE DATABASE AsyncProcessingDemo; GO IF (SELECT is_broker_enabled FROM sys.databases WHERE name = N'AsyncProcessingDemo') = 0 BEGIN ALTER DATABASE AsyncProcessingDemo SET ENABLE_BROKER; END GO USE AsyncProcessingDemo; GO
ブローカーコンポーネントの構成
データベースに作成する必要のある基本的なオブジェクトは、メッセージのメッセージタイプ、サービス間でのメッセージの送信方法を定義するコントラクト、キューとイニシエーターサービス、およびキューとターゲットサービスです。 Service Brokerのオンラインの多くの例は、Service Brokerのメッセージタイプ、コントラクト、およびサービスの複雑なオブジェクト命名を示しています。ただし、名前を複雑にする必要はなく、任意のオブジェクトに単純なオブジェクト名を使用できます。
メッセージの場合、リクエストのメッセージタイプを作成する必要があります。これはAsyncRequest
と呼ばれます。 、および結果のメッセージタイプ。これはAsyncResult
と呼ばれます。 。どちらも、ブローカーサービスによって正しく形成されたものとして検証されるXMLを使用して、サービスに必要なデータを送受信します。
-- Create the message types CREATE MESSAGE TYPE [AsyncRequest] VALIDATION = WELL_FORMED_XML; CREATE MESSAGE TYPE [AsyncResult] VALIDATION = WELL_FORMED_XML;
コントラクトは、AsyncRequest
が指定されています 開始サービスによってターゲットサービスに送信され、ターゲットサービスはAsyncResult
を返します。 開始サービスにメッセージを返します。コントラクトでは、イニシエーターとターゲットに複数のメッセージタイプを指定することも、特定の処理で必要な場合は、特定のメッセージタイプを任意のサービスから送信することもできます。
-- Create the contract CREATE CONTRACT [AsyncContract] ( [AsyncRequest] SENT BY INITIATOR, [AsyncResult] SENT BY TARGET );を作成します
サービスごとに、サービスが受信したメッセージのストレージを提供するキューを作成する必要があります。リクエストが送信されるターゲットサービスは、AsyncContract
を指定して作成する必要があります メッセージをサービスに送信できるようにします。この場合、サービスの名前はProcessingService
です。 ProcessingQueue
に作成されます データベース内。開始サービスでは、コントラクトを指定する必要がないため、開始された会話に応答するメッセージのみを受信できます。
-- Create the processing queue and service - specify the contract to allow sending to the service CREATE QUEUE ProcessingQueue; CREATE SERVICE [ProcessingService] ON QUEUE ProcessingQueue ([AsyncContract]); -- Create the request queue and service CREATE QUEUE RequestQueue; CREATE SERVICE [RequestService] ON QUEUE RequestQueue;
処理のためのメッセージの送信
前回の記事で説明したように、ブローカーサービスに新しいメッセージを送信するためのラッパーストアドプロシージャを実装することを好みます。これにより、必要に応じてパフォーマンスを拡張するために一度変更できます。この手順は、新しい会話を作成し、メッセージをProcessingService
に送信するための単純なラッパーです。 。
-- Create the wrapper procedure for sending messages CREATE PROCEDURE dbo.SendBrokerMessage @FromService SYSNAME, @ToService SYSNAME, @Contract SYSNAME, @MessageType SYSNAME, @MessageBody XML AS BEGIN SET NOCOUNT ON; DECLARE @conversation_handle UNIQUEIDENTIFIER; BEGIN TRANSACTION; BEGIN DIALOG CONVERSATION @conversation_handle FROM SERVICE @FromService TO SERVICE @ToService ON CONTRACT @Contract WITH ENCRYPTION = OFF; SEND ON CONVERSATION @conversation_handle MESSAGE TYPE @MessageType(@MessageBody); COMMIT TRANSACTION; END GO
ラッパーストアドプロシージャを使用して、テストメッセージをProcessingService
に送信できるようになりました。 ブローカーサービスが正しく設定されていることを検証します。
-- Send a request EXECUTE dbo.SendBrokerMessage @FromService = N'RequestService', @ToService = N'ProcessingService', @Contract = N'AsyncContract', @MessageType = N'AsyncRequest', @MessageBody = N'<AsyncRequest><AccountNumber>12345</AccountNumber></AsyncRequest>'; -- Check for message on processing queue SELECT CAST(message_body AS XML) FROM ProcessingQueue; GO
メッセージの処理
ProcessingQueue
からのメッセージを手動で処理することもできますが 、メッセージがProcessingService
に送信されるときに、メッセージが自動的に処理されるようにする必要があります。 。これを行うには、アクティブ化ストアドプロシージャを作成する必要があります。これをテストし、後でキューにバインドして、キューのアクティブ化時の処理を自動化します。メッセージを処理するには、RECEIVE
する必要があります トランザクション内のキューからのメッセージ、およびメッセージのメッセージタイプと会話ハンドル。メッセージタイプは、適切なロジックが処理中のメッセージに適用されることを保証し、会話ハンドルにより、メッセージが処理されたときに応答を開始サービスに送り返すことができます。
RECEIVE
コマンドを使用すると、同じ会話ハンドルまたはグループ内の1つまたは複数のメッセージを1つのトランザクションで処理できます。複数のメッセージを処理するには、テーブル変数を使用する必要があります。単一のメッセージ処理を実行するには、ローカル変数を使用できます。以下のアクティベーション手順では、キューから1つのメッセージを取得し、メッセージタイプをチェックして、それがAsyncRequest
であるかどうかを判断します。 メッセージを送信し、受信したメッセージ情報に基づいて長時間実行プロセスを実行します。ループ内でメッセージを受信しない場合、ループを終了して実行を終了する前に、別のメッセージがキューに入るまで最大5000ミリ秒(5秒)待機します。メッセージを処理した後、AsyncResult
を作成します メッセージを送信し、メッセージの受信元と同じ会話ハンドルでイニシエーターに送り返します。このプロシージャは、メッセージタイプもチェックして、EndDialog
かどうかを判断します。 またはError
会話を終了してクリーンアップするためのメッセージを受信しました。
-- Create processing procedure for processing queue CREATE PROCEDURE dbo.ProcessingQueueActivation AS BEGIN SET NOCOUNT ON; DECLARE @conversation_handle UNIQUEIDENTIFIER; DECLARE @message_body XML; DECLARE @message_type_name sysname; WHILE (1=1) BEGIN BEGIN TRANSACTION; WAITFOR ( RECEIVE TOP (1) @conversation_handle = conversation_handle, @message_body = CAST(message_body AS XML), @message_type_name = message_type_name FROM ProcessingQueue ), TIMEOUT 5000; IF (@@ROWCOUNT = 0) BEGIN ROLLBACK TRANSACTION; BREAK; END IF @message_type_name = N'AsyncRequest' BEGIN -- Handle complex long processing here -- For demonstration we'll pull the account number and send a reply back only DECLARE @AccountNumber INT = @message_body.value('(AsyncRequest/AccountNumber)[1]', 'INT'); -- Build reply message and send back DECLARE @reply_message_body XML = N' ' + CAST(@AccountNumber AS NVARCHAR(11)) + ' '; SEND ON CONVERSATION @conversation_handle MESSAGE TYPE [AsyncResult] (@reply_message_body); END -- If end dialog message, end the dialog ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog' BEGIN END CONVERSATION @conversation_handle; END -- If error message, log and end conversation ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error' BEGIN -- Log the error code and perform any required handling here -- End the conversation for the error END CONVERSATION @conversation_handle; END COMMIT TRANSACTION; END END GO
RequestQueue
送信されたメッセージも処理する必要があるため、AsyncResult
を処理するための追加の手順 ProcessingQueueActivationプロシージャによって返されるメッセージを作成する必要があります。 AsnycResultメッセージはすべての処理作業が完了したことを意味することがわかっているため、そのメッセージを処理すると会話を終了できます。これにより、EndDialogメッセージがProcessingServiceに送信され、そのアクティベーション手順によって処理されて、会話はすべてを片付け、火事を避け、会話が適切に終了したときに発生する問題を忘れます。
-- Create procedure for processing replies to the request queue CREATE PROCEDURE dbo.RequestQueueActivation AS BEGIN SET NOCOUNT ON; DECLARE @conversation_handle UNIQUEIDENTIFIER; DECLARE @message_body XML; DECLARE @message_type_name sysname; WHILE (1=1) BEGIN BEGIN TRANSACTION; WAITFOR ( RECEIVE TOP (1) @conversation_handle = conversation_handle, @message_body = CAST(message_body AS XML), @message_type_name = message_type_name FROM RequestQueue ), TIMEOUT 5000; IF (@@ROWCOUNT = 0) BEGIN ROLLBACK TRANSACTION; BREAK; END IF @message_type_name = N'AsyncResult' BEGIN -- If necessary handle the reply message here DECLARE @AccountNumber INT = @message_body.value('(AsyncResult/AccountNumber)[1]', 'INT'); -- Since this is all the work being done, end the conversation to send the EndDialog message END CONVERSATION @conversation_handle; END -- If end dialog message, end the dialog ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog' BEGIN END CONVERSATION @conversation_handle; END -- If error message, log and end conversation ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error' BEGIN END CONVERSATION @conversation_handle; END COMMIT TRANSACTION; END END GO
手順のテスト
サービスのキュー処理を自動化する前に、アクティベーション手順をテストして、メッセージが適切に処理されることを確認し、適切に処理されないエラーが発生した場合にキューが無効にならないようにすることが重要です。 ProcessingQueue
にはすでにメッセージがあるため ProcessingQueueActivation
プロシージャを実行して、そのメッセージを処理できます。 WAITFOR
メッセージがキューからすぐに処理されたとしても、プロシージャが終了するのに5秒かかります。メッセージを処理した後、RequestQueue
にクエリを実行することで、プロシージャが正しく機能したことを確認できます。 AsyncResult
かどうかを確認する メッセージが存在し、RequestQueueActivation
であることを確認できます プロシージャは実行することで正しく機能します。
-- Process the message from the processing queue EXECUTE dbo.ProcessingQueueActivation; GO -- Check for reply message on request queue SELECT CAST(message_body AS XML) FROM RequestQueue; GO -- Process the message from the request queue EXECUTE dbo.RequestQueueActivation; GO
処理の自動化
この時点で、すべてのコンポーネントが完了し、処理が完全に自動化されています。残っているのは、アクティベーションプロシージャを適切なキューにバインドし、別のテストメッセージを送信して、それが処理され、その後キューに何も残っていないことを検証することだけです。
-- Alter the processing queue to specify internal activation ALTER QUEUE ProcessingQueue WITH ACTIVATION ( STATUS = ON, PROCEDURE_NAME = dbo.ProcessingQueueActivation, MAX_QUEUE_READERS = 10, EXECUTE AS SELF ); GO -- Alter the request queue to specify internal activation ALTER QUEUE RequestQueue WITH ACTIVATION ( STATUS = ON, PROCEDURE_NAME = dbo.RequestQueueActivation, MAX_QUEUE_READERS = 10, EXECUTE AS SELF ); GO -- Test automated activation -- Send a request EXECUTE dbo.SendBrokerMessage @FromService = N'RequestService', @ToService = N'ProcessingService', @Contract = N'AsyncContract', @MessageType = N'AsyncRequest', @MessageBody = N'<AsyncRequest><AccountNumber>12345</AccountNumber></AsyncRequest>'; -- Check for message on processing queue -- nothing is there because it was automatically processed SELECT CAST(message_body AS XML) FROM ProcessingQueue; GO -- Check for reply message on request queue -- nothing is there because it was automatically processed SELECT CAST(message_body AS XML) FROM RequestQueue; GO
概要
SQL Server Service Brokerの自動非同期処理の基本コンポーネントは、単一のデータベースセットアップで構成して、長時間実行されるタスクの分離処理を可能にすることができます。これは、エンドユーザーのアプリケーションとの対話から処理を切り離すことにより、エンドユーザーのエクスペリエンスからアプリケーションのパフォーマンスを向上させるための強力なツールになります。