隐藏

C# Task 多任务 限制Task并发数量

发布:2021/3/6 15:45:50作者:管理员 来源:本站 浏览次数:1010

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Utils
{
    public class LimitedTaskScheduler : TaskScheduler, IDisposable
    {
        #region 外部方法
        [DllImport("kernel32.dll", EntryPoint = "SetProcessWorkingSetSize")]
        public static extern int SetProcessWorkingSetSize(IntPtr process, int minSize, int maxSize);
        #endregion

        #region 变量属性事件
        private BlockingCollection<Task> _tasks = new BlockingCollection<Task>();
        List<Thread> _threadList = new List<Thread>();
        private int _threadCount = 0;
        private int _timeOut = Timeout.Infinite;
        private Task _tempTask;
        #endregion

        #region 构造函数
        public LimitedTaskScheduler(int threadCount = 10)
        {
            CreateThreads(threadCount);
        }
        #endregion

        #region override GetScheduledTasks
        protected override IEnumerable<Task> GetScheduledTasks()
        {
            return _tasks;
        }
        #endregion

        #region override TryExecuteTaskInline
        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            return false;
        }
        #endregion

        #region override QueueTask
        protected override void QueueTask(Task task)
        {
            _tasks.Add(task);
        }
        #endregion

        #region 资源释放
        /// <summary>
        /// 资源释放
        /// 如果尚有任务在执行,则会在调用此方法的线程上引发System.Threading.ThreadAbortException,请使用Task.WaitAll等待任务执行完毕后,再调用该方法
        /// </summary>
        public void Dispose()
        {
            _timeOut = 100;

            foreach (Thread item in _threadList)
            {
                item.Abort();
            }
            _threadList.Clear();

            GC.Collect();
            GC.WaitForPendingFinalizers();
            if (Environment.OSVersion.Platform == PlatformID.Win32NT)
            {
                SetProcessWorkingSetSize(System.Diagnostics.Process.GetCurrentProcess().Handle, -1, -1);
            }
        }
        #endregion

        #region 创建线程池
        /// <summary>
        /// 创建线程池
        /// </summary>
        private void CreateThreads(int? threadCount = null)
        {
            if (threadCount != null) _threadCount = threadCount.Value;
            _timeOut = Timeout.Infinite;

            for (int i = 0; i < _threadCount; i++)
            {
                Thread thread = new Thread(new ThreadStart(() =>
                {
                    Task task;
                    while (_tasks.TryTake(out task, _timeOut))
                    {
                        TryExecuteTask(task);
                    }
                }));
                thread.IsBackground = true;
                thread.Start();
                _threadList.Add(thread);
            }
        }
        #endregion

        #region 全部取消
        /// <summary>
        /// 全部取消
        /// </summary>
        public void CancelAll()
        {
            while (_tasks.TryTake(out _tempTask)) { }
        }
        #endregion

    }

}


using System;
using System.Windows.Threading;
using System.Threading;
using System.Threading.Tasks;

namespace Utils
{
    /// <summary>
    /// 线程帮助类(处理单线程任务)
    /// </summary>
    public class ThreadHelper
    {
        private static LimitedTaskScheduler _defaultScheduler = new LimitedTaskScheduler();

        /// <summary>
        /// 执行
        /// 例:ThreadHelper.Run(() => { }, (ex) => { });
        /// </summary>
        /// <param name="doWork">在线程中执行</param>
        /// <param name="errorAction">错误处理</param>
        public static System.Threading.Tasks.Task Run2(Action doWork, LimitedTaskScheduler scheduler = null, Action<Exception> errorAction = null)
        {
            if (scheduler == null) scheduler = _defaultScheduler;
            System.Threading.Tasks.Task task = System.Threading.Tasks.Task.Factory.StartNew(() =>
            {
                try
                {
                    if (doWork != null)
                    {
                        doWork();
                    }
                }
                catch (Exception ex)
                {
                    if (errorAction != null) errorAction(ex);
                    LogUtil.LogError(ex);
                }
            }, CancellationToken.None, TaskCreationOptions.None, scheduler);
            return task;
        }

        /// <summary>
        /// 执行
        /// 例:ThreadHelper.Run(() => { }, (ex) => { });
        /// </summary>
        /// <param name="doWork">在线程中执行</param>
        /// <param name="errorAction">错误处理</param>
        public static System.Threading.Tasks.Task Run(Action doWork, LimitedTaskScheduler scheduler = null, Action<Exception> errorAction = null)
        {
            System.Threading.Tasks.Task task = System.Threading.Tasks.Task.Factory.StartNew(() =>
            {
                try
                {
                    if (doWork != null)
                    {
                        doWork();
                    }
                }
                catch (Exception ex)
                {
                    if (errorAction != null) errorAction(ex);
                    LogUtil.LogError(ex);
                }
            });
            return task;
        }

        /// <summary>
        /// 封装Dispatcher.BeginInvoke
        /// 例:ThreadHelper.BeginInvoke(this.Dispatcher, () => { }, (ex) => { });
        /// </summary>
        /// <param name="errorAction">错误处理</param>
        public static void BeginInvoke(Dispatcher dispatcher, Action action, Action<Exception> errorAction = null)
        {
            dispatcher.InvokeAsync(new Action(() =>
            {
                try
                {
                    DateTime dt = DateTime.Now;
                    action();
                    double d = DateTime.Now.Subtract(dt).TotalSeconds;
                    if (d > 0.01) LogUtil.Log("ThreadHelper.BeginInvoke UI耗时:" + d + "秒 " + action.Target.ToString());
                }
                catch (Exception ex)
                {
                    if (errorAction != null) errorAction(ex);
                    LogUtil.LogError(ex);
                }
            }), DispatcherPriority.Background);
        }
    }
}


/// <summary>
/// Demo: starts 1000 short tasks through ThreadHelper.Run from a background thread, then reports
/// (a) how long the submission loop took and (b) how long it took until every test value was consumed.
/// </summary>
private void Test23()
{
    // Setup
    DateTime dt = DateTime.Now;
    Random rnd = new Random();
    int taskCount = 1000;
    LimitedTaskScheduler scheduler = new LimitedTaskScheduler();

    // Generate the test data
    BlockingCollection<double> _data = new BlockingCollection<double>();
    for (int i = 0; i < taskCount; i++)
    {
        _data.Add(rnd.NextDouble());
    }

    // Submit the work
    Thread thread = new Thread(new ThreadStart(() =>
    {
        try
        {
            System.Threading.Tasks.Task[] pending = new System.Threading.Tasks.Task[taskCount];

            dt = DateTime.Now;
            for (int i = 0; i < taskCount; i++)
            {
                pending[i] = ThreadHelper.Run(() =>
                {
                    Thread.Sleep(50);
                    double a;
                    if (_data.TryTake(out a))
                    {
                        double r = Math.PI * a;
                    }
                }, scheduler);
            }
            double d = DateTime.Now.Subtract(dt).TotalSeconds;

            this.BeginInvoke(new Action(() =>
            {
                textBox1.Text += "调用" + taskCount + "次ThreadHelper.Run耗时:" + d.ToString() + "秒\r\n";
            }));

            // Wait until every submitted task has finished before the scheduler is released.
            System.Threading.Tasks.Task.WaitAll(pending);
        }
        finally
        {
            // Dispose only after all tasks are done; disposing right after starting the threads
            // (as before) would tear the scheduler down while work was still being submitted.
            scheduler.Dispose();
        }
    }));
    thread.IsBackground = true;
    thread.Start();

    // Measure how long the whole computation takes
    Thread thread2 = new Thread(new ThreadStart(() =>
    {
        // Poll until every test value has been taken by a task.
        while (_data.Count > 0)
        {
            Thread.Sleep(1);
        }
        double d = DateTime.Now.Subtract(dt).TotalSeconds;

        this.BeginInvoke(new Action(() =>
        {
            textBox1.Text += "数据计算结束,耗时:" + d.ToString() + "秒\r\n";
        }));
    }));
    thread2.IsBackground = true;
    thread2.Start();
}

// Scheduler instance passed to ThreadHelper.Run by Test24.
private LimitedTaskScheduler _scheduler = new LimitedTaskScheduler();

/// <summary>
/// Demo: measures the delay between the button click and the moment the submitted
/// work actually starts running, and appends the result to textBox1.
/// </summary>
private void Test24()
{
    DateTime clickedAt = DateTime.Now;

    Action measure = () =>
    {
        // Time elapsed from the click until this work item got to run.
        double elapsedSeconds = (DateTime.Now - clickedAt).TotalSeconds;

        Action report = () =>
        {
            textBox1.Text += "点击按钮耗时:" + elapsedSeconds.ToString() + "秒\r\n";
        };
        this.BeginInvoke(report);
    };

    ThreadHelper.Run(measure, _scheduler);
}

/// <summary>
/// Click handler for button1: starts the Test23 demo.
/// </summary>
private void button1_Click(object sender, EventArgs e)
{
    this.Test23();
}

/// <summary>
/// Click handler for button2: starts the Test24 demo.
/// </summary>
private void button2_Click(object sender, EventArgs e)
{
    this.Test24();
}


结论:先点击button1启动大量任务后,如果Test24使用Run(任务进入默认线程池),点击button2要卡好几秒才出结果;如果Test24改用Run2(任务进入独立的LimitedTaskScheduler),点击button2会立即显示结果。button2的操作本身耗时极少,延迟来自默认线程池被button1的大量任务占满。

现实意义:当一批耗时任务不加限制地直接使用Task.Factory.StartNew(共用默认线程池)时,其他同样使用Task.Factory.StartNew的任务就无法及时得到执行;为这批耗时任务单独指定限制并发数量的TaskScheduler即可避免这一问题。