April 27, 2019

Tasks and Exceptions (C#.NET)

Kill Tasks and handle Exceptions

Check out the basic article about Tasks and Async if you haven't already.

To Infinity... and Beyond!

Now to get into some serious parallelism. Imagine we have a list of projects that all need to be processed in some way AND we want to calculate pi (once). We can do that... with our old friend ContinueWith and a List<Task<Task>>:

//start by creating a collection of tasks that we need to keep track of...
var tasks = new List<Task<Task>>();
foreach(Project p in projectList){
    tasks.Add(Renderer.GaussianMapAsync(p.newdata)
        .ContinueWith((rendering)=>{
            //rendering is the completed Task, so read its Result
            p.rendering = rendering.Result;
            //return the save Task so the outer Task<Task> wraps it
            return ProjectDAL.SaveAsync(p);
        })
    );
}
//the code below runs while the tasks above are in flight
var pi = slowPiCalculator(40000);
//wait for ALL the tasks to finish:
foreach(Task<Task> tt in tasks){
    await await tt;
}

The above is NOT great code. It will do the job, sure, but man it is ugly.

Here's a better solution taking advantage of LINQ and async lambdas.

var tasks = projectList.Select(async (p)=>{
    p.rendering = await Renderer.GaussianMapAsync(p.newdata);
    await ProjectDAL.SaveAsync(p);
}).ToList(); //Select is lazy: ToList() starts every task now, before pi is calculated
var pi = slowPiCalculator(40000);
await Task.WhenAll(tasks);

Even though the above is neater, consider whether a lambda is necessary. C# 7 introduced local functions, which can sometimes help readability without exposing a function to the rest of the class in the way a new private method would:

async Task renderAndSaveAsync(Project p) {
    p.rendering = await Renderer.GaussianMapAsync(p.newdata);
    await ProjectDAL.SaveAsync(p);
}
var tasks = projectList.Select(renderAndSaveAsync).ToList(); //ToList() starts the tasks now
var pi = slowPiCalculator(40000);
await Task.WhenAll(tasks);

What happens if there's an exception?

If there is an exception inside a Task, the exception will not be thrown until you "await" the task.

You can absolutely await a task inside a try block and catch the exception of the task. You can also check if a Task has "Faulted" (i.e. will throw an exception on await) by checking its "Status" property, and retrieve its exception by

  • awaiting the Task and capturing the exception or
  • reading the "Exception" property of the Task.
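
To make the two options concrete, here is a minimal sketch. FailAsync is a hypothetical stand-in for any async call that throws, and using Task.WhenAny to let the task finish without rethrowing is just one way to do it, not the only one.

```csharp
using System;
using System.Threading.Tasks;

public static class FaultedTaskDemo
{
    // Hypothetical stand-in for any async operation that fails.
    public static async Task<int> FailAsync()
    {
        await Task.Yield();
        throw new InvalidOperationException("render failed");
    }

    // Option 1: await inside try/catch; the original exception is rethrown.
    public static async Task<string> CatchByAwaitingAsync()
    {
        try
        {
            await FailAsync();
            return "no error";
        }
        catch (InvalidOperationException ex)
        {
            return ex.Message;
        }
    }

    // Option 2: let the task finish, then inspect Status and Exception.
    public static async Task<string> CatchByInspectingAsync()
    {
        Task<int> task = FailAsync();
        // WhenAny completes when 'task' does, but does not rethrow its exception.
        await Task.WhenAny(task);
        if (task.Status == TaskStatus.Faulted)
        {
            // Task.Exception is an AggregateException wrapping the original.
            return task.Exception?.InnerException?.Message ?? "unknown";
        }
        return "no error";
    }

    public static async Task Main()
    {
        Console.WriteLine(await CatchByAwaitingAsync());   // render failed
        Console.WriteLine(await CatchByInspectingAsync()); // render failed
    }
}
```

Note that the "Exception" property always wraps the failure in an AggregateException, whereas await hands you the original exception directly.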

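Awaiting Task.WhenAll will rethrow the first exception if any of the tasks in the WhenAll list have an exception to throw. The full set is collected in an AggregateException, which you can read from the "Exception" property of the Task that WhenAll returns (blocking on that Task with Wait() throws the AggregateException itself).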
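
A small sketch of how failures surface from Task.WhenAll when more than one task faults. FailAsync and the class name are made up for illustration; the point is the difference between what await throws and what the WhenAll task records.

```csharp
using System;
using System.Threading.Tasks;

public static class WhenAllExceptionDemo
{
    // Hypothetical worker that always fails with the given message.
    static async Task FailAsync(string message)
    {
        await Task.Yield();
        throw new InvalidOperationException(message);
    }

    // Returns the type name of what 'await' threw, plus how many
    // exceptions the WhenAll task recorded in total.
    public static async Task<(string Caught, int Recorded)> RunAsync()
    {
        Task all = Task.WhenAll(FailAsync("first"), FailAsync("second"));
        try
        {
            await all;
            return ("nothing", 0);
        }
        catch (Exception ex)
        {
            // 'ex' is one of the original exceptions, not an AggregateException.
            // all.Exception is the AggregateException holding every failure.
            return (ex.GetType().Name, all.Exception?.InnerExceptions.Count ?? 0);
        }
    }

    public static async Task Main()
    {
        var (caught, recorded) = await RunAsync();
        Console.WriteLine($"{caught}, {recorded} recorded");
    }
}
```

Keeping a reference to the Task returned by WhenAll (rather than awaiting it inline) is what makes the complete list of failures reachable from the catch block.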


Note

By default, ContinueWith() will still trigger if an exception occurs in the Task it continues.

If any task waited on with WhenAll() throws an exception, WhenAll() will still wait for all other tasks to complete (or fault) before it surfaces any exception.
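
Both notes can be checked with a short sketch. The names here (FailAsync, the 10 ms and 200 ms delays) are arbitrary choices for the demo, not anything from the project code above.

```csharp
using System;
using System.Threading.Tasks;

public static class NoteDemo
{
    // Hypothetical task that fails shortly after starting.
    static async Task FailAsync()
    {
        await Task.Delay(10);
        throw new InvalidOperationException("boom");
    }

    // The continuation runs even though the task it continues has faulted.
    public static async Task<bool> ContinuationSeesFaultAsync()
    {
        Task<bool> continuation = FailAsync().ContinueWith(t => t.IsFaulted);
        return await continuation;
    }

    // WhenAll only completes once the slow task is done,
    // even though the other task failed much earlier.
    public static async Task<bool> WhenAllWaitsForSlowTaskAsync()
    {
        Task slow = Task.Delay(200);
        Task all = Task.WhenAll(FailAsync(), slow);
        try { await all; }
        catch (InvalidOperationException) { /* expected */ }
        return slow.IsCompleted;
    }

    public static async Task Main()
    {
        Console.WriteLine(await ContinuationSeesFaultAsync());   // True
        Console.WriteLine(await WhenAllWaitsForSlowTaskAsync()); // True
    }
}
```

If you only want a continuation to run on success, ContinueWith has an overload that takes TaskContinuationOptions such as OnlyOnRanToCompletion.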


How To Stop Running Tasks

Task.Run can accept a cancellation token as part of its argument list. On the surface of it, this promises to cancel a running task if we ask it to... but it doesn't.

Calling Cancel() on the CancellationTokenSource for the token you pass to Task.Run will not cancel a task that is already running. At most, Task.Run will refuse to start the task if cancellation was requested before it began. CancellationTokenSource/CancellationToken's only job is to provide a thread-safe construct for you to check if a cancellation was called.
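
As a rough sketch of what "checking for yourself" looks like, the hypothetical loop below only stops because it polls the token on every iteration. The 50 ms timeout and all names are assumptions for the demo.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CooperativeCancellationDemo
{
    // Hypothetical CPU-bound loop. It never ends on its own;
    // only the token check inside the loop can stop it.
    static long CountUntilCancelled(CancellationToken token)
    {
        long iterations = 0;
        while (true)
        {
            // Throws OperationCanceledException once Cancel() has been requested.
            token.ThrowIfCancellationRequested();
            iterations++;
        }
    }

    public static async Task<TaskStatus> RunAsync()
    {
        using (var source = new CancellationTokenSource())
        {
            Task<long> work = Task.Run(
                () => CountUntilCancelled(source.Token), source.Token);
            source.CancelAfter(50); // request cancellation after 50 ms
            try { await work; }
            catch (OperationCanceledException) { /* expected */ }
            return work.Status;
        }
    }

    public static async Task Main()
    {
        Console.WriteLine(await RunAsync()); // Canceled
    }
}
```

Passing the same token to Task.Run is still worthwhile: it is what lets the Task end up as "Canceled" rather than "Faulted" when the loop throws.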

Because WhenAll will not surface its exception until all Tasks have completed, we can't simply wrap a try/catch around the WhenAll call and cancel the token if there is an exception. Cancelling becomes the responsibility of each Task being run.

Here is an example of how to use CancellationToken to stop all Tasks when the database throws an exception.

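async Task renderAndSaveAsync(Project p, CancellationTokenSource tokenSource) {
    //the source hands out the token that the async calls observe
    var token = tokenSource.Token;
    try{
        p.rendering = await Renderer.GaussianMapAsync(p.newdata, token);
        await ProjectDAL.SaveAsync(p, token);
    }
    catch{
        //signal every other task to stop, then let the exception propagate
        tokenSource.Cancel();
        throw;
    }
}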
decimal slowPiCalculator(int limit, CancellationToken token)
    => 2 + 2 / piWorker(limit, 1, token);

decimal piWorker(int limit, decimal start, CancellationToken token){
    //a token can only observe cancellation, not request it; this throws
    //OperationCanceledException once Cancel() has been called on its source
    token.ThrowIfCancellationRequested();
    if(limit == 0){
        return start;
    }
    return 1 + 1 / (1 / (start) + piWorker(limit-1, start+1, token));
}
var canceller = new CancellationTokenSource();
List<Task> tasks = projectList.Select(
    (p) => Task.Run(() => renderAndSaveAsync(p, canceller), canceller.Token) )
    .ToList();
Task<decimal> piTask = Task.Run(
    ()=>slowPiCalculator(40000, canceller.Token), canceller.Token);
tasks.Add(piTask);
decimal pi = 0;
try
{
    await Task.WhenAll(tasks);
    pi = await piTask;
}
catch
{
    throw;  //await rethrows only the first exception; the Task returned by
            //WhenAll holds them all in its Exception (an AggregateException).
            //calling Cancel() here would only trigger once all Tasks complete.
}

In the above example, if we needed to roll back the database changes, we could do so by constructing a single transaction around the WhenAll and passing the transaction through to the renderAndSaveAsync database methods.

I wish there was a nicer way to get Tasks to cancel themselves. The above method is an infuriatingly manual process, but at least the lack of a black box does make the behaviour clear.

The method of calculating pi is based on Thomas J. Pickett and Ann Coleman's continued fraction published in American Mathematical Monthly (2008).