C#学习教程:如何正确排队任务以在C#中运行分享


如何正确排队任务以在C#中运行

我有一个项目的枚举( RunData.Demand ),每个项目代表一些涉及通过HTTP调用API的工作。 如果我只是通过它来完成并在每次迭代期间调用API,它的工作效果很好。 但是,每次迭代需要一两秒钟,所以我想运行2-3个线程并在它们之间划分工作。 这就是我正在做的事情:

 ThreadPool.SetMaxThreads(2, 5); // Trying to limit the amount of threads var tasks = RunData.Demand .Select(service => Task.Run(async delegate { var availabilityResponse = await client.QueryAvailability(service); // Do some other stuff, not really important })); await Task.WhenAll(tasks); 

client.QueryAvailability调用基本上使用HttpClient类调用API:

 public async Task QueryAvailability(QueryAvailabilityMultidayRequest request) { var response = await client.PostAsJsonAsync("api/queryavailabilitymultiday", request); if (response.IsSuccessStatusCode) { return await response.Content.ReadAsAsync(); } throw new HttpException((int) response.StatusCode, response.ReasonPhrase); } 

这种方法很有效,但最终事情开始超时。 如果我将HttpClient超时设置为一小时,那么我开始得到奇怪的内部服务器错误。

我开始做的是在QueryAvailability方法中设置一个秒表,看看发生了什么。

发生的事情是RunData.Demand中的所有1200个项目一次被创建,所有1200个await client.PostAsJsonAsync方法被调用。 它似乎然后使用2个线程慢慢检查任务,所以最后我有等待9或10分钟的任务。

这是我想要的行为:

我想创建1,200个任务,然后在线程可用时一次运行3-4个任务。 我不想立即排队1,200个HTTP呼叫。

这有什么好办法吗?

正如我一直建议..你需要的是TPL Dataflow(安装: Install-Package Microsoft.Tpl.Dataflow )。

您创建一个ActionBlock其中包含要对每个项执行的操作。 设置MaxDegreeOfParallelism以进行限制。 开始发布并等待其完成:

 var block = new ActionBlock(async service => { var availabilityResponse = await client.QueryAvailability(service); // ... }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 }); foreach (var service in RunData.Demand) { block.Post(service); } block.Complete(); await block.Completion; 

您正在使用异步HTTP调用,因此限制线程数将无济于事( ParallelOptions.MaxDegreeOfParallelism中的Parallel.ForEach也不会作为答案之一)。 即使是单个线程也可以启动所有请求并在结果到达时处理结果。

解决它的一种方法是使用TPL Dataflow。

另一个不错的解决方案是将源IEnumerable划分为分区并按顺序处理每个分区中的项目,如本博文中所述:

 public static Task ForEachAsync(this IEnumerable source, int dop, Func body) { return Task.WhenAll( from partition in Partitioner.Create(source).GetPartitions(dop) select Task.Run(async delegate { using (partition) while (partition.MoveNext()) await body(partition.Current); })); } 

虽然Dataflow库很棒,但我认为在不使用块组合时它有点沉重。 我倾向于使用类似下面的扩展方法。

此外,与Partitioner方法不同,它在调用上下文上运行异步方法 – 需要注意的是,如果您的代码不是真正的异步,或者采用“快速路径”,那么它将有效地同步运行,因为没有显式创建线程。

 public static async Task RunParallelAsync(this IEnumerable items, Func asyncAction, int maxParallel) { var tasks = new List(); foreach (var item in items) { tasks.Add(asyncAction(item)); if (tasks.Count < maxParallel) continue; var notCompleted = tasks.Where(t => !t.IsCompleted).ToList(); if (notCompleted.Count >= maxParallel) await Task.WhenAny(notCompleted); } await Task.WhenAll(tasks); } 

老问题,但我想提出一个使用SemaphoreSlim类的替代轻量级解决方案。 只需参考System.Threading。

 SemaphoreSlim sem = new SemaphoreSlim(4,4); foreach (var service in RunData.Demand) { await sem.WaitAsync(); Task t = Task.Run(async () => { var availabilityResponse = await client.QueryAvailability(serviceCopy)); // do your other stuff here with the result of QueryAvailability } t.ContinueWith(sem.Release()); } 

信号量充当锁定机制。 您只能通过调用Wait(WaitAsync)来输入信号量,Wait(WaitAsync)从计数中减去一个。 呼叫释放增加了一个计数。

上述就是C#学习教程:如何正确排队任务以在C#中运行分享的全部内容,如果对大家有所用处且需要了解更多关于C#学习教程,希望大家多多关注—猴子技术宅(www.ssfiction.com)

本文来自网络收集,不代表猴子技术宅立场,如涉及侵权请点击右边联系管理员删除。

如若转载,请注明出处:https://www.ssfiction.com/ckf/1031855.html

发表评论

邮箱地址不会被公开。 必填项已用*标注