Creating Service Broker Applications - SQL Server 2008

Now that you’ve learned what Service Broker is and what it’s for, let’s delve into the details of creating Service Broker applications. The exact number of steps you need to perform varies depending on, for example, whether the conversation takes place within a single instance of SQL Server and whether the messages that will be sent are defined by an XML schema. For a single-instance application with validated XML messages, ten steps are involved:

  1. Enable Service Broker for the database.
  2. Create a master key for the database.
  3. Create one or more XML SCHEMA COLLECTIONs to validate messages sent to the queue.
  4. Define message types based on the schema(s).
  5. Define the contract for the application.
  6. Create a Service Broker program (for example, a stored procedure) to process messages that are sent to the queue.
  7. Create the queue.
  8. Create the target service.
  9. Create the initiating service.
  10. Create a program to initiate the conversation by sending a message to the queue.

Let’s look at these individual steps before putting everything together into a simple Service Broker application. We’ll skip steps 2 and 3 until we actually get to the example, as there’s nothing specific to Service Broker about these, and we’ll cover creating the target and initiating services (steps 8 and 9) and their associated stored procedures together.

Enabling Service Broker

Before Service Broker can be used in a database, you need to alter the database to enable Service Broker:

ALTER DATABASE database_name SET ENABLE_BROKER;

There’s obviously some security risk in this, simply because it opens up another possible line of attack for a hacker trying to get into your database. However, unless you explicitly create a Service Broker endpoint, Service Broker won’t accept connections from outside the instance. You also need to enable Service Broker in a database if you want to use Database Mail within that database.
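Before altering the database, it can be useful to check whether Service Broker is already enabled. The following is a minimal sketch rather than a required step: it reads the is_broker_enabled column of sys.databases for the AdventureWorks database used in the example later in this section. The WITH ROLLBACK IMMEDIATE option is an assumption that suits a development machine only; the ALTER DATABASE statement needs exclusive access to the database and would otherwise wait for other connections to finish.

```sql
-- 1 means Service Broker is enabled for the database, 0 means it is not.
SELECT name, is_broker_enabled
FROM sys.databases
WHERE name = N'AdventureWorks';

-- Assumption: it is acceptable to roll back other sessions' open transactions.
ALTER DATABASE AdventureWorks SET ENABLE_BROKER WITH ROLLBACK IMMEDIATE;
```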

Creating Message Types

Before you can create a contract for your Service Broker application, you need to define the message types that can be used in the conversation. The following is the syntax for the CREATE MESSAGE TYPE command:

CREATE MESSAGE TYPE message_type_name
VALIDATION = validation_type

The VALIDATION clause has four possible options:

  • EMPTY: The message must not contain any data.
  • NONE: The message is not validated and may contain any or no data.
  • WELL_FORMED_XML: The message must consist of a well-formed XML document.
  • VALID_XML WITH SCHEMA COLLECTION schema_collection_name: Messages of this type must conform to the schema(s) contained in the specified collection. If a message doesn’t conform to this schema, it will be rejected and not placed on the queue; instead, an XML-formatted error message will be sent to the queue.

Creating Contracts

A contract consists of a list of the message types that can be included in the conversation, together with the service(s) that can send them. The basic syntax for creating a contract is as follows:

CREATE CONTRACT contract_name
(
message_type_name SENT BY sending_service [, ...]
)

The CREATE CONTRACT statement specifies the message types that a conversation can accept and the service(s) that can send those messages. For each message type, you must also include a SENT BY clause, which indicates the services that can send messages of that type. The following are the possible values for SENT BY:

  • INITIATOR: Only the service that started the conversation can post messages of this type.
  • TARGET: Only the service that processes the messages on the queue can post messages of this type.
  • ANY: Either of the two services in the conversation can send this type of message.

Instead of specifying a named message type, you can alternatively use the identifier [DEFAULT], which indicates a default message type (with no validation) sent by either of the two services. As we noted earlier, each contract must include at least one message type that may be sent by the initiator (that is, one that is SENT BY either INITIATOR or ANY).
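As an illustration of how message types and SENT BY clauses fit together, here is a small sketch of a request/reply contract. All of the names (the //example.test/... identifiers) are hypothetical and chosen only for this sketch; they are not part of the vacation request example that follows later.

```sql
-- Hypothetical message types: both must contain well-formed XML.
CREATE MESSAGE TYPE [//example.test/Request]
    VALIDATION = WELL_FORMED_XML;
CREATE MESSAGE TYPE [//example.test/Reply]
    VALIDATION = WELL_FORMED_XML;

-- The initiator may send only requests; the target may send only replies.
CREATE CONTRACT [//example.test/RequestReplyContract]
(
    [//example.test/Request] SENT BY INITIATOR,
    [//example.test/Reply]   SENT BY TARGET
);
```

Because the Request type is SENT BY INITIATOR, this contract satisfies the rule that at least one message type can be sent by the initiating service.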

Creating Queues

The command for creating a queue is perhaps the most complex of the new Transact-SQL (T-SQL) commands used to create Service Broker objects:

CREATE QUEUE queue_name
WITH
STATUS = ON | OFF,
RETENTION = ON | OFF,
ACTIVATION
(
STATUS = ON | OFF,
PROCEDURE_NAME = queue_sproc_name,
MAX_QUEUE_READERS = integer,
EXECUTE AS SELF | OWNER | user_name
)
ON [DEFAULT] | filegroup_name

The WITH clause is optional, but if included, it can have a number of subclauses (again, all of these are optional):

  • STATUS: This subclause indicates whether or not the queue is originally enabled.
  • RETENTION: If this subclause is set to OFF (the default), any messages that are processed will be removed from the queue. Otherwise, the messages will be left on the queue, but their status will be updated to indicate that they have been processed.
  • ACTIVATION: This subclause indicates whether a procedure will be executed on the arrival of a message in the queue. Here, you can specify the STATUS (whether the procedure will be activated automatically or not), the PROCEDURE_NAME (the SQL Server identifier for the service program), the maximum number of instances of the service program that can be created to process the queue, and the user to execute the procedure as. This can be one of SELF (the currently logged-in user), OWNER (the user account that owns the queue), or a username as a string.

Lastly, you can optionally specify an ON clause, which specifies the filegroup on which to create the queue. Alternatively, you can use [DEFAULT] to specify the default filegroup for the database.
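The following sketch creates a queue with no activation procedure and then reads its settings back from the sys.service_queues catalog view. The queue name dbo.ExampleQueue is an assumption made for illustration only.

```sql
-- Hypothetical queue: enabled, no message retention, default filegroup.
CREATE QUEUE dbo.ExampleQueue
WITH
    STATUS = ON,
    RETENTION = OFF
ON [DEFAULT];

-- Check how the options were recorded.
SELECT name, is_receive_enabled, is_retention_enabled, is_activation_enabled
FROM sys.service_queues
WHERE name = N'ExampleQueue';
```

Leaving out the ACTIVATION subclause means messages simply accumulate on the queue until some program issues a RECEIVE against it.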

Creating Services

You need to specify two pieces of information when you create a service: the queue with which the service is associated and the conversations in which it can participate. The conversations are specified through the contracts, so the CREATE SERVICE command looks like this:

CREATE SERVICE service_name
ON QUEUE queue_name
( contract_name [, ...] )

Contracts indicate to other services which conversations the service will accept as a target, so they need to be supplied only if the service is the target of conversations. If no contracts are specified, the service will be able to initiate conversations, but other services won’t be able to start a conversation with it.

Instead of naming a user-defined contract, you can use the identifier [DEFAULT] to specify the default contract, which allows either service to send unvalidated messages.

Creating Service Broker Stored Procedures

The most complex part of writing a Service Broker application, unsurprisingly, is creating the programs (typically stored procedures or triggers) that send and receive the messages. Initiating and target service applications obviously differ in that only the initiating service needs to begin a conversation, but otherwise the tasks that they need to perform are similar.

Initiating a Conversation

The first task in a conversation is for the initiating service to open up a dialog using the BEGIN DIALOG CONVERSATION command. This takes a local variable of type UNIQUEIDENTIFIER that will be populated with a handle you can use to identify the new conversation. You use this when you actually send a message on the conversation. BEGIN DIALOG CONVERSATION has three mandatory clauses:

  • FROM SERVICE: The SQL Server identifier for the initiator service. Note that this is not surrounded by quotes.
  • TO SERVICE: The name of the receiving service, as a string.
  • ON CONTRACT: The SQL Server identifier for the contract that binds the two services.

Optionally, you can also specify that this conversation will belong to an existing conversation group. To associate a new dialog with an existing conversation group, supply either the ID for that group or the ID for another conversation in the group:

BEGIN DIALOG CONVERSATION @dialogHandle
FROM SERVICE initiating_service
TO SERVICE 'receiving_service'
ON CONTRACT contract_name
WITH RELATED_CONVERSATION = conversation_ID
or
...
WITH RELATED_CONVERSATION_GROUP = conversation_group_ID

You can include two other pieces of information in the WITH clause. You can specify a LIFETIME timeout period, after which the conversation will be closed. You can also include an ENCRYPTION option to state whether the messages must be encrypted. By default, this is ON, which means that an error will occur if encryption isn’t correctly configured. If it is OFF, encryption will still be used if configured, but otherwise the messages will be sent unencrypted. Messages sent to another service in the same SQL Server instance are never encrypted.

Sending Messages to a Queue

Once your conversation dialog is open, you can start to send messages to the queue. To do this, use the SEND command:

SEND
ON CONVERSATION dialog_handle
MESSAGE TYPE message_type_name
(message_body)

The dialog handle will be the variable that you received from the BEGIN DIALOG CONVERSATION command or from a RECEIVE command that was used to retrieve messages from a queue. Both the message_type_name and the message_body can be omitted. The former can be omitted if the message is of the default type (requiring no validation); the message_body should be omitted if the message type is EMPTY. Otherwise, the message body is enclosed in parentheses, and it can contain any data, including binary data.
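To tie BEGIN DIALOG CONVERSATION and SEND together, here is a minimal single-instance sketch that uses the [DEFAULT] contract and message type. The queue and service names are hypothetical, and the 600-second LIFETIME and ENCRYPTION = OFF settings are assumptions chosen so that the sketch doesn’t depend on a database master key.

```sql
-- Hypothetical queues and services for the sketch.
CREATE QUEUE dbo.SketchInitiatorQueue;
CREATE QUEUE dbo.SketchTargetQueue;
CREATE SERVICE [//example.test/SketchInitiator]
    ON QUEUE dbo.SketchInitiatorQueue;
CREATE SERVICE [//example.test/SketchTarget]
    ON QUEUE dbo.SketchTargetQueue ([DEFAULT]);

DECLARE @dialogHandle uniqueidentifier;

-- Open the dialog; the handle is written to @dialogHandle.
BEGIN DIALOG CONVERSATION @dialogHandle
    FROM SERVICE [//example.test/SketchInitiator]
    TO SERVICE N'//example.test/SketchTarget'
    ON CONTRACT [DEFAULT]
    WITH LIFETIME = 600, ENCRYPTION = OFF;

-- No MESSAGE TYPE clause, so the message uses the default (unvalidated) type.
SEND ON CONVERSATION @dialogHandle (N'<ping />');
```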

Retrieving Messages from a Queue

To retrieve a message from a queue, use the RECEIVE statement:

RECEIVE [TOP n]
column_name [, ...]
FROM queue_name
INTO table_variable
[WHERE conversation_handle = dialog_handle | conversation_group_id = conversation_group_id]

This retrieves the messages in the queue as rows into the supplied table variable. You can optionally limit the messages retrieved by specifying the maximum number of messages in the TOP clause and by restricting the messages returned to a specific conversation or conversation group.

The information about each message is contained in columns in the returned result set, and you specify the information you want to retrieve just as you do for a SELECT statement: by including the column names in the RECEIVE statement. The full list of columns in the result set returned from the RECEIVE command is given in the table “Columns Returned from the RECEIVE Command.”

Columns Returned from the RECEIVE Command

By default, the RECEIVE statement will return an empty result set if no messages are present in the queue. You can alter this behavior by wrapping the statement in a WAITFOR statement:

WAITFOR
(
RECEIVE ...
) [, TIMEOUT timeout]

This causes the RECEIVE statement to wait for the timeout period (in milliseconds) until a message arrives. If the timeout period expires, an empty result set will be returned. To wait indefinitely, omit the TIMEOUT clause, or specify a value of –1.
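A short sketch of the WAITFOR pattern follows. The queue name dbo.SketchReceiveQueue and the five-second timeout are assumptions for illustration; checking @@ROWCOUNT immediately after the statement is one common way to tell a timeout apart from a successful receive.

```sql
-- Hypothetical queue to read from.
CREATE QUEUE dbo.SketchReceiveQueue;

DECLARE @messages TABLE
(
    conversation_handle uniqueidentifier,
    message_type_name   nvarchar(256),
    message_body        varbinary(max)
);

-- Wait up to 5 seconds for as many as 10 messages.
WAITFOR
(
    RECEIVE TOP (10) conversation_handle, message_type_name, message_body
    FROM dbo.SketchReceiveQueue
    INTO @messages
), TIMEOUT 5000;

-- Zero rows means the timeout expired before any message arrived.
IF @@ROWCOUNT = 0
    PRINT 'No messages arrived before the timeout.';
ELSE
    SELECT conversation_handle, message_type_name FROM @messages;
```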

Ending a Conversation

When your application wants to close the conversation, both sides should explicitly end the dialog:

END CONVERSATION dialog_handle
[WITH ERROR = error_code DESCRIPTION = error_description]
[WITH CLEANUP]

Use the WITH ERROR clause if you want to throw an error, passing in an error code (of type int) and description (of type nvarchar(3000)). This will cause Service Broker to send an Error message to the queue, which can then be handled by the other participant in the conversation. If the END CONVERSATION command is issued without a WITH ERROR clause, Service Broker places an End Dialog message on the queue to inform the remote service that the conversation is closed. However, the remote application still needs to end its side of the conversation. Once it has done this, Service Broker will remove all messages belonging to this conversation from the queue.

The WITH CLEANUP clause is used to remove any messages from a queue when it isn’t possible to end the conversation normally, usually because the remote service isn’t available. If the conversation is ended WITH CLEANUP, the remote service isn’t informed that the conversation is ending.
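The two normal ways of ending a dialog can be sketched as a small helper procedure. The procedure name, the @failed flag, and the error code 50001 are all hypothetical; the dialog handle is assumed to come from an earlier BEGIN DIALOG CONVERSATION or RECEIVE.

```sql
CREATE PROCEDURE dbo.usp_SketchEndDialog
    @dialogHandle uniqueidentifier,
    @failed bit
AS
BEGIN
    IF @failed = 1
    BEGIN
        -- The other side receives an Error message carrying this code and text.
        END CONVERSATION @dialogHandle
            WITH ERROR = 50001
            DESCRIPTION = N'The request could not be processed.';
    END
    ELSE
    BEGIN
        -- The other side receives an End Dialog message.
        END CONVERSATION @dialogHandle;
    END
END
```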

Conversation Timeouts

We stated earlier in the chapter that a Service Broker application can issue one conversation timer per dialog, which will cause Service Broker to place a Dialog Timer message in the queue after a specific timeout period has elapsed. To do this, use the BEGIN CONVERSATION TIMER command:

BEGIN CONVERSATION TIMER(dialog_handle)
TIMEOUT = timeout

Here, dialog_handle is the ID for the conversation that the timer will be placed on, and timeout is the timeout period in seconds, after which the message will be placed in the queue.
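As a brief sketch, the following hypothetical procedure starts a two-minute timer on a dialog whose handle is passed in. The procedure name and the 120-second value are assumptions. When the timer fires, the message that arrives on the queue has the message type name http://schemas.microsoft.com/SQL/ServiceBroker/DialogTimer, which the service program can test for.

```sql
CREATE PROCEDURE dbo.usp_SketchStartTimer
    @dialogHandle uniqueidentifier
AS
BEGIN
    -- Ask Service Broker to enqueue a Dialog Timer message after 120 seconds.
    BEGIN CONVERSATION TIMER (@dialogHandle)
        TIMEOUT = 120;
END
```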

A Simple Service Broker Example

The easiest way to see how these components relate to each other is to walk through an example. To demonstrate the concepts, we’ll start with a very simple example. We use only one database, although Service Broker has been designed to aid asynchronous communications distributed across multiple databases. This Service Broker application will process vacation requests from employees. Employees will call a stored procedure, passing in their employee ID number, their e-mail address, the number of hours of vacation they want to take, and the start time and date they plan to take the vacation. This will send a message to a specially created queue. When this message is processed, you will merely perform some rudimentary validation, and then use Database Mail (which itself uses Service Broker) to send an e-mail to the employee, indicating whether the request was successful.

The Service Broker application will have the following components:

  • A MESSAGE TYPE to represent the XML-formatted messages that will be stored on the queue.
  • An XML SCHEMA COLLECTION that will be used to validate messages as they arrive on the queue.
  • A QUEUE to store the vacation requests before they are processed.
  • A CONTRACT to define the message types that can be stored on the queue and the services that can send messages to the queue.
  • One SERVICE that acts as the endpoint that initiates a conversation by sending a message to a queue and another that acts as the endpoint for retrieving messages from the queue.
  • A stored procedure that will be called by an end user (or, more likely, an application) to make a vacation request.
  • Another stored procedure for processing messages on the queue. This will be called automatically by Service Broker whenever messages arrive on the queue.

Before creating these objects, you need to enable Service Broker in the AdventureWorks database:

ALTER DATABASE AdventureWorks SET ENABLE_BROKER;

You then need to create a master key for the AdventureWorks database, which Service Broker will use to protect the session keys for encrypted conversations:

CREATE MASTER KEY
ENCRYPTION BY PASSWORD = 'sl38!Gk$^&wMv';

Next, create the schema that you will use to validate the holiday requests:

CREATE XML SCHEMA COLLECTION
[http://schemas.apress.com/AcceleratedSQL2008/HolidayRequestSchema]
AS N'<?xml version="1.0" ?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="vacationRequest">
<xs:complexType>
<xs:sequence minOccurs="1" maxOccurs="1">
<xs:element name="employeeId" type="xs:integer" />
<xs:element name="email" type="xs:string" />
<xs:element name="startTime" type="xs:dateTime" />
<xs:element name="hours" type="xs:integer" />
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:schema>';

Note that, as its name suggests, an XML SCHEMA COLLECTION object can hold more than one schema, so you can validate an XML document against multiple schemas in one go. In this case, however, you just need to ensure that your messages meet the simple criteria laid out in this single schema. Also notice that, following Microsoft’s practice, you’ve named the schema collection using a Uniform Resource Identifier (URI).

This schema specifies that each message will consist of a root element called <vacationRequest>, which contains one instance each of the child elements <employeeId>, <email>, <startTime>, and <hours>. For example, the following message requests one day’s vacation (8 hours) starting on August 1, 2004, for the employee with an ID of 140 and e-mail address of laura1@adventure-works.com:

<?xml version="1.0" encoding="utf-16"?>
<vacationRequest>
<employeeId>140</employeeId>
<email>laura1@adventure-works.com</email>
<startTime>2004-08-01T09:00:00+00:00</startTime>
<hours>8</hours>
</vacationRequest>
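One way to try a sample message against the schema collection before involving Service Broker at all is to assign it to a typed XML variable. This is a sketch that assumes the schema collection above has already been created in the current database; if the document doesn’t conform, the SET statement fails with a validation error.

```sql
-- Typed XML variable bound to the schema collection created above.
DECLARE @sample XML(
    [http://schemas.apress.com/AcceleratedSQL2008/HolidayRequestSchema]);

-- The assignment validates the document against the schema.
SET @sample = N'<vacationRequest>
  <employeeId>140</employeeId>
  <email>laura1@adventure-works.com</email>
  <startTime>2004-08-01T09:00:00+00:00</startTime>
  <hours>8</hours>
</vacationRequest>';

-- Read one value back to confirm the document was accepted.
SELECT @sample.value('(/vacationRequest/hours)[1]', 'int') AS RequestedHours;
```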

Now you can create a MESSAGE TYPE object from this schema collection:

CREATE MESSAGE TYPE [http://schemas.apress.com/AcceleratedSQL2008/HolidayRequest]
VALIDATION = VALID_XML WITH SCHEMA COLLECTION
[http://schemas.apress.com/AcceleratedSQL2008/HolidayRequestSchema];

You want validation to occur against our schema, so you set VALIDATION to VALID_XML WITH SCHEMA COLLECTION, passing in the name of the XML SCHEMA COLLECTION object that you’ve just created.

Next, you define the contract for the conversations that will take place between your initiating service and receiving service:

CREATE CONTRACT
[http://schemas.apress.com/AcceleratedSQL2008/HolidayRequestContract]
(
[http://schemas.apress.com/AcceleratedSQL2008/HolidayRequest]
SENT BY INITIATOR
);

This contract stipulates that only the initiating service will be sending messages to the queue and that it will send messages of only your HolidayRequest type.

Receiving Service

Now you need to define the two services that will be used to send and process the messages. First, you’ll create a stored procedure to handle any messages that are sent to the queue. Once you have the message, you’ll just close the conversation, as you don’t want to send any information back to the initiating service.

As stated at the outset, the processing you’ll perform is minimal. You’ll simply check whether the employee has enough hours of vacation entitlement left (assuming an annual entitlement of 20 days or 160 hours, which we just made up for the purposes of this example), and that the employee has given enough notice (at least one week in advance of the desired vacation start date). You then e-mail the employee, either stating that the request has been granted or giving the reason for rejecting it.

CREATE PROCEDURE usp_ProcessHolidayRequest
AS
-- @msgBody is typed XML, so the body is checked against the schema collection.
DECLARE @msgBody XML(
        [http://schemas.apress.com/AcceleratedSQL2008/HolidayRequestSchema]),
    @convID uniqueidentifier,
    @email varchar(50),
    @employeeID int,
    @hours int,
    @startTime DateTime,
    @hoursTaken int,
    @msgType nvarchar(256);
DECLARE @msgTable TABLE
(
    message_body varbinary(max),
    conversation_handle uniqueidentifier,
    message_type_name nvarchar(256)
);
BEGIN
    -- Wait up to 2 seconds for a message to arrive on the queue.
    WAITFOR
    (
        RECEIVE TOP (1) message_body, conversation_handle, message_type_name
        FROM HolidayRequestQueue
        INTO @msgTable
    ), TIMEOUT 2000;
    SET @msgBody = (SELECT TOP (1) CAST(message_body AS XML) FROM @msgTable);
    SET @convID = (SELECT TOP (1) conversation_handle FROM @msgTable);
    SET @msgType = (SELECT TOP (1) message_type_name FROM @msgTable);
    -- Nothing arrived before the timeout, so there is no conversation to end.
    IF @convID IS NULL
        RETURN;
    END CONVERSATION @convID;
    IF @msgType = 'http://schemas.apress.com/AcceleratedSQL2008/HolidayRequest'
    BEGIN
        -- Extract the individual values from the message body.
        SET @email = @msgBody.value('data(//email)[1]', 'varchar(50)');
        SET @hours = @msgBody.value('data(//hours)[1]', 'int');
        SET @startTime = @msgBody.value('data(//startTime)[1]', 'datetime');
        SET @employeeID = @msgBody.value('data(//employeeId)[1]', 'int');
        SET @hoursTaken = (SELECT VacationHours FROM HumanResources.Employee
            WHERE EmployeeID = @employeeID);
        IF @hoursTaken + @hours > 160
            EXEC msdb.dbo.sp_send_dbmail
                @profile_name = 'Default Profile',
                @recipients = @email,
                @subject = 'Vacation request',
                @body = 'Your request for vacation has been refused because you have insufficient hours remaining of your holiday entitlement.';
        ELSE IF @startTime < DATEADD(Week, 1, GETDATE())
            EXEC msdb.dbo.sp_send_dbmail
                @profile_name = 'Default Profile',
                @recipients = @email,
                @subject = 'Vacation request',
                @body = 'Your request for vacation has been refused because you have not given sufficient notice. Please request holiday at least a week in advance.';
        ELSE
        BEGIN
            -- Update only the requesting employee, not every row in the table.
            UPDATE HumanResources.Employee
            SET VacationHours = VacationHours + @hours
            WHERE EmployeeID = @employeeID;
            EXEC msdb.dbo.sp_send_dbmail
                @profile_name = 'Default Profile',
                @recipients = @email,
                @subject = 'Vacation request',
                @body = 'Your request for vacation has been granted.';
        END
    END
END

The first task here is to retrieve the message from the queue, which you do with a RECEIVE statement:

WAITFOR (
RECEIVE TOP (1) message_body, conversation_handle, message_type_name
FROM HolidayRequestQueue
INTO @msgTable
), TIMEOUT 2000;

Notice that you wrap the RECEIVE statement in a WAITFOR statement to ensure that the procedure will wait for the specified TIMEOUT value (in milliseconds, so 2 seconds in this case) for a message to arrive. The RECEIVE clause retrieves the body and type of the first message in the queue and the ID of the conversation it belongs to into a table variable called @msgTable.

Once you have the message shredded into a table variable, you can extract the message body from it and cast it to the XML datatype, and you can also retrieve the message type to ensure you have the right sort of message, and the dialog handle, which you need to close the conversation:

SET @msgBody = (SELECT TOP (1) CAST(message_body AS XML) FROM @msgTable);
SET @convID = (SELECT TOP (1) conversation_handle FROM @msgTable);
SET @msgType = (SELECT TOP (1) message_type_name FROM @msgTable);
END CONVERSATION @convID;

Then you check that the message is of the correct type. If so, you use XQuery and the value() method of the XML datatype to extract the individual values (the employee’s ID and e-mail address, and the start time and duration of the planned vacation) into local variables.

Next, you query the HumanResources.Employee table to see how many hours of vacation the employee has already taken, and perform checks to determine whether or not to grant the vacation. If the vacation is granted, you update the VacationHours of that employee and e-mail the employee to confirm you’re granting the request; otherwise, you just e-mail the employee stating the reason why you’re refusing the request.

Once you’ve defined the message type for your service and the stored procedure that will process the messages, you can create the queue to hold the messages:

CREATE QUEUE HolidayRequestQueue
WITH
STATUS = ON,
RETENTION = OFF,
ACTIVATION
(
STATUS = ON,
PROCEDURE_NAME = usp_ProcessHolidayRequest,
MAX_QUEUE_READERS = 5,
EXECUTE AS SELF
);

You set the queue to activate immediately and not to retain messages when they are retrieved by your stored procedure. The ACTIVATION clause ensures that your service program will execute whenever a message is sent to the queue.

You’re now ready to create the service itself:

CREATE SERVICE
[http://schemas.apress.com/AcceleratedSQL2008/HolidayRequestProcessorService]
ON QUEUE HolidayRequestQueue
(
[http://schemas.apress.com/AcceleratedSQL2008/HolidayRequestContract]
);

The service simply acts as a link between the queue and one or more contracts, so no new information is introduced here, and the syntax is self-explanatory.

Initiating Service

You’ve now arranged the processing of messages in the queue, but as yet, you don’t have any way to send messages to it. To do this, you need to create another service and a procedure that initiates a dialog and sends a message to the queue when an employee requests a vacation.

The code to create the initiator service is identical (except for the name) to the code for the processor service:

CREATE SERVICE
[http://schemas.apress.com/AcceleratedSQL2008/HolidayRequestInitiatorService]
ON QUEUE HolidayRequestQueue
(
[http://schemas.apress.com/AcceleratedSQL2008/HolidayRequestContract]
);

Again, you’re simply associating the HolidayRequestContract with your queue. The last task is to create the stored procedure that will send a message to the queue:

CREATE PROCEDURE usp_RequestHoliday
    @employeeId int,
    @email varchar(50),
    @hours int,
    @startDate varchar(50)
AS
DECLARE @dialogHandle uniqueidentifier,
    @body nvarchar(1000),
    @msg XML;
BEGIN
    -- Build the message as a string, then cast it to the XML datatype.
    SET @body = N'<?xml version="1.0"?>
<vacationRequest>
<employeeId>' + CAST(@employeeId AS nvarchar(11)) + N'</employeeId>
<email>' + @email + N'</email>
<startTime>' + @startDate + N'</startTime>
<hours>' + CAST(@hours AS nvarchar(11)) + N'</hours>
</vacationRequest>';
    SET @msg = CAST(@body AS XML);
    -- Open a dialog between the initiating and processing services.
    BEGIN DIALOG CONVERSATION @dialogHandle
        FROM SERVICE
            [http://schemas.apress.com/AcceleratedSQL2008/HolidayRequestInitiatorService]
        TO SERVICE
            'http://schemas.apress.com/AcceleratedSQL2008/HolidayRequestProcessorService'
        ON CONTRACT
            [http://schemas.apress.com/AcceleratedSQL2008/HolidayRequestContract];
    -- Send the request, then close this side of the conversation.
    SEND ON CONVERSATION @dialogHandle
        MESSAGE TYPE [http://schemas.apress.com/AcceleratedSQL2008/HolidayRequest]
        (@msg);
    END CONVERSATION @dialogHandle;
END

This procedure takes four parameters: the information that you’ll pass into the message. You use this information to build, as an nvarchar string, an XML document that conforms to the schema you specified for your message type, and then cast this string to the XML datatype.

Now you’re ready to start the conversation. To do this, use the BEGIN DIALOG [CONVERSATION] statement, specifying the initiating and target services and the contract, and retaining a reference to the ID for the conversation. You use this ID to send a message to the remote service, specifying your HolidayRequest message type and passing in the XML-typed variable that contains the body of the message. Once you’ve sent this, your stored procedure has done its work, so you just close the conversation.

This completes our simple Service Broker example. To test it, execute this stored procedure, for example:

EXEC usp_RequestHoliday 140, 'someone@somewhere.com', 8,
'2004-08-01T09:00:00+00:00'

Note that, to make testing much easier, the e-mail address is passed in separately, instead of being looked up in the database. This means you can supply your own e-mail address instead of using the one that corresponds to the EmployeeID you pass in. Also, note that the start date is passed in as a string in XSD dateTime format ('yyyy-mm-ddThh:mm:ss+timezone offset').
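If no e-mail arrives, a few diagnostic queries can help narrow down where the message stopped. The following is a troubleshooting sketch, assuming the objects created above exist in the current database; it only reads from the queue and from two Service Broker catalog views.

```sql
-- Messages still waiting on the queue (not yet picked up by the procedure).
SELECT conversation_handle, message_type_name,
       CAST(message_body AS XML) AS body
FROM HolidayRequestQueue;

-- Messages that could not be delivered, with the reason in transmission_status.
SELECT to_service_name, message_type_name, transmission_status
FROM sys.transmission_queue;

-- The state of each end of each conversation in this database.
SELECT conversation_handle, is_initiator, state_desc, far_service
FROM sys.conversation_endpoints;
```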

