2009-07-11 Michael Barker <mike@middlesoft.co.uk>
[mono.git] / mcs / class / Mono.Messaging.RabbitMQ / Mono.Messaging.RabbitMQ / RabbitMQMessageQueue.cs
index 320ad9747a2b946ebf92fe2864386071d0ba4bd0..03069f252ad13439387c841a38f1d57c6566ad6a 100644 (file)
@@ -43,6 +43,11 @@ using RabbitMQ.Util;
 
 namespace Mono.Messaging.RabbitMQ {
 
+       /// <summary>
+       /// RabbitMQ Implementation of a message queue.  Currrently this implementation
+       /// attempts to be as stateless as possible.  Connection the AMQP server
+       /// are only created as needed.
+       /// </summary>
        public class RabbitMQMessageQueue : MessageQueueBase, IMessageQueue {
                
                private bool authenticate = false;
@@ -57,7 +62,6 @@ namespace Mono.Messaging.RabbitMQ {
                private QueueReference qRef = QueueReference.DEFAULT;
                private readonly RabbitMQMessagingProvider provider;
                private readonly MessageFactory helper;
-               private readonly string realm;
                private readonly bool transactional;
                
                public RabbitMQMessageQueue (RabbitMQMessagingProvider provider,
@@ -69,17 +73,9 @@ namespace Mono.Messaging.RabbitMQ {
                public RabbitMQMessageQueue (RabbitMQMessagingProvider provider,
                                             QueueReference qRef, 
                                             bool transactional)
-                       : this (provider, "/data", qRef, transactional)
-               {
-               }
-               
-               public RabbitMQMessageQueue (RabbitMQMessagingProvider provider,
-                                            string realm, QueueReference qRef,
-                                            bool transactional)
                {
                        this.provider = provider;
                        this.helper = new MessageFactory (provider);
-                       this.realm = realm;
                        this.qRef = qRef;
                        this.transactional = transactional;
                }
@@ -198,14 +194,13 @@ namespace Mono.Messaging.RabbitMQ {
                        // No-op (Queue are currently stateless)
                }
                
-               public static void Delete (string realm, QueueReference qRef)
+               public static void Delete (QueueReference qRef)
                {
                        ConnectionFactory cf = new ConnectionFactory ();
                        
                        using (IConnection cn = cf.CreateConnection (qRef.Host)) {
                                using (IModel model = cn.CreateModel ()) {
-                                       ushort ticket = model.AccessRequest (realm);
-                                       model.QueueDelete (ticket, qRef.Queue, false, false, false);
+                                       model.QueueDelete (qRef.Queue, false, false, false);
                                }
                        }
                }                       
@@ -293,11 +288,10 @@ namespace Mono.Messaging.RabbitMQ {
                
                private void Send (IModel model, IMessage msg)
                {
-                       ushort ticket = model.AccessRequest ("/data");
-                       string finalName = model.QueueDeclare (ticket, QRef.Queue, true);
+                       string finalName = model.QueueDeclare (QRef.Queue, true);
                        IMessageBuilder mb = helper.WriteMessage (model, msg);
 
-                       model.BasicPublish (ticket, "", finalName,
+                       model.BasicPublish ("", finalName,
                                            (IBasicProperties) mb.GetContentHeader(),
                                            mb.GetContentBody ());
                }
@@ -308,8 +302,7 @@ namespace Mono.Messaging.RabbitMQ {
 
                        using (IConnection cn = cf.CreateConnection (QRef.Host)) {
                                using (IModel model = cn.CreateModel ()) {
-                                       ushort ticket = model.AccessRequest (realm);
-                                       model.QueuePurge (ticket, QRef.Queue, false);
+                                       model.QueuePurge (QRef.Queue, false);
                                }
                        }
                }
@@ -588,7 +581,7 @@ namespace Mono.Messaging.RabbitMQ {
                private static TxReceiver.DoReceive Receiver (TimeSpan timeout,
                                                              IsMatch matcher)
                {
-                       int to = TimeSpanToInt32 (timeout);
+                       int to = MessageFactory.TimeSpanToInt32 (timeout);
                        return new DoReceiveWithTimeout (to, matcher).DoReceive;
                }
                
@@ -599,7 +592,7 @@ namespace Mono.Messaging.RabbitMQ {
                
                private static TxReceiver.DoReceive Receiver (TimeSpan timeout)
                {
-                       int to = TimeSpanToInt32 (timeout);
+                       int to = MessageFactory.TimeSpanToInt32 (timeout);
                        return new DoReceiveWithTimeout (to, null).DoReceive;
                }
 
@@ -615,7 +608,7 @@ namespace Mono.Messaging.RabbitMQ {
                
                private TxReceiver.DoReceive Peeker (TimeSpan timeout)
                {
-                       int to = TimeSpanToInt32 (timeout);
+                       int to = MessageFactory.TimeSpanToInt32 (timeout);
                        return new DoReceiveWithTimeout (to, null, false).DoReceive;
                }               
                
@@ -626,7 +619,7 @@ namespace Mono.Messaging.RabbitMQ {
                
                private TxReceiver.DoReceive Peeker (TimeSpan timeout, IsMatch matcher)
                {
-                       int to = TimeSpanToInt32 (timeout);
+                       int to = MessageFactory.TimeSpanToInt32 (timeout);
                        return new DoReceiveWithTimeout (to, matcher, false).DoReceive;
                }
                
@@ -672,10 +665,9 @@ namespace Mono.Messaging.RabbitMQ {
                
                private IMessage Receive (IModel model, int timeout, bool doAck)
                {
-                       ushort ticket = model.AccessRequest (realm);
-                       string finalName = model.QueueDeclare (ticket, QRef.Queue, false);
+                       string finalName = model.QueueDeclare (QRef.Queue, false);
                        
-                       using (Subscription sub = new Subscription (model, ticket, finalName)) {
+                       using (Subscription sub = new Subscription (model, finalName)) {
                                BasicDeliverEventArgs result;
                                if (sub.Next (timeout, out result)) {
                                        IMessage m = helper.ReadMessage (QRef, result);
@@ -691,10 +683,9 @@ namespace Mono.Messaging.RabbitMQ {
                private IMessage Receive (IModel model, int timeout, 
                                          bool doAck, IsMatch matcher)
                {
-                       ushort ticket = model.AccessRequest (realm);
-                       string finalName = model.QueueDeclare (ticket, QRef.Queue, false);
+                       string finalName = model.QueueDeclare (QRef.Queue, false);
                        
-                       using (Subscription sub = new Subscription (model, ticket, finalName)) {
+                       using (Subscription sub = new Subscription (model, finalName)) {
                                BasicDeliverEventArgs result;
                                while (sub.Next (timeout, out result)) {
                                        
@@ -714,13 +705,5 @@ namespace Mono.Messaging.RabbitMQ {
                {
                        return (RabbitMQMessageQueueTransaction) provider.CreateMessageQueueTransaction ();
                }               
-               
-               private static int TimeSpanToInt32 (TimeSpan timespan)
-               {
-                       if (timespan == TimeSpan.MaxValue)
-                               return -1;
-                       else
-                               return (int) timespan.TotalMilliseconds;
-               }
        }
 }