UPDATE: The prototype TCP Qbservable Provider library that is described in this article has become the Qactive project on GitHub, including many bug fixes and improvements.
I’ve recently released a TCP Qbservable Provider library within the Rxx project on CodePlex. It targets the .NET 4.5 Framework and is written entirely in C# 5 using VS 11 Beta. If it stabilizes in the future, it will probably be merged into the Rxx library. It’s currently pre-Alpha, so beware of bugs and don’t expect much consideration for security. Feedback is welcome.
In this blog post I’m going to introduce you to the TCP Qbservable Provider library and discuss its key features in some depth. I'll also include a few working examples at the end. If you want to try programming the examples yourself, then start by downloading the library and unzipping it. You’ll need to add references to all of the assemblies in the bin folder, though you can download the Rx 2.0 Beta assemblies from NuGet, if you’d prefer.
The applications in the zip file provide more detailed examples, but I won’t be discussing them in this blog post. I encourage you to try them on your own. You can download the complete source code, which contains the code for the example applications, the provider library and the Rxx 2.0 Beta library. (Note that the Rxx 2.0 Beta library has not been officially released yet and is likely to change.)
What does this thing do?
In a nutshell
TCP Qbservable Provider enables you to easily create a TCP web service that offers an IQbservable<T> for clients to query using Rx and LINQ. You simply choose an end point consisting of an IP address and a port number, then your observable becomes available to clients over TCP.
Clients can query your observable using Rx operators. Queries can also be written in LINQ’s query comprehension syntax. When a client subscribes to its query, a connection is made to the service and then the query is serialized and sent to the server, where it is executed.
In case you’re wondering, IQbservable<T> is the model that Rx defines to represent queryable observables. It’s the dual to queryable enumerables, which are represented by IQueryable<T>. IQbservable<T> is to IObservable<T> as IQueryable<T> is to IEnumerable<T>. For a deeper explanation, see this video on Channel 9 by Bart De Smet.
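To make the analogy concrete, here is a minimal sketch that uses only the core Rx library, with no TCP involved. It converts a plain in-memory observable with AsQbservable and then applies operators to it. The class and method names (QbservableBasics, BuildQuery, Run) are made up for illustration. The point is that the operators are recorded as an expression tree on the query's Expression property rather than compiled directly into delegates, and that tree is what a provider can inspect or ship elsewhere.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class QbservableBasics
{
    // Builds a queryable observable over an in-memory source.
    public static IQbservable<int> BuildQuery()
    {
        return Observable.Range(1, 5)
            .AsQbservable()            // IObservable<int> -> IQbservable<int>
            .Where(x => x % 2 == 0)    // recorded as an expression tree
            .Select(x => x * 10);      // recorded as an expression tree
    }

    // Executes the query locally and collects its notifications.
    public static IList<int> Run()
    {
        IQbservable<int> query = BuildQuery();

        // The query carries a description of itself that a provider can read.
        Console.WriteLine(query.Expression);

        // ToList + Wait blocks until the query completes.
        return query.ToList().Wait();
    }

    public static void Main()
    {
        foreach (int value in Run())
        {
            Console.WriteLine(value);
        }
    }
}
```

Here the local provider that ships with Rx executes the query in-process. The TCP provider described in this post plays the same role, except that it sends the expression tree to a server first.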
The following diagram illustrates the basic communication process:
The basic process:
- The server begins hosting a TCP service at a specific end point, which includes an IP address and a port.
  - The ServeQbservableTcp extension method is used to serve an IObservable<T> as an IQbservable<T>.
  - The ServeTcp extension method is used to serve an IQbservable<T> directly.
  - The factory methods on the QbservableTcpServer class are used to create queries that accept subscription arguments.
- The client creates an instance of QbservableTcpClient<T> to represent the service locally. No call is made to the server yet.
  - The client must specify a type for T that is compatible with the type of the data in the IQbservable<T> service.
  - The client must supply the end point, which includes the IP address and port of the service.
  - The client can optionally configure the service proxy; e.g., to enable duplex communication and specify known types.
- The client creates a LINQ query by applying Rx operators to the result of the Query method, either by using fluent method call syntax, query comprehension syntax or some mixture of both.
  - All Qbservable Rx operators are supported.
  - Arbitrary code can be written within the query to execute side-effects on the server; e.g., via the Do operator. Clearly, this can be a security concern, but it can also be quite powerful if used properly.
  - Anonymous types are supported. (More on this below.)
  - Full duplex communication is supported. (More on this below.)
  - Optionally, the server can require an argument that the client passes to the Query method, which must be a serializable type that the server expects.
- The client eventually calls Subscribe on the query, just like a normal IObservable<T>. This causes a TCP socket to be opened with the server.
- The server receives the subscription request and performs a protocol negotiation with the client (currently the entire negotiation is mocked, but it may be developed in the future to support custom protocols).
- The client serializes the query and sends it to the server.
- The server deserializes the query and begins executing it.
- Observable notifications are pushed to the client asynchronously.
Note that all data is currently serialized using a binary formatter. This includes expression trees and all notifications, in both directions.
Serialized Expression Trees and Anonymous Types
When you write a query against an IQbservable<T>, the compiler creates an Expression Tree to represent your query at runtime. Often expression trees contain anonymous types that are generated by the compiler; e.g., to support let clauses and closures. The TCP Qbservable Provider must serialize and send an expression tree to the server to be executed. Even though the Expression classes in the FCL and the types generated by compilers are not serializable, the TCP Qbservable Provider library enables serialization of Expression trees and anonymous types by automatically swapping them with internal, serializable representations. All types of expressions are supported except for dynamic and debug info expressions, but I’ll consider supporting those in the future as well.
Expression trees are an internal implementation detail that you don’t need to be aware of to consume and host IQbservable<T> services.
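If you're curious where those anonymous types come from, the following sketch shows one case using only the core Rx library and System.Linq.Expressions. It is not the library's serialization code. It writes a query with a let clause and then walks the resulting expression tree with an ExpressionVisitor, collecting every compiler-generated type that the tree constructs. The names AnonymousTypeFinder and LetClauseDemo are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq.Expressions;
using System.Reactive.Linq;
using System.Runtime.CompilerServices;

// Collects the compiler-generated types that are constructed inside an expression tree.
public class AnonymousTypeFinder : ExpressionVisitor
{
    public readonly List<Type> Found = new List<Type>();

    protected override Expression VisitNew(NewExpression node)
    {
        // The C# compiler marks anonymous types with [CompilerGenerated].
        if (Attribute.IsDefined(node.Type, typeof(CompilerGeneratedAttribute)))
        {
            Found.Add(node.Type);
        }
        return base.VisitNew(node);
    }
}

public static class LetClauseDemo
{
    public static List<Type> FindAnonymousTypes()
    {
        IQbservable<int> query =
            from x in Observable.Range(1, 3).AsQbservable()
            let square = x * x      // the compiler pairs x and square in an anonymous type
            select square + x;

        var finder = new AnonymousTypeFinder();
        finder.Visit(query.Expression);
        return finder.Found;
    }

    public static void Main()
    {
        foreach (Type type in FindAnonymousTypes())
        {
            Console.WriteLine(type.Name);
        }
    }
}
```

A type like the one found here exists only in the client's assembly, which is why a provider that ships queries to another process has to substitute its own representation for it.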
Full Duplex Communication
Full duplex communication is when a client and server communicate in both directions simultaneously over a single connection; i.e., the client sends the server data while the server sends the client data, without connecting to another end point. Full duplex communication occurs automatically for instances of IObservable<T> within a client’s query, though the server must opt in to allow it. Full duplex communication can also occur automatically for local members and IEnumerable<T> objects that are generated by iterator blocks, though a client must opt in to allow it; otherwise, the default behavior is for local members and IEnumerable<T> iterators found within a query to be evaluated locally and then replaced with constants before the query is sent to the server.
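The default behavior, evaluating locals on the client and replacing them with constants, is a common expression tree technique. Here is a minimal sketch of the idea using only System.Linq.Expressions. It is not the library's actual implementation, it handles only captured local variables, and the names ClosureInliner and InliningDemo are made up for illustration.

```csharp
using System;
using System.Linq.Expressions;

// Replaces reads of captured local variables with their current values.
public class ClosureInliner : ExpressionVisitor
{
    protected override Expression VisitMember(MemberExpression node)
    {
        // A captured local shows up as a field read on a constant closure object.
        if (node.Expression is ConstantExpression)
        {
            // Evaluate the member access locally, right now.
            object value = Expression.Lambda(node).Compile().DynamicInvoke();
            return Expression.Constant(value, node.Type);
        }
        return base.VisitMember(node);
    }
}

public static class InliningDemo
{
    public static string Inline()
    {
        int threshold = 3;
        Expression<Func<int, bool>> filter = x => x > threshold;

        // Before rewriting, the body refers to a compiler-generated closure class
        // that exists only in this process.
        Expression rewritten = new ClosureInliner().Visit(filter);

        // After rewriting, the body compares x against the constant 3.
        return rewritten.ToString();
    }

    public static void Main()
    {
        Console.WriteLine(Inline());
    }
}
```

Once the closure reference is gone, the expression no longer depends on any object that lives only in the client process, so it can be sent to the server as-is. With full duplex enabled, the member access would stay in the tree and be invoked remotely instead.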
While the server is executing the client’s query, if it comes across any local member, including any static or instance methods, properties or fields, and if full duplex communication has been enabled on both the client and the server, then the server will send an invoke message to the client and wait for the return value, synchronously. The return value is not cached. A new message is sent each time a local member must be executed within the query.
For full duplex IEnumerable<T> objects generated by iterator blocks, the server sends a synchronous message to the client to get an enumerator. Subsequent calls to enumerate the enumerable from within the query (e.g., MoveNext, Reset and Dispose) are sent to the client as synchronous invoke messages, similar to the behavior previously described for local members.
While the server is executing the client’s query, if it comes across an instance of an IObservable<T> that was either a closed-over local variable or the return value of a full duplex local member, and if full duplex communication has been enabled on the server (it is automatic on the client for local instances of IObservable<T>), then the server sends a subscribe message to the client and waits for a subscription response, synchronously. The client is then free to asynchronously push notifications to the server; e.g., OnNext, OnError and OnCompleted.
A large number of local members, IEnumerable<T> iterators and IObservable<T> objects are supported simultaneously for individual clients. The maximum is currently bounded by the number of keys that fit into a generic Dictionary<TKey,TValue>, per client, though each category also has its own dictionary. This means that a given client probably supports over 2 billion unique local members, 2 billion IEnumerable<T> iterator instances and 2 billion IObservable<T> instances. It’s highly unlikely that local members will ever be a problem given the complexity required to generate a query containing over 2 billion expression objects; however, the latter two are dealing with instances of objects that may be generated by the query as it executes. Though I doubt these capacity constraints will ever be a problem given the context of network communication, it’s something to consider if, for example, you’ll be writing large multi-layered full duplex SelectMany queries that generate many notifications very fast and must execute uninterrupted for a very long period of time. It’s possible that you may eventually hit 2 billion observables, but it does sound a bit crazy.
Why should I create an IQbservable<T> service?
There are a few major advantages to using a queryable observable instead of a traditional web service:
It’s Asynchronous
Web services typically process requests synchronously, or are fairly complicated to make asynchronous, but a queryable observable service can easily be entirely asynchronous. By processing requests asynchronously, the server is free to handle additional requests. This makes for very scalable services.
It’s Reactive
Web services are not typically reactive, but a queryable observable service can easily be entirely reactive. Clients don’t have to poll your service. Instead, the server can determine when to send notifications to clients and clients can concentrate on doing other things.
Furthermore, the timing of notifications can be controlled by various internal factors, such as the semantics of the source or the current load on the server. This is better than clients hitting the server as fast as possible trying to squeeze out more data that simply isn’t available yet, while the server still tries to process requests in a fair manner. Admittedly, a truly reactive service probably needs mostly persistent TCP connections to get the full benefit.
It’s Queryable
Reactive web services are typically blunt data hoses that simply receive a request and then begin pushing data to the client as fast as possible, but a queryable observable service receives the client’s query and processes it on the server, so it’s able to return only the data that the client has requested. Not only is this better for bandwidth usage and client performance, but it’s also potentially better for server performance and overall throughput because the service can interpret the client’s query to make optimizations on-the-fly.
Imagine a social media service that exposes an IQbservable<PersonSaidSomething>. A client could send a query to the service asking to receive notifications whenever a particular person says something, instead of receiving notifications whenever anyone says something and filtering them on the client. Admittedly, this particular example isn’t so impressive considering that most social media services probably offer a simple parameterized option to filter notifications for individual people. So what about a query that asks the service for notifications whenever any person says something about a particular character on a TV show during the date and time on which that show is airing? Do any social media services offer parameters for that? What about a query that asks the service for notifications whenever person A writes something longer than 50 characters, between 5 and 10 minutes after person B says something about a particular product? You get the idea…
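Here's a rough sketch of what the simplest of those queries might look like. Everything in it is an assumption: the post doesn't define the members of PersonSaidSomething, so the Person and Text properties are invented, and a small in-memory feed converted with AsQbservable stands in for the remote service so that the sketch runs with the core Rx library alone.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical notification type; its members are assumptions.
public class PersonSaidSomething
{
    public string Person { get; set; }
    public string Text { get; set; }
}

public static class SocialQueryDemo
{
    public static IList<string> Run()
    {
        // In-memory stand-in for the service's IQbservable<PersonSaidSomething>.
        IQbservable<PersonSaidSomething> feed = new[]
        {
            new PersonSaidSomething { Person = "A", Text = "short" },
            new PersonSaidSomething { Person = "B", Text = new string('x', 60) },
            new PersonSaidSomething { Person = "A", Text = new string('y', 60) }
        }.ToObservable().AsQbservable();

        // Only posts by person A that are longer than 50 characters.
        IQbservable<string> query =
            from said in feed
            where said.Person == "A" && said.Text.Length > 50
            select said.Text;

        return query.ToList().Wait();
    }

    public static void Main()
    {
        foreach (string text in Run())
        {
            Console.WriteLine(text);
        }
    }
}
```

Against a real service the query itself would look the same. Only the source would change, and the filtering would happen on the server so that unwanted notifications never cross the wire.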
It’s Simple
Web services are quite simple to create nowadays, considering all that ASP.NET, WCF and IIS offer, but queryable observable services are simple too if you’re already familiar with Rx and LINQ. Don’t be frightened by all of the strange terms and confusing explanations – they’re my fault. If you’re still confused, then try experimenting with it yourself. Begin with the examples later in this post.
Hosting an IQbservable<T> Service
A service returns an IQbservable<T> so that clients can write queries against it, but the source doesn’t have to be an IQbservable<T>. The source can be another IQbservable<T> or just any plain old IObservable<T>.
If the service implementation is an IObservable<T>, then the TCP Qbservable Provider uses the local IQbservableProvider (Qbservable.Provider) on the server to execute queries that it receives from clients. It happens automatically, so you don’t need to be concerned at all about how your observable is being translated into an IQbservable<T>. You can thank the Rx team for their fantastic work on that (and of course, Rx in general).
If the service implementation is an IQbservable<T>, then think of the TCP Qbservable Provider as merely a communication/serialization proxy. It takes your existing query and hosts it over TCP. Furthermore, writing a custom IQbservableProvider allows you to constrain the Rx operators that clients can use, which is good from a security standpoint.
For example, if you want to expose LINQ to WQL (a.k.a., WMI Events) on a server, then simply call the ServeTcp extension method to host your query over TCP. If you want to expose a typical IObservable<T> on a server, e.g., Observable.Interval, then simply call the ServeQbservableTcp extension method to host your observable over TCP as an IQbservable<T> service.
When a client queries your service, its query is serialized and sent to the server by the TCP Qbservable Provider running on the client. The TCP Qbservable Provider running on the server deserializes the client’s query and then glues it to your hosted query; e.g., Observable.Interval or LINQ to WQL. From the perspective of the source observable, it’s as if the client’s query was written within the same process. When data is pushed from the hosted observable, the TCP Qbservable Provider serializes it and sends it to the client. From the perspective of the client, it’s as if the provider is running within the same process.
Consuming an IQbservable<T> Service
Clients consume web services by creating proxies that encapsulate all of the communication logic. A queryable observable service is no different.
A client begins by creating an instance of QbservableTcpClient<T>, which represents a single queryable observable service end point. Calling its Query method returns an IQbservable<T> for the service. At this point, the client is able to write a LINQ query and then call Subscribe to initiate communication. All of this is covered earlier in this blog post, in the section about the basic communication process.
Alright, let’s do some coding…
Examples
Add the code from each of the following examples to the Main methods of new console application projects in VS 11 Beta, targeting the .NET 4.5 Framework. I recommend creating separate console application projects for the server and client examples.
Add the following using directives to the top of each file. You may need to include others as well, depending upon the example.
using System;
using System.Net;
using System.Reactive;
using System.Reactive.Linq;
using QbservableProvider;
As mentioned earlier, you’ll need to add references to all of the assemblies in the bin folder of the zip file you’ve downloaded, though the Rx 2.0 Beta assemblies can be downloaded from NuGet, if you’d prefer.
Example 1: Timer
Let’s start out easy and create a simple queryable observable timer service. Here’s the spec:
- When a client subscribes, start a 2 second timer on the server.
- When the timer elapses on the server, send a notification to the client.
- The client’s connection persists until either the client or the server cancels the query, or the query completes.
IObservable<long> source = Observable.Timer(TimeSpan.FromSeconds(2));

var service = source.ServeQbservableTcp(new IPEndPoint(IPAddress.Loopback, port: 49593));

using (service.SubscribeEither(
    client => Console.WriteLine("Timer service acknowledged client shutdown."),
    ex => Console.WriteLine("Timer service error: " + ex.Message),
    ex => Console.WriteLine("Timer service fatal error: " + ex.Message),
    () => Console.WriteLine("This will never be printed because a service host never completes.")))
{
    Console.ReadKey();
}
Now let’s create our first client. Here’s the spec:
- Subscribe to the service that we’ve just created.
- Print the timer notification from the service.
var client = new QbservableTcpClient<long>(new IPEndPoint(IPAddress.Loopback, port: 49593));

IQbservable<long> query =
    from value in client.Query()
    select value;

using (query.Subscribe(
    value => Console.WriteLine("Timer client observed: " + value),
    ex => Console.WriteLine("Timer client error: " + ex.Message),
    () => Console.WriteLine("Timer client completed")))
{
    Console.ReadKey();
}
To test these examples, make sure that you run the server application first. Then run the client application.
After 2 seconds the client application prints the following:
Timer client observed: 0
Timer client completed
And the server application prints:
Timer service acknowledged client shutdown.
That example was boring, I know.
Example 2: Confirm the location
Let’s add a side-effect to the query so that we can see where the observable is running. Here’s the new client:
IQbservable<long> query =
    from value in client.Query().Do(_ => Console.WriteLine("Where am I?"))
    select value;
After 2 seconds the client’s output is the same as before, but the server application now prints the following:
Where am I?
Timer service acknowledged client shutdown.
So it’s clear that our query was running on the server. But what’s really awesome about this query is that the Do operator executed its lambda expression on the server! Essentially, we told the server to write something to its own console window from within the query sent by the client.
Example 3: A more interesting client
Now that we know we can have the server do anything that we want, let’s have it download a web page for us. Here’s the spec:
- Subscribe to the service that we’ve just created using the following query:
- When the service generates a timer notification, download the web page from http://blogs.msdn.com/b/rxteam.
- When the web page is downloaded, send its length to the client.
- Print the value received from the service.
Our goal here is to execute the entire query on the server, including downloading the web page. We know how to verify that the entire query is executing on the server by using the Do operator, so I’ll add that to the end of the query.
IQbservable<int> query =
    (from value in client.Query()
     from page in new WebClient().DownloadStringTaskAsync(new Uri("http://blogs.msdn.com/b/rxteam"))
     select page.Length)
    .Do(result => Console.WriteLine("Where am I? " + result));
Notice that the type of the query has changed from IQbservable<long> to IQbservable<int>. We’re using the select clause to project a different type of value from the service.
After 2 seconds the client application prints the following (of course, the value may be different in your tests):
Timer client observed: 110429
Timer client completed
And the server prints:
Where am I? 110429
Timer service acknowledged client shutdown.
The server has downloaded a web page and sent back the length of the page to the client.
Example 4: Service arguments
Let’s modify the service to accept the timer’s duration as an argument. That way the client can control the delay before the web page is downloaded on the server.
To accept an argument, we’ve got to make a slight change to the way in which we’re creating the service. Instead of simply using the ServeQbservableTcp extension method on our timer observable, we’ll need some way of receiving the argument before we can create the observable. The QbservableTcpServer class provides static factory methods for creating services based on a function that accepts an IObservable<T>, where T is the type of the argument that the service expects, and returns an IObservable<U> that represents the service. It’s quite natural to express the concept of clients subscribing to our service with arguments, sometime in the future, as an IObservable<T>. So we’ll define our service beginning with an IObservable<T> that represents client subscriptions. I’ll name the lambda parameter that represents this observable request.
var service = QbservableTcpServer.CreateService<TimeSpan, long>(
    new IPEndPoint(IPAddress.Loopback, port: 49593),
    request =>
        from duration in request.Do(arg => Console.WriteLine("Client sent arg: " + arg))
        from value in Observable.Timer(duration)
        select value);
Notice that we must specify the input and output types of the query when we call the generic CreateService<TSource, TResult> method. I’ve also added the Do operator to print out the argument that is received from the client.
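If the shape of that function is hard to picture, you can try it without any networking. The sketch below uses only the core Rx library. It defines a function with the same shape as the lambda passed to CreateService above and then feeds it a single simulated client argument with Observable.Return. The names ServiceShapeDemo, TimerService and Run are hypothetical, and the 100 millisecond duration is chosen only to keep the demo quick.

```csharp
using System;
using System.Reactive.Linq;

public static class ServiceShapeDemo
{
    // Same shape as the service lambda: client arguments in, notifications out.
    public static IObservable<long> TimerService(IObservable<TimeSpan> request)
    {
        return from duration in request
               from value in Observable.Timer(duration)
               select value;
    }

    public static long Run()
    {
        // Simulates one client subscribing with a 100 ms argument.
        IObservable<TimeSpan> request = Observable.Return(TimeSpan.FromMilliseconds(100));

        // Wait blocks until the timer fires and the sequence completes.
        return TimerService(request).Wait();
    }

    public static void Main()
    {
        Console.WriteLine("Timer fired with value: " + Run());
    }
}
```

Each argument that arrives on request starts its own timer, which is why the query is written with two from clauses (a SelectMany under the hood).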
Now let’s update our client to pass in a duration for the timer.
IQbservable<int> query =
    (from value in client.Query(TimeSpan.FromSeconds(5))
     from page in new WebClient().DownloadStringTaskAsync(new Uri("http://blogs.msdn.com/b/rxteam"))
     select page.Length)
    .Do(result => Console.WriteLine("Where am I? " + result));
Notice that I’m passing in the TimeSpan argument to the Query method. The rest of the query is unchanged.
The server immediately prints the following when the client application starts:
Client sent arg: 00:00:05
And then after 5 seconds the client and server print the same results as before.
For much more advanced examples, including local member evaluation, anonymous types, full duplex communication of observables and iterator blocks, see the applications that are included in the download. Grab the complete source code for the applications and the TCP Qbservable Provider library from the Rxx project on CodePlex.
Conclusion
I’ve barely scratched the surface of possibilities offered by the TCP Qbservable Provider service model. This blog post only provides a high-level view with extremely primitive examples. I hope it has whetted your appetite and you’re interested in using it in your apps.
I should also mention that the Rx team has done an amazing job with the core Rx libraries, and the IQbservable<T> stuff is in another league of its own. I’ve also seen Bart mention a couple of times in the Rx forum that his team is investing more in the IQbservable space regarding remote queries and serializable expression trees, so I’m really curious to see what they’ve done and when/if it will be released.
In the meantime, if you’re interested in developing applications using the TCP Qbservable Provider library, then please let me know. I enjoyed working on it as a proof-of-concept and I plan to invest more time stabilizing it, unless the Rx team releases something similar. Knowing that people are actually using it will help me to prioritize.
Your feedback can help guide the direction of this project, so please report bugs and ideas to the Rxx project on CodePlex. Thanks!