June 04, 2013

Bye Google Reader, Hello Windows RSS Platform

With the impending demise of iGoogle and Google Reader, which have been set to my web browser's home page for several years now, I had recently come to the realization that I was going to open IE one day and see an error message where iGoogle used to be.  All of my subscribed RSS feeds would be gone.

I did some research to see what kinds of replacements were out there.  For me, RSS feeds provide topics of particular interest that may be worthy of exploring on the web, and my web browser's home page is the gateway to the web; therefore, I wasn't interested in using feed-reader software that runs outside of my browser.  Feeds contain hyperlinks that lead me into the browser, so I might as well read my feeds right on the home page.

It seemed appropriate to begin my research by revisiting the Feeds tab in IE 10.  I thought perhaps there had been improvements that could make it a viable replacement for Google Reader.  It didn't take long before I realized why I had chosen Google Reader several years ago and never switched to IE's built-in RSS feed support.  Alas, since I have no choice now but to change, I decided to dig deeper into this feature of IE and see whether it may be worth using.  There were a few things I really liked about it:

  1. IE automatically detects feeds on web pages and lets you subscribe to them easily via a button on its Command Bar.
  2. IE opens feeds in a special reader allowing you to easily search, sort and filter.  It also provides a link to quickly subscribe or unsubscribe.
  3. IE lets you easily organize your feeds into folders.

I'm sure you'd agree that the integrated experience is a huge plus; however, IE also has a few negatives:

  1. IE doesn't provide a "What's New?" kind of view; i.e., an aggregated view containing all of the latest unread items among all feeds.
  2. IE doesn't provide a way to quickly view a list of items in a selected feed without showing their contents.
  3. IE doesn't provide a way to mark a single item as read/unread.

So it seems that IE has a great story for subscribing to and organizing feeds, though it really lacks in support for interacting with feeds.  The latter is extremely important to me, which is why I had originally chosen iGoogle as my home page for its Google Reader widget.

But the story doesn't end there.  Digging deeper, I discovered that IE builds its RSS features on top of a public Windows API called the Windows RSS Platform.  IE merely provides a GUI containing only a subset of the functionality offered by this platform.  That was great news!  I decided to write my own web page against this API.  I could set it as my home page and interact with my RSS feeds easily, similar to the Google Reader experience, while taking advantage of the integration and organizational features built into IE.

These were my primary goals:

  1. Implement a stand-alone HTML page.  No server, no external scripts, no external style sheets, no external images.
  2. Target HTML5 in IE 10 (desktop mode only) for Windows 8.
  3. Touch input support.
  4. Use JavaScript to interact with the Windows RSS Platform APIs via Windows' built-in ActiveX automation.
  5. Display 3 synchronized panels:
    • Tree view shows all feeds organized into folders, similar to the Feeds tab in IE.
    • Aggregate view containing all of the latest unread items; i.e., "What's New?"
    • Content viewer for the selected item.

I'm happy to report that my project was a success.  I worked on it all last weekend.  I even added several usability features that will hopefully make it appealing to others.  It relies heavily on jQuery, which is minified and embedded into the file.

To the right, you'll see a screenshot of the page in touch-screen mode.

Thankfully, editing and organizational features weren't required because of IE's existing integration; e.g., the feed button on the Command Bar and the Feeds tab in the so-called Favorites Center provide all that is needed to subscribe to and unsubscribe from feeds, organize feeds into folders and configure feed update intervals and cache settings.  Once you've subscribed to a feed in IE, my page will automatically show it and aggregate the latest items into the center view.  To update the page, simply refresh it by pressing F5.

Download the finished product (as is, without warranty, use at your own risk, etc. etc.) from the following link.  It's a single minified HTML file named "rss.htm", approx. 108 KB in size.  Feel free to link to this blog post, but please do not link to the file directly and do not host the file publicly.

(EDIT: This file is now part of an open source project named RubyFeed.  I've changed the URL below to point to the v1.0 release of RubyFeed, which corresponds to the exact version of the file that was previously linked here.  However, I recommend downloading the latest release instead.)


(MD5 = e7d93aab06276614b917ea68369c1ac9)

Please let me know if you find it useful.  If you have any ideas for new features or would like to report bugs then let me know in the comments of this post.  I'm also considering publishing the project as open source if anyone's interested in contributing to it.


No installation is required.  If you're running Windows 8 and IE 10 then all of the components that you need are already installed on your computer.

The simplest way to get started is to copy the HTML file anywhere onto your computer and then double-click it (assuming that IE is configured as your default web browser).  You can also set it as your home page using the file:/// protocol.  For example, if you copy the file to your C: drive then you could set your home page to file:///c:\rss.htm.

WARNING: Since the page uses ActiveX you may have to deal with some security restrictions in IE, which may cause annoying prompts or, in the worst case scenario, IE will entirely block the page from loading.  Unfortunately, I haven't found a way to avoid the prompts when loading the file directly from the local file system with the file:/// protocol.

Alternatively, the best solution for me was to host the page locally and relax some of the security requirements in the Local Intranet security zone.  I was able to set my homepage to http://localhost/ and I'm no longer prompted with any security warnings.  Since I'm already running IIS locally, this was a piece of cake - it was as simple as copying the file to my root web directory and renaming it to default.htm.  Hunting down the appropriate security settings in IE was a bit more complicated, so I embedded the instructions directly into the file itself.  Open the page and when prompted with an ActiveX security warning simply choose to disallow the control to load; after a second or two the page will provide step-by-step instructions on how to disable the prompts.  Note that you must not disable scripts from running; otherwise, the instructions won't be shown.


The page references a favicon.ico file in the same directory, though it's not required to actually exist.  If it does exist, then IE will display the icon when you browse to the page.  Feel free to create your own icon if you want.  Name it favicon.ico and copy it to the same directory as the HTML page.

The page also contains several configurable settings.  Open the file in a text editor such as Notepad and look at the first JavaScript block near the top of the file.  It defines several variables that you can change to suit your needs; just don't delete any of them.

Happy RSS surfing!


newsgroups | Web | WWW

November 23, 2012

Visual Studio Settings Switcher

I just published the first release of my Settings Switcher extension for Visual Studio 2012 on CodePlex.

Yesterday, I was looking for a quick solution to switch between Visual Studio settings files (.vssettings) without using the Tools > Import and Export Settings… dialog.  The dialog’s great, but it’s a bit slow to use a wizard every time I want to switch settings.  Sometimes I like to jump back and forth between different solutions quickly during the day, like when I’m working on various open source projects with different code formatting requirements.  I wanted a quicker way to load settings after opening a solution.

I found this blog post, which had a link to a tip on Sara Ford’s blog showing how easy it is to create a command button in the IDE to switch between settings by writing a simple macro.

Unfortunately, Visual Studio 2012 doesn’t support macros.

So I decided to create a Visual Studio extension.  The original premise was simply to show a drop-down list containing all of the .vssettings files and when one is selected those settings are applied.  That turned out to be a useful feature of the extension; however, while I was developing it I realized that I could add a few more useful features.  For example, I added a button to format every code file in the solution according to the currently selected settings.

But the primary feature, IMO, is that when you close a solution or exit Visual Studio, the extension saves a reference to the current settings file in the solution’s user options file (.suo).  The next time that you open that solution, the associated settings are automatically applied by the extension.  “Automate everything” is my motto :)

This extension should be useful to open source developers.  Typically we work on projects across multiple teams, with different code formatting requirements.  The next time you fork a project you can simply open the solution, edit your settings in the normal Tools > Options… dialog to meet the requirements of the team, then export the settings to a new file using the Export Current Settings button on the Settings Switcher toolbar provided by the extension.  And that’s it!  Every time you open that particular solution, its associated settings are applied automatically.

Project coordinators should also consider checking in a .vssettings file to their repositories.  This will make it easier for interested developers to set up their environment for working on your project.  A developer would simply need to copy the .vssettings file that you provided to the directory that Visual Studio uses to export settings files on their system (Tools > Options > Environment > Import and Export Settings).  Then follow the instructions above to associate the settings with the solution.

I’ve got some additional features that I plan on adding in the future, time permitting.  Some of them are very community-oriented.  We’ll see how that goes.

Feedback is appreciated.  Feel free to request a feature, report a bug or start a discussion.


November 07, 2012

Open Reactive Extensions (Rx)

It’s time to start thinking reactively!  What have you been waiting for?

Maybe you were waiting for this:
Yesterday, Microsoft announced that Reactive Extensions (Rx) is now an open source project.  The complete source code for Rx 2.0 is being hosted at http://rx.codeplex.com under the Apache License 2.0.  After an extensive prerelease period and two “officially” released versions, Rx is finally being opened up to the community for a look inside the monadic machinery that helps developers to compose asynchronous queries with ease.  Not only that, but now you can submit bug fixes and implement features yourself.*

Maybe you were waiting for this:
Interactive Extensions (Ix) for .NET and JavaScript are open as well.  They are included under the same source control root as Rx.  I’ve decided to form a habit:  Whenever I create a new project, before I even write a single line of code, I’ll add references to the Rx Package and Ix Package from NuGet.  (Note:  I just started a discussion about the future of distribution, so those links may change.)

Maybe you were waiting for this:
The Rx Team has included brand new libraries to support C/C++.  They are aptly named Rx++ and Ix++.

Or maybe this:
The Rx Team has included new “Rx binding” examples, which are basically just specialized examples illustrating how to apply Rx and LINQ to specific domains.  They’ve included Tx, which is a set of code samples showing how to use LINQ to Events; e.g., real-time queries from trace logs for ETW, Windows Events and SQL Server Extended Events.  Another Rx binding example they’ve included is LINQ2Charts, which “allows developers to use LINQ to create/change/update charts in an easy way and avoid having to deal with XML or other underneath data structures”.

They’re also looking for new examples of Rx bindings, so let them know if you’ve got any.

Or this?
Rx already has an open source community building libraries and tools around it.  And now that Rx itself is open source, that should help to strengthen the existing open source Rx community by exposing it to more developers.  So even if you can’t/won’t contribute code directly to Rx, either due to complexity or legal constraints or whatever, then at least consider contributing code and/or feedback to one of the existing open source projects that enhance Rx.

Seriously, are you still considering how to develop asynchronous applications without Rx?  Are you also considering how to simplify data extraction from a SQL Server table without using SQL?  Or concurrently poke your eyes out with the pointy ends of a Metro-style button?

My (Additional) Impressions

A primary key to success for any open source library is to be extremely useful.  Another key to success is to be unique.  Rx is both, so I have no doubt that it will flourish in the open source world.

Rx goes a long way to make asynchronous programming easy.  It’s a solid library built around core principles that hides much of the complexity of controlling and coordinating asynchrony within any kind of application.  Opening it will help to lower the learning curve and increase the adoption rate of this amazing library, enabling developers to create complex asynchronous queries with relative ease and without any spaghetti code left over.  If asynchronous spaghetti code were a disease, Rx would be the cure.

The more functionality a library offers, the more it becomes imperative to be able to peruse the source code.  Understanding user error and library bugs can be very difficult without source code, especially given the inherent complexity of asynchronous programming and the many factors of Rx’s public contracts, which must be considered while developing and debugging.  Capturing every last detail about Rx operators in documentation may be overly cumbersome or even impossible.  Opening Rx can help to make building and debugging complex asynchronous queries much easier.

Opening Rx allows the community to be less entitled and more empowered.  Developers using Rx already identify bugs and request features by posting them in the Rx forum, so it makes perfect sense to allow them to fix bugs and implement features themselves in a timely manner and share their work directly with the community.

Opening Rx to the community will help to tighten the relationship between the community and Microsoft.  It will help to focus the flow of feedback to Microsoft, enabling more informed decisions about the shape of features and the future of Rx.

I’ve noticed the trend at Microsoft to move very useful products into open source projects on CodePlex.  I’ve been wondering whether Rx would take this route as well.  There are many great uses for Rx and its reach will certainly grow even more as an open source project.  The communities around Microsoft’s many open source projects are continuously growing and Rx certainly deserves to benefit from community involvement as much as any other product.

*After jumping through a legal hoop or two.  Read the Rx project home page for details.


Open Source | Rx

May 05, 2012

TCP Qbservable Provider Security

UPDATE: The prototype TCP Qbservable Provider library that is described in this article has become the Qactive project on GitHub, including many bug fixes and improvements.

This is the second post in a series of posts on my TCP Qbservable Provider library, which is a proof-of-concept library that enables easy hosting of queryable observable services over TCP.  In this post, we dive into security.

There are two kinds of security that I believe must be fully supported by any IQbservable Provider to be a viable option for hosting public web services that can accept queries from anonymous clients:

  1. Semantic security
  2. Resource security

I’ve recently checked into source control new security features that address these issues.  More work needs to be done before the next release, but it’s a good start.  I’ve also included new examples in the example applications: SandboxedService, LimitedService and MaliciousClient.  These new examples are explained throughout this post.

Note that the new restricted security model is enabled by default for all hosted queries.  You must opt-out of the default security model by setting specific properties and by using a specific API to host your service.  The details are explained within this post.

Semantic Security

Semantic security is about limiting the kinds of queries that a service will execute to a minimal set that makes sense for the kind of service being offered.  It can be broken down further into two subcategories:

  1. Operators and Methods
  2. Types of Expressions

Operators and Methods

Semantic security limits the kinds of operators and methods that the service permits.  Clients can technically send any kind of operators that they want – you can’t prevent anonymous clients from sending strange (or even malicious) queries; however, the service must reject queries that don’t make sense based on the operators used within the queries to prevent the server from executing code that is unrelated to or just plain wrong for the service being offered.

For example, imagine that you’ve written a service that pushes news to clients: IQbservable<Article>.  Without semantic security, the service will happily accept any kind of query from clients.  Thus clients may execute operations such as windowing, buffering, aggregation, creating timers, creating ranges, zipping, joining, filtering, projecting, among others.  That’s a lot of functionality to offer for a news service, though perhaps it makes sense for your particular implementation; e.g., it might be nice to allow a client to query for articles of two different authors joined by coincidence over a particular duration.  However, does it make sense for a client to submit a query that uses the Range operator to generate a sequence of values from 1 to 10, for each article, and then project each value without the article?  Probably not.  A client could just as easily query for all articles unconditionally and then generate a range from 1 to 10 locally, without having to burden the server with such a trivial request.  Perhaps it would be better for this service to disallow use of the Range operator.

In fact, it may be better to limit queries to just simple filtering and projection, at first.  Clients could filter news articles and select specific properties in which they’re interested.  This may provide enough functionality to support common queries and yet still provides some useful optimizations.  It also keeps your service fair and responsive by doing less work per query, assuming that clients don’t generally need any other functionality.  After going live with your service you could accept feature requests for other kinds of query operators based on user feedback, consider whether the additional semantics make sense for your particular service, and then enable those operators.  You should also consider whether your server can handle the additional load of more complex queries – the next section, Resource Security, addresses that topic.

The first release of TCP Qbservable Provider (1.0 Alpha) already supports semantic security by allowing you to host queries from your own IQbservableProvider by calling the ServeTcp extension method; however, to limit query operators you must manually create a custom ExpressionVisitor, which can be somewhat difficult to do.  So to make things easier, a new feature (recently checked into source control) of the TCP Qbservable Provider library enables easy whitelisting of operators so that you don’t have to create your own IQbservableProvider or ExpressionVisitor.

To use this feature, simply create an empty instance of ServiceEvaluationContext and call the AddKnownOperators method to whitelist operators that your service must support.  Then assign this context to a QbservableServiceOptions object that is passed to the ServeQbservableTcp extension method, which is responsible for hosting your observable over TCP.

var context = new ServiceEvaluationContext(
    includeSafeOperators: false,
    includeConcurrencyOperators: false);

// Whitelist the Where and Select operators here by calling context.AddKnownOperators.

var service = source.ServeQbservableTcp(
    new QbservableServiceOptions()
    {
        EvaluationContext = context
    });

With the above configuration, client queries using any methods other than the Where and Select operators are automatically rejected by the server.  To see rejection in action, take a look at the MaliciousClient and LimitedService examples checked into source control.
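For comparison, here is a rough sketch of the manual approach mentioned earlier: a custom ExpressionVisitor that rejects any method call outside a whitelist.  It is not taken from the library.  The names OperatorWhitelistVisitor, IsPermitted and WhitelistDemo are made up for illustration, it uses plain IQueryable as a stand-in for IQbservable so that it compiles without Rx, and it matches operators by name only, which is cruder than a real implementation should be.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;

// Hypothetical helper; not part of the TCP Qbservable Provider library.
public sealed class OperatorWhitelistVisitor : ExpressionVisitor
{
    private readonly HashSet<string> allowed;

    public OperatorWhitelistVisitor(params string[] allowedMethodNames)
    {
        allowed = new HashSet<string>(allowedMethodNames);
    }

    protected override Expression VisitMethodCall(MethodCallExpression node)
    {
        // Reject any call whose method name is not whitelisted.
        if (!allowed.Contains(node.Method.Name))
        {
            throw new InvalidOperationException(
                "Operator or method not permitted: " + node.Method.Name);
        }

        // Keep walking so that nested calls are checked too.
        return base.VisitMethodCall(node);
    }

    // Returns true when every method call in the expression is whitelisted.
    public static bool IsPermitted(Expression query, params string[] allowedMethodNames)
    {
        try
        {
            new OperatorWhitelistVisitor(allowedMethodNames).Visit(query);
            return true;
        }
        catch (InvalidOperationException)
        {
            return false;
        }
    }
}

public static class WhitelistDemo
{
    public static void Main()
    {
        // IQueryable is only a stand-in for a queryable observable here.
        IQueryable<int> source = new[] { 1, 2, 3 }.AsQueryable();

        Expression permitted = source.Where(x => x > 1).Select(x => x * 2).Expression;
        Expression rejected = source.Where(x => x > 1).Skip(1).Expression;

        Console.WriteLine(OperatorWhitelistVisitor.IsPermitted(permitted, "Where", "Select")); // True
        Console.WriteLine(OperatorWhitelistVisitor.IsPermitted(rejected, "Where", "Select"));  // False
    }
}
```

A production visitor would presumably compare MethodInfo objects or declaring types rather than names, so that an unrelated method that happens to be called Where isn't let through.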

Types of Expressions

The ExpressionOptions property of the QbservableServiceOptions object provides lower-level control over the types of expressions that are permitted.  By default, only basic expressions are permitted.  Expressions such as assignments, blocks, loops, constructor calls and others are rejected by the service.  The default option is recommended for public services, though you can assign this property to any combination of flags to allow additional expression types when needed.
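To give a feel for what restricting expression types involves, here is a minimal sketch that walks an expression tree and rejects a handful of node types.  It is only an illustration under my own assumptions: BasicExpressionsVisitor and IsBasic are hypothetical names, and the set of rejected node types is a guess at what "not basic" might include, not the library's actual flags.

```csharp
using System;
using System.Collections.Generic;
using System.Linq.Expressions;

// Hypothetical visitor; the rejected node types are assumptions for illustration.
public sealed class BasicExpressionsVisitor : ExpressionVisitor
{
    private static readonly HashSet<ExpressionType> Rejected = new HashSet<ExpressionType>
    {
        ExpressionType.Assign,
        ExpressionType.Block,
        ExpressionType.Loop,
        ExpressionType.New,
        ExpressionType.NewArrayBounds,
        ExpressionType.NewArrayInit
    };

    public override Expression Visit(Expression node)
    {
        // Every node in the tree passes through Visit, so one check covers all of them.
        if (node != null && Rejected.Contains(node.NodeType))
        {
            throw new NotSupportedException("Expression type not permitted: " + node.NodeType);
        }

        return base.Visit(node);
    }

    // Returns true when the expression contains none of the rejected node types.
    public static bool IsBasic(Expression expression)
    {
        try
        {
            new BasicExpressionsVisitor().Visit(expression);
            return true;
        }
        catch (NotSupportedException)
        {
            return false;
        }
    }
}

public static class ExpressionTypesDemo
{
    public static void Main()
    {
        Expression<Func<int, bool>> filter = x => x > 1;
        Expression<Func<int, int[]>> allocation = n => new int[n];

        // Assignments can't be written as C# lambdas in expression trees, so build one by hand.
        Expression assignment = Expression.Assign(
            Expression.Variable(typeof(int), "v"),
            Expression.Constant(1));

        Console.WriteLine(BasicExpressionsVisitor.IsBasic(filter));     // True
        Console.WriteLine(BasicExpressionsVisitor.IsBasic(allocation)); // False
        Console.WriteLine(BasicExpressionsVisitor.IsBasic(assignment)); // False
    }
}
```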

Opting Out of Semantic Security

To disable the semantic security model entirely, set the AllowExpressionsUnrestricted property of the QbservableServiceOptions object to true.  By doing so, the provider will ignore the ServiceEvaluationContext object and the ExpressionOptions property and simply allow any expression and operator to be used within a query.  Of course, this is only recommended if you fully trust all clients, which is not the case in public scenarios.

Resource Security

Resource security is about preventing unauthorized code from accessing system resources, such as memory, CPU, the file system and networking components.  It can be broken down further into two subcategories:

  1. API Security
  2. Algorithmic Security

API Security

The FCL offers many APIs that malicious clients could abuse, such as System.AppDomain, System.Environment, System.IO.*, System.Reflection.*, System.Security.*, System.Windows.*, etc.  In order to safely host a service that can execute arbitrary code from clients, the service would have to prevent unauthorized access to certain APIs.  The whitelisting approach used by the semantic security model described previously is one way to solve this problem, though the mechanism that is generally used to prevent unauthorized access to APIs in the .NET Framework is Code Access Security (CAS).  CAS offers a way to restrict APIs through permissions.

Whitelisting has an advantage over CAS.  CAS allows certain APIs to execute that don’t make sense in a server environment, simply because they don’t demand any permissions; e.g., System.Console.WriteLine.  In order to entirely prevent use of System.Console within queries, whitelisting must be used.

However, there are advantages to using CAS over whitelisting.  For one thing, the whitelisting approach I’ve implemented only checks methods and operators, while the CAS approach applies to any operator, method, constructor, event or property.  Furthermore, the CAS approach applies generally to the entire .NET Framework and third-party assemblies, whereas the process of manually whitelisting APIs may inadvertently expose clients to a security vulnerability hidden deep within some seemingly benign API.  In other words, CAS frees you from the burden of having to ensure that your chosen whitelisted operators and methods don’t inadvertently permit clients to format your C: drive.

Therefore, using sandboxing together with whitelisting provides the strongest security model for hosting services publicly by limiting the APIs that services will permit within anonymous client queries to the set of APIs that are resource-safe and semantically acceptable for a given service.

To incorporate CAS security into the TCP Qbservable Provider library, I’ve added new overloads for the QbservableTcpServer.CreateService method.  You may already be familiar with the CreateService method, since it’s also the API that enables you to host a service accepting an argument, as described in my first blog post in this series.  The new overloads begin with an AppDomainSetup parameter.  They host services within a sandboxed AppDomain that applies minimal CAS permissions by default, though it’s easily configurable by passing in a PermissionSet object to one of the overloads.  By default, sandboxed services will only permit queries to execute code that runs within a transparent security context, denying access to any API that demands additional permissions.  In other words, basic code can be executed within queries, but any APIs that demand additional permissions are rejected by throwing a SecurityException.

To host your service in a sandbox with minimal permissions, call the QbservableTcpServer.CreateService method and pass in an AppDomainSetup object as the first argument.  You must assign its ApplicationBase property to the base path of your sandboxed AppDomain, from which dependent assemblies will be loaded.  The .NET Framework guidelines recommend specifying a different path than the application’s actual base path, for additional security.  In the example below, I’ve assigned the sandbox’s base path to the application’s sandbox_bin subfolder, which would contain only the assemblies that the sandbox requires to execute the host service.

Unfortunately, that’s not all you must do.  The service factory argument cannot be a lambda because it’s invoked within the AppDomain and, therefore, must be serializable, though the compiler does not ensure that lambdas’ closures are serializable.  You may be able to work around this if a closure isn’t created by referencing a method on a MarshalByRefObject, but that may actually defeat the purpose of the sandbox because ultimately client queries would be executed outside of the sandbox.  Instead, you should use a static factory method to create your service, as shown in the example below.

At the moment, due to type-inference issues, you must explicitly wrap the call to the factory method within an explicit delegate, though I plan on addressing that problem in the future, if possible.

var appBase = Path.GetDirectoryName(new Uri(Assembly.GetEntryAssembly().CodeBase).LocalPath);

var newAppBase = Path.Combine(appBase, "sandbox_bin");

var service = QbservableTcpServer.CreateService<object, int>(
    new AppDomainSetup() { ApplicationBase = newAppBase },
    new Func<IObservable<object>, IObservable<int>>(CreateServiceObservable));

// Static factory method; it is invoked inside the sandboxed AppDomain.
public static IObservable<int> CreateServiceObservable(IObservable<object> request)
{
    return Observable.Range(1, 3);
}

With the above configuration, client queries using APIs that require any permissions other than security-transparent execution are automatically rejected by the server.  To see rejection in action, take a look at the MaliciousClient and SandboxedService examples checked into source control.

Keep in mind that your service callbacks, including the CreateServiceObservable method shown above, are executed with the reduced permission set of the sandboxed AppDomain; however, you can assert additional permissions from within your callbacks if needed.  Additional permissions may be needed when creating the service observable or executing observation side-effects, as shown in the following example.

var service = QbservableTcpServer.CreateService<object, int>(
    new AppDomainSetup() { ApplicationBase = newAppBase },
    new Func<IObservable<object>, IObservable<int>>(CreateServiceObservable));

service.Subscribe(terminatedClient =>
    {
        new PermissionSet(PermissionState.Unrestricted).Assert();
        try
        {
            Log.Trace("Client shutdown: " + terminatedClient.Reason);
        }
        finally
        {
            PermissionSet.RevertAssert();
        }
    });

In the example above, the hypothetical Log.Trace method demands full trust to execute.  Since the callback is executing under the permission set granted to the sandboxed service, the demand will fail unless the required permissions are asserted, even across an AppDomain boundary.

Full trust can be asserted here, but not by clients’ queries, because callbacks are executed by one of your own assemblies while queries are executed within security-transparent dynamic assemblies, which cannot assert permissions beyond the set granted to the sandbox.

The entry point assembly is automatically granted full trust permissions by the CreateService method and, presumably in the above example, the assembly containing the callback code is the application assembly, so it’s able to assert full trust.  If it’s not the application assembly, you can specify additional full-trust assemblies by calling a specific overload of the CreateService method that accepts an array of Assembly objects.

Note that to assert full trust, assemblies must be strong-named.

Algorithmic Security

Algorithmic security is about restricting the types of expressions that the service permits based on their effect on system resources.

For example, returning to our IQbservable<Article> news service, let’s assume that we’re executing our service in a sandbox and we’ve already applied limited semantic security to prevent most operators from being executed, with the exception of a few that we think will be useful to clients for querying news articles:  Where, Select, Aggregate, Scan, GroupBy, Join, Timer, Interval and Sample.  We’re also allowing primitive types, List<T> and arrays to be created within clients’ queries.

Is our news service safe?

No, it’s not safe.  We wouldn’t want the service executing a client’s query if it contained expressions that performed any of the following actions, listed in an approximately-progressively-worsening order:

  • Performs 100 operations in a single query.
  • Creates 500 joins.
  • Allocates a 10MB array for every article containing the individual words within that article.
  • Aggregates lists of 10K articles containing the entire contents of each article.
  • Samples at a very short duration causing frequent output.
  • Starts a high-rate timer causing frequent output.
  • Starts 100K low-rate timers.
  • Allocates a single 2GB array.
  • Creates a single 10GB string.
  • Uses the GroupBy operator in such a way that it never frees memory.

These kinds of queries negatively impact CPU and memory, so their use must be restricted somehow.  To do that, we must first identify how system resources can be affected by queries, then determine how each particular API can be used within queries to affect system resources, and finally design a model for constraining each particular API, or otherwise prevent them entirely.

Generally, the issues with the example queries above are as follows:

  1. Unrestricted quantities of permitted operators.
  2. Unrestricted combinatorial use of operators.
  3. Unrestricted operator arguments.
  4. Unrestricted allocation of objects, including primitive types, collections and arrays.
    1. Unrestricted magnitude.
    2. Unrestricted quantity.
  5. Unthrottled output.
  6. Unthrottled input.  (Applies to duplex queries only; examples are not provided above.)

It seems that some of these issues are much easier to detect than others, though all of them need to be addressed to have a truly secure service model for publicly hosting queryable observables.

In general, when any potentially unsafe expressions are found within a client’s query, they must be evaluated to determine whether their particular usage is permitted and under what constraints.  You may notice some overlap with the concept of semantic security described previously.  The difference is that semantic security is only about semantics; i.e., whether the intent of a query through the expressions it uses is compatible with the intent of the service, whereas algorithmic security is about constraining the effects particular expressions have on system resources.

In order to constrain algorithmically-unsafe expressions, we must go beyond simple whitelisting and provide additional options for configuring the limits of particular expressions on a per-operation and per-type basis.

Currently, the TCP Qbservable Provider library does not offer any solutions for algorithmic security, though it’s something that I’m considering for a future release.  I plan to add configurable options such as the following:

  1. Maximum quantities for individually whitelisted operators and methods, with reasonable defaults.
  2. Maximum quantities for categorized operators and methods, with reasonable defaults; e.g.,
    1. Permit only 1 usage of concurrency-introducing operators.
    2. Permit only 1 usage of timer operators.
    3. Permit only 1 usage of buffering operators.
  3. Constraints on particular operator arguments, with reasonable defaults; e.g.,
    1. Interval:  30 seconds <= period <= 12 hours
    2. Range:  0 <= count <= 10
    3. Buffer:  2 <= length <= 100
  4. Maximum quantity and capacity of new collections and arrays, with reasonable defaults; e.g., new object[N], where N <= 10
  5. Maximum quantity and size of strings.
  6. Maximum size of incoming serialized expression trees, in bytes.
  7. Maximum size of serialized query payloads, in bytes, with separate settings for output notifications and duplex input notifications.
  8. Input/output throttling, with a default duration of 30 seconds.
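
To make the first option in the list above more concrete, here is a minimal sketch of how per-operator maximums might be enforced by walking an expression tree before executing it.  The `OperatorQuotaVisitor` class and its members are hypothetical names invented for this illustration; they are not part of the TCP Qbservable Provider library.  The sketch also uses `IQueryable<T>` from the BCL rather than `IQbservable<T>` so that it runs without the Rx assemblies, on the assumption that the same visitor approach carries over.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;

// Hypothetical helper for illustration only; not part of the library.
public sealed class OperatorQuotaVisitor : ExpressionVisitor
{
    private readonly IDictionary<string, int> maximums;
    private readonly Dictionary<string, int> counts = new Dictionary<string, int>();

    public OperatorQuotaVisitor(IDictionary<string, int> maximums)
    {
        this.maximums = maximums;
    }

    protected override Expression VisitMethodCall(MethodCallExpression node)
    {
        string name = node.Method.Name;

        // Methods without a configured maximum are treated as not whitelisted.
        int maximum;
        if (!maximums.TryGetValue(name, out maximum))
        {
            throw new InvalidOperationException("Not whitelisted: " + name);
        }

        // TryGetValue leaves count at 0 the first time a name is seen.
        int count;
        counts.TryGetValue(name, out count);
        counts[name] = ++count;

        if (count > maximum)
        {
            throw new InvalidOperationException("Too many uses of " + name);
        }

        return base.VisitMethodCall(node);
    }

    // Returns false instead of throwing, which is convenient for a quick check.
    public static bool IsPermitted(Expression query, IDictionary<string, int> maximums)
    {
        try
        {
            new OperatorQuotaVisitor(maximums).Visit(query);
            return true;
        }
        catch (InvalidOperationException)
        {
            return false;
        }
    }
}

public static class OperatorQuotaExample
{
    private static readonly Dictionary<string, int> Maximums =
        new Dictionary<string, int> { { "Where", 1 }, { "Select", 2 } };

    public static bool CheckWithinQuota()
    {
        Expression<Func<IQueryable<int>, IQueryable<int>>> query =
            source => source.Where(x => x > 1).Select(x => x * 2);

        return OperatorQuotaVisitor.IsPermitted(query, Maximums);
    }

    public static bool CheckOverQuota()
    {
        // Where is used twice, but the configured maximum is 1.
        Expression<Func<IQueryable<int>, IQueryable<int>>> query =
            source => source.Where(x => x > 1).Where(x => x < 9);

        return OperatorQuotaVisitor.IsPermitted(query, Maximums);
    }

    public static void Main()
    {
        Console.WriteLine(CheckWithinQuota());   // True
        Console.WriteLine(CheckOverQuota());     // False
    }
}
```

Counting by method name alone is a simplification; a real implementation would probably key on the full `MethodInfo` and also inspect argument values to enforce constraints such as the Interval and Range limits listed above.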

In the meantime, I’ve configured the default semantic security whitelist to exclude all potentially unsafe operators, to disallow array allocations and to disallow explicit calls to constructors.  Note that your service is still capable of creating unbounded objects, including arrays and collections, though queries cannot create non-primitive objects themselves.  Assignments and void-returning methods are also disallowed by default, so all objects and collections are immutable.

However, the current security model isn’t an exhaustive solution.  It doesn’t prevent clients from using clever algorithms to cause stack overflows and out of memory exceptions.  It doesn’t even, for example, prevent clients from easily creating a 2GB string.


There’s more work to be done on the security model to safely host queryable observable services publicly.  Even with whitelisting and sandboxing, the TCP Qbservable Provider is easily vulnerable to simple attacks that could bring down an entire process.

The current security model should, however, prevent clients from directly damaging your system.  Direct damage is prevented by using the sandboxing APIs to create your service, which rejects queries that demand unsafe API permissions.  You should also use semantically-restricted expressions, which are enabled by default, to reject queries attempting to use operators and methods that aren’t permitted by your service’s whitelist.


IQbservable | Rx | Rxx

April 25, 2012

LINQ to Cloud - IQbservable Over the Wire

UPDATE: The prototype TCP Qbservable Provider library that is described in this article has become the Qactive project on GitHub, including many bug fixes and improvements.

I’ve recently released a TCP Qbservable Provider library within the Rxx project on CodePlex.  It targets the .NET 4.5 Framework and is written entirely in C# 5 and VS 11 Beta.  If stabilized in the future, it will probably be merged into the Rxx library.  It’s currently pre-Alpha, so beware of bugs and don’t expect much consideration for security.  Feedback is welcomed.

In this blog post I’m going to introduce you to the TCP Qbservable Provider library and discuss its key features in some depth.  I'll also include a few working examples at the end.  If you want to try programming the examples yourself, then start by downloading the library and unzipping it.  You’ll need to add references to all of the assemblies in the bin folder, though you can download the Rx 2.0 Beta assemblies from NuGet, if you’d prefer.

The applications in the zip file provide more detailed examples, but I won’t be discussing them in this blog post.  I encourage you to try them on your own.  You can download the complete source code, which contains the code for the example applications, the provider library and the Rxx 2.0 Beta library.  (Note that the Rxx 2.0 Beta library has not been officially released yet and is likely to change.)

What does this thing do?

In a nutshell

TCP Qbservable Provider enables you to easily create a TCP web service that offers an IQbservable<T> for clients to query using Rx and LINQ.  You simply choose an end point consisting of an IP address and a port number, then your observable becomes available to clients over TCP.

Clients can query your observable using Rx operators.  Queries can also be written in LINQ’s query comprehension syntax.  When a client subscribes to its query, a connection is made to the service and then the query is serialized and sent to the server, where it is executed.

In case you’re wondering, IQbservable<T> is the model that Rx defines to represent queryable observables.  It’s the dual to queryable enumerables, which is represented by IQueryable<T>.  IQbservable<T> is to IObservable<T> as IQueryable<T> is to IEnumerable<T>.  For a deeper explanation, see this video on Channel 9 by Bart De Smet.
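
If the analogy is unfamiliar, the enumerable side of it can be shown with nothing but the BCL.  The following sketch (the class and method names are mine, chosen for illustration) contrasts `IEnumerable<T>`, where the lambda passed to Where is compiled to a delegate, with `IQueryable<T>`, where the same lambda is captured as an expression tree that a provider can inspect, serialize or translate.  `IQbservable<T>` plays the same role for `IObservable<T>`.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;

public static class QueryableVersusEnumerable
{
    // Returns the name of the outermost operator recorded in the query's expression tree.
    public static string OutermostOperator()
    {
        IQueryable<int> source = new[] { 1, 2, 3 }.AsQueryable();

        // Queryable.Where records the call as data instead of running it immediately.
        IQueryable<int> query = source.Where(x => x > 1);

        var call = (MethodCallExpression)query.Expression;
        return call.Method.Name;
    }

    public static void Main()
    {
        // Enumerable.Where receives a compiled delegate; nothing about the lambda can be inspected.
        IEnumerable<int> filtered = new[] { 1, 2, 3 }.Where(x => x > 1);
        Console.WriteLine(string.Join(", ", filtered));   // 2, 3

        Console.WriteLine(OutermostOperator());           // Where
    }
}
```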

The following diagram illustrates the basic communication process:


The basic process:

  1. The server begins hosting a TCP service at a specific end point, which includes an IP address and a port.
    1. The ServeQbservableTcp extension method is used to serve an IObservable<T> as an IQbservable<T>.
    2. The ServeTcp extension method is used to serve an IQbservable<T> directly.
    3. The factory methods on the QbservableTcpServer class are used to create queries that accept subscription arguments.
  2. The client creates an instance of QbservableTcpClient<T> to represent the service locally.  No call is made to the server yet.
    1. The client must specify a type for T that is compatible with the type of the data in the IQbservable<T> service.
    2. The client must supply the end point, which includes the IP address and port of the service.
    3. The client can optionally configure the service proxy; e.g., to enable duplex communication and specify known types.
  3. The client creates a LINQ query by applying Rx operators to the result of the Query method, either by using fluent method call syntax, query comprehension syntax or some mixture of both.
    1. All Qbservable Rx operators are supported.
    2. Arbitrary code can be written within the query to execute side-effects on the server; e.g., via the Do operator.  Clearly, this can be a security concern, but it can also be quite powerful if used properly.
    3. Anonymous types are supported.  (More on this below.)
    4. Full duplex communication is supported.  (More on this below.)
    5. Optionally, the server can require an argument that the client passes to the Query method, which must be a serializable type that the server expects.
  4. The client eventually calls Subscribe on the query, just like a normal IObservable<T>.  This causes a TCP socket to be opened with the server.
  5. The server receives the subscription request and performs a protocol negotiation with the client (currently the entire negotiation is mocked, but it may be developed in the future to support custom protocols).
  6. The client serializes the query and sends it to the server.
  7. The server deserializes the query and begins executing it.
  8. Observable notifications are pushed to the client asynchronously.

Note that all data is currently serialized using a binary formatter.  This includes expression trees and all notifications, in both directions.

Serialized Expression Trees and Anonymous Types

When you write a query against an IQbservable<T>, the compiler creates an Expression Tree to represent your query at runtime.  Often expression trees contain anonymous types that are generated by the compiler; e.g., to support let statements and closures.  The TCP Qbservable Provider must serialize and send an expression tree to the server to be executed.  Even though the Expression classes in the FCL and the types generated by compilers are not serializable, the TCP Qbservable Provider library enables serialization of Expression trees and anonymous types by automatically swapping them with internal, serializable representations.  All types of expressions are supported except for dynamic and debug info expressions, but I’ll consider supporting those in the future as well.
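
To see where those compiler-generated types come from, here is a small sketch that walks an expression tree and collects the types of any `new` expressions that are marked with `CompilerGeneratedAttribute`.  The `AnonymousTypeFinder` class is a hypothetical helper written for this illustration, and the query targets `IQueryable<T>` rather than `IQbservable<T>` so that it runs without the Rx assemblies; the compiler treats a let clause the same way in both cases.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Runtime.CompilerServices;

// Hypothetical helper for illustration only; not part of the library.
public sealed class AnonymousTypeFinder : ExpressionVisitor
{
    public readonly List<Type> Found = new List<Type>();

    protected override Expression VisitNew(NewExpression node)
    {
        // Anonymous types are emitted by the compiler and carry this attribute.
        if (Attribute.IsDefined(node.Type, typeof(CompilerGeneratedAttribute)))
        {
            Found.Add(node.Type);
        }

        return base.VisitNew(node);
    }
}

public static class AnonymousTypeExample
{
    public static int CountCompilerGeneratedTypes()
    {
        // The let clause is translated into a Select that projects an anonymous type
        // holding both x and doubled.
        Expression<Func<IQueryable<int>, IQueryable<int>>> query =
            source => from x in source
                      let doubled = x * 2
                      where doubled > 2
                      select doubled;

        var finder = new AnonymousTypeFinder();
        finder.Visit(query);
        return finder.Found.Count;
    }

    public static void Main()
    {
        // Prints a count of at least 1 because of the let clause.
        Console.WriteLine(CountCompilerGeneratedTypes());
    }
}
```

A type like this exists only in the client's assembly, which is why it has to be swapped for something the server can reconstruct before the query is sent.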

Expression trees are an internal implementation detail that you don’t need to be aware of to consume and host IQbservable<T> services.

Full Duplex Communication

Full duplex communication is when a client and server communicate in both directions simultaneously over a single connection; i.e., the client sends the server data while the server sends the client data, without connecting to another end point.  Full duplex communication occurs automatically for instances of IObservable<T> within a client’s query, though the server must opt-in to allow it.  Full duplex communication can also occur automatically for local members and IEnumerable<T> objects that are generated by iterator blocks, though a client must opt-in to allow it; otherwise, the default behavior is for local members and IEnumerable<T> iterators found within a query to be evaluated locally and then replaced with constants before the query is sent to the server.
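
The default behavior, in which local members are evaluated on the client and replaced with constants, can be sketched with an expression visitor.  This is only an illustration of the general technique under the assumption that a closed-over local shows up as a field access on a constant closure object; the `LocalEvaluator` class is a hypothetical name and is not how the library is necessarily implemented.

```csharp
using System;
using System.Linq.Expressions;

// Hypothetical helper for illustration only; not part of the library.
public sealed class LocalEvaluator : ExpressionVisitor
{
    protected override Expression VisitMember(MemberExpression node)
    {
        // A closed-over local appears as a member access on a constant closure instance.
        if (node.Expression is ConstantExpression)
        {
            // Evaluate the member locally and substitute the resulting value.
            object value = Expression.Lambda(node).Compile().DynamicInvoke();
            return Expression.Constant(value, node.Type);
        }

        return base.VisitMember(node);
    }
}

public static class LocalEvaluationExample
{
    public static Expression<Func<int, bool>> BuildAndReduce(int threshold)
    {
        // threshold is captured by the lambda, so the tree refers to a closure field.
        Expression<Func<int, bool>> predicate = x => x > threshold;

        return (Expression<Func<int, bool>>)new LocalEvaluator().Visit(predicate);
    }

    public static void Main()
    {
        Expression<Func<int, bool>> reduced = BuildAndReduce(5);
        var comparison = (BinaryExpression)reduced.Body;

        Console.WriteLine(comparison.Right.NodeType);   // Constant
        Console.WriteLine(reduced.Compile()(7));        // True
    }
}
```

After the rewrite, the tree no longer refers to anything that lives only on the client, so it can be sent to the server as-is.  With full duplex enabled, the member access would instead be left in place and evaluated through a callback to the client.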

While the server is executing the client’s query, if it comes across any local member, including any static or instance methods, properties or fields, and if full duplex communication has been enabled on both the client and the server, then the server will send an invoke message to the client and wait for the return value, synchronously.  The return value is not cached.  A new message is sent each time a local member must be executed within the query.

For full duplex IEnumerable<T> objects generated by iterator blocks, the server sends a synchronous message to the client to get an enumerator.  Subsequent calls to enumerate the enumerable from within the query (e.g., MoveNext, Reset and Dispose) are sent to the client as synchronous invoke messages, similar to the behavior previously described for local members.

While the server is executing the client’s query, if it comes across an instance of an IObservable<T> that was either a closed-over local variable or the return value of a full duplex local member, and if full duplex communication has been enabled on the server (it is automatic on the client for local instances of IObservable<T>), then the server sends a subscribe message to the client and waits for a subscription response, synchronously.  The client is then free to asynchronously push notifications to the server; e.g., OnNext, OnError and OnCompleted.

A large number of local members, IEnumerable<T> iterators and IObservable<T> objects are supported simultaneously for individual clients.  The maximum is currently bounded by the number of keys that fit into a generic Dictionary<TKey,TValue>, per client, though each category also has its own dictionary.  This means that a given client probably supports over 2 billion unique local members, 2 billion IEnumerable<T> iterator instances and 2 billion IObservable<T> instances.  It’s highly unlikely that local members will ever be a problem given the complexity required to generate a query containing over 2 billion expression objects; however, the latter two are dealing with instances of objects that may be generated by the query as it executes.  Though I doubt these capacity constraints will ever be a problem given the context of network communication, it’s something to consider if, for example, you’ll be writing large multi-layered full duplex SelectMany queries that generate many notifications very fast and must execute uninterrupted for a very long period of time.  It’s possible that you may eventually hit 2 billion observables, but it does sound a bit crazy.

Why should I create an IQbservable<T> service?

There are a few major advantages to using a queryable observable instead of a traditional web service:

It’s Asynchronous

Web services typically process requests synchronously, or are fairly complicated to make asynchronous, but a queryable observable service can easily be entirely asynchronous.  By processing requests asynchronously, the server is free to handle additional requests.  This makes for very scalable services.

It’s Reactive

Web services are not typically reactive, but a queryable observable service can easily be entirely reactive.  Clients don’t have to poll your service.  Instead, the server can determine when to send notifications to clients and clients can concentrate on doing other things.

Furthermore, the timing of notifications can be controlled by various internal factors, such as the semantics of the source or the current load on the server.  This is better than clients hitting the server as fast as possible trying to squeeze out more data that simply isn’t available yet, while the server still tries to process requests in a fair manner.  That said, a truly reactive service probably requires mostly persistent TCP connections to be entirely beneficial in this way.

It’s Queryable

Reactive web services are typically blunt data hoses that simply receive a request and then begin pushing data to the client as fast as possible, but a queryable observable service receives the client’s query and processes it on the server, so it’s able to return only the data that the client has requested.  Not only is this better for bandwidth usage and client performance, but it’s also potentially better for server performance and overall throughput because the service can interpret the client’s query to make optimizations on-the-fly.

Imagine a social media service that exposes an IQbservable<PersonSaidSomething>.  A client could send a query to the service asking to receive notifications whenever a particular person says something, instead of receiving notifications whenever anyone says something and filtering them on the client.  Admittedly, this particular example isn’t so impressive considering that most social media services probably offer a simple parameterized option to filter notifications for individual people.  So what about a query that asks the service for notifications whenever any person says something about a particular character on a TV show during the date and time on which that show is airing?  Do any social media services offer parameters for that?  What about a query that asks the service for notifications whenever person A writes something longer than 50 characters, between 5 and 10 minutes after person B says something about a particular product?  You get the idea…

It’s Simple

Web services are quite simple to create nowadays, considering all that ASP.NET, WCF and IIS offer, but queryable observable services are simple too if you’re already familiar with Rx and LINQ.  Don’t be frightened by all of the strange terms and confusing explanations – they’re my fault.  If you’re still confused, then try experimenting with it yourself.  Begin with the examples later in this post.

Hosting an IQbservable<T> Service

A service returns an IQbservable<T> so that clients can write queries against it, but the source doesn’t have to be an IQbservable<T>.  The source can be another IQbservable<T> or just any plain old IObservable<T>.

If the service implementation is an IObservable<T>, then the TCP Qbservable Provider uses the local IQbservableProvider (Qbservable.Provider) on the server to execute queries that it receives from clients.  It happens automatically, so you don’t need to be concerned at all about how your observable is being translated into an IQbservable<T>.  You can thank the Rx team for their fantastic work on that (and of course, Rx in general).

If the service implementation is an IQbservable<T>, then think of the TCP Qbservable Provider as merely a communication/serialization proxy.  It takes your existing query and hosts it over TCP.  Furthermore, writing a custom IQbservableProvider allows you to constrain the Rx operators that clients can use, which is good from a security standpoint.

For example, if you want to expose LINQ to WQL (a.k.a., WMI Events) on a server, then simply call the ServeTcp extension method to host your query over TCP.  If you want to expose a typical IObservable<T> on a server, e.g., Observable.Interval, then simply call the ServeQbservableTcp extension method to host your observable over TCP as an IQbservable<T> service.

When a client queries your service, its query is serialized and sent to the server by the TCP Qbservable Provider running on the client. The TCP Qbservable Provider running on the server deserializes the client’s query and then glues it to your hosted query; e.g., Observable.Interval or LINQ to WQL.  From the perspective of the source observable, it’s as if the client’s query was written within the same process.  When data is pushed from the hosted observable, the TCP Qbservable Provider serializes it to the client. From the perspective of the client, it’s as if the provider is running within the same process.
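
The gluing step can be pictured as applying the client's query, which arrives as an expression tree with a placeholder for the source, to the sequence that the server is hosting.  Here is a rough in-process analogy that uses `IQueryable<T>` from the BCL, so it runs without the Rx assemblies and without any networking or serialization.  The `GlueExample` class and its `Execute` method are hypothetical names for this illustration only.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;

public static class GlueExample
{
    // Applies a client-authored query, represented as data, to the server's hosted source.
    public static List<long> Execute(
        Expression<Func<IQueryable<long>, IQueryable<long>>> clientQuery,
        IQueryable<long> hostedSource)
    {
        Func<IQueryable<long>, IQueryable<long>> compiled = clientQuery.Compile();

        return compiled(hostedSource).ToList();
    }

    public static List<long> Run()
    {
        // The "client" side: a query written against a source it doesn't own.
        Expression<Func<IQueryable<long>, IQueryable<long>>> clientQuery =
            source => source.Where(value => value % 2 == 0).Select(value => value * 10);

        // The "server" side: the hosted sequence.
        IQueryable<long> hosted = new long[] { 0, 1, 2, 3, 4 }.AsQueryable();

        return Execute(clientQuery, hosted);
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));   // 0, 20, 40
    }
}
```

In the real library the expression tree crosses the wire between those two halves and the results are pushed back as notifications, but the shape of the idea is the same.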

Consuming an IQbservable<T> Service

Clients consume web services by creating proxies that encapsulate all of the communication logic.  A queryable observable service is no different.

A client begins by creating an instance of QbservableTcpClient<T>, which represents a single queryable observable service end point.  This object exposes an IQbservable<T> for the service by calling its Query method.  At this point, the client is able to write a LINQ query and then call Subscribe to initiate communication.  All of this is covered earlier in this blog post, in the section about the basic communication process.

Alright, let’s do some coding…


Add the code from each of the following examples to the Main methods of new console application projects in VS 11 Beta, targeting the .NET 4.5 Framework.  I recommend creating separate console application projects for the server and client examples.

Add the following using directives to the top of each file.  You may need to include others as well, depending upon the example.

using System;
using System.Net;
using System.Reactive;
using System.Reactive.Linq;
using QbservableProvider;

As mentioned earlier, you’ll need to add references to all of the assemblies in the bin folder of the zip file you’ve downloaded, though the Rx 2.0 Beta assemblies can be downloaded from NuGet, if you’d prefer.

Example 1: Timer

Let’s start out easy and create a simple queryable observable timer service.  Here’s the spec:

  1. When a client subscribes, start a 2 second timer on the server.
  2. When the timer elapses on the server, send a notification to the client.
  3. The client’s connection persists until either the client or the server cancels the query, or the query completes.

IObservable<long> source = Observable.Timer(TimeSpan.FromSeconds(2));

var service = source.ServeQbservableTcp(new IPEndPoint(IPAddress.Loopback, port: 49593));

using (service.SubscribeEither(
    client => Console.WriteLine("Timer service acknowledged client shutdown."),
    ex => Console.WriteLine("Timer service error: " + ex.Message),
    ex => Console.WriteLine("Timer service fatal error: " + ex.Message),
    () => Console.WriteLine("This will never be printed because a service host never completes.")))
{
    // Keep the service running until a key is pressed.
    Console.ReadKey();
}

Now let’s create our first client.  Here’s the spec:

  1. Subscribe to the service that we’ve just created.
  2. Print the timer notification from the service.

var client = new QbservableTcpClient<long>(new IPEndPoint(IPAddress.Loopback, port: 49593));

IQbservable<long> query =
    from value in client.Query()
    select value;

using (query.Subscribe(
    value => Console.WriteLine("Timer client observed: " + value),
    ex => Console.WriteLine("Timer client error: " + ex.Message),
    () => Console.WriteLine("Timer client completed")))
{
    // Keep the client running until a key is pressed.
    Console.ReadKey();
}

To test these examples, make sure that you run the server application first.  Then run the client application.

After 2 seconds the client application prints the following:

Timer client observed: 0
Timer client completed

And the server application prints:

Timer service acknowledged client shutdown.

That example was boring, I know.

Example 2: Confirm the location

Let’s add a side-effect to the query so that we can see where the observable is running.  Here’s the new client:

IQbservable<long> query =
    from value in client.Query().Do(_ => Console.WriteLine("Where am I?"))
    select value;

After 2 seconds the client’s output is the same as before, but the server application now prints the following:

Where am I?
Timer service acknowledged client shutdown.

So it’s clear that our query was running on the server.  But what’s really awesome about this query is that the Do operator executed its lambda expression on the server!  Essentially, we told the server to write something to its own console window from within the query sent by the client.

Example 3: A more interesting client

Now that we know we can have the server do anything that we want, let’s have it download a web page for us.  Here’s the spec:

  1. Subscribe to the service that we’ve just created using the following query:
    1. When the service generates a timer notification, download the web page from http://blogs.msdn.com/b/rxteam.
    2. When the web page is downloaded, send its length to the client.
  2. Print the value received from the service.

Our goal here is to execute the entire query on the server, including downloading the web page.  We know how to verify that the entire query is executing on the server by using the Do operator, so I’ll add that to the end of the query.

IQbservable<int> query =
    (from value in client.Query()
     from page in new WebClient().DownloadStringTaskAsync(new Uri("http://blogs.msdn.com/b/rxteam"))
     select page.Length)
    .Do(result => Console.WriteLine("Where am I? " + result));

Notice that the type of the query has changed from IQbservable<long> to IQbservable<int>.  We’re using the select statement to project a different type of value from the service.

After 2 seconds the client application prints the following (of course, the value may be different in your tests):

Timer client observed: 110429
Timer client completed

And the server prints:

Where am I? 110429
Timer service acknowledged client shutdown.

The server has downloaded a web page and sent back the length of the page to the client.

Example 4: Service arguments

Let’s modify the service to accept the timer’s duration as an argument.  That way the client can control the delay before the web page is downloaded on the server.

To accept an argument, we’ve got to make a slight change to the way in which we’re creating the service.  Instead of simply using the ServeQbservableTcp extension method on our timer observable, we’ll need some way of receiving the argument before we can create the observable.   The QbservableTcpServer class provides static factory methods for creating services based on a function that accepts an IObservable<T>, where T is the type of the argument that the service expects, and returns an IObservable<U> that represents the service.  It’s quite natural to express the concept of clients subscribing to our service with arguments, sometime in the future, as an IObservable<T>.  So we’ll define our service beginning with an IObservable<T> that represents client subscriptions.  I’ll name the lambda parameter that represents this observable as request.

var service = QbservableTcpServer.CreateService<TimeSpan, long>(
    new IPEndPoint(IPAddress.Loopback, port: 49593),
    request =>
        from duration in request.Do(arg => Console.WriteLine("Client sent arg: " + arg))
        from value in Observable.Timer(duration)
        select value);

Notice that we must specify the input and output types of the query when we call the generic CreateService<TSource, TResult> method.  I’ve also added the Do operator to print out the argument that is received from the client.

Now let’s update our client to pass in a duration for the timer.

IQbservable<int> query =
    (from value in client.Query(TimeSpan.FromSeconds(5))
     from page in new WebClient().DownloadStringTaskAsync(new Uri("http://blogs.msdn.com/b/rxteam"))
     select page.Length)
    .Do(result => Console.WriteLine("Where am I? " + result));

Notice that I’m passing in the TimeSpan argument to the Query method.  The rest of the query is unchanged.

The server immediately prints the following when the client application starts:

Client sent arg: 00:00:05

And then after 5 seconds the client and server print the same results as before.

For much more advanced examples, including local member evaluation, anonymous types, full duplex communication of observables and iterator blocks, see the applications that are included in the download.  Grab the complete source code for the applications and the TCP Qbservable Provider library from the Rxx project on CodePlex.


I’ve barely scratched the surface of possibilities offered by the TCP Qbservable Provider service model.  This blog post only provides a high level view with extremely primitive examples.  I hope it has whetted your appetite and you’re interested in using it in your apps.

I should also mention that the Rx team has done an amazing job with the core Rx libraries, and the IQbservable<T> stuff is in another league of its own.  I’ve also seen Bart mention a couple of times in the Rx forum that his team is investing more in the IQbservable space regarding remote queries and serializable expression trees, so I’m really curious to see what they’ve done and when/if it will be released.

In the meantime, if you’re interested in developing applications using the TCP Qbservable Provider library, then please let me know.  I enjoyed working on it as a proof-of-concept and I plan to invest more time stabilizing it, unless the Rx team releases something similar.  It will help me to prioritize knowing that people are actually using it.

Your feedback can help guide the direction of this project, so please report bugs and ideas to the Rxx project on CodePlex.  Thanks!


IQbservable | Rx | Rxx

March 01, 2012

Async Iterators

You’ve already heard about async in C# 5.  This new feature is going to improve UI responsiveness and developer productivity simultaneously, to say the least.  I’ll assume that you’re also familiar with Task<T>, the generic type in the .NET Framework that provides a common model for asynchronous functions, and one of the types on which the async feature depends.

Task<T> has a limitation: it only represents a scalar value T.  This is fine for async methods that only compute a single value, but what if we need to write an async method that lazily computes a sequence of T?  In other words, the entire operation is asynchronous just like Task<T>, but instead of computing a single value we want to potentially compute many values, sequentially, while also being able to await other asynchronous operations.

The best we can do with Task<T> is to return Task<IList<T>>, gathering each T into a list.  There’s no way to yield each individual T as it is computed when using Task<IList<T>>.

The following C# console application illustrates this behavior.

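using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace ConsoleApplication1
{
    class TaskExample
    {
        public static void Main()
        {
            Output.WriteLine("Main Thread");

            // Block the main thread until the async method completes.
            Start().Wait();
        }

        static async Task Start()
        {
            Output.WriteLine("Starting");

            var list = await AsyncScalar();

            Output.WriteLine("Async Completed");

            foreach (var value in list)
            {
                Output.WriteLine(value);
            }

            Output.WriteLine("Done");
        }

        static async Task<IList<int>> AsyncScalar()
        {
            Output.WriteLine("AsyncScalar Called");

            var list = new List<int>();

            for (int i = 0; i < 3; i++)
            {
                Output.WriteLine("Computing " + i);

                await Task.Delay(TimeSpan.FromSeconds(1));

                // Values are gathered here and only returned once all of them are computed.
                list.Add(i);
            }

            return list;
        }
    }
}
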

(Thread 1): Main Thread
(Thread 1): Starting
(Thread 1): AsyncScalar Called
(Thread 1): Computing 0
(Thread 4): Computing 1
(Thread 4): Computing 2
(Thread 4): Async Completed
(Thread 4): Value = 0
(Thread 4): Value = 1
(Thread 4): Value = 2
(Thread 4): Done

Note that the Output class is also used by subsequent code snippets in this post, but I won’t be repeating it again.  Here’s the definition:

using System;
using System.Threading;

namespace ConsoleApplication1
{
    static class Output
    {
        public static void WriteLine(string message)
        {
            Console.WriteLine("(Thread {0}): {1}", Thread.CurrentThread.ManagedThreadId, message);
        }

        public static void WriteLine(int data)
        {
            WriteLine("Value = " + data);
        }
    }
}

Notice that the AsyncScalar method in the previous example computes values asynchronously, but they’re gathered into a list that is eventually returned when the computation completes.  The caller awaiting the task gets back a pre-computed list of data (referred to as a hot sequence), which can only be iterated synchronously.

Ultimately, our goal is to somehow replace list.Add with something like yield.  We want to be able to intermix awaiting and yielding in an imperative-style method that supports control flow statements, such as for loops.  And the result must be an asynchronous sequence, which doesn’t require synchronous iteration.

async IEnumerable<T> = Error

C# doesn’t allow us to apply the async keyword to iterator blocks.  To use the async keyword on a method it must return void, Task or Task<T>.  To define an iterator block, we must return IEnumerable<T>.  Clearly these features aren’t compatible based on return type alone.

But why not support IEnumerable<T> as another possible return type for async methods?

The answer (if I may assume what the C# team was thinking) is that Task<T> represents an asynchronous function, which means that it provides the necessary model for awaiting a function’s return value.   IEnumerable<T> only represents an interactive sequence, which means that it doesn’t provide a model for awaiting an iterator block’s return value(s).  Instead, values must be pulled synchronously from the sequence.  As a result, async iterator blocks would always be forced to execute synchronously, defeating the purpose of async.

Ok, so how about Task<IEnumerable<T>>?

Nope.  It’s similar to my first code example above, which uses Task<IList<T>>.  It models an asynchronous function that eventually returns a sequence, but that sequence remains synchronous.  Recall that Task<T> represents a scalar-valued asynchronous function, which means that the C# compiler can’t do any tricks with an iterator block to convert Task<T> into something that represents an asynchronous sequence.  The best it can do is allow us to asynchronously invoke an iterator block, but the generated sequence must be enumerated synchronously, as shown in the previous example.  That’s not our goal at all.

Then how about IEnumerable<Task<T>>?

This might seem correct at first, but actually it doesn’t meet our requirements either.  It represents a synchronous sequence of asynchronous functions.  Interesting, but not what we want.  Let’s assume for a moment that the C# compiler transforms the code between yield return statements into Task<T> objects.  We could then write something like this:

// Doesn't compile
static async IEnumerable<Task<int>> AsyncIterator()
{
    for (int i = 0; i < 3; i++)
    {
        await Task.Delay(TimeSpan.FromSeconds(1));

        yield return i;
    }
}

But consider the behavior when consuming it.  We’d probably use a foreach loop, which calls GetEnumerator to get an IEnumerator<Task<int>>.  Then we’d call MoveNext, which would synchronously enter the for loop and hit the first await.  At this point, we’d expect MoveNext to return true and set Current to a Task<int>.  When the Task<int> completes it should execute the current continuation, which is the code following the await.  So then what happens if we ignore the Task<int> and call MoveNext again?  Perhaps it could block until the current Task<int> completes, but that behavior doesn’t meet our requirements.  We don’t want to block while iterating the sequence.  Furthermore, what happens if when Task<int> completes we don’t call MoveNext right away?  The iterator could begin computing the next Task<int> asynchronously, which means that it would have to buffer the sequence until our synchronous iteration catches up.  This is like the producer/consumer pattern with an implicit queue.  Again, it’s interesting but not what we want.  When a value is computed, we want our continuation to execute right away, without buffering.
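
For comparison, here is what IEnumerable<Task<T>> looks like in code that does compile today: an ordinary, non-async iterator that yields one task per value.  This is a sketch with names of my own choosing (`SequenceOfTasksExample`, `DelayedValues`, `ConsumeAsync`).  Note that the iterator can't await anything between yields, and each call to MoveNext returns synchronously with a task that has already been started, which is the "synchronous sequence of asynchronous functions" described above.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class SequenceOfTasksExample
{
    // A plain iterator block: the sequence itself is pulled synchronously,
    // and only the individual elements are asynchronous.
    public static IEnumerable<Task<int>> DelayedValues(int count)
    {
        for (int i = 0; i < count; i++)
        {
            int value = i;

            yield return Task.Delay(TimeSpan.FromMilliseconds(10)).ContinueWith(_ => value);
        }
    }

    public static async Task<List<int>> ConsumeAsync()
    {
        var results = new List<int>();

        // foreach calls MoveNext synchronously; the await applies to each element only.
        foreach (Task<int> task in DelayedValues(3))
        {
            results.Add(await task);
        }

        return results;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", ConsumeAsync().Result));   // 0, 1, 2
    }
}
```

Nothing stops a consumer from calling MoveNext repeatedly without awaiting, in which case all of the delays start at once, so the producer has no control over its own pacing.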

So what would happen if we reversed it?  MoveNext returns Task<bool> and Current returns T.

Well, that can’t be modeled with any combination of IEnumerable<T> and Task<T>, but if it existed it would allow us to await between yields, keeping the entire computation asynchronous and its iteration asynchronous as well.
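To make the reversed shape concrete, here is a minimal sketch.  The names IAsyncEnumeratorSketch<T>, CountingEnumerator and AsyncEnumeratorSketch are hypothetical and exist only for illustration; the only parts taken from the idea above are a MoveNext that returns Task<bool> and a Current that returns T.  The enumerator is written by hand because the compiler can't generate one from an iterator block.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical interface: MoveNext is asynchronous, Current is synchronous.
interface IAsyncEnumeratorSketch<T>
{
    Task<bool> MoveNext();
    T Current { get; }
}

// Hand-written enumerator that awaits before producing each value.
class CountingEnumerator : IAsyncEnumeratorSketch<int>
{
    private readonly int count;
    private int next;

    public CountingEnumerator(int count) { this.count = count; }

    public int Current { get; private set; }

    public async Task<bool> MoveNext()
    {
        if (next >= count) return false;

        // Stands in for real asynchronous work between values.
        await Task.Delay(TimeSpan.FromMilliseconds(10));

        Current = next++;
        return true;
    }
}

static class AsyncEnumeratorSketch
{
    // Consumer: awaits each MoveNext instead of blocking on it.
    public static async Task<List<int>> CollectAsync(IAsyncEnumeratorSketch<int> enumerator)
    {
        var values = new List<int>();

        while (await enumerator.MoveNext())
        {
            values.Add(enumerator.Current);
        }

        return values;
    }

    public static void Main()
    {
        // Blocking here is only to keep the console process alive.
        List<int> values = CollectAsync(new CountingEnumerator(3)).Result;

        Console.WriteLine(string.Join(", ", values));
    }
}
```

Neither the producer nor the consumer blocks a thread while waiting for the next value, which is the property that IEnumerable<Task<T>> and Task<IEnumerable<T>> couldn't give us.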


In the Ix Experimental release (Ix_Experimental-Async on NuGet), there’s a type named IAsyncEnumerable<T>.  It has a single method named GetEnumerator that returns an IAsyncEnumerator<T>, which returns a Task<bool> from MoveNext and a T from Current.  Can we use it to write an asynchronous iterator block?  Let’s take a look at an example.

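using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace ConsoleApplication1
{
    class AsyncEnumerableExample
    {
        public static void Main()
        {
            Output.WriteLine("Main Thread");

            // Keep the process alive until the asynchronous iteration finishes.
            Start().Wait();
        }

        static async Task Start()
        {
            Output.WriteLine("Starting");

            IAsyncEnumerable<int> sequence = DelayedSequence().ToAsyncEnumerable();

            Output.WriteLine("Awaiting");

            await sequence.ForEachAsync(Output.WriteLine);

            Output.WriteLine("Done");
        }

        static IEnumerable<int> DelayedSequence()
        {
            Output.WriteLine("DelayedSequence Called");

            for (int i = 0; i < 3; i++)
            {
                // Blocks the current thread; an iterator block can't await.
                Task.Delay(TimeSpan.FromSeconds(1)).Wait();

                Output.WriteLine("Yielding " + i);

                yield return i;
            }
        }
    }
}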


(Thread 1): Main Thread
(Thread 1): Starting
(Thread 1): Awaiting
(Thread 4): DelayedSequence Called
(Thread 4): Yielding 0
(Thread 4): Value = 0
(Thread 4): Yielding 1
(Thread 4): Value = 1
(Thread 4): Yielding 2
(Thread 4): Value = 2
(Thread 4): Done

At first glance it looks like we’re closer to our goal.  At least we can iterate the sequence asynchronously and values are computed asynchronously.  The ForEachAsync extension is provided as a convenience because the foreach statement can’t handle a MoveNext method that returns Task<bool>, but you could instead call GetEnumerator and loop over a call to await MoveNext manually, if you prefer.

So we’re fine on the consumer side, but what about the producer?  Unfortunately, in order to introduce a delay we must block the current thread (note the call to Wait).  That’s because the iterator block returns IEnumerable<T>, which as we already know doesn’t model an asynchronous sequence.  It seems that IAsyncEnumerable<T> creates a Task<T> for each call to MoveNext to make our sequence consumable in an asynchronous fashion.  This means that our iterator block can’t control its own asynchrony.  Concurrency is injected for us automatically when the consumer decides that it’s ready to await the next value.  So essentially it’s still a pull model even though it’s asynchronous.

And of course we can’t simply add the async keyword to our iterator block, as discussed previously.  We’re now back where we started.

We need a better model for our asynchronous sequence: a type that represents an asynchronous function like Task<T> as well as a sequence of values like IEnumerable<T>, but in a push-based fashion.  Then we can define our iterator block in terms of this new type by yielding values as they are computed asynchronously.


IObservable<T> and IEnumerable<T> overlap in that they both provide a lazily-computed sequence of values; however, they differ in that IObservable<T> represents an asynchronous (push) sequence while IEnumerable<T> represents a synchronous (pull) sequence.  A sequence that pushes values enables reactive computations, while a sequence from which values are pulled enables interactive computations.  LINQ provides a declarative abstraction over any computation.  Reactive Extensions for .NET (Rx) provides LINQ operators for IObservable<T> similar to the LINQ to Objects operators that require IEnumerable<T>.  If you’re not familiar with Rx yet, then you may want to get started with this blog post.
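The difference between pulling and pushing can be sketched with nothing but the IObservable<T> and IObserver<T> interfaces from the System namespace.  RangeObservable, CollectingObserver and PushVersusPull are hypothetical names made up for this illustration, and the observable here pushes its values synchronously during Subscribe to keep the sketch short; a real Rx observable is free to push later and from other threads.

```csharp
using System;
using System.Collections.Generic;

// Minimal hand-written observable: pushes 0..count-1 to whoever subscribes.
class RangeObservable : IObservable<int>
{
    private readonly int count;

    public RangeObservable(int count) { this.count = count; }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        for (int i = 0; i < count; i++)
        {
            // The producer decides when a value arrives.
            observer.OnNext(i);
        }

        observer.OnCompleted();

        return new NoOpDisposable();
    }

    private sealed class NoOpDisposable : IDisposable
    {
        public void Dispose() { }
    }
}

// Observer that records every notification it receives.
class CollectingObserver : IObserver<int>
{
    public readonly List<string> Log = new List<string>();

    public void OnNext(int value) { Log.Add("Pushed " + value); }
    public void OnError(Exception error) { Log.Add("Error"); }
    public void OnCompleted() { Log.Add("Completed"); }
}

static class PushVersusPull
{
    public static void Main()
    {
        // Pull: the consumer asks for each value.
        IEnumerable<int> pull = new[] { 0, 1, 2 };

        foreach (int value in pull)
        {
            Console.WriteLine("Pulled " + value);
        }

        // Push: the consumer hands over callbacks and is called back.
        var observer = new CollectingObserver();

        using (new RangeObservable(3).Subscribe(observer))
        {
            foreach (string line in observer.Log)
            {
                Console.WriteLine(line);
            }
        }
    }
}
```

With foreach, the loop drives the sequence by calling MoveNext.  With Subscribe, control is inverted: the sequence drives the observer by calling OnNext, which is what lets a producer deliver values whenever its asynchronous work finishes.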

Now back to our problem at hand.  How can we write an iterator block with IObservable<T>?  Does the following work?

// Doesn't compile
static async IObservable<int> AsyncIterator()
{
    for (int i = 0; i < 3; i++)
    {
        await Task.Delay(TimeSpan.FromSeconds(1));

        yield return i;
    }
}

Unfortunately, no.  But wouldn’t it be nice?  Conceptually, it meets our requirements.  We can subscribe to the observable to receive values asynchronously.  The iterator block can define the observable by awaiting other asynchronous operations and yielding values as they are computed, sequentially yet asynchronously.

[Edit 8/21/2012: Rx 2.0 offers a better alternative than the Rx 1.1 Experimental method that is described in the remainder of this blog post.  It enables you to write code that looks very similar to the previous hypothetical example.  For more information, read this forum thread.]

Well the Rx team has already thought about this limitation in C# and provided a solution.  The Rx Experimental release (Rx_Experimental-Main on NuGet) provides several overloads of the Observable.Create method, one of which has the following signature:

public static IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, IEnumerable<IObservable<object>>> iteratorMethod);

This method allows us to generate an observable using a normal iterator block.  But since we can’t mark the iterator block as async, we can’t await other asynchronous operations.  Instead, we’ll yield observables to await them.  But if we’re yielding observables, then how do we also yield computed values?  That’s where the IObserver<T> object comes in handy.  Here’s an example:

using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

namespace ConsoleApplication1
{
    class ObservableExample
    {
        public static void Main()
        {
            Output.WriteLine("Main Thread");

            Start();

            // Subscribing doesn't block, so wait for a key press before exiting.
            Console.ReadKey();
        }

        static void Start()
        {
            Output.WriteLine("Starting");

            var xs = Observable.Create<int>(observer => AsyncSequence(observer));

            Output.WriteLine("Subscribing");

            xs.Subscribe(Output.WriteLine, () => Output.WriteLine("Done"));
        }

        static IEnumerable<IObservable<object>> AsyncSequence(IObserver<int> observer)
        {
            Output.WriteLine("AsyncSequence Called");

            for (int i = 0; i < 3; i++)
            {
                // Yielding an observable "awaits" it before the iterator resumes.
                yield return Task.Delay(TimeSpan.FromSeconds(1)).ToObservable().UpCast();

                Output.WriteLine("Pushing " + i);

                // Computed values are pushed through the observer instead of yielded.
                observer.OnNext(i);
            }
        }
    }

    static class ObservableExtensions
    {
        public static IObservable<object> UpCast<T>(this IObservable<T> source)
        {
            return source.Select(value => (object) value);
        }
    }
}

(Thread 1): Main Thread
(Thread 1): Starting
(Thread 1): Subscribing
(Thread 1): AsyncSequence Called
(Thread 5): Pushing 0
(Thread 5): Value = 0
(Thread 4): Pushing 1
(Thread 4): Value = 1
(Thread 6): Pushing 2
(Thread 6): Value = 2
(Thread 6): Done

This behavior is exactly what we wanted: asynchronous subscribe, observations of values as they are generated, the ability to await other asynchronous operations and the use of normal control flow statements, such as for.

However, the implementation is a bit strange.  We’ve replaced list.Add from the first example with observer.OnNext instead of yield, which isn’t so bad, but then we also had to replace await from the first example with yield.  Not very intuitive, perhaps.

Maybe in the future we’ll get first-class compiler support for writing observable iterators.  Until then, this should do fine.

[Edit 8/21/2012: Rx 2.0 offers a better alternative than the Rx 1.1 Experimental method that is described above.  It enables you to write code that looks very similar to what first-class compiler support might look like, although it requires the use of a callback.  For more information, read this forum thread.]
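As a rough sketch of that Rx 2.0 approach, the following uses the Observable.Create overload that accepts an asynchronous callback (a Func<IObserver<T>, Task>).  The class and method names are made up for illustration, and the example assumes the Rx 2.0 or later assemblies are referenced.  As I understand it, the observable completes when the callback's task completes, so there is no explicit call to OnCompleted.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

static class AsyncCreateSketch
{
    public static IObservable<int> AsyncSequence()
    {
        // The async lambda plays the role of the iterator block:
        // await replaces "yield return <observable>" and
        // observer.OnNext replaces "yield return <value>".
        return Observable.Create<int>(async observer =>
        {
            for (int i = 0; i < 3; i++)
            {
                await Task.Delay(TimeSpan.FromSeconds(1));

                observer.OnNext(i);
            }
        });
    }

    public static void Main()
    {
        AsyncSequence().Subscribe(
            value => Console.WriteLine("Value = " + value),
            () => Console.WriteLine("Done"));

        // Subscribing doesn't block, so keep the process alive.
        Console.ReadLine();
    }
}
```

Compared with the iterator-based version, await is used for awaiting again and the UpCast helper is no longer needed; the only remaining oddity is pushing values through observer.OnNext rather than yielding them.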

Side note:

C# doesn’t support anonymous iterator blocks.  Here’s something else interesting: VB 11 does!

Do you agree that anonymous observable iterators are an apt use case for such a feature in C#?


Async | C# 5 | Rx

January 12, 2012

Code Contracts for Visual Studio Pro

If you decided long ago that you weren’t going to bother trying Code Contracts because you didn’t have Visual Studio Premium edition or higher and thus couldn’t take advantage of static checking, then your wait is finally over.

Code Contracts 1.4.41228.0 was recently released, along with some great news: The Code Contracts static checker is now available for all editions of Visual Studio (excluding Express).

On a sad note, it seems as though Code Contracts will remain in DevLabs through the next Visual Studio cycle.

But to end on a brighter note, apparently the team is still trying to make it an official product.

From the forum announcements section:

We are trying to make the case for Code Contracts to be included in Visual Studio as an official product. Please send us your feedback as soon as possible, especially if you have any details about how exactly it has helped your development process.

So start using it and send them all of your success stories!

Here are some links to get you started:

Editor Extensions (Optional)


Research Site

Release Notes


November 12, 2011

Resources for Learning Rx

(Edit: Rx is now open source!)

Start by bookmarking the Rx hub on MSDN:

Pay particular attention to the beginner's guide and the .NET Resources and Community pages, which together provide several links to official videos, blog posts and articles, all of which are worth reviewing.

Next, bookmark the Rx forum.  This is the official place to search for answers and to ask new questions:

(Edit: 10/3/2014 - There's still a lot of good stuff on the Rx Forum, though Stack Overflow appears to be more active now.)

Rx Forum

Depending upon your current level of knowledge with Rx, you may want to continue by going through the official hands-on labs.  They are a bit outdated, although differences between the labs and the latest Rx class library should be minimal.  When you do come across differences, a quick search on the Rx forum will show you what changes need to be made.

Hands-On Labs

For further reading, I recommend starting off with the official conceptual documentation, followed by the recommended design guidelines.  The guidelines document is a bit outdated, but even where its examples differ from the latest Rx class library, the purpose and reasons for the guidelines still apply.

Recommended Design Guidelines

After that there are plenty of community resources to choose from.  A new book has been written about Rx recently:

[Edit (8/14/2012)]

Here's another new book.  This one's written by Lee Campbell and is available online for free:



And here's a list of blogs and open source projects that may interest you:

As for open source projects, I’m particularly fond of Rxx, and not just because I’m the co-founder and author along with James Miles.  ;)


.NET | Rx | Rxx

November 09, 2011

Rxx 1.2 Released

If you’re not familiar with the Rxx project yet, it's something I’ve been working on for several months now along with James Miles.

Rxx is a library of unofficial reactive LINQ extensions supplementary to Microsoft's Reactive Extensions for .NET (Rx).  Rxx is developed entirely in C# and targets the .NET Framework 4.0, Silverlight 4.0 and Windows Phone 7 (WP7).

Why should you use Rx and Rxx?

Well, if you’re doing any kind of programming that involves asynchrony or concurrency, such as what is commonly found in UI layers, business layers, data access layers, middle tier services – basically everywhere, then you’ll definitely find Rx to be very useful.  Rxx adds a whole bunch of useful features on top of Rx and the .NET Framework Class Library (FCL), making it quite easy to introduce common patterns of asynchrony into any .NET program through the use of IObservable<T> and LINQ.

Rxx provides the following features.  See the Documentation for details.

  1. Many IObservable<T> extension methods and IEnumerable<T> extension methods.
  2. Many useful types such as CommandSubject, ListSubject, DictionarySubject, ViewModel, ObservableDynamicObject, Either<TLeft, TRight>, Maybe<T> and others.
  3. Various interactive labs that illustrate the runtime behavior of the extensions in Rxx.  Individual labs' source code included.

The latest release of Rxx is now available:

New features for Rxx 1.2 include:

  1. Compatible with Microsoft's Ix Experimental library.
  2. UI extensions for WPF and Silverlight, including AnonymousCommand, CommandSubject, a Subscription XAML markup extension for binding UI elements to observables, EventSubscription trigger and a reactive view model infrastructure.  (Download the labs application for examples.)
  3. N-ary Zip and CombineLatest combinators.
  4. Several parser updates, including new operators, non-greedy (lazy) quantifiers and major performance and memory improvements, such as avoiding stack overflows due to recursion in quantifiers.
  5. Cursor types and extensions (Rx and Ix).
  6. ListSubject and DictionarySubject.
  7. Consume extensions that generalize the producer/consumer pattern over observables.
  8. ApplicationSettingsBase extensions.
  9. ObservableSyndication for RSS 2.0 and Atom 1.0.
  10. ObservableFile and additional ObservableDirectory extensions.
  11. Stream, FileStream and TextReader extensions.

More details can be found in the latest release notes: http://rxx.codeplex.com/wikipage?title=Release%20Notes

We’d really appreciate your feedback.  Please let us know about your experiences with Rxx by starting a new discussion or submitting an issue.  Thanks!

.NET | CodePlex | Open Source | Rx | Rxx

September 13, 2009

Sandcastle with Code Contracts

Microsoft Dev Labs has released a new version of Code Contracts.  (For more information about Code Contracts in the .NET Framework 4.0, see System.Diagnostics.Contracts and this article.)  If you haven't yet, I highly recommend installing the latest Code Contracts release.  If you're using Visual Studio 2008 then you'll also get a shiny new tab in your project's Properties window.

Shiny New Properties Tab

Figure 1: Code Contracts tab in Visual Studio 2008 project Properties window

Along with the latest features is a new tool that adds documentation for Code Contracts to an assembly's XML documentation file.  The latest release also includes a Sandcastle patch that allows the vs2005 style to include the code contract documentation within an expandable Contracts section, as shown in the following screenshot.


Figure 2: Example Sandcastle output with Contracts section

The following blog post describes how you can patch Sandcastle so that it uses Code Contracts in XML documentation files.  The author chose to use DocProject to automate the build process, but I suspect that it will work with or without any automation tools for Sandcastle.


Note that I've tested this procedure with success in Visual Studio 2008 standard edition.

Just don't forget to create a new DocProject or DocSite project after applying the Sandcastle patch.  You can use the Import Topics and Settings step in the New Project Wizard to copy over the important stuff from an existing project if you'd like.


On a side note, I think this also just about ruins my ContractN project ;).  I've abandoned it anyway, so I'll be doing the honorable thing and will close down the ContractN project on CodePlex at some point (to help keep CodePlex clean - just doin' my part).  I'll continue to host the code on my blog in case anybody's interested in learning how to use Context-Bound Objects, in some form or another.