Articles of node amqp

在Nodejs RabbitMQ服务器上的高性能

我正在build立一个分析系统,在同一时间内有100万用户在线。 我使用消息代理等RabbitMQ来降低服务器的容量 这是我的图 我的系统包括3个组件。 发行商服务器:(生产者)这个系统build立在nodejs上。 这个系统将消息发布到queue RabbitMQ队列 :该系统存储publisher server发送的消息。 之后,打开一个连接以从subscriber server队列发送消息。 订户服务器(消费者) :该系统从queue接收消息 发布服务器源代码 var amqp = require('amqplib/callback_api'); amqp.connect("amqp://localhost", function(error, connect) { if (error) { return callback(-1, null); } else { connect.createChannel(function(error, channel) { if (error) { return callback(-3, null); } else { var q = 'logs'; var msg = data; // object // convert […]

AMQP警告:检测到可能的EventEmitter内存泄漏。 添加了11位听众。

我收到Node.js中的以下错误,我相信它与AMQP有关。 (node) warning: possible EventEmitter memory leak detected. 11 listeners added. Use emitter.setMaxListeners() to increase limit. Trace at Connection.EventEmitter.addListener (events.js:160:15) at Connection.EventEmitter.once (events.js:179:8) at Connection.connect (/var/www/project/app/node_modules/amqp/amqp.js:1084:8) at Connection.reconnect (/var/www/project/app/node_modules/amqp/amqp.js:1049:8) at null._onTimeout (/var/www/project/app/node_modules/amqp/amqp.js:886:16) at Timer.listOnTimeout [as ontimeout] (timers.js:110:15) 任何人都可以指出问题可能是什么? 下面是我用来连接模块的代码: JackRabbit.prototype.subscribe = function subscribe(recievedCB, routingKey) { var self = this; var route = routingKey || '#'; […]

Meteor + node-amqp:无法通过SSL连接到RabbitMQ服务器

我在Meteor应用程序中使用node-amqp通过SSL连接到rabbitMQ服务器。 这是我的连接string: var rConn = AMQP.createConnection({ url: amqps://user:pass@host:pppp, vhost: '/virthost' }); 但是,没有连接。 以下是在RabbitMQ错误日志中: =INFO REPORT==== 25-Jun-2013::17:41:30 === accepting AMQP connection <0.20247.0> (xxx.xxx.xxx.xxx:pppp -> xxx.xxx.xxx.xxx:pppp) =ERROR REPORT==== 25-Jun-2013::17:41:30 === error on AMQP connection <0.20231.0>: {ssl_upgrade_error,"record overflow"} (unknown POSIX error) =INFO REPORT==== 25-Jun-2013::18:11:35 === accepting AMQP connection <0.22556.0> (xxx.xxx.xxx.xxx:pppp -> xxx.xxx.xxx.xxx:pppp) =ERROR REPORT==== 25-Jun-2013::18:11:40 === error on […]

用SailsJs订阅RabbitMQ队列

如何使用SailsJs(无水线适配器)订阅RabbitMQ队列。 只能使用amqp.node https://github.com/squaremo/amqp.node 。 没有客户要求。 从我理解的控制器是为http请求。 我想要的是连接到RabbitMQ队列并做一些事情。 但是这不应该从http请求中发生。

等到node-amqp发送消息?

如果我使用下面的代码发送消息给RabbitMQ: var amqp = require('amqp'); var connection = amqp.createConnection(); connection.on('ready', function() { connection.publish('foo', 'Hello'); process.exit(); }); 那么这个消息就不会被发送,这个过程就会退出。 如果我删除process.exit() ,那么消息被发送,但是这个过程永不退出。 如何让node-amqp在发送消息时告诉我?

node-amqp队列销毁:通知订户

我使用node-amqp和rabbit作为一些pub / sub工具。 我想知道是否有可能通知订阅者某人/某事(在其他进程中)已经销毁他们正在收听的队列。 例如,处理一: connection.queue(name = "test-queue1", options, function(queue) { queue.subscribe({}, function(message, headers, deliveryInfo) { deliverMessage(message); }).once('error', function(error) { logger.error(error) }).once("queueDeleteOk", function bindHandler() { queue.close(); }); }; 现在,其他进程可能会破坏该队列。 用户如何知道这一点? 例如,stream程二可以做到: connection.queue(name = "test-queue1", options, function(queue) { queue.destroy(delete_options); }).once("queueDeleteOk", function bindHandler() { queue.close(); }); 我试图听取“queueDeletedOk”这个事件的作用,但是它只在进程2中被发送和接收,这正在销毁队列。 尝试事件“closures”在队列上同样发生。 感谢和最好的问候,

如何从node.js检查RabbitMQ中是否存在交换?

我想从node.js检查一个特定的RabbitMQ交换是否存在。 我正在使用Mocha作为testing框架。 我写了相同的代码,但我的期望似乎是不正确的。 如果没有交换,我希望交换variables具有未定义的值,但事实并非如此。 我正在使用amqp模块与RabbitMQ进行交互。 以下是代码: var should = require('should'); var amqp = require('amqp'); //Configuration var amqpConnectionDetails = { 'host':'localhost', 'port':5672, 'login':'guest', 'password':'guest' }; describe('AMQP Objects', function(){ describe('Exchanges', function(){ it('There should exist an exchange', function(done){ var amqpConnection = amqp.createConnection(amqpConnectionDetails); amqpConnection.on('ready', function(){ var exchange = amqpConnection.exchange('some_exchange', {'passive':true, 'noDeclare':true}); exchange.should.not.be.equal(undefined); exchange.should.not.be.equal(null); done(); }); }); }); }); 什么是检查交换存在的正确方法? […]

为什么node-amqp出现问题时会“closures”事件?

(我正在使用node-amqp和rabbitmq服务器。) 我想猜测为什么我有一个closures事件,如果出现错误。 例如,如果我尝试打开连接到队列(具有错误的参数),我收到一个错误事件。 这是完美的。 但是,在发生任何错误之后,我将收到一个closures的连接(在这种情况下,可能是因为closures了失败的套接字到队列)。 之后,自动重新连接,我收到(初始)准备好的事件。 问题: connection.on('ready', function() { do_a_lot_of_things }).on(error, function(error){ solve_the_problem }); 如果出现错误,我收到错误,但是“准备就绪”事件,它将重新do_a_lot_of_things。 我的方法错了吗? 最好的祝福

使用amqplib控制Node.JS,控制RabbitMQ使用者的使用率

我的应用程序使用RabbitMQ队列来存储消息,然后我有一个工作人员消费这些消息,并将其插入到数据库中。 目的不是强调工作负载高峰期的数据库。 我遇到的问题是,在那些高峰期,队列的发布率真的很高,工作人员每秒钟开始接收的消息比它能处理的时间多,直到它崩溃。 有什么办法来控制消费率,所以我可以确保工人收不到消息比消耗更快? 信息并不重要,所以我不介意他们有多less时间入队,直到工作人员能够处理。 我使用的Node.JS amqplib,这是我使用的工人代码: open.then(function(conn) { var ok = conn.createChannel(); ok = ok.then(function(ch) { ch.assertQueue(q); ch.consume(q, function(msg) { if (msg !== null) { message = JSON.parse(msg.content.toString()); processMessage(message); } }, {noAck: true}); }); return ok; }).then(null, console.warn);

如何发送一个消息给所有的订阅者,除了发布者也是同一个rabbitMQ队列上的一个侦听器

我有一个由nodeJS服务器安装的rabbitMQ。 我使用rabbit.js库与兔子进行交互,到目前为止,我对此感到满意。 我在扇出模式的同一个队列中有多个用户,每个节点都是一个订阅者,也是一个发布者,这对我很有好处,并且工作正常,因为在很多情况下,我想通知所有服务器一些发生的更新在这些节点之一(这也是出版商…) 我偶然发现一个情况,我需要发送队列中的所有侦听器的消息,除了发送它的人(这也是同一队列中的侦听器)。 我不知道谁在监听(可能有一个,可能有几百万),所以我不能通过一些白名单的路由规则将它路由到一些特定的节点。 它必须是某种排除通配符路由规则(一些黑名单),例如,发送这个消息给每个人听谁不对应我自己的唯一ID … 可以使用rabbit.js来完成吗? 它可以甚至在rabbitmq以某种方式完成? 我不太了解那只兔子,所以要对我温柔:) 顺便说一句,如果你知道如何使用rabbit.js,甚至更好… 编辑:: 根据Derick Bailey的要求,这是我需要这个的原因 我有一个系统,其中有许多负载平衡的nodeJS服务器作为web服务运行。 他们是完全透明的。 他们都不知道哪些其他节点存在。 我想保持这种方式,因为这种分离使我更容易通过添加和删除其他“并行”节点来更好地扩展。 这些节点中的每一个都有自己的内存本地caching服务。 我偶然发现了一个单个节点更新某个实体的情况。 现在我需要使这个节点可以通知所有其他并行节点(可能在caching中具有相同的实体)使其无效。 问题在于发送消息的节点(更新节点)也会收到消息,因为他也是一个监听者。 所以我希望他以某种方式排除自己的特定消息的接收者列表…因此需要一些路由黑名单。 (他知道自己,所以除了自己的身份证之外,我可以让他通往所有人……但即使有人确实在听另一端的话,他也不知道……所以它不能成为白名单) 希望我的需要现在更清楚。 我已经想到了解决我的问题,但它需要额外的发展在我身边,我想通过使用兔子的当前能力(如果可能的话),以避免它我可以只添加一个唯一的ID的内容的消息。 那么发送节点可以认识到这个消息来自他并忽略这个消息。 但正如你可以明白,这可能会变得棘手,因为我需要考虑更多的陷阱和其他边缘情况下可能会失败… 如果有人能告诉我如何使用一些兔子现有的configuration,我会很高兴听到如何:)