Advanced Saga mapping for NServiceBus

For this blog post I assume the reader is familiar with the concept of a saga as defined and implemented in NServiceBus (see http://particular.net/articles/sagas-in-nservicebus).

Scenario

In this post I explain the solution I created for the following scenario:
I have a message of type TestMessage which has a property named PropertyOne. On this message I have also placed a header identified by urn:HeaderA.
Now I want to create a saga that stores the values of PropertyOne and urn:HeaderA in its saga data for the first message it receives. After that, it should only receive messages that have the same values for PropertyOne and urn:HeaderA as the first message.

NServiceBus Default Behavior

First, let us take a look at what NServiceBus offers out of the box. When using sagas within NServiceBus, messages are mapped by comparing a property on the message with a property that has been set on the IContainSagaData instance for that saga. The syntax looks like this:

// NServiceBus 4.0 onward
public override void ConfigureHowToFindSaga()
{
    ConfigureMapping<TestMessage>(message => message.PropertyOne)
                                 .ToSaga(data => data.PropertyOne);
}

// NServiceBus 3.3 and earlier
public override void ConfigureHowToFindSaga()
{
    ConfigureMapping<TestMessage>(data => data.PropertyOne,
                                  message => message.PropertyOne);
}

In my scenario, I want to map on both a property and a header. In that case NServiceBus provides two options:
1) Combine all mapped fields into one single property which can then be used for mapping purposes.
2) Create a custom finder for this mapping by implementing IFindSagas<TSagaData>.Using<TMessage>.

Single Property Mapping

Using a single property to map on both a property and a header is easily accomplished on both the saga data and the message. Whilst this is not a major issue for commands, for events it is rather ugly, as it requires the event to know that it will be handled by a specific saga. It also means that NServiceBus will serialize the data twice (since I do not wish to remove the original property, which I need for processing).
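To make this option a bit more tangible, here is a minimal sketch of what the single-property approach could look like. The CorrelationKey property and the BuildKey helper are hypothetical names I made up for illustration, and I am assuming PropertyOne is an int; none of this is part of NServiceBus.

```csharp
using System;
using System.Globalization;

// Hypothetical message: PropertyOne is kept for processing, while
// CorrelationKey repeats its value (plus the header value) purely for mapping.
public class TestMessage
{
    public int PropertyOne { get; set; }

    // Hypothetical combined field, for example "1|A".
    public string CorrelationKey { get; set; }
}

public static class CorrelationKeys
{
    // Hypothetical helper: joins the property and header values into one key.
    public static string BuildKey(int propertyOne, string headerA)
    {
        if (headerA == null) throw new ArgumentNullException("headerA");
        return string.Format(CultureInfo.InvariantCulture, "{0}|{1}", propertyOne, headerA);
    }
}
```

The sender has to fill CorrelationKey on every message, and the value of PropertyOne travels over the wire twice, which is exactly the part I do not like about this option.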

IFindSagas<TSagaData>.Using<TMessage>

Implementing the IFindSagas<TSagaData>.Using<TMessage> interface (mind you, Using is an interface, while IFindSagas is actually an abstract class!) is a rather elegant solution where all mapping logic is placed in a new class, as opposed to altering the message and/or saga data. The downside is that I then also have to take on the responsibility of retrieving the saga data from storage (e.g. RavenDB). And since NServiceBus does not accept generic implementations of this interface, I would have to implement it for every mapping where I want to map on more than a single property.

Custom mapping syntax

Although the option of implementing IFindSagas<TSagaData>.Using<TMessage> is the cleanest solution, it still takes a lot of work every time I need to use more than a single property. Additionally the mapping would no longer be stored within the saga class and might cause some confusion for the other developers on my team. What I am really looking for is something like what is shown below.

public override void ConfigureHowToFindSaga()
{
    ConfigureMapping<TestMessage>()
        .ByProperty(data => data.PropertyOne, 
                    message => message.PropertyOne)
        .AndByHeader(data => data.HeaderA, 
                     "urn:HeaderA");
}

Obviously (at the moment of writing this post) this syntax is not available in NServiceBus. So, because I really dislike doing the same thing twice, the only route left to me was building something that would work according to the above syntax.

Since the chosen syntax uses method-chaining, the use of a state-object was required. Every method should return an object that contains the result of the calls made up to that point as well as a definition of the methods that can still be chained. I did this by creating a SagaMapping class.

public class SagaMapping<TSagaData, TMessage> : IEmptySagaMapping<TSagaData, TMessage>, IFilledSagaMapping<TSagaData, TMessage>
        where TSagaData : IContainSagaData
{
    public IQueryable<TSagaData> AppendMappingClauses(IQueryable<TSagaData> query, TMessage message)

    private SagaMapping<TSagaData, TMessage> MapByProperty(Expression<Func<TSagaData, object>> sagaDataProperty,
                                                           Expression<Func<TMessage, object>> messageProperty)

    private SagaMapping<TSagaData, TMessage> MapByHeader(Expression<Func<TSagaData, object>> sagaDataProperty,
                                                         string messageHeaderName)

    IFilledSagaMapping<TSagaData, TMessage> IEmptySagaMapping<TSagaData, TMessage>.ByProperty(Expression<Func<TSagaData, object>> sagaDataProperty,
                                                                                              Expression<Func<TMessage, object>> messageProperty)
    {
        return MapByProperty(sagaDataProperty, messageProperty);
    }

    IFilledSagaMapping<TSagaData, TMessage> IEmptySagaMapping<TSagaData, TMessage>.ByHeader(Expression<Func<TSagaData, object>> sagaDataProperty,
                                                                                            string messageHeaderName)
    {
        return MapByHeader(sagaDataProperty, messageHeaderName);
    }

    IFilledSagaMapping<TSagaData, TMessage> IFilledSagaMapping<TSagaData, TMessage>.AndByProperty(Expression<Func<TSagaData, object>> sagaDataProperty,
                                                                                                  Expression<Func<TMessage, object>> messageProperty)
    {
        return MapByProperty(sagaDataProperty, messageProperty);
    }

    IFilledSagaMapping<TSagaData, TMessage> IFilledSagaMapping<TSagaData, TMessage>.AndByHeader(Expression<Func<TSagaData, object>> sagaDataProperty,
                                                                                                string messageHeaderName)
    {
        return MapByHeader(sagaDataProperty, messageHeaderName);
    }
}
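The two interfaces themselves are not listed in this post. Judging purely from the explicit implementations above, they could look roughly like the sketch below. IContainSagaData is declared here only as an empty stand-in so the snippet compiles on its own; the real interface lives in NServiceBus.

```csharp
using System;
using System.Linq.Expressions;

// Stand-in for the NServiceBus marker interface (assumption, for compilation only).
public interface IContainSagaData { }

// Nothing has been mapped yet, so only the "By..." methods are offered.
public interface IEmptySagaMapping<TSagaData, TMessage>
    where TSagaData : IContainSagaData
{
    IFilledSagaMapping<TSagaData, TMessage> ByProperty(
        Expression<Func<TSagaData, object>> sagaDataProperty,
        Expression<Func<TMessage, object>> messageProperty);

    IFilledSagaMapping<TSagaData, TMessage> ByHeader(
        Expression<Func<TSagaData, object>> sagaDataProperty,
        string messageHeaderName);
}

// At least one mapping exists, so only the "AndBy..." methods are offered.
public interface IFilledSagaMapping<TSagaData, TMessage>
    where TSagaData : IContainSagaData
{
    IFilledSagaMapping<TSagaData, TMessage> AndByProperty(
        Expression<Func<TSagaData, object>> sagaDataProperty,
        Expression<Func<TMessage, object>> messageProperty);

    IFilledSagaMapping<TSagaData, TMessage> AndByHeader(
        Expression<Func<TSagaData, object>> sagaDataProperty,
        string messageHeaderName);
}
```

Splitting the methods over two interfaces is what lets the compiler enforce that a chain starts with a By... call and continues with AndBy... calls.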

To limit the length of this already lengthy post, I omitted the implementation of most methods, since it is mostly regular .NET and not really interesting for this specific post. The private MapByProperty and MapByHeader methods verify that the provided expressions actually point to a property and, if so, add them to a private dictionary; otherwise an exception is thrown. (Sounds a lot like the current NServiceBus implementation, right?)
Both methods are called by the interface methods of IEmptySagaMapping and IFilledSagaMapping, which results in the syntax shown above.
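For readers who have not written this kind of check before, the sketch below shows one common way to verify that an expression points to a property. It is not the code from the package; MappingExpressions, GetProperty and SampleData are hypothetical names used only for this illustration.

```csharp
using System;
using System.Linq.Expressions;
using System.Reflection;

// Hypothetical type used to try the helper out.
public class SampleData
{
    public int PropertyOne { get; set; }
    public string HeaderA { get; set; }
}

public static class MappingExpressions
{
    // Returns the property an expression such as `data => data.PropertyOne`
    // points at, or throws for anything else (method call, constant, ...).
    public static PropertyInfo GetProperty<T>(Expression<Func<T, object>> expression)
    {
        if (expression == null) throw new ArgumentNullException("expression");

        Expression body = expression.Body;

        // Value-type properties are boxed to object, which wraps the
        // member access in a Convert node that has to be peeled off first.
        var unary = body as UnaryExpression;
        if (unary != null && unary.NodeType == ExpressionType.Convert)
        {
            body = unary.Operand;
        }

        var member = body as MemberExpression;
        var property = member == null ? null : member.Member as PropertyInfo;

        // Only accept a property read directly off the lambda parameter.
        if (property == null || !(member.Expression is ParameterExpression))
        {
            throw new ArgumentException("Expression must point to a property.", "expression");
        }

        return property;
    }
}
```

With this helper, GetProperty<SampleData>(d => d.PropertyOne) yields the PropertyInfo for PropertyOne, while something like d => d.HeaderA.Length or d => d.ToString() is rejected with an ArgumentException.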

Additionally, the class has a public AppendMappingClauses method. By providing an IQueryable and the message for which a matching saga data instance should be found it will rewrite the earlier provided mappings into proper Where expressions. In case of a message where PropertyOne equals 1 and urn:HeaderA equals A, the sample I gave earlier would be rewritten to:

.Where(data => data.PropertyOne == 1).Where(data => data.HeaderA == "A")
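As a rough idea of how such a rewrite can be done, the sketch below builds an equality predicate with the expression tree API and appends it to a query. This is an illustration under my own assumptions rather than the packaged implementation: WhereClauseBuilder, WhereEquals and SampleSagaData are hypothetical names, and the value is assumed to already have the same type as the property (a header value, which is a string, would need converting first for non-string properties).

```csharp
using System;
using System.Linq;
using System.Linq.Expressions;
using System.Reflection;

// Hypothetical saga data shape used to try the helper out.
public class SampleSagaData
{
    public int PropertyOne { get; set; }
    public string HeaderA { get; set; }
}

public static class WhereClauseBuilder
{
    // Appends `.Where(data => data.<property> == value)` to the query.
    public static IQueryable<T> WhereEquals<T>(IQueryable<T> query, PropertyInfo property, object value)
    {
        ParameterExpression data = Expression.Parameter(typeof(T), "data");
        Expression left = Expression.Property(data, property);

        // Type the constant like the property so the equality operator binds.
        Expression right = Expression.Constant(value, property.PropertyType);

        var predicate = Expression.Lambda<Func<T, bool>>(Expression.Equal(left, right), data);
        return query.Where(predicate);
    }
}
```

Calling WhereEquals once per stored mapping gives the chained Where calls shown above, and because the predicate is an expression tree rather than a compiled delegate, a LINQ provider can still translate it.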

MappingStore

While I do have a mapping state-object now, I still need to add a ConfigureMapping method to the saga class. The easiest way to do this is by creating an extension method. The method only has to create an instance of SagaMapping and return it to the caller as IEmptySagaMapping. Oh, and it should probably store it in memory somewhere, so it can be used when a message actually has to be mapped to a saga. That’s where the MappingStore comes in.

public class MappingStore
{
    private readonly ConcurrentDictionary<Tuple<Type, Type>, object> _mappings;
    private readonly ModuleBuilder _moduleBuilder;

    internal MappingStore()
    {
        _mappings = new ConcurrentDictionary<Tuple<Type, Type>, object>();

        var assemblyBuilder = 
            AppDomain.CurrentDomain.DefineDynamicAssembly(
                                        new AssemblyName("SagaFinders.dll"), 
                                        AssemblyBuilderAccess.Run);
        _moduleBuilder = assemblyBuilder.DefineDynamicModule("SagaFinders");
    }

    internal void RegisterMapping<TSagaData, TMessage>
        (SagaMapping<TSagaData, TMessage> mapping)
            where TSagaData : IContainSagaData
    {
        // Add the mapping, or override it if it already exists
        _mappings.AddOrUpdate(new Tuple<Type, Type>(typeof (TSagaData), 
                                                    typeof (TMessage)),
                              key =>
                                  {
                                      CreateType<TSagaData, TMessage>();
                                      return mapping;
                                  },
                              (key, originalValue) => mapping);
    }

    private void CreateType<TSagaData, TMessage>()
    {
        Type type = typeof(SagaFinder<,>).MakeGenericType(typeof(TSagaData), typeof(TMessage));
        var typeBuilder = 
             _moduleBuilder.DefineType(String.Format(CultureInfo.InvariantCulture,
                                                     "{0}{1}AdvancedSagaFinder",
                                                     typeof(TSagaData).Name,
                                                     typeof(TMessage).Name),
                                       TypeAttributes.Public | TypeAttributes.Class);
        typeBuilder.SetParent(type);

        new Features.Sagas().FindAndConfigureSagasIn(new[] {typeBuilder.CreateType()});
    }

    internal SagaMapping<TSagaData, TMessage> RetrieveMapping<TSagaData, TMessage>()
        where TSagaData : IContainSagaData
    {
        object mapping;
        if (_mappings.TryGetValue(new Tuple<Type, Type>(typeof (TSagaData), 
                                                        typeof (TMessage)), 
                                  out mapping))
        {
            return (SagaMapping<TSagaData, TMessage>) mapping;
        }
        return null;
    }
}

The MappingStore class has two responsibilities. The first, quite obvious, responsibility is to store all created mappings for later use. To achieve this a ConcurrentDictionary is used. It always assumes the provided mapping to be the correct one, so it will overwrite any existing mapping for the provided TSagaData/TMessage combination. (There should only ever be one mapping between a message type and a saga data type.)

The second responsibility is a bit uglier, as the store also needs to dynamically create a class implementing IFindSagas<TSagaData>.Using<TMessage>. Even though you can use the MakeGenericType method (on the Type class) to fill in the generic parameters of a type, the resulting type retains the IsGenericType flag, which causes NServiceBus to reject it as a valid finder. To work around this issue, an in-memory assembly is created. The first time a specific mapping is provided to the MappingStore, it creates a non-generic type and has it derive from my implementation of IFindSagas<TSagaData>.Using<TMessage> with the correct generic parameters. Once created, it hands the type to NServiceBus through the Sagas.FindAndConfigureSagasIn() method. NServiceBus will detect that the type implements IFinder and is not generic, so it will accept the type as a valid finder. From that moment onward, whenever NServiceBus needs to map a message of the provided type to saga data of the provided type, it will instantiate the finder and call its FindBy method, allowing my code to handle the exact mapping.
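The difference between a closed generic type and an emitted non-generic subclass is easy to see in isolation. The sketch below is a stripped-down variation of the CreateType method shown earlier, without any NServiceBus involvement: FinderBase and ClosedTypeFactory are hypothetical stand-ins, and I use the static AssemblyBuilder.DefineDynamicAssembly method instead of the AppDomain one.

```csharp
using System;
using System.Reflection;
using System.Reflection.Emit;

// Stand-in for the generic finder base class. It has to be public,
// otherwise the dynamic assembly is not allowed to derive from it.
public abstract class FinderBase<TData, TMessage> { }

public static class ClosedTypeFactory
{
    private static readonly ModuleBuilder Module =
        AssemblyBuilder.DefineDynamicAssembly(new AssemblyName("SketchFinders"),
                                              AssemblyBuilderAccess.Run)
                       .DefineDynamicModule("SketchFinders");

    // Emits an empty, non-generic class deriving from FinderBase<TData, TMessage>.
    // Calling this twice for the same type pair would fail on the duplicate name.
    public static Type CreateNonGenericSubclass<TData, TMessage>()
    {
        Type closedBase = typeof(FinderBase<TData, TMessage>);

        TypeBuilder builder = Module.DefineType(
            typeof(TData).Name + typeof(TMessage).Name + "Finder",
            TypeAttributes.Public | TypeAttributes.Class,
            closedBase);

        // No members are defined; a parameterless constructor is added automatically.
        return builder.CreateType();
    }
}
```

For typeof(FinderBase<int, string>) the IsGenericType property is true, whereas the type returned by CreateNonGenericSubclass<int, string>() reports false while still having FinderBase<int, string> as its BaseType. That is the property the MappingStore relies on.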

SagaFinder

All that is left at this point is actually implementing IFindSagas<TSagaData>.Using<TMessage>. Since the MappingStore dynamically derives classes from this base class, it can be completely generic.

public abstract class SagaFinder<TSagaData, TMessage> 
    : IFindSagas<TSagaData>.Using<TMessage>
        where TSagaData : IContainSagaData
{
    public MappingStore MappingStore { get; set; }

    public RavenSessionFactory SessionFactory { get; set; }

    public TSagaData FindBy(TMessage message)
    {
        var mapping = 
            MappingStore.RetrieveMapping<TSagaData, TMessage>();

        if (mapping == null)
        {
            return default(TSagaData);
        }

        IQueryable<TSagaData> query = 
            SessionFactory.Session.Query<TSagaData>();

        query = mapping.AppendMappingClauses(query, message);

        // Queryable.FirstOrDefault lets the query provider limit the result to a single document.
        return query.FirstOrDefault();
    }
}

The implementation of this class is rather straightforward. It retrieves the mapping based on the requested types. If there is no mapping, it returns default(TSagaData), which causes a new saga to be started. (I cannot return null directly because TSagaData has no class constraint. Saga data types are classes in practice, so default(TSagaData) evaluates to null. Strictly speaking a struct could implement IContainSagaData as well, so a class constraint would be needed to guarantee this.)

If the mapping does exist, the code goes on to get a query for the provided TSagaData type from the SessionFactory. At the moment, I have only implemented it to work with RavenDB as saga persister (Configure.RavenSagaPersister()), since I never use the in-memory persistence anyway.

Raven provides an IQueryable which only executes when it is enumerated or when a materializing method is called (Any(), First(), ToList(), GetEnumerator(), etc.), allowing me to add additional filters in the form of calls to Where() by calling the AppendMappingClauses method of the mapping. The advantage of this approach is that the appended clauses are translated to a Lucene query (the query syntax used by Raven), so all filtering is done at the database level.
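The deferred execution itself can be observed without a database. The sketch below uses the in-memory LINQ provider (AsQueryable on an array) as a stand-in for Raven, so it only demonstrates the timing, not the translation to Lucene. DeferredExecutionDemo and its members are hypothetical names.

```csharp
using System;
using System.Linq;

public static class DeferredExecutionDemo
{
    // Counts how often the first filter has actually been evaluated.
    public static int Evaluations;

    public static bool IsEven(int n)
    {
        Evaluations++;
        return n % 2 == 0;
    }

    // Composes two Where clauses; nothing is evaluated here yet.
    public static IQueryable<int> BuildQuery()
    {
        IQueryable<int> query = new[] { 1, 2, 3, 4 }.AsQueryable();
        return query.Where(n => IsEven(n)).Where(n => n > 2);
    }
}
```

After BuildQuery() returns, Evaluations is still zero. Only a call such as FirstOrDefault() on the returned query runs the filters, which is the same window of opportunity AppendMappingClauses uses to add its Where calls.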

Once all that is completed, a simple FirstOrDefault() is enough to return the matching saga, or null in case no saga is found.

Final result

Because I implemented this feature as an external component (instead of creating a GitHub fork of NServiceBus), I had to settle for the following syntax:

public override void ConfigureHowToFindSaga()
{
    this.ConfigureMapping<TestSagaData, TestMessage>()
        .ByProperty(data => data.PropertyOne, 
                    message => message.PropertyOne)
        .AndByHeader(data => data.HeaderA, 
                     "urn:HeaderA");
}

It ended up very close to what I intended, with the only differences being the mandatory use of the this keyword (because I used an extension method) and the necessity to explicitly provide the saga data type (once again, because I used an extension method).
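Both restrictions come from the C# language rather than from NServiceBus, and they can be reproduced with a few stand-in types. In the sketch below, SagaBase is a hypothetical replacement for the NServiceBus saga base class, and ConfigureMapping just returns a string so the effect is visible.

```csharp
using System;

// Stand-in for the NServiceBus saga base class (assumption, illustration only).
public abstract class SagaBase<TSagaData> { }

public class TestSagaData { }
public class TestMessage { }

public static class SagaBaseExtensions
{
    // TMessage does not appear in the parameter list, so it can never be
    // inferred, and C# does not allow specifying only some type arguments.
    public static string ConfigureMapping<TSagaData, TMessage>(this SagaBase<TSagaData> saga)
    {
        return typeof(TSagaData).Name + " <- " + typeof(TMessage).Name;
    }
}

public class TestSaga : SagaBase<TestSagaData>
{
    public string Describe()
    {
        // Without "this." the call does not compile: extension methods are
        // only found through an explicit receiver expression.
        return this.ConfigureMapping<TestSagaData, TestMessage>();
    }
}
```

An instance method on the saga base class would need neither the this keyword nor the first type argument, which is why a fork of NServiceBus could have offered the shorter syntax.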

NuGet

If you wish to give this a try, a compiled version is available through NuGet under an MIT License.
By default this mapping feature is disabled; it needs to be explicitly enabled during initialization by calling Configure.AdvancedSagaMapping(). Be careful to map a saga data/message combination by either the NServiceBus mapping or this advanced mapping, but never both! Doing so would cause the saga to trigger twice.

For those who prefer the console over the website or the package manager dialog, use the following command to install the latest version:
PM> Install-Package DotAlpha.NServiceBus.Saga.Advanced