隐藏

asp.net c# 通过消息队列处理高并发请求

发布:2021/3/5 17:51:46作者:管理员 来源:本站 浏览次数:1180

网站面对高并发的情况下,除了增加硬件, 优化程序提高以响应速度外,还可以通过并行改串行的思路来解决。这种思想常见的实践方式就是数据库锁和消息队列的方式。这种方式的缺点是需要排队,响应速度慢,优点是节省成本。

演示一下现象

创建一个在售产品表

	
  1. CREATE TABLE [dbo].[product](
  2. [id] [int] NOT NULL,--唯一主键
  3. [name] [nvarchar](50) NULL,--产品名称
  4. [status] [int] NULL ,--0未售出 1 售出 默认为0
  5. [username] [nvarchar](50) NULL--下单用户
  6. )

添加一条记录

	
  1. insert into product(id,name,status,username) values(1,'小米手机',0,null)

创建一个抢票程序

	
  1. public ContentResult PlaceOrder(string userName)
  2. {
  3. using (RuanMou2020Entities db = new RuanMou2020Entities())
  4. {
  5. var product = db.product.Where<product>(p => p.status== ).FirstOrDefault();
  6. if (product.status == )
  7. {
  8. return Content("失败,产品已经被卖光");
  9. }
  10. else
  11. {
  12. //模拟数据库慢造成并发问题
  13. Thread.Sleep();
  14. product.status = ; product.username= userName; db.SaveChanges(); return Content("成功购买"); } } }

如果我们在5秒内一次访问以下两个地址,那么返回的结果都是成功购买且数据表中的username是lisi。

/controller/PlaceOrder?username=zhangsan

/controller/PlaceOrder?username=lisi

这就是并发带来的问题。

第一阶段,利用线程锁简单粗暴

Web程序是多线程的,那我们把他在容易出现并发的地方加一把锁就可以了,如下图处理方式。

	
  1. private static object _lock = new object();
  2.  
  3. public ContentResult PlaceOrder(string userName)
  4. {
  5. using (RuanMou2020Entities db = new RuanMou2020Entities())
  6. {
  7. lock (_lock)
  8. {
  9. var product = db.product.Where<product>(p => p.status == ).FirstOrDefault();
  10. if (product.status == )
  11. {
  12. return Content("失败,产品已经被卖光");
  13. }
  14. else
  15. {
  16. //模拟数据库慢造成并发问题
  17. Thread.Sleep();
  18. product.status = ;
  19. product.username = userName;
  20. db.SaveChanges();
  21. return Content("成功购买");
  22. }
  23. }
  24. }
  25. }

这样每一个请求都是依次执行,不会出现并发问题了。

优点:解决了并发的问题。

缺点:效率太慢,用户体验性太差,不适合大数据量场景。

第二阶段,拉消息队列,通过生产者,消费者的模式

1,创建订单提交入口(生产者)

	
  1. public class HomeController : Controller
  2. {
  3.  
  4. /// <summary>
  5. /// 接受订单提交(生产者)
  6. /// </summary>
  7. /// <returns></returns>
  8. public ContentResult PlaceOrderQueen(string userName)
  9. {
  10. //直接将请求写入到订单队列
  11. OrderConsumer.TicketOrders.Enqueue(userName);
  12. return Content("wait");
  13. }
  14.  
  15. /// <summary>
  16. /// 查询订单结果
  17. /// </summary>
  18. /// <returns></returns>
  19. public ContentResult PlaceOrderQueenResult(string userName)
  20. {
  21. var rel = OrderConsumer.OrderResults.Where(p => p.userName == userName).FirstOrDefault();
  22. if (rel == null)
  23. {
  24. return Content("还在排队中");
  25. }
  26. else
  27. {
  28. return Content(rel.Result.ToString());
  29. }
  30. }
  31. }

2,创建订单处理者(消费者)

	
  1. /// <summary>
  2. /// 订单的处理者(消费者)
  3. /// </summary>
  4. public class OrderConsumer
  5. {
  6. /// <summary>
  7. /// 订票的消息队列
  8. /// </summary>
  9. public static ConcurrentQueue<string> TicketOrders = new ConcurrentQueue<string>();
  10. /// <summary>
  11. /// 订单结果消息队列
  12. /// </summary>
  13. public static List<OrderResult> OrderResults = new List<OrderResult>();
  14. /// <summary>
  15. /// 订单处理
  16. /// </summary>
  17. public static void StartTicketTask()
  18. {
  19. string userName = null;
  20. while (true)
  21. {
  22. //如果没有订单任务就休息1秒钟
  23. if (!TicketOrders.TryDequeue(out userName))
  24. {
  25. Thread.Sleep();
  26. continue;
  27. }
  28. //执行真实的业务逻辑(如插入数据库)
  29. bool rel = new TicketHelper().PlaceOrderDataBase(userName);
  30. //将执行结果写入结果集合
  31. OrderResults.Add(new OrderResult() { Result = rel, userName = userName });
  32. }
  33. }
  34. }

3,创建订单业务的实际执行者

	
  1. /// <summary>
  2. /// 订单业务的实际处理者
  3. /// </summary>
  4. public class TicketHelper
  5. {
  6. /// <summary>
  7. /// 实际库存标识
  8. /// </summary>
  9. private bool hasStock = true;
  10. /// <summary>
  11. /// 执行一个订单到数据库
  12. /// </summary>
  13. /// <returns></returns>
  14. public bool PlaceOrderDataBase(string userName)
  15. {
  16. //如果没有了库存,则直接返回false,防止频繁读库
  17. if (!hasStock)
  18. {
  19. return hasStock;
  20. }
  21. using (RuanMou2020Entities db = new RuanMou2020Entities())
  22. {
  23. var product = db.product.Where(p => p.status == ).FirstOrDefault();
  24. if (product == null)
  25. {
  26. hasStock = false;
  27. return false;
  28. }
  29. else
  30. {
  31. Thread.Sleep();//模拟数据库的效率比较慢,执行插入时间比较久
  32. product.status = ;
  33. product.username = userName;
  34. db.SaveChanges();
  35. return true;
  36. }
  37. }
  38. }
  39. }
  40. /// <summary>
  41. /// 订单处理结果实体
  42. /// </summary>
  43. public class OrderResult
  44. {
  45. public string userName { get; set; }
  46. public bool Result { get; set; }
  47. }

4,在程序启动前,启动消费者线程

	
  1. protected void Application_Start()
  2. {
  3. AreaRegistration.RegisterAllAreas();
  4. GlobalConfiguration.Configure(WebApiConfig.Register);
  5. FilterConfig.RegisterGlobalFilters(GlobalFilters.Filters);
  6. RouteConfig.RegisterRoutes(RouteTable.Routes);
  7. BundleConfig.RegisterBundles(BundleTable.Bundles);
  8.  
  9. //在Global的Application_Start事件里单独开启一个消费者线程
  10. Task.Run(OrderConsumer.StartTicketTask);
  11. }

这样程序的运行模式是:用户提交的需求里都会添加到消息队列里去排队处理,程序会依次处理该队列里的内容(当然可以一次取出多条来进行处理,提高效率)。

优点:比上一步快了。

缺点:不够快,而且下单后需要轮询另外一个接口判断是否成功。

第三阶段 反转生产者消费者的角色,把可售产品提前放到队列里,然后让提交的订单来消费队列里的内容

1,创建生产者并且在程序启动前调用其初始化程序

	
  1. public class ProductForSaleManager
  2. {
  3. /// <summary>
  4. /// 待售商品队列
  5. /// </summary>
  6. public static ConcurrentQueue<int> ProductsForSale = new ConcurrentQueue<int>();
  7. /// <summary>
  8. /// 初始化待售商品队列
  9. /// </summary>
  10. public static void Init()
  11. {
  12. using (RuanMou2020Entities db = new RuanMou2020Entities())
  13. {
  14. db.product.Where(p => p.status == ).Select(p => p.id).ToList().ForEach(p =>
  15. {
  16. ProductsForSale.Enqueue(p);
  17. });
  18. }
  19. }
  20. }
	
  1. public class MvcApplication : System.Web.HttpApplication
  2. {
  3. protected void Application_Start()
  4. {
  5. AreaRegistration.RegisterAllAreas();
  6. GlobalConfiguration.Configure(WebApiConfig.Register);
  7. FilterConfig.RegisterGlobalFilters(GlobalFilters.Filters);
  8. RouteConfig.RegisterRoutes(RouteTable.Routes);
  9. BundleConfig.RegisterBundles(BundleTable.Bundles);
  10.  
  11. //程序启动前,先初始化待售产品消息队列
  12. ProductForSaleManager.Init();
  13. }
  14. }

2,创建消费者

	
  1. public class OrderController : Controller
  2. {
  3. /// <summary>
  4. /// 下订单
  5. /// </summary>
  6. /// <param name="userName">订单提交者</param>
  7. /// <returns></returns>
  8. public async Task<ContentResult> PlaceOrder(string userName)
  9. {
  10. if (ProductForSaleManager.ProductsForSale.TryDequeue(out int pid))
  11. {
  12. await new TicketHelper2().PlaceOrderDataBase(userName, pid);
  13. return Content($"下单成功,对应产品id为:{pid}");
  14. }
  15. else
  16. {
  17. await Task.CompletedTask;
  18. return Content($"商品已经被抢光");
  19. }
  20. }
  21. }

3,当然还需要一个业务的实际执行者

	
  1. /// <summary>
  2. /// 订单业务的实际处理者
  3. /// </summary>
  4. public class TicketHelper2
  5. {
  6. /// <summary>
  7. /// 执行复杂的订单操作(如数据库)
  8. /// </summary>
  9. /// <param name="userName">下单用户</param>
  10. /// <param name="pid">产品id</param>
  11. /// <returns></returns>
  12. public async Task PlaceOrderDataBase(string userName, int pid)
  13. {
  14. using (RuanMou2020Entities db = new RuanMou2020Entities())
  15. {
  16. var product = db.product.Where(p => p.id == pid).FirstOrDefault();
  17. if (product != null)
  18. {
  19. product.status = ;
  20. product.username = userName;
  21. await db.SaveChangesAsync();
  22. }
  23. }
  24. }
  25. }

这样我们同时访问下面三个地址,如果数据库里只有两个商品的话,会有一个请求结果为:商品已经被抢光。

http://localhost:88/Order/PlaceOrder?userName=zhangsan

http://localhost:88/Order/PlaceOrder?userName=lisi

http://localhost:88/Order/PlaceOrder?userName=wangwu

这种处理方式的优点为:执行效率快,相比第二种方式不需要第二个接口来返回查询结果。

缺点:暂时没想到,欢迎大家补充。

说明:该方式只是个人猜想,并非实际项目经验,大家只能作为参考,慎重用于项目。欢迎大家批评指正。