发布:2021/3/5 17:37:31作者:管理员 来源:本站 浏览次数:862
使用场景描述:
网络请求中经常会遇到发送的请求,服务端响应是成功的,但是返回的时候出现网络故障,导致客户端无法接收到请求结果,那么客户端程序可能判断为网络故障,而重复发送同一个请求。当然如果接口中定义了请求结果查询接口,那么这种重复会相对少一些。特别是交易类的数据,这种操作更是需要避免重复发送请求。另外一种情况是用户过于快速的点击界面按钮,产生连续的相同内容请求,那么后端也需要进行过滤,这种一般出现在系统对接上,无法去控制第三方系统的业务逻辑,需要从自身业务逻辑里面去限定。
其他需求描述:
这类请求一般存在时间范围和高并发的特点,就是短时间内会出现重复的请求,因此对模块需要支持高并发性。
技术实现:
对请求的业务内容进行MD5摘要,并且将MD5摘要存储到缓存中,每个请求数据都通过这个一个公共的调用的方法进行判断。
代码实现:
公共调用代码 UniqueCheck 采用单例模式创建唯一对象,便于在多线程调用的时候,只访问一个统一的缓存库
/*
* volatile就像大家更熟悉的const一样,volatile是一个类型修饰符(type specifier)。
* 它是被设计用来修饰被不同线程访问和修改的变量。
* 如果没有volatile,基本上会导致这样的结果:要么无法编写多线程程序,要么编译器失去大量优化的机会。
*/
private
static
readonly object lockHelper =
new
object();
private
volatile
static
UniqueCheck _instance;
/// <summary>
/// 获取单一实例
/// </summary>
/// <returns></returns>
public
static
UniqueCheck GetInstance()
{
if
(_instance ==
null
)
{
lock (lockHelper)
{
if
(_instance ==
null
)
_instance =
new
UniqueCheck();
}
}
return
_instance;
}
这里需要注意volatile的修饰符,在实际测试过程中,如果没有此修饰符,在高并发的情况下会出现报错。
自定义一个可以进行并发处理队列,代码如下:ConcurrentLinkedQueue
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
namespace PackgeUniqueCheck
{
/// <summary>
/// 非加锁并发队列,处理100个并发数以内
/// </summary>
/// <typeparam name="T"></typeparam>
public
class
ConcurrentLinkedQueue<T>
{
private
class
Node<K>
{
internal K Item;
internal Node<K> Next;
public
Node(K item, Node<K> next)
{
this
.Item = item;
this
.Next = next;
}
}
private
Node<T> _head;
private
Node<T> _tail;
public
ConcurrentLinkedQueue()
{
_head =
new
Node<T>(
default
(T),
null
);
_tail = _head;
}
public
bool IsEmpty
{
get {
return
(_head.Next ==
null
); }
}
/// <summary>
/// 进入队列
/// </summary>
/// <param name="item"></param>
public
void
Enqueue(T item)
{
Node<T> newNode =
new
Node<T>(item,
null
);
while
(
true
)
{
Node<T> curTail = _tail;
Node<T> residue = curTail.Next;
//判断_tail是否被其他process改变
if
(curTail == _tail)
{
//A 有其他process执行C成功,_tail应该指向新的节点
if
(residue ==
null
)
{
//C 其他process改变了tail节点,需要重新取tail节点
if
(Interlocked.CompareExchange<Node<T>>(
ref curTail.Next, newNode, residue) == residue)
{
//D 尝试修改tail
Interlocked.CompareExchange<Node<T>>(ref _tail, newNode, curTail);
return
;
}
}
else
{
//B 帮助其他线程完成D操作
Interlocked.CompareExchange<Node<T>>(ref _tail, residue, curTail);
}
}
}
}
/// <summary>
/// 队列取数据
/// </summary>
/// <param name="result"></param>
/// <returns></returns>
public
bool TryDequeue(out T result)
{
Node<T> curHead;
Node<T> curTail;
Node<T> next;
while
(
true
)
{
curHead = _head;
curTail = _tail;
next = curHead.Next;
if
(curHead == _head)
{
if
(next ==
null
)
//Queue为空
{
result =
default
(T);
return
false
;
}
if
(curHead == curTail)
//Queue处于Enqueue第一个node的过程中
{
//尝试帮助其他Process完成操作
Interlocked.CompareExchange<Node<T>>(ref _tail, next, curTail);
}
else
{
//取next.Item必须放到CAS之前
result = next.Item;
//如果_head没有发生改变,则将_head指向next并退出
if
(Interlocked.CompareExchange<Node<T>>(ref _head,
next, curHead) == curHead)
break
;
}
}
}
return
true
;
}
/// <summary>
/// 尝试获取最后一个对象
/// </summary>
/// <param name="result"></param>
/// <returns></returns>
public
bool TryGetTail(out T result)
{
result =
default
(T);
if
(_tail ==
null
)
{
return
false
;
}
result = _tail.Item;
return
true
;
}
}
}
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Collections;
namespace PackgeUniqueCheck
{
public
class
UniqueCheck
{
/*
* volatile就像大家更熟悉的const一样,volatile是一个类型修饰符(type specifier)。
* 它是被设计用来修饰被不同线程访问和修改的变量。
* 如果没有volatile,基本上会导致这样的结果:要么无法编写多线程程序,要么编译器失去大量优化的机会。
*/
private
static
readonly object lockHelper =
new
object();
private
volatile
static
UniqueCheck _instance;
/// <summary>
/// 获取单一实例
/// </summary>
/// <returns></returns>
public
static
UniqueCheck GetInstance()
{
if
(_instance ==
null
)
{
lock (lockHelper)
{
if
(_instance ==
null
)
_instance =
new
UniqueCheck();
}
}
return
_instance;
}
private
UniqueCheck()
{
//创建一个线程安全的哈希表,作为字典缓存
_DataKey = Hashtable.Synchronized(
new
Hashtable());
Queue myqueue =
new
Queue();
_DataQueue = Queue.Synchronized(myqueue);
_Myqueue =
new
ConcurrentLinkedQueue<string>();
_Timer =
new
Thread(DoTicket);
_Timer.Start();
}
#region 公共属性设置
/// <summary>
/// 设定定时线程的休眠时间长度:默认为1分钟
/// 时间范围:1-7200000,值为1毫秒到2小时
/// </summary>
/// <param name="value"></param>
public
void
SetTimeSpan(
int
value)
{
if
(value >
0
&& value <=
7200000
)
{
_TimeSpan = value;
}
}
/// <summary>
/// 设定缓存Cache中的最大记录条数
/// 值范围:1-5000000,1到500万
/// </summary>
/// <param name="value"></param>
public
void
SetCacheMaxNum(
int
value)
{
if
(value >
0
&& value <=
5000000
)
{
_CacheMaxNum = value;
}
}
/// <summary>
/// 设置是否在控制台中显示日志
/// </summary>
/// <param name="value"></param>
public
void
SetIsShowMsg(bool value)
{
Helper.IsShowMsg = value;
}
/// <summary>
/// 线程请求阻塞增量
/// 值范围:1-CacheMaxNum,建议设置为缓存最大值的10%-20%
/// </summary>
/// <param name="value"></param>
public
void
SetBlockNumExt(
int
value)
{
if
(value >
0
&& value <= _CacheMaxNum)
{
_BlockNumExt = value;
}
}
/// <summary>
/// 请求阻塞时间
/// 值范围:1-max,根据阻塞增量设置请求阻塞时间
/// 阻塞时间越长,阻塞增量可以设置越大,但是请求实时响应就越差
/// </summary>
/// <param name="value"></param>
public
void
SetBlockSpanTime(
int
value)
{
if
(value >
0
)
{
_BlockSpanTime = value;
}
}
#endregion
#region 私有变量
/// <summary>
/// 内部运行线程
/// </summary>
private
Thread _runner =
null
;
/// <summary>
/// 可处理高并发的队列
/// </summary>
private
ConcurrentLinkedQueue<string> _Myqueue =
null
;
/// <summary>
/// 唯一内容的时间健值对
/// </summary>
private
Hashtable _DataKey =
null
;
/// <summary>
/// 内容时间队列
/// </summary>
private
Queue _DataQueue =
null
;
/// <summary>
/// 定时线程的休眠时间长度:默认为1分钟
/// </summary>
private
int
_TimeSpan =
3000
;
/// <summary>
/// 定时计时器线程
/// </summary>
private
Thread _Timer =
null
;
/// <summary>
/// 缓存Cache中的最大记录条数
/// </summary>
private
int
_CacheMaxNum =
500000
;
/// <summary>
/// 线程请求阻塞增量
/// </summary>
private
int
_BlockNumExt =
10000
;
/// <summary>
/// 请求阻塞时间
/// </summary>
private
int
_BlockSpanTime =
100
;
#endregion
#region 私有方法
private
void
StartRun()
{
_runner =
new
Thread(DoAction);
_runner.Start();
Helper.ShowMsg(
"内部线程启动成功!"
);
}
private
string GetItem()
{
string tp = string.Empty;
bool result = _Myqueue.TryDequeue(out tp);
return
tp;
}
/// <summary>
/// 执行循环操作
/// </summary>
private
void
DoAction()
{
while
(
true
)
{
while
(!_Myqueue.IsEmpty)
{
string item = GetItem();
_DataQueue.Enqueue(item);
if
(!_DataKey.ContainsKey(item))
{
_DataKey.Add(item, DateTime.Now);
}
}
//Helper.ShowMsg("当前数组已经为空,处理线程进入休眠状态...");
Thread.Sleep(
2
);
}
}
/// <summary>
/// 执行定时器的动作
/// </summary>
private
void
DoTicket()
{
while
(
true
)
{
Helper.ShowMsg(
"当前数据队列个数:"
+ _DataQueue.Count.ToString());
if
(_DataQueue.Count > _CacheMaxNum)
{
while
(
true
)
{
Helper.ShowMsg(string.Format(
"当前队列数:{0},已经超出最大长度:{1},开始进行清理操作..."
, _DataQueue.Count, _CacheMaxNum.ToString()));
string item = _DataQueue.Dequeue().ToString();
if
(!string.IsNullOrEmpty(item))
{
if
(_DataKey.ContainsKey(item))
{
_DataKey.Remove(item);
}
if
(_DataQueue.Count <= _CacheMaxNum)
{
Helper.ShowMsg(
"清理完成,开始休眠清理线程..."
);
break
;
}
}
}
}
Thread.Sleep(_TimeSpan);
}
}
/// <summary>
/// 线程进行睡眠等待
/// 如果当前负载压力大大超出了线程的处理能力
/// 那么需要进行延时调用
/// </summary>
private
void
BlockThread()
{
if
(_DataQueue.Count > _CacheMaxNum + _BlockNumExt)
{
Thread.Sleep(_BlockSpanTime);
}
}
#endregion
#region 公共方法
/// <summary>
/// 开启服务线程
/// </summary>
public
void
Start()
{
if
(_runner ==
null
)
{
StartRun();
}
else
{
if
(_runner.IsAlive ==
false
)
{
StartRun();
}
}
}
/// <summary>
/// 关闭服务线程
/// </summary>
public
void
Stop()
{
if
(_runner !=
null
)
{
_runner.Abort();
_runner =
null
;
}
}
/// <summary>
/// 添加内容信息
/// </summary>
/// <param name="item">内容信息</param>
/// <returns>true:缓存中不包含此值,队列添加成功,false:缓存中包含此值,队列添加失败</returns>
public
bool AddItem(string item)
{
BlockThread();
item = Helper.MakeMd5(item);
if
(_DataKey.ContainsKey(item))
{
return
false
;
}
else
{
_Myqueue.Enqueue(item);
return
true
;
}
}
/// <summary>
/// 判断内容信息是否已经存在
/// </summary>
/// <param name="item">内容信息</param>
/// <returns>true:信息已经存在于缓存中,false:信息不存在于缓存中</returns>
public
bool CheckItem(string item)
{
item = Helper.MakeMd5(item);
return
_DataKey.ContainsKey(item);
}
#endregion
}
}
模拟测试代码:
private
static
string _example = Guid.NewGuid().ToString();
private
static
UniqueCheck _uck =
null
;
static
void
Main(string[] args)
{
_uck = UniqueCheck.GetInstance();
_uck.Start();
_uck.SetIsShowMsg(
false
);
_uck.SetCacheMaxNum(
20000000
);
_uck.SetBlockNumExt(
1000000
);
_uck.SetTimeSpan(
6000
);
_uck.AddItem(_example);
Thread[] threads =
new
Thread[
20
];
for
(
int
i =
0
; i <
20
; i++)
{
threads[i] =
new
Thread(AddInfo);
threads[i].Start();
}
Thread checkthread =
new
Thread(CheckInfo);
checkthread.Start();
string value = Console.ReadLine();
checkthread.Abort();
for
(
int
i =
0
; i <
50
; i++)
{
threads[i].Abort();
}
_uck.Stop();
}
static
void
AddInfo()
{
while
(
true
)
{
_uck.AddItem(Guid.NewGuid().ToString());
}
}
static
void
CheckInfo()
{
while
(
true
)
{
Console.WriteLine(
"开始时间:{0}..."
, DateTime.Now.ToString(
"yyyy-MM-dd HH:mm:ss.ffff"
));
Console.WriteLine(
"插入结果:{0}"
, _uck.AddItem(_example));
Console.WriteLine(
"结束时间:{0}"
, DateTime.Now.ToString(
"yyyy-MM-dd HH:mm:ss.ffff"
));
//调整进程休眠时间,可以测试高并发的情况
//Thread.Sleep(1000);
}
}
测试截图:
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对脚本之家的支持。
© Copyright 2014 - 2024 柏港建站平台 ejk5.com. 渝ICP备16000791号-4