Articles of amqp

RabbitMQ:`rabbitmqctl list_exchanges`下的'direct'标题是什么意思?

如果我使用php的amqp类创buildexchangem交换types设置为direct和唯一的选项指定是durable ,然后尝试连接到node.js交换,我得到一个Cannot redeclare exchange with different options error你看到如果你的select不匹配。 但是,据我所知,我正在使用完全相同的选项(键入直接和选项持久)。 所以我做了一个testing,并在php中创build了一个名为php_exchange交换和一个名为node_exchange node.js,并运行rabbitmqctl list_exchanges并获得了一些有趣的输出: Listing exchanges … node_exchange direct amq.headers headers amq.direct direct amq.match headers amq.fanout fanout direct php_exchange direct amq.topic topic …done. direct标题在这种情况下意味着什么(显然它不是直接types的所有交换,因为它将node_exchange列为直接types)? 我怎样才能得到node.js创build一个相同的amqp交换(以便它不会失败时,连接到一个持久的交stream)? 我正在使用PECL php amqp库和node-amqp库(https://github.com/ry/node-amqp) 谢谢。 编辑: 以下是rabbitmqctl list_exchanges name type auto_delete internal arguments的输出(为简洁起见,删除了其他交换): Listing exchanges … node_exchange direct false false [] php_exchange direct […]

在Node.js中使用node-amqp模块进行手动确认

我正在使用postwait模块node- amqp。 我能够发布/订阅队列的一些消息,但我想手动确认消息。 例如,我希望消息被读取并且不被确认,然后对这个消息执行一些计算,然后发送一个q.shift()来确认消息并且通过第二个消息。 我想要确认手动使用。 其实我有一个允许订阅国旗{ack: false}的函数,那么我有一个函数来手动创build一个q.shift() (如文档中所述)。 但是,当我发布2条消息,然后我读了2条消息,没有确认,我看到了2条消息。 但是我希望子文件重复最初收到的同样的信息,直到我不承认它。 我怎样才能使用这些方法?

RabbitMQ(和node.js)中的asynchronous确认

我有一个RabbitMQ的具体使用案例,我想澄清一些事情,并要求build议。 考虑这种情况: 1 – 我发布两个消息,即要执行的任务:messageA然后messageB 2-我的消费者得到messageA,执行包含在这个消息中的任务,但是当任务正在运行时,服务器崩溃 我的问题是:当服务器重新启动时,messageA将被重新sorting,并且会在messageB(与崩溃之前的顺序相同)之前重新sorting? 据我所知,messageA会丢失,如果服务器崩溃,因为我的消费者默认情况下收到消息后确认。 所以我的想法是单独使用和确认消息:先消耗,运行任务,然后在任务成功执行后确认消息。 你觉得这个方法有问题吗? 你会build议我做其他事吗?

nodejs + amqp默默死亡

我有基本脚本采取amqp消息,并转发给socket.io它工作几个小时,有时几天,但最终它静静地停止转发amqp消息,同时继续发出心跳。 日志显示,当它到达中断状态时,它不会尝试发送socket.io消息,因为amqp消息到达,所以我认为问题是与amqp连接/订阅,而不是socket.io 我testing了脚本运行时手动closures并重新启动rabbitmq服务器,似乎没有问题重新连接,并继续按照它应该的function。 var port = 8888, http = require("http"), amqp = require("amqp"), socketio = require("socket.io"), express = require("express"), rabbitMq = amqp.createConnection({ host: 'server', reconnect: true }), app = express(), server = http.createServer(app), io = socketio.listen(server); rabbitMq.on("ready", function () { console.log("ready"); var queue = rabbitMq.queue("ft-secondary-display", function () { queue.bind(rabbitMq.exchange('ft', {type: 'topic'}), "data"); console.log("bound"); queue.subscribe(function (message, […]

ETIMEDOUT与node.js&amqp的问题

我有两个生产者和两个消费者在我的项目中使用rabbitmq通过node.js中的amqp模块。 为消费者build立连接的代码如下所示: function init_consumers( ) { console.log( 'mq: consumers connection established. Starting to init stuff..' ); global.queue.consumers.connection = con_c; var fq_name = global.queue.fq_name; var aq_name = global.queue.aq_name; var q_opts = { durable:true }; var subscr_opt = { ack: true, prefetchCount: 1 }; var fq = con_c.queue( fq_name, q_opts, function() { console.log( 'mq: consumer f queue […]

RabbitMQ – 如何检测消费者队列已被破坏?

所有我正在开发的应用程序中,客户端正在与rabbitmq服务器创build队列,还有另一个服务器,将pipe理所有的计算服务器。 我面临着在服务器端检测队列销毁的问题。 例如:从android客户端创build名称为“a”的队列,现在用户将要杀死该应用程序,该时间队列将被销毁。 但服务器仍处于该队列的未知状态。 所以我想要听任何事件或任何方法来调用侦测队列已从客户端摧毁。 谢谢

如何构build一个消息总线与Node.js和API分布在几个听众?

这是我想要发生的事情: 我的用户会打到一个端点,比如/api/findFile/app.js 。 express正在监听这个调用(标准的RESTtypes的东西),并会联系几个工作人员(可以是任意数量)要求他们执行工作,具体查找文件。 当第一个这样做的时候,它应该回应express的function,以便将结果发回给用户。 我想像某种消息总线/ AMQP设置将需要。 我认为快递function可以向工作人员publish请求,每个人都subscribe这个事件: bus.publish('findFile', {fileName: 'app.js'}}; 沿着这些线的东西。 工人有: bus.subscribe('findFile', function(event) {…. 每个工人将检查它是否有硬盘上的文件。 该文件可以在MULTIPLE工作盘上。 所以当第一个find它的时候,我想以某种方式中止别人回来expression。 为了回到expression,我想我们会做一个反向的pub/sub订阅,这次是明确地听取回应? 任何想法如何正确地devise这个? 特别是与比赛条件。

RabbitMQ在ACK之后顺序接收消息

我正在寻找获得以下模式,以“顺序”与RabbitMQ接收消息。 我使用Node.js框架作为消费者。 比方说队列中有3条消息,我需要这种模式: 接收消息#1 – >进程 – >发送ACK – >接收消息#2 – >进程 – >发送ACK接收消息#3 – >进程 – >发送ACK 但是发生了什么事是node.js消费者并行地接收了所有3条消息,并且RabbitMQ等待个别ACK返回以从队列中移除消息。 这不是顺序的。 我的队列是耐用的,需要ACK。 有什么我失踪? 请帮忙!

RabbitMQ:是否有可能删除队列时,他们是空的?

理想情况下,我想在RabbitMQ变空时删除队列。 基本上,我想要一个队列包含积压的消息,然后当发生什么事情时,这些消息将被发送,直到队列为空。 一旦队列为空,我想删除这个。 这可能吗? 我正在使用节点。

重用callback/重用通道的参数

我想在不同的节点模块中重用一个RabbitMQ通道。 由于通道是asynchronous创build的,我不确定最好的方法是将这个通道对象“注入”到其他模块中。 如果可能的话,我想避免像DI容器这样的外部依赖。 下面,你会发现我的简化代码。 提前感谢您的任何build议。 web.js require('./rabbitmq')(function (err, conn) { … // Start web server var http = require('./http'); var serverInstance = http.listen(process.env.PORT || 8000, function () { var host = serverInstance.address().address; var port = serverInstance.address().port; }); }); rabbitmq.js : module.exports = function (done) { … amqp.connect(rabbitMQUri, function (err, conn) { … conn.createChannel(function(err, ch) { […]