Stateful Akka.NET actors with F#

Akka.NET is the .NET port of the Akka framework for the JVM, which allows developers to build "highly concurrent, distributed, and resilient message-driven applications" using the actor model. One of the many nice features of Akka.NET is that it comes with an F# API. However, there are some scenarios that the F# API does not specifically cater for, and one of those is stateful actors.

In this post I will describe how to implement a simple stateful Akka.NET actor in a functional style without mutable variables.

Stateful actors

When talking about stateful actors I am specifically talking about actors that behave like finite state machines - i.e. they have some concept of their current state and can change their behaviour depending on their inputs. Akka.NET supports this kind of actor via switchable behaviours. Aaron Stannard's blog post "Building Finite State Machines With Actors: Switchable Behavior" gives an excellent overview of how you would implement a stateful actor in Akka.NET using C#.

Akka.NET's F# API provides a lot of handy tools for working with actors in a functional style, but unfortunately switchable behaviours are not covered, so in order to use them you have to create a type which inherits from ReceiveActor as per Aaron's blog post. Implementing stateful actors in this way will also likely require the use of mutable variables.

Given how elegant actors created using the actorOf functions provided by the F# API can be, stateful actors created by inheriting from ReceiveActor can seem clunky and out of place, and mutable variables are generally frowned upon in functional programming.

It would be useful to have an equivalent of actorOf for creating stateful actors in a functional style and without mutable variables for the sake of consistency if nothing else.

C# example

Before we look at implementing stateful actors in F# let's look at a simple C# example.

One use of stateful actors is as an aggregator for data - e.g. an actor which collates data from multiple sources into a single message. This actor might have two states - waiting and collecting. When the initial request for the collated data arrives, it sends requests for data to various other actors, moves to the collecting state and waits for replies. When all of the data has been received, it sends the collated data back to the original requester and then either moves back into the waiting state or stops, depending on the lifecycle.

Here is a simplified version of that stateful actor written in C#. This actor starts in the waiting state. When it receives the StartCollecting message it moves to the collecting state. While in the collecting state, the value of each Collected message is recorded. When the maximum number of values has been collected, or the StopCollecting message is received, the actor prints the values it has stored and moves back to the waiting state.

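using System;
using System.Collections.Generic;
using System.Linq; //Needed for the Count() extension method
using Akka.Actor;

public class ExampleActor : ReceiveActor {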

    private int _count;
    private readonly List<String> _values = new List<string>();

    public ExampleActor() {
        this.Waiting();
    }

    private void Waiting() {
        this.Receive<Messages.StartCollecting>(message => {

            Console.WriteLine("Starting collection");

            _values.Clear();
            _count = message.Count;

            this.Become(this.Collecting);
        });
    }

    private void Collecting() {
        this.Receive<Messages.Collected>(message => {

            _values.Add(message.Value);

            if(_values.Count() == _count) {

                Console.WriteLine(
                    "Finished collecting ({0}/{0}): {1}", 
                    _count, 
                    String.Join(",", _values)
                );

                this.StopCollecting();
            }
        });

        this.Receive<Messages.StopCollecting>(message => {

            Console.WriteLine(
                "Stopped collecting ({0}/{1}): {2}", 
                _values.Count(), 
                _count, 
                String.Join(",", _values)
            );

            this.StopCollecting();
        });
    }

    private void StopCollecting() {
        this.Become(this.Waiting);

        //Or stop
        //ReceiveActor.Context.Stop(this.Self);
    }
}

While it is possible to implement this actor in F# using the same approach, let's look at a more elegant way of creating stateful actors by building on Akka.NET's F# API.

Stateful actors using F#

The basic implementation of an Akka.NET actor in F# is a simple recursive function which receives the next message, processes it and then loops.

//Assumes `open Akka.FSharp` and an existing ActorSystem bound to `system`
let actor = 
    spawn system "Actor" (fun mailbox ->

        let rec run () = actor {

            let! message = mailbox.Receive ()

            //Process the message

            return! (run ())
        }

        run ()
    )

We can use the recursive nature of the run function to keep track of the actor's current state, both in terms of its behaviour and any associated data. We might try to implement this like so:

let initialState = ...
let initialHandler = ...

let actor = 
    spawn system "Actor" (fun mailbox ->

        let rec run data handler = actor {

            let! message = mailbox.Receive ()

            let data', handler' = (handler data message)

            return! (run data' handler')
        }

        run initialState initialHandler
    )

Here the actor's behaviour is defined by the handler function, which accepts the current state's data and the message and returns the updated data and the next handler to be used. However, we run into problems when we try to define the signature for handler as it is self-recursive - i.e. the second element in the tuple it returns has the same signature as the handler itself.

To address this we can use recursive types to create a definition of our handler.

[<AutoOpen>]
module Types = 

    type Handler<'TData, 'TMessage> = 'TData -> 'TMessage -> Instruction<'TData, 'TMessage>

    and Instruction<'TData, 'TMessage> = 
        | Continue of 'TData
        | Become of ('TData * Handler<'TData, 'TMessage>)
        | Unhandled
        | Stop

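First we define a Handler type, which is a function that accepts 'TData and 'TMessage and returns an Instruction<'TData, 'TMessage>. Then we define Instruction<'TData, 'TMessage> as a union of four possible values:

Continue - keep the current handler and carry on with the supplied data.
Become - switch to the supplied handler and carry on with the supplied data.
Unhandled - the message was not handled, so keep the current data and handler.
Stop - stop the actor.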
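As a quick illustration of how handlers built on these types might look, here is a minimal sketch of a two-state light switch. The Switch message type and the off and on handlers are made up for this illustration and are not part of the example developed in this post; the types are repeated so that the snippet compiles on its own.

```fsharp
// Types repeated from above so that the snippet stands alone.
type Handler<'TData, 'TMessage> = 'TData -> 'TMessage -> Instruction<'TData, 'TMessage>

and Instruction<'TData, 'TMessage> =
    | Continue of 'TData
    | Become of ('TData * Handler<'TData, 'TMessage>)
    | Unhandled
    | Stop

// Hypothetical message type, for illustration only.
type Switch =
    | Flip
    | Report
    | Shutdown

// 'TData is the number of flips seen so far.
let rec off (flips : int) (message : Switch) : Instruction<int, Switch> =
    match message with
    | Flip -> Become (flips + 1, on)   // switch behaviour to the "on" handler
    | Report -> Unhandled              // nothing to report while off
    | Shutdown -> Stop

and on (flips : int) (message : Switch) : Instruction<int, Switch> =
    match message with
    | Flip -> Become (flips + 1, off)  // switch behaviour back to the "off" handler
    | Report ->
        printfn "Flipped %d time(s)" flips
        Continue flips                 // same handler, same data
    | Shutdown -> Stop
```

Because a handler is just a function, it can be called directly, e.g. off 0 Flip evaluates to a Become carrying the data 1 and the on handler.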

Now that we have defined what our handler function should look like we can try and implement our actor again:

let handlers = ... //Handler<'TData, 'TMessage> list
let initialData = ... //'TData

let actor = 
    spawn system "Actor" (fun mailbox ->

        let rec run data handler = actor {

            let! message = mailbox.Receive ()

            //Process the message
            let next, handled = 
                match (handler data message) with
                | Continue data' ->             Some (data', handler), true                 
                | Become (data', handler') ->   Some (data', handler'), true
                | Unhandled ->                  Some (data, handler), false                 
                | Stop ->                       None, true

            //Report unhandled messages
            if (not handled) then
                mailbox.Unhandled (message)

            //Continue or exit the loop
            match next with
            | None -> 
                mailbox.Context.Stop (mailbox.Self)
                return ()

            | Some (data', handler') ->
                return! (run data' handler')
        }

        run initialData (List.head handlers)
    )

Now we have a working stateful actor. The run function receives a message and passes it, along with the current data, to the handler function. The handler returns an instruction telling the actor what to do next. The run function translates this into either the arguments for its next iteration or a stop signal (None) for the actor. It also sets a flag indicating whether the current message should be marked as unhandled. The function then either recurses or stops the actor and exits, as appropriate.

The run function is started with the first handler in the handlers list and initialData, and will loop until a Stop instruction is issued.
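Since handlers are plain functions, the same interpretation logic can be exercised without an actor system, which is handy for unit tests. The following is a minimal sketch under that assumption: the simulate function, the CounterMessage type and the counting handler are hypothetical names introduced here, not part of Akka.NET or of the original example.

```fsharp
// Types repeated from above so that the snippet stands alone.
type Handler<'TData, 'TMessage> = 'TData -> 'TMessage -> Instruction<'TData, 'TMessage>

and Instruction<'TData, 'TMessage> =
    | Continue of 'TData
    | Become of ('TData * Handler<'TData, 'TMessage>)
    | Unhandled
    | Stop

// Feeds a list of messages to a handler, mirroring the actor's run loop.
// Returns the final data (None if a Stop instruction was issued) together
// with the unhandled messages in arrival order.
let simulate (initialData : 'TData) (initialHandler : Handler<'TData, 'TMessage>) (messages : 'TMessage list) =
    let rec loop data handler unhandled remaining =
        match remaining with
        | [] -> Some data, List.rev unhandled
        | message :: rest ->
            match handler data message with
            | Continue data' -> loop data' handler unhandled rest
            | Become (data', handler') -> loop data' handler' unhandled rest
            | Unhandled -> loop data handler (message :: unhandled) rest
            | Stop -> None, List.rev unhandled
    loop initialData initialHandler [] messages

// Hypothetical single-state handler used to drive the simulation.
type CounterMessage =
    | Add of int
    | Reset
    | Ping
    | Quit

let rec counting (total : int) (message : CounterMessage) : Instruction<int, CounterMessage> =
    match message with
    | Add n -> Continue (total + n)
    | Reset -> Become (0, counting)
    | Ping -> Unhandled
    | Quit -> Stop

let finalData, unhandled = simulate 0 counting [ Add 2; Ping; Add 3 ]
// finalData is Some 5 and unhandled is [ Ping ]
```

Unlike the actor, the simulation simply ignores any messages that follow a Stop instruction.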

This code can be packaged up into a function similar to Akka.NET F# API's actorOf:

//Handler<'TData,'TMessage> list -> 'TData -> Actor<'TMessage> -> Cont<'TMessage, unit>
let statefulActorOf handlers initialData (mailbox : Actor<_>) = 

    let rec run data handler = actor {

        let! message = mailbox.Receive ()

        //Process the message
        let next, handled = 
            match (handler data message) with
            | Continue data' ->             Some (data', handler), true                 
            | Become (data', handler') ->   Some (data', handler'), true
            | Unhandled ->                  Some (data, handler), false                 
            | Stop ->                       None, true

        //Report unhandled messages
        if (not handled) then
            mailbox.Unhandled (message)

        //Continue or exit the loop
        match next with
        | None -> 
            mailbox.Context.Stop (mailbox.Self)
            return ()

        | Some (data', handler') ->
            return! (run data' handler')
    }

    run initialData (List.head handlers)

We can now use this function to create stateful actors from a list of handlers and some initial data:

let actor = 
    spawn system "Actor" (statefulActorOf Example.handlers Example.initialData)

Finally, we can implement our example actor in a functional style without mutable variables:

[<RequireQualifiedAccess>]
module Example = 

    open System

    [<AutoOpen>]
    module private Helpers = 

        let concat (values : String list) = String.Join (",", values)

    type Message = 
        | StartCollecting of Int32
        | StopCollecting
        | Collected of String

    let initialData = (0, [])

    let rec private waiting _ = function
        | StartCollecting count -> 

            printfn "Starting collection"

            Become ((count, []), collecting)

        | _ -> Unhandled

    and private collecting (count, values) = 

        let stopCollecting () = 
            Become (initialData, waiting)
            //Or stop
            //Stop

        function
        | Collected value ->

            let values' = value :: values

            if (List.length values') = count then

                printfn "Finished collecting (%u/%u): %s" 
                <| count 
                <| count 
                <| (concat values')

                stopCollecting () 

            else
                Continue (count, values')

        | StopCollecting -> 

            printfn "Stopping collecting (%u/%u): %s" 
            <| (List.length values) 
            <| count 
            <| (concat values)

            stopCollecting () 

        | _ -> Unhandled

    let handlers = [ waiting; collecting; ]

Here we use a union to model the messages received by the actor and then simply use pattern matching in the handler functions. The 'TData for this actor is (Int32 * String list) and contains both the number of values to collect and the values collected so far. The collecting handler simply adds to the list of values with each message received until the target number of values is reached or the StopCollecting message is received. Note that each value is prepended to the list, so the values are printed newest first; apply List.rev before concat if arrival order matters.

Notice that the handler functions are mutually recursive so that the waiting state handler can return the collecting state handler and vice versa.
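To see the handlers in action, here is a short usage sketch. It is not standalone: it assumes that open Akka.FSharp, an ActorSystem bound to system, and the statefulActorOf function and Example module from this post are all in scope. The actor name "Collector" is made up for the illustration.

```fsharp
// Assumes `open Akka.FSharp`, an ActorSystem bound to `system`, and the
// `statefulActorOf` function and `Example` module defined in this post.
let collector =
    spawn system "Collector" (statefulActorOf Example.handlers Example.initialData)

collector <! Example.StartCollecting 2   // waiting -> collecting, target of 2 values
collector <! Example.Collected "a"       // 1 of 2 values stored, stays in collecting
collector <! Example.Collected "b"       // target reached, prints and returns to waiting
collector <! Example.StopCollecting      // not matched by the waiting handler, so unhandled
```

Messages sent to a single actor from the same sender are processed in order, so the actor should print "Starting collection" followed by "Finished collecting (2/2): b,a".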

Conclusion

In this post I have shown how to create stateful actors in Akka.NET using a functional style. This allows you to create actor systems where all actors are created using a familiar style without the need to inherit base types or use mutable variables.

The code for both the C# and F# implementations of the example is available on GitHub.
