Seeking Advice on Schema for a Database Queue

Devildog74
User
Posts: 719
Joined: 04-Feb-2004
# Posted on: 24-Feb-2006 16:05:11   

I have an application that is a command engine. A command is a serializable unit of work. In my current implementation, my command scheduler component builds commands and pumps them into Microsoft MSMQ. My command engine component then handles the queue's message-received event.

My current implementation is tied to MSMQ. I want to abstract the queuing implementation behind a MessageQueueProvider. By doing this I should be able to plug in different queuing implementations, such as IBM MQ or a database-backed queue.

So I need a table that acts as a container for a FIFO queue.

Does anyone know of any particular patterns or table schema designs that are applicable to FIFO queues?

Here is my current schema (any advice would be appreciated):

create table MessageQueue
(
    MessageId     int identity(1,1), -- is the identity field
    TimeReceived  DateTime,          -- is the date and time the message was received
    TimeProcessed DateTime,          -- is the date and time a thread processed the message
    ThreadLock    bit default(0),    -- is the flag indicating if a thread is working with the message
    MessageBody   ntext,             -- is the binary data of the command object
    Status        nvarchar           -- is the human readable status of the message
)

Am I missing anything in my schema?
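
For anyone who wants to try the FIFO-table idea without a SQL Server instance, here is a minimal sketch in Python using the standard-library sqlite3 module. It is not the poster's implementation. The column names mirror the schema above, while the function names (open_queue, enqueue, dequeue), the 'Queued' and 'Processing' status values, and the use of SQLite are assumptions made purely for illustration.

```python
import sqlite3
from datetime import datetime, timezone


def open_queue(path=":memory:"):
    # isolation_level=None puts the connection in autocommit mode,
    # so every transaction below is started and ended explicitly.
    conn = sqlite3.connect(path, isolation_level=None)
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS MessageQueue (
            MessageId     INTEGER PRIMARY KEY AUTOINCREMENT,
            TimeReceived  TEXT    NOT NULL,
            TimeProcessed TEXT,
            ThreadLock    INTEGER NOT NULL DEFAULT 0,
            MessageBody   BLOB    NOT NULL,
            Status        TEXT    NOT NULL DEFAULT 'Queued'
        )
        """
    )
    return conn


def enqueue(conn, body):
    # The auto-incrementing key gives the arrival order used for FIFO.
    cur = conn.execute(
        "INSERT INTO MessageQueue (TimeReceived, MessageBody) VALUES (?, ?)",
        (datetime.now(timezone.utc).isoformat(), body),
    )
    return cur.lastrowid


def dequeue(conn):
    # BEGIN IMMEDIATE takes the write lock up front, so two workers
    # cannot both select and claim the same row.
    conn.execute("BEGIN IMMEDIATE")
    try:
        row = conn.execute(
            "SELECT MessageId, MessageBody FROM MessageQueue "
            "WHERE ThreadLock = 0 ORDER BY MessageId LIMIT 1"
        ).fetchone()
        if row is not None:
            conn.execute(
                "UPDATE MessageQueue SET ThreadLock = 1, Status = 'Processing' "
                "WHERE MessageId = ?",
                (row[0],),
            )
        conn.execute("COMMIT")
        return row  # (MessageId, MessageBody), or None when the queue is empty
    except Exception:
        conn.execute("ROLLBACK")
        raise
```

The sketch only claims a message; a real worker would also set TimeProcessed and either delete the row or mark it finished once the command has run.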

Marcus
User
Posts: 747
Joined: 23-Apr-2004
# Posted on: 24-Feb-2006 16:53:47   

Here is something I started working on a while back... As far as I know it was working.

--------------------------------- START

-- Table named QM_Queue to match the stored procedures and constraint names below
CREATE TABLE [dbo].[QM_Queue](
    [QueueItemID] [bigint] IDENTITY(1,1) NOT NULL,
    [QueueItemUID] [uniqueidentifier] NOT NULL,
    [QueueID] [int] NOT NULL,
    [DisplayName] [varchar](255) COLLATE Latin1_General_CI_AS NOT NULL, -- length taken from @DisplayName in QM_Enqueue
    [TextData] [text] COLLATE Latin1_General_CI_AS NULL,
    [BinaryData] [image] NULL,
    [Priority] [int] NOT NULL,
    [Attempt] [int] NOT NULL,
    [NextAttempt] [datetime] NOT NULL,
    [CreatedDate] [datetime] NOT NULL,
    [InProgress] [bit] NOT NULL,
    [InternalUID] [uniqueidentifier] NOT NULL,
    CONSTRAINT [PK_QM_Queue_QueueItemUID] PRIMARY KEY NONCLUSTERED
    (
        [QueueItemUID] ASC
    ) WITH (IGNORE_DUP_KEY = OFF) ON [PRIMARY],
    CONSTRAINT [UC_QM_Queue_InternalUID] UNIQUE NONCLUSTERED
    (
        [InternalUID] ASC
    ) WITH (IGNORE_DUP_KEY = OFF) ON [PRIMARY]
) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY]

GO

CREATE PROCEDURE [dbo].[QM_BeginDequeue]
(
    @QueueID int
)
AS

SET NOCOUNT ON

-- Tag the claimed row with a fresh GUID so it can be read back after the update
DECLARE @InternalUID uniqueidentifier
SET @InternalUID = newid();

SET TRANSACTION ISOLATION LEVEL REPEATABLE READ
BEGIN TRAN

DECLARE @NOW datetime
SET @NOW = GetDate()

-- Claim the next due item: highest priority first, then oldest
UPDATE dbo.QM_Queue
SET InternalUID = @InternalUID,
    Attempt = Attempt + 1,
    InProgress = 1,
    NextAttempt = @NOW
WHERE QueueItemID IN (
    SELECT TOP 1 QueueItemID
    FROM dbo.QM_Queue
    WHERE InProgress = 0
      AND NextAttempt <= @NOW
      AND QueueID = @QueueID
    ORDER BY Priority DESC, CreatedDate, QueueItemID)

COMMIT TRAN

-- Return the claimed item; it stays in the table until QM_EndDequeue or QM_ReEnqueue
SELECT QueueItemUID, DisplayName, TextData, BinaryData, Priority, Attempt, NextAttempt, CreatedDate
FROM dbo.QM_Queue
WHERE InternalUID = @InternalUID

SET NOCOUNT OFF

GO

CREATE PROCEDURE [dbo].[QM_CancelEnqueue]
(
    @QueueItemUID uniqueidentifier
)
AS

DELETE FROM dbo.QM_Queue
WHERE QueueItemUID = @QueueItemUID
  AND InProgress = 0

GO

CREATE PROCEDURE [dbo].[QM_Dequeue]
(
    @QueueID int
)
AS

SET NOCOUNT ON

DECLARE @QueueUID uniqueidentifier

DECLARE @InternalUID uniqueidentifier
SET @InternalUID = newid();

SET TRANSACTION ISOLATION LEVEL REPEATABLE READ
BEGIN TRAN

DECLARE @NOW datetime
SET @NOW = GetDate()

-- Claim the next due item: highest priority first, then oldest
UPDATE dbo.QM_Queue
SET InternalUID = @InternalUID,
    Attempt = Attempt + 1,
    InProgress = 1,
    NextAttempt = @NOW
WHERE QueueItemID IN (
    SELECT TOP 1 QueueItemID
    FROM dbo.QM_Queue
    WHERE InProgress = 0
      AND NextAttempt <= @NOW
      AND QueueID = @QueueID
    ORDER BY Priority DESC, CreatedDate, QueueItemID)

COMMIT TRAN

SELECT QueueItemUID, DisplayName, TextData, BinaryData, Priority, Attempt, NextAttempt, CreatedDate
FROM dbo.QM_Queue
WHERE InternalUID = @InternalUID

-- One-step dequeue: the item is removed as soon as it has been returned
DELETE FROM dbo.QM_Queue
WHERE InternalUID = @InternalUID

SET NOCOUNT OFF

GO

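CREATE PROCEDURE [dbo].[QM_EndDequeue]
(
    @QueueItemUID uniqueidentifier
)
AS

-- Compare the column to the parameter; comparing the parameter to itself
-- is always true and would delete every in-progress item
DELETE FROM dbo.QM_Queue
WHERE QueueItemUID = @QueueItemUID
  AND InProgress = 1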

GO

CREATE PROCEDURE [dbo].[QM_Enqueue]
(
    @QueueItemUID uniqueidentifier,
    @QueueID int,
    @DisplayName nvarchar(255),
    @TextData text = NULL,
    @BinaryData image = NULL,
    @Priority int,
    @NextAttempt datetime
)
AS

INSERT INTO dbo.QM_Queue
    (QueueItemUID, QueueID, DisplayName, TextData, BinaryData, Priority,
     Attempt, NextAttempt, CreatedDate, InProgress, InternalUID)
VALUES
    (@QueueItemUID, @QueueID, @DisplayName, @TextData, @BinaryData, @Priority,
     0, @NextAttempt, GetDate(), 0, newid())

GO

CREATE PROCEDURE [dbo].[QM_ReEnqueue]
(
    @QueueItemUID uniqueidentifier,
    @Priority int,
    @NextAttempt DateTime
)
AS

UPDATE dbo.QM_Queue
SET NextAttempt = @NextAttempt,
    Priority = @Priority,
    InProgress = 0
WHERE QueueItemUID = @QueueItemUID
  AND InProgress = 1

GO

CREATE PROCEDURE [dbo].[QM_ResetQueue]
AS

SET NOCOUNT ON

UPDATE dbo.QM_Queue
SET InProgress = 0
WHERE InProgress = 1

SET NOCOUNT OFF

--------------------------------- END Marcus
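
To make the claim, retry and complete cycle of the procedures above easier to follow, here is a small sketch of the same flow in Python against an in-memory SQLite table. It is an illustration under assumptions, not a port of the T-SQL: the function names are hypothetical, timestamps are passed in as plain numbers so the behaviour is deterministic, items are identified by QueueItemID rather than a GUID, and only the columns needed for ordering are kept.

```python
import sqlite3

SCHEMA = """
CREATE TABLE QM_Queue (
    QueueItemID INTEGER PRIMARY KEY AUTOINCREMENT,
    QueueID     INTEGER NOT NULL,
    DisplayName TEXT    NOT NULL,
    Priority    INTEGER NOT NULL,
    Attempt     INTEGER NOT NULL DEFAULT 0,
    NextAttempt REAL    NOT NULL,
    CreatedDate REAL    NOT NULL,
    InProgress  INTEGER NOT NULL DEFAULT 0
)
"""


def connect():
    conn = sqlite3.connect(":memory:", isolation_level=None)  # explicit transactions
    conn.execute(SCHEMA)
    return conn


def enqueue(conn, queue_id, name, priority, now):
    conn.execute(
        "INSERT INTO QM_Queue (QueueID, DisplayName, Priority, NextAttempt, CreatedDate) "
        "VALUES (?, ?, ?, ?, ?)",
        (queue_id, name, priority, now, now),
    )


def begin_dequeue(conn, queue_id, now):
    # Claim the next due item: highest priority first, then oldest.
    conn.execute("BEGIN IMMEDIATE")
    try:
        row = conn.execute(
            "SELECT QueueItemID, DisplayName FROM QM_Queue "
            "WHERE InProgress = 0 AND NextAttempt <= ? AND QueueID = ? "
            "ORDER BY Priority DESC, CreatedDate, QueueItemID LIMIT 1",
            (now, queue_id),
        ).fetchone()
        if row is not None:
            conn.execute(
                "UPDATE QM_Queue SET InProgress = 1, Attempt = Attempt + 1, "
                "NextAttempt = ? WHERE QueueItemID = ?",
                (now, row[0]),
            )
        conn.execute("COMMIT")
        return row
    except Exception:
        conn.execute("ROLLBACK")
        raise


def re_enqueue(conn, item_id, priority, next_attempt):
    # Release a claimed item so it becomes eligible again at next_attempt.
    conn.execute(
        "UPDATE QM_Queue SET NextAttempt = ?, Priority = ?, InProgress = 0 "
        "WHERE QueueItemID = ? AND InProgress = 1",
        (next_attempt, priority, item_id),
    )


def end_dequeue(conn, item_id):
    # Remove a claimed item once it has been processed successfully.
    conn.execute(
        "DELETE FROM QM_Queue WHERE QueueItemID = ? AND InProgress = 1",
        (item_id,),
    )
```

A worker would call begin_dequeue, run the command, then call end_dequeue on success or re_enqueue with a later next_attempt on failure.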

Devildog74
User
Posts: 719
Joined: 04-Feb-2004
# Posted on: 24-Feb-2006 19:49:51   

Marcus, thanks for the info. The DisplayName, Priority, Attempt, and NextAttempt are all great additions.

I am using binary serialization, and I was going to place the serialized data into the ntext field. I notice that your schema has a BinaryData field of type image and a TextData field of type text.

In your schema and implementation how are these 2 fields used?

In my implementation, I will be taking the serialized data and turning it back into a command object that can be executed.

Marcus
User
Posts: 747
Joined: 23-Apr-2004
# Posted on: 24-Feb-2006 21:09:32   

Devildog74 wrote:

I am using binary serialization, and I was going to place the serialized data into the ntext field. I notice that your schema has a BinaryData field of type image and a TextData field of type text.

In your schema and implementation how are these 2 fields used?

My queue table is generic: you will notice it also has a QueueID, so I can have different queues all using the same table. Some queue items are binary serializations and use the image field, which can store a byte[]. String-based XML is stored in the text field.

Probably not the best, but it works... :)
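
As a rough illustration of the two payload styles, here is a sketch in Python that turns a command into bytes (the kind of value that would go in BinaryData) or into an XML string (the kind of value that would go in TextData) and back again. The Command class, its two fields, and the length-prefixed byte layout are all assumptions invented for the example; the thread does not describe the actual serialization format, and .NET binary serialization would produce something different.

```python
import struct
import xml.etree.ElementTree as ET


class Command:
    """Hypothetical unit of work with two string fields."""

    def __init__(self, name, argument):
        self.name = name
        self.argument = argument

    def __eq__(self, other):
        return (
            isinstance(other, Command)
            and (self.name, self.argument) == (other.name, other.argument)
        )


def to_binary(cmd):
    # Two big-endian 32-bit lengths, followed by the UTF-8 bytes of each field.
    name = cmd.name.encode("utf-8")
    argument = cmd.argument.encode("utf-8")
    return struct.pack(">II", len(name), len(argument)) + name + argument


def from_binary(data):
    name_len, arg_len = struct.unpack_from(">II", data, 0)
    start = struct.calcsize(">II")
    name = data[start:start + name_len].decode("utf-8")
    argument = data[start + name_len:start + name_len + arg_len].decode("utf-8")
    return Command(name, argument)


def to_xml(cmd):
    root = ET.Element("command", name=cmd.name)
    root.text = cmd.argument
    return ET.tostring(root, encoding="unicode")


def from_xml(text):
    root = ET.fromstring(text)
    return Command(root.get("name"), root.text or "")
```

Either form round-trips to an equal Command, so the choice of column mostly depends on whether the payload needs to be readable when inspecting the table.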

Devildog74 wrote:

In my implementation, I will be taking the serialized data and turning it back into a command object that can be executed.

Yes, that's pretty much the way to go...

I have also abstracted the queue implementation at a higher level so that I can swap the SQL queue out for MSMQ at some point.

[Edit] ALSO: this is NOT production code. It was just a stab at implementing a queue, which is on my list of TODOs...
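
The idea of hiding the queue behind an abstraction, so that a SQL-backed queue can later be swapped for MSMQ or another product, could look roughly like the following Python sketch. MessageQueueProvider is the name used earlier in the thread, but its send/receive methods, the InMemoryQueueProvider class, and the CommandEngine.drain method are assumptions made for the example, not anyone's actual design.

```python
from abc import ABC, abstractmethod
from collections import deque
from typing import List, Optional


class MessageQueueProvider(ABC):
    """What the engine needs from any queue, whatever sits behind it."""

    @abstractmethod
    def send(self, body: bytes) -> None:
        """Append a serialized command to the queue."""

    @abstractmethod
    def receive(self) -> Optional[bytes]:
        """Return the oldest queued command, or None if the queue is empty."""


class InMemoryQueueProvider(MessageQueueProvider):
    """Stand-in provider; a database or MSMQ provider would expose the same methods."""

    def __init__(self) -> None:
        self._items = deque()

    def send(self, body: bytes) -> None:
        self._items.append(body)

    def receive(self) -> Optional[bytes]:
        return self._items.popleft() if self._items else None


class CommandEngine:
    """Depends only on the abstraction, never on a concrete queue."""

    def __init__(self, provider: MessageQueueProvider) -> None:
        self._provider = provider

    def drain(self) -> List[bytes]:
        # Pull messages in FIFO order until the provider reports an empty queue.
        processed = []
        body = self._provider.receive()
        while body is not None:
            processed.append(body)
            body = self._provider.receive()
        return processed
```

Swapping implementations then means constructing CommandEngine with a different provider; the engine code itself does not change.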

JimFoye
User
Posts: 656
Joined: 22-Jun-2004
# Posted on: 24-Feb-2006 23:28:45   

This book might be helpful:

http://www.amazon.com/gp/product/1590592883/sr=8-1/qid=1140820026/ref=sr_1_1/002-0907197-7235262?%5Fencoding=UTF8

He has an example of an abstracted queueing mechanism that sits on top of either MSMQ or IBM's product.

Devildog74
User
Posts: 719
Joined: 04-Feb-2004
# Posted on: 27-Feb-2006 13:10:35   

Jim, thanks for the link. Actually, I have that book, and the sample in the book is what I am implementing.