2 // Mono.Messaging.RabbitMQ
5 // Michael Barker (mike@middlesoft.co.uk)
7 // (C) 2008 Michael Barker
11 // Permission is hereby granted, free of charge, to any person obtaining
12 // a copy of this software and associated documentation files (the
13 // "Software"), to deal in the Software without restriction, including
14 // without limitation the rights to use, copy, modify, merge, publish,
15 // distribute, sublicense, and/or sell copies of the Software, and to
16 // permit persons to whom the Software is furnished to do so, subject to
17 // the following conditions:
19 // The above copyright notice and this permission notice shall be
20 // included in all copies or substantial portions of the Software.
22 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
23 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
24 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
25 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
26 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
27 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
32 using System.Collections;
33 using System.ComponentModel;
37 using RabbitMQ.Client;
38 using RabbitMQ.Client.Content;
39 using RabbitMQ.Client.Events;
40 using RabbitMQ.Client.Exceptions;
41 using RabbitMQ.Client.MessagePatterns;
44 namespace Mono.Messaging.RabbitMQ {
47 /// RabbitMQ Implementation of a message queue. Currrently this implementation
48 /// attempts to be as stateless as possible. Connection the AMQP server
49 /// are only created as needed.
51 public class RabbitMQMessageQueue : MessageQueueBase, IMessageQueue {
53 private bool authenticate = false;
54 private short basePriority = 0;
55 private Guid category = Guid.Empty;
56 private bool denySharedReceive = false;
57 private EncryptionRequired encryptionRequired;
58 private long maximumJournalSize = -1;
59 private long maximumQueueSize = -1;
60 private ISynchronizeInvoke synchronizingObject = null;
61 private bool useJournalQueue = false;
62 private QueueReference qRef = QueueReference.DEFAULT;
63 private readonly RabbitMQMessagingProvider provider;
64 private readonly MessageFactory helper;
65 private readonly string realm;
66 private readonly bool transactional;
68 public RabbitMQMessageQueue (RabbitMQMessagingProvider provider,
70 : this (provider, QueueReference.DEFAULT, transactional)
74 public RabbitMQMessageQueue (RabbitMQMessagingProvider provider,
77 : this (provider, "/data", qRef, transactional)
81 public RabbitMQMessageQueue (RabbitMQMessagingProvider provider,
82 string realm, QueueReference qRef,
85 this.provider = provider;
86 this.helper = new MessageFactory (provider);
89 this.transactional = transactional;
92 protected override IMessageQueue Queue {
96 public bool Authenticate {
97 get { return authenticate; }
98 set { authenticate = value; }
101 public short BasePriority {
102 get { return basePriority; }
103 set { basePriority = value; }
106 public bool CanRead {
107 get { throw new NotImplementedException (); }
110 public bool CanWrite {
111 get { throw new NotImplementedException (); }
114 public Guid Category {
115 get { return category; }
116 set { category = value; }
119 public DateTime CreateTime {
120 get { throw new NotImplementedException (); }
123 public bool DenySharedReceive {
124 get { return denySharedReceive; }
125 set { denySharedReceive = value; }
128 public EncryptionRequired EncryptionRequired {
129 get { return encryptionRequired; }
130 set { encryptionRequired = value; }
134 get { throw new NotImplementedException (); }
137 public DateTime LastModifyTime {
138 get { throw new NotImplementedException (); }
141 public long MaximumJournalSize {
142 get { return maximumJournalSize; }
143 set { maximumJournalSize = value; }
146 public long MaximumQueueSize {
147 get { return maximumQueueSize; }
148 set { maximumQueueSize = value; }
151 public IntPtr ReadHandle {
152 get { throw new NotImplementedException (); }
155 public ISynchronizeInvoke SynchronizingObject {
156 get { return synchronizingObject; }
157 set { synchronizingObject = value; }
160 public bool Transactional {
161 get { return transactional; }
164 public bool UseJournalQueue {
165 get { return useJournalQueue; }
166 set { useJournalQueue = value; }
169 public IntPtr WriteHandle {
170 get { throw new NotImplementedException (); }
173 public QueueReference QRef {
175 set { qRef = value; }
178 private static long GetVersion (IConnection cn)
180 long version = cn.Protocol.MajorVersion;
181 version = version << 32;
182 version += cn.Protocol.MinorVersion;
186 private void SetDeliveryInfo (IMessage msg, long senderVersion,
187 string transactionId)
189 msg.SetDeliveryInfo (Acknowledgment.None,
192 Guid.NewGuid ().ToString () + "\\0",
203 // No-op (Queue are currently stateless)
206 public static void Delete (string realm, QueueReference qRef)
208 ConnectionFactory cf = new ConnectionFactory ();
210 using (IConnection cn = cf.CreateConnection (qRef.Host)) {
211 using (IModel model = cn.CreateModel ()) {
212 ushort ticket = model.AccessRequest (realm);
213 model.QueueDelete (ticket, qRef.Queue, false, false, false);
218 public void Send (IMessage msg)
220 if (QRef == QueueReference.DEFAULT)
221 throw new MonoMessagingException ("Path has not been specified");
223 if (msg.BodyStream == null)
224 throw new ArgumentException ("Message is not serialized properly");
226 ConnectionFactory cf = new ConnectionFactory ();
229 using (IConnection cn = cf.CreateConnection (QRef.Host)) {
230 SetDeliveryInfo (msg, GetVersion (cn), null);
231 using (IModel ch = cn.CreateModel ()) {
235 } catch (BrokerUnreachableException e) {
236 throw new ConnectionException (QRef, e);
240 public void Send (IMessage msg, IMessageQueueTransaction transaction)
242 if (QRef == QueueReference.DEFAULT)
243 throw new MonoMessagingException ("Path has not been specified");
245 if (msg.BodyStream == null)
246 throw new ArgumentException ("Message is not serialized properly");
248 RabbitMQMessageQueueTransaction tx = (RabbitMQMessageQueueTransaction) transaction;
250 tx.RunSend (SendInContext, msg);
253 public void Send (IMessage msg, MessageQueueTransactionType transactionType)
255 switch (transactionType) {
256 case MessageQueueTransactionType.Single:
257 using (IMessageQueueTransaction tx = provider.CreateMessageQueueTransaction ()) {
261 } catch (Exception e) {
263 throw new MonoMessagingException(e.Message, e);
268 case MessageQueueTransactionType.None:
272 case MessageQueueTransactionType.Automatic:
273 throw new NotSupportedException("Automatic transaction types not supported");
277 private void SendInContext (ref string host, ref IConnection cn,
278 ref IModel model, IMessage msg, string txId)
282 else if (host != QRef.Host)
283 throw new MonoMessagingException ("Transactions can not span multiple hosts");
286 ConnectionFactory cf = new ConnectionFactory ();
287 cn = cf.CreateConnection (host);
291 model = cn.CreateModel ();
295 SetDeliveryInfo (msg, GetVersion (cn), txId);
299 private void Send (IModel model, IMessage msg)
301 ushort ticket = model.AccessRequest ("/data");
302 string finalName = model.QueueDeclare (ticket, QRef.Queue, true);
303 IMessageBuilder mb = helper.WriteMessage (model, msg);
305 model.BasicPublish (ticket, "", finalName,
306 (IBasicProperties) mb.GetContentHeader(),
307 mb.GetContentBody ());
312 ConnectionFactory cf = new ConnectionFactory ();
314 using (IConnection cn = cf.CreateConnection (QRef.Host)) {
315 using (IModel model = cn.CreateModel ()) {
316 ushort ticket = model.AccessRequest (realm);
317 model.QueuePurge (ticket, QRef.Queue, false);
322 public IMessage Peek ()
324 ConnectionFactory cf = new ConnectionFactory ();
326 using (IConnection cn = cf.CreateConnection (QRef.Host)) {
327 using (IModel ch = cn.CreateModel ()) {
328 return Receive (ch, -1, false);
333 public IMessage Peek (TimeSpan timeout)
335 return Run (Peeker (timeout));
338 public IMessage PeekById (string id)
340 return Run (Peeker (ById (id)));
343 public IMessage PeekById (string id, TimeSpan timeout)
345 return Run (Peeker (timeout, ById (id)));
348 public IMessage PeekByCorrelationId (string id)
350 return Run (Peeker (ByCorrelationId (id)));
353 public IMessage PeekByCorrelationId (string id, TimeSpan timeout)
355 return Run (Peeker (timeout, ByCorrelationId (id)));
358 public IMessage Receive ()
360 return Run (Receiver ());
363 public IMessage Receive (TimeSpan timeout)
365 return Run (Receiver (timeout));
368 public IMessage Receive (TimeSpan timeout,
369 IMessageQueueTransaction transaction)
371 return Run (transaction, Receiver (timeout));
374 public IMessage Receive (TimeSpan timeout,
375 MessageQueueTransactionType transactionType)
377 return Run (transactionType, Receiver (timeout));
380 public IMessage Receive (IMessageQueueTransaction transaction)
382 return Run (transaction, Receiver());
385 public IMessage Receive (MessageQueueTransactionType transactionType)
387 return Run (transactionType, Receiver ());
390 public IMessage ReceiveById (string id)
392 return Run (Receiver (ById (id)));
395 public IMessage ReceiveById (string id, TimeSpan timeout)
397 return Run (Receiver (timeout, ById (id)));
400 public IMessage ReceiveById (string id,
401 IMessageQueueTransaction transaction)
403 return Run (transaction, Receiver (ById (id)));
406 public IMessage ReceiveById (string id,
407 MessageQueueTransactionType transactionType)
409 return Run (transactionType, Receiver (ById (id)));
412 public IMessage ReceiveById (string id, TimeSpan timeout,
413 IMessageQueueTransaction transaction)
415 return Run (transaction, Receiver (timeout, ById (id)));
418 public IMessage ReceiveById (string id, TimeSpan timeout,
419 MessageQueueTransactionType transactionType)
421 return Run (transactionType, Receiver (timeout, ById (id)));
424 public IMessage ReceiveByCorrelationId (string id)
426 return Run (Receiver (ByCorrelationId (id)));
429 public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout)
431 return Run (Receiver (timeout, ByCorrelationId (id)));
434 public IMessage ReceiveByCorrelationId (string id,
435 IMessageQueueTransaction transaction)
437 return Run (transaction, Receiver (ByCorrelationId (id)));
440 public IMessage ReceiveByCorrelationId (string id,
441 MessageQueueTransactionType transactionType)
443 return Run (transactionType, Receiver (ByCorrelationId (id)));
446 public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout,
447 IMessageQueueTransaction transaction)
449 return Run (transaction, Receiver (timeout, ByCorrelationId (id)));
452 public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout,
453 MessageQueueTransactionType transactionType)
455 return Run (transactionType, Receiver (timeout, ByCorrelationId (id)));
458 public IMessageEnumerator GetMessageEnumerator ()
460 return new RabbitMQMessageEnumerator (helper, QRef);
463 private IMessage Run (MessageQueueTransactionType transactionType,
464 TxReceiver.DoReceive r)
466 switch (transactionType) {
467 case MessageQueueTransactionType.Single:
468 using (RabbitMQMessageQueueTransaction tx = GetTx ()) {
469 bool success = false;
471 IMessage msg = Run (tx, r);
481 case MessageQueueTransactionType.None:
485 throw new NotSupportedException(transactionType + " not supported");
489 private IMessage Run (IMessageQueueTransaction transaction,
490 TxReceiver.DoReceive r)
492 TxReceiver txr = new TxReceiver (this, r);
493 RabbitMQMessageQueueTransaction tx =
494 (RabbitMQMessageQueueTransaction) transaction;
495 return tx.RunReceive (txr.ReceiveInContext);
498 private IMessage Run (TxReceiver.DoReceive r)
500 ConnectionFactory cf = new ConnectionFactory ();
501 using (IConnection cn = cf.CreateConnection (QRef.Host)) {
502 using (IModel model = cn.CreateModel ()) {
503 return r (this, model);
508 private IMessage ReceiveInContext (ref string host, ref IConnection cn,
509 ref IModel model, string txId)
513 else if (host != QRef.Host)
514 throw new MonoMessagingException ("Transactions can not span multiple hosts");
517 ConnectionFactory cf = new ConnectionFactory ();
518 cn = cf.CreateConnection (host);
522 model = cn.CreateModel ();
526 return Receive (model, -1, true);
529 private class TxReceiver
531 private readonly DoReceive doReceive;
532 private readonly RabbitMQMessageQueue q;
534 public TxReceiver(RabbitMQMessageQueue q, DoReceive doReceive) {
536 this.doReceive = doReceive;
539 public delegate IMessage DoReceive (RabbitMQMessageQueue q, IModel model);
541 public IMessage ReceiveInContext (ref string host, ref IConnection cn,
542 ref IModel model, string txId)
546 else if (host != q.QRef.Host)
547 throw new MonoMessagingException ("Transactions can not span multiple hosts");
550 ConnectionFactory cf = new ConnectionFactory ();
551 cn = cf.CreateConnection (host);
555 model = cn.CreateModel ();
559 return doReceive (q, model);
563 private class DoReceiveWithTimeout
565 private readonly int timeout;
566 private readonly IsMatch matcher;
567 private readonly bool ack;
569 public DoReceiveWithTimeout (int timeout, IsMatch matcher)
570 : this (timeout, matcher, true)
574 public DoReceiveWithTimeout (int timeout, IsMatch matcher, bool ack)
576 if (matcher != null && timeout == -1)
579 this.timeout = timeout;
580 this.matcher = matcher;
584 public IMessage DoReceive (RabbitMQMessageQueue q, IModel model)
587 return q.Receive (model, timeout, ack);
589 return q.Receive (model, timeout, ack, matcher);
593 private static TxReceiver.DoReceive Receiver (TimeSpan timeout,
596 int to = TimeSpanToInt32 (timeout);
597 return new DoReceiveWithTimeout (to, matcher).DoReceive;
600 private static TxReceiver.DoReceive Receiver (IsMatch matcher)
602 return new DoReceiveWithTimeout (-1, matcher).DoReceive;
605 private static TxReceiver.DoReceive Receiver (TimeSpan timeout)
607 int to = TimeSpanToInt32 (timeout);
608 return new DoReceiveWithTimeout (to, null).DoReceive;
611 private TxReceiver.DoReceive Receiver ()
613 return new DoReceiveWithTimeout (-1, null).DoReceive;
616 private TxReceiver.DoReceive Peeker ()
618 return new DoReceiveWithTimeout (-1, null).DoReceive;
621 private TxReceiver.DoReceive Peeker (TimeSpan timeout)
623 int to = TimeSpanToInt32 (timeout);
624 return new DoReceiveWithTimeout (to, null, false).DoReceive;
627 private TxReceiver.DoReceive Peeker (IsMatch matcher)
629 return new DoReceiveWithTimeout (-1, matcher, false).DoReceive;
632 private TxReceiver.DoReceive Peeker (TimeSpan timeout, IsMatch matcher)
634 int to = TimeSpanToInt32 (timeout);
635 return new DoReceiveWithTimeout (to, matcher, false).DoReceive;
638 delegate bool IsMatch (BasicDeliverEventArgs result);
640 private class IdMatcher
642 private readonly string id;
643 public IdMatcher (string id)
648 public bool MatchById (BasicDeliverEventArgs result)
650 return result.BasicProperties.MessageId == id;
654 private static IsMatch ById (string id)
656 return new IdMatcher (id).MatchById;
659 private class CorrelationIdMatcher
661 private readonly string correlationId;
662 public CorrelationIdMatcher (string correlationId)
664 this.correlationId = correlationId;
667 public bool MatchById (BasicDeliverEventArgs result)
669 return result.BasicProperties.CorrelationId == correlationId;
673 private static IsMatch ByCorrelationId (string correlationId)
675 return new CorrelationIdMatcher (correlationId).MatchById;
678 private IMessage Receive (IModel model, int timeout, bool doAck)
680 ushort ticket = model.AccessRequest (realm);
681 string finalName = model.QueueDeclare (ticket, QRef.Queue, false);
683 using (Subscription sub = new Subscription (model, ticket, finalName)) {
684 BasicDeliverEventArgs result;
685 if (sub.Next (timeout, out result)) {
686 IMessage m = helper.ReadMessage (QRef, result);
691 throw new MonoMessagingException ("No Message Available");
696 private IMessage Receive (IModel model, int timeout,
697 bool doAck, IsMatch matcher)
699 ushort ticket = model.AccessRequest (realm);
700 string finalName = model.QueueDeclare (ticket, QRef.Queue, false);
702 using (Subscription sub = new Subscription (model, ticket, finalName)) {
703 BasicDeliverEventArgs result;
704 while (sub.Next (timeout, out result)) {
706 if (matcher (result)) {
707 IMessage m = helper.ReadMessage (QRef, result);
714 throw new MessageUnavailableException ("Message not available");
718 private RabbitMQMessageQueueTransaction GetTx ()
720 return (RabbitMQMessageQueueTransaction) provider.CreateMessageQueueTransaction ();
723 private static int TimeSpanToInt32 (TimeSpan timespan)
725 if (timespan == TimeSpan.MaxValue)
728 return (int) timespan.TotalMilliseconds;