Node-amqp – X尝试后拒绝消息

我如何实现几个可configuration的requeue尝试后拒绝消息的机制?

换句话说,如果我正在订阅一个队列,我希望保证相同的消息不会再多出现X次。

我的代码示例:

q.subscribe({ack: true}, function(data,headers,deliveryInfo,message) { try{ doSomething(data); } catch(e) { message.reject(true); } } 

       

网上收集的解决方案 "Node-amqp – X尝试后拒绝消息"

在我看来,最好的解决scheme是在应用程序中处理这些错误,并在应用程序决定不能处理消息时拒绝它们。

如果您不想丢失信息,应用程序只有在将相同的消息发送到错误队列后才会拒绝该消息。

代码未经testing:

 q.subscribe({ack: true}, function () { var numOfRetries = 0; var args = arguments; var self = this; var promise = doWork.apply(self, args); for (var numOfRetries = 0; numOfRetries < MAX_RETRIES; numOfRetries++) { promise = promise.fail(function () { return doWork.apply(self, args); }); } promise.fail(function () { sendMessageToErrorQueue.apply(self, args); rejectMessage.apply(self, args); }) }) 

一个可能的解决scheme是使用您定义的某种散列函数对消息进行散列,然后检查该散列的caching对象。 如果存在,请将caching中的内容添加到可configuration的最大值,如果不存在,请将其设置为1.下面是一个快速且脏的原型(请注意, mcache对象应在所有订户的范围内):

 var mcache = {}, maxRetries = 3; q.subscribe({ack: true}, function(data,headers,deliveryInfo,message) { var messagehash = hash(message); if(mcache[messagehash] === undefined){ mcache[messagehash] = 0; } if(mcache[messagehash] > maxRetries) { q.shift(true,false); //reject true, requeue false (discard message) delete mcache[messagehash]; //don't leak memory } else { try{ doSomething(data); q.shift(false); //reject false delete mcache[messagehash]; //don't leak memory } catch(e) { mcache[messagehash]++; q.shift(true,true); //reject true, requeue true } } } 

如果消息具有GUID,则可以简单地在散列函数中返回该消息。