Implementing a Retry Pattern for Azure Service Bus with Topic Filters
In this blog post I’m going to show an example of how you can implement a simple retry pattern for the Azure Service Bus when you are working with topics and subscriptions.
Introduction
Topics provide a one-to-many form of communication. A topic can have multiple queues that are called subscriptions. Each subscription has a secondary sub-queue, called a dead-letter queue (DLQ) to hold messages that cannot be delivered to the receiver.
A problem arises when you try to resubmit a DLQ message. Azure Service Bus by design doesn’t support message resubmission. This means that you cannot place a DLQ message directly onto your own subscription.
A simple solution seems to create a clone of the DLQ message and place this back onto the Topic. However, this will lead to unintentionally duplicate message handling because all subscriptions of that topic will receive that message again.
This problem can be solved with the use of Topic Filters.
Topic Filters
These filters are defined as rules on subscription queues. The idea is that each subscription has its own rule that filters the incoming messages. These rules exist of sql-like expressions that result in a boolean. If the expression evaluates to true, the message will be let through the filter. Otherwise the message will be discarded.
You always receive a default rule with the name $Default when creating a new subscription. This rule always returns true. As a result the subscription receives all messages on the topic.
Creating a solution
For this example we create a new servicebus and a topic. I use the Az PowerShell module to create all the resources in Azure. You can find here more information about the module.
$resourceGroup = "<your resourceGroup name>" $namespace = "<your namespace name>" $location = "<your location e.g. westeurope>" New-AzServiceBusNamespace -ResourceGroupName $resourceGroup -NamespaceName $namespace ` -Location $location -SkuName Standard $topic = "<your topic name>" New-AzServiceBusTopic -ResourceGroupName $resourceGroup -Namespace $namespace -Name $topic ` -EnablePartitioning $false
A message has a number of properties that can be used by applications for things like routing or special processing. The To property is used to set the recipient of the message.
We can set this property to the name of our subscription when we resubmit the DLQ message. Now we create a rule that accepts messages if the To property is empty or set the name of our subscription. This filters all messages that are not intended for us.
We create such a rule with the following two lines of code.
$subscription = "<your subscription name>" $sqlExpression = "sys.To IS NULL OR sys.To = '$subscription'"
A subscription with a new rule can be made with the following snippet of code. We also delete the $Default rule because that one is no longer of use. You can create as many subscriptions as you want. To test the rule, is it easiest to make more than one subscription.
New-AzServiceBusSubscription -ResourceGroupName $resourceGroup -Namespace $namespace -Topic $topic ` -Name $subscription New-AzServiceBusRule -ResourceGroupName $resourceGroup -Namespace $namespace -Topic $topic ` -Subscription $subscription -Name "OnToPropertyFilter" -SqlExpression $sqlExpression Remove-AzServiceBusRule -ResourceGroupName $resourceGroup -Namespace $namespace -Topic $topic ` -Subscription $subscription -Name '$Default' # We use explicit single quotes here, because $Default is in this context not a Powershell variable
The only thing left for us now is to create the code that will resubmit the DLQ message.
The following C# method will take care of this. I used the Microsoft.Azure.ServiceBus NuGet package for this example. You can find here more information about the package.
const string ServiceBusConnectionString = "<your connection string>"; public async Task RetryDeadLetterAsync(string topicPath, string subscriptionName) { // Create the deadletter queue name string entityPath = EntityNameHelper.FormatSubscriptionPath(topicPath, subscriptionName); entityPath = EntityNameHelper.FormatDeadLetterPath(entityPath); MessageSender messageSender = new MessageSender(ServiceBusConnectionString, topicPath); MessageReceiver messageReceiver = new MessageReceiver(ServiceBusConnectionString, entityPath, receiveMode: ReceiveMode.PeekLock); // Receive the first message from the deadletter queue Message message = await messageReceiver.ReceiveAsync(); // Create a clone of the message, this removes the system properties of the message Message messageToResubmit = message.Clone(); // Set the To property of the message to specific subscriptionName of the subscription messageToResubmit.To = subscriptionName; // Resubmit the message onto the topic and complete the transaction await messageSender.SendAsync(messageToResubmit); await messageReceiver.CompleteAsync(message.SystemProperties.LockToken); await messageSender.CloseAsync(); await messageReceiver.CloseAsync(); }