Multithreading & Asynchronous Programming in C#

I/O-bound code & network-bound operations

  • Use asynchronous programming: async/await.
  • Await an operation which returns a Task or Task<T> inside of an async method.

CPU-bound operations

  • Use multi-threading and parallel programming.
  • Await an operation which is started on a background thread with the Task.Run method.

Multiple threads work best for tasks that use different resources, such as file handles and network connections. Assigning multiple threads to a single resource is likely to cause synchronization issues, and threads that are frequently blocked waiting for one another defeat the purpose of using multiple threads. Long-running, CPU-bound processes are another use case for multi-threaded code.

Rules:

  • Don't create synchronous methods that block waiting for asynchronous work to complete.
    • Different exception semantics.
    • Possible deadlocks.
    • Resource consumption.
  • Avoid async methods that wrap synchronous long-running CPU-bound operations.

Not all asynchronous tasks start a new thread. For example, file I/O is asynchronous, but uses I/O completion ports rather than threads. Web requests are asynchronous, but use network interrupts rather than threads. In these cases, using async frees a thread to do useful work.

Terminology

  • Thread - the basic unit to which an operating system allocates processor time. Each application is assigned at least one thread.
  • Process - operating systems use processes to separate applications that they are executing.
  • Multi-threaded code - multiple threads executing inside one process.
  • Thread context - thread information managed by an operating system and used when switching between threads.
  • Deadlocks - a problem in multi-threaded code that happens when two threads stop responding while each waits for the other to complete.
  • Race conditions - a synchronization problem in multi-threaded code that happens when an anomalous result occurs due to an unexpected dependence on the timing of two events.
  • Thread-safe code - code is considered thread-safe if it functions correctly during simultaneous execution by multiple threads.
  • Thread pool - a collection of worker threads, managed by the system, that efficiently execute parallel code on behalf of the application. The thread pool is primarily used to reduce the number of application threads and provide management of the worker threads.
  • Task parallelism - occurs when more than one independent task is running at the same time.
  • Context-free code - code that can run in any synchronization context.
  • Context-aware code - code that must run in a specific context. Example: code in a GUI app that interacts with UI controls.

Async/Await

An object of the class Task<T> is an operation that will have a result of type T at some point in the future. It is a promise of a T when the awaited operation completes.

The await keyword yields control to the caller of the method that performed await, and it ultimately allows a UI or a service to be responsive.

  • If await is applied to the result of a method call that returns Task<TResult>, the type of the await expression is TResult.
  • If await is applied to the result of a method call that returns Task, the type of the await expression is void.
  • The SynchronizationContext class ensures that when an asynchronous method resumes after an awaited task completes, the environment and the context are restored to the state when the awaited task paused.
  • The synchronization context determines how the remaining work (i.e., the work after an awaited task completes) is scheduled: either on the single thread of the context or on a different thread.

An async method must return a type that conforms to the Awaiter pattern: it must have an accessible GetAwaiter method that returns an awaiter object. The awaiter must implement the INotifyCompletion interface (and may also implement ICriticalNotifyCompletion), and it must expose an IsCompleted property and a GetResult method. The GetAwaiter method may be provided by an extension method.
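The point about GetAwaiter being supplied by an extension method can be illustrated with a small sketch. The class names and the idea of awaiting a TimeSpan are hypothetical and chosen only for illustration; the sketch reuses the awaiter of Task.Delay rather than implementing a new one.

```csharp
using System;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;

public static class TimeSpanAwaiterExtensions
{
    // Hypothetical extension: makes "await someTimeSpan" legal by handing
    // back the awaiter of an ordinary Task.Delay call.
    public static TaskAwaiter GetAwaiter(this TimeSpan delay)
    {
        return Task.Delay(delay).GetAwaiter();
    }
}

public static class AwaiterDemo
{
    public static async Task<string> PauseThenReportAsync()
    {
        // This line compiles only because the extension method above is in scope.
        await TimeSpan.FromMilliseconds(50);
        return "done";
    }
}
```

TaskAwaiter already implements both notification interfaces and exposes IsCompleted and GetResult, so the compiler accepts a TimeSpan as the operand of await.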

Example: An async method:

using System.Net.Http;
...
// GetContentLengthAsync returns the length of contents of a web page specified as the parameter url.
async Task<int> GetContentLengthAsync(string url)  
{
    var client = new HttpClient();  
    Task<string> task = client.GetStringAsync(url);  
 
    // Perform work that doesn't rely on the result from GetStringAsync.  
    DoIndependentWork();  
 
    // The await operator suspends GetContentLengthAsync.     
    //  - GetContentLengthAsync can't continue until the task is complete.      
    //  - Meanwhile, control returns to the caller of GetContentLengthAsync.
    //  - Control resumes here when the task is complete.       
    //  - The await operator then retrieves the string result from the task.
    string urlContents = await task;  
 
    // Any methods that are awaiting GetContentLengthAsync retrieve the integer value.
    return urlContents.Length;  
}  

If you do not need to perform any independent work, you can simplify the above code:

string urlContents = await client.GetStringAsync(url);

Example (I/O-bound operation): Download data from a web service without blocking the UI thread:

private readonly HttpClient client = new HttpClient();
 
DownloadButton.Clicked += async (o, e) =>
{
    // Yield control to the UI thread.
    var data = await client.GetStringAsync(url);
    DoSomethingWithData(data);
};

Example: Different result types from an awaited method:

// await keyword used with a method that returns a Task<TResult>.  
TResult result = await AsyncMethodThatReturnsTaskTResult();  
 
// await keyword used with a method that returns a Task.  
await AsyncMethodThatReturnsTask();  
 
// await keyword used with a method that returns a ValueTask<TResult> (.NET Core / C# 7.0)
TResult result = await AsyncMethodThatReturnsValueTaskTResult(); 

The .NET Framework simplifies many operations to allow methods to be async. Examples of these operations are:

  • Saving state.
  • Saving the parameters of your method.
  • Saving local variables and any other variables in scope.

The .NET Framework also saves the location in your method where execution was suspended, so that the method can resume from that location.

The .NET Framework saves context of the current thread which includes:

  • Execution Context
  • Security Context
  • Call Context

Async Errors

[Source: “More Effective C#” by Bill Wagner]

High-level constructs like iterator methods and async methods rearrange your code, and change when errors are reported.

Report an error in the async code as soon as possible. In the following example, we split the async method into two methods:

  • The public wrapper method that returns a Task or ValueTask. It does not have the async modifier.
  • The implementation method performs the actual async work. It has the async modifier.

// The wrapper method performs the validation and eagerly reports any errors.
public Task<string> LoadMessageFinal(string userName)
{
    if (string.IsNullOrWhiteSpace(userName))
        throw new ArgumentException(message: "This must be a valid user", paramName: nameof(userName));
 
    return LoadMessageImpl();
 
    // The implementation method should have the most limited scope possible. We use a local function.
    async Task<string> LoadMessageImpl()
    {
        var settings = await context.LoadUser(userName);
        var message = settings.Message ?? "No message";
 
        return message;
    }
}

Always await tasks you start. It allows you to catch exceptions thrown from an async method.

  • Async methods report exceptions through the Task object.
  • When an exception is thrown, the Task enters the faulted state.
  • When you await a faulted task, the await expression throws the exception.
  • When you await a task that faults later, the exception is thrown when the method is scheduled to resume.
  • Exceptions are stored in a Task's AggregateException object.
  • When a faulted Task is awaited, the await expression throws the first exception in the AggregateException object.
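A minimal sketch of the last two points, assuming two hypothetical failing operations combined with Task.WhenAll: the await rethrows a single exception, while the Exception property of the task still holds all of them.

```csharp
using System;
using System.Threading.Tasks;

public static class FaultedTaskDemo
{
    // Hypothetical helper that always fails asynchronously.
    private static async Task FailAsync(string name)
    {
        await Task.Yield();
        throw new InvalidOperationException(name);
    }

    // Returns the message of the exception thrown by await and the number
    // of exceptions stored on the task itself.
    public static async Task<(string Thrown, int Stored)> ObserveAsync()
    {
        Task both = Task.WhenAll(FailAsync("first"), FailAsync("second"));
        try
        {
            await both;
            return ("none", 0);
        }
        catch (InvalidOperationException ex)
        {
            // await rethrew only one exception; both.Exception keeps them all.
            return (ex.Message, both.Exception.InnerExceptions.Count);
        }
    }
}
```

If every failure matters, inspect the Exception property of the task rather than relying on the exception that await throws.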

Never write async void methods. The async void methods cannot be awaited. It means that the methods that started the asynchronous work won't be able to catch the exceptions thrown by async methods. Also, there is no way for a caller to determine when the async void method completes.

An exception to the above rule is event handlers, which may be async void. Remember, async void event handlers must not throw any exceptions. You may want to catch all possible exceptions in such event handlers unless you want to allow the exception to terminate the application.

Example: Log everything, throw the exception, and make the system abort the thread:

private async void OnCommand(object sender, RoutedEventArgs args)
{
    try
    {
        await SomeMethod();
    }
    // The filter logs the exception and returns false, so the exception is never caught here
    // and propagates to the synchronization context, which stops execution.
    catch(Exception exc) when (LogMessage(exc))
    {
    }
}
 
private bool LogMessage(Exception exc)
{
    // ... log the exception
    return false;
}

Example: Recover from some exceptions but not others. For example, you may be able to recover from a FileNotFoundException, but no others:

public static async void FireAndForget<TException>
    (this Task task,
    Action<TException> recovery,
    Func<Exception, bool> onError)
    where TException : Exception
{
    try
    {
        await task;
    }
 
    // Relies on onError() logging method always returning false.
    catch (Exception ex) when (onError(ex))
    {
    }
    catch (TException ex2)
    {
        recovery(ex2);
    }
}

Task

In .NET Framework 4, a new programming model based on tasks rather than threads was introduced. A task resembles a thread or a ThreadPool work item, but at a higher level of abstraction, without going into the details of working with a thread.

The Task Parallel Library (TPL) is the set of public types and APIs in the System.Threading and System.Threading.Tasks namespaces.

The TPL:

  • scales the degree of concurrency dynamically to most efficiently use the available processors
  • handles the partitioning of the work
  • manages scheduling of the threads on the ThreadPool
  • supports cancellation
  • manages state, e.g. IsCompleted, IsCanceled, IsFaulted, Status, AsyncState
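The cancellation and state items above can be sketched as follows. The class and method names are hypothetical, and the loop body stands in for real work; the sketch assumes cooperative cancellation through a CancellationToken.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationDemo
{
    // Starts a loop on the thread pool, cancels it shortly afterwards, and
    // returns the final status of the task.
    public static async Task<TaskStatus> RunAndCancelAsync()
    {
        using (var cts = new CancellationTokenSource())
        {
            CancellationToken token = cts.Token;
            Task work = Task.Run(() =>
            {
                for (int i = 0; i < 1000; i++)
                {
                    // Cooperative cancellation: the work checks the token itself.
                    token.ThrowIfCancellationRequested();
                    Thread.Sleep(10);
                }
            }, token);

            cts.CancelAfter(TimeSpan.FromMilliseconds(100));
            try
            {
                await work;
            }
            catch (OperationCanceledException)
            {
                // Expected: awaiting a canceled task throws.
            }
            return work.Status;
        }
    }
}
```

Passing the same token to Task.Run is what lets the task finish in the Canceled state rather than the Faulted state when the delegate throws OperationCanceledException.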

The Task class represents a single operation that does not return a value and that usually executes asynchronously. The work performed by a Task typically executes on a managed ThreadPool thread rather than synchronously on the main application thread.

  • Use the Task<TResult> class for operations that return values.
  • Use the Task.Run method to queue code to run on a thread pool thread. It takes an Action, a Func<TResult>, a Func<Task>, or a Func<Task<TResult>> as input. It also has overloads for cancellation.

Example: Specify the work that the task is to perform using a lambda expression:

using System;
using System.Threading.Tasks;
...
Task t = Task.Run(() => { /* ... some work to do ... */ });
t.Wait();

Example of a CPU-bound operation: Perform a calculation when a user presses a button. We start a background thread using Task.Run and await its result:

CalculateButton.Clicked += async (o, e) =>
{
    // Yield control to the UI while Calculate performs its work.
    var result = await Task.Run(() => Calculate());
    // ... do something with the result
};
 
private int Calculate()
{
    var result = 0;
    // ... perform calculations
    return result;
}

  • Task.Wait - blocks the calling thread until the task completes. Used to synchronize the execution of the calling thread and an asynchronous task. Task.WaitAll and Task.WaitAny do the same for several tasks.
  • Task.WhenAll - creates a task that completes when all of the tasks passed in the array parameter have completed.
  • Task.WhenAny - creates a task that completes when any of the supplied tasks has completed.
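One common use of Task.WhenAny is a timeout. The following is a minimal sketch with hypothetical names; it only reports whether the work finished in time and does not cancel the work.

```csharp
using System;
using System.Threading.Tasks;

public static class TimeoutDemo
{
    // Returns true if 'work' finished within 'timeout', false otherwise.
    // Note that the work itself is not canceled when the timeout wins.
    public static async Task<bool> CompletesWithinAsync(Task work, TimeSpan timeout)
    {
        Task finished = await Task.WhenAny(work, Task.Delay(timeout));
        if (finished != work)
        {
            return false;
        }

        await work; // Propagate any exception from the work.
        return true;
    }
}
```

Task.WhenAny returns the task that completed first, so comparing the result against the original task tells you which one won.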

Wait for an asynchronous method to complete:

Task.Run(async () => { await Initialize(); }).Wait();
...
private async Task Initialize()
{
    // ... asynchronous code goes here
}

Wait until all tasks are completed (example #1):

var tasks = new Task[]
{
    Task.Run(() =>
    {
        var data1 = GetData1();
        // ... do something with data1
    }),
    Task.Run(() =>
    {
        var data2 = GetData2();
        // ... do something with data2
    }),
    Task.Run(() =>
    {
        var data3 = GetData3();
        // ... do something with data3
    })
};
// Task.WhenAll waits asynchronously, without blocking a thread as Task.WaitAll would.
await Task.WhenAll(tasks);
...
string GetData1() { return "A"; }
string GetData2() { return "B"; }
string GetData3() { return "C"; }  

Wait until all tasks are completed (example #2):

// Start three new tasks.
var task1 = Task.Factory.StartNew(AddItem);
var task2 = Task.Factory.StartNew(AddItem);
var task3 = Task.Factory.StartNew(AddItem);
 
// The order of tasks does not matter because the order of execution is not known.
Task.WaitAll(task3, task2, task1);

Create a task that completes when all the tasks it is watching have completed:

public async Task<IEnumerable<Result>> DoWork(IEnumerable<string> symbols)
{
    var tasks = new List<Task<Result>>();
    foreach (var symbol in symbols)
    {
        tasks.Add(ReadSymbol(symbol));
    }
 
    // Awaiting Task.WhenAll yields an array of the results, in the same order as the tasks.
    // If any of the tasks faulted, the await throws instead.
    var results = await Task.WhenAll(tasks);
    return results.OrderBy(s => s.Id);
}

Task & Thread Examples

Access the current thread:

using System.Threading; 
...
public static void Main()
{
    // Obtain the current running thread.
    Thread current = Thread.CurrentThread;
}

Return a stub task from an async method:

public Task LoadAsync()
{
    // Task.CompletedTask is an already completed task; no timer is involved.
    return Task.CompletedTask;
}

Simulate a task running for 5 seconds. This approach may be useful in unit testing to ensure that your task finishes asynchronously.

public async Task RunForFiveSeconds()
{
    // await returns control to the caller while the delay is in progress.
    await Task.Delay(TimeSpan.FromSeconds(5));
}

Return a collection from an asynchronous method GetNamesAsync. Four versions of the GetNamesAsync method are presented:

var names = await GetNamesAsync();
...
// Method #1
public async Task<List<string>> GetNamesAsync()
{
    await Task.Yield();
    return new List<string>()
    {
        "AAA",
        "BBB",
        "CCC"
    };
}
 
// Method #2
public Task<List<string>> GetNamesAsync()
{
    return Task.Run(() =>
    {
        var names = new List<string>();
        names.Add("AAA");
        names.Add("BBB");
        names.Add("CCC");
        return names;
    });
}  
 
// Method #3
public Task<List<string>> GetNamesAsync()
{
    var names = new List<string>();
    names.Add("AAA");
    names.Add("BBB");
    names.Add("CCC");
    return Task.FromResult(names);
}
 
// Method #4
public async Task<List<string>> GetNamesAsync()
{
    var task = Task.Run(() => GetNames());
    List<string> names = await task;
    return names;
}
 
private List<string> GetNames()
{
    var names = new List<string>();
    names.Add("AAA");
    names.Add("BBB");
    names.Add("CCC");
    return names;
}

Execute a long-running method on a background thread and await its completion:

await DoWork();
...
private async Task DoWork()
{
    await Task.Run(() =>
    {
        // A long running task goes here.
        for (int i = 0; i < 1000000000; ++i)
        {
            //...
        }
    });
}

ValueTask

ValueTask<T> is a value type. It is useful in scenarios where an async method retrieves cached results.

Example [Source: “More Effective C#” by Bill Wagner]: A method that caches weather data:

private List<WeatherData> _recentObservations = new List<WeatherData>();
private DateTime _lastReading = ...;
private TimeSpan _startDate = ...;
private TimeSpan _endDate = ...;
...
// The method is not async, but rather returns a ValueTask.
public ValueTask<IEnumerable<WeatherData>> RetrieveHistoricalData()
{
    // ??? In the book the condition is '>'. I think it should be '<'.
    if (DateTime.Now - _lastReading < TimeSpan.FromMinutes(5))
    {
        return new ValueTask<IEnumerable<WeatherData>>(_recentObservations);
    }
    else
    {
        // The nested function performs the async work. It has the async modifier. 
        // It means your program doesn't do the extra state machine management and 
        // allocation if the cache is valid. 
        // InvalidateCache refreshes the cache.
        async Task<IEnumerable<WeatherData>> InvalidateCache()
        {
            _recentObservations = new List<WeatherData>();
 
            var observationDate = _startDate;
            while (observationDate < _endDate)
            {
                var observation = await RetrieveObservationData(observationDate);
                _recentObservations.Add(observation);
                observationDate += TimeSpan.FromDays(1);
            }
            _lastReading = DateTime.Now;
 
            return _recentObservations;
        }
 
        // ValueTask has a constructor that takes a Task as its argument. It will do the await internally.
        return new ValueTask<IEnumerable<WeatherData>>(InvalidateCache());
    }
}

Use ValueTask<T> if memory allocations for Task objects create bottlenecks in your code.

Locking

When two threads simultaneously encounter a lock, one thread waits until the lock becomes available.

It is important to use the lock with a reference object that is private to the class, otherwise:

  • a value type would get boxed each time the lock is acquired
  • the this reference might also be locked by other code, causing a deadlock
  • a string might be shared with unrelated code because of string interning (the compiler may create one object for several strings that have the same content), so two unrelated locks could end up using the same object

The lock statement is a shortcut for calling the Enter and Exit methods of the System.Threading.Monitor class.
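As a rough sketch of what that shortcut expands to, the hypothetical SafeCounter class below writes out the Monitor calls by hand in one method and uses the lock statement in another.

```csharp
using System.Threading;

public class SafeCounter
{
    private readonly object _locker = new object();
    private int _value;

    // Equivalent to: lock (_locker) { _value++; }
    public void Increment()
    {
        bool lockTaken = false;
        try
        {
            Monitor.Enter(_locker, ref lockTaken);
            _value++;
        }
        finally
        {
            // Release the lock even if the body throws.
            if (lockTaken)
            {
                Monitor.Exit(_locker);
            }
        }
    }

    public int Value
    {
        get { lock (_locker) { return _value; } }
    }
}
```

The try/finally is the important part: the lock is released on every exit path, which is easy to get wrong when calling Monitor directly.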

Example: Synchronize access to a block of code using the C# lock statement:

using System.Threading;
...
static void Main(string[] args)
{
    // Create an instance of WorkerClass.
    WorkerClass worker = new WorkerClass();
 
    // Create three secondary threads.
    Thread thread1 = new Thread(() => worker.Count());
    Thread thread2 = new Thread(() => worker.Count());
    Thread thread3 = new Thread(() => worker.Count());
 
    // Start all three threads.
    thread1.Start();
    thread2.Start();
    thread3.Start();
}
 
class WorkerClass
{
    // The object for the lock is private to the class.
    private object _locker = new object();
 
    public void Count()
    {
        // Only one thread at a time can execute this block of code.
        // In other words - the access to the block of code is synchronized.
        lock (_locker)
        {
            for (int i = 1; i < 10; i++)
            {
                Console.Write(i + " ");
                Thread.Sleep(500);
            }
            Console.WriteLine("");
        }
    }
}

Output:

1 2 3 4 5 6 7 8 9
1 2 3 4 5 6 7 8 9
1 2 3 4 5 6 7 8 9

Example: Add items to a dictionary in a thread-safe manner. Note that this is just an example of using the lock statement. Use the ConcurrentDictionary class for the same functionality.

class Program
{
    static object _locker = new object();
    static Dictionary<int, string> items = new Dictionary<int, string>();
 
    static void Main(string[] args)
    {
        // Start three new tasks.
        var task1 = Task.Factory.StartNew(AddItem);
        var task2 = Task.Factory.StartNew(AddItem);
        var task3 = Task.Factory.StartNew(AddItem);
 
        // The order of tasks does not matter because we don't know what the order of execution will be.
        Task.WaitAll(task3, task2, task1);
    }
 
    private static void AddItem()
    {
        // Lock the collection when adding items.
        lock (_locker)
        {
            Console.WriteLine("Lock1 acquired by " + Task.CurrentId);
            items.Add(items.Count, "Test #" + items.Count);
        }
 
        // Lock the collection when accessing items.
        Dictionary<int, string> dict;
        lock (_locker)
        {
            Console.WriteLine("Lock2 acquired by " + Task.CurrentId);
            dict = items;
        }
 
        // Show the items in the collection.
        lock (_locker)
        {
            foreach (var item in dict)
            {
                Console.WriteLine($"{item.Key}: {item.Value}");
            }
        }
    }
}

Output:

Lock1 acquired by 1
Lock2 acquired by 1
0: Test #0
Lock1 acquired by 2
Lock2 acquired by 2
0: Test #0
1: Test #1
Lock1 acquired by 3
Lock2 acquired by 3
0: Test #0
1: Test #1
2: Test #2
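For comparison, here is a minimal sketch of the ConcurrentDictionary alternative mentioned above. The class name and the use of Parallel.For to generate concurrent writers are assumptions made for illustration.

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class ConcurrentDictionaryDemo
{
    // Adds 'count' items from several threads without an explicit lock.
    public static ConcurrentDictionary<int, string> Fill(int count)
    {
        var items = new ConcurrentDictionary<int, string>();
        Parallel.For(0, count, i =>
        {
            // TryAdd returns false instead of throwing if the key already exists.
            items.TryAdd(i, "Test #" + i);
        });
        return items;
    }
}
```

Each individual call is thread-safe, but a sequence such as reading Count and then adding under that key is still not atomic, so the keys here come from the loop index instead.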

Interlocked Class

The Interlocked class offers methods that change a variable value in a thread-safe manner:

  • Increment - increments a variable
  • Decrement - decrements a variable
  • Exchange - sets a variable to a new value and returns the original value
  • CompareExchange - verifies the current value and then changes it to the new value in one atomic operation; thanks to that, no other thread can change the value between comparing and exchanging it.

The Interlocked class guarantees that the increment and decrement operations are executed atomically.

using System;
using System.Threading;
 
namespace CSharpTest
{
    class Program
    {
        public static void Main()
        {
            Console.WriteLine("1. Create an instance of Counter.");
            Counter counter = new Counter();
            Console.WriteLine("");
 
            Console.WriteLine("2. Create four threads.");
            Thread thread1 = new Thread(new ThreadStart(counter.Increase));
            Thread thread2 = new Thread(new ThreadStart(counter.Increase));
            Thread thread3 = new Thread(new ThreadStart(counter.Decrease));
            Thread thread4 = new Thread(new ThreadStart(counter.Decrease));
            Console.WriteLine("");
 
            Console.WriteLine("3. Start the threads.");
            thread1.Start();
            thread2.Start();
            thread3.Start();
            thread4.Start();
 
            Console.ReadKey();
        }
    }
 
    // The 'count' field is passed by reference because Interlocked.Increment
    // and Interlocked.Decrement modify it in place.
    internal class Counter
    {
        private long count = 0;
 
        public void Increase()
        {
            // Print the value returned by Increment: re-reading 'count' could
            // observe a change made by another thread in the meantime.
            long value = Interlocked.Increment(ref count);
            Console.WriteLine("   Counter = {0}", value);
        }
 
        public void Decrease()
        {
            long value = Interlocked.Decrement(ref count);
            Console.WriteLine("   Counter = {0}", value);
        }
    }
}

Output:

1. Create an instance of Counter.
 
2. Create four threads.
 
3. Start the threads.
   Counter = 1
   Counter = 2
   Counter = 1
   Counter = 0
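CompareExchange is typically used in a retry loop. The sketch below, with a hypothetical MaxTracker class, keeps a running maximum without a lock: it reads the current value, and writes the new one only if no other thread changed it in between.

```csharp
using System.Threading;

public class MaxTracker
{
    private int _max = int.MinValue;

    public int Max => Volatile.Read(ref _max);

    // Raises the stored maximum to 'candidate' if it is larger.
    public void Report(int candidate)
    {
        while (true)
        {
            int seen = Volatile.Read(ref _max);
            if (candidate <= seen)
            {
                return; // Nothing to update.
            }

            // Write only if _max still holds the value we read;
            // otherwise another thread got in first, so retry.
            if (Interlocked.CompareExchange(ref _max, candidate, seen) == seen)
            {
                return;
            }
        }
    }
}
```

CompareExchange returns the value that was in the variable before the call, which is how the loop detects whether its write took effect.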

Parallel Class

Considerations for running parallel code:

  • How many processors / cores are used?
  • How many tasks do we want to run in parallel?
  • How to make the variables thread safe?
  • How to track the progress and results of the processes?

The System.Threading.Tasks.Parallel class allows you to execute tasks in parallel using threads. It handles many of the concerns you would have when using threads:

  • It manages scheduling of the tasks.
  • It manages thread creation while using an appropriately sized thread pool.
  • It makes decisions on how to distribute the workload to various processors/processes.
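  • If an error happens in one task, the tasks that are already running are allowed to finish, and the exceptions are reported together in an AggregateException.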
  • It reports progress on the whole collection of tasks.

The Parallel class provides support for parallel loops and regions. It has library-based data parallel replacements for common operations such as for loops, foreach loops, and execution of a set of statements. It is also thread-safe since all public and protected members of Parallel class are thread-safe and may be used concurrently from multiple threads.

Considerations for code using Parallel:

  • As with any parallel code, make sure you have multiple CPU-bound processes, otherwise gains will be limited.
  • Parallel helps when you have many processes to run.
  • Parallel is good for bulk operations with similar characteristics – for instance, parsing many log files.
  • Try to structure clean units of work. Think how to achieve isolation between tasks.
  • Try not to create dependencies on order of operation.
  • Decide on a strategy for handling errors.

The Parallel.Invoke method executes each of the provided actions, possibly in parallel. The method takes as a parameter a list of Actions to execute in parallel. No guarantees are made about the order in which the operations execute or whether they execute in parallel. It does not return until each of the provided operations has completed, regardless of whether completion occurs due to normal or exceptional termination.

Parallel.Invoke optimizes scheduling and the number of threads used to execute the provided actions. You simply express which actions you want to run concurrently, and the runtime handles all thread scheduling details, including scaling automatically to the number of cores on the host computer. It also allows simultaneous launch and management of different types of delegates.
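The behavior on exceptional termination can be sketched as follows. The class name and the failing action are hypothetical; the sketch counts how many actions ran to completion and how many exceptions Parallel.Invoke reported.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class InvokeErrorDemo
{
    // Returns (actions that ran to completion, exceptions reported).
    public static (int Completed, int Failures) Run()
    {
        int completed = 0;
        try
        {
            Parallel.Invoke(
                () => Interlocked.Increment(ref completed),
                () => { throw new InvalidOperationException("hypothetical failure"); },
                () => Interlocked.Increment(ref completed));
        }
        catch (AggregateException ex)
        {
            // Exceptions from the individual actions are wrapped together.
            return (completed, ex.InnerExceptions.Count);
        }
        return (completed, 0);
    }
}
```

Because the failure surfaces only after all actions have finished, any error-handling strategy has to cope with the other actions having already taken effect.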

The Parallel.For method runs a certain number of iterations of a delegate in parallel. The delegate receives the index of the current iteration as a parameter. You can monitor and manipulate the state of the loop through the ParallelLoopState object that some overloads pass to the delegate's body: it lets you end the loop early (Break, Stop) and check whether another iteration has already done so (IsStopped, LowestBreakIteration). Other overloads accept more than one delegate for more advanced use cases, such as initializing and finalizing thread-local state.

The Parallel.ForEach method processes a delegate for each item in a collection.
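A minimal Parallel.ForEach sketch, using a hypothetical word-length sum as the workload. The shared total is updated with Interlocked.Add because the delegate may run on several threads at once.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ForEachDemo
{
    // Sums the lengths of all words; the body runs once per item,
    // possibly on several threads at the same time.
    public static long TotalLength(IEnumerable<string> words)
    {
        long total = 0;
        Parallel.ForEach(words, word =>
        {
            // Shared state must be updated atomically.
            Interlocked.Add(ref total, word.Length);
        });
        return total;
    }
}
```

For work this small a sequential loop would likely be faster; the pattern pays off when each item involves substantial CPU-bound work.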

Run code in parallel:

public void RunParallel()
{
    Action m1 = () => LongRunningCpuBoundMethod1();
    Action m2 = () => LongRunningCpuBoundMethod2();
 
    Parallel.Invoke(m1, m2);           
}

Perform ten parallel iterations:

using System.Threading;
using System.Threading.Tasks;
...
Parallel.For(0, 10, i =>
{
    Console.WriteLine(i); // Prints the numbers 0 to 9, in no guaranteed order.
    Thread.Sleep(1000);
});

Process a rectangular array of size W by H. The outer loop iterates over columns (W 'width' is the number of columns) and the inner loop iterates over rows (H 'height' is the number of rows). It is opposite to what we would do if we processed an array sequentially.

Parallel.For(0, W, (int x, ParallelLoopState state) =>
{
    for (int y = 0; y < H; y++)
    {
        // Test is a method we call on each value of x and y.
        d[x, y] = Test(x, y);
    }
 
    // If you want to stop Parallel.For use state.Stop()
});

You can stop a parallel loop:

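  • using the ParallelLoopState.Break method, which prevents iterations beyond the current index from starting, while iterations with lower indices still run to completion
  • using the ParallelLoopState.Stop method, which prevents any new iterations from starting as soon as possible; iterations that are already running are not aborted, but they can check ParallelLoopState.IsStopped and return early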

End the loop with Break (iterations with lower indices still run):

ParallelLoopResult result1 =
    Parallel.For(0, 100, (int i, ParallelLoopState state) =>
    {
        if (i == 50)
        {
            Console.WriteLine("Break");
            state.Break();
        }
    });
 
Console.WriteLine("IsCompleted: " + result1.IsCompleted.ToString());
Console.WriteLine("LowestBreakIteration: " +
    (result1.LowestBreakIteration.HasValue ? result1.LowestBreakIteration.ToString() : "NULL"));

Output:

Break
IsCompleted: False
LowestBreakIteration: 50

End the loop with Stop (no further iterations are started):

ParallelLoopResult result2 =
    Parallel.For(0, 100, (int i, ParallelLoopState state) =>
    {
        if (i == 50)
        {
            Console.WriteLine("Stop");
            state.Stop();
        }
    });
 
Console.WriteLine("IsCompleted: " + result2.IsCompleted.ToString());
Console.WriteLine("LowestBreakIteration: " +
    (result2.LowestBreakIteration.HasValue ? result2.LowestBreakIteration.ToString() : "NULL"));

Output:

Stop
IsCompleted: False
LowestBreakIteration: NULL    

PLINQ

For PLINQ, refer here.

notes/csharp/multithreading.txt · Last modified: 2020/11/20 by leszek