namespace Mono.Messaging.RabbitMQ {
+ /// <summary>
+ /// RabbitMQ Implementation of a message queue. Currrently this implementation
+ /// attempts to be as stateless as possible. Connection the AMQP server
+ /// are only created as needed.
+ /// </summary>
public class RabbitMQMessageQueue : MessageQueueBase, IMessageQueue {
private bool authenticate = false;
private QueueReference qRef = QueueReference.DEFAULT;
private readonly RabbitMQMessagingProvider provider;
private readonly MessageFactory helper;
- private readonly string realm;
private readonly bool transactional;
public RabbitMQMessageQueue (RabbitMQMessagingProvider provider,
public RabbitMQMessageQueue (RabbitMQMessagingProvider provider,
QueueReference qRef,
bool transactional)
- : this (provider, "/data", qRef, transactional)
- {
- }
-
- public RabbitMQMessageQueue (RabbitMQMessagingProvider provider,
- string realm, QueueReference qRef,
- bool transactional)
{
this.provider = provider;
this.helper = new MessageFactory (provider);
- this.realm = realm;
this.qRef = qRef;
this.transactional = transactional;
}
// No-op (Queue are currently stateless)
}
- public static void Delete (string realm, QueueReference qRef)
+ public static void Delete (QueueReference qRef)
{
ConnectionFactory cf = new ConnectionFactory ();
using (IConnection cn = cf.CreateConnection (qRef.Host)) {
using (IModel model = cn.CreateModel ()) {
- ushort ticket = model.AccessRequest (realm);
- model.QueueDelete (ticket, qRef.Queue, false, false, false);
+ model.QueueDelete (qRef.Queue, false, false, false);
}
}
}
private void Send (IModel model, IMessage msg)
{
- ushort ticket = model.AccessRequest ("/data");
- string finalName = model.QueueDeclare (ticket, QRef.Queue, true);
+ string finalName = model.QueueDeclare (QRef.Queue, true);
IMessageBuilder mb = helper.WriteMessage (model, msg);
- model.BasicPublish (ticket, "", finalName,
+ model.BasicPublish ("", finalName,
(IBasicProperties) mb.GetContentHeader(),
mb.GetContentBody ());
}
using (IConnection cn = cf.CreateConnection (QRef.Host)) {
using (IModel model = cn.CreateModel ()) {
- ushort ticket = model.AccessRequest (realm);
- model.QueuePurge (ticket, QRef.Queue, false);
+ model.QueuePurge (QRef.Queue, false);
}
}
}
private static TxReceiver.DoReceive Receiver (TimeSpan timeout,
IsMatch matcher)
{
- int to = TimeSpanToInt32 (timeout);
+ int to = MessageFactory.TimeSpanToInt32 (timeout);
return new DoReceiveWithTimeout (to, matcher).DoReceive;
}
private static TxReceiver.DoReceive Receiver (TimeSpan timeout)
{
- int to = TimeSpanToInt32 (timeout);
+ int to = MessageFactory.TimeSpanToInt32 (timeout);
return new DoReceiveWithTimeout (to, null).DoReceive;
}
private TxReceiver.DoReceive Peeker (TimeSpan timeout)
{
- int to = TimeSpanToInt32 (timeout);
+ int to = MessageFactory.TimeSpanToInt32 (timeout);
return new DoReceiveWithTimeout (to, null, false).DoReceive;
}
private TxReceiver.DoReceive Peeker (TimeSpan timeout, IsMatch matcher)
{
- int to = TimeSpanToInt32 (timeout);
+ int to = MessageFactory.TimeSpanToInt32 (timeout);
return new DoReceiveWithTimeout (to, matcher, false).DoReceive;
}
private IMessage Receive (IModel model, int timeout, bool doAck)
{
- ushort ticket = model.AccessRequest (realm);
- string finalName = model.QueueDeclare (ticket, QRef.Queue, false);
+ string finalName = model.QueueDeclare (QRef.Queue, false);
- using (Subscription sub = new Subscription (model, ticket, finalName)) {
+ using (Subscription sub = new Subscription (model, finalName)) {
BasicDeliverEventArgs result;
if (sub.Next (timeout, out result)) {
IMessage m = helper.ReadMessage (QRef, result);
private IMessage Receive (IModel model, int timeout,
bool doAck, IsMatch matcher)
{
- ushort ticket = model.AccessRequest (realm);
- string finalName = model.QueueDeclare (ticket, QRef.Queue, false);
+ string finalName = model.QueueDeclare (QRef.Queue, false);
- using (Subscription sub = new Subscription (model, ticket, finalName)) {
+ using (Subscription sub = new Subscription (model, finalName)) {
BasicDeliverEventArgs result;
while (sub.Next (timeout, out result)) {
{
return (RabbitMQMessageQueueTransaction) provider.CreateMessageQueueTransaction ();
}
-
- private static int TimeSpanToInt32 (TimeSpan timespan)
- {
- if (timespan == TimeSpan.MaxValue)
- return -1;
- else
- return (int) timespan.TotalMilliseconds;
- }
}
}