Articles of node amqp

RxJS具有asynchronous订户function的Observable

我试图做一些感觉应该是直截了当的事情,但certificate令人惊讶的困难。 我有一个订阅RabbitMQ队列的函数。 具体来说,这里是Channel.consume函数: http ://www.squaremobius.net/amqp.node/channel_api.html#channel_consume 它返回一个承诺,这个承诺用一个订阅ID来解决 – 稍后需要取消订阅 – 并且还有一个callback参数来在邮件从队列中取出时调用。 当我想取消订阅队列时,我需要在这里使用Channel.cancel函数取消使用者: http ://www.squaremobius.net/amqp.node/channel_api.html#channel_cancel。 这需要先前返回的订阅ID。 我想把所有这些东西包装在观察者订阅时订阅队列的Observable中,并且当observable退订时取消订阅。 然而,由于调用的“双重asynchronous”性质(我的意思是说,它们同时具有callback和返回承诺),这样做有些困难。 理想情况下,我想写的代码是: return new Rx.Observable(async (subscriber) => { var consumeResult = await channel.consume(queueName, (message) => subscriber.next(message)); return async () => { await channel.cancel(consumeResult.consumerTag); }; }); 然而,这是不可能的,因为这个构造函数不支持asynchronous订阅函数或者拆卸逻辑。 我一直无法找出这一个。 我在这里错过了什么? 为什么这么难? 干杯,Alex

处理node-amqp(node.js)中的独占队列订阅时出错

当您订阅独占队列(一次只允许一个消费者)时,node-amqp会在队列超额订阅(已经有消费者)时抛出exception。 我试过使用.on(“error”,cb)语法。 我试过错误域(节点0.10.0) 尝试/抓住显然没有工作 这是我的订阅线,但没什么特别的: queue.subscribe({ack: true, prefetchCount: 1, exclusive: exclusive}, cbExecute).addCallback((ok) -> listeners[type] = ok.consumerTag); 当使用队列时,会得到未处理的exception: ACCESS_REFUSED – queue 'respQ' in vhost 'brkoacph' in exclusive use 看看node-amqp里面的内容,我发现它们在模块内部实现了一个独立的任务队列,所以当发生错误时,任务在独立的上下文中运行。 有没有解决方法? …或者我只是做错了什么?

当node-amqp没有exception

我正在尝试使用node-amqp。 当在连接到兔子的部分抛出一个exception,我可以得到这个exception,但它永远重新启动连接到兔子。 看那个: amqp = require("amqp") # Open a connection conn = amqp.createConnection( {url: "amqp://localhost"} , {reconnect: true}) conn.on "ready", -> console.log "Conn Ready" conn.queue "queueX", {ack:true}, (queue) -> console.log "Subscribed #{queue.name}" assdsd() #calling non-exiting method. No exception is thrown but the connection is restarted 系统循环引发错误。 我知道这是因为{recconnect:true}。 但我希望能够自行处理例外情况。 任何想法? 我的脚本的输出是这样的: Conn Ready Subscribed queueX Conn […]

在Nodejs RabbitMQ服务器上的高性能

我正在build立一个分析系统,在同一时间内有100万用户在线。 我使用消息代理等RabbitMQ来降低服务器的容量 这是我的图 我的系统包括3个组件。 发行商服务器:(生产者)这个系统build立在nodejs上。 这个系统将消息发布到queue RabbitMQ队列 :该系统存储publisher server发送的消息。 之后,打开一个连接以从subscriber server队列发送消息。 订户服务器(消费者) :该系统从queue接收消息 发布服务器源代码 var amqp = require('amqplib/callback_api'); amqp.connect("amqp://localhost", function(error, connect) { if (error) { return callback(-1, null); } else { connect.createChannel(function(error, channel) { if (error) { return callback(-3, null); } else { var q = 'logs'; var msg = data; // object // convert […]

AMQP警告:检测到可能的EventEmitter内存泄漏。 添加了11位听众。

我收到Node.js中的以下错误,我相信它与AMQP有关。 (node) warning: possible EventEmitter memory leak detected. 11 listeners added. Use emitter.setMaxListeners() to increase limit. Trace at Connection.EventEmitter.addListener (events.js:160:15) at Connection.EventEmitter.once (events.js:179:8) at Connection.connect (/var/www/project/app/node_modules/amqp/amqp.js:1084:8) at Connection.reconnect (/var/www/project/app/node_modules/amqp/amqp.js:1049:8) at null._onTimeout (/var/www/project/app/node_modules/amqp/amqp.js:886:16) at Timer.listOnTimeout [as ontimeout] (timers.js:110:15) 任何人都可以指出问题可能是什么? 下面是我用来连接模块的代码: JackRabbit.prototype.subscribe = function subscribe(recievedCB, routingKey) { var self = this; var route = routingKey || '#'; […]

Meteor + node-amqp:无法通过SSL连接到RabbitMQ服务器

我在Meteor应用程序中使用node-amqp通过SSL连接到rabbitMQ服务器。 这是我的连接string: var rConn = AMQP.createConnection({ url: amqps://user:pass@host:pppp, vhost: '/virthost' }); 但是,没有连接。 以下是在RabbitMQ错误日志中: =INFO REPORT==== 25-Jun-2013::17:41:30 === accepting AMQP connection <0.20247.0> (xxx.xxx.xxx.xxx:pppp -> xxx.xxx.xxx.xxx:pppp) =ERROR REPORT==== 25-Jun-2013::17:41:30 === error on AMQP connection <0.20231.0>: {ssl_upgrade_error,"record overflow"} (unknown POSIX error) =INFO REPORT==== 25-Jun-2013::18:11:35 === accepting AMQP connection <0.22556.0> (xxx.xxx.xxx.xxx:pppp -> xxx.xxx.xxx.xxx:pppp) =ERROR REPORT==== 25-Jun-2013::18:11:40 === error on […]

用SailsJs订阅RabbitMQ队列

如何使用SailsJs(无水线适配器)订阅RabbitMQ队列。 只能使用amqp.node https://github.com/squaremo/amqp.node 。 没有客户要求。 从我理解的控制器是为http请求。 我想要的是连接到RabbitMQ队列并做一些事情。 但是这不应该从http请求中发生。

等到node-amqp发送消息?

如果我使用下面的代码发送消息给RabbitMQ: var amqp = require('amqp'); var connection = amqp.createConnection(); connection.on('ready', function() { connection.publish('foo', 'Hello'); process.exit(); }); 那么这个消息就不会被发送,这个过程就会退出。 如果我删除process.exit() ,那么消息被发送,但是这个过程永不退出。 如何让node-amqp在发送消息时告诉我?

node-amqp队列销毁:通知订户

我使用node-amqp和rabbit作为一些pub / sub工具。 我想知道是否有可能通知订阅者某人/某事(在其他进程中)已经销毁他们正在收听的队列。 例如,处理一: connection.queue(name = "test-queue1", options, function(queue) { queue.subscribe({}, function(message, headers, deliveryInfo) { deliverMessage(message); }).once('error', function(error) { logger.error(error) }).once("queueDeleteOk", function bindHandler() { queue.close(); }); }; 现在,其他进程可能会破坏该队列。 用户如何知道这一点? 例如,stream程二可以做到: connection.queue(name = "test-queue1", options, function(queue) { queue.destroy(delete_options); }).once("queueDeleteOk", function bindHandler() { queue.close(); }); 我试图听取“queueDeletedOk”这个事件的作用,但是它只在进程2中被发送和接收,这正在销毁队列。 尝试事件“closures”在队列上同样发生。 感谢和最好的问候,

如何从node.js检查RabbitMQ中是否存在交换?

我想从node.js检查一个特定的RabbitMQ交换是否存在。 我正在使用Mocha作为testing框架。 我写了相同的代码,但我的期望似乎是不正确的。 如果没有交换,我希望交换variables具有未定义的值,但事实并非如此。 我正在使用amqp模块与RabbitMQ进行交互。 以下是代码: var should = require('should'); var amqp = require('amqp'); //Configuration var amqpConnectionDetails = { 'host':'localhost', 'port':5672, 'login':'guest', 'password':'guest' }; describe('AMQP Objects', function(){ describe('Exchanges', function(){ it('There should exist an exchange', function(done){ var amqpConnection = amqp.createConnection(amqpConnectionDetails); amqpConnection.on('ready', function(){ var exchange = amqpConnection.exchange('some_exchange', {'passive':true, 'noDeclare':true}); exchange.should.not.be.equal(undefined); exchange.should.not.be.equal(null); done(); }); }); }); }); 什么是检查交换存在的正确方法? […]