Appearance
实现异步任务
如何创建和调用异步方法
在 C# 中,异步操作是使用 async 和 await 关键字实现的。 关键字 async 用于定义异步方法,而 await 关键字用于调用异步方法并等待其结果而不阻止调用线程。 异步操作通常使用 Task 或 Task<T> 类型来实现,这些类型表示可以等待的进行中的操作。
下面是有关如何在 C# 中创建和调用异步任务的示例:
c#
using System;
using System.IO;
using System.Threading.Tasks;
public class Program
{
public static async Task Main()
{
string filePath = "example.txt";
string content = await ReadFileAsync(filePath);
Console.WriteLine(content);
}
public static async Task<string> ReadFileAsync(string filePath)
{
using (StreamReader reader = new StreamReader(filePath))
{
string content = await reader.ReadToEndAsync();
return content;
}
}
}在此示例中,方法 ReadFileAsync 是使用 async 关键字定义的,并且返回类型为 Task<string>。 关键字 async 指示该方法包含异步操作,返回类型 Task<string> 指示该方法返回表示异步操作的任务。 Task<string> 类型是一个泛型任务,表示返回字符串值的异步操作。 方法 ReadFileAsync 采用文件路径作为参数,并异步读取文件的内容。
方法 Main 也定义为异步方法,允许它使用 await 关键字调用 ReadFileAsync 方法。 关键字 await 用于指示程序应在继续操作之前等待异步操作的结果。 在这种情况下,程序等待文件被读取,然后将其内容打印到控制台。
实现异步文件输入和输出
下面的代码示例演示如何创建异步方法,以序列化 C# 对象、将 JSON 字符串写入文件、将文件内容读入字符串,并将 JSON 字符串反序列化回 C# 对象:
c#
using System;
using System.IO;
using System.Text.Json;
using System.Threading.Tasks;
public class Account
{
public string Name { get; set; }
public decimal Balance { get; set; }
}
public class Program
{
public static async Task Main()
{
// Combine a directory and file name, then create the directory if it doesn't exist
string directoryPath = @"C:\TempDir";
if (!Directory.Exists(directoryPath))
{
Directory.CreateDirectory(directoryPath);
}
string fileName = "account.json";
string filePath = Path.Combine(directoryPath, fileName);
Account account = new Account { Name = "Elize Harmsen", Balance = 1000.00m };
// Save account data to a file asynchronously
await SaveAccountDataAsync(filePath, account);
// Load account data from the file asynchronously
Account loadedAccount = await LoadAccountDataAsync(filePath);
Console.WriteLine($"Name: {loadedAccount.Name}, Balance: {loadedAccount.Balance}");
}
public static async Task SaveAccountDataAsync(string filePath, Account account)
{
string jsonString = JsonSerializer.Serialize(account);
await File.WriteAllTextAsync(filePath, jsonString);
}
public static async Task<Account> LoadAccountDataAsync(string filePath)
{
string jsonString = await File.ReadAllTextAsync(filePath);
return JsonSerializer.Deserialize<Account>(jsonString);
}
}将 HttpClient 用于异步 API 调用
HttpClient 类是 System.Net.Http 名称空间的一部分,它提供了用于发送HTTP请求和接收HTTP响应的类。HttpClient 类被设计为异步使用,允许你对web资源进行非阻塞调用。
该 HTTPClient 类包括以下异步方法:
GetAsync:将 GET 请求发送到指定的 URI 并返回响应。PostAsync:使用指定内容将POST 请求发送到指定 URI 并返回响应。PutAsync:使用指定内容将 PUT 请求发送到指定 URI 并返回响应。DeleteAsync:将 DELETE 请求发送到指定的 URI 并返回响应。SendAsync:发送 HTTP 请求消息并返回响应。
下面的代码示例演示如何使用 HttpClient 类向 REST API 发出异步 GET 请求并处理响应:
c#
// Code that demonstrates the use of asynchronous REST API calls in C#
using System;
using System.ComponentModel;
using System.Net.Http;
using System.Threading.Tasks;
using System.Text.Json;
namespace ConsoleApp
{
class Program
{
static async Task Main(string[] args)
{
using (HttpClient client = new HttpClient())
{
try
{
// PetStore API endpoint
string url = "https://petstore.swagger.io/v2/pet/findByStatus?status=available";
HttpResponseMessage response = await client.GetAsync(url);
response.EnsureSuccessStatusCode();
string responseBody = await response.Content.ReadAsStringAsync();
//Console.WriteLine($"Response: {responseBody}");
// Deserialize the JSON response into a list of pets
var pets = JsonSerializer.Deserialize<List<Pet>>(responseBody);
// Iterate through the list of pets and display their details
foreach (var pet in pets)
{
//Console.WriteLine($"Pet ID: {pet.id}, Name: {pet.name}");
if (pet.id.ToString().Length > 4)
{
Console.WriteLine($"Pet ID: {pet.id}, Name: {pet.name}");
}
}
}
catch (HttpRequestException e)
{
Console.WriteLine($"Request error: {e.Message}");
}
}
}
}
}
public class Pet
{
public long id { get; set; }
public string name { get; set; }
public Category category { get; set; }
public List<string> photoUrls { get; set; }
public List<Tag> tags { get; set; }
public string status { get; set; }
}
public class Category
{
public long id { get; set; }
public string name { get; set; }
}
public class Tag
{
public long id { get; set; }
public string name { get; set; }
}并行运行异步任务
在 C# 中,可以使用任务并行库 (Task Parallel Library,TPL) 简化并行代码编写过程。 TPL 是 System.Threading 和 System.Threading.Tasks 命名空间中的一组公共类型和 API。
TPL 在以下方面提供支持:
数据并行:任务并行库(TPL)提供执行数据并行的方法,使你能够同时对多个数据元素执行相同的操作。 当你拥有大型数据集并且想要单独对每个元素执行计算或转换时,这特别有用。
基于任务的异步编程模型:TPL 提供
Task类,表示异步操作。 可以使用async和await关键字来简化编写异步代码的过程。 这样,就可以编写更易于读取和维护的代码,同时仍利用并行度。数据流:TPL 提供数据流编程模型,可用于创建复杂的数据处理管道。 此模型基于“块”的概念,可以异步处理数据并使用消息相互通信。
数据并行
数据并行是一种并行编程模式,侧重于同时对多个数据元素执行相同的操作。 当你拥有大型数据集并且想要单独对每个元素执行计算或转换时,这特别有用。 在 C# 中,可以使用 Parallel.For 和 Parallel.ForEach 方法轻松实现数据并行。 通过这些方法,可以并行循环访问集合或数据范围,并将工作负荷分布到多个线程。
任务并行库通过 System.Threading.Tasks.Parallel 类支持数据并行。 此类提供基于方法的 for 和 foreach 循环的并行实现。 为 Parallel.For 循环或 Parallel.ForEach 循环编写循环逻辑时,就像编写顺序循环一样。 TPL 为你处理所有低级别工作。
下面的代码示例演示了一个简单的 foreach 循环及其并行等效项。
c#
// Sequential version
foreach (var item in sourceCollection)
{
Process(item);
}
// Parallel equivalent
Parallel.ForEach(sourceCollection, item => Process(item));TPL 还提供一组数据结构,这些结构针对并发访问进行了优化,例如 ConcurrentBag, ConcurrentQueue 和 ConcurrentDictionary。 借助这些数据结构,可以安全地添加、删除和访问多个线程中的元素,而无需显式锁定。
下面的代码示例演示如何使用 ConcurrentBag 来存储并行运行的多个任务的结果:
c#
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
class Program
{
static void Main()
{
var results = new ConcurrentBag<int>();
Parallel.For(0, 100, i =>
{
// Simulate some work
Task.Delay(100).Wait();
results.Add(i);
});
Console.WriteLine($"Processed {results.Count} items in parallel.");
}
}使用 Task.WhenAll 和 Task.WhenAny 并行运行任务
Task.WhenAll 和 Task.WhenAny 方法是 C# 中任务并行库的一部分。 这些方法允许并行运行多个任务,并等待其完成。
当您希望在继续之前等待所有任务完成时,使用
Task.WhenAll。它接受一个任务数组作为输入,并返回一个任务,表示所有输入任务的完成。当您有多个可以并发执行的独立任务时,例如进行多个API调用或同时处理多个文件,这是非常有用的。当您想要等待任何任务完成时,使用
Task.WhenAny。它接受任务数组作为输入,并返回表示第一个完成的任务的任务。当您希望在任何任务完成后立即执行某些操作,而不是等待所有任务完成时,这很有用。
下面的代码示例演示如何用于 Task.WhenAll 并行运行多个任务并等待其完成:
c#
using System;
using System.Net.Http;
using System.Threading.Tasks;
using System.Collections.Generic;
class Program
{
static async Task Main(string[] args)
{
var urls = new List<string>
{
"https://example.com",
"https://example.org",
"https://example.net"
};
var tasks = new List<Task<string>>();
foreach (var url in urls)
{
tasks.Add(FetchDataAsync(url));
}
// Wait for all tasks to complete
var results = await Task.WhenAll(tasks);
foreach (var result in results)
{
Console.WriteLine(result);
}
}
static async Task<string> FetchDataAsync(string url)
{
using (var client = new HttpClient())
{
return await client.GetStringAsync(url);
}
}
}同时进行多个文件 I/O 操作
以下示例按顺序循环访问目录,但并行处理文件。 如果文件与目录比率较大,这可能是最佳方法。 还可以并行化目录迭代,并按顺序访问每个文件。 除非你专门面向具有大量处理器的计算机,否则并行化这两个循环可能并不有效。 但是,与在所有情况下一样,应全面测试应用程序,以确定最佳方法。
c#
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Security;
using System.Threading;
using System.Threading.Tasks;
class Program
{
static void Main()
{
try
{
TraverseTreeParallelForEach(@"C:\Program Files", (f) =>
{
// Exceptions are no-ops.
try
{
// Do nothing with the data except read it.
byte[] data = File.ReadAllBytes(f);
}
catch (FileNotFoundException) { }
catch (IOException) { }
catch (UnauthorizedAccessException) { }
catch (SecurityException) { }
// Display the filename.
Console.WriteLine(f);
});
}
catch (ArgumentException)
{
Console.WriteLine(@"The directory 'C:\Program Files' does not exist.");
}
// Keep the console window open.
Console.ReadKey();
}
public static void TraverseTreeParallelForEach(string root, Action<string> action)
{
//Count of files traversed and timer for diagnostic output
int fileCount = 0;
var sw = Stopwatch.StartNew();
// Determine whether to parallelize file processing on each folder based on processor count.
int procCount = Environment.ProcessorCount;
// Data structure to hold names of subfolders to be examined for files.
Stack<string> dirs = new Stack<string>();
if (!Directory.Exists(root))
{
throw new ArgumentException(
"The given root directory doesn't exist.", nameof(root));
}
dirs.Push(root);
while (dirs.Count > 0)
{
string currentDir = dirs.Pop();
string[] subDirs = { };
string[] files = { };
try
{
subDirs = Directory.GetDirectories(currentDir);
}
// Thrown if we do not have discovery permission on the directory.
catch (UnauthorizedAccessException e)
{
Console.WriteLine(e.Message);
continue;
}
// Thrown if another process has deleted the directory after we retrieved its name.
catch (DirectoryNotFoundException e)
{
Console.WriteLine(e.Message);
continue;
}
try
{
files = Directory.GetFiles(currentDir);
}
catch (UnauthorizedAccessException e)
{
Console.WriteLine(e.Message);
continue;
}
catch (DirectoryNotFoundException e)
{
Console.WriteLine(e.Message);
continue;
}
catch (IOException e)
{
Console.WriteLine(e.Message);
continue;
}
// Execute in parallel if there are enough files in the directory.
// Otherwise, execute sequentially.Files are opened and processed
// synchronously but this could be modified to perform async I/O.
try
{
if (files.Length < procCount)
{
foreach (var file in files)
{
action(file);
fileCount++;
}
}
else
{
Parallel.ForEach(files, () => 0,
(file, loopState, localCount) =>
{
action(file);
return (int)++localCount;
},
(c) =>
{
Interlocked.Add(ref fileCount, c);
});
}
}
catch (AggregateException ae)
{
ae.Handle((ex) =>
{
if (ex is UnauthorizedAccessException)
{
// Here we just output a message and go on.
Console.WriteLine(ex.Message);
return true;
}
// Handle other exceptions here if necessary...
return false;
});
}
// Push the subdirectories onto the stack for traversal.
// This could also be done before handing the files.
foreach (string str in subDirs)
dirs.Push(str);
}
// For diagnostic purposes.
Console.WriteLine($"Processed {fileCount} files in {sw.ElapsedMilliseconds} milliseconds");
}
}在此示例中,文件 I/O 是同步执行的。 当代码使用大型文件或网络连接缓慢时,最好以异步方式访问文件。 可以将异步 I/O 技术与并行迭代相结合。
该示例使用本地 fileCount 变量来维护已处理的文件总数的计数。 由于变量可由多个任务并发访问,因此通过调用 Interlocked.Add 该方法来同步访问。
请注意,如果在主线程上引发异常,则 ForEach 方法启动的线程可能会继续运行。 若要停止这些线程,可以在异常处理程序中设置布尔变量,并在并行循环的每个迭代中检查其值。 如果值指示已引发异常,请使用 ParallelLoopState 变量停止或中断循环。
异步和并行任务中的异常处理
使用任务并行库(TPL)来运行任务时,异常可以通过多种不同的方式发生。 最常见的情况是当任务引发异常时。 当任务在线程池线程上运行或在主线程上运行时,可能会引发异常。 在任一情况下,异常将传播回调用线程。
使用 Task.Wait 该方法等待任务完成时,任务引发的任何异常将传播回调用线程。 可以使用 try/catch 块处理这些异常。 如果一个任务是附加子任务的父任务,或者在等待多个任务时,可能会引发多个异常。 如果引发一个或多个异常,它们会被封装在一个 AggregateException 实例中。
该 AggregateException 异常具有一个 InnerExceptions 属性,可以枚举该属性以查看所有引发的原始异常,并单独处理每一个(或选择不处理)。
以下示例演示如何处理任务引发的异常。
c#
public static partial class Program
{
public static void Main()
{
HandleThree();
}
public static void HandleThree()
{
var task = Task.Run(
() => throw new CustomException("This exception is expected!"));
try
{
task.Wait();
}
catch (AggregateException ae)
{
foreach (var ex in ae.InnerExceptions)
{
// Handle the custom exception.
if (ex is CustomException)
{
Console.WriteLine(ex.Message);
}
// Rethrow any other exception.
else
{
throw ex;
}
}
}
}
}
// Define the CustomException class
public class CustomException : Exception
{
public CustomException(string message) : base(message) { }
}
// The example displays the following output:
// This exception is expected!在此示例中,方法 HandleThree 创建了一个会引发 CustomException 的任务。 该 try/catch 块会捕获 AggregateException,并循环访问 InnerExceptions 集合。 如果异常的类型 CustomException,它会将消息输出到控制台。 如果遇到其他类型的异常,则会重新引发该异常。
还可以使用 AggregateException.Handle 方法处理原始异常。 此方法采用为 InnerExceptions 集合中的每个异常调用的委托。 如果委托返回 true,则异常会被视为已处理,并从集合中删除。 如果返回 false,则会重新引发异常。
以下示例演示如何使用 Handle 方法处理任务引发的异常。
c#
public static partial class Program
{
public static void HandleFour()
{
var task = Task.Run(
() => throw new CustomException("This exception is expected!"));
try
{
task.Wait();
}
catch (AggregateException ae)
{
ae.Handle(ex =>
{
// Handle the custom exception.
if (ex is CustomException)
{
Console.WriteLine(ex.Message);
return true;
}
// Rethrow any other exception.
return false;
});
}
}
}在此示例中,方法 HandleFour 创建了一个会引发 CustomException 的任务。 该 try/catch 块捕获 AggregateException 并调用 Handle 该方法。 委托会检查异常的类型是否为 CustomException。 如果异常的类型是 CustomException,委托会将消息输出到控制台并返回 true。 指示异常已被处理的响应是 true。 如果异常是其他类型,委托将返回 false,导致重新引发该异常。