2 // Mono.Messaging.RabbitMQ
5 // Michael Barker (mike@middlesoft.co.uk)
7 // (C) 2008 Michael Barker
11 // Permission is hereby granted, free of charge, to any person obtaining
12 // a copy of this software and associated documentation files (the
13 // "Software"), to deal in the Software without restriction, including
14 // without limitation the rights to use, copy, modify, merge, publish,
15 // distribute, sublicense, and/or sell copies of the Software, and to
16 // permit persons to whom the Software is furnished to do so, subject to
17 // the following conditions:
19 // The above copyright notice and this permission notice shall be
20 // included in all copies or substantial portions of the Software.
22 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
23 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
24 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
25 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
26 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
27 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
32 using System.Collections;
33 using System.ComponentModel;
37 using RabbitMQ.Client;
38 using RabbitMQ.Client.Content;
39 using RabbitMQ.Client.Events;
40 using RabbitMQ.Client.Exceptions;
41 using RabbitMQ.Client.MessagePatterns;
44 namespace Mono.Messaging.RabbitMQ {
// IMessageQueue implementation that talks to a broker through the
// RabbitMQ.Client API. Derives from MessageQueueBase.
46 public class RabbitMQMessageQueue : MessageQueueBase, IMessageQueue {
// Backing fields for the simple get/set properties declared further down.
48 private bool authenticate = false;
49 private short basePriority = 0;
50 private Guid category = Guid.Empty;
51 private bool denySharedReceive = false;
52 private EncryptionRequired encryptionRequired;
53 private long maximumJournalSize = -1;
54 private long maximumQueueSize = -1;
55 private ISynchronizeInvoke synchronizingObject = null;
56 private bool useJournalQueue = false;
// Reference whose Host and Queue members select the broker and queue used
// by every operation; stays QueueReference.DEFAULT until one is supplied.
57 private QueueReference qRef = QueueReference.DEFAULT;
// Provider used to build the MessageFactory and to create transactions.
58 private readonly RabbitMQMessagingProvider provider;
// Converts between IMessage and the RabbitMQ wire representation
// (WriteMessage / ReadMessage are the members used in this file).
59 private readonly MessageFactory helper;
// Realm name handed to IModel.AccessRequest before queue operations.
60 private readonly string realm;
// Value reported by the Transactional property.
61 private readonly bool transactional;
// Convenience constructor: chains to the overload below with
// QueueReference.DEFAULT as the queue reference.
63 public RabbitMQMessageQueue (RabbitMQMessagingProvider provider,
65 : this (provider, QueueReference.DEFAULT, transactional)
// Convenience constructor: chains to the full overload with the realm
// fixed to "/data".
69 public RabbitMQMessageQueue (RabbitMQMessagingProvider provider,
72 : this (provider, "/data", qRef, transactional)
// Full constructor. The visible statements store the provider, build the
// MessageFactory helper from it and record the transactional flag.
// NOTE(review): the lines that would assign `realm` and `qRef` are not
// present in this listing -- confirm against the complete file.
76 public RabbitMQMessageQueue (RabbitMQMessagingProvider provider,
77 string realm, QueueReference qRef,
80 this.provider = provider;
81 this.helper = new MessageFactory (provider);
84 this.transactional = transactional;
// Overrides the base-class Queue property; the accessor body is not shown
// in this listing.
87 protected override IMessageQueue Queue {
// Plain get/set over the `authenticate` backing field.
91 public bool Authenticate {
92 get { return authenticate; }
93 set { authenticate = value; }
// Plain get/set over the `basePriority` backing field.
96 public short BasePriority {
97 get { return basePriority; }
98 set { basePriority = value; }
// Not implemented: the getter always throws NotImplementedException.
101 public bool CanRead {
102 get { throw new NotImplementedException (); }
// Not implemented: the getter always throws NotImplementedException.
105 public bool CanWrite {
106 get { throw new NotImplementedException (); }
// Plain get/set over the `category` backing field (Guid.Empty by default).
109 public Guid Category {
110 get { return category; }
111 set { category = value; }
// Not implemented: the getter always throws NotImplementedException.
114 public DateTime CreateTime {
115 get { throw new NotImplementedException (); }
// Plain get/set over the `denySharedReceive` backing field.
118 public bool DenySharedReceive {
119 get { return denySharedReceive; }
120 set { denySharedReceive = value; }
// Plain get/set over the `encryptionRequired` backing field.
123 public EncryptionRequired EncryptionRequired {
124 get { return encryptionRequired; }
125 set { encryptionRequired = value; }
129 get { throw new NotImplementedException (); }
// Not implemented: the getter always throws NotImplementedException.
132 public DateTime LastModifyTime {
133 get { throw new NotImplementedException (); }
// Plain get/set over the `maximumJournalSize` backing field (-1 by default).
136 public long MaximumJournalSize {
137 get { return maximumJournalSize; }
138 set { maximumJournalSize = value; }
// Plain get/set over the `maximumQueueSize` backing field (-1 by default).
141 public long MaximumQueueSize {
142 get { return maximumQueueSize; }
143 set { maximumQueueSize = value; }
// Not implemented: the getter always throws NotImplementedException.
146 public IntPtr ReadHandle {
147 get { throw new NotImplementedException (); }
// Plain get/set over the `synchronizingObject` backing field.
150 public ISynchronizeInvoke SynchronizingObject {
151 get { return synchronizingObject; }
152 set { synchronizingObject = value; }
// Read-only: returns the readonly `transactional` field set by the
// constructor.
155 public bool Transactional {
156 get { return transactional; }
// Plain get/set over the `useJournalQueue` backing field.
159 public bool UseJournalQueue {
160 get { return useJournalQueue; }
161 set { useJournalQueue = value; }
// Not implemented: the getter always throws NotImplementedException.
164 public IntPtr WriteHandle {
165 get { throw new NotImplementedException (); }
// Queue reference used by the send/receive paths. The setter replaces the
// `qRef` field; the getter line is not shown in this listing.
168 public QueueReference QRef {
170 set { qRef = value; }
// Packs the connection's protocol version into a single long: the major
// version is shifted left by 32 bits and the minor version is added to it.
// The statement that returns the value is not shown in this listing.
173 private static long GetVersion (IConnection cn)
175 long version = cn.Protocol.MajorVersion;
176 version = version << 32;
177 version += cn.Protocol.MinorVersion;
// Forwards delivery metadata to msg.SetDeliveryInfo. Only two of the
// arguments are visible in this listing: Acknowledgment.None and a newly
// generated GUID string with a backslash and the digit 0 appended
// (presumably the message id -- confirm). How senderVersion and
// transactionId are passed on is not shown.
181 private void SetDeliveryInfo (IMessage msg, long senderVersion,
182 string transactionId)
184 msg.SetDeliveryInfo (Acknowledgment.None,
187 Guid.NewGuid ().ToString () + "\\0",
198 // No-op (Queues are currently stateless)
// Deletes the queue qRef.Queue on the broker at qRef.Host.
//
// Opens a connection and a channel, requests an access ticket for `realm`
// and calls QueueDelete with all three boolean flags set to false. Both
// the connection and the channel are disposed by their using blocks.
201 public static void Delete (string realm, QueueReference qRef)
203 ConnectionFactory cf = new ConnectionFactory ();
205 using (IConnection cn = cf.CreateConnection (qRef.Host)) {
206 using (IModel model = cn.CreateModel ()) {
207 ushort ticket = model.AccessRequest (realm);
208 model.QueueDelete (ticket, qRef.Queue, false, false, false);
// Sends msg using a connection opened just for this call.
//
// Throws MonoMessagingException when QRef is still QueueReference.DEFAULT
// and ArgumentException when msg.BodyStream is null. A
// BrokerUnreachableException is rethrown as ConnectionException carrying
// QRef and the original exception.
//
// Delivery info is stamped with the connection's protocol version and a
// null transaction id. The statement inside the channel using-block is not
// shown in this listing.
213 public void Send (IMessage msg)
215 if (QRef == QueueReference.DEFAULT)
216 throw new MonoMessagingException ("Path has not been specified");
218 if (msg.BodyStream == null)
219 throw new ArgumentException ("Message is not serialized properly");
221 ConnectionFactory cf = new ConnectionFactory ();
224 using (IConnection cn = cf.CreateConnection (QRef.Host)) {
225 SetDeliveryInfo (msg, GetVersion (cn), null);
226 using (IModel ch = cn.CreateModel ()) {
230 } catch (BrokerUnreachableException e) {
231 throw new ConnectionException (QRef, e);
// Sends msg as part of the supplied transaction.
//
// Performs the same precondition checks as the non-transactional overload
// (path specified, BodyStream present), then casts `transaction` to
// RabbitMQMessageQueueTransaction and lets it drive SendInContext through
// RunSend. The direct cast fails with InvalidCastException for any other
// IMessageQueueTransaction implementation.
235 public void Send (IMessage msg, IMessageQueueTransaction transaction)
237 if (QRef == QueueReference.DEFAULT)
238 throw new MonoMessagingException ("Path has not been specified");
240 if (msg.BodyStream == null)
241 throw new ArgumentException ("Message is not serialized properly");
243 RabbitMQMessageQueueTransaction tx = (RabbitMQMessageQueueTransaction) transaction;
245 tx.RunSend (SendInContext, msg);
// Sends msg according to the requested transaction type.
//
// Single:    creates a transaction through the provider (disposed by the
//            using block); any Exception caught there is rethrown as a
//            MonoMessagingException with the same message and the
//            original as inner exception.
// None:      statements not shown in this listing.
// Automatic: always throws NotSupportedException.
248 public void Send (IMessage msg, MessageQueueTransactionType transactionType)
250 switch (transactionType) {
251 case MessageQueueTransactionType.Single:
252 using (IMessageQueueTransaction tx = provider.CreateMessageQueueTransaction ()) {
256 } catch (Exception e) {
258 throw new MonoMessagingException(e.Message, e);
263 case MessageQueueTransactionType.None:
267 case MessageQueueTransactionType.Automatic:
268 throw new NotSupportedException("Automatic transaction types not supported");
// Callback handed to RabbitMQMessageQueueTransaction.RunSend by the
// transactional Send overload. host, cn and model are passed by reference
// so values assigned here are visible to the caller.
//
// Visible behaviour: throws MonoMessagingException when `host` differs from
// QRef.Host, creates a connection to `host`, creates a channel on it and
// stamps msg with the connection's protocol version and txId.
// NOTE(review): the conditions guarding each of these steps, and the
// publish call, are on lines not shown in this listing.
272 private void SendInContext (ref string host, ref IConnection cn,
273 ref IModel model, IMessage msg, string txId)
277 else if (host != QRef.Host)
278 throw new MonoMessagingException ("Transactions can not span multiple hosts");
281 ConnectionFactory cf = new ConnectionFactory ();
282 cn = cf.CreateConnection (host);
286 model = cn.CreateModel ();
290 SetDeliveryInfo (msg, GetVersion (cn), txId);
// Publishes msg to this queue on the supplied channel.
//
// Requests an access ticket, declares QRef.Queue (third QueueDeclare
// argument true), serialises msg through the MessageFactory helper and
// publishes it to the "" exchange with the declared queue name as the
// routing key.
//
// The ticket is requested for the `realm` field rather than a hard-coded
// "/data", so sending honours the realm given to the constructor in the
// same way the receive paths do. Queues created through the constructors
// that default the realm to "/data" behave exactly as before.
private void Send (IModel model, IMessage msg)
{
	ushort ticket = model.AccessRequest (realm);
	string finalName = model.QueueDeclare (ticket, QRef.Queue, true);
	IMessageBuilder mb = helper.WriteMessage (model, msg);

	model.BasicPublish (ticket, "", finalName,
	                    (IBasicProperties) mb.GetContentHeader (),
	                    mb.GetContentBody ());
}
307 ConnectionFactory cf = new ConnectionFactory ();
309 using (IConnection cn = cf.CreateConnection (QRef.Host)) {
310 using (IModel model = cn.CreateModel ()) {
311 ushort ticket = model.AccessRequest (realm);
312 model.QueuePurge (ticket, QRef.Queue, false);
// Returns the next message without acknowledging it.
//
// Opens a connection to QRef.Host and a channel, then calls the private
// Receive with timeout -1 and doAck false. Connection and channel are
// disposed by the using blocks.
317 public IMessage Peek ()
319 ConnectionFactory cf = new ConnectionFactory ();
321 using (IConnection cn = cf.CreateConnection (QRef.Host)) {
322 using (IModel ch = cn.CreateModel ()) {
323 return Receive (ch, -1, false);
// Peek variants. Each builds a Peeker delegate (non-acknowledging receive)
// and executes it through Run on a connection opened for the call.

// Peeks with the given timeout.
328 public IMessage Peek (TimeSpan timeout)
330 return Run (Peeker (timeout));
// Peeks for the message whose MessageId equals id.
333 public IMessage PeekById (string id)
335 return Run (Peeker (ById (id)));
// Peeks for the message whose MessageId equals id, with a timeout.
338 public IMessage PeekById (string id, TimeSpan timeout)
340 return Run (Peeker (timeout, ById (id)));
// Peeks for the message whose CorrelationId equals id.
343 public IMessage PeekByCorrelationId (string id)
345 return Run (Peeker (ByCorrelationId (id)));
// Peeks for the message whose CorrelationId equals id, with a timeout.
348 public IMessage PeekByCorrelationId (string id, TimeSpan timeout)
350 return Run (Peeker (timeout, ByCorrelationId (id)));
// Receive variants. Each builds a Receiver delegate and executes it through
// the Run overload matching its arguments: no transaction, an explicit
// transaction object, or a MessageQueueTransactionType.

// Receives the next message with no timeout argument and no transaction.
353 public IMessage Receive ()
355 return Run (Receiver ());
// Receives the next message, waiting at most `timeout`.
358 public IMessage Receive (TimeSpan timeout)
360 return Run (Receiver (timeout));
// Receives within the supplied transaction, waiting at most `timeout`.
363 public IMessage Receive (TimeSpan timeout,
364 IMessageQueueTransaction transaction)
366 return Run (transaction, Receiver (timeout));
// Receives using the given transaction type, waiting at most `timeout`.
369 public IMessage Receive (TimeSpan timeout,
370 MessageQueueTransactionType transactionType)
372 return Run (transactionType, Receiver (timeout));
// Receives within the supplied transaction.
375 public IMessage Receive (IMessageQueueTransaction transaction)
377 return Run (transaction, Receiver());
// Receives using the given transaction type.
380 public IMessage Receive (MessageQueueTransactionType transactionType)
382 return Run (transactionType, Receiver ());
// ReceiveById variants. Each pairs a Receiver delegate with the ById
// matcher (compares BasicProperties.MessageId with id) and executes it
// through the Run overload matching its arguments.

// No timeout argument, no transaction.
385 public IMessage ReceiveById (string id)
387 return Run (Receiver (ById (id)));
// With timeout, no transaction.
390 public IMessage ReceiveById (string id, TimeSpan timeout)
392 return Run (Receiver (timeout, ById (id)));
// Within the supplied transaction.
395 public IMessage ReceiveById (string id,
396 IMessageQueueTransaction transaction)
398 return Run (transaction, Receiver (ById (id)));
// Using the given transaction type.
401 public IMessage ReceiveById (string id,
402 MessageQueueTransactionType transactionType)
404 return Run (transactionType, Receiver (ById (id)));
// With timeout, within the supplied transaction.
407 public IMessage ReceiveById (string id, TimeSpan timeout,
408 IMessageQueueTransaction transaction)
410 return Run (transaction, Receiver (timeout, ById (id)));
// With timeout, using the given transaction type.
413 public IMessage ReceiveById (string id, TimeSpan timeout,
414 MessageQueueTransactionType transactionType)
416 return Run (transactionType, Receiver (timeout, ById (id)));
// ReceiveByCorrelationId variants. Each pairs a Receiver delegate with the
// ByCorrelationId matcher (compares BasicProperties.CorrelationId with id)
// and executes it through the Run overload matching its arguments.

// No timeout argument, no transaction.
419 public IMessage ReceiveByCorrelationId (string id)
421 return Run (Receiver (ByCorrelationId (id)));
// With timeout, no transaction.
424 public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout)
426 return Run (Receiver (timeout, ByCorrelationId (id)));
// Within the supplied transaction.
429 public IMessage ReceiveByCorrelationId (string id,
430 IMessageQueueTransaction transaction)
432 return Run (transaction, Receiver (ByCorrelationId (id)));
// Using the given transaction type.
435 public IMessage ReceiveByCorrelationId (string id,
436 MessageQueueTransactionType transactionType)
438 return Run (transactionType, Receiver (ByCorrelationId (id)));
// With timeout, within the supplied transaction.
441 public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout,
442 IMessageQueueTransaction transaction)
444 return Run (transaction, Receiver (timeout, ByCorrelationId (id)));
// With timeout, using the given transaction type.
447 public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout,
448 MessageQueueTransactionType transactionType)
450 return Run (transactionType, Receiver (timeout, ByCorrelationId (id)));
// Returns a new RabbitMQMessageEnumerator built from the MessageFactory
// helper and the current queue reference.
453 public IMessageEnumerator GetMessageEnumerator ()
455 return new RabbitMQMessageEnumerator (helper, QRef);
// Executes the receive delegate r under the requested transaction type.
//
// Single: obtains a transaction from GetTx (disposed by the using block),
//         tracks a `success` flag and runs r through the
//         transaction-taking Run overload. What happens with the flag and
//         the message afterwards is on lines not shown in this listing.
// None:   statements not shown in this listing.
// The remaining visible path throws NotSupportedException whose message is
// the transaction type followed by " not supported".
458 private IMessage Run (MessageQueueTransactionType transactionType,
459 TxReceiver.DoReceive r)
461 switch (transactionType) {
462 case MessageQueueTransactionType.Single:
463 using (RabbitMQMessageQueueTransaction tx = GetTx ()) {
464 bool success = false;
466 IMessage msg = Run (tx, r);
476 case MessageQueueTransactionType.None:
480 throw new NotSupportedException(transactionType + " not supported");
// Executes the receive delegate r inside the supplied transaction.
//
// Wraps r in a TxReceiver bound to this queue, casts `transaction` to
// RabbitMQMessageQueueTransaction (InvalidCastException for any other
// implementation) and returns the result of RunReceive driving the
// wrapper's ReceiveInContext callback.
484 private IMessage Run (IMessageQueueTransaction transaction,
485 TxReceiver.DoReceive r)
487 TxReceiver txr = new TxReceiver (this, r);
488 RabbitMQMessageQueueTransaction tx =
489 (RabbitMQMessageQueueTransaction) transaction;
490 return tx.RunReceive (txr.ReceiveInContext);
// Executes the receive delegate r without a transaction: opens a
// connection to QRef.Host and a channel, invokes r with this queue and the
// channel, and returns its result. Both resources are disposed by the
// using blocks.
493 private IMessage Run (TxReceiver.DoReceive r)
495 ConnectionFactory cf = new ConnectionFactory ();
496 using (IConnection cn = cf.CreateConnection (QRef.Host)) {
497 using (IModel model = cn.CreateModel ()) {
498 return r (this, model);
// Transaction-context receive on this queue. host, cn and model are passed
// by reference so values assigned here are visible to the caller.
//
// Visible behaviour: throws MonoMessagingException when `host` differs from
// QRef.Host, creates a connection to `host`, creates a channel on it and
// returns Receive (model, -1, true), i.e. timeout -1 with doAck true.
// NOTE(review): the conditions guarding the connection and channel
// creation are on lines not shown in this listing; txId is not used by any
// visible statement.
503 private IMessage ReceiveInContext (ref string host, ref IConnection cn,
504 ref IModel model, string txId)
508 else if (host != QRef.Host)
509 throw new MonoMessagingException ("Transactions can not span multiple hosts");
512 ConnectionFactory cf = new ConnectionFactory ();
513 cn = cf.CreateConnection (host);
517 model = cn.CreateModel ();
521 return Receive (model, -1, true);
// Adapter that binds a queue and a DoReceive delegate together so the pair
// can be used as a transaction receive callback (see ReceiveInContext).
524 private class TxReceiver
// Receive strategy invoked once a channel is available.
526 private readonly DoReceive doReceive;
// Queue the strategy is executed against.
527 private readonly RabbitMQMessageQueue q;
// Stores the delegate; the assignment of `q` is on a line not shown in
// this listing.
529 public TxReceiver(RabbitMQMessageQueue q, DoReceive doReceive) {
531 this.doReceive = doReceive;
// Signature of a receive strategy: given the queue and an open channel,
// returns the received message.
534 public delegate IMessage DoReceive (RabbitMQMessageQueue q, IModel model);
// Transaction callback. Visible behaviour: throws MonoMessagingException
// when `host` differs from q.QRef.Host, creates a connection to `host`,
// creates a channel on it, then returns doReceive (q, model).
// NOTE(review): the conditions guarding the connection and channel creation
// are on lines not shown in this listing.
536 public IMessage ReceiveInContext (ref string host, ref IConnection cn,
537 ref IModel model, string txId)
541 else if (host != q.QRef.Host)
542 throw new MonoMessagingException ("Transactions can not span multiple hosts");
545 ConnectionFactory cf = new ConnectionFactory ();
546 cn = cf.CreateConnection (host);
550 model = cn.CreateModel ();
554 return doReceive (q, model);
// Captures the parameters of one receive (timeout, optional matcher, ack
// flag) and exposes them as a TxReceiver.DoReceive-compatible method.
558 private class DoReceiveWithTimeout
// Timeout forwarded to the queue's private Receive; -1 is the value the
// callers use when no TimeSpan was supplied.
560 private readonly int timeout;
// Optional predicate selecting which delivery to return; may be null.
561 private readonly IsMatch matcher;
// Forwarded as the doAck argument of the queue's private Receive.
562 private readonly bool ack;
// Two-argument form: defaults ack to true.
564 public DoReceiveWithTimeout (int timeout, IsMatch matcher)
565 : this (timeout, matcher, true)
// Full form. The case "matcher supplied and timeout == -1" is handled
// specially, but the statement it executes is not shown in this listing.
// Likewise the assignment of `ack` is not visible.
569 public DoReceiveWithTimeout (int timeout, IsMatch matcher, bool ack)
571 if (matcher != null && timeout == -1)
574 this.timeout = timeout;
575 this.matcher = matcher;
// Performs the receive on q using the captured parameters, calling either
// the matcher-less or the matcher-taking Receive overload. The condition
// choosing between them is not shown (presumably matcher == null --
// confirm).
579 public IMessage DoReceive (RabbitMQMessageQueue q, IModel model)
582 return q.Receive (model, timeout, ack);
584 return q.Receive (model, timeout, ack, matcher);
// Receiver factories. Each returns the DoReceive method of a new
// DoReceiveWithTimeout built with the two-argument constructor, which
// defaults ack to true.

// Timeout plus matcher (the second parameter line is not shown in this
// listing); the TimeSpan is converted with TimeSpanToInt32.
588 private static TxReceiver.DoReceive Receiver (TimeSpan timeout,
591 int to = TimeSpanToInt32 (timeout);
592 return new DoReceiveWithTimeout (to, matcher).DoReceive;
// Matcher only; timeout is passed as -1.
595 private static TxReceiver.DoReceive Receiver (IsMatch matcher)
597 return new DoReceiveWithTimeout (-1, matcher).DoReceive;
// Timeout only; no matcher.
600 private static TxReceiver.DoReceive Receiver (TimeSpan timeout)
602 int to = TimeSpanToInt32 (timeout);
603 return new DoReceiveWithTimeout (to, null).DoReceive;
// Neither timeout nor matcher; timeout is passed as -1.
606 private TxReceiver.DoReceive Receiver ()
608 return new DoReceiveWithTimeout (-1, null).DoReceive;
// Builds a receive strategy for peeking with no timeout (-1) and no
// matcher.
//
// The ack flag is passed explicitly as false, as the other Peeker overloads
// do. The two-argument DoReceiveWithTimeout constructor defaults ack to
// true, which would make this strategy ask Receive to acknowledge the
// message it was only meant to look at.
private TxReceiver.DoReceive Peeker ()
{
	return new DoReceiveWithTimeout (-1, null, false).DoReceive;
}
// Peeker factories. Each returns the DoReceive method of a new
// DoReceiveWithTimeout constructed with ack = false.

// Timeout only; the TimeSpan is converted with TimeSpanToInt32.
616 private TxReceiver.DoReceive Peeker (TimeSpan timeout)
618 int to = TimeSpanToInt32 (timeout);
619 return new DoReceiveWithTimeout (to, null, false).DoReceive;
// Matcher only; timeout is passed as -1.
622 private TxReceiver.DoReceive Peeker (IsMatch matcher)
624 return new DoReceiveWithTimeout (-1, matcher, false).DoReceive;
// Timeout plus matcher.
627 private TxReceiver.DoReceive Peeker (TimeSpan timeout, IsMatch matcher)
629 int to = TimeSpanToInt32 (timeout);
630 return new DoReceiveWithTimeout (to, matcher, false).DoReceive;
// Predicate over a delivery; true means the delivery is the one wanted.
633 delegate bool IsMatch (BasicDeliverEventArgs result);
// Holds a message id and matches deliveries against it.
635 private class IdMatcher
637 private readonly string id;
// Constructor body (the assignment of `id`) is not shown in this listing.
638 public IdMatcher (string id)
// True when the delivery's BasicProperties.MessageId equals the stored id
// (string == comparison).
643 public bool MatchById (BasicDeliverEventArgs result)
645 return result.BasicProperties.MessageId == id;
// Returns an IsMatch bound to a new IdMatcher for `id`.
649 private static IsMatch ById (string id)
651 return new IdMatcher (id).MatchById;
// Holds a correlation id and matches deliveries against it.
654 private class CorrelationIdMatcher
656 private readonly string correlationId;
657 public CorrelationIdMatcher (string correlationId)
659 this.correlationId = correlationId;
// True when the delivery's BasicProperties.CorrelationId equals the stored
// correlation id (string == comparison). The method keeps the name
// MatchById even though it compares correlation ids.
662 public bool MatchById (BasicDeliverEventArgs result)
664 return result.BasicProperties.CorrelationId == correlationId;
// Returns an IsMatch bound to a new CorrelationIdMatcher for
// `correlationId`.
668 private static IsMatch ByCorrelationId (string correlationId)
670 return new CorrelationIdMatcher (correlationId).MatchById;
// Receives the next delivery from this queue on the given channel.
//
// Requests an access ticket for `realm`, declares QRef.Queue (third
// QueueDeclare argument false) and opens a Subscription that is disposed by
// the using block. If sub.Next yields a delivery within `timeout`, it is
// converted with helper.ReadMessage; otherwise MonoMessagingException
// ("No Message Available") is thrown.
// NOTE(review): the lines between ReadMessage and the throw (presumably
// the doAck handling and the return of m) are not shown in this listing.
673 private IMessage Receive (IModel model, int timeout, bool doAck)
675 ushort ticket = model.AccessRequest (realm);
676 string finalName = model.QueueDeclare (ticket, QRef.Queue, false);
678 using (Subscription sub = new Subscription (model, ticket, finalName)) {
679 BasicDeliverEventArgs result;
680 if (sub.Next (timeout, out result)) {
681 IMessage m = helper.ReadMessage (QRef, result);
686 throw new MonoMessagingException ("No Message Available");
// Receives the first delivery accepted by `matcher` from this queue.
//
// Same setup as the matcher-less overload (access ticket for `realm`,
// QueueDeclare with third argument false, disposable Subscription), but
// keeps calling sub.Next -- each call with the full `timeout` -- while
// deliveries arrive, converting the first one for which matcher returns
// true. MessageUnavailableException ("Message not available") is thrown on
// the visible fall-through path.
// NOTE(review): the doAck handling and the return of m are on lines not
// shown in this listing. Also note this overload throws
// MessageUnavailableException where the matcher-less one throws
// MonoMessagingException -- confirm the difference is intended.
691 private IMessage Receive (IModel model, int timeout,
692 bool doAck, IsMatch matcher)
694 ushort ticket = model.AccessRequest (realm);
695 string finalName = model.QueueDeclare (ticket, QRef.Queue, false);
697 using (Subscription sub = new Subscription (model, ticket, finalName)) {
698 BasicDeliverEventArgs result;
699 while (sub.Next (timeout, out result)) {
701 if (matcher (result)) {
702 IMessage m = helper.ReadMessage (QRef, result);
709 throw new MessageUnavailableException ("Message not available");
// Asks the provider for a new transaction and casts it to the concrete
// RabbitMQMessageQueueTransaction type (InvalidCastException if the
// provider returns any other implementation).
713 private RabbitMQMessageQueueTransaction GetTx ()
715 return (RabbitMQMessageQueueTransaction) provider.CreateMessageQueueTransaction ();
// Converts a TimeSpan timeout to the int millisecond value used by the
// receive paths.
//
// TimeSpan.MaxValue is special-cased; the value returned for it is on a
// line not shown in this listing (presumably -1, the value callers pass
// when no timeout is given -- confirm). Any other value is
// TotalMilliseconds truncated to int.
// NOTE(review): the visible code does not guard the double-to-int cast, so
// spans longer than int.MaxValue milliseconds (about 24.8 days) are not
// handled explicitly.
718 private static int TimeSpanToInt32 (TimeSpan timespan)
720 if (timespan == TimeSpan.MaxValue)
723 return (int) timespan.TotalMilliseconds;