NServiceBus Transport over SignalR, Part One

On March 18 I teamed up with Roy Cornelissen to give a presentation about NServiceBus combined with SignalR at the Dutch SDN Event. One of the topics we covered was using SignalR as an NServiceBus transport (instead of the default MSMQ).

In this post I will explain how to implement a custom NServiceBus transport and have it integrate seamlessly into the NServiceBus API. In “Part Two”, which will follow in a week or two, I will take a look at the SignalR Hub that allows the NServiceBus services to communicate.

Queue

As with everything NServiceBus, creating a custom transport is done by implementing a few interfaces, in this case ISendMessages and IReceiveMessages. While it is possible to implement these interfaces on two separate classes, I need them on the same class because I want to use a single SignalR hub for both sending and receiving. Implementing both interfaces on my custom SignalRMessageQueue leaves me with:

public class SignalRMessageQueue : ISendMessages, IReceiveMessages
{
    public void Send(TransportMessage message, Address address)
    {
        throw new NotImplementedException();
    }

    public bool HasMessage()
    {
        throw new NotImplementedException();
    }

    public void Init(Address address, bool transactional)
    {
        throw new NotImplementedException();
    }

    public TransportMessage Receive()
    {
        throw new NotImplementedException();
    }
}

The first method that the NServiceBus framework invokes is Init, which initializes the receiver. What I need the Init method to do is create a persistent SignalR connection and register the queue of the current service with the Hub. Seeing as I will probably want to configure the URI for the MessageHub at a later time, for now I will just create a public MessageHubUri property on my SignalRMessageQueue class.

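// Address of the SignalR MessageHub; set from code or configuration before Init runs.
public string MessageHubUri { get; set; }

private IHubProxy _proxy;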

public void Init(Address address, bool transactional)
{
    var kind = UriKind.RelativeOrAbsolute;
    Uri uri;

    if (String.IsNullOrWhiteSpace(MessageHubUri) || 
        !Uri.TryCreate(MessageHubUri, kind, out uri))
    {
        throw new InvalidOperationException(
            "Url is not valid. Please configure a valid url before initializing.");
    }

    var connection = new HubConnection(MessageHubUri);

    _proxy = connection.CreateHubProxy("NServiceBusHub");
    _proxy.On<TransportMessage>("receive", OnReceive);

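    // Block until the connection is up, then until the queue is registered.
    // (Waiting on a ContinueWith that returns a Task would not wait for the Invoke itself.)
    connection.Start().Wait();
    _proxy.Invoke("RegisterQueue", address).Wait();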
}

The implementation of the method turned out to be rather straightforward. After parsing the configured URI, all that needed to be done was to create a proxy for the MessageHub, register an event handler for the receive event (in my case an OnReceive method) and invoke the RegisterQueue method. (Both the receive event and the RegisterQueue method will be implemented on the Hub in my next blog post.)
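
One caveat about the URI check in Init: with UriKind.RelativeOrAbsolute, a bare host name such as "localhost" is accepted as a relative URI, so the check lets through values that cannot be used to open a connection. Below is a minimal sketch of a stricter check, assuming the hub is only ever reached over http or https. HubUriValidator and TryParseHubUri are hypothetical names of my own, not part of NServiceBus or SignalR.

```csharp
using System;

// Hypothetical helper, not part of NServiceBus or SignalR.
public static class HubUriValidator
{
    // Returns true only for absolute http/https URIs (assumption: the hub
    // is never hosted on another scheme).
    public static bool TryParseHubUri(string value, out Uri uri)
    {
        uri = null;

        if (String.IsNullOrWhiteSpace(value))
        {
            return false;
        }

        Uri candidate;
        if (!Uri.TryCreate(value, UriKind.Absolute, out candidate))
        {
            return false;
        }

        // An absolute URI can still carry an unexpected scheme, so check it.
        if (candidate.Scheme != Uri.UriSchemeHttp &&
            candidate.Scheme != Uri.UriSchemeHttps)
        {
            return false;
        }

        uri = candidate;
        return true;
    }
}
```

Init could call a helper like this in place of the Uri.TryCreate condition and keep throwing the same InvalidOperationException when it returns false.
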
Seeing as I just registered an OnReceive method, I will need to implement it. For now it will suffice to put the message in a local queue, in a thread-safe manner of course.

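private static readonly Queue<TransportMessage> _queue = new Queue<TransportMessage>(10);

// Static, like the queue it guards, so every instance locks on the same object.
private static readonly object _lockObject = new object();

private void OnReceive(TransportMessage transportMessage)
{
    // lock releases the monitor even if Enqueue throws.
    lock (_lockObject)
    {
        _queue.Enqueue(transportMessage);
    }
}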

Now that the message can be received I need to hand it over to the NServiceBus framework. This is achieved by implementing the HasMessage and Receive methods. NServiceBus will keep calling HasMessage to check if there are any messages in my queue. If I return true it will then call Receive and expect me to return a TransportMessage instance (or null, in which case it will be ignored).

public bool HasMessage()
{
    lock (_lockObject)
    {
        return _queue.Count > 0;
    }
}

For the HasMessage method a simple Count > 0 check will suffice. But seeing as I will Enqueue and Dequeue using locks, I will need to apply proper locking here as well.

public TransportMessage Receive()
{
    lock (_lockObject)
    {
        // Return null when the queue is empty; NServiceBus ignores a null message.
        return _queue.Count > 0 ? _queue.Dequeue() : null;
    }
}

The Receive method is the exact opposite of OnReceive: it dequeues a message and returns it. That leaves the Send method. After all the “complexity” of receiving the messages, sending them is surprisingly simple. All I need to do is invoke the Send method on the MessageHub proxy.

public void Send(TransportMessage message, Address address)
{
    _proxy.Invoke("Send", message, address);
}
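
As an aside on the receiving side: the explicit locking around the queue could probably be avoided altogether with .NET's ConcurrentQueue<T>. The sketch below is stand-alone and only illustrates the idea. ReceiveBuffer<T> and its member names are hypothetical, and in the transport T would be TransportMessage.

```csharp
using System.Collections.Concurrent;

// Hypothetical stand-alone buffer; in the transport, T would be TransportMessage.
public class ReceiveBuffer<T> where T : class
{
    private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();

    // Counterpart of OnReceive: safe to call from the SignalR callback thread.
    public void Add(T message)
    {
        _queue.Enqueue(message);
    }

    // Counterpart of HasMessage.
    public bool HasMessage
    {
        get { return !_queue.IsEmpty; }
    }

    // Counterpart of Receive: returns null when nothing is queued.
    public T TakeOrNull()
    {
        T message;
        return _queue.TryDequeue(out message) ? message : null;
    }
}
```

Because TryDequeue checks and removes in one atomic step, there is no window between the emptiness check and the dequeue that a lock would otherwise have to protect.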

Configuration

Now that I have a Custom Transport, it is ready for use! But wouldn’t it be great if I could apply it in the typical NServiceBus manner?

Configure.With()
         .DefaultBuilder()
         .SignalRTransport()
         .UnicastBus();

To do this I will need to create an extension method for the Configure class, something like this:

public static class ConfigureExtensions
{
    public static Configure SignalRTransport
        (this Configure config)
    {
        // Register the transport as a singleton so sending and receiving share one hub connection.
        config.Configurer
              .ConfigureComponent<SignalRMessageQueue>(DependencyLifecycle.SingleInstance);

        return config;
    }
}

While this does what I need, it doesn’t give me the option to specify the MessageHub uri I mentioned earlier. To get the utmost flexibility I would like to be able to specify the uri both in code and in the configuration file. So first I will need a custom ConfigurationSection.

public class SignalRTransportConfig : ConfigurationSection
{
    [ConfigurationProperty("MessageHubUri", IsRequired = true)]
    public string MessageHubUri
    {
        get { return this["MessageHubUri"] as string; }
        set { this["MessageHubUri"] = value; }
    }
}
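
For illustration, the matching entry in app.config could look roughly like the fragment below. This is a sketch under a few assumptions: the section is looked up by the name of the class (which, as far as I know, is how NServiceBus resolves its config sections), the namespace and assembly name MyTransport are placeholders, and the URL is just an example value.

```xml
<configuration>
  <configSections>
    <!-- "MyTransport" is a placeholder for the real namespace and assembly name -->
    <section name="SignalRTransportConfig"
             type="MyTransport.SignalRTransportConfig, MyTransport" />
  </configSections>

  <!-- Example value only; point this at wherever the MessageHub is hosted -->
  <SignalRTransportConfig MessageHubUri="http://localhost:8080/" />
</configuration>
```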

The easiest way to load these settings from config while still allowing the developer to specify them from code as well is to extend the Configure class. By loading the config section when the transport is configured and defining a method to override that value, I can easily fulfil both of my requirements.

public class ConfigureSignalRTransport : Configure
{
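    private IComponentConfig<SignalRMessageQueue> _transportConfig;

    public void Configure(Configure config)
    {
        Builder = config.Builder;
        Configurer = config.Configurer;

        _transportConfig = Configurer
                .ConfigureComponent<SignalRMessageQueue>(DependencyLifecycle.SingleInstance);

        // Use the value from the configuration file, if the section is present.
        var cfg = GetConfigSection<SignalRTransportConfig>();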
        if (cfg == null) return;

        MessageHubUri(cfg.MessageHubUri);
    }

    public ConfigureSignalRTransport MessageHubUri(string value)
    {
        _transportConfig.ConfigureProperty(t => t.MessageHubUri, value);
        return this;
    }
}

By returning the current instance from the MessageHubUri method, it becomes easy to chain the method into the initial configuration call.
The last thing that remains is to update the extension method I wrote earlier to use the ConfigureSignalRTransport instead of the regular Configure class.

public static class ConfigureExtensions
{
    public static ConfigureSignalRTransport SignalRTransport
        (this Configure config)
    {
        var cfg = new ConfigureSignalRTransport();
        cfg.Configure(config);

        return cfg;
    }
}

In order to configure everything from code I can just modify the initial configuration call and I’m done:

Configure.With()
         .DefaultBuilder()
         .SignalRTransport()
         .MessageHubUri("localhost")
         .UnicastBus();


I hope this blog post (my very first!) was easy to follow and gave some insight into implementing an NServiceBus transport. In my next post I will revisit this topic and take a look at the SignalR side of this endeavour.