310 lines
13 KiB
C#
310 lines
13 KiB
C#
using System;
|
||
using System.Net.Sockets;
|
||
using System.Threading;
|
||
using System.Threading.Tasks;
|
||
|
||
namespace XiaoZhiSharp.Kernels
|
||
{
|
||
/* CancellationTokenSource.Cancel(false);//相当于cancellationTokenSource.Cancel()
|
||
CancellationToken状态只能改变一次(从未取消变成已取消),当CancellationToken时已取消状态时,
|
||
每次往其中注册的回调都会立刻执行!当处于未取消状态时,注册进去的回调都会等待执行。
|
||
需要注意的是,当在未取消状态下注册多个回调时,它们在执行时是一个类似栈的结构顺序,先注册后执行。
|
||
而CancellationToken的Register可以注册多个回调,那他们可能都会抛出异常,throwOnFirstException参数表示在第一次报错时的处理行为。
|
||
----------------------
|
||
throwOnFirstException = true 表示立即抛出当前发生的异常,后续的回调将会取消执行;
|
||
throwOnFirstException = false 表示跳过当前回调的异常,继续执行生效的回调,等所有的回调执行完成之后,
|
||
再将所有的异常打包成一个System.AggregateException异常抛出来!
|
||
*/
|
||
|
||
/// <summary>
|
||
/// 任务超时取消 示例
|
||
/// </summary>
|
||
public class TaskTest
|
||
{
|
||
public static async Task TimeoutCancelTask()
|
||
{
|
||
CancellationTokenSource cts = new CancellationTokenSource();//取消令牌
|
||
Task task = DoAction(cts);//业务异步任务
|
||
double timeoutSeconds = 2;//超时时间 秒
|
||
Task delayTask = Task.Delay(TimeSpan.FromSeconds(timeoutSeconds));//指定一个等待任务 等待到超时时间
|
||
Task completeTask = await Task.WhenAny(task, delayTask);//等待两个任务,任意一个任务执行完成。返回率先完成的任务
|
||
if (completeTask == delayTask)//如果率先完成的是超时等待任务,就说明业务任务执行超时了。
|
||
{
|
||
cts.Cancel();//取消令牌 状态改为取消
|
||
Console.WriteLine("任务已超时取消");
|
||
}
|
||
else
|
||
{
|
||
Console.WriteLine("任务已完成");
|
||
}
|
||
}
|
||
|
||
//模拟业务任务
|
||
public static async Task DoAction(CancellationTokenSource cts)
|
||
{
|
||
await Task.Delay(200);
|
||
for (int i = 1; i <= 5; i++)
|
||
{
|
||
if (cts.IsCancellationRequested)//在业务任务每个耗时的操作开始之前判断取消令牌是否已取消
|
||
break;
|
||
Console.WriteLine(i);
|
||
await Task.Delay(TimeSpan.FromSeconds(1));//模拟业务操作,耗时任务。
|
||
}
|
||
}
|
||
}
|
||
|
||
/// <summary>
|
||
/// TaskExtensions测试
|
||
/// </summary>
|
||
public class TaskExtsTest
|
||
{
|
||
public static async Task Go()
|
||
{
|
||
double timeoutSeconds = 2;//超时时间 秒
|
||
int maxRetryCount = 2;//最大重试次数
|
||
CancellationTokenSource cts = new CancellationTokenSource();
|
||
bool isSuccess = false;
|
||
string result = string.Empty;
|
||
//1.超时取消任务 无返回值
|
||
//isSuccess = await TaskExtensions.TimeoutCancelAsync((cts) => DoActionNoResult(cts), timeoutSeconds, cts);
|
||
|
||
//1.超时取消任务 无返回值
|
||
//(isSuccess, result) = await TaskExtensions.TimeoutCancelAsync((cts) => DoActionWithResult(cts), timeoutSeconds, cts);
|
||
|
||
//3.超时取消并重试任务 无返回值
|
||
//isSuccess = await TaskExtensions.TimeoutRetryAsync((cts) => DoActionNoResult(cts), timeoutSeconds, maxRetryCount, cts);
|
||
|
||
//4.超时取消并重试任务 带返回值任务
|
||
(isSuccess, result) = await TaskExtensions.TimeoutRetryAsync((cts) => DoActionWithResult(cts), timeoutSeconds, maxRetryCount, cts);
|
||
|
||
if (isSuccess)
|
||
{
|
||
Console.WriteLine("任务执行成功,结果:" + result);
|
||
}
|
||
else
|
||
{
|
||
Console.WriteLine("任务执行失败!");
|
||
}
|
||
Console.ReadLine();
|
||
}
|
||
|
||
public static async Task DoActionNoResult(CancellationTokenSource cts)
|
||
{
|
||
await Task.Delay(200);
|
||
for (int i = 1; i <= 5; i++)
|
||
{
|
||
if (cts.IsCancellationRequested)//在业务任务每个耗时的操作开始之前判断取消令牌是否已取消
|
||
return;
|
||
Console.WriteLine($"num:{i}");
|
||
await Task.Delay(1000);//模拟业务操作,耗时任务。
|
||
}
|
||
}
|
||
|
||
public static async Task<string> DoActionWithResult(CancellationTokenSource cts)
|
||
{
|
||
await Task.Delay(200);
|
||
for (int i = 1; i <= 5; i++)
|
||
{
|
||
if (cts.IsCancellationRequested)//在业务任务每个耗时的操作开始之前判断取消令牌是否已取消
|
||
return "";
|
||
Console.WriteLine($"num:{i}");
|
||
await Task.Delay(1000);//模拟业务操作,耗时任务。
|
||
}
|
||
return "666";
|
||
}
|
||
}
|
||
|
||
public class TaskExecute
|
||
{
|
||
//线程
|
||
private Task task = null;
|
||
//线程源
|
||
private CancellationTokenSource cts = new CancellationTokenSource();
|
||
//手动停止事件对象
|
||
private ManualResetEvent mResetEvent = new ManualResetEvent(true);
|
||
|
||
public void Start()
|
||
{
|
||
if (task != null) { return; }
|
||
//暂停之后需要再次启动执行
|
||
if (cts.IsCancellationRequested)
|
||
{
|
||
cts = new CancellationTokenSource();
|
||
}
|
||
this.task = new Task(() =>
|
||
{
|
||
int count = 0;
|
||
while (!cts.IsCancellationRequested)
|
||
{
|
||
//用来控制是否需要继续和暂停
|
||
mResetEvent.WaitOne();
|
||
count++;
|
||
Console.WriteLine(count);
|
||
Thread.Sleep(3000);
|
||
}
|
||
}, cts.Token);
|
||
task.Start();
|
||
}
|
||
|
||
public void Pause()
|
||
{
|
||
mResetEvent.Reset();
|
||
}
|
||
|
||
public void Continue()
|
||
{
|
||
mResetEvent.Set();
|
||
}
|
||
|
||
public void Stop()
|
||
{
|
||
cts.Cancel();
|
||
cts.Dispose();
|
||
this.task.Dispose();
|
||
this.task = null;
|
||
}
|
||
}
|
||
|
||
public static class TaskExtensions
|
||
{
|
||
private static void WaitTask()
|
||
{
|
||
CancellationTokenSource cts = new CancellationTokenSource();
|
||
Task task = Task.Factory.StartNew(delegate
|
||
{
|
||
while (true)
|
||
{
|
||
try
|
||
{
|
||
throw new System.InvalidOperationException();
|
||
}
|
||
catch (SocketException ex)
|
||
{
|
||
break;
|
||
}
|
||
}
|
||
}, cts.Token);
|
||
task.Wait(5000);
|
||
if (task.IsCompleted)
|
||
{
|
||
|
||
}
|
||
else
|
||
{
|
||
cts.Cancel(true);
|
||
if (!task.IsFaulted)
|
||
{
|
||
|
||
}
|
||
}
|
||
}
|
||
|
||
/// <summary>
|
||
/// 任务超时取消
|
||
/// </summary>
|
||
/// <param name="func">业务任务(超时要取消任务的话 需要在耗时操作之前 判断cts如果取消就结束方法)</param>
|
||
/// <param name="timeoutSeconds">超时时间 秒</param>
|
||
/// <param name="cts">任务取消令牌</param>
|
||
/// <returns>true执行成功 false超时取消</returns>
|
||
public static async Task<bool> TimeoutCancelAsync(
|
||
Func<CancellationTokenSource, Task> func, double timeoutSeconds, CancellationTokenSource cts)
|
||
{
|
||
Task task = func.Invoke(cts);
|
||
Task delayTask = Task.Delay(TimeSpan.FromSeconds(timeoutSeconds), cts.Token);
|
||
Task completeTask = await Task.WhenAny(task, delayTask);
|
||
if (completeTask == task)
|
||
return true;
|
||
cts.Cancel();
|
||
Console.WriteLine("【TimeoutCancelAsync】任务执行超时已取消。");
|
||
return false;
|
||
}
|
||
|
||
/// <summary>
|
||
/// 任务超时取消 (带泛型返回值)
|
||
/// </summary>
|
||
/// <param name="func">业务任务带返回值(超时要取消任务的话 需要在耗时操作之前 判断cts如果取消就结束方法)</param>
|
||
/// <param name="timeoutSeconds">超时时间 秒</param>
|
||
/// <param name="cts">任务取消令牌</param>
|
||
/// <returns>IsSuccess:true执行成功 false超时取消 Result:任务执行成功的结果</returns>
|
||
public static async Task<(bool IsSuccess, T Result)> TimeoutCancelAsync<T>(
|
||
Func<CancellationTokenSource, Task<T>> func, double timeoutSeconds, CancellationTokenSource cts)
|
||
{
|
||
Task<T> task = func.Invoke(cts);
|
||
Task<T> delayTask = Task.Delay(TimeSpan.FromSeconds(timeoutSeconds), cts.Token)
|
||
.ContinueWith(_ => default(T));
|
||
Task completeTask = await Task.WhenAny<T>(task, delayTask);
|
||
if (completeTask == task)
|
||
return (true, task.Result);
|
||
cts.Cancel();
|
||
Console.WriteLine("【TimeoutCancelAsync】任务执行超时已取消。");
|
||
return (false, delayTask.Result);
|
||
}
|
||
|
||
|
||
/// <summary>
|
||
/// 任务超时取消 然后重新执行
|
||
/// </summary>
|
||
/// <param name="func">业务任务(超时要取消任务的话 需要在耗时操作之前 判断cts如果取消就结束方法)</param>
|
||
/// <param name="timeoutSeconds">超时时间 秒</param>
|
||
/// <param name="maxRetryCount">最大重试次数</param>
|
||
/// <param name="cts">任务取消令牌</param>
|
||
/// <returns>是否成功</returns>
|
||
public static async Task<bool> TimeoutRetryAsync(
|
||
Func<CancellationTokenSource, Task> func, double timeoutSeconds, int maxRetryCount, CancellationTokenSource cts)
|
||
{
|
||
for (int i = 0; i <= maxRetryCount; i++)
|
||
{
|
||
if (cts.IsCancellationRequested)
|
||
break;
|
||
if (i > 0)
|
||
Console.WriteLine($"【TimeoutRetryAsync】任务第{i}次重试开始...");
|
||
CancellationTokenSource currentCts = CancellationTokenSource.CreateLinkedTokenSource(cts.Token);
|
||
Task task = func.Invoke(currentCts);
|
||
Task delayTask = Task.Delay(TimeSpan.FromSeconds(timeoutSeconds), currentCts.Token);
|
||
Task completeTask = await Task.WhenAny(task, delayTask);
|
||
if (completeTask == task)
|
||
{
|
||
currentCts.Dispose();
|
||
return true;
|
||
}
|
||
currentCts.Cancel();
|
||
Console.WriteLine("【TimeoutRetryAsync】任务执行超时已取消。");
|
||
}
|
||
return false;
|
||
}
|
||
|
||
/// <summary>
|
||
/// 任务超时取消 然后重新执行 (带泛型返回值)
|
||
/// </summary>
|
||
/// <param name="func">业务任务带返回值(超时要取消任务的话 需要在耗时操作之前 判断cts如果取消就结束方法)</param>
|
||
/// <param name="timeoutSeconds">超时时间 秒</param>
|
||
/// <param name="maxRetryCount">最大重试次数</param>
|
||
/// <param name="cts">任务取消令牌</param>
|
||
/// <returns>IsSuccess:是否成功 Result:任务执行成功的结果</returns>
|
||
public static async Task<(bool IsSuccess, T Result)> TimeoutRetryAsync<T>(
|
||
Func<CancellationTokenSource, Task<T>> func, double timeoutSeconds, int maxRetryCount, CancellationTokenSource cts)
|
||
{
|
||
for (int i = 0; i <= maxRetryCount; i++)
|
||
{
|
||
if (cts.IsCancellationRequested)
|
||
break;
|
||
if (i > 0)
|
||
Console.WriteLine($"【TimeoutRetryAsync】任务第{i}次重试开始...");
|
||
CancellationTokenSource currentCts = CancellationTokenSource.CreateLinkedTokenSource(cts.Token);
|
||
Task<T> task = func.Invoke(currentCts);
|
||
Task<T> delayTask = Task.Delay(TimeSpan.FromSeconds(timeoutSeconds), currentCts.Token)
|
||
.ContinueWith(_ => default(T));
|
||
Task completeTask = await Task.WhenAny<T>(task, delayTask);
|
||
if (completeTask == task)
|
||
{
|
||
currentCts.Dispose();
|
||
return (true, await task);
|
||
}
|
||
currentCts.Cancel();
|
||
Console.WriteLine("【TimeoutRetryAsync】任务执行超时已取消。");
|
||
}
|
||
return (false, default(T));
|
||
}
|
||
}
|
||
}
|