2009-07-11 Michael Barker <mike@middlesoft.co.uk>
[mono.git] / mcs / class / Mono.Messaging.RabbitMQ / Mono.Messaging.RabbitMQ / RabbitMQMessageQueue.cs
index eacc97675d2584647871fb65c5c31390470c5246..03069f252ad13439387c841a38f1d57c6566ad6a 100644 (file)
@@ -43,7 +43,12 @@ using RabbitMQ.Util;
 
 namespace Mono.Messaging.RabbitMQ {
 
-       public class RabbitMQMessageQueue : IMessageQueue {
+       /// <summary>
+       /// RabbitMQ Implementation of a message queue.  Currrently this implementation
+       /// attempts to be as stateless as possible.  Connection the AMQP server
+       /// are only created as needed.
+       /// </summary>
+       public class RabbitMQMessageQueue : MessageQueueBase, IMessageQueue {
                
                private bool authenticate = false;
                private short basePriority = 0;
@@ -55,14 +60,28 @@ namespace Mono.Messaging.RabbitMQ {
                private ISynchronizeInvoke synchronizingObject = null;
                private bool useJournalQueue = false;
                private QueueReference qRef = QueueReference.DEFAULT;
+               private readonly RabbitMQMessagingProvider provider;
+               private readonly MessageFactory helper;
+               private readonly bool transactional;
                
-               public RabbitMQMessageQueue ()
+               public RabbitMQMessageQueue (RabbitMQMessagingProvider provider,
+                                            bool transactional)
+                       : this (provider, QueueReference.DEFAULT, transactional)
                {
                }
                
-               public RabbitMQMessageQueue (QueueReference qRef)
+               public RabbitMQMessageQueue (RabbitMQMessagingProvider provider,
+                                            QueueReference qRef, 
+                                            bool transactional)
                {
+                       this.provider = provider;
+                       this.helper = new MessageFactory (provider);
                        this.qRef = qRef;
+                       this.transactional = transactional;
+               }
+               
+               protected override IMessageQueue Queue {
+                       get { return this; }
                }
 
                public bool Authenticate {
@@ -130,7 +149,7 @@ namespace Mono.Messaging.RabbitMQ {
                }
                
                public bool Transactional {
-                       get { throw new NotImplementedException (); }
+                       get { return transactional; }
                }
                
                public bool UseJournalQueue {
@@ -155,23 +174,42 @@ namespace Mono.Messaging.RabbitMQ {
                        return version;
                }
                
-               private void SetDeliveryInfo (IMessage msg, IConnection cn)
+               private void SetDeliveryInfo (IMessage msg, long senderVersion,
+                                             string transactionId)
                {
-                       long senderVersion = GetVersion (cn);
                        msg.SetDeliveryInfo (Acknowledgment.None,
                                             DateTime.MinValue,
                                             this,
-                                            Guid.NewGuid ().ToString (),
+                                            Guid.NewGuid ().ToString () + "\\0",
                                             MessageType.Normal,
                                             new byte[0],
                                             senderVersion,
                                             DateTime.UtcNow,
                                             null,
-                                            null);
+                                            transactionId);
                }
                
+               public void Close ()
+               {
+                       // No-op (Queue are currently stateless)
+               }
+               
+               public static void Delete (QueueReference qRef)
+               {
+                       ConnectionFactory cf = new ConnectionFactory ();
+                       
+                       using (IConnection cn = cf.CreateConnection (qRef.Host)) {
+                               using (IModel model = cn.CreateModel ()) {
+                                       model.QueueDelete (qRef.Queue, false, false, false);
+                               }
+                       }
+               }                       
+               
                public void Send (IMessage msg)
                {
+                       if (QRef == QueueReference.DEFAULT)
+                               throw new MonoMessagingException ("Path has not been specified");
+                       
                        if (msg.BodyStream == null)
                                throw new ArgumentException ("Message is not serialized properly");
                
@@ -179,17 +217,9 @@ namespace Mono.Messaging.RabbitMQ {
                        
                        try {
                                using (IConnection cn = cf.CreateConnection (QRef.Host)) {
+                                       SetDeliveryInfo (msg, GetVersion (cn), null);
                                        using (IModel ch = cn.CreateModel ()) {
-                                               ushort ticket = ch.AccessRequest ("/data");
-                                               string finalName = ch.QueueDeclare (ticket, QRef.Queue, false);
-                                               SetDeliveryInfo (msg, cn);
-                                               IMessageBuilder mb = MessageFactory.WriteMessage (ch, msg);
-                                               Console.WriteLine("Body.Length In {0}", mb.GetContentBody ().Length);
-                                                                                               
-                                               ch.BasicPublish (ticket, "",
-                                                                finalName,
-                                                                (IBasicProperties) mb.GetContentHeader(),
-                                                                mb.GetContentBody ());
+                                               Send (ch, msg);
                                        }
                                }
                        } catch (BrokerUnreachableException e) {
@@ -197,32 +227,483 @@ namespace Mono.Messaging.RabbitMQ {
                        }
                }
                
-               public IMessage Receive ()
+               public void Send (IMessage msg, IMessageQueueTransaction transaction)
+               {
+                       if (QRef == QueueReference.DEFAULT)
+                               throw new MonoMessagingException ("Path has not been specified");
+                       
+                       if (msg.BodyStream == null)
+                               throw new ArgumentException ("Message is not serialized properly");
+                       
+                       RabbitMQMessageQueueTransaction tx = (RabbitMQMessageQueueTransaction) transaction;
+                       
+                       tx.RunSend (SendInContext, msg);
+               }
+               
+               public void Send (IMessage msg, MessageQueueTransactionType transactionType)
+               {
+                       switch (transactionType) {
+                       case MessageQueueTransactionType.Single:
+                               using (IMessageQueueTransaction tx = provider.CreateMessageQueueTransaction ()) {
+                                       try {
+                                               Send (msg, tx);
+                                               tx.Commit ();
+                                       } catch (Exception e) {
+                                               tx.Abort ();
+                                               throw new MonoMessagingException(e.Message, e);
+                                       }
+                               }
+                               break;
+
+                       case MessageQueueTransactionType.None:
+                               Send (msg);
+                               break;
+
+                       case MessageQueueTransactionType.Automatic:
+                               throw new NotSupportedException("Automatic transaction types not supported");
+                       }
+               }
+               
+               private void SendInContext (ref string host, ref IConnection cn, 
+                                           ref IModel model, IMessage msg, string txId)
+               {
+                       if (host == null)
+                               host = QRef.Host;
+                       else if (host != QRef.Host)
+                               throw new MonoMessagingException ("Transactions can not span multiple hosts");
+                       
+                       if (cn == null) {
+                               ConnectionFactory cf = new ConnectionFactory ();
+                               cn = cf.CreateConnection (host);
+                       }
+                       
+                       if (model == null) {
+                               model = cn.CreateModel ();
+                               model.TxSelect ();
+                       }
+                       
+                       SetDeliveryInfo (msg, GetVersion (cn), txId);
+                       Send (model, msg);
+               }
+               
+               private void Send (IModel model, IMessage msg)
+               {
+                       string finalName = model.QueueDeclare (QRef.Queue, true);
+                       IMessageBuilder mb = helper.WriteMessage (model, msg);
+
+                       model.BasicPublish ("", finalName,
+                                           (IBasicProperties) mb.GetContentHeader(),
+                                           mb.GetContentBody ());
+               }
+               
+               public void Purge ()
+               {
+                       ConnectionFactory cf = new ConnectionFactory ();
+
+                       using (IConnection cn = cf.CreateConnection (QRef.Host)) {
+                               using (IModel model = cn.CreateModel ()) {
+                                       model.QueuePurge (QRef.Queue, false);
+                               }
+                       }
+               }
+               
+               public IMessage Peek ()
                {
                        ConnectionFactory cf = new ConnectionFactory ();
 
                        using (IConnection cn = cf.CreateConnection (QRef.Host)) {
                                using (IModel ch = cn.CreateModel ()) {
-                                       ushort ticket = ch.AccessRequest ("/data");
-                                       string finalName = ch.QueueDeclare (ticket, QRef.Queue, false);
+                                       return Receive (ch, -1, false);
+                               }
+                       }
+               }
+               
+               public IMessage Peek (TimeSpan timeout)
+               {
+                       return Run (Peeker (timeout));
+               }
+               
+               public IMessage PeekById (string id)
+               {
+                       return Run (Peeker (ById (id)));
+               }
+
+               public IMessage PeekById (string id, TimeSpan timeout)
+               {
+                       return Run (Peeker (timeout, ById (id)));
+               }
+               
+               public IMessage PeekByCorrelationId (string id)
+               {
+                       return Run (Peeker (ByCorrelationId (id)));
+               }
+
+               public IMessage PeekByCorrelationId (string id, TimeSpan timeout)
+               {
+                       return Run (Peeker (timeout, ByCorrelationId (id)));
+               }
+               
+               public IMessage Receive ()
+               {
+                       return Run (Receiver ());
+               }
+               
+               public IMessage Receive (TimeSpan timeout)
+               {
+                       return Run (Receiver (timeout));
+               }
+               
+               public IMessage Receive (TimeSpan timeout,
+                                        IMessageQueueTransaction transaction)
+               {
+                       return Run (transaction, Receiver (timeout));
+               }
+               
+               public IMessage Receive (TimeSpan timeout,
+                                        MessageQueueTransactionType transactionType)
+               {
+                       return Run (transactionType, Receiver (timeout));
+               }
+               
+               public IMessage Receive (IMessageQueueTransaction transaction)
+               {
+                       return Run (transaction, Receiver());
+               }
+               
+               public IMessage Receive (MessageQueueTransactionType transactionType)
+               {
+                       return Run (transactionType, Receiver ());
+               }               
+
+               public IMessage ReceiveById (string id)
+               {
+                       return Run (Receiver (ById (id)));
+               }
+
+               public IMessage ReceiveById (string id, TimeSpan timeout)
+               {
+                       return Run (Receiver (timeout, ById (id)));
+               }
+               
+               public IMessage ReceiveById (string id,
+                                            IMessageQueueTransaction transaction)
+               {
+                       return Run (transaction, Receiver (ById (id)));
+               }
+               
+               public IMessage ReceiveById (string id,
+                                            MessageQueueTransactionType transactionType)
+               {
+                       return Run (transactionType, Receiver (ById (id)));
+               }
+               
+               public IMessage ReceiveById (string id, TimeSpan timeout,
+                                            IMessageQueueTransaction transaction)
+               {
+                       return Run (transaction, Receiver (timeout, ById (id)));
+               }
+               
+               public IMessage ReceiveById (string id, TimeSpan timeout,
+                                            MessageQueueTransactionType transactionType)
+               {
+                       return Run (transactionType, Receiver (timeout, ById (id)));
+               }
+               
+               public IMessage ReceiveByCorrelationId (string id)
+               {
+                       return Run (Receiver (ByCorrelationId (id)));
+               }
+               
+               public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout)
+               {
+                       return Run (Receiver (timeout, ByCorrelationId (id)));
+               }
+               
+               public IMessage ReceiveByCorrelationId (string id,
+                                                       IMessageQueueTransaction transaction)
+               {
+                       return Run (transaction, Receiver (ByCorrelationId (id)));
+               }
+               
+               public IMessage ReceiveByCorrelationId (string id,
+                                                       MessageQueueTransactionType transactionType)
+               {
+                       return Run (transactionType, Receiver (ByCorrelationId (id)));
+               }
+               
+               public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout,
+                                                       IMessageQueueTransaction transaction)
+               {
+                       return Run (transaction, Receiver (timeout, ByCorrelationId (id)));
+               }
+               
+               public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout,
+                                                       MessageQueueTransactionType transactionType)
+               {
+                       return Run (transactionType, Receiver (timeout, ByCorrelationId (id)));
+               }
+               
+               public IMessageEnumerator GetMessageEnumerator ()
+               {
+                       return new RabbitMQMessageEnumerator (helper, QRef);
+               }
+               
+               private IMessage Run (MessageQueueTransactionType transactionType,
+                                     TxReceiver.DoReceive r)
+               {
+                       switch (transactionType) {
+                       case MessageQueueTransactionType.Single:
+                               using (RabbitMQMessageQueueTransaction tx = GetTx ()) {
+                                       bool success = false;
+                                       try {
+                                               IMessage msg = Run (tx, r);
+                                               tx.Commit ();
+                                               success = true;
+                                               return msg;
+                                       } finally {
+                                               if (!success)
+                                                       tx.Abort ();
+                                       }
+                               }
+
+                       case MessageQueueTransactionType.None:
+                               return Run (r);
+
+                       default:
+                               throw new NotSupportedException(transactionType + " not supported");
+                       }
+               }               
+
+               private IMessage Run (IMessageQueueTransaction transaction,
+                                     TxReceiver.DoReceive r)
+               {
+                       TxReceiver txr = new TxReceiver (this, r);
+                       RabbitMQMessageQueueTransaction tx = 
+                               (RabbitMQMessageQueueTransaction) transaction;
+                       return tx.RunReceive (txr.ReceiveInContext);                    
+               }
+               
+               private IMessage Run (TxReceiver.DoReceive r)
+               {
+                       ConnectionFactory cf = new ConnectionFactory ();
+                       using (IConnection cn = cf.CreateConnection (QRef.Host)) {
+                               using (IModel model = cn.CreateModel ()) {
+                                       return r (this, model);
+                               }
+                       }
+               }
+               
+               private IMessage ReceiveInContext (ref string host, ref IConnection cn, 
+                                                  ref IModel model, string txId)
+               {
+                       if (host == null)
+                               host = QRef.Host;
+                       else if (host != QRef.Host)
+                               throw new MonoMessagingException ("Transactions can not span multiple hosts");
+                       
+                       if (cn == null) {
+                               ConnectionFactory cf = new ConnectionFactory ();
+                               cn = cf.CreateConnection (host);
+                       }
+                       
+                       if (model == null) {
+                               model = cn.CreateModel ();
+                               model.TxSelect ();
+                       }
+                       
+                       return Receive (model, -1, true);
+               }               
+
+               private class TxReceiver
+               {
+                       private readonly DoReceive doReceive;
+                       private readonly RabbitMQMessageQueue q;
+                       
+                       public TxReceiver(RabbitMQMessageQueue q, DoReceive doReceive) {
+                               this.q = q;
+                               this.doReceive = doReceive;
+                       }
+                       
+                       public delegate IMessage DoReceive (RabbitMQMessageQueue q, IModel model);
+                       
+                       public IMessage ReceiveInContext (ref string host, ref IConnection cn, 
+                                                         ref IModel model, string txId)
+                       {
+                               if (host == null)
+                                       host = q.QRef.Host;
+                               else if (host != q.QRef.Host)
+                                       throw new MonoMessagingException ("Transactions can not span multiple hosts");
+                               
+                               if (cn == null) {
+                                       ConnectionFactory cf = new ConnectionFactory ();
+                                       cn = cf.CreateConnection (host);
+                               }
+                               
+                               if (model == null) {
+                                       model = cn.CreateModel ();
+                                       model.TxSelect ();
+                               }
+                               
+                               return doReceive (q, model);
+                       }
+               }
+               
+               private class DoReceiveWithTimeout
+               {
+                       private readonly int timeout;
+                       private readonly IsMatch matcher;
+                       private readonly bool ack;
+                       
+                       public DoReceiveWithTimeout (int timeout, IsMatch matcher)
+                               : this (timeout, matcher, true)
+                       {
+                       }
+                       
+                       public DoReceiveWithTimeout (int timeout, IsMatch matcher, bool ack)
+                       {
+                               if (matcher != null && timeout == -1)
+                                       this.timeout = 500;
+                               else 
+                                       this.timeout = timeout;
+                               this.matcher = matcher;
+                               this.ack = ack;
+                       }
+                       
+                       public IMessage DoReceive (RabbitMQMessageQueue q, IModel model)
+                       {
+                               if (matcher == null)
+                                       return q.Receive (model, timeout, ack);
+                               else
+                                       return q.Receive (model, timeout, ack, matcher);
+                       }
+               }
+               
+               private static TxReceiver.DoReceive Receiver (TimeSpan timeout,
+                                                             IsMatch matcher)
+               {
+                       int to = MessageFactory.TimeSpanToInt32 (timeout);
+                       return new DoReceiveWithTimeout (to, matcher).DoReceive;
+               }
+               
+               private static TxReceiver.DoReceive Receiver (IsMatch matcher)
+               {
+                       return new DoReceiveWithTimeout (-1, matcher).DoReceive;
+               }
+               
+               private static TxReceiver.DoReceive Receiver (TimeSpan timeout)
+               {
+                       int to = MessageFactory.TimeSpanToInt32 (timeout);
+                       return new DoReceiveWithTimeout (to, null).DoReceive;
+               }
+
+               private TxReceiver.DoReceive Receiver ()
+               {
+                       return new DoReceiveWithTimeout (-1, null).DoReceive;
+               }               
+               
+               private TxReceiver.DoReceive Peeker ()
+               {
+                       return new DoReceiveWithTimeout (-1, null).DoReceive;
+               }               
+               
+               private TxReceiver.DoReceive Peeker (TimeSpan timeout)
+               {
+                       int to = MessageFactory.TimeSpanToInt32 (timeout);
+                       return new DoReceiveWithTimeout (to, null, false).DoReceive;
+               }               
+               
+               private TxReceiver.DoReceive Peeker (IsMatch matcher)
+               {
+                       return new DoReceiveWithTimeout (-1, matcher, false).DoReceive;
+               }               
+               
+               private TxReceiver.DoReceive Peeker (TimeSpan timeout, IsMatch matcher)
+               {
+                       int to = MessageFactory.TimeSpanToInt32 (timeout);
+                       return new DoReceiveWithTimeout (to, matcher, false).DoReceive;
+               }
+               
+               delegate bool IsMatch (BasicDeliverEventArgs result);           
+               
+               private class IdMatcher
+               {
+                       private readonly string id;
+                       public IdMatcher (string id)
+                       {
+                               this.id = id;
+                       }
+                       
+                       public bool MatchById (BasicDeliverEventArgs result)
+                       {
+                               return result.BasicProperties.MessageId == id;
+                       }
+               }
+               
+               private static IsMatch ById (string id)
+               {
+                       return new IdMatcher (id).MatchById;
+               }
+               
+               private class CorrelationIdMatcher
+               {
+                       private readonly string correlationId;
+                       public CorrelationIdMatcher (string correlationId)
+                       {
+                               this.correlationId = correlationId;
+                       }
+                       
+                       public bool MatchById (BasicDeliverEventArgs result)
+                       {
+                               return result.BasicProperties.CorrelationId == correlationId;
+                       }
+               }
+               
+               private static IsMatch ByCorrelationId (string correlationId)
+               {
+                       return new CorrelationIdMatcher (correlationId).MatchById;
+               }
+               
+               private IMessage Receive (IModel model, int timeout, bool doAck)
+               {
+                       string finalName = model.QueueDeclare (QRef.Queue, false);
+                       
+                       using (Subscription sub = new Subscription (model, finalName)) {
+                               BasicDeliverEventArgs result;
+                               if (sub.Next (timeout, out result)) {
+                                       IMessage m = helper.ReadMessage (QRef, result);
+                                       if (doAck)
+                                               sub.Ack (result);
+                                       return m;
+                               } else {
+                                       throw new MonoMessagingException ("No Message Available");
+                               }
+                       }
+               }
+                               
+               private IMessage Receive (IModel model, int timeout, 
+                                         bool doAck, IsMatch matcher)
+               {
+                       string finalName = model.QueueDeclare (QRef.Queue, false);
+                       
+                       using (Subscription sub = new Subscription (model, finalName)) {
+                               BasicDeliverEventArgs result;
+                               while (sub.Next (timeout, out result)) {
                                        
-                                       Subscription sub = new Subscription (ch, ticket, finalName);
-                                       BasicDeliverEventArgs result = sub.Next ();
-                                       sub.Ack (result);
-                                       sub.Close ();
-                                       if (result == null) {
-                                               throw new MonoMessagingException ("No Message Available");
-                                       } else {
-                                               IMessage m = MessageFactory.ReadMessage (QRef, result);
+                                       if (matcher (result)) {
+                                               IMessage m = helper.ReadMessage (QRef, result);
+                                               if (doAck)
+                                                       sub.Ack (result);
                                                return m;
                                        }
                                }
+                               
+                               throw new MessageUnavailableException ("Message not available");
                        }
                }
                
-               public IMessageEnumerator GetMessageEnumerator ()
+               private RabbitMQMessageQueueTransaction GetTx ()
                {
-                       return new RabbitMQMessageEnumerator (QRef);
-               }
+                       return (RabbitMQMessageQueueTransaction) provider.CreateMessageQueueTransaction ();
+               }               
        }
 }