Simulating the Task mechanism in .NET: exploring the mysteries of asynchronous programming

Using Task in .NET makes it easy to write asynchronous programs. To better understand Task and its scheduling mechanism, let’s build a simplified simulation of Task and use it to answer two questions:

  1. What is a Task?
  2. How are tasks scheduled?

Basic Task simulation implementation

Let’s start with the most basic Task usage:

Task.Run(Action action)

This call submits the action to the scheduler as a task, and the scheduler assigns an idle thread to process it.
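In the real library, Task.Run(action) behaves essentially like a longer Task.Factory.StartNew call that passes TaskCreationOptions.DenyChildAttach and TaskScheduler.Default, the thread-pool scheduler. The sketch below runs the same kind of work both ways and checks that each delegate lands on a thread-pool thread; the TaskRunDemo class name is only for illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TaskRunDemo
{
    // Runs a delegate through Task.Run and through the longer StartNew overload,
    // reporting for each whether it executed on a thread-pool thread.
    public static (bool viaRun, bool viaStartNew) RunBoth()
    {
        bool viaRun = Task.Run(() => Thread.CurrentThread.IsThreadPoolThread).Result;

        bool viaStartNew = Task.Factory.StartNew(
            () => Thread.CurrentThread.IsThreadPoolThread,
            CancellationToken.None,
            TaskCreationOptions.DenyChildAttach,
            TaskScheduler.Default)              // the default, thread-pool-based scheduler
            .Result;

        return (viaRun, viaStartNew);
    }
}
```

Note that Task.Run always targets TaskScheduler.Default, whereas a bare Task.Factory.StartNew(action) uses TaskScheduler.Current, which only equals the default when no custom scheduler is active.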
We use a class named Job to simulate Task:

public class Job
{
    private readonly Action _work;

    public Job(Action work) => _work = work;
    public JobStatus Status { get; internal set; }

    internal protected virtual void Invoke()
    {
        Status = JobStatus.Running;
        _work();
        Status = JobStatus.Completed;
    }

    // Hand the job to the given scheduler, or to the ambient JobScheduler.Current.
    public void Start(JobScheduler? scheduler = null)
        => (scheduler ?? JobScheduler.Current).QueueJob(this);

    public static Job Run(Action work)
    {
        var job = new Job(work);
        job.Start();
        return job;
    }
}

public enum JobStatus
{
    Created,
    Scheduled,
    Running,
    Completed
}

Like Task, Job defines a static Run method, and it is used the same way:

Job.Run(() => Console.WriteLine($"Job1, thread:{Thread.CurrentThread.ManagedThreadId}"));

For comparison, here is the equivalent code using Task. It adds the await keyword, which is discussed later:

await Task.Run(() => Console.WriteLine($"Task1, thread:{Thread.CurrentThread.ManagedThreadId}"));

When Job.Run is called, it creates a Job from the given Action and calls job.Start(). The Job does not begin executing immediately, though: it is submitted to the scheduler through the QueueJob method, and the scheduler decides when and on which thread the Job runs. When the Job is actually executed, its Invoke method is called, which runs the given Action and updates the Job’s status from Running to Completed. Put simply, a .NET Task follows essentially the same basic workflow as this rough Job: a Task/Job represents an operation that carries a status.
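The real Task exposes the same idea through its Status property, typed as the TaskStatus enum. A minimal sketch (the TaskStatusDemo name is only for illustration) that observes a Task before it is started and after it has finished:

```csharp
using System;
using System.Threading.Tasks;

public static class TaskStatusDemo
{
    // Creates a Task without starting it, then starts it and waits for it,
    // returning the status observed at each end of its life.
    public static (TaskStatus before, TaskStatus after) Observe()
    {
        var task = new Task(() => Console.WriteLine("working"));
        TaskStatus before = task.Status;   // Created: not yet handed to any scheduler

        task.Start();                      // queued to TaskScheduler.Current (the thread pool by default)
        task.Wait();                       // block until the work has finished

        TaskStatus after = task.Status;    // RanToCompletion: the work ended without an exception
        return (before, after);
    }
}
```

TaskStatus has more members than the JobStatus simulated here. Roughly, WaitingToRun plays the role of Scheduled and RanToCompletion that of Completed, while states such as Faulted and Canceled have no counterpart in Job.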

Thread-pool-based scheduling

The execution of a Task/Job depends on a scheduler, which is simulated here by JobScheduler. By default, .NET schedules tasks on the thread pool, so we also implement a thread-pool-based ThreadPoolJobScheduler.
First, let’s look at JobScheduler. It is an abstract base class whose QueueJob method is implemented by a concrete scheduler (here, ThreadPoolJobScheduler):

public abstract class JobScheduler
{
    public abstract void QueueJob(Job job);
    public static JobScheduler Current { get; set; } = new ThreadPoolJobScheduler();
}

The QueueJob implemented by ThreadPoolJobScheduler is as follows:

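public class ThreadPoolJobScheduler : JobScheduler
{
    public override void QueueJob(Job job)
    {
        job.Status = JobStatus.Scheduled;
        // Capture the caller's ExecutionContext (AsyncLocal values, etc.) so the job runs under it.
        // Capture() returns null if context flow has been suppressed.
        var executionContext = ExecutionContext.Capture();
        ThreadPool.QueueUserWorkItem(_ =>
        {
            if (executionContext is null)
                job.Invoke();
            else
                ExecutionContext.Run(executionContext, state => job.Invoke(), null);
        });
    }
}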

ThreadPoolJobScheduler sets the job’s status to Scheduled, captures the caller’s ExecutionContext, and hands the job to the thread pool, which runs it inside that captured context.
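ThreadPool.QueueUserWorkItem already flows the ExecutionContext on its own, so the explicit Capture/Run in ThreadPoolJobScheduler mainly makes that step visible. The difference shows up with ThreadPool.UnsafeQueueUserWorkItem, which does not flow the context. The sketch below reads an AsyncLocal<T> value on pool threads, once without and once with an explicitly restored context; the ExecutionContextDemo class and the RequestId value are made up for illustration.

```csharp
using System;
using System.Threading;

public static class ExecutionContextDemo
{
    // Ambient value that travels with the ExecutionContext (illustrative data only).
    private static readonly AsyncLocal<string?> RequestId = new();

    // Reads RequestId on pool threads, without and with the captured context.
    public static (string? withoutContext, string? withContext) Observe()
    {
        RequestId.Value = "req-42";
        ExecutionContext captured = ExecutionContext.Capture()!; // null only if flow is suppressed

        string? withoutContext = "unset";
        string? withContext = "unset";
        using var done = new CountdownEvent(2);

        // UnsafeQueueUserWorkItem does not flow the caller's ExecutionContext,
        // so the work item sees the pool thread's default (empty) context.
        ThreadPool.UnsafeQueueUserWorkItem(state =>
        {
            withoutContext = RequestId.Value;
            done.Signal();
        }, null);

        // Same API, but the work runs inside the captured context,
        // as ThreadPoolJobScheduler does with ExecutionContext.Run.
        ThreadPool.UnsafeQueueUserWorkItem(state =>
        {
            ExecutionContext.Run(captured, s => withContext = RequestId.Value, null);
            done.Signal();
        }, null);

        done.Wait();
        RequestId.Value = null;   // clean up the caller's ambient value
        return (withoutContext, withContext);
    }
}
```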

Scheduling on dedicated threads

By default, the Current property of JobScheduler uses thread-pool-based scheduling. It can be replaced with another scheduler, but why would you want to? The answer lies in the limitations of thread-pool scheduling: for some high-priority tasks, this strategy may not meet the requirements. For example, when all pool threads are busy, a newly queued task may wait a long time before it runs. For this situation, .NET offers TaskCreationOptions.LongRunning, which hints to the default scheduler that the task should run on its own thread rather than on a pool thread. To see the idea behind it, let’s first solve the problem with a custom scheduler:

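using System.Collections.Concurrent;

public class DedicatedThreadJobScheduler : JobScheduler
{
    // Jobs waiting to run; Take() blocks while the queue is empty.
    private readonly BlockingCollection<Job> _queues = new();
    private readonly Thread[] _threads;

    public DedicatedThreadJobScheduler(int threadCount)
    {
        _threads = new Thread[threadCount];
        for (int index = 0; index < threadCount; index++)
        {
            // Background threads, so they don't keep the process alive on exit.
            _threads[index] = new Thread(Invoke) { IsBackground = true };
        }
        Array.ForEach(_threads, thread => thread.Start());

        // Each dedicated thread loops forever, taking jobs from the queue and running them.
        void Invoke(object? state)
        {
            while (true)
            {
                _queues.Take().Invoke();
            }
        }
    }

    public override void QueueJob(Job job)
    {
        job.Status = JobStatus.Scheduled;
        _queues.Add(job);
    }
}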

When a DedicatedThreadJobScheduler is created, it starts the specified number of threads, and each thread repeatedly takes jobs from the queue and executes them, blocking while the queue is empty.
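The real TPL offers the same extension point through the abstract TaskScheduler class. Below is a sketch of a TaskScheduler counterpart to DedicatedThreadJobScheduler; the DedicatedThreadTaskScheduler and DedicatedSchedulerDemo names are hypothetical, and the code is deliberately simplified (its threads are never shut down). It overrides the three abstract members QueueTask, TryExecuteTaskInline and GetScheduledTasks, and runs each task through the protected TryExecuteTask helper.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical TaskScheduler counterpart of DedicatedThreadJobScheduler.
public sealed class DedicatedThreadTaskScheduler : TaskScheduler
{
    private readonly BlockingCollection<Task> _queue = new();

    public DedicatedThreadTaskScheduler(int threadCount)
    {
        for (int index = 0; index < threadCount; index++)
        {
            var thread = new Thread(() =>
            {
                // Block until a task is queued, then run it on this dedicated thread.
                foreach (Task task in _queue.GetConsumingEnumerable())
                {
                    TryExecuteTask(task);
                }
            })
            {
                IsBackground = true,            // don't keep the process alive
                Name = $"dedicated-{index}"
            };
            thread.Start();
        }
    }

    protected override void QueueTask(Task task) => _queue.Add(task);

    // Keep the sketch simple: never run a task inline on the caller's thread.
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) => false;

    // Used by debuggers to list the tasks still waiting in the queue.
    protected override IEnumerable<Task> GetScheduledTasks() => _queue.ToArray();
}

public static class DedicatedSchedulerDemo
{
    // Starts one task on the dedicated scheduler and returns the name of the thread it ran on.
    public static string? RunOnce()
    {
        var scheduler = new DedicatedThreadTaskScheduler(threadCount: 2);
        var task = Task.Factory.StartNew(
            () => Thread.CurrentThread.Name,
            CancellationToken.None,
            TaskCreationOptions.None,
            scheduler);
        return task.Result;
    }
}
```

A production version would also implement IDisposable and call CompleteAdding on the queue so that the dedicated threads can exit.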
Next, let’s look at how to use .NET’s TaskCreationOptions.LongRunning:

await Task.Factory.StartNew(LongRunningMethod, TaskCreationOptions.LongRunning);

static void LongRunningMethod()
{
    // Simulate a long-running operation
    Console.WriteLine("Long-running task started on thread {0}.", Thread.CurrentThread.ManagedThreadId);
    Thread.Sleep(10000);
    Console.WriteLine("Long-running task finished on thread {0}.", Thread.CurrentThread.ManagedThreadId);
}
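With the default scheduler, current .NET implementations honor this hint by running the task on a newly created thread instead of a thread-pool thread. A quick way to observe the difference (LongRunningDemo is an illustrative name):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class LongRunningDemo
{
    // Reports whether the calling code is running on a thread-pool thread.
    private static bool OnPoolThread() => Thread.CurrentThread.IsThreadPoolThread;

    // Runs the same check as a regular task and as a LongRunning task.
    public static (bool regular, bool longRunning) Compare()
    {
        bool regular = Task.Run(() => OnPoolThread()).Result;
        bool longRunning = Task.Factory
            .StartNew(() => OnPoolThread(), TaskCreationOptions.LongRunning)
            .Result;
        return (regular, longRunning);
    }
}
```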

Ordering tasks

When working with Task, the await keyword is often used to control the order in which multiple asynchronous tasks run. await is actually syntactic sugar. Before looking at await, let’s look at the more basic mechanism, the ContinueWith method:

var taskA = Task.Run(() => DateTime.Now);
var taskB = taskA.ContinueWith(time => Console.WriteLine(time.Result));
await taskB;

Following Task’s example, we add a ContinueWith method to Job as well:

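public class Job
{
    private readonly Action _work;
    private readonly object _lock = new();
    private Job? _continue;

    public Job(Action work) => _work = work;
    public JobStatus Status { get; internal set; }

    internal protected virtual void Invoke()
    {
        Status = JobStatus.Running;
        _work();

        Job? next;
        lock (_lock)
        {
            Status = JobStatus.Completed;
            next = _continue;
        }
        // Schedule the next job in the chain, if any.
        next?.Start();
    }

    public void Start(JobScheduler? scheduler = null)
        => (scheduler ?? JobScheduler.Current).QueueJob(this);

    public static Job Run(Action work)
    {
        var job = new Job(work);
        job.Start();
        return job;
    }

    public Job ContinueWith(Action<Job> tobeContinued)
    {
        lock (_lock)
        {
            if (Status != JobStatus.Completed)
            {
                if (_continue == null)
                    _continue = new Job(() => tobeContinued(this));   // runs right after this job
                else
                    _continue.ContinueWith(tobeContinued);            // append to the end of the chain
                return this;
            }
        }
        // This job has already completed and Invoke has already read _continue,
        // so schedule the continuation immediately instead of losing it.
        new Job(() => tobeContinued(this)).Start();
        return this;
    }
}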

ContinueWith stores the next Job to run in _continue; if a continuation already exists, the new one is appended to that Job instead, so a chain of sequentially executed Jobs forms a linked list.
When the current Job’s Invoke method finishes, it schedules the next Job in the list.
Usage example:

Job.Run(() =>
{
    Thread.Sleep(1000);
    Console.WriteLine("11");
}).ContinueWith(_ =>
{
    Thread.Sleep(1000);
    Console.WriteLine("12");
});
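One design difference from the real API: Job.ContinueWith returns the original Job and appends to its linked list, whereas Task.ContinueWith returns a new Task that represents the continuation itself. A chain is therefore built by continuing from the returned task, and calling ContinueWith twice on the same Task registers two independent continuations that may run concurrently. A small sketch with the real API (ContinuationDemo is an illustrative name):

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class ContinuationDemo
{
    // Chains three steps with Task.ContinueWith and returns the order in which they ran.
    public static string[] RunChain()
    {
        var log = new ConcurrentQueue<string>();

        Task first = Task.Run(() => log.Enqueue("first"));
        // Each ContinueWith call returns a new Task for the continuation.
        Task second = first.ContinueWith(_ => log.Enqueue("second"));
        Task third = second.ContinueWith(_ => log.Enqueue("third"));

        third.Wait();   // waiting on the last continuation waits for the whole chain
        return log.ToArray();
    }
}
```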

Controlling order with the await keyword

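For await to work on a Job as it does on a Task, Job needs a GetAwaiter method. Any type with an accessible GetAwaiter method can be awaited, provided the awaiter it returns implements INotifyCompletion (which supplies OnCompleted) and has an IsCompleted property and a GetResult method.
In the Task class, GetAwaiter is declared as follows: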

public TaskAwaiter GetAwaiter();

TaskAwaiter, in turn, implements the ICriticalNotifyCompletion interface:

public readonly struct TaskAwaiter : System.Runtime.CompilerServices.ICriticalNotifyCompletion
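Because the compiler only looks for an accessible GetAwaiter method, and an extension method also qualifies, even a type you do not own can be made awaitable. A small sketch that makes TimeSpan awaitable by delegating to Task.Delay; the TimeSpanAwaitExtensions and AwaitPatternDemo names are made up for illustration.

```csharp
using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;

// Hypothetical extension: lets `await someTimeSpan;` compile by reusing Task.Delay's awaiter.
public static class TimeSpanAwaitExtensions
{
    public static TaskAwaiter GetAwaiter(this TimeSpan delay)
        => Task.Delay(delay).GetAwaiter();
}

public static class AwaitPatternDemo
{
    // Awaits a TimeSpan directly and returns how many milliseconds actually passed.
    public static async Task<long> WaitForTimeSpanAsync()
    {
        var stopwatch = Stopwatch.StartNew();
        await TimeSpan.FromMilliseconds(50);   // the compiler binds this to the extension above
        return stopwatch.ElapsedMilliseconds;
    }
}
```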

Imitating Task, we add the simplest possible JobAwaiter to Job:

public class Job
{
    ...

    public JobAwaiter GetAwaiter() => new(this);
}

JobAwaiter is defined as follows:

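using System.Runtime.CompilerServices;

public struct JobAwaiter : ICriticalNotifyCompletion
{
    private readonly Job _job;

    // Checked first by the compiler; if true, no continuation needs to be registered.
    public readonly bool IsCompleted => _job.Status == JobStatus.Completed;

    public JobAwaiter(Job job)
    {
        _job = job;
        // Awaiting a job that was never started starts it, so `await new Job(...)` works.
        if (job.Status == JobStatus.Created)
        {
            job.Start();
        }
    }

    // Job produces no result, so there is nothing to return.
    public void GetResult() { }

    // Resume the awaiting code once the job has finished.
    public void OnCompleted(Action continuation)
    {
        _job.ContinueWith(_ => continuation());
    }

    public void UnsafeOnCompleted(Action continuation)
        => OnCompleted(continuation);
}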

With GetAwaiter in place, the earlier ContinueWith example can also be written with await:

await F1();
await F2();

static Job F1() => new Job(() =>
{
        Thread.Sleep(1000);
        Console.WriteLine("11");
});

static Job F2() => new Job(() =>
{
        Thread.Sleep(1000);
        Console.WriteLine("12");
});
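To see why await needs nothing more than these members, here is a simplified, hand-written version of what happens for `await task;` followed by more code. It is only a sketch under that simplification: the real compiler output is an async state machine that registers the continuation through a method builder (which calls UnsafeOnCompleted for ICriticalNotifyCompletion awaiters) and also handles SynchronizationContext and multiple awaits. The ManualAwait name is illustrative. JobAwaiter answers the same three questions: IsCompleted checks the Job’s status, OnCompleted registers through ContinueWith, and GetResult returns nothing.

```csharp
using System;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class ManualAwait
{
    // Roughly what `await task; rest();` does, minus the compiler-generated state machine.
    public static void AwaitThen(Task task, Action rest)
    {
        TaskAwaiter awaiter = task.GetAwaiter();
        if (awaiter.IsCompleted)
        {
            // Fast path: already done, continue synchronously on the current thread.
            awaiter.GetResult();   // rethrows if the task faulted
            rest();
            return;
        }
        // Slow path: register the rest of the method as a continuation.
        awaiter.OnCompleted(() =>
        {
            awaiter.GetResult();
            rest();
        });
    }
}
```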

Summary

Looking back at the two questions from the beginning, we can now try to answer them.

  1. What is a Task? A Task is an operation with state, an abstraction of a potentially time-consuming operation. Like a task in real life, it takes time to carry out and passes through stages; our Job models them as Created, Scheduled, Running, and Completed, while the real TaskStatus enum is richer and distinguishes, for example, RanToCompletion from Faulted and Canceled. A task usually also produces a result, as Task<TResult> does; the Job here is deliberately simple and does not model results;
  2. How are tasks scheduled? By default, on the thread pool: once a Task is started, an idle thread-pool thread executes it, and developers do not need to care when or on which thread it runs. However, because the global .NET thread pool is shared, some special scenarios are not well served (for example, a task that must start promptly even when the pool threads are busy). In those cases the scheduling behavior can be changed through TaskCreationOptions such as LongRunning, or by supplying a different scheduler;

Finally, await is syntactic sugar: the compiler calls GetAwaiter on the awaited object, which returns an awaiter implementing ICriticalNotifyCompletion. In our simulation, that awaiter is a thin wrapper around ContinueWith.