2 // Mono.Messaging.RabbitMQ
5 // Michael Barker (mike@middlesoft.co.uk)
7 // (C) 2008 Michael Barker
11 // Permission is hereby granted, free of charge, to any person obtaining
12 // a copy of this software and associated documentation files (the
13 // "Software"), to deal in the Software without restriction, including
14 // without limitation the rights to use, copy, modify, merge, publish,
15 // distribute, sublicense, and/or sell copies of the Software, and to
16 // permit persons to whom the Software is furnished to do so, subject to
17 // the following conditions:
19 // The above copyright notice and this permission notice shall be
20 // included in all copies or substantial portions of the Software.
22 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
23 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
24 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
25 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
26 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
27 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
32 using System.Collections;
33 using System.ComponentModel;
37 using RabbitMQ.Client;
38 using RabbitMQ.Client.Content;
39 using RabbitMQ.Client.Events;
40 using RabbitMQ.Client.Exceptions;
41 using RabbitMQ.Client.MessagePatterns;
44 namespace Mono.Messaging.RabbitMQ {
47 /// RabbitMQ implementation of a message queue. Currently this implementation
48 /// attempts to be as stateless as possible. Connections to the AMQP server
49 /// are only created as needed.
51 public class RabbitMQMessageQueue : MessageQueueBase, IMessageQueue {
// Backing fields for the simple get/set properties of this class.
53 private bool authenticate = false;
54 private short basePriority = 0;
55 private Guid category = Guid.Empty;
56 private bool denySharedReceive = false;
57 private EncryptionRequired encryptionRequired;
58 private long maximumJournalSize = -1;
59 private long maximumQueueSize = -1;
60 private ISynchronizeInvoke synchronizingObject = null;
61 private bool useJournalQueue = false;
// Reference to the queue (host and queue name) this instance works against;
// starts out as QueueReference.DEFAULT.
62 private QueueReference qRef = QueueReference.DEFAULT;
// Read-only collaborators and flag, assigned in the constructor.
63 private readonly RabbitMQMessagingProvider provider;
64 private readonly MessageFactory helper;
65 private readonly bool transactional;
// Convenience constructor: chains to the three-argument constructor, passing
// QueueReference.DEFAULT as the queue reference.
// NOTE(review): the rest of the parameter list is not visible in this listing.
67 public RabbitMQMessageQueue (RabbitMQMessagingProvider provider,
69 : this (provider, QueueReference.DEFAULT, transactional)
// Main constructor: stores the provider, builds the MessageFactory helper from
// it and records the transactional flag.
// NOTE(review): the rest of the parameter list and any further assignments are
// not visible in this listing.
73 public RabbitMQMessageQueue (RabbitMQMessagingProvider provider,
77 this.provider = provider;
78 this.helper = new MessageFactory (provider);
80 this.transactional = transactional;
// Override of the base class's Queue property.
// NOTE(review): the accessor body is not visible in this listing.
83 protected override IMessageQueue Queue {
/// <summary>
/// Gets or sets the value held in the <c>authenticate</c> field.
/// </summary>
public bool Authenticate {
	get {
		return this.authenticate;
	}
	set {
		this.authenticate = value;
	}
}
/// <summary>
/// Gets or sets the value held in the <c>basePriority</c> field.
/// </summary>
public short BasePriority {
	get {
		return this.basePriority;
	}
	set {
		this.basePriority = value;
	}
}
// Getter that always throws NotImplementedException.
// NOTE(review): the property declaration this accessor belongs to is not
// visible in this listing.
98 get { throw new NotImplementedException (); }
/// <summary>
/// Not implemented: reading this property always throws
/// <see cref="NotImplementedException"/>.
/// </summary>
public bool CanWrite {
	get {
		throw new NotImplementedException ();
	}
}
/// <summary>
/// Gets or sets the value held in the <c>category</c> field.
/// </summary>
public Guid Category {
	get {
		return this.category;
	}
	set {
		this.category = value;
	}
}
/// <summary>
/// Not implemented: reading this property always throws
/// <see cref="NotImplementedException"/>.
/// </summary>
public DateTime CreateTime {
	get {
		throw new NotImplementedException ();
	}
}
/// <summary>
/// Gets or sets the value held in the <c>denySharedReceive</c> field.
/// </summary>
public bool DenySharedReceive {
	get {
		return this.denySharedReceive;
	}
	set {
		this.denySharedReceive = value;
	}
}
/// <summary>
/// Gets or sets the value held in the <c>encryptionRequired</c> field.
/// </summary>
public EncryptionRequired EncryptionRequired {
	get {
		return this.encryptionRequired;
	}
	set {
		this.encryptionRequired = value;
	}
}
// Getter that always throws NotImplementedException.
// NOTE(review): the property declaration this accessor belongs to is not
// visible in this listing.
125 get { throw new NotImplementedException (); }
/// <summary>
/// Not implemented: reading this property always throws
/// <see cref="NotImplementedException"/>.
/// </summary>
public DateTime LastModifyTime {
	get {
		throw new NotImplementedException ();
	}
}
/// <summary>
/// Gets or sets the value held in the <c>maximumJournalSize</c> field.
/// </summary>
public long MaximumJournalSize {
	get {
		return this.maximumJournalSize;
	}
	set {
		this.maximumJournalSize = value;
	}
}
/// <summary>
/// Gets or sets the value held in the <c>maximumQueueSize</c> field.
/// </summary>
public long MaximumQueueSize {
	get {
		return this.maximumQueueSize;
	}
	set {
		this.maximumQueueSize = value;
	}
}
/// <summary>
/// Not implemented: reading this property always throws
/// <see cref="NotImplementedException"/>.
/// </summary>
public IntPtr ReadHandle {
	get {
		throw new NotImplementedException ();
	}
}
/// <summary>
/// Gets or sets the value held in the <c>synchronizingObject</c> field.
/// </summary>
public ISynchronizeInvoke SynchronizingObject {
	get {
		return this.synchronizingObject;
	}
	set {
		this.synchronizingObject = value;
	}
}
/// <summary>
/// Gets the transactional flag that was supplied to the constructor.
/// </summary>
public bool Transactional {
	get {
		return this.transactional;
	}
}
/// <summary>
/// Gets or sets the value held in the <c>useJournalQueue</c> field.
/// </summary>
public bool UseJournalQueue {
	get {
		return this.useJournalQueue;
	}
	set {
		this.useJournalQueue = value;
	}
}
/// <summary>
/// Not implemented: reading this property always throws
/// <see cref="NotImplementedException"/>.
/// </summary>
public IntPtr WriteHandle {
	get {
		throw new NotImplementedException ();
	}
}
// Queue reference used by this instance; the setter replaces the qRef field.
// NOTE(review): the getter is not visible in this listing.
164 public QueueReference QRef {
166 set { qRef = value; }
// Packs the connection's protocol version into one long: the major version is
// shifted into the upper 32 bits and the minor version is added on top.
// NOTE(review): the return statement is not visible in this listing.
169 private static long GetVersion (IConnection cn)
171 long version = cn.Protocol.MajorVersion;
172 version = version << 32;
173 version += cn.Protocol.MinorVersion;
// Stamps delivery information onto msg by calling msg.SetDeliveryInfo.
// NOTE(review): most of the argument list is not visible in this listing.
177 private void SetDeliveryInfo (IMessage msg, long senderVersion,
178 string transactionId)
// First argument: no acknowledgment (Acknowledgment.None).
180 msg.SetDeliveryInfo (Acknowledgment.None,
// A freshly generated GUID string followed by a literal backslash and '0'.
183 Guid.NewGuid ().ToString () + "\\0",
194 // No-op (queues are currently stateless)
// Deletes the queue named qRef.Queue on the broker at qRef.Host.
// A connection and a channel are opened just for this call and disposed by the
// using blocks. All three boolean arguments to QueueDelete are false.
197 public static void Delete (QueueReference qRef)
199 ConnectionFactory cf = new ConnectionFactory ();
201 using (IConnection cn = cf.CreateConnection (qRef.Host)) {
202 using (IModel model = cn.CreateModel ()) {
203 model.QueueDelete (qRef.Queue, false, false, false);
// Sends msg using a connection and channel opened for this call.
// Throws MonoMessagingException while QRef is still QueueReference.DEFAULT and
// ArgumentException when msg.BodyStream is null.
// NOTE(review): the statement inside the channel block and the opening of the
// try block are not visible in this listing.
208 public void Send (IMessage msg)
210 if (QRef == QueueReference.DEFAULT)
211 throw new MonoMessagingException ("Path has not been specified");
213 if (msg.BodyStream == null)
214 throw new ArgumentException ("Message is not serialized properly");
216 ConnectionFactory cf = new ConnectionFactory ();
219 using (IConnection cn = cf.CreateConnection (QRef.Host)) {
// Delivery info gets the connection's protocol version and a null transaction id.
220 SetDeliveryInfo (msg, GetVersion (cn), null);
221 using (IModel ch = cn.CreateModel ()) {
// An unreachable broker is rethrown as a ConnectionException carrying QRef.
225 } catch (BrokerUnreachableException e) {
226 throw new ConnectionException (QRef, e);
// Sends msg as part of the supplied transaction.
// Performs the same QRef and BodyStream checks as Send (IMessage), then casts
// the transaction to RabbitMQMessageQueueTransaction and asks it to run
// SendInContext for msg.
230 public void Send (IMessage msg, IMessageQueueTransaction transaction)
232 if (QRef == QueueReference.DEFAULT)
233 throw new MonoMessagingException ("Path has not been specified");
235 if (msg.BodyStream == null)
236 throw new ArgumentException ("Message is not serialized properly");
// The cast throws InvalidCastException for any other transaction implementation.
238 RabbitMQMessageQueueTransaction tx = (RabbitMQMessageQueueTransaction) transaction;
240 tx.RunSend (SendInContext, msg);
// Sends msg according to the requested transaction type.
// NOTE(review): the statements inside each case are mostly not visible in this
// listing.
243 public void Send (IMessage msg, MessageQueueTransactionType transactionType)
245 switch (transactionType) {
// Single: a transaction is created from the provider and disposed by the using block.
246 case MessageQueueTransactionType.Single:
247 using (IMessageQueueTransaction tx = provider.CreateMessageQueueTransaction ()) {
// Any exception is rethrown as MonoMessagingException with the original as inner exception.
251 } catch (Exception e) {
253 throw new MonoMessagingException(e.Message, e);
258 case MessageQueueTransactionType.None:
// Automatic transactions are rejected.
262 case MessageQueueTransactionType.Automatic:
263 throw new NotSupportedException("Automatic transaction types not supported");
// Send callback handed to RabbitMQMessageQueueTransaction.RunSend.
// host, cn and model are ref parameters that this method may assign.
// NOTE(review): the conditions guarding the assignments below, and the final
// send call, are not visible in this listing.
267 private void SendInContext (ref string host, ref IConnection cn,
268 ref IModel model, IMessage msg, string txId)
// A host that differs from QRef.Host is rejected.
272 else if (host != QRef.Host)
273 throw new MonoMessagingException ("Transactions can not span multiple hosts");
276 ConnectionFactory cf = new ConnectionFactory ();
277 cn = cf.CreateConnection (host);
281 model = cn.CreateModel ();
// Delivery info gets the connection's protocol version and the transaction id.
285 SetDeliveryInfo (msg, GetVersion (cn), txId);
// Publishes msg on the given channel.
// Declares QRef.Queue (second argument true), builds the wire message with the
// MessageFactory helper, then calls BasicPublish with an empty first argument
// and the declared queue name as the second.
// NOTE(review): presumably "" is the default exchange and the queue name is
// the routing key - confirm against the RabbitMQ client API.
289 private void Send (IModel model, IMessage msg)
291 string finalName = model.QueueDeclare (QRef.Queue, true);
292 IMessageBuilder mb = helper.WriteMessage (model, msg);
294 model.BasicPublish ("", finalName,
295 (IBasicProperties) mb.GetContentHeader(),
296 mb.GetContentBody ());
// Purges QRef.Queue on the broker at QRef.Host, using a connection and channel
// that are disposed by the using blocks.
// NOTE(review): the enclosing method declaration is not visible in this listing.
301 ConnectionFactory cf = new ConnectionFactory ();
303 using (IConnection cn = cf.CreateConnection (QRef.Host)) {
304 using (IModel model = cn.CreateModel ()) {
305 model.QueuePurge (QRef.Queue, false);
// Peeks at the queue on a connection and channel opened for this call.
// Delegates to Receive with a timeout of -1 and false for its doAck parameter.
310 public IMessage Peek ()
312 ConnectionFactory cf = new ConnectionFactory ();
314 using (IConnection cn = cf.CreateConnection (QRef.Host)) {
315 using (IModel ch = cn.CreateModel ()) {
316 return Receive (ch, -1, false);
/// <summary>
/// Peeks at the queue using a peek delegate built for the given timeout.
/// </summary>
public IMessage Peek (TimeSpan timeout)
{
	TxReceiver.DoReceive peeker = Peeker (timeout);
	return Run (peeker);
}
/// <summary>
/// Peeks for the message selected by the id matcher for <paramref name="id"/>.
/// </summary>
public IMessage PeekById (string id)
{
	IsMatch matcher = ById (id);
	TxReceiver.DoReceive peeker = Peeker (matcher);
	return Run (peeker);
}
/// <summary>
/// Peeks for the message selected by the id matcher for <paramref name="id"/>,
/// using the given timeout.
/// </summary>
public IMessage PeekById (string id, TimeSpan timeout)
{
	IsMatch matcher = ById (id);
	TxReceiver.DoReceive peeker = Peeker (timeout, matcher);
	return Run (peeker);
}
/// <summary>
/// Peeks for the message selected by the correlation-id matcher for
/// <paramref name="id"/>.
/// </summary>
public IMessage PeekByCorrelationId (string id)
{
	IsMatch matcher = ByCorrelationId (id);
	TxReceiver.DoReceive peeker = Peeker (matcher);
	return Run (peeker);
}
/// <summary>
/// Peeks for the message selected by the correlation-id matcher for
/// <paramref name="id"/>, using the given timeout.
/// </summary>
public IMessage PeekByCorrelationId (string id, TimeSpan timeout)
{
	IsMatch matcher = ByCorrelationId (id);
	TxReceiver.DoReceive peeker = Peeker (timeout, matcher);
	return Run (peeker);
}
/// <summary>
/// Receives a message using the default receive delegate, outside of any
/// transaction.
/// </summary>
public IMessage Receive ()
{
	TxReceiver.DoReceive receiver = Receiver ();
	return Run (receiver);
}
/// <summary>
/// Receives a message using a receive delegate built for the given timeout.
/// </summary>
public IMessage Receive (TimeSpan timeout)
{
	TxReceiver.DoReceive receiver = Receiver (timeout);
	return Run (receiver);
}
/// <summary>
/// Receives a message with the given timeout, running the receive delegate
/// through the supplied transaction.
/// </summary>
public IMessage Receive (TimeSpan timeout,
                         IMessageQueueTransaction transaction)
{
	TxReceiver.DoReceive receiver = Receiver (timeout);
	return Run (transaction, receiver);
}
/// <summary>
/// Receives a message with the given timeout, running the receive delegate
/// under the requested transaction type.
/// </summary>
public IMessage Receive (TimeSpan timeout,
                         MessageQueueTransactionType transactionType)
{
	TxReceiver.DoReceive receiver = Receiver (timeout);
	return Run (transactionType, receiver);
}
/// <summary>
/// Receives a message using the default receive delegate, run through the
/// supplied transaction.
/// </summary>
public IMessage Receive (IMessageQueueTransaction transaction)
{
	TxReceiver.DoReceive receiver = Receiver ();
	return Run (transaction, receiver);
}
/// <summary>
/// Receives a message using the default receive delegate, run under the
/// requested transaction type.
/// </summary>
public IMessage Receive (MessageQueueTransactionType transactionType)
{
	TxReceiver.DoReceive receiver = Receiver ();
	return Run (transactionType, receiver);
}
/// <summary>
/// Receives the message selected by the id matcher for <paramref name="id"/>.
/// </summary>
public IMessage ReceiveById (string id)
{
	IsMatch matcher = ById (id);
	TxReceiver.DoReceive receiver = Receiver (matcher);
	return Run (receiver);
}
/// <summary>
/// Receives the message selected by the id matcher for <paramref name="id"/>,
/// using the given timeout.
/// </summary>
public IMessage ReceiveById (string id, TimeSpan timeout)
{
	IsMatch matcher = ById (id);
	TxReceiver.DoReceive receiver = Receiver (timeout, matcher);
	return Run (receiver);
}
/// <summary>
/// Receives the message selected by the id matcher for <paramref name="id"/>,
/// run through the supplied transaction.
/// </summary>
public IMessage ReceiveById (string id,
                             IMessageQueueTransaction transaction)
{
	IsMatch matcher = ById (id);
	TxReceiver.DoReceive receiver = Receiver (matcher);
	return Run (transaction, receiver);
}
/// <summary>
/// Receives the message selected by the id matcher for <paramref name="id"/>,
/// run under the requested transaction type.
/// </summary>
public IMessage ReceiveById (string id,
                             MessageQueueTransactionType transactionType)
{
	IsMatch matcher = ById (id);
	TxReceiver.DoReceive receiver = Receiver (matcher);
	return Run (transactionType, receiver);
}
/// <summary>
/// Receives the message selected by the id matcher for <paramref name="id"/>
/// with the given timeout, run through the supplied transaction.
/// </summary>
public IMessage ReceiveById (string id, TimeSpan timeout,
                             IMessageQueueTransaction transaction)
{
	IsMatch matcher = ById (id);
	TxReceiver.DoReceive receiver = Receiver (timeout, matcher);
	return Run (transaction, receiver);
}
/// <summary>
/// Receives the message selected by the id matcher for <paramref name="id"/>
/// with the given timeout, run under the requested transaction type.
/// </summary>
public IMessage ReceiveById (string id, TimeSpan timeout,
                             MessageQueueTransactionType transactionType)
{
	IsMatch matcher = ById (id);
	TxReceiver.DoReceive receiver = Receiver (timeout, matcher);
	return Run (transactionType, receiver);
}
/// <summary>
/// Receives the message selected by the correlation-id matcher for
/// <paramref name="id"/>.
/// </summary>
public IMessage ReceiveByCorrelationId (string id)
{
	IsMatch matcher = ByCorrelationId (id);
	TxReceiver.DoReceive receiver = Receiver (matcher);
	return Run (receiver);
}
/// <summary>
/// Receives the message selected by the correlation-id matcher for
/// <paramref name="id"/>, using the given timeout.
/// </summary>
public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout)
{
	IsMatch matcher = ByCorrelationId (id);
	TxReceiver.DoReceive receiver = Receiver (timeout, matcher);
	return Run (receiver);
}
/// <summary>
/// Receives the message selected by the correlation-id matcher for
/// <paramref name="id"/>, run through the supplied transaction.
/// </summary>
public IMessage ReceiveByCorrelationId (string id,
                                        IMessageQueueTransaction transaction)
{
	IsMatch matcher = ByCorrelationId (id);
	TxReceiver.DoReceive receiver = Receiver (matcher);
	return Run (transaction, receiver);
}
/// <summary>
/// Receives the message selected by the correlation-id matcher for
/// <paramref name="id"/>, run under the requested transaction type.
/// </summary>
public IMessage ReceiveByCorrelationId (string id,
                                        MessageQueueTransactionType transactionType)
{
	IsMatch matcher = ByCorrelationId (id);
	TxReceiver.DoReceive receiver = Receiver (matcher);
	return Run (transactionType, receiver);
}
/// <summary>
/// Receives the message selected by the correlation-id matcher for
/// <paramref name="id"/> with the given timeout, run through the supplied
/// transaction.
/// </summary>
public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout,
                                        IMessageQueueTransaction transaction)
{
	IsMatch matcher = ByCorrelationId (id);
	TxReceiver.DoReceive receiver = Receiver (timeout, matcher);
	return Run (transaction, receiver);
}
/// <summary>
/// Receives the message selected by the correlation-id matcher for
/// <paramref name="id"/> with the given timeout, run under the requested
/// transaction type.
/// </summary>
public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout,
                                        MessageQueueTransactionType transactionType)
{
	IsMatch matcher = ByCorrelationId (id);
	TxReceiver.DoReceive receiver = Receiver (timeout, matcher);
	return Run (transactionType, receiver);
}
/// <summary>
/// Creates a new <c>RabbitMQMessageEnumerator</c> from this queue's message
/// factory helper and queue reference.
/// </summary>
public IMessageEnumerator GetMessageEnumerator ()
{
	IMessageEnumerator enumerator = new RabbitMQMessageEnumerator (helper, QRef);
	return enumerator;
}
// Runs the receive delegate r under the requested transaction type.
// NOTE(review): most of the switch body is not visible in this listing.
451 private IMessage Run (MessageQueueTransactionType transactionType,
452 TxReceiver.DoReceive r)
454 switch (transactionType) {
// Single: a transaction from GetTx () wraps the call and is disposed by the using block.
455 case MessageQueueTransactionType.Single:
456 using (RabbitMQMessageQueueTransaction tx = GetTx ()) {
457 bool success = false;
// Delegates to the transaction-based Run overload.
459 IMessage msg = Run (tx, r);
469 case MessageQueueTransactionType.None:
// NOTE(review): presumably the fall-through for every other transaction type - confirm.
473 throw new NotSupportedException(transactionType + " not supported");
/// <summary>
/// Runs the receive delegate <paramref name="r"/> inside the supplied
/// transaction: wraps it in a <c>TxReceiver</c> and hands that receiver's
/// <c>ReceiveInContext</c> method to the transaction's <c>RunReceive</c>.
/// </summary>
private IMessage Run (IMessageQueueTransaction transaction,
                      TxReceiver.DoReceive r)
{
	TxReceiver contextReceiver = new TxReceiver (this, r);
	// Only RabbitMQ transactions are accepted; anything else fails the cast.
	RabbitMQMessageQueueTransaction rabbitTx =
		(RabbitMQMessageQueueTransaction) transaction;
	return rabbitTx.RunReceive (contextReceiver.ReceiveInContext);
}
/// <summary>
/// Runs the receive delegate <paramref name="r"/> on a connection and channel
/// opened for this call; both are disposed before returning.
/// </summary>
private IMessage Run (TxReceiver.DoReceive r)
{
	ConnectionFactory factory = new ConnectionFactory ();
	using (IConnection connection = factory.CreateConnection (QRef.Host))
	using (IModel channel = connection.CreateModel ()) {
		IMessage received = r (this, channel);
		return received;
	}
}
// Receives a message using connection state that is passed in by reference.
// host, cn and model are ref parameters that this method may assign.
// NOTE(review): the conditions guarding the assignments below are not visible
// in this listing.
496 private IMessage ReceiveInContext (ref string host, ref IConnection cn,
497 ref IModel model, string txId)
// A host that differs from QRef.Host is rejected.
501 else if (host != QRef.Host)
502 throw new MonoMessagingException ("Transactions can not span multiple hosts");
505 ConnectionFactory cf = new ConnectionFactory ();
506 cn = cf.CreateConnection (host);
510 model = cn.CreateModel ();
// Timeout of -1 and true for Receive's doAck parameter.
514 return Receive (model, -1, true);
// Pairs a queue with a DoReceive delegate so the receive can be run through
// the ReceiveInContext callback (see Run (IMessageQueueTransaction, ...)).
// NOTE(review): several lines of this class are not visible in this listing.
517 private class TxReceiver
519 private readonly DoReceive doReceive;
520 private readonly RabbitMQMessageQueue q;
522 public TxReceiver(RabbitMQMessageQueue q, DoReceive doReceive) {
524 this.doReceive = doReceive;
// Signature of a receive operation: given the queue and a channel, return a message.
527 public delegate IMessage DoReceive (RabbitMQMessageQueue q, IModel model);
// Callback with ref connection state; may assign cn and model, then invokes the delegate.
529 public IMessage ReceiveInContext (ref string host, ref IConnection cn,
530 ref IModel model, string txId)
// A host that differs from the queue's host is rejected.
534 else if (host != q.QRef.Host)
535 throw new MonoMessagingException ("Transactions can not span multiple hosts");
538 ConnectionFactory cf = new ConnectionFactory ();
539 cn = cf.CreateConnection (host);
543 model = cn.CreateModel ();
547 return doReceive (q, model);
// Captures a timeout, an optional matcher and an ack flag, and exposes a
// DoReceive method that forwards them to RabbitMQMessageQueue.Receive.
// NOTE(review): several lines of this class are not visible in this listing.
551 private class DoReceiveWithTimeout
553 private readonly int timeout;
554 private readonly IsMatch matcher;
555 private readonly bool ack;
// Two-argument form: forwards to the three-argument constructor with ack = true.
557 public DoReceiveWithTimeout (int timeout, IsMatch matcher)
558 : this (timeout, matcher, true)
562 public DoReceiveWithTimeout (int timeout, IsMatch matcher, bool ack)
// Special-cases a matcher combined with a timeout of -1 (the consequence is not visible here).
564 if (matcher != null && timeout == -1)
567 this.timeout = timeout;
568 this.matcher = matcher;
// Calls q.Receive either without or with the matcher (the selecting condition is not visible here).
572 public IMessage DoReceive (RabbitMQMessageQueue q, IModel model)
575 return q.Receive (model, timeout, ack);
577 return q.Receive (model, timeout, ack, matcher);
// Builds a receive delegate for the given timeout and matcher; the timeout is
// converted with MessageFactory.TimeSpanToInt32. The two-argument
// DoReceiveWithTimeout constructor is used, which passes ack = true.
// NOTE(review): the rest of the parameter list is not visible in this listing.
581 private static TxReceiver.DoReceive Receiver (TimeSpan timeout,
584 int to = MessageFactory.TimeSpanToInt32 (timeout);
585 return new DoReceiveWithTimeout (to, matcher).DoReceive;
/// <summary>
/// Builds a receive delegate for the given matcher with a timeout value of -1.
/// </summary>
private static TxReceiver.DoReceive Receiver (IsMatch matcher)
{
	DoReceiveWithTimeout receiver = new DoReceiveWithTimeout (-1, matcher);
	return receiver.DoReceive;
}
/// <summary>
/// Builds a receive delegate with no matcher for the given timeout, converted
/// with <c>MessageFactory.TimeSpanToInt32</c>.
/// </summary>
private static TxReceiver.DoReceive Receiver (TimeSpan timeout)
{
	DoReceiveWithTimeout receiver =
		new DoReceiveWithTimeout (MessageFactory.TimeSpanToInt32 (timeout), null);
	return receiver.DoReceive;
}
/// <summary>
/// Builds the default receive delegate: timeout value -1 and no matcher.
/// </summary>
private TxReceiver.DoReceive Receiver ()
{
	DoReceiveWithTimeout receiver = new DoReceiveWithTimeout (-1, null);
	return receiver.DoReceive;
}
/// <summary>
/// Builds the default peek delegate: timeout value -1, no matcher, and no
/// acknowledgement of the delivered message.
/// </summary>
/// <returns>A delegate that receives with <c>ack = false</c>.</returns>
private TxReceiver.DoReceive Peeker ()
{
	// The two-argument DoReceiveWithTimeout constructor forwards ack = true,
	// which would make this peek delegate acknowledge like a receive. Pass
	// false explicitly, as the other Peeker overloads and Peek () do.
	return new DoReceiveWithTimeout (-1, null, false).DoReceive;
}
/// <summary>
/// Builds a peek delegate (ack = false) with no matcher for the given timeout,
/// converted with <c>MessageFactory.TimeSpanToInt32</c>.
/// </summary>
private TxReceiver.DoReceive Peeker (TimeSpan timeout)
{
	DoReceiveWithTimeout peeker =
		new DoReceiveWithTimeout (MessageFactory.TimeSpanToInt32 (timeout), null, false);
	return peeker.DoReceive;
}
/// <summary>
/// Builds a peek delegate (ack = false) for the given matcher with a timeout
/// value of -1.
/// </summary>
private TxReceiver.DoReceive Peeker (IsMatch matcher)
{
	DoReceiveWithTimeout peeker = new DoReceiveWithTimeout (-1, matcher, false);
	return peeker.DoReceive;
}
/// <summary>
/// Builds a peek delegate (ack = false) for the given matcher and timeout; the
/// timeout is converted with <c>MessageFactory.TimeSpanToInt32</c>.
/// </summary>
private TxReceiver.DoReceive Peeker (TimeSpan timeout, IsMatch matcher)
{
	DoReceiveWithTimeout peeker =
		new DoReceiveWithTimeout (MessageFactory.TimeSpanToInt32 (timeout), matcher, false);
	return peeker.DoReceive;
}
// Predicate over a delivered message; the matcher-based Receive overload calls
// it on each delivery to decide whether that delivery is the one wanted.
626 delegate bool IsMatch (BasicDeliverEventArgs result);
// Matches deliveries whose BasicProperties.MessageId equals the stored id.
// NOTE(review): the constructor body is not visible in this listing.
628 private class IdMatcher
630 private readonly string id;
631 public IdMatcher (string id)
636 public bool MatchById (BasicDeliverEventArgs result)
638 return result.BasicProperties.MessageId == id;
/// <summary>
/// Creates a matcher delegate bound to a new <c>IdMatcher</c> for
/// <paramref name="id"/>.
/// </summary>
private static IsMatch ById (string id)
{
	IdMatcher byId = new IdMatcher (id);
	return byId.MatchById;
}
/// <summary>
/// Matches deliveries whose <c>BasicProperties.CorrelationId</c> equals the
/// correlation id supplied at construction.
/// </summary>
private class CorrelationIdMatcher
{
	private readonly string correlationId;

	public CorrelationIdMatcher (string correlationId)
	{
		this.correlationId = correlationId;
	}

	/// <summary>
	/// Returns true when the delivery's correlation id equals the stored one.
	/// </summary>
	public bool MatchById (BasicDeliverEventArgs result)
	{
		string actual = result.BasicProperties.CorrelationId;
		return actual == this.correlationId;
	}
}
/// <summary>
/// Creates a matcher delegate bound to a new <c>CorrelationIdMatcher</c> for
/// <paramref name="correlationId"/>.
/// </summary>
private static IsMatch ByCorrelationId (string correlationId)
{
	CorrelationIdMatcher byCorrelationId = new CorrelationIdMatcher (correlationId);
	return byCorrelationId.MatchById;
}
// Receives the next delivery from QRef.Queue on the given channel.
// Declares the queue (second argument false), subscribes to it and waits for
// one delivery via Subscription.Next; a delivery is converted with the
// MessageFactory helper.
// NOTE(review): the handling of doAck and the return statement are not visible
// in this listing.
666 private IMessage Receive (IModel model, int timeout, bool doAck)
668 string finalName = model.QueueDeclare (QRef.Queue, false);
670 using (Subscription sub = new Subscription (model, finalName)) {
671 BasicDeliverEventArgs result;
672 if (sub.Next (timeout, out result)) {
673 IMessage m = helper.ReadMessage (QRef, result);
// Reached when no delivery was obtained.
678 throw new MonoMessagingException ("No Message Available");
// Receives the first delivery from QRef.Queue that satisfies matcher.
// Declares the queue (second argument false), subscribes and keeps calling
// Subscription.Next with the same timeout while deliveries arrive; a matching
// delivery is converted with the MessageFactory helper.
// NOTE(review): the handling of doAck, the return statement and what happens
// to non-matching deliveries are not visible in this listing.
683 private IMessage Receive (IModel model, int timeout,
684 bool doAck, IsMatch matcher)
686 string finalName = model.QueueDeclare (QRef.Queue, false);
688 using (Subscription sub = new Subscription (model, finalName)) {
689 BasicDeliverEventArgs result;
690 while (sub.Next (timeout, out result)) {
692 if (matcher (result)) {
693 IMessage m = helper.ReadMessage (QRef, result);
// Reached when the loop ends without a matching delivery.
700 throw new MessageUnavailableException ("Message not available");
/// <summary>
/// Creates a new transaction through the provider and returns it as a
/// <c>RabbitMQMessageQueueTransaction</c>.
/// </summary>
private RabbitMQMessageQueueTransaction GetTx ()
{
	IMessageQueueTransaction created = provider.CreateMessageQueueTransaction ();
	return (RabbitMQMessageQueueTransaction) created;
}