May 05, 2012

TCP Qbservable Provider Security

UPDATE: The prototype TCP Qbservable Provider library that is described in this article has become the Qactive project on GitHub, including many bug fixes and improvements.

This is the second post in a series of posts on my TCP Qbservable Provider library, which is a proof-of-concept library that enables easy hosting of queryable observable services over TCP.  In this post, we dive into security.

There are two kinds of security that I believe must be fully supported by any IQbservable Provider to be a viable option for hosting public web services that can accept queries from anonymous clients:

  1. Semantic security
  2. Resource security

I’ve recently checked into source control new security features that address these issues.  More work needs to be done before the next release, but it’s a good start.  I’ve also included new examples in the example applications: SandboxedService, LimitedService and MaliciousClient.  These new examples are explained throughout this post.

Note that the new restricted security model is enabled by default for all hosted queries.  You must opt-out of the default security model by setting specific properties and by using a specific API to host your service.  The details are explained within this post.

Semantic Security

Semantic security is about limiting the kinds of queries that a service will execute to a minimal set that makes sense for the kind of service being offered.  It can be broken down further into two subcategories:

  1. Operators and Methods
  2. Types of Expressions

Operators and Methods

Semantic security limits the kinds of operators and methods that the service permits.  Clients can technically send any kind of operators that they want – you can’t prevent anonymous clients from sending strange (or even malicious) queries; however, the service must reject queries that don’t make sense based on the operators used within the queries to prevent the server from executing code that is unrelated to or just plain wrong for the service being offered.

For example, imagine that you’ve written a service that pushes news to clients: IQbservable<Article>.  Without semantic security, the service will happily accept any kind of query from clients.  Thus clients may execute operations such as windowing, buffering, aggregation, creating timers, creating ranges, zipping, joining, filtering, projecting, among others.  That’s a lot a functionality to offer for a news service, though perhaps it makes sense for your particular implementation; e.g., it might be nice to allow a client to query for articles of two different authors joined by coincidence over a particular duration.  However, does it make sense for a client to submit a query that uses the Range operator to generate a sequence of values from 1 to 10, for each article, and then project each value without the article?  Probably not.  A client could just as easily query for all articles unconditionally and then generate a range from 1 to 10 locally, without having to burden the server with such a trivial request.  Perhaps it would be better for this service to disallow use of the Range operator.

In fact, it may be better to limit queries to just simple filtering and projection, at first.  Clients could filter news articles and select specific properties in which they’re interested.  This may provide enough functionality to support common queries and yet still provides some useful optimizations.  It also keeps your service fair and responsive by doing less work per query, assuming that clients don’t generally need any other functionality.  After going live with your service you could accept feature requests for other kinds of query operators based on user feedback, consider whether the additional semantics make sense for your particular service, and then enable those operators.  You should also consider whether your server can handle the additional load of more complex queries – the next section, Resource Security, addresses that topic.

The first release of TCP Qbservable Provider (1.0 Alpha) already supports semantic security by allowing you to host queries from your own IQbservableProvider by calling the ServeTcp extension method; however, to limit query operators you must manually create a custom ExpressionVisitor, which can be somewhat difficult to do.  So to make things easier, a new feature (recently checked into source control) of the TCP Qbservable Provider library enables easy whitelisting of operators so that you don’t have to create your own IQbservableProvider or ExpressionVistor.

To use this feature, simply create an empty instance of ServiceEvaluationContext and call the AddKnownOperators method to whitelist operators that your service must support.  Then assign this context to a QbservableServiceOptions object that is passed to the ServeQbservableTcp extension method, which is responsible for hosting your observable over TCP.

var context = new ServiceEvaluationContext(
    includeSafeOperators: false,
    includeConcurrencyOperators: false);


var service = source.ServeQbservableTcp(
    new QbservableServiceOptions()
        EvaluationContext = context

With the above configuration, client queries using any methods other than the Where and Select operators are automatically rejected by the server.  To see rejection in action, take a look at the MaliciousClient and LimitedService examples checked into source control.

Types of Expressions

The ExpressionOptions property of the QbservableServiceOptions object provides lower-level control over the types of expressions that are permitted.  By default, only basic expressions are permitted.  Expressions such as assignments, blocks, loops, constructor calls and others are rejected by the service.  The default option is recommended for public services, though you can assign this property to any combination of flags to allow additional expression types when needed.

Opting Out of Semantic Security

To disable the semantic security model entirely, set the AllowExpressionsUnrestricted property of the QbservableServiceOptions object to true.  By doing so, the provider will ignore the ServiceEvaluationContext object and the ExpressionOptions property and simply allow any expression and operator to be used within a query.  Of course, this is only recommended if you fully trust all clients, which is not the case in public scenarios.

Resource Security

Resource security is about preventing unauthorized code from accessing system resources, such as memory, CPU, the file system and networking components.  It can be broken down further into two subcategories:

  1. API Security
  2. Algorithmic Security

API Security

The FCL offers many APIs that malicious clients could abuse, such as System.AppDomain, System.Environment, System.IO.*, System.Reflection.*, System.Security.*, System.Windows.*, etc.  In order to safely host a service that can execute arbitrary code from clients, the service would have to prevent unauthorized access to certain APIs.  The whitelisting approach used by the semantic security model described previously is one way to solve this problem, though the mechanism that is generally used to prevent unauthorized access to APIs in the .NET Framework is Code Access Security (CAS).  CAS offers a way to restrict APIs through permissions.

Whitelisting has an advantage over CAS.  CAS allows certain APIs to execute that don’t make sense in a server environment, simply because they don’t demand any permissions; e.g., System.Console.WriteLine.  In order to entirely prevent use of System.Console within queries, whitelisting must be used.

However, there are advantages to using CAS over whitelisting.  For one thing, the whitelisting approach I’ve implemented only checks methods and operators, while the CAS approach applies to any operator, method, constructor, event or property.  Furthermore, the CAS approach applies generally to the entire .NET Framework and third-party assemblies, whereas the process of manually whitelisting APIs may inadvertently expose clients to a security vulnerability hidden deep within some seemingly benign API.  In other words, CAS frees you from the burden of having to ensure that your chosen whitelisted operators and methods don’t inadvertently permit clients to format your C: drive.

Therefore, using sandboxing together with whitelisting provides the strongest security model for hosting services publicly by limiting the APIs that services will permit within anonymous client queries to the set of APIs that are resource-safe and semantically acceptable for a given service.

To incorporate CAS security into the TCP Qbservable Provider library, I’ve added new overloads for the QbservableTcpServer.CreateService method.  You may already be familiar with the CreateService method, since it’s also the API that enables you to host a service accepting an argument, as described in my first blog post in this series.  The new overloads begin with an AppDomainSetup parameter.  They host services within a sandboxed AppDomain that applies minimal CAS permissions by default, though it’s easily configurable by passing in a PermissionSet object to one of the overloads.  By default, sandboxed services will only permit queries to execute code that runs within a transparent security context, denying access to any API that demands additional permissions.  In other words, basic code can be executed within queries, but any APIs that demand additional permissions are rejected by throwing a SecurityException.

To host your service in a sandbox with minimal permissions, call the QbservableTcpServer.CreateService method and pass in an AppDomainSetup object as the first argument.  You must assign its ApplicationBase property to the base path of your sandboxed AppDomain, from which dependent assemblies will be loaded.  The .NET Framework guidelines recommend specifying a different path than the application’s actual base path, for additional security.  In the example below, I’ve assigned the sandbox’s base path to the application’s sandbox_bin subfolder, which would contain only the assemblies that the sandbox requires to execute the host service.

Unfortunately, that’s not all you must do.  The service factory argument cannot be a lambda because it’s invoked within the AppDomain and, therefore, must be serializable, though the compiler does not ensure that lambdas’ closures are serializable.  You may be able to workaround this if a closure isn’t created by referencing a method on a MarshalByRefObject, but that may actually defeat the purpose of the sandbox because ultimately client queries would be executed outside of the sandbox.  Instead, you should use a static factory method to create your service, as shown in the example below.

At the moment, due to type-inference issues, you must explicitly wrap the call to the factory method within an explicit delegate, though I plan on addressing that problem in the future, if possible.

var appBase = Path.GetDirectoryName(new Uri(Assembly.GetEntryAssembly().CodeBase).LocalPath);

var newAppBase = Path.Combine(appBase, "sandbox_bin");

var service = QbservableTcpServer.CreateService<object, int>(
    new AppDomainSetup() { ApplicationBase = newAppBase },
    new Func<IObservable<object>, IObservable<int>>(CreateServiceObservable));
public static IObservable<int> CreateServiceObservable(IObservable<object> request)
    return Observable.Range(1, 3);

With the above configuration, client queries using APIs that require any permissions other than security-transparent execution are automatically rejected by the server.  To see rejection in action, take a look at the MaliciousClient and SandboxedService examples checked into source control.

Keep in mind that your service callbacks, including the CreateServiceObservable method shown above, are executed with the reduced permission set of the sandboxed AppDomain; however, you can assert additional permissions from within your callbacks if needed.  Additional permissions may be needed when creating the service observable or executing observation side-effects, as shown in the following example.

var service = QbservableTcpServer.CreateService<object, int>(
    new AppDomainSetup() { ApplicationBase = newAppBase },
    new Func<IObservable<object>, IObservable<int>>(CreateServiceObservable));

service.Subscribe(terminatedClient =>
        new PermissionSet(PermissionState.Unrestricted).Assert();

            Log.Trace("Client shutdown: " + terminatedClient.Reason);

In the example above, the hypothetical Log.Trace method demands full trust to execute.  Since the callback is executing under the permission set granted to the sandboxed service, the demand will fail unless the required permissions are asserted, even across an AppDomain boundary.

The reason why full trust can be asserted here but not by clients’ queries is because callbacks are executed by one of your own assemblies while queries are executed within security transparent dynamic assemblies, which cannot assert permissions beyond the set granted to the sandbox.

The entry point assembly is automatically granted full trust permissions by the CreateService method and, presumably in the above example, the assembly containing the callback code is the application assembly, so it’s able to assert full trust.  Though if it’s not the application assembly, then you can specify additional full-trust assemblies by calling a specific overload of the CreateService method that accepts an array of Assembly objects.

Note that to assert full trust, assemblies must be strong-named.

Algorithmic Security

Algorithmic security is about restricting the types of expressions that the service permits based on their effect on system resources.

For example, returning to our IQbservable<Article> news service, let’s assume that we’re executing our service in a sandbox and we’ve already applied limited semantic security to prevent most operators from being executed, with the exception of a few that we think will be useful to clients for querying news articles:  Where, Select, Aggregate, Scan, GroupBy, Join, Timer, Interval and Sample.  We’re also allowing primitive types, List<T> and arrays to be created within clients’ queries.

Is our news service safe?

No, it’s not safe.  We wouldn’t want the service executing a client’s query if it contained expressions that performed any of the following actions, listed in an approximately-progressively-worsening order:

  • Performs 100 operations in a single query.
  • Creates 500 joins.
  • Allocates a 10MB array for every article containing the individual words within that article.
  • Aggregates lists of 10K articles containing the entire contents of each article.
  • Samples at a very short duration causing frequent output.
  • Starts a high-rate timer causing frequent output.
  • Starts 100K low-rate timers.
  • Allocates a single 2GB array.
  • Creates a single 10GB string.
  • Uses the GroupBy operator in such a way that it never frees memory.

These kinds of queries negatively impact CPU and memory, so their use must be restricted somehow.  To do that, we must first identify how system resources can be effected by queries, then determine how each particular API can be used within queries to effect system resources, and finally design a model for constraining each particular API, or otherwise prevent them entirely.

Generally, the issues with the example queries above are as follows:

  1. Unrestricted quantities of permitted operators.
  2. Unrestricted combinatorial use of operators.
  3. Unrestricted operator arguments.
  4. Unrestricted allocation of objects, including primitive types, collections and arrays.
    1. Unrestricted magnitude.
    2. Unrestricted quantity.
  5. Unthrottled output.
  6. Unthrottled input.  (Applies to duplex queries only; examples are not provided above.)

It seems that some of these issues are much easier to detect than others, though all of them need to be addressed to have a truly secure service model for publicly hosting queryable observables.

In general, when any potentially unsafe expressions are found within a client’s query, they must be evaluated to determine whether their particular usage is permitted and under what constraints.  You may notice some overlap with the concept of semantic security described previously.  The difference is that semantic security is only about semantics; i.e., whether the intent of a query through the expressions it uses is compatible with the intent of the service, whereas algorithmic security is about constraining the effects particular expressions have on system resources.

In order to constrain algorithmically-unsafe expressions, we must go beyond simple whitelisting and provide additional options for configuring the limits of particular expressions on a per-operation and per-type basis.

Currently, the TCP Qbservable Provider library does not offer any solutions for algorithmic security, though it’s something that I’m considering for a future release.  I plan to add configurable options such as the following:

  1. Maximum quantities for individually whitelisted operators and methods, with reasonable defaults.
  2. Maximum quantities for categorized operators and methods, with reasonable defaults; e.g.,
    1. Permit only 1 usage of concurrency-introducing operators.
    2. Permit only 1 usage of timer operators.
    3. Permit only 1 usage of buffering operators.
  3. Constraints on particular operator arguments, with reasonable defaults; e.g.,
    1. Interval:  30 seconds <= period <= 12 hours
    2. Range:  0 <= count <= 10
    3. Buffer:  2 <= length <= 100
  4. Maximum quantity and capacity of new collections and arrays, with reasonable defaults; e.g., new object[N], where N <= 10
  5. Maximum quantity and size of strings.
  6. Maximum size of incoming serialized expression trees, in bytes.
  7. Maximum size of serialized query payloads, in bytes, with separate settings for output notifications and duplex input notifications.
  8. Input/output throttling, with a default duration of 30 seconds.

In the meantime, I’ve configured the default semantic security whitelist to exclude all potentially unsafe operators, to disallow array allocations and to disallow explicit calls to constructors.  Note that your service is still capable of creating unbounded objects, including arrays and collections, though queries cannot create non-primitive objects themselves.  Assignments and void-returning methods are also disallowed by default, so all objects and collections are immutable.

However, the current security model isn’t an exhaustive solution.  It doesn’t prevent clients from using clever algorithms to cause stack overflows and out of memory exceptions.  It doesn’t even, for example, prevent clients from easily creating a 2GB string.


There’s more work to be done on the security model to safely host queryable observable services publicly.  Even with whitelisting and sandboxing, the TCP Qbservable Provider is easily vulnerable to simple attacks that could bring down an entire process.

The current security model should, however, prevent clients from directly damaging your system.  Direct damage is prevented by using the sandboxing APIs to create your service, which rejects queries that demand unsafe API permissions.  You should also use semantically-restricted expressions, which are enabled by default, to reject queries attempting to use operators and methods that aren’t permitted by your service’s whitelist.

Tags: ,

IQbservable | Rx | Rxx

April 25, 2012

LINQ to Cloud - IQbservable Over the Wire

UPDATE: The prototype TCP Qbservable Provider library that is described in this article has become the Qactive project on GitHub, including many bug fixes and improvements.

I’ve recently released a TCP Qbservable Provider library within the Rxx project on CodePlex.  It targets the .NET 4.5 Framework and is written entirely in C# 5 and VS 11 Beta.  If stabilized in the future, it will probably be merged into the Rxx library.  It’s currently pre-Alpha, so beware of bugs and don’t expect much consideration for security.  Feedback is welcomed.

In this blog post I’m going to introduce you to the TCP Qbservable Provider library and discuss its key features in some depth.  I'll also include a few working examples at the end.  If you want to try programming the examples yourself, then start by downloading the library and unzip it.  You’ll need to add references to all of the assemblies in the bin folder, though you can download the Rx 2.0 Beta assemblies from NuGet, if you’d prefer.

The applications in the zip file provide more detailed examples, but I won’t be discussing them in this blog post.  I encourage you to try them on your own.  You can download the complete source code, which contains the code for the example applications, the provider library and the Rxx 2.0 Beta library.  (Note that the Rxx 2.0 Beta library has not been officially released yet and is likely to change.)

What does this thing do?

In a nutshell

TCP Qbservable Provider enables you to easily create a TCP web service that offers an IQbservable<T> for clients to query using Rx and LINQ.  You simply choose an end point consisting of an IP address and a port number, then your observable becomes available to clients over TCP.

Clients can query your observable using Rx operators.  Queries can also be written in LINQ’s query comprehension syntax.  When a client subscribes to its query, a connection is made to the service and then the query is serialized and sent to the server, where it is executed.

In case you’re wondering, IQbservable<T> is the model that Rx defines to represent queryable observables.  It’s the dual to queryable enumerables, which is represented by IQueryable<T>.  IQbservable<T> is to IObservable<T> as IQueryable<T> is to IEnumerable<T>.  For a deeper explanation, see this video on Channel 9 by Bart De Smet.

The following diagram illustrates the basic communication process:


The basic process:

  1. The server begins hosting a TCP service at a specific end point, which includes an IP address and a port.
    1. The ServeQbservableTcp extension method is used to serve an IObservable<T> as an IQbservable<T>.
    2. The ServeTcp extension method is used to serve an IQbservable<T> directly.
    3. The factory methods on the QbservableTcpServer class are used to create queries that accept subscription arguments.
  2. The client creates an instance of QbservableTcpClient<T> to represent the service locally.  No call is made to the server yet.
    1. The client must specify a type for T that is compatible with the type of the data in the IQbservable<T> service.
    2. The client must supply the end point, which includes the IP address and port of the service.
    3. The client can optionally configure the service proxy; e.g., to enable duplex communication and specify known types.
  3. The client creates a LINQ query by applying Rx operators to the result of the Query method, either by using fluent method call syntax, query comprehension syntax or some mixture of both.
    1. All Qbservable Rx operators are supported.
    2. Arbitrary code can be written within the query to execute side-effects on the server; e.g., via the Do operator.  Clearly, this can be a security concern, but it can also be quite powerful if used properly.
    3. Anonymous types are supported.  (More on this below.)
    4. Full duplex communication is supported.  (More on this below.)
    5. Optionally, the server can require an argument that the client passes to the Query method, which must be a serializable type that the server expects.
  4. The client eventually calls Subscribe on the query, just like a normal IObservable<T>.  This causes a TCP socket to be opened with the server.
  5. The server receives the subscription request and performs a protocol negotiation with the client (currently the entire negotiation is mocked, but it may be developed in the future to support custom protocols).
  6. The client serializes the query and sends it to the server.
  7. The server deserializes the query and begins executing it.
  8. Observable notifications are pushed to the client asynchronously.

Note that all data is currently serialized using a binary formatter.  This includes expression trees and all notifications, in both directions.

Serialized Expression Trees and Anonymous Types

When you write a query against an IQbservable<T>, the compiler creates an Expression Tree to represent your query at runtime.  Often expression trees contain anonymous types that are generated by the compiler; e.g., to support let statements and closures.  The TCP Qbservable Provider must serialize and send an expression tree to the server to be executed.  Even though the Expression classes in the FCL and the types generated by compilers are not serializable, the TCP Qbservable Provider library enables serialization of Expression trees and anonymous types by automatically swapping them with internal, serializable representations.  All types of expressions are supported except for dynamic and debug info expressions, but I’ll consider supporting those in the future as well.

Expression trees are an internal implementation detail that you don’t need to be aware of to consume and host IQbservable<T> services.

Full Duplex Communication

Full duplex communication is when a client and server communicate in both directions simultaneously over a single connection; i.e., the client sends the server data while the server sends the client data, without connecting to another end point.  Full duplex communication occurs automatically for instances of IObservable<T> within a client’s query, though the server must opt-in to allow it.  Full duplex communication can also occur automatically for local members and IEnumerable<T> objects that are generated by iterator blocks, though a client must opt-in to allow it; otherwise, the default behavior is for local members and IEnumerable<T> iterators found within a query to be evaluated locally and then replaced with constants before the query is sent to the server.

While the server is executing the client’s query, if it comes across any local member, including any static or instance methods, properties or fields, and if full duplex communication has been enabled on both the client and the server, then the server will send an invoke message to the client and wait for the return value, synchronously.  The return value is not cached.  A new message is sent each time a local member must be executed within the query.

For full duplex IEnumerable<T> objects generated by iterator blocks, the server sends a synchronous message to the client to get an enumerator.  Subsequent calls to enumerate the enumerable from within the query (e.g., MoveNext, Reset and Dispose) are sent to the client as synchronous invoke messages, similar to the behavior previously described for local members.

While the server is executing the client’s query, if it comes across an instance of an IObservable<T> that was either a closed-over local variable or the return value of a full duplex local member, and if full duplex communication has been enabled on the server (it is automatic on the client for local instances of IObservable<T>), then the server sends a subscribe message to the client and waits for a subscription response, synchronously.  The client is then free to asynchronously push notifications to the server; e.g., OnNext, OnError and OnCompleted.

A large number of local members, IEnumerable<T> iterators and IObservable<T> objects are supported simultaneously for individual clients.  The maximum is currently bounded by the number of keys that fit into a generic Dictionary<TKey,TValue>, per client, though each category also has its own dictionary.  This means that a given client probably supports over 2 billion unique local members, 2 billion IEnumerable<T> iterator instances and 2 billion IObservable<T> instances.  It’s highly unlikely that local members will ever be a problem given the complexity required to generate a query containing over 2 billion expression objects; however, the latter two are dealing with instances of objects that may be generated by the query as it executes.  Though I doubt these capacity constraints will ever be a problem given the context of network communication, it’s something to consider if, for example, you’ll be writing large multi-layered full duplex SelectMany queries that generate many notifications very fast and must execute uninterrupted for a very long period of time.  It’s possible that you may eventually hit 2 billion observables, but it does sound a bit crazy.

Why should I create an IQbservable<T> service?

There are a few major advantages to using a queryable observable instead of a traditional web service:

It’s Asynchronous

Web services typically process requests synchronously, or are fairly complicated to make asynchronous, but a queryable observable service can easily be entirely asynchronous.  By processing requests asynchronously, the server is free to handle additional requests.  This makes for very scalable services.

It’s Reactive

Web services are not typically reactive, but a queryable observable service can easily be entirely reactive.  Clients don’t have to poll your service.  Instead, the server can determine when to send notifications to clients and clients can concentrate on doing other things.

Furthermore, the timing of notifications can be controlled by various internal factors, such as the semantics of the source or the current load on the server.  This is better than clients hitting the server as fast as possible trying to squeeze out more data that simply isn’t available yet, while the server still tries to process requests in a fair manner.  Though a truly reactive service does, however, require mostly persistent TCP connections to be entirely beneficial in this way, perhaps.

It’s Queryable

Reactive web services are typically blunt data hoses that simply receive a request and then begin pushing data to the client as fast as possible, but a queryable observable service receives the client’s query and processes it on the server, so it’s able to return only the data that the client has requested.  Not only is this better for bandwidth usage and client performance, but it’s also potentially better for server performance and overall throughput because the service can interpret the client’s query to make optimizations on-the-fly.

Imagine a social media service that exposes an IQbservable<PersonSaidSomething>.  A client could send a query to the service asking to receive notifications whenever a particular person says something, instead of receiving notifications whenever anyone says something and filtering them on the client.  Admittedly, this particular example isn’t so impressive considering that most social media services probably offer a simple parameterized option to filter notifications for individual people.  So what about a query that asks the service for notifications whenever any person says something about a particular character on a TV show during the date and time on which that show is airing?  Do any social media services offer parameters for that?  What about a query that asks the service for notifications whenever person A writes something longer than 50 characters, between 5 to 10 minutes after person B says something about a particular product?  You get the idea…

It’s Simple

Web services are quite simple to create nowadays, considering all that ASP.NET, WCF and IIS offer, but queryable observable services are simple too if you’re already familiar with Rx and LINQ.  Don’t be frightened by all of the strange terms and confusing explanations – they’re my fault.  If you’re still confused, then try experimenting with it yourself.  Begin with the examples later in this post.

Hosting an IQbservable<T> Service

A service returns an IQbservable<T> so that clients can write queries against it, but the source doesn’t have to be an IQbservable<T>.  The source can be another IQbservable<T> or just any plain old IObservable<T>.

If the service implementation is an IObservable<T>, then the TCP Qbservable Provider uses the local IQbservableProvider (Qbservable.Provider) on the server to execute queries that it receives from clients.  It happens automatically, so you don’t need to be concerned at all about how your observable is being translated into an IQbservable<T>.  You can thank the Rx team for their fantastic work on that (and of course, Rx in general).

If the service implementation is an IQbservable<T>, then think of the TCP Qbservable Provider as merely a communication/serialization proxy.  It takes your existing query and hosts it over TCP.  Furthermore, writing a custom IQbservableProvider allows you to constrain the Rx operators that clients can use, which is good from a security standpoint.

For example, if you want to expose LINQ to WQL (a.k.a., WMI Events) on a server, then simply call the ServeTcp extension method to host your query over TCP.  If you want to expose a typical IObservable<T> on a server, e.g., Observable.Interval, then simply call the ServeQbservableTcp extension method to host your observable over TCP as an IQbservable<T> service.

When a client queries your service, its query is serialized and sent to the server by the TCP Qbservable Provider running on the client. The TCP Qbservable Provider running on the server deserializes the client’s query and then glues it to your hosted query; e.g., Observable.Interval or LINQ to WQL, etc..  From the perspective of the source observable, it’s as if the client’s query was written within the same process.  When data is pushed from the hosted observable, the TCP Qbservable Provider serializes it to the client. From the perspective of the client, it’s as if the provider is running within the same process.

Consuming an IQbservable<T> Service

Clients consume web services by creating proxies that encapsulate all of the communication logic.  A queryable observable service is no different.

A client begins by creating an instance of QbservableTcpClient<T>, which represents a single queryable observable service end point.  This object exposes an IQbservable<T> for the service by calling its Query method.  At this point, the client is able to write a LINQ query and then call Subscribe to initiate communication.  All of this is covered earlier in this blog post, in the section about the basic communication process.

Alright, let’s do some coding…


Add the code from each of the following examples to the Main methods of new console application projects in VS 11 Beta, targeting the .NET 4.5 Framework.  I recommend creating separate console application projects for the server and client examples.

Add the following using directives to the top of each file.  You may need to include others as well, depending upon the example.

using System;
using System.Net;
using System.Reactive;
using System.Reactive.Linq;
using QbservableProvider;

As mentioned earlier, you’ll need to add references to all of the assemblies in the bin folder of the zip file you’ve downloaded, though the Rx 2.0 Beta assemblies can be downloaded from NuGet, if you’d prefer.

Example 1: Timer

Let’s start out easy and create a simple queryable observable timer service.  Here’s the spec:

  1. When a client subscribes, start a 2 second timer on the server.
  2. When the timer elapses on the server, send a notification to the client.
  3. The client’s connection persists until either the client or the server cancels the query, or the query completes.
IObservable<long> source = Observable.Timer(TimeSpan.FromSeconds(2));

var service = source.ServeQbservableTcp(new IPEndPoint(IPAddress.Loopback, port: 49593));

using (service.SubscribeEither(
    client => Console.WriteLine("Timer service acknowledged client shutdown."),
    ex => Console.WriteLine("Timer service error: " + ex.Message),
    ex => Console.WriteLine("Timer service fatal error: " + ex.Message),
    () => Console.WriteLine("This will never be printed because a service host never completes.")))

Now let’s create our first client.  Here’s the spec:

  1. Subscribe to the service that we’ve just created.
  2. Print the timer notification from the service.
var client = new QbservableTcpClient<long>(new IPEndPoint(IPAddress.Loopback, port: 49593));

IQbservable<long> query =
    from value in client.Query()
    select value;

using (query.Subscribe(
    value => Console.WriteLine("Timer client observed: " + value),
    ex => Console.WriteLine("Timer client error: " + ex.Message),
    () => Console.WriteLine("Timer client completed")))

To test these examples, make sure that you run the server application first.  Then run the client application.

After 2 seconds the client application prints the following:

Timer client observed: 0
Timer client completed

And the server application prints:

Timer service acknowledged client shutdown.

That example was boring, I know.

Example 2: Confirm the location

Let’s add a side-effect to the query so that we can see where the observable is running.  Here’s the new client:

IQbservable<long> query =
    from value in client.Query().Do(_ => Console.WriteLine("Where am I?"))
    select value;

After 2 seconds the client’s output is the same as before, but the server application now prints the following:

Where am I?
Timer service acknowledged client shutdown.

So it’s clear that our query was running on the server.  But what’s really awesome about this query is that the Do operator executed its lambda expression on the server!  Essentially, we told the server to write something to its own console window from within the query sent by the client.

Example 3: A more interesting client

Now that we know we can have the server do anything that we want, let’s have it download a web page for us.  Here’s the spec:

  1. Subscribe to the service that we’ve just created using the following query:
    1. When the service generates a timer notification, download the web page from
    2. When the web page is downloaded, send its length to the client.
  2. Print the value received from the service.

Our goal here is to execute the entire query on the server, including downloading the web page.  We know how to verify that the entire query is executing on the server by using the Do operator, so I’ll add that to the end of the query.

IQbservable<int> query =
    (from value in client.Query()
     from page in new WebClient().DownloadStringTaskAsync(new Uri(""))
     select page.Length)
    .Do(result => Console.WriteLine("Where am I? " + result));

Notice that the type of the query has changed from IQbservable<long> to IQbservable<int>.  We’re using the select statement to project a different type of value from the service.

After 2 seconds the client application prints the following (of course, the value may be different in your tests):

Timer client observed: 110429
Timer client completed

And the server prints:

Where am I?  110429
Timer service acknowledged client shutdown.

The server has downloaded a web page and sent back the length of the page to the client.

Example 4: Service arguments

Let’s modify the service to accept the timer’s duration as an argument.  That way the client can control the delay before the web page is downloaded on the server.

To accept an argument, we’ve got to make a slight change to the way in which we’re creating the service.  Instead of simply using the ServeQbservableTcp extension method on our timer observable, we’ll need some way of receiving the argument before we can create the observable.   The QbservableTcpServer class provides static factory methods for creating services based on a function that accepts an IObservable<T>, where T is the type of the argument that the service expects, and returns an IObservable<U> that represents the service.  It’s quite natural to express the concept of clients subscribing to our service with arguments, sometime in the future, as an IObservable<T>.  So we’ll define our service beginning with an IObservable<T> that represents client subscriptions.  I’ll name the lambda parameter that represents this observable as request.

var service = QbservableTcpServer.CreateService<TimeSpan, long>(
    new IPEndPoint(IPAddress.Loopback, port: 49593),
    request =>
        from duration in request.Do(arg => Console.WriteLine("Client sent arg: " + arg))
        from value in Observable.Timer(duration)
        select value);

Notice that we must specify the input and output types of the query when we call the generic CreateService<TSource, TResult> method.  I’ve also added the Do operator to print out the argument that is received from the client.

Now let’s update our client to pass in a duration for the timer.

IQbservable<int> query =
    (from value in client.Query(TimeSpan.FromSeconds(5))
     from page in new WebClient().DownloadStringTaskAsync(new Uri(""))
     select page.Length)
    .Do(result => Console.WriteLine("Where am I? " + result));

Notice that I’m passing in the TimeSpan argument to the Query method.  The rest of the query is unchanged.

The server immediately prints the following when the client application starts:

Client sent arg: 00:00:05

And then after 5 seconds the client and server print the same results as before.

For much more advanced examples, including local member evaluation, anonymous types, full duplex communication of observables and iterator blocks, see the applications that are included in the download.  Grab the complete source code for the applications and the TCP Qbservable Provider library from the Rxx project on CodePlex.


I’ve barely scratched the surface of possibilities offered by the TCP Qbservable Provider service model.  This blog post only provides a high level view with extremely primitive examples.  I hope it has whet your appetite and you’re interested in using it in your apps.

I should also mention that the Rx team has done an amazing job with the core Rx libraries, and the IQbservable<T> stuff is in another league of its own.  I’ve also seen Bart mention a couple of times in the Rx forum that his team is investing more in the IQbservable space regarding remote queries and serializable expression trees, so I’m really curious to see what they’ve done and when/if it will be released.

In the mean time, if you’re interested in developing applications using the TCP Qbservable Provider library, then please let me know.  I enjoyed working on it as a proof-of-concept and I plan to invest more time stabilizing it, unless the Rx team releases something similar.  It will help me to prioritize knowing that people are actually using it.

Your feedback can help guide the direction of this project, so please report bugs and ideas to the Rxx project on CodePlex.  Thanks!

Tags: ,

IQbservable | Rx | Rxx