隐藏

.NET的ConcurrentDictionary,线程安全集合类

发布:2023/8/25 21:00:59作者:管理员 来源:本站 浏览次数:546

ConcurrentDictionary 是.NET 4.0里面新增的号称线程安全的集合类。


那么自然,我们会预期ConcurrentDictionary 中的代码是线程安全的(至少几个关键方法是线程安全的).


举个例子,使用者可能会预期GetOrAdd中的方法当Key不存在的时候只执行一次Add的委托,第二次调用GetOrAdd就应该直接取回刚才生成的值了.


参考一下以下代码:


public static  void Test()

{

   var concurentDictionary = new ConcurrentDictionary<int, int>();


   var w = new ManualResetEvent(false);

   int timedCalled = 0;

   var threads = new List<Thread>();

   for (int i = 0; i < Environment.ProcessorCount; i++)

   {

       threads.Add(new Thread(() =>

       {

           w.WaitOne();

           concurentDictionary.GetOrAdd(1, i1 =>

           {

               Interlocked.Increment(ref timedCalled);

               return 1;

           });

       }));

       threads.Last().Start();

   }


   w.Set();//release all threads to start at the same time        

   Thread.Sleep(100);

   Console.WriteLine(timedCalled);// output is 4, means call initial 4 times

   //Console.WriteLine(concurentDictionary.Keys.Count);

}


  


GetOrAdd方法的定义就是按照Key获取一个Value,如果Key不存在,那么调用Func<T> 添加一个键值对.


按照ConcurrentDictionary的定义, 我预期这个Add应该只被调用一次


可是上面那段代码的运行结果表明, Interlocked.Increment(ref timedCalled); 被调用了4次,真是尴尬啊


用于初始化值的委托还真的是可以多次执行的,所以


   要么保证委托中的代码重复执行不会有问题

   要么使用线程安全的初始化方法,例如Lazy<T>


public static void Test()

       {

           var concurentDictionary = new ConcurrentDictionary<int, int>();


           var w = new ManualResetEvent(false);

           int timedCalled = 0;

           var threads = new List<Thread>();

           Lazy<int> lazy = new Lazy<int>(() => { Interlocked.Increment(ref timedCalled); return 1; });

           for (int i = 0; i < Environment.ProcessorCount; i++)

           {

               threads.Add(new Thread(() =>

               {

                   w.WaitOne();

                   concurentDictionary.GetOrAdd(1, i1 =>

                   {

                       return lazy.Value;

                   });

               }));

               threads.Last().Start();

           }


           w.Set();//release all threads to start at the same time        

           Thread.Sleep(100);

           Console.WriteLine(timedCalled);// output is 1

       }


  


附: 注释中也不说一下这个初始化方法会被多次调用,如果不是偶然遇到这个问题,估计永远都不知道


//

       // Summary:

       //     Adds a key/value pair to the System.Collections.Concurrent.ConcurrentDictionary<TKey,TValue>

       //     if the key does not already exist.

       //

       // Parameters:

       //   key:

       //     The key of the element to add.

       //

       //   valueFactory:

       //     The function used to generate a value for the key

       //

       // Returns:

       //     The value for the key. This will be either the existing value for the key

       //     if the key is already in the dictionary, or the new value for the key as

       //     returned by valueFactory if the key was not in the dictionary.

       //

       // Exceptions:

       //   System.ArgumentNullException:

       //     key is a null reference (Nothing in Visual Basic).-or-valueFactory is a null

       //     reference (Nothing in Visual Basic).

       //

       //   System.OverflowException:

       //     The dictionary contains too many elements.


  


该集合类中所有使用Func<T>的方法也存在类似的问题


希望能给还不知道该问题的朋友提个醒,避免不必要的BUG

【netcore基础】ConcurrentDictionary 使用字符串作为key给代码加锁且使用EF事物防止并发调用数据混乱的问题


业务场景需要锁住指定的字符串下的代码,防止并发创建多个订单


这里我们使用


ConcurrentDictionary


  


首先初始化一个字典


private static readonly ConcurrentDictionary<string, string> _dictLock = new ConcurrentDictionary<string, string>();


  


然后使用定义一个要锁代码的的key,这里为保证每个订单唯一,使用微信的订单号作为key


对同一微信支付订单的回调进行加锁处理代码


var lockkey = "wxpay_callback_lock_" + MD5Helper.GetMD5Str(pay.out_trade_no + pay.transaction_id); ;


lock (_dictLock.GetOrAdd(lockkey, lockkey))

{

   if (GeduRedisHelper.Exists(lockkey))

   {

       throw new GeduException("操作正在处理,请勿重复请求");

   }


   GeduRedisHelper.Add(lockkey, new

   {

       pay.out_trade_no,

       pay.transaction_id


   }, 60);

}


 


在lock代码段里我们使用 redis 来判断是否存在,为了方便以后分布式部署多台服务器的并发问题,这里可以redis共享key


然后在需要写入多个表的地方添加 事物处理


using (var _trs = _orderService.GetDB().BeginTransaction())

           {

               try

               {


                   //todo...

                   

                   _trs.Commit();

               }

               catch (Exception ex)

               {

                   _trs.Rollback();


                   throw new GeduException(ex.Message);

               }

               finally

               {

                   GeduRedisHelper.Remove(lockkey);

               }


  


这样可以防止数据部分成功部分失败的问题


最后操作成功之后要清理掉 redis 里的lock

.Net多线程编程—并发集合

1.为什么使用并发集合?


原因主要有以下几点:


   System.Collections和System.Collections.Generic名称空间中所提供的经典列表、集合和数组都不是线程安全的,若无同步机制,他们不适合于接受并发的指令来添加和删除元素。

   在并发代码中使用上述经典集合需要复杂的同步管理,使用起来很不方便。

   使用复杂的同步机制会大大降低性能。

   NET Framework 4所提供的新的集合尽可能地减少需要使用锁的次数。这些新的集合通过使用比较并交换(compare-and-swap,CAS)指令和内存屏障,避免使用互斥的重量级锁。这对性能有保障。


注意:


与经典集合相比,并发集合会有更大的开销,因此在串行代码中使用并发集合无意义,只会增加额外的开销且运行速度比访问经典集合慢。

2.并发集合


1)ConcurrentQueue:线程安全的先进先出 (FIFO) 集合


主要方法:


   Enqueue(T item);将对象添加到集合结尾。

   TryDequeue(out T result); 尝试移除并返回位于集合开始处的对象,返回值表示操作是否成功。

   TryPeek(out T result);尝试返回集合开始处的对象,但不将其移除,返回值表示操作是否成功。


说明:


   ConcurrentQueue是完全无锁的,但当CAS操作失败且面临资源争用时,它可能会自旋并且重试操作。

   ConcurrentQueue是FIFO集合,某些和出入顺序无关的场合,尽量不要用ConcurrentQueue。


2)ConcurrentStack:线程安全的后进先出 (LIFO) 集合


主要方法及属性:


   Push(T item);将对象插入集合的顶部。

   TryPop(out T result);尝试弹出并返回集合顶部的对象,返回值表示操作是否成功。

   TryPeek(out T result);尝试返回集合开始处的对象,但不将其移除,返回值表示操作是否成功。

   IsEmpty { get; }指示集合是否为空。

   PushRange(T[] items);将多个对象插入集合的顶部。

   TryPopRange(T[] items);弹出顶部多个元素,返回结果为弹出元素个数。


说明:


   与ConcurrentQueue相似地,ConcurrentStack完全无锁的,但当CAS操作失败且面临资源争用时,它可能会自旋并且重试操作。

   获取集合是否包含元素使用IsEmpty属性,而不是通过判断Count属性是否大于零。调用Count比调用IsEmpty开销大。

   使用PushRange(T[] items)和TryPopRange(T[] items)时注意缓冲引起的额外开销和额外的内存消耗。


3) ConcurrentBag:元素可重复的无序集合


主要方法及属性:


   TryPeek(out T result);尝试从集合返回一个对象,但不移除该对象,返回值表示是否成功获得该对象。

   TryTake(out T result);尝试从集合返回一个对象并移除该对象,返回值表示是否成功获得该对象。

   Add(T item);将对象添加到集合中。

   IsEmpty { get; }解释同ConcurrentStack


说明:


   ConcurrentBag为每一个访问集合的线程维护了一个本地队列,在可能的情况下,它会以无锁的方式访问本地队列。

   ConcurrentBag在同一个线程添加和删除元素的场合下效率非常高。

   因为ConcurrentBag有时会需要锁,在生产者线程和消费者线程完全分开的场景下效率非常低。

   ConcurrentBag调用IsEmpty的开销非常大,因为这需要临时获得这个无序组的所有锁。


4)BlockingCollection:实现


System.Collections.Concurrent.IProducerConsumerCollection<T> 的线程安全集合,提供阻塞和限制功能


主要方法及属性:


   BlockingCollection(int boundedCapacity); boundedCapacity表示集合限制大小。

   CompleteAdding();将BlockingCollection实例标记为不再接受任何添加。

   IsCompleted { get; }此集合是否已标记为已完成添加并且为空。

   GetConsumingEnumerable();从集合中移除并返回移除的元素

   Add(T item);添加元素到集合。

   TryTake(T item, int millisecondsTimeout, CancellationToken cancellationToken);


说明:


   使用BlockingCollection()构造函数实例化BlockingCollection,意味着不设置boundedCapacity,那么boundedCapacity为默认值:int.MaxValue。

   限界:使用BlockingCollection(int boundedCapacity),设置boundedCapacity的值,当集合容量达到这个值得时候,向BlockingCollection添加元素的线程将会被阻塞,直到有元素被删除。


限界功能可控制内存中集合最大大小,这对于需要处理大量元素的时候非常有用。


   默认情况下,BlockingCollection封装了一个ConcurrentQueue。可以在构造函数中指定一个实现了IProducerConsumerCollection接口的并发集合,包括:ConcurrentStack、ConcurrentBag。

   使用此集合包含易于无限制等待的风险,所以使用TryTake更加,因为TryTake提供了超时控制,指定的时间内可以从集合中移除某个项,则为true;否则为 false。


5)ConcurrentDictionary:可由多个线程同时访问的键值对的线程安全集合。


主要方法


   AddOrUpdate(TKey key, TValue addValue, Func<TKey, TValue, TValue> updateValueFactory);如果指定的键尚不存在,则将键/值对添加到 字典中;如果指定的键已存在,则更新字典中的键/值对。

   GetOrAdd(TKey key, TValue value);如果指定的键尚不存在,则将键/值对添加到字典中。

   TryRemove(TKey key, out TValue value);尝试从字典中移除并返回具有指定键的值。

   TryUpdate(TKey key, TValue newValue, TValue comparisonValue);将指定键的现有值与指定值进行比较,如果相等,则用第三个值更新该键。


说明:


   ConcurrentDictionary对于读操作是完全无锁的。当多个任务或线程向其中添加元素或修改数据的时候,ConcurrentDictionary使用细粒度的锁。使用细粒度的锁只会锁定真正需要锁定的部分,而不是整个字典。


6)IProducerConsumerCollection:定义供生产者/消费者用来操作线程安全集合的方法。 此接口提供一个统一的表示(为生产者/消费者集合),从而更高级别抽象如 System.Collections.Concurrent.BlockingCollection<T>可以使用集合作为基础的存储机制。

3.常用模式


1)并行的生产者-消费者模式


定义:


生成者和消费者是此模式中的两类对象模型,消费者依赖于生产者的结果,生产者生成结果的同时,消费者使用结果。

在这里插入图片描述

图1 并行的生产者-消费者模式


说明:


   并发集合用在此模式下非常合适,因为并发集合支持此模式中对象的并行操作。

   若不使用并发集合,那么就要加入同步机制,从而使程序变得比较复杂,难于维护和理解,同时大大降低性能。

   上图为生产者消费者模式示意图,纵轴为时间轴,生成者与消费者的并不在一条时间线上,但二者有交叉,意在表明生成者先产生结果,而后消费者才真正使用了生成者产生的数据。


2)流水线模式


定义:


流水线由多个阶段构成,每个阶段由一系列的生产者和消费者构成。一般来讲前一个阶段是后一个阶段的生成者;依靠相邻两个阶段之间的缓冲区队列,每个阶段可以并发执行。


在这里插入图片描述

图2 并行的流水线模式


说明:


   常使用BlockingCollection<T>作为缓冲罐区队列。

   流水线的速度近似等于流水线最慢阶段的速度。

   上图为流水线模式示意图,前一阶段为后一阶段的生成者,这里展示了最为简单和基本的流水线模式,更复杂的模式可以认为是每个阶段都包括了对数据更多的处理过程。


4 使用方式


仅以ConcurrentBag和BlockingCollection为例,其他的并发集合与之相似。


ConcurrentBag


List<string> list = ......

ConcurrentBag<string> bags = new ConcurrentBag<string>();

Parallel.ForEach(list, (item) =>

{

   //对list中的每个元素进行处理然后,加入bags中

   bags.Add(itemAfter);

});


 


BlockingCollection—生产者消费者模式


public static void Execute()

{

           //调用Invoke,使得生产者任务和消费者任务并行执行

           //Producer方法和Customer方法在Invoke中的参数顺序任意,不论何种顺序都会获得正确的结果

           Parallel.Invoke(()=>Customer(),()=>Producer());

           Console.WriteLine(string.Join(",",customerColl));

}


//生产者集合

private static BlockingCollection<int> producerColl = new BlockingCollection<int>();

//消费者集合

private static BlockingCollection<string> customerColl = new BlockingCollection<string>();


public static void Producer()

{

           //循环将数据加入生成者集合

           for (int i = 0; i < 100; i++)

           {

               producerColl.Add(i);

           }


           //设置信号,表明不在向生产者集合中加入新数据

           //可以设置更加复杂的通知形式,比如数据量达到一定值且其中的数据满足某一条件时就设置完成添加

           producerColl.CompleteAdding();

}


public static void Customer()

{

           //调用IsCompleted方法,判断生产者集合是否在添加数据,是否还有未"消费"的数据

           //注意不要使用IsAddingCompleted,IsAddingCompleted只表明集合标记为已完成添加,而不能说明其为空

           //而IsCompleted为ture时,那么IsAddingCompleted为ture且集合为空

           while (!producerColl.IsCompleted)

           {

               //调用Take或TryTake "消费"数据,消费一个,移除一个

               //TryAdd的好处是提供超时机制

               customerColl.Add(string.Format("消费:{0}", producerColl.Take()));

           }

}


 


C# .net 集合-并发处理(List<t>集合换成ConcurrentQueue、ConcurrentDictionary )

背景


List集合,数组Int[],String[] ……,Dictory字典等等。但是这些列表、集合和数组的线程都不是安全的,不能接受并发请求。例如:


namespace Spider

{

   class Program

   {


       private static List<Product> _Products { get; set; }

       static void Main(string[] args)

       {


           _Products = new List<Product>();

           Task t1 = Task.Factory.StartNew(() =>

           {

               AddProducts();

           });

           Task t2 = Task.Factory.StartNew(() =>

           {

               AddProducts();

           });

           Task t3 = Task.Factory.StartNew(() =>

           {

               AddProducts();

           });


           Task.WaitAll(t1, t2, t3);//同步执行

           Console.WriteLine(_Products.Count);

           Console.ReadLine();

       }


       static void AddProducts()

       {

           Parallel.For(0, 1000, (i) =>

           {

               Product product = new Product();

               product.Name = "name" + i;

               product.Category = "Category" + i;

               product.SellPrice = i;

               _Products.Add(product);

           });


       }

   }


   class Product

   {

       public string Name { get; set; }

       public string Category { get; set; }

       public int SellPrice { get; set; }

   }

}


 


上图理论上是会显示3000行数据,但是实际上显示了2934个由于是list集合并不能保证线程安全,所以导致数据丢失,无法保证数据的一致性。这个时候我们常用的解决方法就是加锁(lock)lock 确保当一个线程位于代码的临界区时,另一个线程不进入临界区。如果其他线程试图进入锁定的代码,则它将一直等待(即被阻止),直到该对象被释放。


  static void Main(string[] args)

       {


           _Products = new List<Product>();

           Stopwatch swTask = new Stopwatch();//用于统计时间消耗的

           swTask.Start();

           Task t1 = Task.Factory.StartNew(() =>

           {

               AddProducts();

           });

           Task t2 = Task.Factory.StartNew(() =>

           {

               AddProducts();

           });

           Task t3 = Task.Factory.StartNew(() =>

           {

               AddProducts();

           });


           Task.WaitAll(t1, t2, t3);//同步执行

           swTask.Stop();

           Console.WriteLine("List<Product> 当前数据量为:" + _Products.Count);

           Console.WriteLine("List<Product> 执行时间为:" + swTask.ElapsedMilliseconds);


           Console.ReadLine();

       }


       static void AddProducts()

       {

           Parallel.For(0, 1000000, (i) =>

           {

               lock (_Products)

               {


                   Product product = new Product();

                   product.Name = "name" + i;

                   product.Category = "Category" + i;

                   product.SellPrice = i;

                   _Products.Add(product);

               }

                //lock 是Monitor的语法糖

               //Monitor.Enter(_Products);

               //Product product = new Product();

               //product.Name = "name" + i;

               //product.Category = "Category" + i;

               //product.SellPrice = i;

               //_Products.Add(product);

               //Monitor.Exit(_Products);

           });


       }


  


List<Product> 当前数据量为:3000000

List<Product> 执行时间为:4638




这个时候就显示3000条


但是锁的引入,带来了一定的开销和性能的损耗,并降低了程序的扩展性,而且还会有死锁的发生(虽说概率不大,但也不能不防啊),因此:使用LOCK进行并发编程显然不太适用。


还好,微软一直在更新自己的东西:


.NET Framework 4提供了新的线程安全和扩展的并发集合,它们能够解决潜在的死锁问题和竞争条件问题,因此在很多复杂的情形下它们能够使得并行代码更容易编写,这些集合尽可能减少使用锁的次数,从而使得在大部分情形下能够优化为最佳性能,不会产生不必要的同步开销。


需要注意的是:在串行代码中使用并发集合是没有意义的,因为它们会增加无谓的开销。


在.NET Framework4.0以后的版本中提供了命名空间:System.Collections.Concurrent 来解决线程安全问题,通过这个命名空间,能访问以下为并发做好了准备的集合。


   BlockingCollection 与经典的阻塞队列数据结构类似,能够适用于多个任务添加和删除数据,提供阻塞和限界能力。

   ConcurrentBag 提供对象的线程安全的无序集合

   ConcurrentDictionary 提供可有多个线程同时访问的键值对的线程安全集合

   ConcurrentQueue 提供线程安全的先进先出集合

   ConcurrentStack 提供线程安全的后进先出集合


这些集合通过使用比较并交换和内存屏障等技术,避免使用典型的互斥重量级的锁,从而保证线程安全和性能。


static void Main(string[] args)

       {


           _Products = new ConcurrentQueue<Product>();

           Stopwatch swTask = new Stopwatch();//用于统计时间消耗的

           swTask.Start();

           Task t1 = Task.Factory.StartNew(() =>

           {

               AddProducts();

           });

           Task t2 = Task.Factory.StartNew(() =>

           {

               AddProducts();

           });

           Task t3 = Task.Factory.StartNew(() =>

           {

               AddProducts();

           });


           Task.WaitAll(t1, t2, t3);//同步执行

           swTask.Stop();

           Console.WriteLine("List<Product> 当前数据量为:" + _Products.Count);

           Console.WriteLine("List<Product> 执行时间为:" + swTask.ElapsedMilliseconds);


           Console.ReadLine();

       }


       static void AddProducts()

       {

           Parallel.For(0, 1000000, (i) =>

           {


                   Product product = new Product();

                   product.Name = "name" + i;

                   product.Category = "Category" + i;

                   product.SellPrice = i;

                   _Products.Enqueue(product);


           });


       }

   }




打印


List 当前数据量为:3000000

List 执行时间为:2911


 


得出结果