April 19, 2017

Monads, and Comonads, and LINQ! Oh, my!

Programming through the lens of mathematics offers new ways to solve old problems and old ways to solve new problems. Mathematics is full of surprising relations and dualities.

In this blog post, we’ll journey through an alternate reality in which an OOP language like C# becomes more abstract and expressive by adopting some functional programming concepts, rooted in a branch of mathematics called Category Theory.

Over The Rainbow

Our journey begins with a few familiar types in a probably unfamiliar categorization.

            Sync              Async
Monad       IEnumerable<T>    IObservable<T>
Comonad     Lazy<T>           Task<T>

Don’t worry if you don’t understand these concepts yet; we’ll meet them again later...

IEnumerable<T> is the list monad.

IObservable<T> is the continuation monad.

Task<T> is the continuation comonad.

Lazy<T> is the singleton comonad (for lack of a better name).

These types appear to have different purposes, yet there is actually a pattern hiding in there somewhere. The pattern can be stated in structural terms as follows.

Monads have bind and unit functions.

Comonads have cobind and extract functions.

                  Bind                           Unit
IEnumerable<T>    SelectMany; a.k.a., flatMap    Constructors of Array, List<T>, etc.
IObservable<T>    SelectMany; a.k.a., flatMap    Observable.Return; a.k.a., just


            Cobind                        Extract
Task<T>     ContinueWith; a.k.a., then    get_Result (a property named Result)
Lazy<T>     ?                             get_Value (a property named Value)

You may have noticed that Lazy<T> is incomplete. Can we define a cobind function for Lazy<T>? What would it look like? What would it do?

And what’s the relationship between monads and comonads? Is there a pattern?

Perhaps if we were to examine these monadic and comonadic types through a lens of abstraction, then we could identify the pattern and figure out how to implement cobind for Lazy<T>.

“Professor Marvel Never Guesses”

Let's start by examining the bind function for IEnumerable<T>. Here’s its signature:

public static IEnumerable<R> SelectMany<T, R>(this IEnumerable<T> source, Func<T, IEnumerable<R>> selector)

So the bind function takes an instance of the monadic type for which it's defined, in this case IEnumerable<T>, and a function that projects the element type, T, into another instance of the monadic type, IEnumerable<R>.

Its purpose may not be clear yet, so we’ll just try and implement it by heeding Erik Meijer’s advice: “let the types guide you”.

Well, we have a selector function that accepts a T, but we don’t have an instance of T yet. Since the source is a sequence, it seems that we must invoke the selector function for each T that it contains. So let's start there. We’ll do foreach over source and apply selector to each value.

public static IEnumerable<R> SelectMany<T, R>(this IEnumerable<T> source, Func<T, IEnumerable<R>> selector)
{
foreach (var value in source)
{
var results = selector(value);

// ...
}
}

Great, we're done with the Select (a.k.a., map) part. Now let’s check the return value.

The selector returns an IEnumerable<R> for a given T, and we need to return a single IEnumerable<R>. We can’t simply return the IEnumerable<R> for the first T, because we must return them all. But how do we convert many IEnumerable<R> into a single IEnumerable<R>?

How about we iterate each of the sequences and yield all of the results?

(An Iterator Block makes this easy in C# – it spares us from having to implement IEnumerable<T> and IEnumerator<T> ourselves.)

public static IEnumerable<R> SelectMany<T, R>(this IEnumerable<T> source, Func<T, IEnumerable<R>> selector)
{
foreach (var value in source)
foreach (var innerValue in selector(value))
yield return innerValue;
}

And now we're done with the Many (a.k.a., flat) part.

We've just implemented the bind function in the list monad by iterating over the source sequence, mapping each element into a nested sequence, and flattening all of the nested sequences into the output sequence. The name flatMap makes perfect sense! The name SelectMany makes sense too, although it’s not a typical naming convention.1
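A quick usage example, just to see the flattening in action (the result in the comment assumes this exact input):

var values = new[] { 1, 2, 3 }.SelectMany(x => new[] { x, x * 10 });
// iterating values yields: 1, 10, 2, 20, 3, 30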

The significance of this implementation may not be clear yet, but one thing is certain: it was easy to derive this implementation by following the types.

It’s A Twister! It’s A Twister!

Shouldn’t all monads have similar signatures for their SelectMany implementations?

Let's look at bind for IObservable<T>. And for comparison, IEnumerable<T> as well.

public static IObservable<R> SelectMany<T, R>(this IObservable<T> source, Func<T, IObservable<R>> selector)

public static IEnumerable<R> SelectMany<T, R>(this IEnumerable<T> source, Func<T, IEnumerable<R>> selector)

Okay, so there's clearly a pattern here. It’s the same exact signature, just with different generic types.

How can we implement bind for IObservable<T>?

Since the source is a sequence, it seems that we must invoke the selector function for each T that it contains. And there's only one way to get T's out of an observable: Subscribe

public static IObservable<R> SelectMany<T, R>(this IObservable<T> source, Func<T, IObservable<R>> selector)
=> source.Subscribe(value =>
{
var results = selector(value);

// ...
});

We're done with the Select (a.k.a., map) part. Now let’s check the return value.

The selector returns an IObservable<R> for a given T, and we need to return a single IObservable<R>. Again, we’re in a situation similar to IEnumerable<T> where we have many instances of the monadic type yet the return type requires a single instance.

To convert many IObservable<T> into a single IObservable<T>, how about we Subscribe to all of the sequences and push all of their values to the observer? It’s a similar concept to the inner foreach that flattens the IEnumerable<T>.

public static IObservable<R> SelectMany<T, R>(this IObservable<T> source, Func<T, IObservable<R>> selector)
=> source.Subscribe(value => selector(value).Subscribe( ? ));

Uh oh, from where do we get an observer to pass to Subscribe? And how do we return an IObservable<R> from this expression? Subscribe returns IDisposable, so that's not helpful.

What we need is an implementation of IObservable<T> that we can return from SelectMany, so that when Subscribe is called with an IObserver<T>, that observer is available to our expression. Then we can pass the observer to the projected observables' subscriptions.

If this seems backwards, it’s because it is. The reactive IObservable<T> interface is dual to the interactive IEnumerable<T> interface. They are structural opposites, so we must think oppositely when solving structural problems like this one.

IEnumerable<T> is synchronous because the caller of IEnumerator<T>.MoveNext blocks while the next value is computed. IObservable<T> is asynchronous because after Subscribe is called with an IObserver<T>, the caller is not necessarily blocked and may continue executing. OnNext is called when values are computed in the future.

Using the same observer to subscribe to each of the projected sequences is similar to yielding from all of the projected enumerables in our IEnumerable<T> implementation above; however, we don't have a language integrated feature like Iterator Blocks, so we’ll have to use Rx's Observable.Create function instead. The Observable.Create function takes a function that has an IObserver<T> argument, which we’ll use to make the inner subscriptions.

public static IObservable<R> SelectMany<T, R>(this IObservable<T> source, Func<T, IObservable<R>> selector)
=> Observable.Create<R>(observer => source.Subscribe(value => selector(value).Subscribe(observer)));

And now we’re done with the Many (a.k.a., flat) part.

Note: I’ve avoided addressing the following issues in the code example above for the sake of simplicity – the focus here is on the essence of the monadic operations, not the implementation details of how to properly implement operators.

  1. This implementation violates one of Rx's contracts: Serialized notifications per observer (§4.2 in the Rx Design Guidelines). If any of the inner observables push notifications on different threads, then a downstream observer may receive overlapping notifications, which is very bad. I'm not including a proper implementation in this blog post, but we can imagine that synchronizing calls to OnNext within a lock would suffice. (A rough sketch of this idea follows the list.)
  2. This implementation fails to properly support cancellation since we’re ignoring the IDisposable object that Subscribe returns. A proper implementation must hold the disposable for each inner observable, until it terminates. Furthermore, the inner disposables must be tied to the lifetime of the outer disposable, such that if the outer disposable is disposed, then all of the inner subscriptions are cancelled as well.
  3. This implementation fails to properly handle OnError and OnCompleted notifications from the source observable. Furthermore, it improperly handles OnCompleted notifications from the inner observables by completing the sequence as soon as one of the inner observables completes, rather than completing when all observables, including the source, have completed.
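For illustration only, here’s a minimal sketch of the locking idea from item 1, using a hypothetical wrapper type; it’s not Rx’s actual implementation (Rx provides similar machinery, e.g., Observer.Synchronize):

// Hypothetical observer wrapper that serializes notifications with a lock.
sealed class SynchronizedObserver<T> : IObserver<T>
{
    private readonly IObserver<T> inner;
    private readonly object gate = new object();

    public SynchronizedObserver(IObserver<T> inner)
    {
        this.inner = inner;
    }

    public void OnNext(T value)          { lock (gate) inner.OnNext(value); }
    public void OnError(Exception error) { lock (gate) inner.OnError(error); }
    public void OnCompleted()            { lock (gate) inner.OnCompleted(); }
}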

The implementation of bind for IObservable<T>, like bind for IEnumerable<T>, is a flattening and a mapping; however, due to the natural asynchrony of the IObservable<T> interface, bind provides an asynchronous merging rather than a synchronous flattening.

In other words, the SelectMany operator for IObservable<T> is actually Select + Merge, whereas SelectMany for IEnumerable<T> is actually Select + Concat.
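To make that correspondence concrete, here’s a hedged sketch using hypothetical method names; the Merge overload that flattens an IObservable<IObservable<R>> exists in Rx, while LINQ to Objects has no Concat-of-sequences operator, so the enumerable version is spelled out with nested loops:

// Observables: map, then merge the inner subscriptions as they produce values.
public static IObservable<R> SelectManyViaMerge<T, R>(this IObservable<T> source, Func<T, IObservable<R>> selector)
    => source.Select(selector).Merge();

// Enumerables: map, then concatenate the inner sequences synchronously, in order.
public static IEnumerable<R> SelectManyViaConcat<T, R>(this IEnumerable<T> source, Func<T, IEnumerable<R>> selector)
{
    foreach (var inner in source.Select(selector))
        foreach (var value in inner)
            yield return value;
}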

The bind operator for IEnumerable<T> accepts a sequence as input. For each value, an inner sequence is projected and iterated. Due to the synchronous nature of IEnumerable<T>, only one inner sequence can be iterated at a time. The bind operator will not yield any values from subsequent projections until it has completely iterated over the current inner sequence.

Conversely, the bind operator for IObservable<T> merges all inner subscriptions together, such that when any inner subscription produces a value, it’s immediately pushed downstream.

In both implementations of bind, the inner sequences are being flattened down into a single output sequence.

We’re Not In Kansas Anymore

Language Integrated Query (LINQ) introduces the monad as a first-class citizen in C#. Query Comprehension Syntax is a syntax for monads.2

They are monads because, other than satisfying the monad laws, each monad is a generic type with a specific purpose, paired with a bind function that implements sequential composition over that type, while preserving its purpose.

We can understand how bind represents sequential composition by following this logic:

  1. The bind function SelectMany(M<T> source, Func<T, M<R>> selector) has two parameters.
  2. The source parameter is of the type M<T>. Think of M as the monadic type, which contains values of type T. Monadic types are generic types. (This is what is meant by “amplification” or “embellishment”.)
  3. The selector function projects a value of type T into M<R>, which is the monadic type M containing values of type R.
  4. M<T> represents a computation of values of type T. More precisely, M<T> represents an antecedent computation, because a value of T must be computed before it can be passed to selector.
  5. The selector function represents a subsequent computation, because it accepts a value of type T and returns M<R>. And just like M<T>, M<R> represents a computation of values of type R.
  6. Both M<T> and M<R> are of the monadic type M, although they may contain different types. M<R> can be used as the source parameter in a subsequent bind operation.
  7. The bind operator can be chained together across many projections over the monadic type M, such that M<T> goes to M<R> goes to M<Q>, and so on. Each projection through bind is a linear step in a larger computation.
  8. Therefore, the bind function sequentially composes antecedent computations with subsequent computations, while preserving the purpose of the monadic type.
  9. However, the cardinality of M may be greater than 1, depending on the actual type of M. Containing more than one T means having more than one M<R> because each T will be projected into an instance of M<R>.
  10. The bind operator flattens M<M<R>> into a single M<R> to ensure that its result is a single container of the monadic type M. Flattening preserves sequential composition regardless of the cardinality of M.

For example, IEnumerable<T> represents a lazily-computed, synchronous sequence of values of type T. That’s its purpose, which SelectMany must preserve. As the source for SelectMany, IEnumerable<T> represents an antecedent computation of values of type T, because each T must be computed before it can be projected. SelectMany iterates the source, invoking the selector function for each value of type T, and projecting it into IEnumerable<R>. IEnumerable<R> represents a subsequent computation, because it can be subsequently iterated to generate values of type R. To continue the computation sequentially, IEnumerable<R> can be used as the source for another call to SelectMany. Therefore, SelectMany implements sequential composition.

IObservable<T> represents a lazily-computed, asynchronous sequence of values of type T. It’s similar to IEnumerable<T>, except asynchronous. And how do we consume an asynchronous sequence? Not by foreach, but by passing in a continuation (IObserver<T>). The SelectMany operator for IObservable<T> subscribes to the source with its own IObserver<T>. Sequential composition is implemented by awaiting values pushed into the IObserver<T> and then pushing them as input into a subsequent computation, represented by IObservable<R>. As far as sequential composition goes, it doesn’t actually matter that IObservable<T> is asynchronous, because going from T to R remains sequential.

IObservable<T> is asynchronous because in order to get data out of it, you must pass in a function (a continuation) to be invoked later. It’s the so-called, “continuation passing style”, or CPS for short. Thus, IObservable<T> is the continuation monad.

The bind operator enables us to write sequential composition in a declarative style. SelectMany sequentially composes two parts of our query, while hiding the imperative- or CPS-style mechanisms that are required to read values from the monadic type. We can think of a monadic type as a container of values, and bind lets us compose them without having to explicitly specify how to extract their values. We don’t have to use foreach for enumerables or call Subscribe for observables, until we want to leave the monad. That’s why we use foreach and Subscribe only at the ends of our LINQ queries.

This pattern works with more than just sequences. There are many types of monads; e.g., the “Maybe” monad implements null propagation, as Wes Dyer describes here, while introducing monads. There’s a State monad that represents mutability in a pure functional language like Haskell, an I/O monad that represents input and output (also in Haskell), and a Reader monad for computing within an environment, similar to having global or static fields (you guessed it: Haskell), among others.

Monads provide a declarative programming model, but note the relation to imperative-style code. Two consecutive from clauses in a LINQ query is compiled into a single call to SelectMany. It’s like having a semicolon at the end of each line. The first from expression is evaluated, and then the second from expression is evaluated, with the value of the previous expression still in scope.

from x in Xs //;
from y in getYs(x) //;
select (x, y)

This query looks similar to our implementation of SelectMany for IEnumerable<T>, which consisted of nested foreach loops! And in fact it works exactly the same, because this query simply compiles into a call to SelectMany.
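As a rough illustration (the compiler actually targets the SelectMany overload with an intermediate selector mentioned in note 2, and uses transparent identifiers), the two-from query above corresponds to something like:

var query = Xs.SelectMany(x => getYs(x), (x, y) => (x, y));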

A query is a single expression pieced together by applying a sequence of operators, rather than a series of statements that are pieced together by a sequence of semicolons. It’s the composability of the monad that enables us to write a query as a single expression, and it’s the expression syntax that makes the declarative programming style appear similar to the imperative style.

This style of programming is declarative because we can use operators like where, join and group by without sacrificing the readability of the program with all of those foreach loops that we’d need to write if we weren’t using the list monad, or all of those continuations that we’d need to write if we weren’t using the continuation monad.

Essentially, monads enable us to declare what we want the program to do rather than how the program should do it. The “how” part is hidden by the monad, and it differs between monads.

from x in Xs
where x != null
from y in getYs(x)
join z in Zs on y.P equals z.P
select (x, y, z)

Is the above an interactive query over IEnumerable<T> or a reactive query over IObservable<T>?

Or maybe it’s a hybrid? Perhaps Xs is an IObservable<T> and getYs returns an IEnumerable<T>?

Each is a valid possibility. There’s no way of knowing by looking at the query alone!

The monadic pattern is a generalization on composition with embellishments, so it’s no surprise that queries over different types can look the same; however, some operators don’t make sense in some monads. For example, Rx defines the Sample(TimeSpan) operator for IObservable<T> to sample elements by time, which only makes sense because IObservable<T> is asynchronous. Sampling by time doesn’t make sense in IEnumerable<T> because it’s synchronous; however, enumerables do have the ability to sample elements by data; i.e., the Where operator. We can also sample elements by data in IObservable<T>, thus Rx provides a similar Where operator.

Essentially, a monad consists of an embellishing type and a function, bind, that preserves the embellishment through sequential composition.

A monad must also have a unit function, which enters the monad. For example, Rx provides Observable.Return (a.k.a., just).3
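In code, entering each monad with a single value looks something like this (a hedged illustration, assuming the usual LINQ and Rx namespaces):

IObservable<int> one = Observable.Return(42);   // unit for the continuation monad
IEnumerable<int> single = new[] { 42 };         // unit for the list monad, via an array constructor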

Technically, a monad must also satisfy some simple mathematical laws: left identity, right identity and associativity. I won’t go into them here.

Perhaps what makes monads seem mysterious and confusing is how such a simplistic functional trick can be so powerful as to warrant an entirely new syntax in languages like Haskell and C#. Are monads confusing because they mysteriously turn complex imperative-style code into a neat declarative-style code that looks eerily similar to imperative style? I’m confused even writing that question.

A comprehension syntax isn’t technically required, it just makes monads nicer to use! C# doesn’t have keywords for every useful operator, so most of the queries we write are in the form of fluent-method call syntax, whereby we chain operators together using dot notation and pass lambda expressions as function arguments.

For example, here’s the same query again written in fluent-method call syntax:

Xs.Where(x => x != null)
.SelectMany(x => getYs(x).Select(y => (x: x, y: y)))
.Join(Zs, xy => xy.y.P, z => z.P, (xy, z) => (x: xy.x, y: xy.y, z: z));

It’s not great, although it’s probably how most languages use monads. Regardless, both syntaxes (comprehensions and fluent-method call) form declarative expressions. The comprehension syntax is often nicer though because it hides all of those lambdas and automatically broadens the scope of query variables; e.g., in the query comprehension written earlier, x is still in scope after join. The C# compiler accomplishes this simply by changing the type of T to a compiler-generated type that carries all of the variables in scope. We must do the same trick ourselves when writing in the fluent-method call syntax if we want to keep earlier values in scope. We typically do that by projecting anonymous types or tuples throughout our query, as I’ve done in the example above.

One last interesting thing to note about monads…

Because of its declarative nature, applying bind forms a lazily-evaluated expression, just like monads in lazily-evaluated functional programming languages. SelectMany and other operators do not cause any side effects when they are applied. They simply return a new instance of the monad, so that we may continue applying operators to build our computation. When we are ready for side effects, then we leave the monad and the computation may begin. How it begins depends on the purpose of the monadic type.

IEnumerable<T> requires code to pull each value through the composition, thus we use foreach.

IObservable<T> pushes values through the composition, once we’ve called Subscribe.

And that’s the mysterious monad. You can get deeper into the concept of monads in Category Theory, but I think that the essence of why they’re useful to us as programmers is described above.

All good, but monads are only half the story!

If I Only Had a Codomain

According to my chart at the beginning of this post, Task<T> is a comonad. Why?

If you don’t believe me, then we’ll try and disprove that claim by implementing bind for Task<T>. If we can implement bind, then it’s probably a monad, right?

We already know how the signature for bind must look based on the pattern that we’ve identified above for enumerables and observables. They each have similar signatures. To go from the enumerable bind to the observable bind we must only do a monadic type replacement. So let’s grab the bind signature for IEnumerable<T> or IObservable<T> and simply replace the monadic type with Task<T>:

public static Task<R> SelectMany<T, R>(this Task<T> source, Func<T, Task<R>> selector)

Looks good, no problems so far.

Since the source is a Task<T>, it seems that we must invoke the selector function with the T that it contains.

public static Task<R> SelectMany<T, R>(this Task<T> source, Func<T, Task<R>> selector)
=> selector(source.Result);

Okay, that compiles, but it’s totally incorrect! Reading the Result property may block, and blocking doesn’t preserve the purpose of Task<T>. The purpose of Task<T> is to represent the asynchronous computation of T. The SelectMany function must allow us to sequentially compose asynchronous computations, which means that we must be able to project a Task<T> into a Task<R> without blocking to read T. Essentially, SelectMany must return a lazily-evaluated expression, just like the implementations for IEnumerable<T> and IObservable<T>.

Great, so how do we get the T out of the Task<T> asynchronously?

For the C# devs out there, our instinct is to define SelectMany as an async method and then await source. And that may seem appropriate, but clearly await must be doing something under the covers in order to get the T from the Task<T> without blocking. How does it do it? Whatever the C# compiler is doing for await, we must be able to do ourselves…

Well, the only way to get the result asynchronously is to pass a continuation to ContinueWith (a.k.a., then). But what is this strange method anyway? Let’s take a look at its signature (one overload in particular4).

public static Task<R> ContinueWith<T, R>(this Task<T> source, Func<Task<T>, R> selector)

Hmm, now that method looks familiar. Is it bind? Not quite. Here’s bind again:

public static Task<R> SelectMany<T, R>(this Task<T> source, Func<T, Task<R>> selector)

The type arguments in the selector function are reversed! Well, they are sort of reversed. The T and R type arguments remain in the same positions, but the generic Task type is in the opposite position. The selector function for ContinueWith has a Task parameter, rather than returning a Task.

So what is the selector in ContinueWith supposed to do?

Instead of being given a value and returning a new, embellished value, it’s given an embellished value and it has to return a new, unembellished value. In other words, it has to extract the value from the comonadic type inside the projection! This makes sense if we think of comonads (and monads) as containers in general.

And that’s how to safely (i.e., in a non-blocking manner) extract a value from Task<T>. We must read the Result property only from within the selector (continuation) supplied to ContinueWith, in order to preserve the purpose of Task<T>.

public static Task<R> SelectMany<T, R>(this Task<T> source, Func<T, Task<R>> selector)
=> source.ContinueWith(task => selector(task.Result)).Unwrap();

Unwrap is also required, because selector returns Task<R>, yet the selector in ContinueWith requires an R. So we make R a Task<R>, thus ContinueWith returns Task<Task<R>>. SelectMany requires Task<R>, so we must apply Unwrap one time to ditch the outer Task.

Recall that bind is a flattening projection (flat map; it’s like going from IEnumerable<IEnumerable<T>> to IEnumerable<T>), whereas here we’re using ContinueWith to compose over an extending projection (Task<R> becomes Task<Task<R>>). The concepts of flattening and extending are opposites. The bind and cobind functions are opposites; therefore, ContinueWith is cobind.

Here, Unwrap is merely the flattening of our bind implementation. It’s how we convert our cobind’s extending projection into bind’s flattening projection.
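If Unwrap seems magical, here’s a minimal, hedged sketch of roughly what it does, using a hypothetical name and ignoring fault and cancellation propagation (which the real Unwrap handles):

public static Task<R> UnwrapSketch<R>(this Task<Task<R>> source)
{
    var completion = new TaskCompletionSource<R>();

    // When the outer task completes, continue with the inner task; when the inner task
    // completes, publish its result. Neither read of Result blocks, because each read
    // occurs inside a continuation of the task that produces it.
    source.ContinueWith(outer =>
        outer.Result.ContinueWith(inner => completion.SetResult(inner.Result)));

    return completion.Task;
}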

The Task<T>.Result property (technically, the get_Result method) is the extract function of the Task<T> comonad. The extract function is to comonad as the unit function is to monad. The unit function gets us into the monad, extract gets us out of the comonad. They are opposites!

And now we see why Task<T> is a comonad, and not a monad. Another way of looking at it: we can only implement bind in terms of cobind; i.e., we can’t implement SelectMany for Task<T> without calling ContinueWith.

Other than the comonadic laws (analogous to the monad laws), comonads are a triple consisting of a generic type, a cobind function and an extract function. Comonads are the opposite of monads, which are a triple consisting of a generic type, a bind function and a unit function.

As you can see from the code above, it’s fairly easy to convert from cobind to bind. Therefore, we can convert comonads into monads. We can also convert monads into comonads, although I’m not showing it here, but you can imagine that one indicator of a type being monadic rather than comonadic is that when implementing cobind, you’d discover that you must implement it in terms of bind. Try it for yourself.

A major reason that we don’t use Task<T> as a monad is because its comonadic form lends itself naturally to a really useful compiler trick: coroutines. A coroutine is syntactic sugar that allows us to write imperative-style code for continuations, hiding the messy details of the underlying state machine that the compiler must generate. In C#, it’s known as async methods. The await keyword essentially compiles into a call to ContinueWith.
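Here’s a hedged illustration of that correspondence; the real compiler generates a state machine, captures the synchronization context and propagates faults, but the shape of the composition is the same:

// Written with cobind directly:
static Task<int> DoubleWithCobind(Task<int> source)
    => source.ContinueWith(task => task.Result * 2);

// Written as a coroutine; await hides the continuation:
static async Task<int> DoubleWithAwait(Task<int> source)
    => await source * 2;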

Although we can’t use this compiler trick with monads directly, we can convert a monad into a comonad, and that’s exactly what Rx’s ToTask extension method does. But even better than that, Rx offers an extension method that implements C#’s awaitable pattern for IObservable<T>, thus we can directly await an IObservable<T> to safely (i.e., in a non-blocking manner) extract the final value of the sequence. This works well for singleton observables, especially for aggregations like Sum and ToList.
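For example, assuming the usual Rx namespaces are imported, a single-valued aggregation can be awaited directly:

// Sum returns a singleton IObservable<int>; awaiting it extracts its final value.
static async Task<int> SumAsync(IObservable<int> numbers)
    => await numbers.Sum();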

Because C# has great support for async methods, it’s often recommended to use Task<T> rather than IObservable<T> when your computation only has a single result; however, it appears that for a future release of Rx, Bart De Smet has implemented support for the new awaitable (task-like return type) pattern introduced in C# 7. We’ll be able to define async methods that directly return an observable rather than a Task<T>. This is certainly not a replacement for Task<T> in all async methods, but it may come in handy. To return an observable from an async method, we’ll have to use a new interface, ITaskObservable<T>, which derives from IObservable<T>. It’s just a small grammatical difference, perhaps. One neat thing about ITaskObservable<T> is that it has singleton semantics; i.e., it only calls OnNext once, similar to the fact that Task<T> can only contain a single Result.

Along The Yellow Brick Road…

At the beginning of our journey we had asked the following questions:

“Can we create a cobind function for Lazy<T>? What would it look like? What would it do?”

Along the way we’ve met IEnumerable<T>, IObservable<T> and Task<T>. Now we know how to proceed.

We’ll take the signature for Task<T>’s cobind function and replace all Task<T> with Lazy<T>.

public static Lazy<R> ContinueWith<T, R>(this Lazy<T> source, Func<Lazy<T>, R> selector)

We have a Lazy<T> and a function that requires a Lazy<T>, so should we apply it? (#WWEMD5)

public static Lazy<R> ContinueWith<T, R>(this Lazy<T> source, Func<Lazy<T>, R> selector)
=> selector(source);

Sure, but what about the return value now? We have an R but we require a Lazy<R>.

That’s simple. Let’s make the whole expression lazy by wrapping it in a new Lazy<R>. That makes sense because we must preserve the purpose of Lazy<T>, which is lazy initialization, right?

public static Lazy<R> ContinueWith<T, R>(this Lazy<T> source, Func<Lazy<T>, R> selector)
=> new Lazy<R>(() => selector(source));

When reading Lazy<R>.Value, the selector will first extract the value of Lazy<T>, and with it compute R. In effect, Lazy<T> isn’t evaluated until we attempt to evaluate Lazy<R>, because Lazy<R> depends on Lazy<T>.

Thus, laziness is preserved through composition.

How might our code look when using this ContinueWith implementation for Lazy<T>?

Let’s first define a simple function named Create to take advantage of type inference. It’s just a bit more declarative than having to construct a Lazy<T> ourselves and specify T explicitly.

public static class Lazy
{
public static Lazy<T> Create<T>(Func<T> valueFactory)
=> new Lazy<T>(valueFactory);
}

Here’s a simple program that composes two lazy computations:

var i = Lazy.Create(() =>
{
Console.WriteLine("Initializing ‘i’");
return 42;
});

var d = i.ContinueWith(lazy => lazy.Value * 2);

Notice that the construction of Lazy<T> assigned to d is hidden by the cobind function. This is the declarative style of programming, and it appears the same in comonads as in monads.

Using i and d as defined above, what does the following console program print?

Console.WriteLine(i.IsValueCreated);
Console.WriteLine(d.IsValueCreated);
Console.WriteLine(d.Value);

Console.WriteLine(i.IsValueCreated);
Console.WriteLine(d.IsValueCreated);
Console.WriteLine(i.Value);

Scroll down to see the output…

.

.

.

.

.

.

False
False
Initializing ‘i’
84
True
True
42

Notice that i.IsValueCreated returns False at first, even though d is defined in terms of i. Invoking the ContinueWith function doesn’t cause any side effects, just like how applying SelectMany doesn’t cause any side effects for monads. We can build up a computation with cobind and evaluate it later; i.e., lazy evaluation.

When the program reads d.Value, then i.Value is initialized. Afterwards, i.IsValueCreated returns True, even though the program hasn’t read i.Value directly yet. That’s because d reads i.Value in its continuation, so evaluating d requires i to be evaluated as well. Evaluating the outermost Lazy<T> propagates the act of evaluation up to the source.

Finally, when the program reads i.Value at the end, notice that it’s not reinitialized. Lazy<T> caches the result of its factory function.

We’re Off To See The Wizard

Where does laziness come from? Is it something in the water?

No, it’s something in the function.

A function provides an identifiable name for a sequence of statements in the imperative style. Imperative style programming languages are eager, in that when a program encounters a statement, it’s evaluated immediately. That’s what imperative means. Therefore, laziness is achieved simply by wrapping statements in a function. A function may be evaluated at any time by invoking it, by name or delegate.

A statement evaluates eagerly.

A statement in a function evaluates lazily.

A lambda expression represents an anonymous function with a body consisting of a single statement. We bind a lambda expression to a named variable or parameter, and that becomes its name for the purpose of invoking it. A lambda expression isn’t evaluated at the site at which it’s defined, but at the site at which it’s invoked. Therefore, lambda expressions are lazily-evaluated. They are functions.
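A tiny example of that laziness:

Func<int> answer = () => { Console.WriteLine("Computing..."); return 42; };  // defining the lambda prints nothing

var value = answer();  // "Computing..." is printed only now, when the function is invoked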

In a pure functional language like Haskell, a function consists of expressions, and expressions are interchangeable with values.6 Replacing an expression with its value cannot be done safely in imperative-style languages because an expression may have side effects, and you’d expect those side effects to occur each time that the expression is evaluated. Evaluating a Haskell program is similar to reducing a mathematical expression into its simplest form, yet expressions in Haskell are, perhaps surprisingly, lazily evaluated. Evaluation is deferred until an expression’s value is actually needed by the program. And once an expression has been evaluated, its value may replace every instance of that expression in the program. Perhaps expressions in Haskell are similar to Lazy<T> in C#?

So how is Lazy<T> related to a function?

Well, the Lazy<T> constructor turns a parameterless function into a type

public Lazy(Func<T> valueFactory)

that has a property

public T Value { get; }

which is technically a parameterless function

public T get_Value()

that has the same exact signature as valueFactory, the Func<T> that was used to construct the Lazy<T> instance in the first place.

So can we shake off the Lazy<T> type and implement Func<T> as a comonad?

Let’s try it. We’ll start with the signature of ContinueWith by simply replacing all Lazy<T> with Func<T>.

public static Func<R> ContinueWith<T, R>(this Func<T> source, Func<Func<T>, R> selector)

That’s wonderfully weird and magical looking. And for our next trick, the types will guide us…

We have a Func<T>, and we have a function that takes a Func<T> as its argument.

Note: A function with a function parameter is known as a higher order function. In this case, we’ve got two higher order functions: ContinueWith and selector.

Functions are given data as arguments.

Higher order functions are given functions as data.

Let’s pass our source function to selector.

public static Func<R> ContinueWith<T, R>(this Func<T> source, Func<Func<T>, R> selector)
=> selector(source);

Just like ContinueWith for Lazy<T>, we notice that our return types don’t match. The selector function returns R, but we need a Func<R>.

We must wrap the entire expression in another function!

public static Func<R> ContinueWith<T, R>(this Func<T> source, Func<Func<T>, R> selector)
=> () => selector(source);

Well, that was easy. It looks the same as the implementation for Lazy<T>, but without a Lazy<R> wrapper. The outer function provides the laziness.

For comparison, here’s the Lazy<T> definition from above:

public static Lazy<R> ContinueWith<T, R>(this Lazy<T> source, Func<Lazy<T>, R> selector)
=> new Lazy<R>(() => selector(source));

Is the Func<T> comonad the same as the Lazy<T> comonad?

No, because Func<T> will always compute a value when you invoke it, whereas Lazy<T> only computes its value once. Furthermore, Func<T> may return a different value each time that it’s invoked (in a side-effecting language like C#, this is common.)

Therefore, Func<T> is lazy, but it may also cause side effects each time that you invoke it, whereas Lazy<T> may only cause side effects once.

Should we use the Func<T> comonad at all?

Probably not. It offers no real advantage compared to composing lambda expressions, although it does hide the closure (for better or worse). Perhaps Func<T> is the closure comonad? A blog post for another day.

Meet The Lazy Wizard

The laziness of Lazy<T> is related to the idea that applying co/bind forms a lazily-evaluated expression.

We can declare computations over IEnumerable<T> and IObservable<T> that are lazy; i.e., there are no computational side effects until we iterate or subscribe, respectively. And likewise for Task<T>, we can compose together many ContinueWith calls without causing any side effects; that is, until the Task<T> completes. There’s obviously a race condition here, but the point is that ContinueWith doesn’t block. ContinueWith forms a lazily-evaluated computation, although it’s possible that Task<T>.Result was already computed before ContinueWith was applied, which may result in the continuation executing synchronously.7

And so it goes for Lazy<T>. The purpose of Lazy<T> is simply lazy evaluation. That’s what ContinueWith needs to preserve across compositions; although, it’s possible that the lazy value may have already been computed by the time the program attempts to extract it. Just like Task<T>.Result, once it’s computed, we can keep reading Lazy<T>.Value over and over again without recomputing it. We can even build new computations on top. In the case of Lazy<T>, a new computation over a pre-computed value would simply reuse the pre-computed value as the source for the new computation, without having to reevaluate the original source; i.e., memoization of a parameterless function. This is illustrated in the code example above with the explicit call to i.Value at the end of the program: the side effect of writing to the console upon initializing i.Value had already occurred earlier in the program, and it’s not repeated again at the end.

All of these co/monadic types are lazy, but they also differ in behavior when it comes to laziness. The laziness of Lazy<T> and Task<T> isn’t permanent, it’s a one-time use. The laziness of IObservable<T> and IEnumerable<T> is potentially repeatable.

To put it another way: Lazy<T> and Task<T> cache their value to avoid duplicating side effects, whereas IObservable<T> and IEnumerable<T> may cause side effects each time that they are “invoked”. I smell a pattern.

Pulling Back The Curtain

Lurking in the shadows, as usual, are side effects. Being able to control side effects is a major benefit of having these co/monadic constructs in C#.

The point of Lazy<T> is to ensure that any side effects only occur once, when computing a single value of T. That’s why Lazy<T> is often used to represent a static singleton in C#. If computing the value causes side effects, then we don’t want to compute the value until it’s actually needed by the program. And once it’s been computed, we don’t want to repeat the computation. Of course, there are other reasons to have singletons, but using a static readonly field without Lazy<T> is generally good enough for most singletons. It’s only when we want to defer side effects that we use Lazy<T> instead.
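A typical sketch of that usage (ExpensiveResource is a hypothetical type standing in for anything costly to construct):

static readonly Lazy<ExpensiveResource> Instance =
    new Lazy<ExpensiveResource>(() => new ExpensiveResource());   // constructed on first read of Instance.Value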

A synchronous sequence of values is similar to Lazy<T> but with a larger cardinality: Lazy<T> only has a single Value, whereas IEnumerable<T> has many. But there’s another significant difference: IEnumerable<T> doesn’t necessarily cache its results. Each time that you iterate, you may get different values. Furthermore, an enumerable query is a computation that may have its own side effects regardless of whether the source IEnumerable<T> caches its results or not, but those side effects aren’t realized until you iterate it.

Task<T> represents an asynchronous function that eventually computes a Result. We use Task<T> to avoid duplicating the side effects of an asynchronous computation. Once it completes, subsequent reads of the Result property will always return the same value, thus side effects won’t occur again. If we actually want to repeat the side effects, then we must invoke the async function again, which will return a new instance of Task<T>.

An asynchronous sequence of values is similar to Task<T> but with a larger cardinality: Task<T> only has a single Result, whereas IObservable<T> has many. But there’s another significant difference: IObservable<T> doesn’t necessarily cache its results. Each time that you Subscribe, you may get different notifications. Furthermore, an observable query is a computation that may have its own side effects regardless of whether the source IObservable<T> caches its notifications or not, but those side effects aren’t realized until you Subscribe.

Side effects are related to temperature:

Cold means that side effects may occur.

Hot means that side effects do not occur.

So how does the laziness of co/monads relate to temperature?

Let’s consider the properties of the types of the co/monads separately from their co/bind implementations.

The Func<T> type may return a different value each time that it’s invoked, and that’s the cold programming model. Therefore, functions are cold.

The Func<T> comonad described above defers side effects until the outer-most function is invoked. That’s also the cold programming model, and essentially its purpose. The Func<T> comonad composes functions lazily. But it’s also pointless, given that Func<T> is already cold. The Func<T> comonad simply adds an unnecessary layer of cold indirection.

The Lazy<T> type is initially cold, because it defers side effects until its Value property is read, yet it becomes hot for subsequent reads of the Value property. In other words, the first time that Value is read it may cause side effects, but subsequent reads definitely will not. (There’s also a race condition if Value may be read by multiple threads simultaneously, but that's similar to Task<T>.Result, so let’s address the race condition in our analysis of Task<T> that follows.)

The Lazy<T> comonad is initially cold, because cobind defers side effects until the outer-most Value property is read, yet it becomes hot for subsequent reads of the outer-most Value property.

The Task<T> type is initially cold, because it can be constructed by passing in a function and it defers side effects until its Start method is called; however, it can only be executed once. Calling Start again throws an exception. Furthermore, the Task<T>.Result property is always hot; therefore, Task<T> transitions to hot when Start is called.

However, it’s rare to acquire a Task<T> that hasn’t already been started. Typically we use Task.Run, Task.Factory.StartNew, or invoke some BCL or user-defined function that returns a Task<T>. In these cases, Start has already been invoked, thus the computation may have already begun even before the caller acquires a reference to the Task<T>. The caller can’t invoke Start again, and reading the Result property never causes side effects, thus our typical usage of Task<T> is hot, yet the Task<T> type itself is technically cold. In other words, notice that we typically acquire an instance of Task<T> by invoking a function that returns it. Functions are cold, as stated previously. Therefore, each time that we invoke a function that performs an asynchronous computation, it may return a new instance of Task<T> to represent its Result, which is always hot. Invoking the function is essentially like invoking the Start method yourself. In that case, Task<T> is hot because it has already been started by the function that returned it, yet the Task<T> type itself is technically cold because it must be started at some point.

Alternatively, there’s another way to get an instance of Task<T> (a.k.a., “future”): use a TaskCompletionSource<T> (a.k.a., “promise”). TaskCompletionSource<T> replaces the need to call Start by essentially making the Result property assignable. The Task<T> that it returns is hot because it’s constructed without a function (via an internal constructor), thus it has no computation to be deferred. A consumer of the Task<T> is unable to call Start, thus it’s unable to cause side effects, and so this usage of Task<T> is hot. This doesn’t change the fact that the Task<T> type itself is cold, at least as far as its public API is concerned. The promise mechanism is useful because it decouples the two features of a future: It separates its ability to defer a computation from its ability to compute asynchronously; i.e., TaskCompletionSource<T> makes Task<T> hot without having to begin a computation by calling Start. Instead, the computation occurs externally and terminates when Result is assigned (or the task faults, or it’s canceled – but we’ll ignore these states for the purposes of this analysis).
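A small sketch of the promise mechanism described above:

var promise = new TaskCompletionSource<int>();
Task<int> future = promise.Task;   // hot: there's no Start to call and no deferred computation inside

// ...later, some external computation completes it:
promise.SetResult(42);             // future transitions to completed; future.Result is now 42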

The Task<T> comonad is initially cold, because cobind defers side effects until the Task<T> completes. It’s cold because the continuation is a function, and functions are cold. However, it becomes hot once the Task<T> completes. If you apply ContinueWith after the Task<T> has completed, then it may execute the continuation synchronously. There’s a race condition.

Therefore, the Lazy<T> and Task<T> types have the same temperature and similar behavior; i.e., they are both initially cold and then transition to hot by caching their value, which avoids duplicating side effects. Lazy<T>.Value blocks until its computation has completed; likewise, Task<T>.Result blocks until its computation has completed. The difference between them is that Lazy<T> computes its Value synchronously, and Task<T> computes its Result asynchronously. This means that to extract the Value from Lazy<T>, a program must block. To extract the Result from Task<T>, a program can do other things while Result is being computed. Since the only purpose of Lazy<T> is to defer its computation of Value, it doesn’t make sense to have a method like Run; it would simply block until Value has been computed, thus defeating the purpose of Lazy<T>. Since the purpose of Task<T> is asynchrony, in addition to deferring its computation of Result, it’s actually useful to hide its coldness behind a function that returns a hot Task<T>.

As comonads, both Lazy<T> and Task<T> are initially cold, yet they become hot once their value has been computed. In the case of Lazy<T>, cobind is synchronous (it blocks until Value is computed), whereas for Task<T>, cobind is asynchronous (it doesn’t block, although it may execute the continuation synchronously when Result has already been computed and the ExecuteSynchronously flag was set). In either case, the transition from cold to hot means that there’s a race condition.

Therefore, comonads and their comonadic types are cold, which is the definition of deferred execution. In addition, comonads that transition from cold to hot have a race condition.

The IEnumerable<T> type is either cold, as in C# Iterator Blocks, or it’s hot, as in pre-computed lists and dictionaries.

The IEnumerable<T> monad is cold, because bind defers side effects until the enumerable that it returns is iterated. This is true even if the IEnumerable<T> that is the source of the computation is hot. Although no side effects will occur when iterating a hot enumerable such as a List<T>, the computation itself (a.k.a., the query) may cause side effects when it’s iterated, and it can only cause side effects when it’s iterated.

The IObservable<T> type is either cold, as in Rx generators like Return, Range, Generate, Timer, Interval and Create, or it’s hot, as in Subject<T>, events and hot Task<T> conversions.

The IObservable<T> monad is cold, because bind defers side effects until an observer subscribes to the observable that it returns. This is true even if the IObservable<T> that is the source of the computation is hot. Although no side effects will occur when subscribing to a hot observable such as a Subject<T>, the computation itself (a.k.a., the query) may cause side effects when an observer subscribes, and it can only cause side effects when an observer subscribes. Furthermore, due to the asynchrony of IObservable<T>, some interesting and useful combinations of cold and hot temperatures are also possible. (See Hot and Cold Observables for details.)

Therefore, the IEnumerable<T> and IObservable<T> types can either be cold or hot, and they have similar behaviors; i.e., they both represent sequences, which may or may not cause side effects when computing their elements. The difference between them is asynchrony. IEnumerable<T> computes its elements synchronously. IObservable<T> computes its elements asynchronously. This means that to pull the next value from an IEnumerable<T>, a program must block. To be pushed values from an IObservable<T>, a program calls Subscribe and can immediately do other things. In either case, iterating an IEnumerable<T> or subscribing to an IObservable<T> doesn’t necessarily cause side effects. It depends on the implementation; e.g., List<T> does not cause side effects, so it’s hot. An Iterator Block may cause side effects, so it’s cold. Observable.FromEventPattern returns an observable that wraps an event, which is hot. Observable.Range generates notifications each time that you subscribe, so it’s cold.

As monads, both IEnumerable<T> and IObservable<T> are initially cold. Applying bind on an IEnumerable<T> or IObservable<T> won’t cause any side effects until it’s iterated or subscribed, respectively. However, depending upon the implementation, they may or may not transition to hot.

Therefore, monads are cold regardless of the temperature of their types, which is the definition of deferred execution. In addition, monads that transition from cold to hot have a race condition.

Monads and comonads have identical temperatures with regard to composition – they are both cold. Therefore, consecutively applying co/monadic operators is free of side effects. Co/bind implements deferred execution, such that side effects won’t occur until you leave the co/monad.

There’s No Place Like Home

We use monads and comonads to control side effects. That’s the purpose of the laziness in co/bind. Co/monads allow us to write queries declaratively, without causing side effects. We control when side effects may occur by reading/invoking/iterating/subscribing at a later time.

Lazy<T> is a comonad with the purpose of causing side effects only once, attached to a blocking computation. The Func<T> comonad is even closer to the metal in that it represents repeatable side effects; although, perhaps it’s not very useful as a comonad.

Task<T> is a comonad with the purpose of causing side effects only once, attached to an asynchronous computation. However, the laziness of Task<T> is subtle. Due to a race condition that is common in the Task<T> implementation of cobind, we must either live with side effects possibly occurring immediately (although asynchronously in general), or we can build our computation on a promise (TaskCompletionSource) and glue it to the future (Task<T>) when the program is ready for side effects. One really nice way of doing that is to define a coroutine; a.k.a., an async method in C#. An async method lets you await a Task within the imperative style of programming, meaning that typical control flow constructs are allowed without having to resort to implementing a complex state machine over CPS (because the compiler does it for you). Upon the first await, if the Task didn’t complete synchronously, then a promise is returned to the caller of the async method and the remainder of the computation from that point is referenced in a continuation that is attached to the Task via cobind. Side effects of the downstream computation can be controlled easily by simply holding the Task rather than applying await. The program can apply await at any time in the future to continue the computation with its Result. The race condition, in this case, is irrelevant.

IEnumerable<T> is a monad with the purpose of causing repeatable side effects, attached to a blocking computation.

IObservable<T> is a monad with the purpose of causing repeatable side effects, attached to an asynchronous computation.

Even though a sequence may be hot or cold, applying the bind operator (or any operator) itself doesn’t cause any side effects, until the sequence is iterated/subscribed. Furthermore, an operator may cause side effects each time that its return value is iterated/subscribed. The monadic bind function is about repeatable side effects, regardless of the temperature of its monadic type.

In conclusion:

Comonads represent lazy computations of single values, are initially cold, and transition to hot when the computation completes.

Monads represent lazy computations over zero or more values, depending on the type. They are initially cold, and may transition from cold to hot, depending on the type.

And why is any of this important?

Because a language like C# is full of side effects. Being able to introduce side effects so easily is often what makes C# really useful, but it’s also what makes many C# programs really confusing and buggy.

Controlling side effects in a declarative manner makes programs much easier to reason about, and to prove correctness. Although, some parts of a program are just much easier to write using imperative side effects.

Therefore, the careful combination of declarative and imperative styles within the same program is often the best choice. Functions generally help with the former, as in the procedural programming style. Monads also help with the former, computing over sequences in a declarative style. Comonads (via coroutines) generally help with the latter, when single-valued asynchronous computations are required.

These techniques are becoming even more important as modern programs increasingly depend on concurrency.


Notes

  1. The name of SelectMany is similar to the name of the Select operator, which is the same as the SELECT operator in SQL. Likewise, the Where operator in LINQ is named the same as the WHERE operator in SQL, and so on. When LINQ was invented, apparently the designers had chosen SQL-like names for the operators to make developers more comfortable. Most developers were used to SQL, but didn't have a clue about monads. And this naming convention works of course because SQL is a monadic comprehension syntax!
  2. The C# compiler actually looks for a special signature of a method named SelectMany rather than the one shown above. It must have an intermediate selector parameter in addition to the result selector. It's just a performance optimization and so I won’t elaborate on it here.
  3. Rx actually provides several generators, including Range, Generate, Timer, Interval and Create. There are many ways to enter the continuation monad. Likewise, there are many ways to enter the list monad, IEnumerable<T>, because there are many types that implement IEnumerable<T>. And C# offers another option: Iterator Blocks.
  4. The FCL implementation of ContinueWith is defined as an instance method. Here, I’ve rewritten it as an extension method and renamed its parameters, for parity with the SelectMany method. Other than those insignificant changes, it remains identical to the definition in the FCL.
  5. What Would Erik Meijer Do?   https://twitter.com/headinthebox/status/543919077752729600
  6. https://wiki.haskell.org/Referential_transparency
  7. The TPL doesn’t invoke continuations synchronously unless you opt-in, and even then it’s not guaranteed. For the most part, the TPL is designed to introduce concurrency. This is in contrast to the design of Rx, which is free-threaded and rarely introduces concurrency. Operators introduce concurrency when you explicitly specify a concurrency-introducing scheduler, although operators that involve some kind of timing may default to a concurrency-introducing scheduler if you don’t pass in one; e.g., Throttle, Sample, Buffer, etc. This is because Rx is primarily about taming concurrency, not introducing it.


October 10, 2014

Reactive Reactive! Read all about it!

Like the late 1800's American newspaper boy used to shout, in this blog post we'll be taking a look at something "extra" in Rx.

Reactive Reactive

Nope, it's not a typo.

It’s like acceleration to speed, or C# to IL, or higher-order functions to functions.

Sometimes fast isn't fast enough.  Sometimes abstract isn't abstract enough.  Sometimes functions aren't... functiony enough?

Sometimes Rx operators aren’t reactive enough.

Many operators in Rx force us to supply static arguments, resulting in an operator's static behavior.  By static I mean upfront, immutable, unvarying values; e.g., typically scalar primitives like Int32, TimeSpan and DateTimeOffset.  Furthermore, stateful operators with static parameters prevent us from varying behavior without a potential loss of data.

Here's a simple example of a stateful operator with a scalar parameter.  Let's say that we have an observable and we want to skip the first 5 notifications:

var xs = observable.Skip(5);

That's great, but what if we don't know how many notifications we want to skip yet?  Or what if we want to change that value later while the observable is active?

Defer doesn't work if we want to subscribe now.

We could attempt to change the value, without causing side effects, by writing some complicated dance within the Publish operator involving subscriptions, disposing, recreating the query and resubscribing; however, we'll lose the state within the operator that tells us how many values have already been skipped.  We could keep track of the number that have already been skipped ourselves, but then what's the point of the Skip operator?

Ok, easy solution, we define a new overload:

public static IObservable<T> Skip<T>(this IObservable<T> source, IObservable<int> count);

Now we can change the count at any time without resubscribing and without losing any state.  We'll simply push in a new value through count whenever we want to change it.

Subscribe now, decide later.

Maybe we'll never push in a value and so everything will be skipped.  Maybe we'll push in a value of 10 before the first 10 notifications have been skipped, thus only the first 10 notifications will be skipped.  Maybe we change our minds and push in a value of 5, but it's too late – we've already skipped the first 7 notifications, thus only the first 7 notifications are skipped.  At this point, the operator unsubscribes from count.1
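
To make this concrete, here's a minimal sketch of how such an overload might be implemented with Observable.Create. It's only a sketch: it doesn't parameterize scheduling or defend against ill-behaved sources, and the names are mine, but it follows the semantics described above.

public static IObservable<T> Skip<T>(this IObservable<T> source, IObservable<int> count)
{
    // using System; using System.Reactive.Disposables; using System.Reactive.Linq;
    return Observable.Create<T>(observer =>
    {
        var gate = new object();
        var skipped = 0;
        var latestCount = default(int?);    // null: no count yet, so keep skipping
        var doneSkipping = false;

        var countSubscription = new SingleAssignmentDisposable();
        countSubscription.Disposable = count.Subscribe(
            c =>
            {
                lock (gate)
                {
                    latestCount = c;

                    if (skipped >= c)
                    {
                        doneSkipping = true;            // too late to "unskip"; stop skipping here
                        countSubscription.Dispose();    // the operator unsubscribes from count
                    }
                }
            },
            observer.OnError,   // a failing count fails the result sequence
            () => { });         // count completing simply freezes the current skip count

        var sourceSubscription = source.Subscribe(
            value =>
            {
                bool forward;

                lock (gate)
                {
                    if (!doneSkipping && (latestCount == null || skipped < latestCount.Value))
                    {
                        skipped++;

                        if (skipped >= latestCount)     // lifted comparison; false while latestCount is null
                        {
                            doneSkipping = true;
                            countSubscription.Dispose();
                        }

                        forward = false;
                    }
                    else
                    {
                        doneSkipping = true;
                        forward = true;
                    }
                }

                if (forward)
                {
                    observer.OnNext(value);
                }
            },
            observer.OnError,
            observer.OnCompleted);

        return new CompositeDisposable(sourceSubscription, countSubscription);
    });
}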

But wait, are the values in count even necessary anymore?  It seems overly complicated.  Can't we just generalize our solution in terms of time?

public static IObservable<T> SkipUntil<T, U>(this IObservable<T> source, IObservable<U> other);

In other words, instead of an observable count, we'll simply use an observable whose T is ignored.  When the observable signals or terminates, the operator stops skipping.

This "reactive reactive" pattern is already in use by several operators in Rx, such as SkipUntil and TakeUntil as of Rx 1.0, and some overloads of Window, Buffer, Throttle, Sample, Delay and Timeout as of Rx 2.0, but upon further inspection it seems that Rx is potentially missing a lot of similar opportunities.

Reactive Overloads for Static Parameters
https://github.com/Reactive-Extensions/Rx.NET/issues/18

Merge – A Case Study

In one such example, we can see how data loss is a critical problem.  The Merge(maxConcurrency) operator is very powerful, but its behavior is static.  While inner observables are pushed in, Merge subscribes to them until maximum concurrency is reached, at which point any observables received are enqueued for later.  Often these observables capture non-recoverable application state.  To change the maximum level of concurrency in response to the changing capabilities of the environment requires disposing of the subscription to Merge's observable, reforming the query with a new static argument and resubscribing; however, in the process any observables enqueued by Merge are lost, and their captured application state is lost along with them.
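
Following the same "reactive reactive" pattern as the Skip example above, a hypothetical signature for such an overload (it doesn't exist in Rx; the issue linked below proposes the idea) might look like this:

public static IObservable<T> Merge<T>(this IObservable<IObservable<T>> sources, IObservable<int> maxConcurrent);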

Merge with Dynamic Maximum Concurrency
https://github.com/Reactive-Extensions/Rx.NET/issues/48
https://social.msdn.microsoft.com/Forums/en-US/62743ffd-befd-474a-8f0a-19dbaec7a926/design-question-throttling-hits-on-external-systems

Conclusion

By providing a full suite of reactive/reactive overloads for Rx operators, developers would be able to write queries that adjust the behavior of operators dynamically, based upon application state, user input and/or heuristics about the runtime environment, to improve the performance of their applications without losing data and without the cost of repeatedly solving similar problems themselves.

 

  1. One could also imagine the next logical scenario in which we raise the value back to 10, thus perhaps the first 7 notifications and the 9th and 10th notifications are skipped, but the 8th notification makes it through.  Although these semantics are interesting, they're not the semantics of the Skip operator.  Skip is all about skipping consecutive notifications from the beginning of the sequence.  I'm not saying that these semantics aren't useful, it just seems more like the behavior of the Sample operator to me and surely a similar analysis applies to that operator.

Tags:

Rx

September 13, 2014

Reactive Extensions: The Power of T

The T in IObservable<T> is more powerful than you may realize.

Structural T

T carries data, its most obvious purpose.

T can be any type, such as Int32, String or MyAmazingObject.

T can be void, as in System.Reactive.Unit.

T can be anonymous, as in:  new { Value = "T", Awesome = true }

T can be a choice, as in Either<TLeft, TRight>, which also seems to fix so-called "glitches".

T can be a group of T, as in IObservable<IGroupedObservable<K,T>>; e.g., GroupBy.

T might be there or it might not, as in Maybe<T>.
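
A few of those structural shapes in code; button, values and words are placeholder variables rather than anything from a particular API:

IObservable<Unit> clicks =
    Observable.FromEventPattern(button, "Click").Select(_ => Unit.Default);          // T as void

var tagged = values.Select(v => new { Value = v, Awesome = true });                  // anonymous T

IObservable<IGroupedObservable<int, string>> groups = words.GroupBy(w => w.Length);  // a group of T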

Meta T

T carries notification.  Observables notify observers by pushing T.

T can be notification reified, as in Notification<T>.

T can be collection change notifications, as in CollectionNotification<T>.

T can be directed mutations, as in CollectionModification<T>.

Temporal T

T carries time.  Observables push T asynchronously (though sequentially; §4.2).  Even pushing synchronously takes time though.

T can be a point in time; implicitly, the time at which it's observed, as in Amb or CombineLatest.

T can be time reified, as in Timestamped<T>, TimeInterval<T>, Recorded<T> or simply DateTimeOffset.

Temporal Window T

T carries duration.  A window is a duration of time that has a start and maybe an end.

T can start a window, as in Join, GroupJoin, Window or Buffer.

T can end a window, as in Join, GroupJoin, Window(closingSelector), Buffer(closingSelector), etc.

T can be a window, as in IObservable<IObservable<T>>.

T can be the end of time, as in that which precedes it, interpreted relative to the time of subscription (e.g., Timer), relative to a previous T (e.g., Interval or Throttle), or relative to some out-of-band time (e.g., Sample).

Channel T

T can be a channel or callback.  Observers invoke T to affect an observable.

T can be a delegate, as in Action or Func<TResult>.

T can be a subject, as in IObservable<ISubject<T>>.  This establishes a full duplex, asynchronous communication channel.  The observable is a singleton and both ends subscribe to the subject; e.g., a chat application. (See also this blog post.)

Future T

T can be the promise of a future T, as in IObservable<Task<T>>; although, it's somewhat redundant since IObservable<T> is also a promise of a future T and it has a wider cardinality [0,∞].  Task's cardinality is strictly [0,1].
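
For example, flattening such a sequence back into a plain observable is a one-liner; here tasks is a placeholder IObservable<Task<string>> and ToObservable comes from System.Reactive.Threading.Tasks:

IObservable<string> flattened = tasks.SelectMany(t => t.ToObservable());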

Conclusion

T carries data, notification, time and duration.  It can also have communication channel or promise semantics.  Best of all, T carries any additional semantics that you choose.

So the next time that you find yourself stuck trying to figure out which Rx operator is the right one to solve your particular problem, pause and give T the attention that it deserves.  You may just find that the answer to your problem lies within how you make use of T.

So, have I missed any categorically interesting uses for T?  How do you use T?

Tags:

Patterns | Rx | Rxx

April 30, 2014

Checklist for Creating Rx Operators

This blog post was originally posted here on the Rx forum back in 2012. I've noticed through reference tracking to my blog that it still gets some hits once in a while, so perhaps people are finding it to be useful. I figure it may get more visibility if I copy it to my blog. Note that I've made a few small elisions, corrections, clarifications and additions here, though I've left it essentially the same as the original post.

Prerequisites

  • Know the existing Rx Design Guidelines document.  You will find references to it throughout this checklist.
  • Know how to properly use existing Rx operators.  This checklist is not about how to properly use existing Rx operators, it's about how to define new operators for use in your business applications, or for inclusion into custom reactive frameworks that are based on the same design principles as Rx.

For the purposes of this checklist, an observable operator is any instance, static or extension method that returns IObservable<T>, including those that are typically considered to be static factory methods or combinators. (§6.3)

Also for the purposes of this checklist, the term observable refers to any local variable, parameter or member that is of the type IObservable<T> or a delegate to a function that returns an instance of IObservable<T>.

Checklist

Below is a categorized list of factors that I think are important to consider when creating new observable operators, ordered in general from most important to least important. This is not intended to be an exhaustive list, yet.  I'll be happy to update it based on your feedback.

Behavior

  1. Implement your new operator by composing existing Rx operators together, whenever possible; otherwise, just use any overload of Observable.Create.  Do not implement IObservable<T> yourself.  (§§6.1, 6.2)
  2. Ensure that your operator satisfies the Rx grammar and assume that IObservable<T> parameters are well-behaved. (§§4.1, 6.6)
    OnNext* (OnCompleted|OnError)?
  3. Ensure that your operator serializes notifications and assume that IObservable<T> parameters are well-behaved. (§§6.7, 6.8, 4.2)
  4. IObservable<T> is a model for concurrency; therefore, assume that observable parameters execute concurrently when Subscribe is called. Ensure that your operator is thread-safe and, when merging or combining the notifications of multiple observable parameters, ensure that your output observable serializes notifications.  (§§4.4, 5.9)
  5. Assume that IObservable<T> parameters are cold. Convert their temperature before subscribing multiple times to avoid duplicating subscription side effects, if your operator does not have retry or repeat semantics.  (§5.10)
  6. Protect calls to user code.  (§6.4)
  7. Do not catch exceptions thrown by observers; i.e., calls to OnNext, OnError and OnCompleted.
  8. Implement lazy (deferred) execution when generating a cold observable.  Check arguments up front, but do not cause any side effects until Subscribe is called.  This includes scheduling work, iterating enumerable parameters, and mutating external state or parameters.  (§6.16)
  9. Implement unsubscription, and do it properly.  (§§6.17, 6.18)
  10. Parameterize scheduling when appropriate.  (§§6.9, 6.10, 6.11, 5.4)
  11. Avoid deep call stacks caused by recursion.  (§6.15)
  12. Watch for reentry when executing user code and assigning the result to a SerialDisposable.  Use the double indirection pattern to avoid the effects of race conditions (see the sketch after this list).
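
Here's a rough sketch of the double indirection pattern from item 12; serialDisposable, source and observer stand in for whatever the operator actually holds:

// Publish an inner slot before executing user code so that a racing or reentrant
// assignment can't overwrite a subscription that hasn't been stored yet.
var inner = new SingleAssignmentDisposable();
serialDisposable.Disposable = inner;              // 1. expose the slot first
inner.Disposable = source.Subscribe(observer);    // 2. then run user code; reentry targets 'inner'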

Performance and Memory

  1. Do not block.  Execute asynchronously instead.  (§6.14)
  2. Do not introduce extra asynchrony. Avoid the ObserveOn operator, whenever possible. Rely on your notification source, if any, to handle asynchrony itself; alternatively, consider offering parameterized scheduling, especially if you're generating your own notifications.  (§§5.5, 6.9, 6.12)

Semantics

  1. Choose a name that is semantically appropriate for the operator based on its business requirement and intended usage rather than behavioral details; e.g., TakeUntil is a better name than SecondStopsFirst, and GetCustomerOrders is a better name than SendOrdersRequest or GetServerResponses.
  2. Do not include implementation details in names except to distinguish between otherwise ambiguous operators and parameters.
  3. Use pluralization to indicate that an observable may generate more than one notification; e.g., LoadImages.
  4. Consider naming an extension method returning a hot observable as if it's a property; e.g., MouseMoves.
  5. Add an "Observable" suffix to distinguish an operator from existing synchronous and asynchronous methods that have similar names; e.g., Stream.ReadObservable.

Documentation

  1. Specify whether the observable returned by your operator is synchronous, asynchronous or concurrent when Subscribe is called.
  2. Specify whether the observable returned by your operator is hot or cold.  Be specific about what, if any, side effects may occur when the operator is called and/or when Subscribe is called.

Style

  1. Consider whether creating an asynchronous method (C# 5; VB 11) is a better fit.  This may be true when the generated observable is a singleton (cardinality = 1) and callers aren't necessarily dependent on Rx.  Any Task-returning operator is easily converted by callers into an observable via the ToObservable method.  Note that when complex control flow is required or the operator depends on Task-returning methods (e.g., when await is useful) you don't necessarily have to define an async method.  Instead, Rx 2.0 Release and Rx 1.1 Experimental define overloads of Observable.Create for defining async iterators (a sketch follows this list).
  2. Avoid using subjects.  It's alright to use them implicitly if necessary; e.g., Publish.  (To Use Subject or Not To Use Subject)
  3. Avoid closing over local variables defined in the outer method body.  Sometimes this pattern is useful, but often it's a mistake that causes an otherwise cold observable to behave unpredictably because state is shared among multiple calls to Subscribe.
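
For reference, the async-capable Create overload mentioned in item 1 of this list lets you await inside the subscription function; ReadLinesAsync is a hypothetical Task-returning helper:

var lines = Observable.Create<string>(async (observer, cancellationToken) =>
{
    foreach (var line in await ReadLinesAsync(cancellationToken))
    {
        observer.OnNext(line);
    }

    observer.OnCompleted();
});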

Tags:

Rx

August 05, 2013

Hot and Cold Observables

I've touched on the topic of observable temperature while answering questions on the Rx forum and in my previous post on subjects.  I tried explaining it more deeply in this particular discussion, but now I'm finally ready to provide a deep and comprehensive1 analysis in this blog post.

I'll answer the following questions: 

What really makes an observable hot or cold?
Why should I care?
How can I tell if an observable is hot or cold?
How can I change the temperature of an observable?

The analysis that follows is not intended for beginners, though you may find the following section and the conclusion to be informative.  You may also want to watch this first and look here for additional information.

Common Sense

We like to think of hot observables from two different perspectives as follows.

From the perspective of observers, there's the potential to miss notifications.  Hot observables are "live" sequences, meaning that they are happening whether we observe them or not.  The canonical example is a mouse, which of course is free to move regardless of whether you subscribe to an IObservable<MouseMoveEventArgs> or not.  In general, if I hand you an observable and tell you that it's hot, then as an observer you'd infer that you might have missed notifications that happened before you subscribed.  Pretty simple.

From the perspective of observables, hot observables broadcast notifications to all observers.  The canonical example is converting an event into an observable; e.g., FromEventPattern.  If three observers subscribe to IObservable<MouseMoveEventArgs>, then each observer will serially2 observe the same notifications, as opposed to observing different notifications or concurrent notifications.

We like to think of cold observables from the same perspectives as follows.

From the perspective of observers, there's the potential for each observer to get different notifications.  Cold observables are "generated" sequences, meaning that they can generate different notifications for every observer.  Furthermore, observers may receive notifications asynchronously, with respect to each other.  The canonical example is Create, which can asynchronously generate notifications whenever an observer subscribes.

From the perspective of observables, cold observables won't generate notifications until an observer subscribes, and they generate notifications each time that an observer subscribes.  The canonical example is Range, which generates a range of numbers whenever an observer subscribes.

In summary, common sense tells us that:

Hot observables are always running and they broadcast notifications to all observers.

Cold observables generate notifications for each observer.

I intend to show that these ideas are more like symptoms rather than definitions.  I'll also identify a pattern to reduce them into a primitive concept: side effects.  Ultimately, I'd like to ensure that hot and cold are well-defined terms based primarily on the concept of side effects.3

Enough skipping rocks over the topic.  Let's dive into the watery refraction and expose the peculiarities of our naïve understanding of these concepts.

Uncommon Sense

We know that an observable can be either hot or cold, but can they mix?  Can an observable be generated while broadcasting?  Can it be generated  and yet always running?  Can it be always running without broadcasting?  Can the temperature change dynamically?

That last question is particularly interesting.  Can an observable change its temperature after an observer subscribes?  E.g., Observer E subscribes first, then F subscribes; E gets a hot observable while F gets a cold observable.

Can an observable change its temperature for the same observer after it subscribes?  E.g., Observer G subscribes to a cold observable that eventually transitions into a hot observable, for G, while it remains subscribed.

What about specialized subscription behaviors, such as one that causes an observable to behave like it's hot until all observers unsubscribe and then it becomes cold again for a subsequent subscription, which flips it back to being hot again -- does that behavior exist?

The answer to all of these questions is YES.  Well, it seems more like YES and SORT OF, based on our common sense definitions.  I'll try to explain.

Those behaviors might seem strange, but you've probably used them together before.  They're implemented by a couple of well-known Rx operators; e.g., Replay and RefCount.  Similarly, PublishLast and some particular overloads of Publish have strange behaviors with regard to temperature.

For example, keeping in mind our common sense definitions of hot and cold, let's assume that you have an observable O and you call O.Replay() to get an IConnectableObservable<T>, which you assign to C and then call C.Subscribe(J) and C.Subscribe(K).  I think we'd all consider C to be hot (or at least warm, whatever that means) because all of its notifications are broadcast to J and K, even though notifications won't arrive until Connect is called.  It's not entirely hot because it doesn't necessarily satisfy the always running factor, hence the introduction of the confusing term warm.

So far, Replay is identical to Publish.

Then you call C.Connect(), which may cause J and K to observe notifications from O.  If O is hot, then the always running factor was satisfied anyway, but if O is cold, then it's not satisfied; regardless, I think we'd all consider C to be hot at this point because of its broadcasting behavior alone.

Finally, you call C.Subscribe(L), and now there's a race condition on L with respect to our common sense understanding of observable temperature.  If O generated any notifications sometime between the calls to C.Connect() and C.Subscribe(L), then L will immediately observe replayed notifications, thus C was technically a cold observable when L subscribed because those particular notifications aren't rebroadcast to J and K; i.e., C generates specific notifications for L.  It means that while C is hot for J and K, it's cold for L; however, it's not permanently cold for L because after C replays all of the notifications that L missed, then L will be observing the same exact broadcast notifications as J and K, which means that C transitions from cold to hot for L.  Although, if O hadn't generated any notifications before you called C.Subscribe(L), then C would start out hot for L just like J and K.
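
The sequence of calls just described, in code; O, J, K and L are the observable and observers from the prose:

var C = O.Replay();             // IConnectableObservable<T>

var j = C.Subscribe(J);         // before Connect: nothing is observed yet
var k = C.Subscribe(K);

var connection = C.Connect();   // C subscribes to O; J and K now share O's notifications

var l = C.Subscribe(L);         // L first observes any missed notifications, replayed just for L,
                                // and then the same broadcast notifications as J and K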

Apparently, sometimes Replay is hot (though only if its source is hot, and only before Connect is called), sometimes it's warm (though only if its source is cold, and only before Connect is called), sometimes it transitions from warm to hot (though only for observers that subscribed before calling Connect) and sometimes it transitions from cold to hot, with a race condition (though only for observers that subscribe after Connect was called).

Part of the reason that this is confusing is because always running, broadcast and generate are actually relative concepts.  They implicitly refer to the time of subscription, though we tend to ignore this fact.  Always running refers to pre-subscription, generate refers to the time of subscription and broadcast is a post-subscription concept.  This explains how operators like Replay can offer strange combinations of these behaviors.

The common sense meanings of hot and cold are diluted by their differences in relation to the time of subscription and the connectability of operators like Replay, with a dependence on the temperature of the source and the presence of a race condition.

Technically it's possible for an observable to be always running without broadcasting; e.g., simply don't subscribe to a hot observable.  Our common sense definition tells us that it's hot even though it's not broadcasting.  Always running and generating are opposite behaviors, which makes broadcasting seem more like a symptom rather than a definition.

It's also possible for an observable to be generated while broadcasting; e.g., publishing a cold observable.  Again, common sense tells us that it's hot even though it's not always running, because we know that an underlying observable is actually responsible for generating notifications, which are broadcast to all observers.  This makes always running seem more like a symptom rather than a definition.

And finally, as shown by the Replay example above, an observable can be always running and yet generated at the same time.  They'll overlap until generation completes, like emptying a buffer, after which the observable seamlessly transitions into the always running sequence.  It's hard to say whether this kind of sequence is purely cold based on the common sense definition of generation alone.  We almost want to call it, "hot with some additional notifications".  This makes generation seem more like a symptom rather than a definition.

In general, our common sense definitions of hot and cold aren't precise enough to be useful to observers so we tend to ignore them and make assumptions based on what really matters: side effects.  All we really care about is answering one simple question:

If I subscribe to your observable, might it cause side effects?

It would be great if we could specify this behavior in a word.  The terms cold and hot should provide the answers yes and no, respectively.

Subscription Side Effects

Wikipedia defines side effect as:

Modifying some state or causing an observable interaction with calling functions of the outside world.

However, sometimes mutation is the primary effect; e.g., assignment.  Clearly the definition is relative.

So let's define side effect for our purposes as:

Any effect that is not the primary effect.

The primary effect of subscribing to an observable (i.e., calling Subscribe) is to register an observer for callbacks, which probably means that the observer will be added to the observable's internal list of observers and a disposable will be created and returned to represent unsubscription.

Likewise, the primary effect of .NET event registration is to add a handler to the list of delegates in the event's MulticastDelegate instance.  We can easily convert any event into an observable.  The primary effect of subscribing to an observable event is to convert an observer into an event handler, register it with the event, and return a new disposable that unregisters the observer from the event.

That pattern remains the same for other native asynchronous conversions as well, such as FromAsyncPattern and ToObservable (Task/Task<T>).

In addition, the compositional nature of Rx (more on that later in this post) allows operators to form subscription chains.  Subscribing to the outer-most operator's observable causes it to subscribe to the previous operator's observable, which subscribes to another observable, and so on.  The entire subscription chain is certainly part of the primary effect of subscribing.  Ultimately, the goal is to subscribe to the inner-most observable through as many intermediary subscriptions as necessary.

Subscription chains inherit side effects.  If any inner subscriptions cause side effects, then the outer-most subscription inadvertently causes them too; therefore, our definition of subscription side effects implicitly includes the sum of all inherited subscription side effects.

Based on the ideas above, a subscription side effect is:

Any side effect other than subscribing to another observable, adding an observer to an observable's "list of observers" and creating a new disposable for unsubscription.

where the meaning of "side effect" is defined by wikipedia and "list of observers" is defined by the necessary conversion to the underlying asynchronous model, if any.

Perhaps for the sake of simplicity I should reduce that definition.  Side-effect inheritance is implied, as is adding an observer to a "list of observers" and returning a disposable for unsubscription.  All of these behaviors are intuitively part of the subscription mechanism.

Therefore, a subscription side effect is:

Any effect beyond an observable's subscription mechanism.

Examples of subscription side effects include:  Calling Schedule, OnNext, OnError, OnCompleted, GetEnumerator or MoveNext, mutating a field, creating an object in memory, running a CPU-intensive computation, sending a web request, reading a file, ending a process, formatting your C drive, or really anything else that you can think of that isn't merely the subscription mechanism and may cause an observable effect on the outside world, including notifications.

Covering All Bases

Duality

If you're interested in the duality between observables and enumerables, then copy and paste the Covering All Bases section into your favorite text editor and replace the following character sequences (not necessarily on word boundaries):

asynchronous → synchronous
observable → enumerable
observe → enumerate
subscription → enumeration
subscribing → enumerating
subscribe → MoveNext
notification → element
always running → pre-calculated
broadcast → broadcast/share
subject
connection
connect
IBuffer
pushed
listening in to
calling OnNext, OnError or OnCompleted
yielding

It works!  Some of the grammar and spelling is a bit off though, and some of the ideas and operators may be entirely irrelevant due to the lack of asynchrony in enumerables, but the given definitions of hot and cold and the summarization are correct.

The concept of subscription side effects completely covers our common sense understanding of temperature as follows.

Recall our definition of subscription side effects from above:

Any effect beyond an observable's subscription mechanism.

Recall our common sense definitions of temperature from above:

Hot observables are always running and they broadcast notifications to all observers.

Cold observables generate notifications for each observer.

Understanding how cold behavior relates to subscription side effects is easy:

Generate means calling OnNext, OnError or OnCompleted when Subscribe is called.  This can be done synchronously or asynchronously with an IScheduler.  Either way these actions are, by definition, subscription side effects.  That's pretty simple.

Now let's jump to the common sense definition of hot and see how it compares:

Always running means that as soon as an observer subscribes it begins listening in to notifications.  It implies that notifications will not be generated as a side effect of subscribing, but that the mere act of subscribing enables an observer to receive notifications that would have occurred anyway.  In other words, subscribing to an observable that is always running does not necessarily cause subscription side effects.  The primary effect of subscribing is all that is needed to begin receiving notifications asynchronously.

Broadcast means that when a notification is ready to be observed it is pushed to all currently subscribed observers, which does not seem to imply anything about subscription side effects.  Theoretically, an observable could asynchronously generate notifications upon subscription, pushing them to the original observer that caused these subscription side effects while also broadcasting them to any observers that are fast enough to subscribe before the notifications are generated.  That's not the same behavior as Replay, for example, because Replay doesn't broadcast the notifications that it generates for an individual observer.  Apparently this behavior doesn't exist in any Rx primitives.  When an observer subscribes to a cold observable, without publishing, it observes notifications that are generated for it alone; i.e., cold observables don't broadcast.

Broadcast in Rx means that an observable is not responsible for generating notifications itself, but instead broadcasts notifications on behalf of some underlying observable.  That's the difference between the common sense definitions of cold and hot, respectively; therefore, broadcasting does not cause subscription side effects, meaning simply that when you subscribe to a broadcasting observable there will be no subscription side effects.

Broadcasting a cold observable has to cause subscription side effects once, though it's not caused by an observer subscribing to the broadcasting observable, but the broadcasting observable subscribing to the cold observable; a.k.a., "connecting".  Connection is exposed as a public operation, either by a subject's ability to act as an observer or by an operator returning IConnectableObservable<T>.  Theoretically, connection isn't necessary if the underlying observable is always running because observers that haven't subscribed yet are going to miss notifications whether the broadcasting observable is connected or not; however, if the underlying observable isn't always running (i.e., it's cold), then connection is required to ensure that observers don't miss notifications before they've subscribed to the broadcasting observable.  As a result, exposing the connection behavior of broadcasting observables defers subscription side effects of the underlying observable.  This implies again that broadcasting doesn't cause subscription side effects.  Subscribing to a broadcasting observable is all that is needed to begin receiving notifications asynchronously from the underlying observable, although notifications may be deferred until connection if the observable is connectable.

An emerging pattern here is that subscription side effects aren't expected at all in hot observables, including the case where a hot observable broadcasts notifications for a cold observable, because subscription side effects are deferred until connection.  We'll call them connection side effects instead.

Connectability may be hidden from observers.  Subjects are observables, thus the AsObservable operator can be used to hide their connectability.  The same applies to IConnectableObservable<T>.  In either case, hot behavior is retained even when observers know nothing about the underlying connectability of observables; therefore, simply knowing that an observable broadcasts implies that it doesn't cause subscription side effects.  It may have caused or will cause connection side effects, but that's beyond an observer's control when connectability has been hidden.

In addition, some stateful behaviors in Rx use connection side effects to separate notifications from other kinds of subscription side effects.  This is perhaps the most confusing case as far as the common sense definitions of hot and cold are concerned.

Recall from a previous section, when we reviewed the strange behaviors of Replay and RefCount, we determined that our common sense definitions are diluted because of their differences in relation to the time of subscription and the connectability of operators, with a dependence on the temperature of the source and the presence of a race condition.

Replay, PublishLast, and overloads of Publish that have an initialValue parameter, broadcast all subscription side effects, including notifications, to any observer that subscribes before they are connected to the underlying observable.  They also broadcast all subscription side effects, including notifications, to any observer that subscribes after connection but before the underlying observable generates any notifications.  They broadcast all subscription side effects, excluding missed notifications but including future notifications, to any observer that has missed notifications.  The missed notifications aren't broadcast, they are generated specifically for the particular observer that missed them.

These operators show that hot and cold are relative terms with regard to different kinds of subscription side effects, of which notifications are just one example.  They separate missed notifications from other kinds of subscription side effects to return a relatively cold observable or a relatively hot observable, respectively.  This is possible because broadcasting is a post-subscription concept and the ability to miss notifications is a pre-subscription concept that can be remedied at the time of subscription through generation, thus allowing an observable to be hot for some observers and cold for others, simultaneously.

Furthermore, the ability of these kinds of observables to transition from the common sense definitions of cold to hot is of no importance now, given that all we really care about is subscription side effects.  In other words, the newly defined hot and cold terms are relative to the time of subscription and to different kinds of subscription side effects, so the ability to transition is irrelevant.  An observable either causes subscription side effects or it does not.

Thus we no longer need any confusing terms such as warm to describe the behavior of connectables.  A connectable observable is always hot before it's connected and is generally hot afterwards, though some operators apply special behavior to missed notifications to ensure that they aren't actually missed.  These operators return an observable that is cold with regard to missed notifications, for a particular observer, and hot with regard to any other kind of subscription side effect, including future notifications; however, because our new definitions of hot and cold make transitioning irrelevant, we can say absolutely that an observer that has missed notifications will get a cold observable from these operators.  (The race condition's effect on our new definition of cold will be addressed later in this post.)

In summary, our new understanding of temperature with regard to subscription side effects tells us that:

Hot and cold are relative terms with regard to the time of subscription and to different kinds of subscription side effects.

Hot observables either don't cause subscription side effects or defer them until connection, where they become connection side effects.

Cold observables rely on subscription side effects.

Perhaps we can distinguish between these terms further by defining whether subscription side effects may or may not occur:

Hot observables do not cause subscription side effects.

Cold observables do cause subscription side effects.  (Though keep reading because later on I'll relax this definition a bit.)

Finally, we now have a specific and useful definition of observable temperature:

Temperature indicates the propensity of an observable to cause subscription side effects.

Furthermore, we can largely ignore the relativity to particular kinds of subscription side effects since it only matters when discussing the special behaviors of Replay, Publish and PublishLast.  And those operators only distinguish missed notifications from other kinds of subscription side effects, so they can easily be described as generally hot yet cold with respect to missed notifications.

In general, the terms hot and cold cover all kinds of subscription side effects, including notifications. 

Concerning Replay, PublishLast, and Publish overloads with an initialValue parameter, they return observables that are cold for particular observers that have missed notifications, yet they are hot with respect to any other kind of subscription side effect.

Now let's test the reverse: We'll see if our original common sense definitions can be inferred from our new definitions of hot and cold.

Given an observable that is known to not cause any subscription side effects, one must infer that it's either always running, which never causes subscription side effects, or it must broadcast, which defers subscription side effects into connection side effects; therefore, we must refer to the observable as hot.

Our new definition seems to be holding up so far.  Let's see how it compares to the common sense definition of cold.

Given an observable that is known to cause subscription side effects, one can infer that it might generate notifications when an observer subscribes, thus we may refer to it as cold.

It seems like we've got a match; however, notice that I wrote "might generate notifications".  That's because, as described previously, subscription side effects are far more general than just generating notifications.  A cold observable, by our new definition, can cause any subscription side effect.  For example, it can send a web request or asynchronously read from a file, without ever generating any notifications.  Of course, generating a notification would be useful so it's quite common for these examples in particular to generate notifications asynchronously once the operation has completed.

There are other kinds of subscription side effects that do not generate any notifications - not even asynchronously.  For example, imagine tracking the number of observers subscribed to your observable; each time Subscribe is called an integer field is incremented by 1.  That's similar to the behavior of the RefCount operator, described above.  Another example is subscription logging, for diagnostic purposes, which may cause a file to be updated on disc every time an observer subscribes.  The common sense definition of cold doesn't cover these cases, but our new definition does!
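
For example, a hypothetical diagnostic wrapper along these lines causes a subscription side effect without ever generating a notification of its own:

public static IObservable<T> CountSubscriptions<T>(this IObservable<T> source, Action onSubscribe)
{
    return Observable.Defer(() =>
    {
        onSubscribe();  // e.g., increment a counter or append to a log file; a subscription
                        // side effect that produces no notifications by itself
        return source;
    });
}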

It seems to me that our new definition of temperature entirely covers the common sense definition, and does so elegantly as diametrically opposed concepts.  Furthermore, our new definition of hot nicely ties together the two orthogonal concepts that make up the common sense definition of hot, while our new definition of cold identifies the important factor and generalizes it.

But why are subscription side effects the important factor?

Composition

LINQ operators enable the composition of observables into "queries".  Applying any operator to one or more observables means that it will subscribe to those observables when you subscribe to the query.

When defining a query, it's not uncommon to reference an observable multiple times, which means passing the same observable to different operators or to the same operator as multiple arguments.  Since we know that operators subscribe to the observables that you pass to them, referencing the same observable multiple times means that it will potentially have multiple subscriptions.

Some operators have the semantics of multiple subscriptions without multiple references.  For example, Retry only accepts a single observable and potentially subscribes to it multiple times.  In this case it's desirable to duplicate subscription side effects.  For example, if we pass to Retry an observable that makes a web request as its subscription side effect, then we want Retry to execute that subscription side effect every time it re-subscribes.  In other words, we want to retry the web request.

That's why subscription side effects are the important factor.  From the point of view of the Retry operator, we don't necessarily want to duplicate the observable's notifications (as indicated by our common sense definition of cold), we simply want to duplicate the subscription side effects, whatever they might be.  In other words, it's not about generating the notifications, it's about executing the side effects, which are often responsible for generating the notifications.
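
For example, assuming MakeRequestAsync is some Task-returning web call, wrapping the invocation in Defer makes the request a subscription side effect, which is exactly what Retry needs to duplicate:

var response = Observable
    .Defer(() => MakeRequestAsync().ToObservable())
    .Retry(3);  // each re-subscription repeats the subscription side effect: a new web request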

Sometimes an observable is referenced multiple times without the semantics of allowing duplicate subscription side effects.  For example, you could apply the Where operator twice to the same observable to create two different observables, then apply the Zip operator to merge them back into a single observable in a pairwise fashion.  The query's output semantics aren't important here; what's important is that you don't want to create two subscriptions to the original observable.  Instead, you want broadcast behavior.

Broadcasting is essentially the same thing as sharing subscription side effects among multiple observers.  In the previous example, imagine that the observable to which you are applying the Where operator is cold; e.g., each time that you subscribe it sends a web request to open a reactive socket connection to a server and then streams the responses as notifications.  The Zip operator is going to subscribe twice, but we don't want to send two web requests.  Our desire to broadcast the notifications is the same as our desire to broadcast the web request, because it's the web request (i.e., the subscription side effect) that is responsible for generating notifications in the first place.
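
Continuing that example, the selector overload of Publish shares a single subscription, and therefore a single connection, between both uses of the observable; responses and its properties are placeholders:

var pairs = responses.Publish(shared =>
    shared.Where(r => r.IsUrgent)
          .Zip(shared.Where(r => !r.IsUrgent),
               (urgent, normal) => new { urgent, normal }));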

And again, that's why subscription side effects are the important factor.  In the former case we wanted to duplicate them, while in the latter case we wanted to broadcast them.

Composition in Custom Operators

The previous cases are special because we had prior knowledge of an observable's temperature.  But what if we didn't?

Composite operators are queries themselves; however, they don't typically define contracts for the temperatures of their observable parameters.  In other words, when we define a custom operator, we must make assumptions as to the temperatures of observable parameters.

An obvious question is: Why can't we define temperature contracts on operators?

Well, technically you could, but then you'd be forcing callers in some cases to explicitly change the temperature of observables before calling your operator, which changes the declarative nature of some operators by bringing temperature into the foreground when perhaps it's unnecessary.

Furthermore, if a caller is unaware of the temperature, then an assumption has to be made anyway.  Unless the contract is propagated to a caller that can control the temperature - and now we've got a viral effect, as is common with contracts.

Alternatively, you'd probably want to have two overloads for every operator where temperature is of concern: one overload for cold observables, one for hot observables.  But how can you overload operators by temperature without defining new types?

Do we really need this complexity?  What are the disadvantages of making assumptions at the operator level?

If you were to assume that all observable parameters are hot, then it's fine if your operator relies on broadcast behavior, but it's wrong if your operator must duplicate subscription side effects; e.g., Retry.  Alternatively, if you were to assume that they are cold, then the reverse is true: It's fine if duplicating subscription side effects is what you want, but it's wrong if you need broadcast behavior.

Therefore, we need to consider the possibilities of conversion before we can determine which assumption is best in general.

Temperature Conversion

First, a quick note about language:

IObservable<T> is an immutable type; therefore, there's no way to mutate its temperature.  When I refer to "changing" an observable to a particular temperature or "making" an observable a particular temperature, I'm using those terms in the general sense of unary operator application.  For example:

var x = 10;
var y = -x;

You might say that the second line "negates x", but technically it doesn't because after the negation operation, x hasn't changed.  It's applying the negation operator to x and assigning the result to y.  The former is just a natural way of describing the essence of the operation.  Although it's ambiguous in this situation, it's not when dealing with IObservable<T> because it's immutable.  Therefore, "changing" an observable's temperature or "making" it have a different temperature simply refers to the application of some operator and the assignment of the result into a new variable, which holds a different observable with the desired temperature.

We can change a cold observable into a hot observable by applying the Publish operator, which has overloads that project the observable into a selector function as a hot observable (though the operator itself returns a cold observable) and overloads that return a connectable observable.  The latter defers subscription side effects until connection, effectively broadcasting them across multiple observers as connection side effects.

Publish makes a cold observable hot.4

Defer makes a hot observable cold.

The Defer operator allows us to add subscription side effects to a hot observable, thus making it cold.

Defer also allows us to convert invocation side effects into subscription side effects; e.g., given a function that returns a hot observable like Start or a C# 5.0 async method that returns a Task<T>, which you then convert into an observable with ToObservable, you can wrap either function in Defer to change its invocation side effects into subscription side effects, thus creating a cold observable from a hot function.
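
For example, assuming DownloadAsync is an async method returning Task<string>:

// Hot: the download starts as soon as DownloadAsync is invoked, with or without observers.
IObservable<string> hot = DownloadAsync().ToObservable();

// Cold: Defer moves the invocation into Subscribe, so each subscription starts a new download.
IObservable<string> cold = Observable.Defer(() => DownloadAsync().ToObservable());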

Alternatively, you can change the temperature of a hot observable by combining it in various ways with a cold observable.  For example, if you Concat a hot observable to a cold observable, then the result is a cold observable, by our new definition, because the subscription side effects of the cold observable are prepended onto the hot observable.  If you were to apply the Merge operator instead, then the result would also be a cold observable because, again, the new observable retains the cold observable's subscription side effects.  Remember, it's the subscription side effects that are important, which is why combining a hot observable with a cold observable in such a way that the subscription side effects of the cold observable are preserved yields a cold observable.

Notice, however, that converting from cold to hot simply adds broadcasting behavior, while converting from hot to cold relies on the addition of subscription side effects.  In other words, going from cold to hot is as simple as calling one operator, but going in reverse requires side effects to be added.

In the previous section, we left wondering which assumption is best for an observable with an unknown temperature: cold or hot.  Perhaps we can answer that now by examining the consequences of making wrong assumptions with respect to temperature conversions.

When we need a hot observable, assuming that it's already hot and being wrong means that subscription side effects are duplicated, which breaks the semantics of our query; therefore, let's assume that every observable is cold and apply Publish.  When we're wrong, we'll end up applying broadcast behavior to a hot observable, which would simply have no effect.  That's 1 point for assuming that observables are cold, because when we're wrong, applying Publish does no real harm.5

When we need a cold observable, assuming that it's already cold and being wrong means that subscription side effects won't be duplicated, which breaks the semantics of our query; therefore, we must assume that every observable is hot and... do what?  Remember, the semantics of our query aren't to introduce new subscription side effects, we want to duplicate the subscription side effects of the observable that was handed to us.  We can't make a hot observable cold without introducing subscription side effects; therefore, all we can do is assume that the observable is already cold and hope for the best!  That's 2 points for cold, 0 points for hot.

In summary:

We must assume that any observable with an unknown temperature is cold.

In other words, it's safest to assume that any given IObservable<T> may cause subscription side effects because it's easy to broadcast subscription side effects when necessary, but once broadcast it's impossible to reverse it.  There's no way to force a broadcasting observable's Subscribe method to cause subscription side effects that were already broadcast.

Note that there is a way to reverse the broadcast behavior of an IConnectableObservable<T>: simply dispose of the object that was returned by the Connect method.  In fact, that's exactly how RefCount works.  But this can't be done with any IObservable<T>.  Even trying to cast it to IConnectableObservable<T> won't help because casting doesn't give you a reference to the disposable that was returned by Connect.
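
In code, with source standing in for any IObservable<T>:

var published = source.Publish();          // IConnectableObservable<T>
var connection = published.Connect();      // connection side effects happen here
connection.Dispose();                      // reverses the broadcast; a later Connect re-subscribes

var shared = source.Publish().RefCount();  // RefCount connects on the first subscription and
                                           // disposes of the connection when the last observer unsubscribes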

Given that it's safest to assume that any unknown IObservable<T> is cold, it follows that sometimes we're going to be wrong.  We should probably relax our previous definition of cold to account for that fact.

Here are our latest definitions: 

Hot observables do not cause subscription side effects.

Cold observables may cause subscription side effects.

That should cover all possibilities.

Classification of Subscription Side Effects

A subscription side effect can be classified as any of the following.6

  • Synchronous
    • Notification
      E.g., directly calling OnNext, OnError or OnCompleted.
      (Technically, notifications are always Asynchronous in Rx due to its implicit use of the CurrentThreadScheduler to schedule subscriptions, though we'll ignore that fact and define Synchronous in relation to the outer-most subscription only, where synchronously calling OnNext, OnError or OnCompleted actually blocks Subscribe from returning.)
    • Out-of-band
      E.g., directly assigning a field, creating a large object in memory, executing a CPU-intensive computation or (synchronously) deleting a file on disc.
  • Asynchronous
    • Notification
      E.g., using an async mechanism, such as Task or IScheduler, to schedule calls to OnNext, OnError or OnCompleted.
    • Out-of-band
      E.g., making a web request or reading a file on disc.  Often these kinds of operations are used for Asynchronous Notification as well.
  • Composition
    Subscribing to another cold observable that causes its own subscription side effects, either synchronously or asynchronously.
  • Computation
    Executing costly or side-effecting code for each notification; e.g., a costly projection via Select or imperative side effects via Do.

Perhaps these classifications could be useful when documenting operators.

For example, the documentation for the Range overload without an IScheduler parameter could state that it returns an observable that causes Synchronous Notification subscription side effects.  The overload with an IScheduler parameter may cause Synchronous or Asynchronous Notification subscription side effects, depending upon the specified IScheduler; this is similar for all operators that are overloaded with an IScheduler parameter.

The documentation for the Interval overload without an IScheduler parameter could state that it returns an observable that causes Asynchronous Notification subscription side effects.

The documentation for the Defer operator could state that it returns an observable that may cause Synchronous Out-of-band subscription side effects, depending upon the specified function.  It could also state that it may cause Composition subscription side effects, which basically means that it inherits the subscription side effects of the observable that is returned by the specified function.

The documentation for the Select operator could state that it returns an observable that may cause Composition subscription side effects, which basically means that it inherits the subscription side effects of the source.  It could also state that it may cause significant Computation side effects, depending upon the specified selector function.

Most of these terms should be self-explanatory by now; however, Computation is particularly interesting since I haven't mentioned it yet.  I consider a computation subscription side effect to be:

Any effect beyond an observable's subscription and notification mechanisms that occurs for each notification.

The notification mechanism is the minimally required effects for notifying; i.e., directly calling OnNext, OnError or OnCompleted are the primary effects of notification.  Anything else is a computational side effect.

Although this kind of side effect is part of an observable's computation, it won't occur until subscription.  Subscribe is essentially the beginning of the computation, thus subscription side effects are technically computation side effects.  But since subscription side effects are the focus of temperature, that's why I refer to computation side effects as a kind of subscription side effect, in the sense that computation side effects don't occur until an observer subscribes.

Computation side effects are part of the definition of cold observables, though it basically means that applying any computational or side-effecting operator to a hot observable converts it into a cold observable; e.g., applying Select to a hot observable means that all observers will receive different notifications, generated specifically for them, which is the common sense definition of cold.  That's not a very useful definition of cold, so I propose to separate the notion of computation side effects from subscription side effects and to exclude them from the new definition of cold, introducing new terms: active and passive.  This separation allows a hot observable to be referred to as hot even if it causes computation side effects.  And you can still apply Publish to broadcast the computation side effects similar to how you'd publish a cold observable to make it hot.

Passive observables do not cause any significant computation side effects.

Active observables do cause significant computation side effects.  Every notification may be computed with some relative cost.

Select and Do are common examples of operators that return active observables.

These terms are context-sensitive, as indicated by use of the word "significant".  An observable that is passive in one usage can be considered active in another.  For example, a simple projection over an observable of integers, such as adding 5 to each value, probably isn't costly enough in terms of memory or performance to care if its computation side effects are duplicated.  In contrast, a projection that calculates the factorial of each value might be costly enough in certain scenarios to consider publishing.
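
For example, with Factorial standing in for any costly projection:

var cheap = values.Select(x => x + 5);                               // passive: not worth sharing
var costly = values.Select(x => Factorial(x)).Publish().RefCount();  // active: broadcast the computation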

Conclusion

Temperature indicates the propensity of an observable to cause subscription side effects.

A subscription side effect is:

Any effect beyond an observable's subscription mechanism.

Subscription mechanisms generally consist of storing an observer in a list or subscribing to other observables, and returning a disposable for unsubscription.

Examples of subscription side effects include:  Calling Schedule, OnNext, OnError, OnCompleted, GetEnumerator or MoveNext, mutating a field, creating an object in memory, running a CPU-intensive computation, sending a web request, reading a file, ending a process, formatting your C drive, or really anything else that you can think of that isn't merely the subscription mechanism.

Hot observables do not cause subscription side effects.

Cold observables do cause subscription side effects; however, we must assume that any observable with an unknown temperature is cold, and sometimes that assumption will be wrong; therefore, a more accurate definition is:

Cold observables may cause subscription side effects.

Again, we must assume that any observable with an unknown temperature is cold.

In the simplest case, an observable may be permanently hot or permanently cold, though some observables change their temperature over time so that different observers may get different temperatures; therefore:

The temperature of an observable is relative to the time of subscription.

Temperature is also relative to the kind of subscription side effect; however:

In general, the terms hot and cold cover all kinds of subscription side effects, including notifications.

Replay, PublishLast, and Publish overloads with an initialValue parameter return observables that, once connected, are cold for observers that have missed notifications yet hot with respect to any other kind of subscription side effect.

We must also consider an observable's computation side effects.  A computation side effect is:

Any effect beyond an observable's subscription and notification mechanisms that occurs for each notification.

Technically subscription side effects are initial computation side effects, although I've separated these concepts so that hot and cold only refer to subscription side effects, thus ensuring that hot is a more useful term.  As a result, the phrase computation side effects can be applied to either hot or cold observables.  I've assigned the following terms:

Passive observables do not cause any significant computation side effects.

Active observables do cause significant computation side effects.

The significance of computation side effects on performance is relative to the operation, query or application.  What is significant in one context may be insignificant in another.

Now I can finally answer the original questions from the top of the post with absolute precision and certainty.

What really makes an observable hot or cold?

Short answer:

An observable's temperature is its propensity to cause subscription side effects.

Long answer:

Hot observables do not cause subscription side effects.

Cold observables may cause subscription side effects.

In addition, there are two related concepts that may be applied to both cold and hot observables:

Passive observables do not cause any significant computation side effects.

Active observables do cause significant computation side effects.

Why should I care?

You need to be aware of whether subscribing to an observable multiple times is going to duplicate side effects; i.e., cold behavior.

When defining a query that references the same observable multiple times, you'll typically want to broadcast notifications among the operators to ensure that they observe the same exact notifications.  In other words, you want a hot observable.  A cold observable may generate different notifications for each observer.  You'll also want to broadcast other subscription side effects; e.g., given a cold observable that makes a web request each time an observer subscribes, you don't want two operators in the same query to make two separate web requests.

Alternatively, sometimes you want duplicate subscription side effects in your queries; e.g., given a cold observable that makes a web request each time an observer subscribes, applying the Retry operator means that you want to keep making web requests until one succeeds.  The very purpose of the Retry operator is to duplicate subscription side effects.  It's perhaps a fairly common mistake to pass a hot observable to Retry and then wonder why Fiddler isn't showing a second web request after the first attempt failed.
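
Here's a hedged sketch of that scenario; FetchAsync and the URL are hypothetical, and ToObservable comes from System.Reactive.Threading.Tasks.

// Cold: each subscription issues a new web request, so Retry can duplicate the
// subscription side effect until a request succeeds.
IObservable<string> request = Observable.Defer(() => FetchAsync("http://example.com/data").ToObservable());

IObservable<string> resilient = request.Retry(3);   // up to 3 attempts, each a fresh request

// Had "request" been hot (e.g., an already-connected published observable), Retry would
// merely resubscribe to the same shared computation and never send a second request.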

Furthermore, you must consider computation side effects to determine whether they may have a noticeable performance impact on your query.  If they do, then you'll probably want to broadcast them as well.

How can I tell if an observable is hot or cold?

There's no way to be sure from an observer's point of view.  Subscription side effects might be directly observable or they might not.

Furthermore, subscription side effects that generate notifications might be indistinguishable from the notifications that would be generated if you assumed that the observable is hot.

The only way to know for sure is to look at the documentation or the source code.  :)

When in doubt, it's safest to assume that an unknown observable is cold.

Temperature by Operator

(You may want to review Rx operators by category, although it's somewhat outdated as of Rx 2.0).

All of the Rx generator methods return observables that are of course cold by definition: Return, Range, Timer, Interval, Generate and Create.

There are 4 additional generators that are kind of edge cases: Empty and Throw are cold; Start is cold too, but we'll get back to it later; Never is technically hot, but it doesn't really matter.

In general, it's safe to assume that Rx's built-in conversion operators for .NET events return hot observables; e.g., FromEvent and FromEventPattern.  In reality, there's no guarantee.  For example, you can define a custom .NET event and implement an add method that synchronously invokes each handler that it receives.  That event would convert into a cold observable!  Luckily, events generally don't do strange things like that, so it's safe to assume that they are hot.

You can be sure that Replay, Publish and PublishLast overloads that return IConnectableObservable<T> are hot until you call Connect, though after Connect it's more complicated.  Publish overloads without an initialValue parameter are always hot.  Replay, PublishLast, and Publish overloads with an initialValue parameter become cold when Connect is called, since they introduce a race condition.

Each of those operators also has overloads that accept a selector function.  The selector overloads return cold observables, though the observable passed to the selector function is similar to the above.  When the selector is called, the observable passed in is hot, but when the selector returns, the observable passed in is automatically connected and starts to behave as described previously.
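
For example, here's a minimal sketch of the selector overload; "source" is assumed to be a cold observable of integers.

// Inside the selector, "shared" is hot, so both uses of it observe the same notifications.
// The observable returned by Publish is cold: it publishes and connects per subscription.
IObservable<int> differences = source.Publish(shared =>
    shared.Zip(shared.Skip(1), (previous, current) => current - previous));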

FromAsyncPattern and ToAsync return functions that return observables similar to PublishLast, but connected when the function is invoked.  It's safest to assume that the observable is cold, just like PublishLast.  You can think of the function as a parameterized connectable since it acts like the Connect method of IConnectableObservable<T>.

Start is similar to ToAsync and PublishLast, though it's not connectable, thus it's safest to assume that it returns a cold observable.

Task and Task<T> are themselves hot, although the ToObservable conversion returns an observable similar to PublishLast, thus it's safest to assume that ToObservable returns a cold observable.

Defer returns a cold observable, though actually it depends on whether it causes subscription side effects or not.  Typically the only reason that you wouldn't know whether it causes subscription side effects is when you use it to defer an unknown function that returns an observable.  The unknown function might not cause any invocation side effects and may return an observable that doesn't cause any subscription side effects, thus Defer may simply be passing through a hot observable.  That behavior indicates either an edge case or improper usage of the operator.

StartWith, Sample and Using return cold observables.  They all cause subscription side effects.

All of the remaining operators inherit temperatures from their source observables.  Well, maybe not all remaining operators – I may have missed a couple when I reviewed them for this post ;)

Select, Do, Aggregate and Scan may cause significant computation side effects.  (See above for a description of computation side effects.)

Technically, any operator with a delegate parameter that's invoked for some or all notifications may cause significant computation side effects.  There are many more operators like this than what I've listed above.

Remotable causes significant computation side effects, assuming that you consider transmitting notifications through the .NET remoting infrastructure to be significant in terms of its performance implications.

How can I change the temperature of an observable?

Subscription side effects:

Simple Conversions

Publish is useful for broadcasting subscription side effects.  It makes a cold observable hot.

Defer is useful for adding subscription side effects and for converting invocation side effects into subscription side effects.  It makes a hot observable or observable-returning function cold.

Operators such as Merge and Concat, among many other combining/zipping/joining operators, can add subscription side effects to hot observables by combining them with cold observables, thus returning cold observables.
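
Here are a couple of minimal sketches of these simple conversions; "hotSource" is assumed to be an existing hot observable of long values.

// Cold -> hot: broadcast the subscription side effects.
IConnectableObservable<long> hot = Observable.Interval(TimeSpan.FromSeconds(1)).Publish();
IDisposable connection = hot.Connect();   // the timer starts once and is shared by all observers

// Hot -> cold: Defer adds a subscription side effect in front of an existing hot observable.
IObservable<long> cold = Observable.Defer(() =>
{
    Console.WriteLine("Subscribed at {0}", DateTimeOffset.Now);   // runs once per observer
    return hotSource;
});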

Complex Conversions

Using a Subject as an observer makes a cold observable hot, though that alone is not a good reason to use subjects.  See my previous post on subjects for details.

Replay, PublishLast, and Publish overloads with an initialValue parameter are useful for broadcasting subscription side effects while also replaying notifications to observers that missed them.  These operators make cold observables hot in terms of all subscription side effects except for missed notifications; therefore, they remain cold in regard to missed notifications.

Computation side effects:

Publish is useful for broadcasting computation side effects.  Whether or not you want to broadcast them depends upon their significance with respect to resource usage and/or performance and how they compare to your operation, query or application.

Operators such as Select and Do, among many others, are useful for adding computation side effects.

 


  1. Not in a mathematical or academically formal sense.  Posting the idea to my blog allows me to focus on rationalizing it and elicits proper feedback.  Though if anybody can explain it in a mathematically formal sense, then by all means please describe it in the comments.
  2. Serial behavior is described in §§4.2, 6.7 of the Rx Design Guidelines document.  Note that FromEventPattern does not enforce this contract, so it's either the raiser's responsibility to ensure that it does not raise the event concurrently or it's an observer's responsibility to apply the Synchronize operator.
  3. Obviously not in any kind of official capacity.  I just need a way to reference these new concepts now.  I'd be happy to adopt better or formal naming conventions if they already exist or will emerge in the future.
  4. Publish, PublishLast and Replay also have convenient overloads accepting a selector function that acts as a scoped query in which the original observable is hot, thus allowing callers to avoid having to deal with connection themselves.  The temperature of an observable returned by these overloads is cold because it publishes and connects as a subscription side effect.
  5. Perhaps it would be nice if Rx's implementation of Publish could somehow detect whether an observable is either Subject<T> or it's hidden by the AsObservable operator, in order to avoid introducing yet another subject into the query unnecessarily.  But that would be an internal optimization and it doesn't change the fact that publishing a hot observable probably has no noticeable performance impact in most programs.
  6. I've made all of these terms up myself.  I'll gladly replace them with formal terms if someone provides them to me in the comments.

Tags:

.NET | Rx

June 22, 2013

To Use Subject Or Not To Use Subject?

That is the question!  There appears to be some confusion on the web about whether or not Subject<T> should be used, ever.  Tis a question oft asked by developers new to Rx after reading expert recommendations to avoid subjects due to common misuse, yet subjects seem to retain an air of mystery even for seasoned reactive developers.

First, a primer on Subjects.  Take a moment to read that.

Done?  Great.  Let's continue...

As you may already know, Erik Meijer doesn't like subjects.  End of discussion, right?  Wrong!  Erik doesn't like subjects to be used in place of purely functional alternatives.1

As Erik puts it:

[Subjects] are the "mutable variables" of the Rx world and in most cases you do not need them.

So let's enumerate the cases where we should and shouldn't use subjects.

We'll start by examining two interesting properties of subjects:

  1. They implement IObservable<T> and IObserver<T> simultaneously, which makes them suitable for both output and input, respectively.
  2. They are hot.  (See also: Hot and Cold Observables)

Thus to answer the question of whether we should use a subject when defining an observable (output) we must consider:

  1. the input; i.e., the source of the OnNext, OnCompleted and OnError notifications.
  2. the desired temperature of the generated observable; e.g., hot or cold.

To determine whether or not to use a subject, start by asking yourself the following questions:

  1. what is the source of the notifications?  I.e., where do the values of T come from?  Who is responsible for calling OnNext?
  2. must observers share subscription side-effects or not?  I.e., does calling Subscribe execute some kind of initialization code each time it's called or does initialization occur once and its state is shared by every call to Subscribe?

Let's answer all possible combinations of these 2 questions.  I'll provide more details and examples later in this post to explain why these answers make sense.

Question 1:  What is the source of the notifications?

The source can be one of two kinds: Let's call them external and local.  An external source is any observable or event that already exists outside of your code.  A local source is when you generate an observable from your code, kind of like defining and raising a custom .NET event.  More details on these distinctions later in this post.

  1. If the source is external, then do not use a subject.
  2. If the source is local, then do use a subject.

Question 2:  Must observers share subscription side-effects or not?

  1. If not, then do not use a subject.  (Define a cold observable.)
  2. If yes, then do use a subject.  (Define a hot observable.)

Let's enumerate all possible combinations: 

  1. The source is external and I want a cold observable.
  2. The source is external and I want a hot observable.
  3. The source is local and I want a cold observable.
  4. The source is local and I want a hot observable. 

As you may have noticed, there are conflicts.

  • What if you have an external source and you do want to share subscription side-effects?
  • What if you have a local source and you do not want to share subscription side-effects?
  • What if the temperature of an external source differs from the desired temperature?

I believe these conflicts cause additional confusion among developers.  They seem to blur the line between when it's appropriate and when it's inappropriate to use subjects, though in fact the line is quite clear, as we'll soon see.

Let's resolve these conflicts now.

The source is external and I want a cold observable.

  1. If the source is cold, then use it as is.  You can also wrap it with Defer to add subscription side-effects.
  2. If the source is hot, then wrap it with Defer to add subscription side-effects.

The source is external and I want a hot observable.

  1. If the source is cold, then Publish it.  Alternatively, you can use PublishLast or Replay, if you need their additional behavior.
  2. If the source is hot, then use it as is.

The source is local and I want a cold observable.

  1. Define your observable with existing Rx generator methods; e.g., Return, Range, Timer, Interval, Generate or Create.

The source is local and I want a hot observable.

  1. This scenario is a potential candidate for defining your observable as Subject<T>, or even BehaviorSubject, AsyncSubject or ReplaySubject, if you need their behavior.  More details on why it's merely a "potential candidate" later in this post.

Note that Publish, PublishLast and Replay use subjects internally (via Multicast).  This isn't a coincidence.  Subscribing a subject to a cold observable broadcasts its notifications to multiple observers, thus making it hot.
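
In other words, a plain call to Publish behaves roughly like the following sketch; "coldSource" is assumed to be an existing cold observable of integers.

// What Publish amounts to: multicasting the cold source through a subject.
IConnectableObservable<int> published = coldSource.Multicast(new Subject<int>());

// Which is roughly the same as subscribing a subject to the cold source yourself:
var subject = new Subject<int>();
IDisposable connection = coldSource.Subscribe(subject);   // the subject broadcasts to its observers
IObservable<int> hot = subject;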

So it's clear there are only two scenarios where it's correct to use subjects:

  1. The source is external and cold, and I want a hot observable.
  2. The source is local and I want a hot observable.

Let's simplify those statements.  The two scenarios where it's correct to use subjects are:

  1. Converting a cold observable into a hot observable.
  2. Generating a hot observable imperatively and statefully, without any direct external source.

However, only in the latter scenario is it correct to use subjects explicitly.  As mentioned previously, Rx defines various operators like Publish for use in the former scenario.  Thus, only one scenario remains.

And so now we've answered our original questions:

When should I use a subject?  To generate a hot observable imperatively and statefully, without any direct external source.
When shouldn't I use a subject?  Everywhere else.

Perhaps that answer is a bit vague.  What do I mean by imperative and stateful?  What about local vs. external?  And why did I mention before that our final scenario is only a "potential candidate" for using a subject?

If (true == interested_in_the_answers_to_these_questions) { continue; } else { goto Conclusion; }

External Sources

Let's redefine external as quite simply a source of notifications that you do not have to generate yourself.  It comes from outside the scope of the observable that you're defining, such as a dependent object or a parameter.  It could be any number of things; e.g., an event on another object, an event on a base class, a timer callback, a Task<T> that represents the asynchronous result of reading from a file or sending a web request, an IEnumerable<T>, etc.

All of these examples have something in common: they can easily be converted into IObservable<T> using various From* and To* operators in Rx.  For example, Task<T> is converted by the ToObservable operator.  So if your external source is not already an IObservable<T>, then simply convert it using one of Rx's existing conversion operators.

Once you have an IObservable<T>, you need to determine whether it's hot or cold and whether you need it to be hot or cold.  If there's a conflict, as defined above, then use either Defer or Publish to change the temperature accordingly.  For this, there's no need to use subjects in your code.2
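
As a hedged sketch of that flow, assume DownloadAsync is a hypothetical helper returning Task<string>, and note that ToObservable comes from System.Reactive.Threading.Tasks.

// An external source: a task representing a web request.
Task<string> download = DownloadAsync("http://example.com/data");

// Convert it; as discussed in the previous post, it's safest to treat the result as cold.
IObservable<string> converted = download.ToObservable();

// Resolve a temperature conflict, if any, without resorting to subjects:
IObservable<string> shared = converted.Publish().RefCount();     // share subscription side-effects

IObservable<string> perObserver = Observable.Defer(               // or force one request per observer
    () => DownloadAsync("http://example.com/data").ToObservable());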

Finally, use LINQ to query the observable.  Again, you shouldn't use subjects here either.

The result is that you'll have a succinct observable query, controlling side-effects and meeting your specification in a declarative manner.  Subjects are simply the wrong tool for this job.  It's like hammering in a nail with a sledgehammer.  Subjects do much more than what is needed.  (Though as you'll see later in this post, in some ways subjects actually do less than what you may think, potentially introducing nasty threading bugs.)

Local / Imperative / Stateful Sources

I'm defining local as a source where you imperatively push notifications toward the outside of a stateful scope.  Another way of looking at it: a local source is external to other scopes that can't access its implementation.

Sorry if that explanation is confusing.  Let's clear it up.  Consider the following abstract requirements:

  1. You must define a hot observable.
  2. The code that generates the observable's notifications must be encapsulated by some type that you define.
  3. Your code generates notifications when callers invoke certain members of your type or when it performs certain operations.
  4. The observable must be exposed as a public property.

Now let's apply these requirements to what we've already learned earlier: 

  1. Since we're defining a hot observable, Subject<T> is certainly a candidate...
  2. We've established a scope for what is external: any source defined by another type and referenced by ours is external to our type.
  3. We must write code to push notifications (input); therefore, our source is local.  We have no external source to convert or to generate notifications for us.
  4. Our code must be able to reference something in order to push notifications, thus our type must be stateful.  We'll store a reference to "something" in a field.
  5. The field will be returned by a public property (output).
  6. Thus we need a type that represents both input and output, and is hot.

How should we model these requirements?

Scroll down for the answer:
.
.
.
.
.
.
.
.

Answer: Define a .NET event and expose it as an observable public property using FromEventPattern!

Ha, I bet you didn't see that one coming, did you?

Yes, alternatively we could use Subject<T>.  The conditions are perfect.  If we don't actually need or want a .NET event, then we should use a subject; however, if we already have an event defined, then there's no need to duplicate state and "raise" two similar events.  The conversion approach is probably better in that case.

Let's compare the similarity of subjects to .NET events in general:

  1. Events are also hot.
  2. An event's delegate shares the same scope (encapsulation) within our type as our proposed observable.
  3. We also write code to raise events (input).
  4. Our code must be able to reference the delegate in order to raise events, thus our type must be stateful.  We store a reference to the delegate in a field.
  5. The field will be returned by a public event (output).
  6. A multicast delegate represents both input and output, and is hot.

Subject<T> is kind of like Rx's implementation of an observable "event".  It broadcasts notifications to multiple observers, like raising an event for multiple event handlers.  It is the stateful component of Rx as it can be stored in a field or a variable and mutated (invoked) imperatively.  It is hot, so subscribing does not cause any side-effects (and subscribers may have already missed notifications, which is a common symptom of events and hot observables alike.)
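
Here's a tiny sketch of that analogy, for illustration only:

// A subject used like an event.
var saves = new Subject<string>();

using (saves.Subscribe(name => Console.WriteLine("Handled: {0}", name)))   // like adding a handler
{
    saves.OnNext("Document1.txt");   // like raising the event
    saves.OnCompleted();             // something a .NET event has no way to express
}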

It's correct to use Subject<T> to replace local .NET events, though .NET events are still useful sometimes due to their metadata and first-class language/tooling support.  It all depends on context, so consider this carefully before replacing all of your .NET events with subjects.  You may want to examine each event on a case-by-case basis.  And keep in mind that you can always expose an event as an observable via FromEventPattern, or perhaps it's best to leave that decision up to the consumers of your class and simply expose a .NET event.

Subjects actually have an additional related use case that events cannot cover: If we shrink our scope from type to method and apply the same logic as before, then we'll discover that subjects may still apply though events do not.

Let's specify similar method-scoped requirements:

  1. You must define a hot observable.
  2. The code that generates the observable's notifications must be within a method that you define.
  3. Your code generates notifications when it performs certain asynchronous operations within the method.
  4. The observable must be exposed as the return value of the method.

And again, let's apply these requirements to what we've already learned earlier: 

  1. Since we're defining a hot observable, Subject<T> is certainly a candidate...
  2. We've established a scope for what is external: any source passed into our method and any fields on the method's defining type are external to our method.
  3. We must write code to push notifications (input); therefore, our source is local.  We have no external source to convert or to generate notifications for us.
  4. Our code must be able to reference something in order to push notifications, thus our method must be stateful.  We'll store a reference to "something" in a local variable.
  5. The variable will be returned from the method (output).
  6. Thus we need a type that represents both input and output, and is hot.

How should we model these requirements?

Scroll down for the answer:
.
.
.
.
.
.
.
.

Answer 1: Write an anonymous event (a.k.a., anonymous method, delegate or lambda expression) and convert it using FromEvent!

Ha, did I trick you again?

You thought for sure this time I'd recommend using Subject<T>.  You'd also be correct; we could use Subject<T>.  The conditions may be perfect, though as you'll see in a moment there are better alternatives, even better than this one.

Alright, that answer was actually a joke, I don't recommend that particular approach at all.  It was meant to get you thinking about alternatives to subjects in places where you may think they're absolutely necessary.

But out of curiosity, how would that have worked anyway?

An event is backed by a delegate; it's the delegate that has all of those important properties (e.g., input, output, hot) that we discussed earlier.  For example, we could start by declaring a delegate as our observer.  By assigning the delegate to a local variable we can invoke it imperatively like a method, which is similar to calling OnNext on an observer.  We can use Task.Factory.StartNew to execute an async computation that invokes the delegate.  Finally, we can convert our delegate into an observable via the FromEvent method, which gives us our return value.

NOTE:  I don't recommend this approach, as mentioned previously.  For one thing, it doesn't support OnError or OnCompleted.  It doesn't support cancellation either, but given that we're generating a hot observable perhaps it doesn't really matter.  It's also a bit strange anyway.

using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

namespace Rx.Labs
{
    static class ToUseSubjectOrNotToUseSubject
    {
        public static void Main()
        {
            IObservable<string> xs = Generate();

            using (xs.Take(1).Subscribe(Console.WriteLine))
            using (xs.Take(2).Subscribe(Console.WriteLine))
            using (xs.Take(3).Subscribe(Console.WriteLine))
            using (xs.Take(4).Subscribe(Console.WriteLine))
            {
                Console.ReadLine();
            }
        }

        static IObservable<string> Generate()
        {
            Action<string> observer = _ => { };    /* We could use null, but then at every invocation 
                                                    * we'd have to copy to a local and check for null.
                                                    */

            Task.Factory.StartNew(() =>
                {
                    Console.WriteLine("Invocation side-effect.");

                    Task.Delay(TimeSpan.FromSeconds(1)).Wait();  // Simulate work

                    observer("Generating");
                    observer("a");
                    observer("hot");
                    observer("observable.");
                });

            return Observable.FromEvent<string>(
                eh => observer += eh,
                eh => observer -= eh);
        }
    }
}

Output:

Invocation side-effect.
Generating
Generating
Generating
Generating
a
a
a
hot
hot
observable.

Answer 2 (Recommended):  Define an observable using an existing Rx generator method, then apply Publish with RefCount.

Rx provides various generators that make it easy to create an observable from thin air; e.g., Return, Range, Timer, Interval, Generate, and also Create.  If none of the former generators apply, then fall back to Create.  Create is the Swiss Army knife of observable generation.  I suspect that it also has similar performance characteristics to Subject<T>.  (See the Thread Safety section below for details.)

A particularly useful approach is to write an async iterator using a special overload of Create:

Rx 2.0 (Preferred):    Create<TResult>(Func<IObserver<TResult>, Task>)
Rx 1.1 Experimental:   Create<TResult>(Func<IObserver<TResult>, IEnumerable<IObservable<object>>>)

Generators are better than subjects because they allow you to remain functional, though Create does seem quite similar to using a subject.  Create lets you take more of a declarative approach, which is hopefully easier to understand and maintain than code that uses a subject.  Note that a primary benefit of Create is that it supports cancellation and auto-detachment, though it's not very useful when you need a hot observable.
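
Here's a minimal sketch of the Rx 2.0 async overload; the delay merely simulates asynchronous work.

IObservable<string> xs = Observable.Create<string>(async observer =>
{
    observer.OnNext("Working...");

    await Task.Delay(TimeSpan.FromSeconds(1));   // simulate asynchronous work

    observer.OnNext("Done.");
    observer.OnCompleted();
});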

Generators aren't the whole story either.  They return a cold observable, but our specification was to return a hot observable.

That's where Publish is useful, as described earlier in this post.  Publish converts a cold observable into a hot observable; however, it returns IConnectableObservable<T>, which requires a call to Connect.  You can return IConnectableObservable<T> from your method, just make sure that you change the return type of your method to match so that callers have the opportunity to call Connect themselves.

Alternatively, you can call Publish then RefCount to get an IObservable<T> that is shared among multiple consecutive observers.  Note that this isn't truly a hot observable - it's more like a warm observable.  RefCount makes a single subscription to the underlying observable while there's at least one observer of your query.  When your query has no more observers, changing the reference count to 0, the underlying subscription is disposed.  If another observer subscribes to your query later, moving the reference count from 0 to 1 again, then RefCount makes a new subscription to the underlying observable, causing subscription side-effects to occur again.

Note that it's bad practice to call Connect yourself in this scenario.  If you're tempted to do so, perhaps because RefCount doesn't meet your needs, then you should strongly consider returning a cold observable instead and leave it up to callers to decide whether Publish should be used at all.

Also be careful to avoid closures when using Create.3  Closing over a local variable that you intend to mutate may cause unexpected behavior since every observer will share the same variable.  This may seem fine when generating a hot observable, and perhaps it is, though RefCount generates a warm observable, as mentioned previously.

Here's a correct example that uses Create, Publish and RefCount instead of a subject:

using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading.Tasks;

namespace Rx.Labs
{
    static class ToUseSubjectOrNotToUseSubject
    {
        public static void Main()
        {
            IObservable<string> xs = Generate();

            using (xs.Take(1).Subscribe(Console.WriteLine))
            using (xs.Take(2).Subscribe(Console.WriteLine))
            using (xs.Take(3).Subscribe(Console.WriteLine))
            using (xs.Take(4).Subscribe(Console.WriteLine))
            {
                Console.ReadLine();
            }
        }

        static IObservable<string> Generate()
        {
            return Observable.Create<string>(observer =>
                TaskPoolScheduler.Default.Schedule(() =>
                {
                    Console.WriteLine("Subscription side-effect.");

                    Task.Delay(TimeSpan.FromSeconds(1)).Wait();  // Simulate work

                    observer.OnNext("Generating");
                    observer.OnNext("a");
                    observer.OnNext("hot");
                    observer.OnNext("observable.");

                    observer.OnCompleted();
                }))
                .Publish()
                .RefCount();
        }
    }
}

Output:

Subscription side-effect.
Generating
Generating
Generating
Generating
a
a
a
hot
hot
observable.

Thread Safety

Subject<T>, Observable.Create and ObserverBase<T> do not make the same guarantees that are made by Rx generator methods, for performance reasons.  They allow callers to violate some of Rx's contracts for IObserver<T>, though they do ensure some contracts automatically regardless.

The contracts to which I'm referring are defined in the Rx Design Guidelines document.

Section 4.2 basically states that notifications must be pushed serially for Rx operators to behave properly.  But more generally, this should be a safe assumption everywhere.  When you subscribe to an observable, you don't expect to observe concurrent notifications.  Imagine coding subscriptions against this assumption but actually receiving concurrent notifications at runtime.  Take a look at some of your existing code.  What could happen?  For example, if you subscribe on a UI thread then you may get a threading context exception.  If you subscribe on some other thread instead, then maybe you'll get some strange threading bugs or maybe you'll get an exception, if you're lucky.

All Rx generator methods, excluding Create, ensure that this contract is satisfied.  Subject<T> and ObserverBase<T> do not.  For instance, calling OnNext concurrently on a Subject<T> will generate concurrent notifications.4

Section 4.1 states the Rx grammar as follows:

OnNext* (OnCompleted | OnError)?

All Rx operators and generator methods ensure that this contract is satisfied.  ObservableBase<T>, ObserverBase<T>, Create and Subject<T> actually ensure this contract as well, but only for serial notifications.  If a caller doesn't satisfy the §4.2 contract, then it seems that these APIs do not automatically ensure the §4.1 contract is satisfied either, due to lock-free race conditions.  For instance, it seems to me that calling a subject's OnNext and OnError methods simultaneously on different threads may cause an observer to receive OnError followed by OnNext.  But if you call OnError followed by OnNext on the same thread, then the call to OnNext is correctly ignored.

Rx offers Subject.Synchronize to satisfy these contracts for a given subject.  It offers Observer.Synchronize to do the same for observers in general.  And of course, just because you can violate the contracts doesn't mean that you necessarily will.  But you do have to be aware of them when using these APIs, especially considering the kinds of bugs that may arise.  Threading bugs originating from within Rx operators, due to the misuse of a subject somewhere within a query, can be very difficult to diagnose.
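
For example, here's a sketch; "someObserver" is assumed to be an existing observer that may be invoked concurrently.

// Wrap a subject so that concurrent callers can't violate the serial notification contract.
ISubject<int> safeSubject = Subject.Synchronize(new Subject<int>());

// The same guard for an arbitrary observer.
IObserver<int> safeObserver = Observer.Synchronize(someObserver);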

So when you find yourself invoking OnNext, OnError or OnCompleted on a subject or within Create, make sure that you're satisfying Rx's contracts.

The takeaway is that when a subject isn't necessary due to the presence of an external source or an appropriate generator method, it's best not to use a subject in the first place so you can avoid having to invoke it imperatively and risk introducing subtle threading bugs in your queries.

Correct Examples

As shown in this post, there's only 1 particular set of conditions in which it's absolutely appropriate to use subjects.  Here are some examples.

Reactive Property

A common example of when it's correct to use a subject is to represent a sequence of property changes for a particular property, exposing it with AsObservable to hide its identity.  It's similar to implementing the PropertyChanged event, but only raising it for a particular property.  Instead of raising an event from within the property's setter, you'd call subject.OnNext(value).

Subject<T> works fine, though more commonly BehaviorSubject<T> is used instead because it stores the latest value of the property and pushes it immediately to new observers.  If your program is highly reactive, then you may find that you don't even need to keep a backing field for the property since BehaviorSubject<T> encapsulates it.  To set the value, call subject.OnNext(value).  Unfortunately, there's no way to extract the value imperatively (at least not until Rx exposes a Current property), so you may find that a backing field is still necessary unless your program is highly reactive.  Subscribing to the subject is the natural way of extracting the current value and all subsequent changes.

using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace Rx.Labs
{
    class SubjectAsProperty
    {
        public static void Main()
        {
            var foo = new Foo();

            Console.WriteLine("Normal property usage:");

            Console.WriteLine(foo.Number);
            foo.Number += 8;
            Console.WriteLine(foo.Number);

            Console.WriteLine("Reactive property usage:");

            using (foo.NumberObservable.Subscribe(n => Console.WriteLine("Observer A: {0}", n)))
            {
                foo.Number = 1;
                foo.Number = 2;

                using (foo.NumberObservable.Subscribe(n => Console.WriteLine("Observer B: {0}", n)))
                {
                    foo.Number = 3;
                }

                using (foo.NumberObservable.Subscribe(n => Console.WriteLine("Observer C: {0}", n)))
                {
                    foo.Number = 4;
                }
            }

            Console.WriteLine(foo.Number);
            Console.ReadKey();
        }
    }

    class Foo
    {
        public int Number
        {
            get
            {
                return number.FirstAsync().Wait();
            }
            set
            {
                number.OnNext(value);
            }
        }

        public IObservable<int> NumberObservable
        {
            get
            {
                return number.AsObservable();
            }
        }

        private readonly BehaviorSubject<int> number = new BehaviorSubject<int>(42);
    }
}

Output:

Normal property usage:
42
50
Reactive property usage:
Observer A: 50
Observer A: 1
Observer A: 2
Observer B: 2
Observer A: 3
Observer B: 3
Observer C: 3
Observer A: 4
Observer C: 4
4

Cross-Cutting APIs

It's common to use a subject to represent cross-cutting concerns in a reactive manner.  For example, imagine exposing a property on a purely memory-bound logging class, returning an observable that pushes a notification whenever something is logged, such as entering and exiting methods (possibly via AOP).  This could be useful for real-time performance analysis, reactive heuristic-based adjustments, post-mortem debugging and even real-time debugging.  You could have one observer writing notifications to disk and one writing them to a GUI console, while another adjusts algorithms at runtime based on performance heuristics.  ReplaySubject<T> could be used to buffer notifications in memory and replay them to new observers, if necessary.

Another example would be a sequence of alerts exposed as an observable property on an Alert class.  Various parts of the program could send alerts concurrently by calling Alert.Unimportant(...) or Alert.RequiresUserInteraction(...), for example.  One observer could send emails while another displays toast notifications.

using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace Rx.Labs
{
    class AlertLab
    {
        public static void Main()
        {
            var toast = from alert in Alert.All.OfType<ToastAlert>()
                        select alert.Message;

            var email = Alert.All.OfType<EmailAlert>();

            using (toast.Subscribe(a => Console.WriteLine("Important: {0}", a)))
            using (email.Subscribe(a => Console.WriteLine("Email \"{0}\" with \"{1}\".", a.To, a.Message)))
            {
                new ToastAlert("Hello Console!").Send();
                new EmailAlert("Hello Fred!", "fred@example.com").Send();
                new ToastAlert("Goodbye!").Send();
            }
        }
    }

    public abstract class Alert
    {
        public static IObservable<Alert> All
        {
            get
            {
                return alerts.AsObservable();
            }
        }

        public string Message
        {
            get
            {
                return message;
            }
        }

        private static readonly ISubject<Alert, Alert> alerts = Subject.Synchronize(new Subject<Alert>());
        private readonly string message;

        protected Alert(string message)
        {
            this.message = message;
        }

        public void Send()
        {
            alerts.OnNext(this);
        }
    }

    public class ToastAlert : Alert
    {
        public ToastAlert(string message)
            : base(message)
        {
        }
    }

    public class EmailAlert : Alert
    {
        public string To { get; private set; }

        public EmailAlert(string message, string to)
            : base(message)
        {
            this.To = to;
        }
    }
}

Output:

Important: Hello Console!
Email "fred@example.com" with "Hello Fred!".
Important: Goodbye!

Conclusion

If you've got anything that can be converted into an observable, such as a Task or Task<T>, an event, or an enumerable, then convert it using one of Rx's From* or To* operators; or if you've already got an observable, then use it as is.  Now consider temperature conflicts, change the temperature if necessary via Defer or Publish, and finally query it.  Don't use subjects.

Else if you don't have an observable or anything that can be converted into one:

If you require a cold observable, then use an existing Rx generator method; e.g., Return, Range, Timer, Interval, Generate or Create.  Don't use subjects.

Else if you require a hot observable:

If the scope of your observable is a method (i.e., it's entirely encapsulated by a method), then use an existing Rx generator method.  You can also apply Publish and RefCount, but consider relaxing the conditions to simply return a cold observable.  Don't use subjects.

Else if the scope of your observable is a type (e.g., it's exposed as a public property and backed by a field):

If you need to define a similar event or a similar event already exists, then convert the event into an observable like the first case.  Don't use subjects.

Else if you don't need to define a similar event and no similar event already exists, then use a subject.

In summary:

When should I use a subject?

When all of the following are true:

  • you don't have an observable or anything that can be converted into one.
  • you require a hot observable.
  • the scope of your observable is a type.
  • you don't need to define a similar event and no similar event already exists.

Why should I use a subject in that case?

Because you've got no choice!

 


  1. At least that's my interpretation of Erik's comments.  I find it to be really good advice in practice when working with Rx.
  2. Perhaps in very advanced and rare scenarios you may find a custom ISubject<T> implementation to be useful, in which case it's safe to use Multicast rather than Publish.
  3. Which happens to be an easy-to-miss bug when implementing custom operators in Rx.  If you need to return a cold observable, as generally operators do, and you must use a closure, then wrap the closure and the call to Create in a call to Defer to ensure that the closure isn't shared across multiple subscriptions.
  4. In some advanced and rare scenarios you may want to elide this contract and permit concurrent notifications through a subject, for performance reasons, though you should do so only if you can guarantee that none of Rx's query operators will be applied to the subject and all subscribers are thread-safe.

Tags:

.NET | Patterns | Rx

November 07, 2012

Open Reactive Extensions (Rx)

It’s time to start thinking reactively!  What have you been waiting for?

Maybe you were waiting for this:
Yesterday, Microsoft announced that Reactive Extensions (Rx) is now an open source project.  The complete source code for Rx 2.0 is being hosted at http://rx.codeplex.com under the Apache License 2.0.  After an extensive prerelease period and two “officially” released versions, Rx is finally being opened up to the community for a look inside the monadic machinery that helps developers to compose asynchronous queries with ease.  Not only that, but now you can submit bug fixes and implement features yourself.*

Maybe you were waiting for this:
Interactive Extensions (Ix) for .NET and JavaScript are open as well.  They are included under the same source control root as Rx.  I’ve decided to form a habit:  Whenever I create a new project, before I even write a single line of code, I’ll add references to the Rx Package and Ix Package from NuGet.  (Note:  I just started a discussion about the future of distribution, so those links may change.)

Maybe you were waiting for this:
The Rx Team has included brand new libraries to support C/C++.  They are aptly named Rx++ and Ix++.

Or maybe this:
The Rx Team has included new “Rx binding” examples, which are basically just specialized examples illustrating how to apply Rx and LINQ to specific domains.  They’ve included Tx, which is a set of code samples showing how to use LINQ to Events; e.g., real-time queries from trace logs for ETW, Windows Events and SQL Server Extended Events.  Another Rx binding example they’ve included is LINQ2Charts, which “allows developers to use LINQ to create/change/update charts in an easy way and avoid having to deal with XML or other underneath data structures”.

They’re also looking for new examples of Rx bindings, so let them know if you’ve got any.

Or this?
Rx already has an open source community building libraries and tools around it.  And now that Rx itself is open source, that should help to strengthen the existing open source Rx community by exposing it to more developers.  So even if you can’t/won’t contribute code directly to Rx, either due to complexity or legal constraints or whatever, then at least consider contributing code and/or feedback to one of the existing open source projects that enhance Rx.

Seriously, are you still considering how to develop asynchronous applications without Rx?  Are you also considering how to simplify data extraction from a SQL Server table without using SQL?  Or concurrently poke your eyes out with the pointy ends of a Metro-style button?

My (Additional) Impressions

A primary key to success for any open source library is to be extremely useful.  Another key to success is to be unique.  Rx is both, so I have no doubt that it will flourish in the open source world.

Rx goes a long way toward making asynchronous programming easy.  It’s a solid library built around core principles that hides much of the complexity of controlling and coordinating asynchrony within any kind of application.  Opening it will help to lower the learning curve and increase the adoption rate of this amazing library, enabling developers to create complex asynchronous queries with relative ease and without any spaghetti code left over.  If asynchronous spaghetti code were a disease, Rx would be the cure.

The more functionality a library offers, the more it becomes imperative to be able to peruse the source code.  Understanding user error and library bugs can be very difficult without source code, especially given the inherent complexity of asynchronous programming and the many factors of Rx’s public contracts, which must be considered while developing and debugging.  Capturing every last detail about Rx operators in documentation may be overly cumbersome or even impossible.  Opening Rx can help to make building and debugging complex asynchronous queries much easier.

Opening Rx allows the community to be less entitled and more empowered.  Developers using Rx already identify bugs and request features by posting them in the Rx forum, so it makes perfect sense to allow them to fix bugs and implement features themselves in a timely manner and share their work directly with the community.

Opening Rx to the community will help to tighten their relationship.  It will help to focus the flow of feedback to Microsoft, enabling more informed decisions about the shape of features and the future of Rx.

I’ve noticed the trend at Microsoft to move very useful products into open source projects on CodePlex.  I’ve been wondering whether Rx would take this route as well.  There are many great uses for Rx and its reach will certainly grow even more as an open source project.  The communities around Microsoft’s many open source projects are continuously growing and Rx certainly deserves to benefit from community involvement as much as any other product.

*After jumping through a legal hoop or two.  Read the Rx project home page for details.

Tags: ,

Open Source | Rx

May 05, 2012

TCP Qbservable Provider Security

UPDATE: The prototype TCP Qbservable Provider library that is described in this article has become the Qactive project on GitHub, including many bug fixes and improvements.

This is the second post in a series of posts on my TCP Qbservable Provider library, which is a proof-of-concept library that enables easy hosting of queryable observable services over TCP.  In this post, we dive into security.

There are two kinds of security that I believe must be fully supported by any IQbservable Provider to be a viable option for hosting public web services that can accept queries from anonymous clients:

  1. Semantic security
  2. Resource security

I’ve recently checked into source control new security features that address these issues.  More work needs to be done before the next release, but it’s a good start.  I’ve also included new examples in the example applications: SandboxedService, LimitedService and MaliciousClient.  These new examples are explained throughout this post.

Note that the new restricted security model is enabled by default for all hosted queries.  You must opt out of the default security model by setting specific properties and by using a specific API to host your service.  The details are explained within this post.

Semantic Security

Semantic security is about limiting the kinds of queries that a service will execute to a minimal set that makes sense for the kind of service being offered.  It can be broken down further into two subcategories:

  1. Operators and Methods
  2. Types of Expressions

Operators and Methods

Semantic security limits the kinds of operators and methods that the service permits.  Clients can technically send any kind of operators that they want – you can’t prevent anonymous clients from sending strange (or even malicious) queries; however, the service must reject queries that don’t make sense based on the operators used within the queries to prevent the server from executing code that is unrelated to or just plain wrong for the service being offered.

For example, imagine that you’ve written a service that pushes news to clients: IQbservable<Article>.  Without semantic security, the service will happily accept any kind of query from clients.  Thus clients may execute operations such as windowing, buffering, aggregation, creating timers, creating ranges, zipping, joining, filtering, projecting, among others.  That’s a lot of functionality to offer for a news service, though perhaps it makes sense for your particular implementation; e.g., it might be nice to allow a client to query for articles of two different authors joined by coincidence over a particular duration.  However, does it make sense for a client to submit a query that uses the Range operator to generate a sequence of values from 1 to 10, for each article, and then project each value without the article?  Probably not.  A client could just as easily query for all articles unconditionally and then generate a range from 1 to 10 locally, without having to burden the server with such a trivial request.  Perhaps it would be better for this service to disallow use of the Range operator.

In fact, it may be better to limit queries to just simple filtering and projection, at first.  Clients could filter news articles and select specific properties in which they’re interested.  This may provide enough functionality to support common queries and yet still provides some useful optimizations.  It also keeps your service fair and responsive by doing less work per query, assuming that clients don’t generally need any other functionality.  After going live with your service you could accept feature requests for other kinds of query operators based on user feedback, consider whether the additional semantics make sense for your particular service, and then enable those operators.  You should also consider whether your server can handle the additional load of more complex queries – the next section, Resource Security, addresses that topic.

The first release of TCP Qbservable Provider (1.0 Alpha) already supports semantic security by allowing you to host queries from your own IQbservableProvider by calling the ServeTcp extension method; however, to limit query operators you must manually create a custom ExpressionVisitor, which can be somewhat difficult to do.  So to make things easier, a new feature (recently checked into source control) of the TCP Qbservable Provider library enables easy whitelisting of operators so that you don’t have to create your own IQbservableProvider or ExpressionVisitor.

To use this feature, simply create an empty instance of ServiceEvaluationContext and call the AddKnownOperators method to whitelist operators that your service must support.  Then assign this context to a QbservableServiceOptions object that is passed to the ServeQbservableTcp extension method, which is responsible for hosting your observable over TCP.

var context = new ServiceEvaluationContext(
    Enumerable.Empty<MethodInfo>(),
    includeSafeOperators: false,
    includeConcurrencyOperators: false);

context.AddKnownOperators("Where");
context.AddKnownOperators("Select");

var service = source.ServeQbservableTcp(
    endPoint,
    new QbservableServiceOptions()
    {
        EvaluationContext = context
    });

With the above configuration, client queries using any methods other than the Where and Select operators are automatically rejected by the server.  To see rejection in action, take a look at the MaliciousClient and LimitedService examples checked into source control.

Types of Expressions

The ExpressionOptions property of the QbservableServiceOptions object provides lower-level control over the types of expressions that are permitted.  By default, only basic expressions are permitted.  Expressions such as assignments, blocks, loops, constructor calls and others are rejected by the service.  The default option is recommended for public services, though you can assign this property to any combination of flags to allow additional expression types when needed.

Opting Out of Semantic Security

To disable the semantic security model entirely, set the AllowExpressionsUnrestricted property of the QbservableServiceOptions object to true.  By doing so, the provider will ignore the ServiceEvaluationContext object and the ExpressionOptions property and simply allow any expression and operator to be used within a query.  Of course, this is only recommended if you fully trust all clients, which is not the case in public scenarios.
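
For example, here’s a sketch that reuses the names from the earlier snippet:

var service = source.ServeQbservableTcp(
    endPoint,
    new QbservableServiceOptions()
    {
        AllowExpressionsUnrestricted = true
    });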

Resource Security

Resource security is about preventing unauthorized code from accessing system resources, such as memory, CPU, the file system and networking components.  It can be broken down further into two subcategories:

  1. API Security
  2. Algorithmic Security

API Security

The FCL offers many APIs that malicious clients could abuse, such as System.AppDomain, System.Environment, System.IO.*, System.Reflection.*, System.Security.*, System.Windows.*, etc.  In order to safely host a service that can execute arbitrary code from clients, the service would have to prevent unauthorized access to certain APIs.  The whitelisting approach used by the semantic security model described previously is one way to solve this problem, though the mechanism that is generally used to prevent unauthorized access to APIs in the .NET Framework is Code Access Security (CAS).  CAS offers a way to restrict APIs through permissions.

Whitelisting has an advantage over CAS.  CAS allows certain APIs to execute that don’t make sense in a server environment, simply because they don’t demand any permissions; e.g., System.Console.WriteLine.  In order to entirely prevent use of System.Console within queries, whitelisting must be used.

However, there are advantages to using CAS over whitelisting.  For one thing, the whitelisting approach I’ve implemented only checks methods and operators, while the CAS approach applies to any operator, method, constructor, event or property.  Furthermore, the CAS approach applies generally to the entire .NET Framework and third-party assemblies, whereas the process of manually whitelisting APIs may inadvertently expose clients to a security vulnerability hidden deep within some seemingly benign API.  In other words, CAS frees you from the burden of having to ensure that your chosen whitelisted operators and methods don’t inadvertently permit clients to format your C: drive.

Therefore, using sandboxing together with whitelisting provides the strongest security model for hosting services publicly by limiting the APIs that services will permit within anonymous client queries to the set of APIs that are resource-safe and semantically acceptable for a given service.

To incorporate CAS security into the TCP Qbservable Provider library, I’ve added new overloads for the QbservableTcpServer.CreateService method.  You may already be familiar with the CreateService method, since it’s also the API that enables you to host a service accepting an argument, as described in my first blog post in this series.  The new overloads begin with an AppDomainSetup parameter.  They host services within a sandboxed AppDomain that applies minimal CAS permissions by default, though it’s easily configurable by passing in a PermissionSet object to one of the overloads.  By default, sandboxed services will only permit queries to execute code that runs within a transparent security context, denying access to any API that demands additional permissions.  In other words, basic code can be executed within queries, but any APIs that demand additional permissions are rejected by throwing a SecurityException.

To host your service in a sandbox with minimal permissions, call the QbservableTcpServer.CreateService method and pass in an AppDomainSetup object as the first argument.  You must assign its ApplicationBase property to the base path of your sandboxed AppDomain, from which dependent assemblies will be loaded.  The .NET Framework guidelines recommend specifying a different path than the application’s actual base path, for additional security.  In the example below, I’ve assigned the sandbox’s base path to the application’s sandbox_bin subfolder, which would contain only the assemblies that the sandbox requires to execute the host service.

Unfortunately, that’s not all you must do.  The service factory argument cannot be a lambda because it’s invoked within the AppDomain and, therefore, must be serializable, though the compiler does not ensure that lambdas’ closures are serializable.  You may be able to work around this, provided that a closure isn’t created, by referencing a method on a MarshalByRefObject, but that may actually defeat the purpose of the sandbox because ultimately client queries would be executed outside of the sandbox.  Instead, you should use a static factory method to create your service, as shown in the example below.

At the moment, due to type-inference issues, you must wrap the call to the factory method in an explicit delegate, though I plan on addressing that problem in the future, if possible.

// Determine the application's actual base path from the entry assembly's location.
var appBase = Path.GetDirectoryName(new Uri(Assembly.GetEntryAssembly().CodeBase).LocalPath);

// Use a dedicated subfolder as the sandbox's application base, per the .NET sandboxing guidelines.
var newAppBase = Path.Combine(appBase, "sandbox_bin");

// Passing an AppDomainSetup as the first argument hosts the service in a sandboxed AppDomain
// with minimal CAS permissions.
var service = QbservableTcpServer.CreateService<object, int>(
    new AppDomainSetup() { ApplicationBase = newAppBase },
    endPoint,
    new Func<IObservable<object>, IObservable<int>>(CreateServiceObservable));
...
public static IObservable<int> CreateServiceObservable(IObservable<object> request)
{
    return Observable.Range(1, 3);
}

With the above configuration, client queries using APIs that require any permissions other than security-transparent execution are automatically rejected by the server.  To see rejection in action, take a look at the MaliciousClient and SandboxedService examples checked into source control.

Keep in mind that your service callbacks, including the CreateServiceObservable method shown above, are executed with the reduced permission set of the sandboxed AppDomain; however, you can assert additional permissions from within your callbacks if needed.  Additional permissions may be needed when creating the service observable or executing observation side-effects, as shown in the following example.

var service = QbservableTcpServer.CreateService<object, int>(
    new AppDomainSetup() { ApplicationBase = newAppBase },
    endPoint,
    new Func<IObservable<object>, IObservable<int>>(CreateServiceObservable));

service.Subscribe(terminatedClient =>
    {
        // Assert full trust so that the (hypothetical) Log.Trace call below can satisfy its demand.
        new PermissionSet(PermissionState.Unrestricted).Assert();

        try
        {
            Log.Trace("Client shutdown: " + terminatedClient.Reason);
        }
        finally
        {
            // Revert the assert as soon as the elevated call completes.
            PermissionSet.RevertAssert();
        }
    });

In the example above, the hypothetical Log.Trace method demands full trust to execute.  Since the callback is executing under the permission set granted to the sandboxed service, the demand will fail unless the required permissions are asserted, even across an AppDomain boundary.

The reason that full trust can be asserted here, but not by clients’ queries, is that callbacks are executed by one of your own assemblies, while queries are executed within security-transparent dynamic assemblies, which cannot assert permissions beyond the set granted to the sandbox.

The entry point assembly is automatically granted full trust permissions by the CreateService method and, presumably in the above example, the assembly containing the callback code is the application assembly, so it’s able to assert full trust.  If it’s not the application assembly, then you can specify additional full-trust assemblies by calling a specific overload of the CreateService method that accepts an array of Assembly objects, as sketched below.
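
Here’s a sketch of what that might look like; the parameter order of this overload, and the assembly containing the Log type, are assumptions on my part:

// Hypothetical: grant full trust to the assembly containing the Log type, in addition to the
// entry assembly; the actual overload's parameter order may differ.
var service = QbservableTcpServer.CreateService<object, int>(
    new AppDomainSetup() { ApplicationBase = newAppBase },
    new[] { typeof(Log).Assembly },
    endPoint,
    new Func<IObservable<object>, IObservable<int>>(CreateServiceObservable));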

Note that to assert full trust, assemblies must be strong-named.

Algorithmic Security

Algorithmic security is about restricting the types of expressions that the service permits based on their effect on system resources.

For example, returning to our IQbservable<Article> news service, let’s assume that we’re executing our service in a sandbox and we’ve already applied limited semantic security to prevent most operators from being executed, with the exception of a few that we think will be useful to clients for querying news articles:  Where, Select, Aggregate, Scan, GroupBy, Join, Timer, Interval and Sample.  We’re also allowing primitive types, List<T> and arrays to be created within clients’ queries.

Is our news service safe?

No, it’s not safe.  We wouldn’t want the service executing a client’s query if it contained expressions that performed any of the following actions, listed in roughly increasing order of severity (a sketch of one such query appears after the list):

  • Performs 100 operations in a single query.
  • Creates 500 joins.
  • Allocates a 10MB array for every article containing the individual words within that article.
  • Aggregates lists of 10K articles containing the entire contents of each article.
  • Samples at a very short duration causing frequent output.
  • Starts a high-rate timer causing frequent output.
  • Starts 100K low-rate timers.
  • Allocates a single 2GB array.
  • Creates a single 10GB string.
  • Uses the GroupBy operator in such a way that it never frees memory.
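
Here’s a sketch of the third item above, assuming a hypothetical news proxy for the IQbservable<Article> service:

// Semantically harmless, algorithmically hostile: every article that flows through the query
// allocates a fresh 10 MB array on the server.
var hostile =
    from article in news.Query()
    select new byte[10 * 1024 * 1024];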

These kinds of queries negatively impact CPU and memory, so their use must be restricted somehow.  To do that, we must first identify how system resources can be affected by queries, then determine how each particular API can be used within queries to affect system resources, and finally design a model for constraining each particular API, or otherwise prevent its use entirely.

Generally, the issues with the example queries above are as follows:

  1. Unrestricted quantities of permitted operators.
  2. Unrestricted combinatorial use of operators.
  3. Unrestricted operator arguments.
  4. Unrestricted allocation of objects, including primitive types, collections and arrays.
    1. Unrestricted magnitude.
    2. Unrestricted quantity.
  5. Unthrottled output.
  6. Unthrottled input.  (Applies to duplex queries only; examples are not provided above.)

It seems that some of these issues are much easier to detect than others, though all of them need to be addressed to have a truly secure service model for publicly hosting queryable observables.

In general, when any potentially unsafe expressions are found within a client’s query, they must be evaluated to determine whether their particular usage is permitted and under what constraints.  You may notice some overlap with the concept of semantic security described previously.  The difference is that semantic security is only about semantics; i.e., whether the intent of a query through the expressions it uses is compatible with the intent of the service, whereas algorithmic security is about constraining the effects particular expressions have on system resources.

In order to constrain algorithmically-unsafe expressions, we must go beyond simple whitelisting and provide additional options for configuring the limits of particular expressions on a per-operation and per-type basis.

Currently, the TCP Qbservable Provider library does not offer any solutions for algorithmic security, though it’s something that I’m considering for a future release.  I plan to add configurable options such as the following:

  1. Maximum quantities for individually whitelisted operators and methods, with reasonable defaults.
  2. Maximum quantities for categorized operators and methods, with reasonable defaults; e.g.,
    1. Permit only 1 usage of concurrency-introducing operators.
    2. Permit only 1 usage of timer operators.
    3. Permit only 1 usage of buffering operators.
  3. Constraints on particular operator arguments, with reasonable defaults; e.g.,
    1. Interval:  30 seconds <= period <= 12 hours
    2. Range:  0 <= count <= 10
    3. Buffer:  2 <= length <= 100
  4. Maximum quantity and capacity of new collections and arrays, with reasonable defaults; e.g., new object[N], where N <= 10
  5. Maximum quantity and size of strings.
  6. Maximum size of incoming serialized expression trees, in bytes.
  7. Maximum size of serialized query payloads, in bytes, with separate settings for output notifications and duplex input notifications.
  8. Input/output throttling, with a default duration of 30 seconds.

In the meantime, I’ve configured the default semantic security whitelist to exclude all potentially unsafe operators, to disallow array allocations and to disallow explicit calls to constructors.  Note that your service is still capable of creating unbounded objects, including arrays and collections, though queries cannot create non-primitive objects themselves.  Assignments and void-returning methods are also disallowed by default, so all objects and collections are immutable.

However, the current security model isn’t an exhaustive solution.  It doesn’t prevent clients from using clever algorithms to cause stack overflows and out of memory exceptions.  It doesn’t even, for example, prevent clients from easily creating a 2GB string.

Conclusion

There’s more work to be done on the security model to safely host queryable observable services publicly.  Even with whitelisting and sandboxing, the TCP Qbservable Provider is easily vulnerable to simple attacks that could bring down an entire process.

The current security model should, however, prevent clients from directly damaging your system.  Direct damage is prevented by using the sandboxing APIs to create your service, which rejects queries that demand unsafe API permissions.  You should also use semantically-restricted expressions, which are enabled by default, to reject queries attempting to use operators and methods that aren’t permitted by your service’s whitelist.


IQbservable | Rx | Rxx

April 25, 2012

LINQ to Cloud - IQbservable Over the Wire

UPDATE: The prototype TCP Qbservable Provider library that is described in this article has become the Qactive project on GitHub, including many bug fixes and improvements.

I’ve recently released a TCP Qbservable Provider library within the Rxx project on CodePlex.  It targets the .NET 4.5 Framework and is written entirely in C# 5 using VS 11 Beta.  If stabilized in the future, it will probably be merged into the Rxx library.  It’s currently pre-Alpha, so beware of bugs and don’t expect much consideration for security.  Feedback is welcome.

In this blog post I’m going to introduce you to the TCP Qbservable Provider library and discuss its key features in some depth.  I'll also include a few working examples at the end.  If you want to try programming the examples yourself, then start by downloading the library and unzipping it.  You’ll need to add references to all of the assemblies in the bin folder, though you can download the Rx 2.0 Beta assemblies from NuGet, if you’d prefer.

The applications in the zip file provide more detailed examples, but I won’t be discussing them in this blog post.  I encourage you to try them on your own.  You can download the complete source code, which contains the code for the example applications, the provider library and the Rxx 2.0 Beta library.  (Note that the Rxx 2.0 Beta library has not been officially released yet and is likely to change.)

What does this thing do?

In a nutshell

TCP Qbservable Provider enables you to easily create a TCP web service that offers an IQbservable<T> for clients to query using Rx and LINQ.  You simply choose an end point consisting of an IP address and a port number, then your observable becomes available to clients over TCP.

Clients can query your observable using Rx operators.  Queries can also be written in LINQ’s query comprehension syntax.  When a client subscribes to its query, a connection is made to the service and then the query is serialized and sent to the server, where it is executed.

In case you’re wondering, IQbservable<T> is the model that Rx defines to represent queryable observables.  It’s the dual to queryable enumerables, which are represented by IQueryable<T>.  IQbservable<T> is to IObservable<T> as IQueryable<T> is to IEnumerable<T>.  For a deeper explanation, see this video on Channel 9 by Bart De Smet.
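
For reference, the shape of the interface looks roughly like this (reproduced from memory, so it may differ slightly from the shipped definition):

public interface IQbservable
{
    Type ElementType { get; }
    Expression Expression { get; }
    IQbservableProvider Provider { get; }
}

public interface IQbservable<out T> : IQbservable, IObservable<T>
{
}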

The following diagram illustrates the basic communication process:

[Diagram: the basic communication process between client and server]

The basic process:

  1. The server begins hosting a TCP service at a specific end point, which includes an IP address and a port.
    1. The ServeQbservableTcp extension method is used to serve an IObservable<T> as an IQbservable<T>.
    2. The ServeTcp extension method is used to serve an IQbservable<T> directly.
    3. The factory methods on the QbservableTcpServer class are used to create queries that accept subscription arguments.
  2. The client creates an instance of QbservableTcpClient<T> to represent the service locally.  No call is made to the server yet.
    1. The client must specify a type for T that is compatible with the type of the data in the IQbservable<T> service.
    2. The client must supply the end point, which includes the IP address and port of the service.
    3. The client can optionally configure the service proxy; e.g., to enable duplex communication and specify known types.
  3. The client creates a LINQ query by applying Rx operators to the result of the Query method, either by using fluent method call syntax, query comprehension syntax or some mixture of both.
    1. All Qbservable Rx operators are supported.
    2. Arbitrary code can be written within the query to execute side-effects on the server; e.g., via the Do operator.  Clearly, this can be a security concern, but it can also be quite powerful if used properly.
    3. Anonymous types are supported.  (More on this below.)
    4. Full duplex communication is supported.  (More on this below.)
    5. Optionally, the server can require an argument that the client passes to the Query method, which must be a serializable type that the server expects.
  4. The client eventually calls Subscribe on the query, just like a normal IObservable<T>.  This causes a TCP socket to be opened with the server.
  5. The server receives the subscription request and performs a protocol negotiation with the client (currently the entire negotiation is mocked, but it may be developed in the future to support custom protocols).
  6. The client serializes the query and sends it to the server.
  7. The server deserializes the query and begins executing it.
  8. Observable notifications are pushed to the client asynchronously.

Note that all data is currently serialized using a binary formatter.  This includes expression trees and all notifications, in both directions.

Serialized Expression Trees and Anonymous Types

When you write a query against an IQbservable<T>, the compiler creates an Expression Tree to represent your query at runtime.  Often expression trees contain anonymous types that are generated by the compiler; e.g., to support let statements and closures.  The TCP Qbservable Provider must serialize and send an expression tree to the server to be executed.  Even though the Expression classes in the FCL and the types generated by compilers are not serializable, the TCP Qbservable Provider library enables serialization of Expression trees and anonymous types by automatically swapping them with internal, serializable representations.  All types of expressions are supported except for dynamic and debug info expressions, but I’ll consider supporting those in the future as well.
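
For example, a client query like the following (hypothetical; client is a QbservableTcpClient<long> proxy, as in the examples later in this post) compiles its let clause into an anonymous type, yet it can still be serialized and executed remotely:

// The compiler lowers the 'let' clause into a projection through an anonymous type; the provider
// swaps that type with an internal, serializable representation before sending the expression tree.
var query =
    from value in client.Query()
    let doubled = value * 2
    where doubled > 10
    select doubled;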

Expression trees are an internal implementation detail that you don’t need to be aware of to consume and host IQbservable<T> services.

Full Duplex Communication

Full duplex communication is when a client and server communicate in both directions simultaneously over a single connection; i.e., the client sends the server data while the server sends the client data, without connecting to another end point.  Full duplex communication occurs automatically for instances of IObservable<T> within a client’s query, though the server must opt-in to allow it.  Full duplex communication can also occur automatically for local members and IEnumerable<T> objects that are generated by iterator blocks, though a client must opt-in to allow it; otherwise, the default behavior is for local members and IEnumerable<T> iterators found within a query to be evaluated locally and then replaced with constants before the query is sent to the server.

While the server is executing the client’s query, if it comes across any local member, including any static or instance methods, properties or fields, and if full duplex communication has been enabled on both the client and the server, then the server will send an invoke message to the client and wait for the return value, synchronously.  The return value is not cached.  A new message is sent each time a local member must be executed within the query.

For full duplex IEnumerable<T> objects generated by iterator blocks, the server sends a synchronous message to the client to get an enumerator.  Subsequent calls to enumerate the enumerable from within the query (e.g., MoveNext, Reset and Dispose) are sent to the client as synchronous invoke messages, similar to the behavior previously described for local members.

While the server is executing the client’s query, if it comes across an instance of an IObservable<T> that was either a closed-over local variable or the return value of a full duplex local member, and if full duplex communication has been enabled on the server (it is automatic on the client for local instances of IObservable<T>), then the server sends a subscribe message to the client and waits for a subscription response, synchronously.  The client is then free to asynchronously push notifications to the server; e.g., OnNext, OnError and OnCompleted.
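
To make this concrete, here’s a hypothetical client query that closes over a local variable and a local observable; the names are for illustration only:

int threshold = 5;                                // a closed-over local variable
IObservable<string> alerts = GetLocalAlerts();    // a hypothetical local observable

var query =
    from value in client.Query()
    where value > threshold   // by default, evaluated locally and replaced with a constant;
                              // with duplex enabled, invoked on the client via a duplex message
    from alert in alerts      // with duplex enabled, the server subscribes to this observable
                              // through a duplex subscribe message sent to the client
    select alert;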

A large number of local members, IEnumerable<T> iterators and IObservable<T> objects are supported simultaneously for individual clients.  The maximum is currently bounded by the number of keys that fit into a generic Dictionary<TKey,TValue>, per client, though each category also has its own dictionary.  This means that a given client probably supports over 2 billion unique local members, 2 billion IEnumerable<T> iterator instances and 2 billion IObservable<T> instances.  It’s highly unlikely that local members will ever be a problem given the complexity required to generate a query containing over 2 billion expression objects; however, the latter two are dealing with instances of objects that may be generated by the query as it executes.  Though I doubt these capacity constraints will ever be a problem given the context of network communication, it’s something to consider if, for example, you’ll be writing large multi-layered full duplex SelectMany queries that generate many notifications very fast and must execute uninterrupted for a very long period of time.  It’s possible that you may eventually hit 2 billion observables, but it does sound a bit crazy.

Why should I create an IQbservable<T> service?

There are a few major advantages to using a queryable observable instead of a traditional web service:

It’s Asynchronous

Web services typically process requests synchronously, or are fairly complicated to make asynchronous, but a queryable observable service can easily be entirely asynchronous.  By processing requests asynchronously, the server is free to handle additional requests.  This makes for very scalable services.

It’s Reactive

Web services are not typically reactive, but a queryable observable service can easily be entirely reactive.  Clients don’t have to poll your service.  Instead, the server can determine when to send notifications to clients and clients can concentrate on doing other things.

Furthermore, the timing of notifications can be controlled by various internal factors, such as the semantics of the source or the current load on the server.  This is better than clients hitting the server as fast as possible trying to squeeze out more data that simply isn’t available yet, while the server still tries to process requests in a fair manner.  A truly reactive service does, however, require mostly persistent TCP connections to be fully beneficial in this way.

It’s Queryable

Reactive web services are typically blunt data hoses that simply receive a request and then begin pushing data to the client as fast as possible, but a queryable observable service receives the client’s query and processes it on the server, so it’s able to return only the data that the client has requested.  Not only is this better for bandwidth usage and client performance, but it’s also potentially better for server performance and overall throughput because the service can interpret the client’s query to make optimizations on-the-fly.

Imagine a social media service that exposes an IQbservable<PersonSaidSomething>.  A client could send a query to the service asking to receive notifications whenever a particular person says something, instead of receiving notifications whenever anyone says something and filtering them on the client.  Admittedly, this particular example isn’t so impressive considering that most social media services probably offer a simple parameterized option to filter notifications for individual people.  So what about a query that asks the service for notifications whenever any person says something about a particular character on a TV show during the date and time on which that show is airing?  Do any social media services offer parameters for that?  What about a query that asks the service for notifications whenever person A writes something longer than 50 characters, between 5 to 10 minutes after person B says something about a particular product?  You get the idea…
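
Here’s a rough sketch of that last query; PersonSaidSomething, its Person and Text properties, and the client proxy are all hypothetical:

// Whenever person B mentions the product, watch for a post from person A longer than 50 characters
// arriving between 5 and 10 minutes later; all of the filtering runs on the server.
var query =
    from said in client.Query()
    where said.Person == "B" && said.Text.Contains("SomeProduct")
    from reply in client.Query()
        .Where(x => x.Person == "A" && x.Text.Length > 50)
        .SkipUntil(Observable.Timer(TimeSpan.FromMinutes(5)))
        .TakeUntil(Observable.Timer(TimeSpan.FromMinutes(10)))
    select reply;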

It’s Simple

Web services are quite simple to create nowadays, considering all that ASP.NET, WCF and IIS offer, but queryable observable services are simple too if you’re already familiar with Rx and LINQ.  Don’t be frightened by all of the strange terms and confusing explanations – they’re my fault.  If you’re still confused, then try experimenting with it yourself.  Begin with the examples later in this post.

Hosting an IQbservable<T> Service

A service returns an IQbservable<T> so that clients can write queries against it, but the source doesn’t have to start out as an IQbservable<T>.  The source can be an existing IQbservable<T> or just a plain old IObservable<T>.

If the service implementation is an IObservable<T>, then the TCP Qbservable Provider uses the local IQbservableProvider (Qbservable.Provider) on the server to execute queries that it receives from clients.  It happens automatically, so you don’t need to be concerned at all about how your observable is being translated into an IQbservable<T>.  You can thank the Rx team for their fantastic work on that (and of course, Rx in general).

If the service implementation is an IQbservable<T>, then think of the TCP Qbservable Provider as merely a communication/serialization proxy.  It takes your existing query and hosts it over TCP.  Furthermore, writing a custom IQbservableProvider allows you to constrain the Rx operators that clients can use, which is good from a security standpoint.

For example, if you want to expose LINQ to WQL (a.k.a., WMI Events) on a server, then simply call the ServeTcp extension method to host your query over TCP.  If you want to expose a typical IObservable<T> on a server, e.g., Observable.Interval, then simply call the ServeQbservableTcp extension method to host your observable over TCP as an IQbservable<T> service.
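
For example (a sketch; the Interval service mirrors the timer example later in this post, and wqlEventQuery is assumed to already be an IQbservable<T>):

var endPoint = new IPEndPoint(IPAddress.Loopback, port: 49593);

// Host a plain IObservable<T> as an IQbservable<T> service:
var intervalService = Observable
    .Interval(TimeSpan.FromSeconds(1))
    .ServeQbservableTcp(endPoint);

// Host an existing IQbservable<T> (e.g., a LINQ to WQL query) directly:
// var wqlService = wqlEventQuery.ServeTcp(endPoint);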

When a client queries your service, its query is serialized and sent to the server by the TCP Qbservable Provider running on the client.  The TCP Qbservable Provider running on the server deserializes the client’s query and then glues it to your hosted query; e.g., Observable.Interval, LINQ to WQL, etc.  From the perspective of the source observable, it’s as if the client’s query was written within the same process.  When data is pushed from the hosted observable, the TCP Qbservable Provider serializes it to the client.  From the perspective of the client, it’s as if the provider is running within the same process.

Consuming an IQbservable<T> Service

Clients consume web services by creating proxies that encapsulate all of the communication logic.  A queryable observable service is no different.

A client begins by creating an instance of QbservableTcpClient<T>, which represents a single queryable observable service end point.  This object exposes an IQbservable<T> for the service by calling its Query method.  At this point, the client is able to write a LINQ query and then call Subscribe to initiate communication.  All of this is covered earlier in this blog post, in the section about the basic communication process.

Alright, let’s do some coding…

Examples

Add the code from each of the following examples to the Main methods of new console application projects in VS 11 Beta, targeting the .NET 4.5 Framework.  I recommend creating separate console application projects for the server and client examples.

Add the following using directives to the top of each file.  You may need to include others as well, depending upon the example.

using System;
using System.Net;
using System.Reactive;
using System.Reactive.Linq;
using QbservableProvider;

As mentioned earlier, you’ll need to add references to all of the assemblies in the bin folder of the zip file you’ve downloaded, though the Rx 2.0 Beta assemblies can be downloaded from NuGet, if you’d prefer.

Example 1: Timer

Let’s start out easy and create a simple queryable observable timer service.  Here’s the spec:

  1. When a client subscribes, start a 2 second timer on the server.
  2. When the timer elapses on the server, send a notification to the client.
  3. The client’s connection persists until either the client or the server cancels the query, or the query completes.

IObservable<long> source = Observable.Timer(TimeSpan.FromSeconds(2));

var service = source.ServeQbservableTcp(new IPEndPoint(IPAddress.Loopback, port: 49593));

using (service.SubscribeEither(
    client => Console.WriteLine("Timer service acknowledged client shutdown."),
    ex => Console.WriteLine("Timer service error: " + ex.Message),
    ex => Console.WriteLine("Timer service fatal error: " + ex.Message),
    () => Console.WriteLine("This will never be printed because a service host never completes.")))
{
    Console.ReadKey();
}

Now let’s create our first client.  Here’s the spec:

  1. Subscribe to the service that we’ve just created.
  2. Print the timer notification from the service.

var client = new QbservableTcpClient<long>(new IPEndPoint(IPAddress.Loopback, port: 49593));

IQbservable<long> query =
    from value in client.Query()
    select value;

using (query.Subscribe(
    value => Console.WriteLine("Timer client observed: " + value),
    ex => Console.WriteLine("Timer client error: " + ex.Message),
    () => Console.WriteLine("Timer client completed")))
{
    Console.ReadKey();
}

To test these examples, make sure that you run the server application first.  Then run the client application.

After 2 seconds the client application prints the following:

Timer client observed: 0
Timer client completed

And the server application prints:

Timer service acknowledged client shutdown.

That example was boring, I know.

Example 2: Confirm the location

Let’s add a side-effect to the query so that we can see where the observable is running.  Here’s the new client:

IQbservable<long> query =
    from value in client.Query().Do(_ => Console.WriteLine("Where am I?"))
    select value;

After 2 seconds the client’s output is the same as before, but the server application now prints the following:

Where am I?
Timer service acknowledged client shutdown.

So it’s clear that our query was running on the server.  But what’s really awesome about this query is that the Do operator executed its lambda expression on the server!  Essentially, we told the server to write something to its own console window from within the query sent by the client.

Example 3: A more interesting client

Now that we know we can have the server do anything that we want, let’s have it download a web page for us.  Here’s the spec:

  1. Subscribe to the service that we’ve just created using the following query:
    1. When the service generates a timer notification, download the web page from http://blogs.msdn.com/b/rxteam.
    2. When the web page is downloaded, send its length to the client.
  2. Print the value received from the service.

Our goal here is to execute the entire query on the server, including downloading the web page.  We know how to verify that the entire query is executing on the server by using the Do operator, so I’ll add that to the end of the query.

IQbservable<int> query =
    (from value in client.Query()
     from page in new WebClient().DownloadStringTaskAsync(new Uri("http://blogs.msdn.com/b/rxteam"))
     select page.Length)
    .Do(result => Console.WriteLine("Where am I? " + result));

Notice that the type of the query has changed from IQbservable<long> to IQbservable<int>.  We’re using the select statement to project a different type of value from the service.

After 2 seconds the client application prints the following (of course, the value may be different in your tests):

Timer client observed: 110429
Timer client completed

And the server prints:

Where am I?  110429
Timer service acknowledged client shutdown.

The server has downloaded a web page and sent back the length of the page to the client.

Example 4: Service arguments

Let’s modify the service to accept the timer’s duration as an argument.  That way the client can control the delay before the web page is downloaded on the server.

To accept an argument, we’ve got to make a slight change to the way in which we’re creating the service.  Instead of simply using the ServeQbservableTcp extension method on our timer observable, we’ll need some way of receiving the argument before we can create the observable.   The QbservableTcpServer class provides static factory methods for creating services based on a function that accepts an IObservable<T>, where T is the type of the argument that the service expects, and returns an IObservable<U> that represents the service.  It’s quite natural to express the concept of clients subscribing to our service with arguments, sometime in the future, as an IObservable<T>.  So we’ll define our service beginning with an IObservable<T> that represents client subscriptions.  I’ll name the lambda parameter that represents this observable as request.

var service = QbservableTcpServer.CreateService<TimeSpan, long>(
    new IPEndPoint(IPAddress.Loopback, port: 49593),
    request =>
        from duration in request.Do(arg => Console.WriteLine("Client sent arg: " + arg))
        from value in Observable.Timer(duration)
        select value);

Notice that we must specify the input and output types of the query when we call the generic CreateService<TSource, TResult> method.  I’ve also added the Do operator to print out the argument that is received from the client.

Now let’s update our client to pass in a duration for the timer.

IQbservable<int> query =
    (from value in client.Query(TimeSpan.FromSeconds(5))
     from page in new WebClient().DownloadStringTaskAsync(new Uri("http://blogs.msdn.com/b/rxteam"))
     select page.Length)
    .Do(result => Console.WriteLine("Where am I? " + result));

Notice that I’m passing in the TimeSpan argument to the Query method.  The rest of the query is unchanged.

The server immediately prints the following when the client application starts:

Client sent arg: 00:00:05

And then after 5 seconds the client and server print the same results as before.

For much more advanced examples, including local member evaluation, anonymous types, full duplex communication of observables and iterator blocks, see the applications that are included in the download.  Grab the complete source code for the applications and the TCP Qbservable Provider library from the Rxx project on CodePlex.

Conclusion

I’ve barely scratched the surface of possibilities offered by the TCP Qbservable Provider service model.  This blog post only provides a high-level view with extremely primitive examples.  I hope it has whetted your appetite and that you’re interested in using it in your apps.

I should also mention that the Rx team has done an amazing job with the core Rx libraries, and the IQbservable<T> stuff is in a league of its own.  I’ve also seen Bart mention a couple of times in the Rx forum that his team is investing more in the IQbservable space regarding remote queries and serializable expression trees, so I’m really curious to see what they’ve done and when/if it will be released.

In the meantime, if you’re interested in developing applications using the TCP Qbservable Provider library, then please let me know.  I enjoyed working on it as a proof-of-concept and I plan to invest more time stabilizing it, unless the Rx team releases something similar.  It will help me to prioritize knowing that people are actually using it.

Your feedback can help guide the direction of this project, so please report bugs and ideas to the Rxx project on CodePlex.  Thanks!


IQbservable | Rx | Rxx

March 01, 2012

Async Iterators

You’ve already heard about async in C# 5.  This new feature is going to improve UI responsiveness and developer productivity simultaneously, to say the least.  I’ll assume that you’re also familiar with Task<T>, the generic type in the .NET Framework that provides a common model for asynchronous functions, and one of the types on which the async feature depends.

Task<T> has a limitation: it only represents a scalar value T.  This is fine for async methods that only compute a single value, but what if we need to write an async method that lazily computes a sequence of T?  In other words, the entire operation is asynchronous just like Task<T>, but instead of computing a single value we want to potentially compute many values, sequentially, while also being able to await other asynchronous operations.

The best we can do with Task<T> is to return Task<IList<T>>, gathering each T into a list.  There’s no way to yield each individual T as it is computed when using Task<IList<T>>.

The following C# console application illustrates this behavior.

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace ConsoleApplication1
{
    class TaskExample
    {
        public static void Main()
        {
            Output.WriteLine("Main Thread");

            Start().Wait();
        }

        static async Task Start()
        {
            Output.WriteLine("Starting");

            var list = await AsyncScalar();

            Output.WriteLine("Async Completed");

            foreach (var value in list)
            {
                Output.WriteLine(value);
            }

            Output.WriteLine("Done");
            Output.WriteLine("");
        }

        static async Task<IList<int>> AsyncScalar()
        {
            Output.WriteLine("AsyncScalar Called");

            var list = new List<int>();

            for (int i = 0; i < 3; i++)
            {
                Output.WriteLine("Computing " + i);

                await Task.Delay(TimeSpan.FromSeconds(1));

                list.Add(i);
            }

            return list;
        }
    }
}

Output:

(Thread 1): Main Thread
(Thread 1): Starting
(Thread 1): AsyncScalar Called
(Thread 1): Computing 0
(Thread 4): Computing 1
(Thread 4): Computing 2
(Thread 4): Async Completed
(Thread 4): Value = 0
(Thread 4): Value = 1
(Thread 4): Value = 2
(Thread 4): Done

Note that the Output class is also used by subsequent code snippets in this post, but I won’t be repeating it again.  Here’s the definition:

using System;
using System.Threading;

namespace ConsoleApplication1
{
    static class Output
    {
        public static void WriteLine(string message)
        {
            Console.WriteLine("(Thread {0}): {1}", Thread.CurrentThread.ManagedThreadId, message);
        }

        public static void WriteLine(int data)
        {
            WriteLine("Value = " + data);
        }
    }
}

Notice that the AsyncScalar method in the previous example computes values asynchronously, but they’re gathered into a list that is eventually returned when the computation completes.  The caller awaiting the task gets back a pre-computed list of data (referred to as a hot sequence), which can only be iterated synchronously.

Ultimately, our goal is to somehow replace list.Add with something like yield.  We want to be able to intermix awaiting and yielding in an imperative-style method that supports control flow statements, such as for loops.  And the result must be an asynchronous sequence, which doesn’t require synchronous iteration.

async IEnumerable<T> = Error

C# doesn’t allow us to apply the async keyword to iterator blocks.  To use the async keyword on a method it must return void, Task or Task<T>.  To define an iterator block, we must return IEnumerable<T>.  Clearly these features aren’t compatible based on return type alone.

But why not support IEnumerable<T> as another possible return type for async methods?

The answer (if I may assume what the C# team was thinking) is that Task<T> represents an asynchronous function, which means that it provides the necessary model for awaiting a function’s return value.   IEnumerable<T> only represents an interactive sequence, which means that it doesn’t provide a model for awaiting an iterator block’s return value(s).  Instead, values must be pulled synchronously from the sequence.  As a result, async iterator blocks would always be forced to execute synchronously, defeating the purpose of async.

Ok, so how about Task<IEnumerable<T>>?

Nope.  It’s similar to my first code example above, which uses Task<IList<T>>.  It models an asynchronous function that eventually returns a sequence, but that sequence remains synchronous.  Recall that Task<T> represents a scalar-valued asynchronous function, which means that the C# compiler can’t do any tricks with an iterator block to convert Task<T> into something that represents an asynchronous sequence.  The best it can do is allow us to asynchronously invoke an iterator block, but the generated sequence must be enumerated synchronously, as shown in the previous example.  That’s not our goal at all.

Then how about IEnumerable<Task<T>>?

This might seem correct at first, but actually it doesn’t meet our requirements either.  It represents a synchronous sequence of asynchronous functions.  Interesting, but not what we want.  Let’s assume for a moment that the C# compiler transforms the code between yield return statements into Task<T> objects.  We could then write something like this:

// Doesn't compile
static async IEnumerable<Task<int>> AsyncIterator()
{
    for (int i = 0; i < 3; i++)
    {
        await Task.Delay(TimeSpan.FromSeconds(1));

        yield return i;
    }
}

But consider the behavior when consuming it.  We’d probably use a foreach loop, which calls GetEnumerator to get an IEnumerator<Task<int>>.  Then we’d call MoveNext, which would synchronously enter the for loop and hit the first await.  At this point, we’d expect MoveNext to return true and set Current to a Task<int>.  When the Task<int> completes it should execute the current continuation, which is the code following the await.  So then what happens if we ignore the Task<int> and call MoveNext again?  Perhaps it could block until the current Task<int> completes, but that behavior doesn’t meet our requirements.  We don’t want to block while iterating the sequence.  Furthermore, what happens if when Task<int> completes we don’t call MoveNext right away?  The iterator could begin computing the next Task<int> asynchronously, which means that it would have to buffer the sequence until our synchronous iteration catches up.  This is like the producer/consumer pattern with an implicit queue.  Again, it’s interesting but not what we want.  When a value is computed, we want our continuation to execute right away, without buffering.

So what would happen if we reversed it?  MoveNext returns Task<bool> and Current returns T.

Well that can’t be modeled with any combination of IEnumerable<T> and Task<T>, but if it existed it would allow us to await between yields, keeping the entire computation asynchronous and its iteration asynchronous as well.

IAsyncEnumerable<T>

In the Ix Experimental release (Ix_Experimental-Async on NuGet), there’s a type named IAsyncEnumerable<T>.  It has a single method named GetEnumerator that returns an IAsyncEnumerator<T>, which returns a Task<bool> from MoveNext and a T from Current.  Can we use it to write an asynchronous iterator block?  Let’s take a look at an example.
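
Roughly, the two interfaces being described have the following shape (a sketch; the actual experimental definitions may also include cancellation support):

public interface IAsyncEnumerable<out T>
{
    IAsyncEnumerator<T> GetEnumerator();
}

public interface IAsyncEnumerator<out T> : IDisposable
{
    Task<bool> MoveNext();
    T Current { get; }
}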

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace ConsoleApplication1
{
    class AsyncEnumerableExample
    {
        public static void Main()
        {
            Output.WriteLine("Main Thread");

            Start().Wait();
        }

        static async Task Start()
        {
            Output.WriteLine("Starting");

            IAsyncEnumerable<int> sequence = DelayedSequence().ToAsyncEnumerable();

            Output.WriteLine("Awaiting");

            await sequence.ForEachAsync(Output.WriteLine);

            Output.WriteLine("Done");
            Output.WriteLine("");
        }

        static IEnumerable<int> DelayedSequence()
        {
            Output.WriteLine("DelayedSequence Called");

            for (int i = 0; i < 3; i++)
            {
                Task.Delay(TimeSpan.FromSeconds(1)).Wait();

                Output.WriteLine("Yielding " + i);

                yield return i;
            }
        }
    }
}


Output:

(Thread 1): Main Thread
(Thread 1): Starting
(Thread 1): Awaiting
(Thread 4): DelayedSequence Called
(Thread 4): Yielding 0
(Thread 4): Value = 0
(Thread 4): Yielding 1
(Thread 4): Value = 1
(Thread 4): Yielding 2
(Thread 4): Value = 2
(Thread 4): Done

At first glance it looks like we’re closer to our goal.  At least we can iterate the sequence asynchronously and values are computed asynchronously.  The ForEachAsync extension is provided as a convenience because the foreach statement can’t handle a MoveNext method that returns Task<bool>, but you could instead call GetEnumerator and loop over a call to await MoveNext manually, if you prefer.
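
For example, the ForEachAsync call in Start could be replaced with a manual loop along these lines (a sketch; the experimental MoveNext may also accept a CancellationToken):

// Manual asynchronous iteration, equivalent to the ForEachAsync call above.
var enumerator = sequence.GetEnumerator();

while (await enumerator.MoveNext())
{
    Output.WriteLine(enumerator.Current);
}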

So we’re fine on the consumer side, but what about the producer?  Unfortunately, in order to introduce a delay we must block the current thread (note the call to Wait).  That’s because the iterator block returns IEnumerable<T>, which as we already know doesn’t model an asynchronous sequence.  It seems that IAsyncEnumerable<T> creates a Task<bool> for each call to MoveNext to make our sequence consumable in an asynchronous fashion.  This means that our iterator block can’t control its own asynchrony.  Concurrency is injected for us automatically when the consumer decides that it’s ready to await the next value.  So essentially it’s still a pull model even though it’s asynchronous.

And of course we can’t simply add the async keyword to our iterator block, as discussed previously.  We’re now back where we started.

We need a better model for our asynchronous sequence.  A type that represents an asynchronous function like Task<T> as well as a sequence of values like IEnumerable<T>, but in a push-based fashion.  Then we can define our iterator block in terms of this new type by yielding values as they are computed asynchronously.

IObservable<T>

IObservable<T> and IEnumerable<T> overlap in that they both provide a lazily-computed sequence of values; however, they differ in that IObservable<T> represents an asynchronous (push) sequence while IEnumerable<T> represents a synchronous (pull) sequence.  A sequence that pushes values enables reactive computations, while a sequence from which values are pulled enables interactive computations.  LINQ provides a declarative abstraction over any computation.  Reactive Extensions for .NET (Rx) provides LINQ operators for IObservable<T> similar to the LINQ to Objects operators that require IEnumerable<T>.  If you’re not familiar with Rx yet, then you may want to get started with this blog post.

Now back to our problem at hand.  How can we write an iterator block with IObservable<T>?  Does the following work?

// Doesn't compile
static async IObservable<int> AsyncIterator()
{
    for (int i = 0; i < 3; i++)
    {
        await Task.Delay(TimeSpan.FromSeconds(1));

        yield return i;
    }
}

Unfortunately, no.  But wouldn’t it be nice?  Conceptually, it meets our requirements.  We can subscribe to the observable to receive values asynchronously.  The iterator block can define the observable by awaiting other asynchronous operations and yielding values as they are computed, sequentially yet asynchronously.

[Edit: 8/21/2012: 
Rx 2.0 offers a better alternative than the Rx 1.1 Experimental method that is described in the remainder of this blog post.  It enables you to write code that looks very similar to the previous hypothetical example.  For more information, read this forum thread.]

Well the Rx team has already thought about this limitation in C# and provided a solution.  The Rx Experimental release (Rx_Experimental-Main on NuGet) provides several overloads of the Observable.Create method, one of which has the following signature:

public static IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, IEnumerable<IObservable<object>>> iteratorMethod);

This method allows us to generate an observable using a normal iterator block.  But since we can’t mark the iterator block as async, we can’t await other asynchronous operations.  Instead, we’ll yield observables to await them.  But if we’re yielding observables, then how do we also yield computed values?  That’s where the IObserver<T> object comes in handy.  Here’s an example:

using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

namespace ConsoleApplication1
{
    class ObservableExample
    {
        public static void Main()
        {
            Output.WriteLine("Main Thread");

            Start();

            Console.ReadKey();
        }

        static void Start()
        {
            Output.WriteLine("Starting");

            var xs = Observable.Create<int>(observer => AsyncSequence(observer));

            Output.WriteLine("Subscribing");

            xs.Subscribe(Output.WriteLine, () => Output.WriteLine("Done"));
        }

        static IEnumerable<IObservable<object>> AsyncSequence(IObserver<int> observer)
        {
            Output.WriteLine("AsyncSequence Called");

            for (int i = 0; i < 3; i++)
            {
                yield return Task.Delay(TimeSpan.FromSeconds(1)).ToObservable().UpCast();

                Output.WriteLine("Pushing " + i);

                observer.OnNext(i);
            }
        }
    }

    static class ObservableExtensions
    {
        public static IObservable<object> UpCast<T>(this IObservable<T> source)
        {
            return source.Select(value => (object) value);
        }
    }
}

Output:

(Thread 1): Main Thread
(Thread 1): Starting
(Thread 1): Subscribing
(Thread 1): AsyncSequence Called
(Thread 5): Pushing 0
(Thread 5): Value = 0
(Thread 4): Pushing 1
(Thread 4): Value = 1
(Thread 6): Pushing 2
(Thread 6): Value = 2
(Thread 6): Done

This behavior is exactly what we wanted: asynchronous subscribe, observations of values as they are generated, the ability to await other asynchronous operations and the use of normal control flow statements, such as for.

However, the implementation is a bit strange.  We’ve replaced list.Add from the first example with observer.OnNext instead of yield, which isn’t so bad, but then we also had to replace await from the first example with yield.  Not very intuitive, perhaps.

Maybe in the future we’ll get first-class compiler support for writing observable iterators.  Until then, this should do fine.

[Edit: 8/21/2012: 
Rx 2.0 offers a better alternative than the Rx 1.1 Experimental method that is described above.  It enables you to write code that looks very similar to what first-class compiler support might look like, although it requires the use of a callback.  For more information, read this forum thread.]

Side note:

C# doesn’t support anonymous iterator blocks.  Here’s something else interesting: VB 11 does!

Do you agree that anonymous observable iterators are an apt use case for such a feature in C#?


Async | C# 5 | Rx