Using IObservable to make your API SignalR and REST ready

A few weeks ago I was put in charge of overhauling an API to allow it to be accessed over REST and SignalR. To avoid unreadable and awful code, I ended up researching the IObservable interface and realized that, as usual, Microsoft did half the work for me. Merging these access points really was two separate tasks: figuring out how data should be accessed, and figuring out how the heck to build it.

In order to use both access points without major differences, I first messed around and separated my code with a sledgehammer; I made a few static classes in my web project (ignoring the rest of the product’s needs) and started to see what I could do in an hour to just hack and slash one endpoint together. At the end of the hour I had a bunch of static methods on a nonsense class, where the database was being passed to the methods and it was just downright disgusting.

Terrifyingly, it basically worked fine. It was disgusting and unreadable, but it worked. Mostly.

I’m a huge believer that there’s danger in trying to be dead right all the time. That being said, when you realize that something, even though it works, is dead wrong, you can’t ignore it. I had to create a real solution, so I began researching the C# ecosystem to find my direction. A key point to keep in mind is that I primarily work with data that is found in batches.

Being primarily a JavaScript developer, my first instinct was to create a bunch of similar methods that took some form of callback. That wasn’t the right answer, I knew; so I began to look for some C# equivalent to the Q promise library, so that I could have some value that would resolve, error out, or (most importantly to me) notify me when there’s an update.

I hit up my boss, who probably knows more about C# than the rest of the East Coast, and he linked me to the MSDN page on IObservable in about 9 seconds.

As I said before, my data is found in batches. It’s scoured across thousands of sources and updates constantly, making the idea of simply calling a function and getting all your results at once moot. The original solution, before adding in our REST endpoint, was to batch our data on initial pull when we hit the hubs and join groups that would inform us of updates. We’d check our most common sources one at a time, and as we found results server-side we would use a callback to inform the client that they had new data. It worked perfectly.

With the need for the REST API we wanted to try to emulate that as much as possible. Ultimately, after some debate, we settled on layering our API.

From top to bottom:

  1. The access point (a SignalR hub, a REST call)
  2. The data service (our access points call a centralized service that returns an IObservable)
  3. Our databases
  4. The rest of the world

We decided it was time to separate our concerns: each API call would perform precisely one task. Instead of designing our API around our web application’s needs, we began to restructure our application to fit our API’s constraints and found our code became much more logical.

For ‘simple services’, we were just able to return data directly from our database. Our access point would call the service, the service would return the data, and then the access point would return the data.
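A ‘simple service’ is little more than a pass-through. A minimal sketch (the names LookupService and GetNames are hypothetical; the real services hit our database):

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

// hypothetical service, just to illustrate the shape of a 'simple service'
public class LookupService
{
    public async Task<IReadOnlyList<string>> GetNames()
    {
        await Task.Yield(); // stand-in for the real async database query
        return new List<string> { "alpha", "beta" };
    }
}

// the access point (here a Web API controller action) just forwards the result
[Route("api/names/")]
[HttpGet]
public async Task<IReadOnlyList<string>> GetNamesEndpoint()
{
    return await new LookupService().GetNames();
}
```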

For our more complex services, where batching was required, we had our data service layer return an IObservable that would ship data back to the caller as it was found. We were able to wait on the immediate results in our REST access point and return them, while we were also able to batch our requests down to our client in the SignalR access point.

Ultimately, once you understand how moving your code into services works, the code for implementing this change becomes hysterically simple:

The Service:

using System;  
using System.Collections.Generic;  
using System.Reactive.Linq;  
using System.Threading.Tasks;

namespace MyApp.MyServices  
{
    public class ExampleData
    {
        public IObservable<IEnumerable<int>> TestCall()
        {
            var observable = Observable.Create<IEnumerable<int>>(async x =>
            {
                // do your query to get your data here
                await Task.Yield();
                x.OnNext(new HashSet<int>() {1, 2, 3});

                // pretend there's more waiting
                await Task.Yield();
                x.OnNext(new HashSet<int>() {5, 10, 20});

                x.OnCompleted();
            });

            return observable;
        }
    }
}
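Before wiring it into the access points, it can help to see how an observable like this behaves on its own. A minimal sketch (the inline observable mirrors TestCall above; it assumes the System.Reactive package):

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

class ObservableDemo
{
    // mirrors ExampleData.TestCall: two batches, then completion
    public static IObservable<IEnumerable<int>> Batches() =>
        Observable.Create<IEnumerable<int>>(async x =>
        {
            await Task.Yield();
            x.OnNext(new[] { 1, 2, 3 });
            await Task.Yield();
            x.OnNext(new[] { 5, 10, 20 });
            x.OnCompleted();
        });

    static void Main()
    {
        using (var done = new ManualResetEventSlim())
        {
            // each batch triggers OnNext as it's produced; OnCompleted fires at the end
            Batches().Subscribe(
                batch => Console.WriteLine("batch: " + string.Join(", ", batch)),
                () => done.Set());
            done.Wait(); // block until OnCompleted fires
        }
    }
}
```

This is exactly the shape the access points consume: SignalR forwards each OnNext as it happens, while REST waits for OnCompleted and returns everything at once.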

Calling The Service From SignalR Hub:

public async Task<IEnumerable<int>> GetAllTheseNumbers()  
{
    // just for example's sake; in reality you would inject these into your class with Autofac or something similar
    var myService = new ExampleData();

    var allResults = new List<int>();
    var testInts = myService.TestCall();

    // as each batch arrives, push it down to the caller and collect it;
    // ToObservable (from System.Reactive.Threading.Tasks) lets us await the sends
    await testInts.SelectMany(thisRound => { 
        allResults.AddRange(thisRound);
        return ((Task)Clients.Caller.sendDownBatch(thisRound)).ToObservable();
    });

    return allResults;
}

Calling The Service Over REST:

[Route("api/test/")]
[HttpGet]
public async Task<IEnumerable<int>> TestEndpoint()  
{
    var myService = new ExampleData();

    var observable = myService.TestCall();

    // SelectMany flattens each batch into a single sequence; ToArray gathers
    // everything into one array once the observable completes.
    var endResults = observable.SelectMany(x => x).ToArray();

    return await endResults;
}

Ultimately, allowing access to the same data over SignalR and REST was more a design problem than a code problem; by leveraging the IObservable we were able to make API calls trivially easy once properly abstracted.


Currently Drinking: Danish Blend from Porto Rico Importing Co, made in my handy dandy Aeropress. It’s got a little kick in it that really picked me up on a chilly afternoon.