Wednesday, September 17, 2008

MSMQ Transactional Message Processing using Multiple Receive Queues

Here's the situation:
  1. You want to asynchronously process messages on a queue in a transactional manner, such that the message is only taken from the queue upon successfully processing it. This allows messages that failed to be properly processed to be processed again later with a chance at success.
  2. You want to have multiple workers processing these messages to improve efficiency and performance of long-running processing on large numbers of messages.
  3. When you shut down, you want all the processing code to complete before allowing the process to exit.
The first item can be accomplished by using MessageQueue's BeginPeek method and PeekCompleted event. This allows you to peek at the queue and begin a transaction before actually receiving the message. Beginning a transaction before receiving the message allows you to abort the transaction should an error occur during processing, leaving the message on the queue to be processed again later (hopefully with a higher chance of success). The following event handler shows the boilerplate code to accomplish this:

/// <summary>
/// Handles <c>PeekCompleted</c>: receives the peeked message inside a
/// transaction, processes it, and commits. If anything fails the transaction
/// is aborted so the message stays on the queue for a later attempt.
/// </summary>
/// <param name="sender">The MessageQueue whose peek completed.</param>
/// <param name="e">Carries the IAsyncResult of the completed peek.</param>
private void queue_PeekCompleted(object sender, PeekCompletedEventArgs e)
{
    var queue = (MessageQueue)sender;

    try
    {
        // complete the asynchronous peek started by BeginPeek; this also
        // surfaces any error the peek itself ran into
        queue.EndPeek(e.AsyncResult);

        using (var transaction = new MessageQueueTransaction())
        {
            transaction.Begin();
            try
            {
                // zero timeout: the peek says a message is waiting, so do not
                // block forever if another reader took it in the meantime
                var message = queue.Receive(TimeSpan.Zero, transaction);
                // process the message here
                transaction.Commit();
            }
            catch
            {
                // abort if processing fails; only a pending transaction can
                // be aborted, so check first to avoid a second exception
                if (transaction.Status == MessageQueueTransactionStatus.Pending)
                    transaction.Abort();
                throw;
            }
        }
    }
    catch (Exception)
    {
        // the message (if any) is still on the queue; log here if needed
    }
    finally
    {
        // start watching for another message
        queue.BeginPeek();
    }
}

The second item can be accomplished by creating multiple MessageQueue instances that all receive from the same queue path and telling each of them to start watching for incoming messages. The following snippets of code demonstrate how to accomplish this:

private readonly MessageQueue[] Receivers; // member: the MessageQueue instances that watch the queue
...
// a zero or negative count still yields one receiver
var receiverCount = (count > 0) ? count : 1;

// every receiver is its own MessageQueue instance opened on the same path
this.Receivers = Enumerable.Range(0, receiverCount)
    .Select(index =>
    {
        var receiver = new MessageQueue(path, QueueAccessMode.Receive);
        receiver.Formatter = new BinaryMessageFormatter();
        // read every message property when a message is retrieved
        receiver.MessageReadPropertyFilter.SetAll();
        return receiver;
    })
    .ToArray();

// begin watching: hook up the handler, then start the first asynchronous peek
for (var i = 0; i < this.Receivers.Length; i++)
{
    var receiver = this.Receivers[i];
    receiver.PeekCompleted += queue_PeekCompleted;
    receiver.BeginPeek();
}

...
// closing: detach the handler first, then close each receiver
for (var i = 0; i < this.Receivers.Length; i++)
{
    var receiver = this.Receivers[i];
    receiver.PeekCompleted -= queue_PeekCompleted;
    receiver.Close(); // stop peeking
}

The third item can be accomplished by simply incrementing and decrementing a counter when processing begins and ends respectively; then you simply block until that counter reaches zero. You'll want to place the decrement in a finally block to ensure that the counter is decremented even if processing throws an exception. Assuming you have a Counter class that implements thread-safe increment and decrement operations (see bottom), you can create a member named "ProcessingCounter" and have your PeekCompleted handler do the processing with the following line,
// receive the message inside the transaction and pass it straight to Handle
this.Handle(queue.Receive(transaction));

your Handle method would look like this,

/// <summary>
/// Processes one message while keeping ProcessingCounter raised for the
/// duration of the work.
/// </summary>
private void Handle(Message message)
{
    // mark one more message as in flight before any work starts
    ProcessingCounter.Increment();
    try
    {
        // process message here;
    }
    finally
    {
        // runs whether processing returned normally or threw
        ProcessingCounter.Decrement();
    }
}

and you could block after your MessageQueue.Close() calls like this

// poll every 100 ms until no message is being processed any more
while (this.ProcessingCounter.Value > 0)
{
    Thread.Sleep(100);
}

The following abstract class puts it all together. Simply implement the Process method and away you go!

/// <summary>
/// Base class that watches a transactional message queue with one or more
/// receivers and passes the body of each received message to
/// <see cref="Process"/>. A message is only removed from the queue when
/// <see cref="Process"/> returns without throwing and the transaction commits;
/// otherwise the transaction is aborted and the message stays on the queue.
/// </summary>
/// <typeparam name="TMessage">Type the message body is cast to.</typeparam>
public abstract class MessageProcessor<TMessage>
{
    private readonly MessageQueue[] Receivers;
    private readonly Counter ProcessingCounter = new Counter();

    // True from Open() until Close() begins. It is volatile because it is
    // written by the caller of Open/Close and read on the threads on which
    // PeekCompleted is raised.
    private volatile bool IsListening;

    /// <summary>Creates a processor with a single receiver.</summary>
    /// <param name="path">Path of the queue to read from.</param>
    public MessageProcessor(string path)
        : this(path, 1) { }

    /// <summary>
    /// Creates a processor with <paramref name="count"/> receivers on the
    /// queue at <paramref name="path"/>. The queue is created as a
    /// transactional queue if it does not exist yet.
    /// </summary>
    /// <param name="path">Path of the queue to read from.</param>
    /// <param name="count">Number of receivers; values of zero or less are treated as one.</param>
    /// <exception cref="ArgumentNullException"><paramref name="path"/> is null or empty.</exception>
    public MessageProcessor(string path, int count)
    {
        if (string.IsNullOrEmpty(path))
            throw new ArgumentNullException("path");

        if (!MessageQueue.Exists(path))
            MessageQueue.Create(path, true);

        this.Receivers = Enumerable.Range(0, (count <= 0) ? 1 : count)
            .Select(i =>
            {
                // NOTE(review): BinaryMessageFormatter deserializes message
                // bodies in a binary format; confirm that only trusted
                // senders can write to this queue.
                var queue = new MessageQueue(path, QueueAccessMode.Receive)
                {
                    Formatter = new BinaryMessageFormatter()
                };
                queue.MessageReadPropertyFilter.SetAll();
                return queue;
            })
            .ToArray();
    }

    /// <summary>
    /// Stops all receivers and blocks until every handler that is still
    /// running has finished, including its commit or abort. Calling this from
    /// inside <see cref="Process"/> would wait on itself and never return.
    /// </summary>
    public void Close()
    {
        // Flip the flag before touching the queues so that a handler which
        // starts from now on backs out instead of receiving a message.
        this.IsListening = false;

        this.OnClosing();

        foreach (var queue in this.Receivers)
        {
            queue.PeekCompleted -= queue_PeekCompleted;
            queue.Close();
        }

        while (this.IsProcessing)
            Thread.Sleep(100);

        this.IsOpen = false;
        this.OnClosed();
    }

    /// <summary>True between a completed <see cref="Open"/> and a completed <see cref="Close"/>.</summary>
    public bool IsOpen { get; private set; }

    /// <summary>True while at least one PeekCompleted handler is running.</summary>
    protected bool IsProcessing
    {
        get { return this.ProcessingCounter.Value > 0; }
    }

    /// <summary>Called at the start of <see cref="Close"/>, before the receivers are closed.</summary>
    protected virtual void OnClosing() { }

    /// <summary>Called at the end of <see cref="Close"/>, after all handlers have finished.</summary>
    protected virtual void OnClosed() { }

    /// <summary>Called at the start of <see cref="Open"/>, before any receiver starts peeking.</summary>
    protected virtual void OnOpening() { }

    /// <summary>Called at the end of <see cref="Open"/>, after every receiver has started peeking.</summary>
    protected virtual void OnOpened() { }

    /// <summary>Subscribes every receiver and starts its first asynchronous peek.</summary>
    /// <exception cref="InvalidOperationException">The processor is already open.</exception>
    public void Open()
    {
        if (this.IsOpen)
            throw new InvalidOperationException("This processor is already open.");

        this.OnOpening();

        // Must be set before the first BeginPeek: a handler may run before
        // this method reaches the end.
        this.IsListening = true;

        foreach (var queue in this.Receivers)
        {
            queue.PeekCompleted += queue_PeekCompleted;
            queue.BeginPeek();
        }

        this.IsOpen = true;
        this.OnOpened();
    }

    /// <summary>
    /// Processes one message body. Throwing from this method aborts the
    /// receive transaction, which leaves the message on the queue.
    /// </summary>
    /// <param name="object">The message body cast to <typeparamref name="TMessage"/>.</param>
    protected abstract void Process(TMessage @object);

    /// <summary>Casts the body of <paramref name="message"/> and passes it to <see cref="Process"/>.</summary>
    private void Handle(Message message)
    {
        Trace.Assert(null != message);

        // The in-flight count is maintained by queue_PeekCompleted so that it
        // also covers the commit or abort that follows this call.
        this.Process((TMessage)message.Body);
    }

    /// <summary>
    /// Runs each time a receiver's asynchronous peek completes: receives the
    /// message in a transaction, handles it, commits, and starts the next peek.
    /// </summary>
    private void queue_PeekCompleted(object sender, PeekCompletedEventArgs e)
    {
        var queue = (MessageQueue)sender;

        // Count the whole handler, not just Process, so that Close() keeps
        // waiting until the transaction has been committed or aborted.
        this.ProcessingCounter.Increment();
        try
        {
            // Close() clears the flag before it reads the counter and this
            // handler raises the counter before it reads the flag, so either
            // Close() waits for this handler or the handler backs out here.
            if (!this.IsListening)
                return;

            // Complete the asynchronous peek; this also surfaces any error
            // the peek itself ran into.
            queue.EndPeek(e.AsyncResult);

            using (var transaction = new MessageQueueTransaction())
            {
                transaction.Begin();
                try
                {
                    // Zero timeout: several receivers can peek the same
                    // message, and the ones that lose the race must not block
                    // here with an open transaction.
                    this.Handle(queue.Receive(TimeSpan.Zero, transaction));
                    transaction.Commit();
                }
                catch
                {
                    // Only a pending transaction can be aborted; checking
                    // first avoids a second exception hiding the real one.
                    if (transaction.Status == MessageQueueTransactionStatus.Pending)
                        transaction.Abort();
                    throw;
                }
            }
        }
        catch (MessageQueueException ex)
        {
            // A timeout only means another receiver took the message first.
            if (ex.MessageQueueErrorCode != MessageQueueErrorCode.IOTimeout)
                Trace.WriteLine(ex.Message);
        }
        catch (Exception ex)
        {
            Trace.WriteLine(ex.Message);
        }
        finally
        {
            try
            {
                if (this.IsListening)
                    queue.BeginPeek();
            }
            catch (Exception ex)
            {
                // Do not let a failed re-arm escape the handler.
                Trace.WriteLine(ex.Message);
            }
            finally
            {
                this.ProcessingCounter.Decrement();
            }
        }
    }
}

Incidentally, the following is my implementation of a thread-safe counter.

/// <summary>
/// Integer counter whose increment, decrement and read operations are atomic,
/// so it can be shared between threads without an explicit lock.
/// </summary>
public class Counter
{
    private int value;

    /// <summary>Gets the current count.</summary>
    public int Value
    {
        get
        {
            // Comparing 0 with 0 never changes the field; it is used here to
            // read the current value with the same ordering guarantees as the
            // increment and decrement operations.
            return System.Threading.Interlocked.CompareExchange(ref this.value, 0, 0);
        }
    }

    /// <summary>Atomically subtracts one from the count.</summary>
    /// <returns>The count after the decrement.</returns>
    public int Decrement()
    {
        return System.Threading.Interlocked.Decrement(ref this.value);
    }

    /// <summary>Atomically adds one to the count.</summary>
    /// <returns>The count after the increment.</returns>
    public int Increment()
    {
        return System.Threading.Interlocked.Increment(ref this.value);
    }
}