A while back I became quite interested in messaging, I have always used things like Signalr,WCF, MSMQ, and NServiceBus, which are all cool tech for sure. I did however, feel I wanted to learn a bit more, so spent quite a few months experimenting with thing like
- EasyMQ (RabbitMQ based servicebus stylee API)
- Azure ServiceBus
- NetMQ
Whilst in this messaging stage of my life I came across something that really spiked my interest which is the use of "Actor(s)" for distributed programming. Actors pop up in various frameworks such as:
This article will be about using Akka.NET which is a pretty complete port of the original Akka, so you will pretty much see how that works by the end of this too I hope.
You can grab the demo code for this article from my github account : https://github.com/sachabarber/AkkaWintail
This is not the 1st time I have written about Actor models, in fact a while back I wrote quite a long post (which is good background reading) about an Actor model that I wrote for NetMQ (the .NET port of ZeroMQ). You can read that post here:
https://sachabarbs.wordpress.com/2014/09/05/zeromq-7-a-simple-actor-model/
I was very pleased but this actually made its way into the the actual NetMQ codebase, and is fully documented here:
http://netmq.readthedocs.org/en/latest/actor/
This next bit of text is lifted directly from the post/documentation that I had previous written, though you should still read those if you are interested in seeing the specific NetMQ implementation works.
Anyway lets dive it shall we........
Here is what Wikipedia has to same in the introduction to what an Actor Model is.
The actor model in computer science is a mathematical model of concurrent computation that treats “actors” as the universal primitives of concurrent digital computation: in response to a message that it receives, an actor can make local decisions, create more actors, send more messages, and determine how to respond to the next message received.
….
….
The Actor model adopts the philosophy that everything is an actor. This is similar to the everything is an object philosophy used by some object-oriented programming languages, but differs in that object-oriented software is typically executed sequentially, while the Actor model is inherently concurrent.
An actor is a computational entity that, in response to a message it receives, can concurrently:
- send a finite number of messages to other actors
- create a finite number of new actors
- designate the behavior to be used for the next message it receives.
There is no assumed sequence to the above actions and they could be carried out in parallel.
Decoupling the sender from communications sent was a fundamental advance of the Actor model enabling asynchronous communication and control structures as patterns of passing messages.
Recipients of messages are identified by address, sometimes called “mailing address”. Thus an actor can only communicate with actors whose addresses it has. It can obtain those from a message it receives, or if the address is for an actor it has itself created.
The Actor model is characterized by inherent concurrency of computation within and among actors, dynamic creation of actors, inclusion of actor addresses in messages, and interaction only through direct asynchronous message passing with no restriction on message arrival order.
http://en.wikipedia.org/wiki/Actor_model
How I like to think of Actors is that they may be used to alleviate some of synchronization concerns of using shared data structures. This is achieved by your application code talking to actors via message passing/receiving. The actor itself may pass messages to other actors, or work on the passed message itself. By using message passing rather than using shared data structures, it may help to think of the actor (or any subsequent actors its send messages to) working on a copy of the data rather than working on the same shared structures. Which kind of gets rid of the need to worry about nasty things like lock(s) and any nasty timing issues that may arise from carrying out multi threaded code. If the actor is working with its own copy of the data then we should have no issues with other threads wanting to work with the data the actor has, as the only place that data can be is within the actor itself, that is unless we pass another message to a different actor. If we were to do that though the new message to the other actor would also be a copy of the data, so would also be thread safe.
I hope you see what I am trying to explain there, may be a diagram may help.
A fairly common thing to do is have multiple threads running to speed things up, but then you realise that your threads need to mutate the state of some shared data structure, so then you have to involve threading synchronization primitives (most commonly lock(..) statements, to create your user defined critical sections). This will work, but now you are introducing artificial delays due to having to wait for the lock to be released so you can run Thread X’s code.
To take this one step further, lets see some code that may illustrate this further, imagine we had this sort of data structure representing a very slim bank account
namespace ConsoleApplication1
{
public class Account
{
public Account()
{
}
public Account(int id, string name,
string sortCode, decimal balance)
{
Id = id;
Name = name;
SortCode = sortCode;
Balance = balance;
}
public int Id { get; set; }
public string Name { get; set; }
public string SortCode { get; set; }
public decimal Balance { get; set; }
}
}
Nothing fancy there, just some fields. So lets now move onto looking at some threading code, I have chosen to just show two threads acting on a shared Account instance.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Management.Instrumentation;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApplication1
{
class Program
{
private object syncLock = new object();
private Account clientBankAccount;
public Program()
{
clientBankAccount = new Account(1,"sacha barber","112233",0);
}
public async Task Run()
{
try
{
var addToAccountTask = Task.Run(() =>
{
Console.WriteLine("Tread Id {0}, Account balance before: {1}",
Thread.CurrentThread.ManagedThreadId, clientBankAccount.Balance);
lock (syncLock)
{
Console.WriteLine("Tread Id {0}, Adding 10 to balance",
Thread.CurrentThread.ManagedThreadId);
clientBankAccount.Balance += 10;
Console.WriteLine("Tread Id {0}, Account balance before: {1}",
Thread.CurrentThread.ManagedThreadId, clientBankAccount.Balance);
}
});
var subtractFromAccountTask = Task.Run(() =>
{
Console.WriteLine("Tread Id {0}, Account balance before: {1}",
Thread.CurrentThread.ManagedThreadId, clientBankAccount.Balance);
lock (syncLock)
{
Console.WriteLine("Tread Id {0}, Subtracting 4 to balance",
Thread.CurrentThread.ManagedThreadId);
clientBankAccount.Balance -= 4;
Console.WriteLine("Tread Id {0}, Account balance before: {1}",
Thread.CurrentThread.ManagedThreadId, clientBankAccount.Balance);
}
});
await Task.WhenAll(addToAccountTask, subtractFromAccountTask);
}
catch (Exception e)
{
Console.WriteLine(e);
}
}
static void Main(string[] args)
{
Program p = new Program();
p.Run().Wait();
Console.ReadLine();
}
}
}
I have possible picked an example that you think may not actually happen in real life, and to be honest this scenario may not popup in real life, as who would do something as silly as crediting an account in one thread, and debiting it in another…we are all diligent developers, we would not let this one into the code would we?
To be honest whether the actual example has real world merit or not, the point remains the same, since we have more than one thread accessing a shared data structure, access to it must be synchronized, which is typically done using a lock(..) statement, as can be seen in the code.
Now don’t get me wrong the above code does work, as shown in the output below:
Perhaps there might be a more interesting way though!
The actor model, takes a different approach, where by message passing is used, which MAY involve some form of serialization as the messages are passed down the wire, which kind of guarantees no shared structures to contend with.
So that was a little introduction/catch up, so what will the rest of the article be about. Well the rest of the article will be about how to develop code in .NET which makes use of an Actor Model. We will be proceeding on to look at how to use Akka.NET, I will not be covering a Akka.NET version of the sample above but shall instead build apon a new Akka.NET sample.
As stated already Akka.NET is a direct port of Akka, and its a pretty complete port at that.
There is a truly excellent training guide with start to finish examples that you can go through, where it is nicely broken up into bite sized labs, there are 3 parts with around 6 labs in each. You can grab that from this url :
https://github.com/petabridge/akka-bootcamp
I strongly (see what I did there with my awesome <strong>strongly</strong>
html skills) suggest you read this, and have a go.
The rest of this article will be about Akka.NET internals, and also shall briefly talk about the demo app. I have shamelessly stolen the demo app from the completed part 1 labs from the https://github.com/petabridge/akka-bootcamp training guide. Its a great place to start, and the way I see it I am sharing the Akka.NET love, so they would be happy.
I will of course be adding a bit of new stuff, which is not covered by the labs.
Akka.NET is an actor framework for .NET. Akka.NET is made up of a couple of system level actors (which are not within your control, they are part of Akka.NET). These top level actors are known as "Guardians". You never really deal with these directly. Instead you are expected to create actors within one of the following 2 contexts:
- Within the context of the Akka.NET actor system
- Within the context of an existing actor, where the newly created actor will be a child actor of the context in which it is created. And as such will be supervised by its parent (the chap within whos context this new actor was spawned)
Lets see an example of how to create an actor within the actor system (don't worry for now if this looks weird, the Props
stuff here will be explained in just a moment)
MyActorSystem = ActorSystem.Create("MyActorSystem", config);
Props consoleWriterProps = Props.Create<ConsoleWriterActor>();
IActorRef consoleWriterActor = MyActorSystem.ActorOf(consoleWriterProps, "consoleWriterActor");
Props tailCoordinatorProps = Props.Create(() => new TailCoordinatorActor());
IActorRef tailCoordinatorActor = MyActorSystem.ActorOf(tailCoordinatorProps, "tailCoordinatorActor");
MyActorSystem.AwaitTermination();
It can be seen there is an Akka.NET ActorSystem
which we are using here to create actors, where we use some strange Props
voodoo.. These actors will be supervised by the Akka.NET guardians. Again we will get onto supervision in a while.
Props
is the object you use to create an actor. You MUST use Props
to create an actor within the main ActorSystem
or within the scope of another actor. But how can we use this Props
thing?
There are different ways to create to create actors using Props
, let's see a few examples.
If the actor itself has a default constructor, we can use this form of Props
Props consoleWriterProps = Props.Create<ConsoleWriterActor>();
If however the actor constructor expects parameters, we can use this form of Props
Props someNonDefaultProps = Props.Create(() => new SomeNonDefaultActor(someDependency));
IActorRef someNonDefaultActor= MyActorSystem.ActorOf(someNonDefaultProps , "someNonDefaultActor");
There is also another way where you add a static factory method to the actual actor class and have that return Props
, and you make use of that.
In all of these cases a thing called an IActorRef
is returned. What's that I hear you ask. Well quite simply a IActorRef
is a handle to an actor within the system. If you have a IActorRef
to an actor, you can Tell
/Ask
that actor things. You can of course pass IActorRef
s about.
The other thing to note with the Props
class, is that it acts as a way of capturing (via closures that capture the input parameters the 1st time the actor is created) the input parameters, such that the Props
also act as a factory for recreating the actor should it die (either by programatic choice or via a supervising strategy).
We have seen how to create actors in the overall Akka.NET system, and I also elluded to the fact that you could create actors within the context of other actors. So how do you do that. Well it is quite simple, and it builds on the Props
stuff we just covered. Here is an example:
public class TailCoordinatorActor : UntypedActor
{
protected override void OnReceive(object message)
{
if (message is StartTail)
{
var msg = message as StartTail;
Context.ActorOf(Props.Create(() => new TailActor(msg.ReporterActor, msg.FilePath)));
}
}
}
The important line is this one
Context.ActorOf(Props.Create(() => new TailActor(msg.ReporterActor, msg.FilePath)));
As stated above when you use the Conntext.ActorOf(..)
you are creating a new actor in the context of this actor. That means the newly created actor is a child of the current actor. As such, it becomes job of the current actor to "Supervise" this new child actor. We will talk more about this next.
Within Akka.NET we now know that we can create our own heirarchies of actors. The leaf actors (subordinate(s)) will be supervised by their parents and so on, all the way up through the kka.NET top level system actors (the "Guardians")
When a subordinate detects a failure (i.e. throws an exception), it suspends itself and all its subordinates and sends a message to its supervisor, signaling failure. Depending on the nature of the work to be supervised and the nature of the failure, the supervisor has a choice of the following four options:
- Resume the subordinate, keeping its accumulated internal state
- Restart the subordinate, clearing out its accumulated internal state
- Stop the subordinate permanently
- Escalate the failure to the next parent in the hierarchy, thereby failing itself
It is important to always view an actor as part of a supervision hierarchy, which explains the existence of the fourth choice (as a supervisor also is subordinate to another supervisor higher up) and has implications on the first three: resuming an actor resumes all its subordinates, restarting an actor entails restarting all its subordinates (but see below for more details), similarly terminating an actor will also terminate all its subordinates. It should be noted that the default behavior of the PreRestart hook of the Actor class is to terminate all its children before restarting, but this hook can be overridden; the recursive restart applies to all children left after this hook has been executed.
Each supervisor is configured with a function translating all possible failure causes (i.e. exceptions) into one of the four choices given above;
http://getakka.net/docs/concepts/supervision
In Akka.NET there are 2 classes of supervision strateegy, which are as follows:
- OneForOneStrategy : This means that only the failed child would get the parental directive (see above) applied to it
- AllForOneStrategy : This means that all children would the get the parental directive (see above) applied to it
So that is the theory, so how does this translate to code. Each actor will allow you to override the supervising strategy (the default is OneForOneStrategy
), as shown below. I think the code block below is fairly self explanatory.
Lets see an example.
protected override SupervisorStrategy SupervisorStrategy()
{
return new OneForOneStrategy (
10,
TimeSpan.FromSeconds(30),
x =>
{
if (x is ArithmeticException) return Directive.Resume;
else if (x is NotSupportedException) return Directive.Stop;
else return Directive.Restart;
});
}
As stated already an IActorRef
is an astracted handle to an actual actor. By using the IActorRef
you can perform all the common requirements you could expect from a Akka.NET actor.
Shown below is an example of what sort of methods are available to you:
There are a few special IActorRef
(s) that you should be aware of.
Self
: That is the current actor, and is only valid within an actor Sender
: That is the source of a message, and is only valid within an actor
Whilst I highly recommend that you ensure that you always have the correct IActorRef
at hand, should you wish to send a message to it. With possibly deep/complex heirarchies this may not always be possible. But fear not Akka.NET has you covered there. Recall I stated that there were several Akka.NET level actors which you could not use, known as "Guardians".Well these guys form a navigation/supervision system. So you may use thier Path
s along with any other actors/child actors you may have created, which allow to get an IActorRef
based on what is known as an "actor selection" string.
Here is an example:
Context.ActorSelection("akka://MyActorSystem/user/validationActor").Tell(message);
Part of that Path ("MyActorSystem
") comes from the place where you created the overall Akka.NET actor system, which was like this for the demo
MyActorSystem = ActorSystem.Create("MyActorSystem");
The next part is one of the special guardians path values ("user
") within the overall Akka.NET actor system. From there it is just a case of specifying the IActorRef
we want.
You can read more about paths/selection and guardians within Akka.NET here: http://getakka.net/docs/concepts/addressing
Ok so we have covered a bit of ground so far, we now know the following:
- There is a concept of a overall system, with guradians and our own actors in it
- There is a concept of a supervisor (and if none is provided one of the guradians will take this job on)
- We can use
IActorRef
(s) to do things with actors.
So its about time we did something with an actor isn't it. So lets do this. Lets see how we deal with sending (Tell
) a message and receiving that within the destination actor.
The act of sending a message in Akka.NET is done using a IActorRef.Tell(..)
. An example of which is as follows:
consoleReaderActor.Tell(ConsoleReaderActor.StartCommand);
So now let' see how the receiver deals with this.
NOTE : I am using UntypedActor
here but there is also a TypedActor
if you prefer to use that, which allows you to use a generic Receive<T>(..)
methods within the constructor of your actor (Say. There are a whole bunch of other life cycle methods that you may override in an actor, I will not be covering those though, I will leave that as an excercise for the reader) instead of having just one single overriden OnReceive(..)
method, where you will neeed to decode the incoming messages seen.It is a matter of taste.
class ConsoleReaderActor : UntypedActor
{
public const string StartCommand = "start";
protected override void OnReceive(object message)
{
if (message.Equals(StartCommand))
{
....
}
}
}
Although I do not cover this in the demo app presented here, providing you have a valid IActorRef
you may of course ask it things.
This is done using the Ask()
method.
Within Akka.NET there are a few special types of message. The 2 that spring to mind are:
Sending a Akka.Actor.PoisonPill
to an will stop the actor when the message is processed. Akka.Actor.PoisonPill
is enqueued as ordinary messages and
will be handled after messages that were already queued in the mailbox.
Actor.Kill
causes the actor to throw an Akka.Actor.ActorKilledException
when it processes the message, which gets handled using the normal supervisor mechanism, and Akka.Actor.IActorContext.Stop
(Akka.Actor.IActorRef
) which causes the actor to stop without processing any more messages
Each actor in Akka.NET has certain lifecycle events that you may hook into. As such any actor you create you may choose to override and use these lifecycle events. The demo code doesn't cover this, but this diagram should help you to understand how it all works.
CLICK FOR BIGGER IMAGE
You can read more about this here : http://getakka.net/docs/Actor%20lifecycle
Akka.NET has good support for quite a few differrent logging frameworks. These days I tend to use NLog, so that is what I will be showing here. There are several steps you need to follow to get logging workinh, which will go through below
Step 1 : Grab The Correct Packages
The 1st step is to ensure you have the correct logging Dlls installed, which I tend to do through NuGet. So I install this package via NuGet
Step 2 : Ensure App.Config Is Correct
Then we need to make sure our main App.Config logging section is correct, which for the demo is as follows:
="1.0"="utf-8"
<configuration>
<configSections>
<section name="nlog" type="NLog.Config.ConfigSectionHandler, NLog"/>
</configSections>
<nlog xmlns="http://www.nlog-project.org/schemas/NLog.xsd"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<targets>
<target name="default" xsi:type="File"
fileName="C:\temp\WinTail.txt"
archiveFileName="C:\temp\WinTailArchives\WinTail-log.{#}.txt"
archiveEvery="Day"
archiveNumbering="Rolling"
maxArchiveFiles="7" />
<target name="errors" xsi:type="File"
fileName="C:\temp\WinTail_Errors.txt"
archiveFileName="C:\temp\WinTailArchives\WinTail_Errors.{#}.txt"
archiveEvery="Day"
archiveNumbering="Rolling"
maxArchiveFiles="7" />
</targets>
<rules>
<logger name="*" writeTo="default" />
<logger name="*" minlevel="Error" writeTo="errors" />
</rules>
</nlog>
</configuration>
Step 3 : Create The Logger File
We then need to create a logger that may be used with Akka.NET, which suprisre suprise is itself an Actor. Let' see that now shall we:
using System;
using Akka.Actor;
using Akka.Event;
using NLog;
using NLogger = global::NLog.Logger;
namespace WinTail
{
public class NLogLogger : ReceiveActor
{
private readonly ILoggingAdapter log = Context.GetLogger();
private static void Log(LogEvent logEvent, Action<NLogger> logStatement)
{
var logger = LogManager.GetLogger(logEvent.LogClass.FullName);
logStatement(logger);
}
public NLogLogger()
{
Receive<Error>(m => Log(m, logger => logger.ErrorException(m.Message.ToString(), m.Cause)));
Receive<Warning>(m => Log(m, logger => logger.Warn(m.Message.ToString())));
Receive<Info>(m => Log(m, logger => logger.Info(m.Message.ToString())));
Receive<Debug>(m => Log(m, logger => logger.Debug(m.Message.ToString())));
Receive<InitializeLogger>(m =>
{
log.Info("NLogLogger started");
Sender.Tell(new LoggerInitialized());
});
}
}
}
Step 4 : Include An Akka.Config (HOCON) file
We then need to create an Akka.NET config file called "akka.config" which should be set to "copy always" to the snure the file is present in the output directory. This file looks like this
akka {
stdout-loglevel = INFO
loglevel = INFO
log-config-on-start = on
loggers = ["WinTail.NLogLogger,AkkaDemo.WinTail"]
}
And is loaded like this
var config =
ConfigurationFactory.ParseString(
File.ReadAllText(Path.Combine(Environment.CurrentDirectory, "akka.config")));
MyActorSystem = ActorSystem.Create("MyActorSystem", config);
Step 5 : Let The Logging Commence
Now with all that good stuff in place all that is left to be done is do some actual logging. Here is how we do that:
using System;
using Akka.Actor;
using Akka.Event;
namespace WinTail
{
class ConsoleWriterActor : UntypedActor
{
private readonly ILoggingAdapter log = Context.GetLogger();
protected override void OnReceive(object message)
{
if (message is Messages.InputError)
{
var msg = message as Messages.InputError;
log.Error("Messages.InputError seen : " + msg.Reason);
....
}
else if (message is Messages.InputSuccess)
{
var msg = message as Messages.InputSuccess;
log.Info("Messages.InputSuccess seen : " + msg.Reason);
....
}
else
{
log.Info(message.ToString());
....
}
....
}
}
}
In order to test Akka.NET we must install a few more libraries, namely:
- Akka.TestKit
- Akka.TestKit.NUnit
Both of which are available as Nuget packages. So once you have those we can create a simple test (I am using NUnit here), where the test may look something like this:
[TestFixture]
public class TailActorTests : TestKit
{
[Test]
public void Show_NotifyTailActor_When_Change_Is_Made_To_File()
{
string fullFilePath = @"c:\temp\AkkaTest.txt";
FileObserver fileObserver = new FileObserver(TestActor, fullFilePath);
fileObserver.Start();
File.WriteAllText(fullFilePath, "A test message from TailActorTests" +
DateTime.Now.ToString());
ExpectMsg<TailActor.FileWrite>(x => fullFilePath.Contains(x.FileName) );
}
}
There are a couple of things to note here, such as:
- We inherit from a special base class called "
TestKit
" this is a special Akka.NET base class that allows things such as the ExpectMsg<T>(Predicate<T>> pred)
that you see above - There is a special
TestActor
which you may make use of - As just stated we can use a
ExpectMsg<T>(Predicate<T>> pred)
helper methods to see if certain messages were seen by the TestActor
As I stated a while ago the demo app has been pretty much lifted from the Akka.NET training material. Let's just go through what the demo app does:
- There is a
ConsoleReaderActor
, which expects the user to type the name of an open text file to monitor for changes. - There is a
ValidationActor
, which verifies whether the user typed a correct file name in - There is a supervising
TailCoordinatorActor
, which gets a input file, and sets up a TailActor to starts listening for changes to the input file - There is a
TailActor
, which will listen for changes in the input file - There is a
ConsoleWriterActor
, which will write out any changes observed (Such as new text being input into the orginal text file that is being tailed) to the console.
Steps to run the demo app:
- Create a text file somewhere. Open this text file
- Run the demo app code. When asked, tell it the path of the text file you created
- Edit the text file you created, save it after each edit, and then watch the actor system demo show you the changes
That's is all I wanted to say this time, I hope it may has been of interest to at least a few of you. As always if you like what you have seen, please take a minute to cast a vote, or leave a comment this are always appreciated