Skip to content

实现异步任务

如何创建和调用异步方法

在 C# 中,异步操作是使用 asyncawait 关键字实现的。 关键字 async 用于定义异步方法,而 await 关键字用于调用异步方法并等待其结果而不阻止调用线程。 异步操作通常使用 TaskTask<T> 类型来实现,这些类型表示可以等待的进行中的操作。

下面是有关如何在 C# 中创建和调用异步任务的示例:

c#
using System;
using System.IO;
using System.Threading.Tasks;

public class Program
{
    public static async Task Main()
    {
        string filePath = "example.txt";
        string content = await ReadFileAsync(filePath);
        Console.WriteLine(content);
    }

    public static async Task<string> ReadFileAsync(string filePath)
    {
        using (StreamReader reader = new StreamReader(filePath))
        {
            string content = await reader.ReadToEndAsync();
            return content;
        }
    }
}

在此示例中,方法 ReadFileAsync 是使用 async 关键字定义的,并且返回类型为 Task<string>。 关键字 async 指示该方法包含异步操作,返回类型 Task<string> 指示该方法返回表示异步操作的任务。 Task<string> 类型是一个泛型任务,表示返回字符串值的异步操作。 方法 ReadFileAsync 采用文件路径作为参数,并异步读取文件的内容。

方法 Main 也定义为异步方法,允许它使用 await 关键字调用 ReadFileAsync 方法。 关键字 await 用于指示程序应在继续操作之前等待异步操作的结果。 在这种情况下,程序等待文件被读取,然后将其内容打印到控制台。

实现异步文件输入和输出

下面的代码示例演示如何创建异步方法,以序列化 C# 对象、将 JSON 字符串写入文件、将文件内容读入字符串,并将 JSON 字符串反序列化回 C# 对象:

c#
using System;
using System.IO;
using System.Text.Json;
using System.Threading.Tasks;

public class Account
{
    public string Name { get; set; }
    public decimal Balance { get; set; }
}

public class Program
{
    public static async Task Main()
    {
        // Combine a directory and file name, then create the directory if it doesn't exist
        string directoryPath = @"C:\TempDir";
        if (!Directory.Exists(directoryPath))
        {
            Directory.CreateDirectory(directoryPath);
        }

        string fileName = "account.json";
        string filePath = Path.Combine(directoryPath, fileName);

        Account account = new Account { Name = "Elize Harmsen", Balance = 1000.00m };

        // Save account data to a file asynchronously
        await SaveAccountDataAsync(filePath, account);

        // Load account data from the file asynchronously
        Account loadedAccount = await LoadAccountDataAsync(filePath);
        Console.WriteLine($"Name: {loadedAccount.Name}, Balance: {loadedAccount.Balance}");
    }

    public static async Task SaveAccountDataAsync(string filePath, Account account)
    {
        string jsonString = JsonSerializer.Serialize(account);
        await File.WriteAllTextAsync(filePath, jsonString);
    }

    public static async Task<Account> LoadAccountDataAsync(string filePath)
    {
        string jsonString = await File.ReadAllTextAsync(filePath);
        return JsonSerializer.Deserialize<Account>(jsonString);
    }
}

将 HttpClient 用于异步 API 调用

HttpClient 类是 System.Net.Http 名称空间的一部分,它提供了用于发送HTTP请求和接收HTTP响应的类。HttpClient 类被设计为异步使用,允许你对web资源进行非阻塞调用。

HTTPClient 类包括以下异步方法:

  • GetAsync:将 GET 请求发送到指定的 URI 并返回响应。
  • PostAsync:使用指定内容将POST 请求发送到指定 URI 并返回响应。
  • PutAsync:使用指定内容将 PUT 请求发送到指定 URI 并返回响应。
  • DeleteAsync:将 DELETE 请求发送到指定的 URI 并返回响应。
  • SendAsync:发送 HTTP 请求消息并返回响应。

下面的代码示例演示如何使用 HttpClient 类向 REST API 发出异步 GET 请求并处理响应:

c#
// Code that demonstrates the use of asynchronous REST API calls in C#

using System;
using System.ComponentModel;
using System.Net.Http;
using System.Threading.Tasks;
using System.Text.Json;

namespace ConsoleApp
{
    class Program
    {
        static async Task Main(string[] args)
        {
            using (HttpClient client = new HttpClient())
            {
                try
                {
                    // PetStore API endpoint
                    string url = "https://petstore.swagger.io/v2/pet/findByStatus?status=available";
                    HttpResponseMessage response = await client.GetAsync(url);
                    response.EnsureSuccessStatusCode();
                    string responseBody = await response.Content.ReadAsStringAsync();
                    //Console.WriteLine($"Response: {responseBody}");

                    // Deserialize the JSON response into a list of pets
                    var pets = JsonSerializer.Deserialize<List<Pet>>(responseBody);

                    // Iterate through the list of pets and display their details
                    foreach (var pet in pets)
                    {
                        //Console.WriteLine($"Pet ID: {pet.id}, Name: {pet.name}");
                        if (pet.id.ToString().Length > 4)
                        {
                            Console.WriteLine($"Pet ID: {pet.id}, Name: {pet.name}");
                        }
                    }
                }
                catch (HttpRequestException e)
                {
                    Console.WriteLine($"Request error: {e.Message}");
                }
            }
        }
    }
}
public class Pet
{
    public long id { get; set; }
    public string name { get; set; }
    public Category category { get; set; }
    public List<string> photoUrls { get; set; }
    public List<Tag> tags { get; set; }
    public string status { get; set; }
}

public class Category
{
    public long id { get; set; }
    public string name { get; set; }
}

public class Tag
{
    public long id { get; set; }
    public string name { get; set; }
}

并行运行异步任务

在 C# 中,可以使用任务并行库 (Task Parallel Library,TPL) 简化并行代码编写过程。 TPL 是 System.ThreadingSystem.Threading.Tasks 命名空间中的一组公共类型和 API。

TPL 在以下方面提供支持:

  • 数据并行:任务并行库(TPL)提供执行数据并行的方法,使你能够同时对多个数据元素执行相同的操作。 当你拥有大型数据集并且想要单独对每个元素执行计算或转换时,这特别有用。

  • 基于任务的异步编程模型:TPL 提供 Task 类,表示异步操作。 可以使用 asyncawait 关键字来简化编写异步代码的过程。 这样,就可以编写更易于读取和维护的代码,同时仍利用并行度。

  • 数据流:TPL 提供数据流编程模型,可用于创建复杂的数据处理管道。 此模型基于“块”的概念,可以异步处理数据并使用消息相互通信。

数据并行

数据并行是一种并行编程模式,侧重于同时对多个数据元素执行相同的操作。 当你拥有大型数据集并且想要单独对每个元素执行计算或转换时,这特别有用。 在 C# 中,可以使用 Parallel.ForParallel.ForEach 方法轻松实现数据并行。 通过这些方法,可以并行循环访问集合或数据范围,并将工作负荷分布到多个线程。

任务并行库通过 System.Threading.Tasks.Parallel 类支持数据并行。 此类提供基于方法的 forforeach 循环的并行实现。 为 Parallel.For 循环或 Parallel.ForEach 循环编写循环逻辑时,就像编写顺序循环一样。 TPL 为你处理所有低级别工作。

下面的代码示例演示了一个简单的 foreach 循环及其并行等效项。

c#
// Sequential version
foreach (var item in sourceCollection)
{
    Process(item);
}

// Parallel equivalent
Parallel.ForEach(sourceCollection, item => Process(item));

TPL 还提供一组数据结构,这些结构针对并发访问进行了优化,例如 ConcurrentBagConcurrentQueueConcurrentDictionary。 借助这些数据结构,可以安全地添加、删除和访问多个线程中的元素,而无需显式锁定。

下面的代码示例演示如何使用 ConcurrentBag 来存储并行运行的多个任务的结果:

c#
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {
        var results = new ConcurrentBag<int>();
        Parallel.For(0, 100, i =>
        {
            // Simulate some work
            Task.Delay(100).Wait();
            results.Add(i);
        });

        Console.WriteLine($"Processed {results.Count} items in parallel.");
    }
}

使用 Task.WhenAll 和 Task.WhenAny 并行运行任务

Task.WhenAllTask.WhenAny 方法是 C# 中任务并行库的一部分。 这些方法允许并行运行多个任务,并等待其完成。

  • 当您希望在继续之前等待所有任务完成时,使用 Task.WhenAll。它接受一个任务数组作为输入,并返回一个任务,表示所有输入任务的完成。当您有多个可以并发执行的独立任务时,例如进行多个API调用或同时处理多个文件,这是非常有用的。

  • 当您想要等待任何任务完成时,使用 Task.WhenAny。它接受任务数组作为输入,并返回表示第一个完成的任务的任务。当您希望在任何任务完成后立即执行某些操作,而不是等待所有任务完成时,这很有用。

下面的代码示例演示如何用于 Task.WhenAll 并行运行多个任务并等待其完成:

c#
using System;
using System.Net.Http;
using System.Threading.Tasks;
using System.Collections.Generic;

class Program
{
    static async Task Main(string[] args)
    {
        var urls = new List<string>
        {
            "https://example.com",
            "https://example.org",
            "https://example.net"
        };

        var tasks = new List<Task<string>>();

        foreach (var url in urls)
        {
            tasks.Add(FetchDataAsync(url));
        }

        // Wait for all tasks to complete
        var results = await Task.WhenAll(tasks);

        foreach (var result in results)
        {
            Console.WriteLine(result);
        }
    }

    static async Task<string> FetchDataAsync(string url)
    {
        using (var client = new HttpClient())
        {
            return await client.GetStringAsync(url);
        }
    }
}

同时进行多个文件 I/O 操作

以下示例按顺序循环访问目录,但并行处理文件。 如果文件与目录比率较大,这可能是最佳方法。 还可以并行化目录迭代,并按顺序访问每个文件。 除非你专门面向具有大量处理器的计算机,否则并行化这两个循环可能并不有效。 但是,与在所有情况下一样,应全面测试应用程序,以确定最佳方法。

c#
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Security;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {
        try
        {
            TraverseTreeParallelForEach(@"C:\Program Files", (f) =>
            {
                // Exceptions are no-ops.
                try
                {
                    // Do nothing with the data except read it.
                    byte[] data = File.ReadAllBytes(f);
                }
                catch (FileNotFoundException) { }
                catch (IOException) { }
                catch (UnauthorizedAccessException) { }
                catch (SecurityException) { }
                // Display the filename.
                Console.WriteLine(f);
            });
        }
        catch (ArgumentException)
        {
            Console.WriteLine(@"The directory 'C:\Program Files' does not exist.");
        }

        // Keep the console window open.
        Console.ReadKey();
    }

    public static void TraverseTreeParallelForEach(string root, Action<string> action)
    {
        //Count of files traversed and timer for diagnostic output
        int fileCount = 0;
        var sw = Stopwatch.StartNew();

        // Determine whether to parallelize file processing on each folder based on processor count.
        int procCount = Environment.ProcessorCount;

        // Data structure to hold names of subfolders to be examined for files.
        Stack<string> dirs = new Stack<string>();

        if (!Directory.Exists(root))
        {
            throw new ArgumentException(
                "The given root directory doesn't exist.", nameof(root));
        }
        dirs.Push(root);

        while (dirs.Count > 0)
        {
            string currentDir = dirs.Pop();
            string[] subDirs = { };
            string[] files = { };

            try
            {
                subDirs = Directory.GetDirectories(currentDir);
            }
            // Thrown if we do not have discovery permission on the directory.
            catch (UnauthorizedAccessException e)
            {
                Console.WriteLine(e.Message);
                continue;
            }
            // Thrown if another process has deleted the directory after we retrieved its name.
            catch (DirectoryNotFoundException e)
            {
                Console.WriteLine(e.Message);
                continue;
            }

            try
            {
                files = Directory.GetFiles(currentDir);
            }
            catch (UnauthorizedAccessException e)
            {
                Console.WriteLine(e.Message);
                continue;
            }
            catch (DirectoryNotFoundException e)
            {
                Console.WriteLine(e.Message);
                continue;
            }
            catch (IOException e)
            {
                Console.WriteLine(e.Message);
                continue;
            }

            // Execute in parallel if there are enough files in the directory.
            // Otherwise, execute sequentially.Files are opened and processed
            // synchronously but this could be modified to perform async I/O.
            try
            {
                if (files.Length < procCount)
                {
                    foreach (var file in files)
                    {
                        action(file);
                        fileCount++;
                    }
                }
                else
                {
                    Parallel.ForEach(files, () => 0,
                        (file, loopState, localCount) =>
                        {
                            action(file);
                            return (int)++localCount;
                        },
                        (c) =>
                        {
                            Interlocked.Add(ref fileCount, c);
                        });
                }
            }
            catch (AggregateException ae)
            {
                ae.Handle((ex) =>
                {
                    if (ex is UnauthorizedAccessException)
                    {
                        // Here we just output a message and go on.
                        Console.WriteLine(ex.Message);
                        return true;
                    }
                    // Handle other exceptions here if necessary...

                    return false;
                });
            }

            // Push the subdirectories onto the stack for traversal.
            // This could also be done before handing the files.
            foreach (string str in subDirs)
                dirs.Push(str);
        }

        // For diagnostic purposes.
        Console.WriteLine($"Processed {fileCount} files in {sw.ElapsedMilliseconds} milliseconds");
    }
}

在此示例中,文件 I/O 是同步执行的。 当代码使用大型文件或网络连接缓慢时,最好以异步方式访问文件。 可以将异步 I/O 技术与并行迭代相结合。

该示例使用本地 fileCount 变量来维护已处理的文件总数的计数。 由于变量可由多个任务并发访问,因此通过调用 Interlocked.Add 该方法来同步访问。

请注意,如果在主线程上引发异常,则 ForEach 方法启动的线程可能会继续运行。 若要停止这些线程,可以在异常处理程序中设置布尔变量,并在并行循环的每个迭代中检查其值。 如果值指示已引发异常,请使用 ParallelLoopState 变量停止或中断循环。

异步和并行任务中的异常处理

使用任务并行库(TPL)来运行任务时,异常可以通过多种不同的方式发生。 最常见的情况是当任务引发异常时。 当任务在线程池线程上运行或在主线程上运行时,可能会引发异常。 在任一情况下,异常将传播回调用线程。

使用 Task.Wait 该方法等待任务完成时,任务引发的任何异常将传播回调用线程。 可以使用 try/catch 块处理这些异常。 如果一个任务是附加子任务的父任务,或者在等待多个任务时,可能会引发多个异常。 如果引发一个或多个异常,它们会被封装在一个 AggregateException 实例中。

AggregateException 异常具有一个 InnerExceptions 属性,可以枚举该属性以查看所有引发的原始异常,并单独处理每一个(或选择不处理)。

以下示例演示如何处理任务引发的异常。

c#
public static partial class Program
{
    public static void Main()
    {
        HandleThree();
    }
    
    public static void HandleThree()
    {
        var task = Task.Run(
            () => throw new CustomException("This exception is expected!"));

        try
        {
            task.Wait();
        }
        catch (AggregateException ae)
        {
            foreach (var ex in ae.InnerExceptions)
            {
                // Handle the custom exception.
                if (ex is CustomException)
                {
                    Console.WriteLine(ex.Message);
                }
                // Rethrow any other exception.
                else
                {
                    throw ex;
                }
            }
        }
    }
}

// Define the CustomException class
public class CustomException : Exception
{
    public CustomException(string message) : base(message) { }
}
// The example displays the following output:
//        This exception is expected!

在此示例中,方法 HandleThree 创建了一个会引发 CustomException 的任务。 该 try/catch 块会捕获 AggregateException,并循环访问 InnerExceptions 集合。 如果异常的类型 CustomException,它会将消息输出到控制台。 如果遇到其他类型的异常,则会重新引发该异常。

还可以使用 AggregateException.Handle 方法处理原始异常。 此方法采用为 InnerExceptions 集合中的每个异常调用的委托。 如果委托返回 true,则异常会被视为已处理,并从集合中删除。 如果返回 false,则会重新引发异常。

以下示例演示如何使用 Handle 方法处理任务引发的异常。

c#
public static partial class Program
{
    public static void HandleFour()
    {
        var task = Task.Run(
            () => throw new CustomException("This exception is expected!"));

        try
        {
            task.Wait();
        }
        catch (AggregateException ae)
        {
            ae.Handle(ex =>
            {
                // Handle the custom exception.
                if (ex is CustomException)
                {
                    Console.WriteLine(ex.Message);
                    return true;
                }
                // Rethrow any other exception.
                return false;
            });
        }
    }
}

在此示例中,方法 HandleFour 创建了一个会引发 CustomException 的任务。 该 try/catch 块捕获 AggregateException 并调用 Handle 该方法。 委托会检查异常的类型是否为 CustomException。 如果异常的类型是 CustomException,委托会将消息输出到控制台并返回 true。 指示异常已被处理的响应是 true。 如果异常是其他类型,委托将返回 false,导致重新引发该异常。

Last updated:

Released under the MIT License.