隐藏

nodejs队列实现amqplib,rabbitmq

发布:2023/11/2 20:24:29作者:管理员 来源:本站 浏览次数:570

最近在nodejs使用了amqplib--rabbitmq的nodejs客户端。封装在了express中,先来代码。

var amqp = require('amqplib/callback_api');

var config=require('../config/config');

var log=require('../util/loghelp');

function fail(err, conn) {

   log.error(err);

   if (conn) conn.close();

}

exports.StartConsumer=function (action,qname) {

   function on_connect(err, conn) {

       if (err !== null) return fail(err);

       function on_channel_open(err, ch) {

           ch.assertQueue(qname, {durable: true}, function(err, ok) {

               if (err !== null) return bail(err, conn);

               ch.consume(qname, function(msg) {


                   log.info(`Received ${msg.content.toString()},start process`);

                   action(JSON.parse(msg.content))

                       .then(d=> {

                           log.info("mq 处理成功,确认");ch.ack(msg)

                       }

                          )

                       .catch(err=>

                       ch.nack(msg));

               }, {noAck: false} );

           });

       }

       conn.createChannel(on_channel_open);

   }

   amqp.connect(config.amqp.url,on_connect);

};


exports.enqueue=function (data,qname) {

   function on_connect(err, conn) {

       if (err !== null) return bail(err);


       function on_channel_open(err, ch) {

           if (err !== null) return bail(err, conn);

           ch.assertQueue(qname, {durable: true}, function(err, ok) {

               if (err !== null) return bail(err, conn);

               var msg=JSON.stringify(data);

               ch.sendToQueue(qname, new Buffer(msg));

               log.info(`mq send ${msg}`);

               ch.close(function() { conn.close(); });

           });

       }

       conn.createChannel(on_channel_open);

   }

   amqp.connect(config.amqp.url,on_connect);

};



其中StartConsumer 会在项目启动时启动,在整个生命周期中一直保持监听状态,在程序结束时mq的链接关闭。需要注意的是 noAck 这个参数,当为false是表示消息出队后不会自动删除,如果设置成true,则无论消息处理成功与否此消息会被删除。注意到在消息不成功是,调用了ch.nack(msg)),此方法是将消息重新入队。


  而enqueue 则是消息入队列后连接立刻关闭,以免占用资源。