隐藏

C#请求唯一性校验支持高并发的实现方法

发布:2021/11/6 9:54:53作者:管理员 来源:本站 浏览次数:787

使用场景描述:

网络请求中经常会遇到发送的请求,服务端响应是成功的,但是返回的时候出现网络故障,导致客户端无法接收到请求结果,那么客户端程序可能判断为网络故障,而重复发送同一个请求。当然如果接口中定义了请求结果查询接口,那么这种重复会相对少一些。特别是交易类的数据,这种操作更是需要避免重复发送请求。另外一种情况是用户过于快速的点击界面按钮,产生连续的相同内容请求,那么后端也需要进行过滤,这种一般出现在系统对接上,无法去控制第三方系统的业务逻辑,需要从自身业务逻辑里面去限定。

其他需求描述:

这类请求一般存在时间范围和高并发的特点,就是短时间内会出现重复的请求,因此对模块需要支持高并发性。

技术实现:

对请求的业务内容进行MD5摘要,并且将MD5摘要存储到缓存中,每个请求数据都通过这个一个公共的调用的方法进行判断。

代码实现:

公共调用代码 UniqueCheck 采用单例模式创建唯一对象,便于在多线程调用的时候,只访问一个统一的缓存库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/*
   * volatile就像大家更熟悉的const一样,volatile是一个类型修饰符(type specifier)。
   * 它是被设计用来修饰被不同线程访问和修改的变量。
   * 如果没有volatile,基本上会导致这样的结果:要么无法编写多线程程序,要么编译器失去大量优化的机会。
   */
  private static readonly object lockHelper = new object();
  
  private volatile static UniqueCheck _instance; 
  
  /// <summary>
  /// 获取单一实例
  /// </summary>
  /// <returns></returns>
  public static UniqueCheck GetInstance()
  {
   if (_instance == null)
   {
    lock (lockHelper)
    {
     if (_instance == null)
      _instance = new UniqueCheck();
    }
   }
   return _instance;
  }

这里需要注意volatile的修饰符,在实际测试过程中,如果没有此修饰符,在高并发的情况下会出现报错。

自定义一个可以进行并发处理队列,代码如下:ConcurrentLinkedQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
 
namespace PackgeUniqueCheck
{
 /// <summary>
 /// 非加锁并发队列,处理100个并发数以内
 /// </summary>
 /// <typeparam name="T"></typeparam>
 public class ConcurrentLinkedQueue<T>
 {
  private class Node<K>
  {
   internal K Item;
   internal Node<K> Next;
 
   public Node(K item, Node<K> next)
   {
    this.Item = item;
    this.Next = next;
   }
  }
 
  private Node<T> _head;
  private Node<T> _tail;
 
  public ConcurrentLinkedQueue()
  {
   _head = new Node<T>(default(T), null);
   _tail = _head;
  }
 
  public bool IsEmpty
  {
   get { return (_head.Next == null); }
  }
  /// <summary>
  /// 进入队列
  /// </summary>
  /// <param name="item"></param>
  public void Enqueue(T item)
  {
   Node<T> newNode = new Node<T>(item, null);
   while (true)
   {
    Node<T> curTail = _tail;
    Node<T> residue = curTail.Next;
 
    //判断_tail是否被其他process改变
    if (curTail == _tail)
    {
     //A 有其他process执行C成功,_tail应该指向新的节点
     if (residue == null)
     {
      //C 其他process改变了tail节点,需要重新取tail节点
      if (Interlocked.CompareExchange<Node<T>>(
       ref curTail.Next, newNode, residue) == residue)
      {
       //D 尝试修改tail
       Interlocked.CompareExchange<Node<T>>(ref _tail, newNode, curTail);
       return;
      }
     }
     else
     {
      //B 帮助其他线程完成D操作
      Interlocked.CompareExchange<Node<T>>(ref _tail, residue, curTail);
     }
    }
   }
  }
  /// <summary>
  /// 队列取数据
  /// </summary>
  /// <param name="result"></param>
  /// <returns></returns>
  public bool TryDequeue(out T result)
  {
   Node<T> curHead;
   Node<T> curTail;
   Node<T> next;
   while (true)
   {
    curHead = _head;
    curTail = _tail;
    next = curHead.Next;
    if (curHead == _head)
    {
     if (next == null) //Queue为空
     {
      result = default(T);
      return false;
     }
     if (curHead == curTail) //Queue处于Enqueue第一个node的过程中
     {
      //尝试帮助其他Process完成操作
      Interlocked.CompareExchange<Node<T>>(ref _tail, next, curTail);
     }
     else
     {
      //取next.Item必须放到CAS之前
      result = next.Item;
      //如果_head没有发生改变,则将_head指向next并退出
      if (Interlocked.CompareExchange<Node<T>>(ref _head,
       next, curHead) == curHead)
       break;
     }
    }
   }
   return true;
  }
  /// <summary>
  /// 尝试获取最后一个对象
  /// </summary>
  /// <param name="result"></param>
  /// <returns></returns>
  public bool TryGetTail(out T result)
  {
   result = default(T);
   if (_tail == null)
   {
    return false;
   }
   result = _tail.Item;
   return true;
  }
 }
}

虽然是一个非常简单的唯一性校验逻辑,但是要做到高效率,高并发支持,高可靠性,以及低内存占用,需要实现这样的需求,需要做细致的模拟测试。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Collections;
 
namespace PackgeUniqueCheck
{
 public class UniqueCheck
 {
  /*
   * volatile就像大家更熟悉的const一样,volatile是一个类型修饰符(type specifier)。
   * 它是被设计用来修饰被不同线程访问和修改的变量。
   * 如果没有volatile,基本上会导致这样的结果:要么无法编写多线程程序,要么编译器失去大量优化的机会。
   */
  private static readonly object lockHelper = new object();
 
  private volatile static UniqueCheck _instance; 
 
  /// <summary>
  /// 获取单一实例
  /// </summary>
  /// <returns></returns>
  public static UniqueCheck GetInstance()
  {
   if (_instance == null)
   {
    lock (lockHelper)
    {
     if (_instance == null)
      _instance = new UniqueCheck();
    }
   }
   return _instance;
  }
 
  private UniqueCheck()
  {
   //创建一个线程安全的哈希表,作为字典缓存
   _DataKey = Hashtable.Synchronized(new Hashtable());
   Queue myqueue = new Queue();
   _DataQueue = Queue.Synchronized(myqueue);
   _Myqueue = new ConcurrentLinkedQueue<string>();
   _Timer = new Thread(DoTicket);
   _Timer.Start();
  }
 
  #region 公共属性设置
  /// <summary>
  /// 设定定时线程的休眠时间长度:默认为1分钟
  /// 时间范围:1-7200000,值为1毫秒到2小时
  /// </summary>
  /// <param name="value"></param>
  public void SetTimeSpan(int value)
  {
   if (value > 0&& value <=7200000)
   {
    _TimeSpan = value;
   }
  }
  /// <summary>
  /// 设定缓存Cache中的最大记录条数
  /// 值范围:1-5000000,1到500万
  /// </summary>
  /// <param name="value"></param>
  public void SetCacheMaxNum(int value)
  {
   if (value > 0 && value <= 5000000)
   {
    _CacheMaxNum = value;
   }
  }
  /// <summary>
  /// 设置是否在控制台中显示日志
  /// </summary>
  /// <param name="value"></param>
  public void SetIsShowMsg(bool value)
  {
   Helper.IsShowMsg = value;
  }
  /// <summary>
  /// 线程请求阻塞增量
  /// 值范围:1-CacheMaxNum,建议设置为缓存最大值的10%-20%
  /// </summary>
  /// <param name="value"></param>
  public void SetBlockNumExt(int value)
  {
   if (value > 0 && value <= _CacheMaxNum)
   {
    _BlockNumExt = value;
   }
  }
  /// <summary>
  /// 请求阻塞时间
  /// 值范围:1-max,根据阻塞增量设置请求阻塞时间
  /// 阻塞时间越长,阻塞增量可以设置越大,但是请求实时响应就越差
  /// </summary>
  /// <param name="value"></param>
  public void SetBlockSpanTime(int value)
  {
   if (value > 0)
   {
    _BlockSpanTime = value;
   }
  }
  #endregion
 
  #region 私有变量
  /// <summary>
  /// 内部运行线程
  /// </summary>
  private Thread _runner = null;
  /// <summary>
  /// 可处理高并发的队列
  /// </summary>
  private ConcurrentLinkedQueue<string> _Myqueue = null;
  /// <summary>
  /// 唯一内容的时间健值对
  /// </summary>
  private Hashtable _DataKey = null;
  /// <summary>
  /// 内容时间队列
  /// </summary>
  private Queue _DataQueue = null;
  /// <summary>
  /// 定时线程的休眠时间长度:默认为1分钟
  /// </summary>
  private int _TimeSpan = 3000;
  /// <summary>
  /// 定时计时器线程
  /// </summary>
  private Thread _Timer = null;
  /// <summary>
  /// 缓存Cache中的最大记录条数
  /// </summary>
  private int _CacheMaxNum = 500000;
  /// <summary>
  /// 线程请求阻塞增量
  /// </summary>
  private int _BlockNumExt = 10000;
  /// <summary>
  /// 请求阻塞时间
  /// </summary>
  private int _BlockSpanTime = 100;
  #endregion
 
  #region 私有方法
  private void StartRun()
  {
   _runner = new Thread(DoAction);
   _runner.Start();
   Helper.ShowMsg("内部线程启动成功!");
  }
 
  private string GetItem()
  {
   string tp = string.Empty;
   bool result = _Myqueue.TryDequeue(out tp);
   return tp;
  }
  /// <summary>
  /// 执行循环操作
  /// </summary>
  private void DoAction()
  {
   while (true)
   {
    while (!_Myqueue.IsEmpty)
    {
     string item = GetItem();
     _DataQueue.Enqueue(item);
     if (!_DataKey.ContainsKey(item))
     {
      _DataKey.Add(item, DateTime.Now);
     }
    }
    //Helper.ShowMsg("当前数组已经为空,处理线程进入休眠状态...");
    Thread.Sleep(2);
   }
  }
  /// <summary>
  /// 执行定时器的动作
  /// </summary>
  private void DoTicket()
  {
   while (true)
   {
    Helper.ShowMsg("当前数据队列个数:" + _DataQueue.Count.ToString());
    if (_DataQueue.Count > _CacheMaxNum)
    {
     while (true)
     {
      Helper.ShowMsg(string.Format("当前队列数:{0},已经超出最大长度:{1},开始进行清理操作...", _DataQueue.Count, _CacheMaxNum.ToString()));
      string item = _DataQueue.Dequeue().ToString();
      if (!string.IsNullOrEmpty(item))
      {
       if (_DataKey.ContainsKey(item))
       {
        _DataKey.Remove(item);
       }
       if (_DataQueue.Count <= _CacheMaxNum)
       {
        Helper.ShowMsg("清理完成,开始休眠清理线程...");
        break;
       }
      }
     }
    }
    Thread.Sleep(_TimeSpan);
   }
  }
 
  /// <summary>
  /// 线程进行睡眠等待
  /// 如果当前负载压力大大超出了线程的处理能力
  /// 那么需要进行延时调用
  /// </summary>
  private void BlockThread()
  {
   if (_DataQueue.Count > _CacheMaxNum + _BlockNumExt)
   {
    Thread.Sleep(_BlockSpanTime);
   }
  }
  #endregion
 
  #region 公共方法
  /// <summary>
  /// 开启服务线程
  /// </summary>
  public void Start()
  {
   if (_runner == null)
   {
    StartRun();
   }
   else
   {
    if (_runner.IsAlive == false)
    {
     StartRun();
    }
   }
 
  }
  /// <summary>
  /// 关闭服务线程
  /// </summary>
  public void Stop()
  {
   if (_runner != null)
   {
    _runner.Abort();
    _runner = null;
   }
  }
 
  /// <summary>
  /// 添加内容信息
  /// </summary>
  /// <param name="item">内容信息</param>
  /// <returns>true:缓存中不包含此值,队列添加成功,false:缓存中包含此值,队列添加失败</returns>
  public bool AddItem(string item)
  {
   BlockThread();
   item = Helper.MakeMd5(item);
   if (_DataKey.ContainsKey(item))
   {
    return false;
   }
   else
   {
    _Myqueue.Enqueue(item);
    return true;
   }
  }
  /// <summary>
  /// 判断内容信息是否已经存在
  /// </summary>
  /// <param name="item">内容信息</param>
  /// <returns>true:信息已经存在于缓存中,false:信息不存在于缓存中</returns>
  public bool CheckItem(string item)
  {
   item = Helper.MakeMd5(item);
   return _DataKey.ContainsKey(item);
  }
  #endregion
 
 }
}

模拟测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
private static string _example = Guid.NewGuid().ToString();
 
  private static UniqueCheck _uck = null;
 
  static void Main(string[] args)
  {
   _uck = UniqueCheck.GetInstance();
   _uck.Start();
   _uck.SetIsShowMsg(false);
   _uck.SetCacheMaxNum(20000000);
   _uck.SetBlockNumExt(1000000);
   _uck.SetTimeSpan(6000);
 
   _uck.AddItem(_example);
   Thread[] threads = new Thread[20];
 
   for (int i = 0; i < 20; i++)
   {
    threads[i] = new Thread(AddInfo);
    threads[i].Start();
   }
 
   Thread checkthread = new Thread(CheckInfo);
   checkthread.Start();
 
   string value = Console.ReadLine();
 
   checkthread.Abort();
   for (int i = 0; i < 50; i++)
   {
    threads[i].Abort();
   }
   _uck.Stop();
  }
 
  static void AddInfo()
  {
   while (true)
   {
    _uck.AddItem(Guid.NewGuid().ToString());
   }
  }
 
  static void CheckInfo()
  {
   while (true)
   {
    Console.WriteLine("开始时间:{0}...", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.ffff"));
    Console.WriteLine("插入结果:{0}", _uck.AddItem(_example));
    Console.WriteLine("结束时间:{0}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.ffff"));
//调整进程休眠时间,可以测试高并发的情况
    //Thread.Sleep(1000);
   }
    
  }

测试截图:

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对脚本之家的支持。