The Architect´s Napkin

Software Architecture on the Back of a Napkin

Introduction to the Flow Execution Engine NPantaRhei

How can data processing flows be implemented in an easy manner?

I've defined what I call data flow processing, or flow design (FD), here and described it here, for example. (Although this might look like Flow-Based Programming (FBP), it's only related to it. FBP is all about concurrent programming, whereas FD starts much simpler, with synchronous sequential programming.)

And how such flows can be translated into a modern OO language like C# is described here.

Such a translation is pretty conventional, although it might look strange. Event-Based Components (EBC) are executed directly; they are written imperatively in a 3GL.

This has several drawbacks:

  • Execution cannot be controlled. Once started, a flow just runs to its end.
  • Although EBC code can be generated from some FD notation, it's imperative and hard to reverse-engineer. Recovering the model behind the code is difficult, at least for the untrained eye.
  • It's not easy to switch from synchronous sequential processing to asynchronous or parallel processing.

Due to these and other drawbacks I decided to switch flow execution from direct execution to managed execution. That's what the Flow Execution Engine is about. It interprets a flow definition, thereby allowing for much more dynamic and fine-grained control over the execution. Here's how it works…

Example Flow

Let me use an example as the guide rail for my tour through the Flow Execution Engine. The problem to solve: find all .txt files in a directory tree and count their words. The FD model for this could look like this:


  1. The first operation finds all the files in a given dir tree defined by a path.
  2. The file names are passed to the second operation which processes each file by counting its words.
  3. The word count for each file is passed to the totalling operation which outputs the total number of words in all files found plus the number of files processed.

The initial path is passed into the flow via the .in input port. The result can be retrieved from the output port .out.

All operations are fed through implicit input ports; their output ports are also implicit. No names for their input/output ports are necessary yet, because each operation has only one of each.

Encoding the Model

With EBC such a model would be translated into C# code like this:

var ff = new Find_files();
var cw = new Count_words();
var tot = new Total();

ff.Result += cw.Process;
cw.Result += tot.Process;
tot.Result += result => {…};

Each operation would become a class with a method (Process()) as its input port and an event (Result) as its output port. Both the flow and the operations would be described using the same 3GL.

The flow execution engine uses a different approach. Since it executes flows in a managed way it needs a flow definition it can understand. The simplest form of such a definition is a list of streams between operations, for example:

.in, Find files
Find files, Count words
Count words, Total
Total, .out

A stream is what connects operations: the arrow in the above diagram. It's described by an output port as its source and an input port as its sink. Since input/output ports are implicit for operations as simple as the ones above, only the operation names are used in the stream table. Which port is meant follows from the position of the operation: a name in the left column refers to the operation's output port, a name in the right column to its input port.

To initialize the execution engine with a list of streams like this is easy:

using (var fr = new FlowRuntime())
{
    fr.AddStream(".in", "Find files");
    fr.AddStream("Find files", "Count words");
    fr.AddStream("Count words", "Total");
    fr.AddStream("Total", ".out");

    // Operations are registered and the flow is run here, while fr is in scope.
}
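If the stream table is kept as plain text, as in the listing further up, it could be turned into source/sink pairs before the streams are added one by one. The following is only a minimal sketch of that idea. StreamTable and Parse are hypothetical names made up for illustration and are not part of the engine. The sketch assumes one stream per line and a single comma between the two columns.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of the flow execution engine):
// turns a textual stream table into (source, sink) pairs.
public static class StreamTable
{
    public static List<Tuple<string, string>> Parse(string table)
    {
        var streams = new List<Tuple<string, string>>();
        var lines = table.Split(new[] { '\r', '\n' }, StringSplitOptions.RemoveEmptyEntries);
        foreach (var line in lines)
        {
            // Skip lines that contain only whitespace.
            if (line.Trim().Length == 0) continue;

            // Left column = output (source) port, right column = input (sink) port.
            var columns = line.Split(',');
            if (columns.Length != 2)
                throw new FormatException("Expected 'source, sink' but got: " + line);

            streams.Add(Tuple.Create(columns[0].Trim(), columns[1].Trim()));
        }
        return streams;
    }
}
```

Each resulting pair could then be handed to AddStream(), with Item1 as the source and Item2 as the sink.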

Implementing Operations

The flow execution engine makes implementing operations easier than EBC. No classes are required anymore. Operations are functions or methods, if you like. This is trivial as long as there is one input port and no or just one output port on an operation. The implementations for the example operations can look like this:

static IEnumerable<string> Find_files(string path)
{
    return …;
}

static IEnumerable<int> Count_words(IEnumerable<string> filenames)
{
    return …;
}

static Tuple<int, int> Total(IEnumerable<int> wordCounts)
{
    // Item1 = number of files, Item2 = total number of words
    return new Tuple<int, int>(wordCounts.Count(), wordCounts.Sum());
}

Registration of the operations is done through a special container which wraps each function in an IOperation:

var foc = new FlowOperationContainer()
    .AddFunc<string, IEnumerable<String>>("Find files", Find_files)
    .AddFunc<IEnumerable<string>, IEnumerable<int>>("Count words", Count_words)
    .AddFunc<IEnumerable<int>, Tuple<int, int>>("Total", Total);

IOperation is the interface all operations need to implement:

public interface IOperation {
    string Name {get;}
    OperationAdapter Implementation {get;}
}

public delegate void OperationAdapter(IMessage input, 
                                      Action<IMessage> outputContinuation, 
                                      Action<FlowRuntimeException> unhandledException);

This way the flow execution engine is independent of how you specifically implement your operations. The flow operation container serves as the adapter between your implementation and the engine. For the most common way of implementing operations (functions and simple methods) the container readily provides registration methods.
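To illustrate the adapter idea, here is a rough sketch of how a plain function might be wrapped into a uniform "message in, continuation out" signature. It is not the container's actual code. Msg, Adapter and FuncAdapter are simplified stand-ins invented for this sketch, and reporting a thrown exception through the third parameter is an assumption about how such a wrapper could behave.

```csharp
using System;

// Simplified stand-ins for the engine's message and adapter types.
// These are assumptions for illustration, not the engine's own definitions.
public sealed class Msg
{
    public string Port { get; private set; }
    public object Data { get; private set; }
    public Msg(string port, object data) { Port = port; Data = data; }
}

public delegate void Adapter(Msg input,
                             Action<Msg> continueWith,
                             Action<Exception> unhandledException);

public static class FuncAdapter
{
    // Wraps a plain function so it can be called through the Adapter signature.
    public static Adapter Wrap<TIn, TOut>(string operationName, Func<TIn, TOut> func)
    {
        return (input, continueWith, unhandledException) =>
        {
            TOut result;
            try
            {
                // Unpack the message payload and call the user's function.
                result = func((TIn)input.Data);
            }
            catch (Exception ex)
            {
                // Assumed behavior: hand failures to the error continuation.
                unhandledException(ex);
                return;
            }
            // The result leaves through the operation's implicit output port,
            // which is addressed by the operation name alone.
            continueWith(new Msg(operationName, result));
        };
    }
}
```

With such a wrapper the caller only ever sees the adapter delegate, so it makes no difference whether an operation started out as a static function, a lambda or a method on some object.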

If you want more flexibility, though, you can provide IOperation implementations yourself. The abstract class AOperation makes this easy, for example:

class Total : AOperation
{
    public Total(string name) : base(name) {}

    protected override void Process(IMessage input,
                                    Action<IMessage> continueWith,
                                    Action<FlowRuntimeException> unhandledException)
    {
        var wordCounts = (IEnumerable<int>)input.Data;
        var result = new Tuple<int, int>(wordCounts.Count(), wordCounts.Sum());
        continueWith(new Message(base.Name, result));
    }
}

Such an operation then is registered directly with the flow execution engine:

fr.AddOperation(new Total("Total"));

Executing a Flow

Once the flow is defined and all operations are registered, executing it is straightforward:


fr.Start();
fr.Process(new Message(".in", "c:/"));

Tuple<int, int> result = null;
fr.WaitForResult(5000, _ => result = (Tuple<int, int>)_.Data);

Console.WriteLine("{0} words in {1} files", result.Item2, result.Item1);

  1. Start the flow execution engine by calling Start(). It runs on its own thread in the background, so a flow is always executed in parallel with whatever else your program is doing. This does not mean, however, that the flow's operations run concurrently with each other. They are still executed sequentially.
  2. Send messages to the flow hosted by the engine using Process(). Messages are tuples consisting of some data and a source port.
  3. Collect any results from the Result event by assigning an event handler or wait for them like above by calling WaitForResult().

How does this work?

The flow execution engine takes the message sent to it via Process() and puts it in a queue.

The engine's main loop runs on its own thread and takes the next message from this queue.

It takes the message's port (the source) and looks it up in the list of streams. For the initial ".in" it finds "Find files" as the sink.

The message then is sent to each sink port. That means a task is created which combines the message with the sink operation.

Then the operation is executed by calling it with the message as parameter.

Whatever the operation produces as a result it wraps up in an output message assigned to an output/source port of the operation.

These output messages are enqueued to be picked up by the execution engine main loop.

Messages flowing to sink ports starting with a "." (like ".out") are sent to the execution engine's environment. They can be picked up from the Result event. Please note that they arrive on a different thread than the caller's. To make the execution engine easier to use in synchronous code, call WaitForResult() to pick up messages on the same thread that called Process().
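The steps above can be condensed into a toy model. The following sketch is not the engine's implementation. ToyFlowEngine and all of its members are hypothetical, and it deliberately runs the loop on the caller's thread instead of a background thread so that the queue, the stream lookup and the handling of "." ports stay easy to follow.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Toy, single-threaded model of the message loop described above.
// All names are hypothetical; the real engine runs its loop on its own thread.
public sealed class ToyFlowEngine
{
    private readonly List<Tuple<string, string>> _streams =
        new List<Tuple<string, string>>();
    private readonly Dictionary<string, Func<object, object>> _operations =
        new Dictionary<string, Func<object, object>>();
    private readonly Queue<Tuple<string, object>> _messages =
        new Queue<Tuple<string, object>>();

    // Messages that reach a sink port starting with "." are collected here.
    public readonly List<Tuple<string, object>> Results =
        new List<Tuple<string, object>>();

    public void AddStream(string source, string sink)
    {
        _streams.Add(Tuple.Create(source, sink));
    }

    public void AddOperation(string name, Func<object, object> operation)
    {
        _operations[name] = operation;
    }

    public void Process(string port, object data)
    {
        _messages.Enqueue(Tuple.Create(port, data));
        while (_messages.Count > 0)
        {
            var message = _messages.Dequeue();
            // Find every stream whose source is the port the message came from.
            foreach (var stream in _streams.Where(s => s.Item1 == message.Item1))
            {
                var sink = stream.Item2;
                if (sink.StartsWith(".", StringComparison.Ordinal))
                {
                    // Ports starting with "." belong to the environment.
                    Results.Add(Tuple.Create(sink, message.Item2));
                }
                else
                {
                    // Run the sink operation, then enqueue its output
                    // under the operation's own name as the source port.
                    var output = _operations[sink](message.Item2);
                    _messages.Enqueue(Tuple.Create(sink, output));
                }
            }
        }
    }
}
```

Wiring up streams from ".in" through two operations to ".out", registering both operations and calling Process(".in", …) leaves a single entry for ".out" in Results. A real engine would additionally have to deal with threading, named ports and exceptions.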


So much for a first introduction to the basics of the flow execution engine. You can download its sources from GitHub:

But wait, there's more! In future articles I'm going to show you how to switch from sync to async and parallel execution of flow operations, and how to handle exceptions occurring during execution.

Posted on Friday, April 27, 2012 6:11 PM | Filed Under [ Event-Based Components ]

