2 // Mono.Messaging.RabbitMQ
5 // Michael Barker (mike@middlesoft.co.uk)
7 // (C) 2008 Michael Barker
11 // Permission is hereby granted, free of charge, to any person obtaining
12 // a copy of this software and associated documentation files (the
13 // "Software"), to deal in the Software without restriction, including
14 // without limitation the rights to use, copy, modify, merge, publish,
15 // distribute, sublicense, and/or sell copies of the Software, and to
16 // permit persons to whom the Software is furnished to do so, subject to
17 // the following conditions:
19 // The above copyright notice and this permission notice shall be
20 // included in all copies or substantial portions of the Software.
22 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
23 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
24 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
25 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
26 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
27 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
32 using System.Collections;
33 using System.ComponentModel;
37 using RabbitMQ.Client;
38 using RabbitMQ.Client.Content;
39 using RabbitMQ.Client.Events;
40 using RabbitMQ.Client.Exceptions;
41 using RabbitMQ.Client.MessagePatterns;
44 namespace Mono.Messaging.RabbitMQ {
47 /// RabbitMQ implementation of a message queue. Currently this implementation
48 /// attempts to be as stateless as possible. Connections to the AMQP server
49 /// are only created as needed.
51 public class RabbitMQMessageQueue : MessageQueueBase, IMessageQueue {
// Backing storage for the plain get/set properties of the same names
// declared further down in this class.
53 private bool authenticate = false;
54 private short basePriority = 0;
55 private Guid category = Guid.Empty;
56 private bool denySharedReceive = false;
57 private EncryptionRequired encryptionRequired;
58 private long maximumJournalSize = -1;
59 private long maximumQueueSize = -1;
60 private ISynchronizeInvoke synchronizingObject = null;
61 private bool useJournalQueue = false;
// Written by the QRef property setter; starts out as QueueReference.DEFAULT.
62 private QueueReference qRef = QueueReference.DEFAULT;
// Source of message-queue transactions (CreateMessageQueueTransaction).
63 private readonly RabbitMQMessagingProvider provider;
// Converts between IMessage and the wire format (WriteMessage / ReadMessage).
64 private readonly MessageFactory helper;
// NOTE(review): no read of this field is visible in this excerpt.
65 private readonly string realm;
// Exposed read-only through the Transactional property.
66 private readonly bool transactional;
// Convenience constructor: forwards to the overload that takes a queue
// reference, supplying QueueReference.DEFAULT.
// NOTE(review): the line declaring the `transactional` parameter is not
// visible in this excerpt.
68 public RabbitMQMessageQueue (RabbitMQMessagingProvider provider,
70 : this (provider, QueueReference.DEFAULT, transactional)
// Convenience constructor: forwards to the four-argument overload with the
// realm fixed to "/data".
// NOTE(review): the lines declaring the `qRef` and `transactional`
// parameters are not visible in this excerpt.
74 public RabbitMQMessageQueue (RabbitMQMessagingProvider provider,
77 : this (provider, "/data", qRef, transactional)
// Primary constructor: stores the provider, builds a MessageFactory from it
// and records the transactional flag.
// NOTE(review): the last parameter line and two statements between the
// helper and transactional assignments are not visible in this excerpt;
// presumably realm and qRef are stored there - confirm against the full file.
81 public RabbitMQMessageQueue (RabbitMQMessagingProvider provider,
82 string realm, QueueReference qRef,
85 this.provider = provider;
86 this.helper = new MessageFactory (provider);
89 this.transactional = transactional;
// Overrides the Queue member declared by the base class.
// NOTE(review): the accessor body is not visible in this excerpt.
92 protected override IMessageQueue Queue {
/// <summary>
/// Gets or sets the authenticate flag. Plain accessor over the
/// <c>authenticate</c> backing field.
/// </summary>
public bool Authenticate
{
    get { return this.authenticate; }
    set { this.authenticate = value; }
}
/// <summary>
/// Gets or sets the base priority. Plain accessor over the
/// <c>basePriority</c> backing field.
/// </summary>
public short BasePriority
{
    get { return this.basePriority; }
    set { this.basePriority = value; }
}
/// <summary>
/// Not implemented: reading this property always throws.
/// </summary>
/// <exception cref="NotImplementedException">Always.</exception>
public bool CanRead
{
    get
    {
        throw new NotImplementedException ();
    }
}
/// <summary>
/// Not implemented: reading this property always throws.
/// </summary>
/// <exception cref="NotImplementedException">Always.</exception>
public bool CanWrite
{
    get
    {
        throw new NotImplementedException ();
    }
}
/// <summary>
/// Gets or sets the queue category. Plain accessor over the
/// <c>category</c> backing field.
/// </summary>
public Guid Category
{
    get { return this.category; }
    set { this.category = value; }
}
/// <summary>
/// Not implemented: reading this property always throws.
/// </summary>
/// <exception cref="NotImplementedException">Always.</exception>
public DateTime CreateTime
{
    get
    {
        throw new NotImplementedException ();
    }
}
/// <summary>
/// Gets or sets the deny-shared-receive flag. Plain accessor over the
/// <c>denySharedReceive</c> backing field.
/// </summary>
public bool DenySharedReceive
{
    get { return this.denySharedReceive; }
    set { this.denySharedReceive = value; }
}
/// <summary>
/// Gets or sets the encryption requirement. Plain accessor over the
/// <c>encryptionRequired</c> backing field.
/// </summary>
public EncryptionRequired EncryptionRequired
{
    get { return this.encryptionRequired; }
    set { this.encryptionRequired = value; }
}
// Not implemented: this getter always throws NotImplementedException.
// NOTE(review): the declaration line of the property this accessor belongs
// to is not visible in this excerpt.
134 get { throw new NotImplementedException (); }
/// <summary>
/// Not implemented: reading this property always throws.
/// </summary>
/// <exception cref="NotImplementedException">Always.</exception>
public DateTime LastModifyTime
{
    get
    {
        throw new NotImplementedException ();
    }
}
/// <summary>
/// Gets or sets the maximum journal size. Plain accessor over the
/// <c>maximumJournalSize</c> backing field.
/// </summary>
public long MaximumJournalSize
{
    get { return this.maximumJournalSize; }
    set { this.maximumJournalSize = value; }
}
/// <summary>
/// Gets or sets the maximum queue size. Plain accessor over the
/// <c>maximumQueueSize</c> backing field.
/// </summary>
public long MaximumQueueSize
{
    get { return this.maximumQueueSize; }
    set { this.maximumQueueSize = value; }
}
/// <summary>
/// Not implemented: reading this property always throws.
/// </summary>
/// <exception cref="NotImplementedException">Always.</exception>
public IntPtr ReadHandle
{
    get
    {
        throw new NotImplementedException ();
    }
}
/// <summary>
/// Gets or sets the synchronizing object. Plain accessor over the
/// <c>synchronizingObject</c> backing field.
/// </summary>
public ISynchronizeInvoke SynchronizingObject
{
    get { return this.synchronizingObject; }
    set { this.synchronizingObject = value; }
}
/// <summary>
/// Gets the transactional flag captured at construction time. Read-only.
/// </summary>
public bool Transactional
{
    get { return this.transactional; }
}
/// <summary>
/// Gets or sets the use-journal-queue flag. Plain accessor over the
/// <c>useJournalQueue</c> backing field.
/// </summary>
public bool UseJournalQueue
{
    get { return this.useJournalQueue; }
    set { this.useJournalQueue = value; }
}
/// <summary>
/// Not implemented: reading this property always throws.
/// </summary>
/// <exception cref="NotImplementedException">Always.</exception>
public IntPtr WriteHandle
{
    get
    {
        throw new NotImplementedException ();
    }
}
// Reference (host and queue name) of the queue this instance operates on.
// NOTE(review): the getter line is not visible in this excerpt; the rest of
// the class reads QRef.Host / QRef.Queue, so a getter must exist.
173 public QueueReference QRef {
// Replaces the stored queue reference.
175 set { qRef = value; }
/// <summary>
/// Packs the protocol version of a connection into one 64-bit value: the
/// major version shifted left by 32 bits, plus the minor version.
/// </summary>
/// <param name="cn">Connection whose <c>Protocol</c> version is read.</param>
/// <returns>The combined version number.</returns>
private static long GetVersion (IConnection cn)
{
    long major = cn.Protocol.MajorVersion;
    long minor = cn.Protocol.MinorVersion;
    return (major << 32) + minor;
}
// Stamps delivery metadata onto msg by calling IMessage.SetDeliveryInfo.
// NOTE(review): most of the argument list is not visible in this excerpt, so
// where senderVersion and transactionId land in the call cannot be confirmed
// from here.
186 private void SetDeliveryInfo (IMessage msg, long senderVersion,
187 string transactionId)
// First argument: no acknowledgment.
189 msg.SetDeliveryInfo (Acknowledgment.None,
// A freshly generated GUID string with a backslash and the digit 0 appended
// ("\\0" is an escaped backslash followed by '0', not a NUL character).
192 Guid.NewGuid ().ToString () + "\\0",
203 // No-op (queues are currently stateless)
/// <summary>
/// Deletes the queue named by <paramref name="qRef"/> on the host it
/// refers to, using a connection and channel opened just for this call.
/// </summary>
/// <param name="realm">Not used by this method.</param>
/// <param name="qRef">Host and queue name of the queue to delete.</param>
public static void Delete (string realm, QueueReference qRef)
{
    ConnectionFactory factory = new ConnectionFactory ();
    using (IConnection connection = factory.CreateConnection (qRef.Host))
    using (IModel channel = connection.CreateModel ()) {
        channel.QueueDelete (qRef.Queue, false, false, false);
    }
}
// Sends msg outside any transaction, on a connection opened for this call.
// Throws MonoMessagingException when no queue path has been set,
// ArgumentException when the message has no body stream, and
// ConnectionException when the broker cannot be reached.
217 public void Send (IMessage msg)
// QRef still equal to the default means no path was ever assigned.
219 if (QRef == QueueReference.DEFAULT)
220 throw new MonoMessagingException ("Path has not been specified");
222 if (msg.BodyStream == null)
223 throw new ArgumentException ("Message is not serialized properly");
225 ConnectionFactory cf = new ConnectionFactory ();
// NOTE(review): the opening of the try block matching the catch below is not
// visible in this excerpt.
228 using (IConnection cn = cf.CreateConnection (QRef.Host)) {
// Delivery info is stamped with the connection's protocol version and a null
// transaction id.
229 SetDeliveryInfo (msg, GetVersion (cn), null);
230 using (IModel ch = cn.CreateModel ()) {
// NOTE(review): the statement inside this using block is not visible here;
// presumably it publishes via the private Send (IModel, IMessage) - confirm.
// Broker-unreachable failures are wrapped with the queue reference attached.
234 } catch (BrokerUnreachableException e) {
235 throw new ConnectionException (QRef, e);
/// <summary>
/// Sends <paramref name="msg"/> as part of <paramref name="transaction"/>
/// by handing <c>SendInContext</c> to the transaction's <c>RunSend</c>.
/// </summary>
/// <param name="msg">Message to send; its body stream must be set.</param>
/// <param name="transaction">
/// Transaction to send within; cast to RabbitMQMessageQueueTransaction.
/// </param>
/// <exception cref="MonoMessagingException">No queue path has been set.</exception>
/// <exception cref="ArgumentException">The message has no body stream.</exception>
public void Send (IMessage msg, IMessageQueueTransaction transaction)
{
    if (QRef == QueueReference.DEFAULT) {
        throw new MonoMessagingException ("Path has not been specified");
    }

    if (msg.BodyStream == null) {
        throw new ArgumentException ("Message is not serialized properly");
    }

    RabbitMQMessageQueueTransaction rabbitTx =
        (RabbitMQMessageQueueTransaction) transaction;
    rabbitTx.RunSend (SendInContext, msg);
}
// Sends msg using the transaction handling selected by transactionType.
// NOTE(review): several statements of this method (the try body, the
// statement before the rethrow, and the body of the None case) are not
// visible in this excerpt; the comments below cover only what is shown.
252 public void Send (IMessage msg, MessageQueueTransactionType transactionType)
254 switch (transactionType) {
// Single: a transaction is created from the provider and disposed when the
// using block ends.
255 case MessageQueueTransactionType.Single:
256 using (IMessageQueueTransaction tx = provider.CreateMessageQueueTransaction ()) {
// Any failure is rethrown as MonoMessagingException with the original
// exception kept as the inner exception.
260 } catch (Exception e) {
262 throw new MonoMessagingException(e.Message, e);
267 case MessageQueueTransactionType.None:
// Automatic transactions are rejected.
271 case MessageQueueTransactionType.Automatic:
272 throw new NotSupportedException("Automatic transaction types not supported");
// Send callback handed to RabbitMQMessageQueueTransaction.RunSend. host, cn
// and model are passed by reference, so values assigned here are visible to
// the caller.
// NOTE(review): the conditions guarding the assignments below are not
// visible in this excerpt; presumably the connection and channel are created
// only when the incoming references are null - confirm.
276 private void SendInContext (ref string host, ref IConnection cn,
277 ref IModel model, IMessage msg, string txId)
// The context host must match this queue's host.
281 else if (host != QRef.Host)
282 throw new MonoMessagingException ("Transactions can not span multiple hosts");
285 ConnectionFactory cf = new ConnectionFactory ();
286 cn = cf.CreateConnection (host);
290 model = cn.CreateModel ();
// Delivery info carries the connection's protocol version and the
// transaction id.
294 SetDeliveryInfo (msg, GetVersion (cn), txId);
/// <summary>
/// Publishes <paramref name="msg"/> on <paramref name="model"/>: declares
/// the queue named by <c>QRef.Queue</c>, serializes the message with the
/// message factory, then publishes to the empty-named exchange using the
/// declared queue name as routing key.
/// </summary>
/// <param name="model">Channel to declare and publish on.</param>
/// <param name="msg">Message to serialize and publish.</param>
private void Send (IModel model, IMessage msg)
{
    string queueName = model.QueueDeclare (QRef.Queue, true);
    IMessageBuilder builder = helper.WriteMessage (model, msg);
    IBasicProperties header = (IBasicProperties) builder.GetContentHeader ();

    model.BasicPublish ("", queueName, header, builder.GetContentBody ());
}
// Purges the queue named by QRef.Queue on QRef.Host, using a connection and
// channel opened just for this call.
// NOTE(review): the declaration line of the enclosing method is not visible
// in this excerpt.
310 ConnectionFactory cf = new ConnectionFactory ();
312 using (IConnection cn = cf.CreateConnection (QRef.Host)) {
313 using (IModel model = cn.CreateModel ()) {
314 model.QueuePurge (QRef.Queue, false);
/// <summary>
/// Reads one message without acknowledging it, on a connection and channel
/// opened just for this call. The timeout passed on is -1.
/// </summary>
/// <returns>The message that was read.</returns>
public IMessage Peek ()
{
    ConnectionFactory factory = new ConnectionFactory ();
    using (IConnection connection = factory.CreateConnection (QRef.Host))
    using (IModel channel = connection.CreateModel ()) {
        return Receive (channel, -1, false);
    }
}
/// <summary>
/// Reads one message without acknowledging it, outside any transaction.
/// </summary>
/// <param name="timeout">How long to wait for a message.</param>
public IMessage Peek (TimeSpan timeout)
{
    TxReceiver.DoReceive peeker = Peeker (timeout);
    return Run (peeker);
}
/// <summary>
/// Reads, without acknowledging it, a message whose MessageId equals
/// <paramref name="id"/>; runs outside any transaction.
/// </summary>
public IMessage PeekById (string id)
{
    IsMatch idFilter = ById (id);
    TxReceiver.DoReceive peeker = Peeker (idFilter);
    return Run (peeker);
}
/// <summary>
/// Reads, without acknowledging it, a message whose MessageId equals
/// <paramref name="id"/>, using the given timeout; runs outside any
/// transaction.
/// </summary>
public IMessage PeekById (string id, TimeSpan timeout)
{
    IsMatch idFilter = ById (id);
    TxReceiver.DoReceive peeker = Peeker (timeout, idFilter);
    return Run (peeker);
}
/// <summary>
/// Reads, without acknowledging it, a message whose CorrelationId equals
/// <paramref name="id"/>; runs outside any transaction.
/// </summary>
public IMessage PeekByCorrelationId (string id)
{
    IsMatch correlationFilter = ByCorrelationId (id);
    TxReceiver.DoReceive peeker = Peeker (correlationFilter);
    return Run (peeker);
}
/// <summary>
/// Reads, without acknowledging it, a message whose CorrelationId equals
/// <paramref name="id"/>, using the given timeout; runs outside any
/// transaction.
/// </summary>
public IMessage PeekByCorrelationId (string id, TimeSpan timeout)
{
    IsMatch correlationFilter = ByCorrelationId (id);
    TxReceiver.DoReceive peeker = Peeker (timeout, correlationFilter);
    return Run (peeker);
}
/// <summary>
/// Receives one message with acknowledgment, outside any transaction.
/// </summary>
public IMessage Receive ()
{
    TxReceiver.DoReceive receiver = Receiver ();
    return Run (receiver);
}
/// <summary>
/// Receives one message with acknowledgment, outside any transaction,
/// using the given timeout.
/// </summary>
public IMessage Receive (TimeSpan timeout)
{
    TxReceiver.DoReceive receiver = Receiver (timeout);
    return Run (receiver);
}
/// <summary>
/// Receives one message with acknowledgment inside
/// <paramref name="transaction"/>, using the given timeout.
/// </summary>
public IMessage Receive (TimeSpan timeout, IMessageQueueTransaction transaction)
{
    TxReceiver.DoReceive receiver = Receiver (timeout);
    return Run (transaction, receiver);
}
/// <summary>
/// Receives one message with acknowledgment, using the given timeout and
/// the transaction handling selected by <paramref name="transactionType"/>.
/// </summary>
public IMessage Receive (TimeSpan timeout, MessageQueueTransactionType transactionType)
{
    TxReceiver.DoReceive receiver = Receiver (timeout);
    return Run (transactionType, receiver);
}
/// <summary>
/// Receives one message with acknowledgment inside
/// <paramref name="transaction"/>.
/// </summary>
public IMessage Receive (IMessageQueueTransaction transaction)
{
    TxReceiver.DoReceive receiver = Receiver ();
    return Run (transaction, receiver);
}
/// <summary>
/// Receives one message with acknowledgment, using the transaction
/// handling selected by <paramref name="transactionType"/>.
/// </summary>
public IMessage Receive (MessageQueueTransactionType transactionType)
{
    TxReceiver.DoReceive receiver = Receiver ();
    return Run (transactionType, receiver);
}
/// <summary>
/// Receives, with acknowledgment, a message whose MessageId equals
/// <paramref name="id"/>; runs outside any transaction.
/// </summary>
public IMessage ReceiveById (string id)
{
    IsMatch idFilter = ById (id);
    TxReceiver.DoReceive receiver = Receiver (idFilter);
    return Run (receiver);
}
/// <summary>
/// Receives, with acknowledgment, a message whose MessageId equals
/// <paramref name="id"/>, using the given timeout; runs outside any
/// transaction.
/// </summary>
public IMessage ReceiveById (string id, TimeSpan timeout)
{
    IsMatch idFilter = ById (id);
    TxReceiver.DoReceive receiver = Receiver (timeout, idFilter);
    return Run (receiver);
}
/// <summary>
/// Receives, with acknowledgment, a message whose MessageId equals
/// <paramref name="id"/>, inside <paramref name="transaction"/>.
/// </summary>
public IMessage ReceiveById (string id, IMessageQueueTransaction transaction)
{
    IsMatch idFilter = ById (id);
    TxReceiver.DoReceive receiver = Receiver (idFilter);
    return Run (transaction, receiver);
}
/// <summary>
/// Receives, with acknowledgment, a message whose MessageId equals
/// <paramref name="id"/>, using the transaction handling selected by
/// <paramref name="transactionType"/>.
/// </summary>
public IMessage ReceiveById (string id, MessageQueueTransactionType transactionType)
{
    IsMatch idFilter = ById (id);
    TxReceiver.DoReceive receiver = Receiver (idFilter);
    return Run (transactionType, receiver);
}
/// <summary>
/// Receives, with acknowledgment, a message whose MessageId equals
/// <paramref name="id"/>, using the given timeout, inside
/// <paramref name="transaction"/>.
/// </summary>
public IMessage ReceiveById (string id, TimeSpan timeout, IMessageQueueTransaction transaction)
{
    IsMatch idFilter = ById (id);
    TxReceiver.DoReceive receiver = Receiver (timeout, idFilter);
    return Run (transaction, receiver);
}
/// <summary>
/// Receives, with acknowledgment, a message whose MessageId equals
/// <paramref name="id"/>, using the given timeout and the transaction
/// handling selected by <paramref name="transactionType"/>.
/// </summary>
public IMessage ReceiveById (string id, TimeSpan timeout, MessageQueueTransactionType transactionType)
{
    IsMatch idFilter = ById (id);
    TxReceiver.DoReceive receiver = Receiver (timeout, idFilter);
    return Run (transactionType, receiver);
}
/// <summary>
/// Receives, with acknowledgment, a message whose CorrelationId equals
/// <paramref name="id"/>; runs outside any transaction.
/// </summary>
public IMessage ReceiveByCorrelationId (string id)
{
    IsMatch correlationFilter = ByCorrelationId (id);
    TxReceiver.DoReceive receiver = Receiver (correlationFilter);
    return Run (receiver);
}
/// <summary>
/// Receives, with acknowledgment, a message whose CorrelationId equals
/// <paramref name="id"/>, using the given timeout; runs outside any
/// transaction.
/// </summary>
public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout)
{
    IsMatch correlationFilter = ByCorrelationId (id);
    TxReceiver.DoReceive receiver = Receiver (timeout, correlationFilter);
    return Run (receiver);
}
/// <summary>
/// Receives, with acknowledgment, a message whose CorrelationId equals
/// <paramref name="id"/>, inside <paramref name="transaction"/>.
/// </summary>
public IMessage ReceiveByCorrelationId (string id, IMessageQueueTransaction transaction)
{
    IsMatch correlationFilter = ByCorrelationId (id);
    TxReceiver.DoReceive receiver = Receiver (correlationFilter);
    return Run (transaction, receiver);
}
/// <summary>
/// Receives, with acknowledgment, a message whose CorrelationId equals
/// <paramref name="id"/>, using the transaction handling selected by
/// <paramref name="transactionType"/>.
/// </summary>
public IMessage ReceiveByCorrelationId (string id, MessageQueueTransactionType transactionType)
{
    IsMatch correlationFilter = ByCorrelationId (id);
    TxReceiver.DoReceive receiver = Receiver (correlationFilter);
    return Run (transactionType, receiver);
}
/// <summary>
/// Receives, with acknowledgment, a message whose CorrelationId equals
/// <paramref name="id"/>, using the given timeout, inside
/// <paramref name="transaction"/>.
/// </summary>
public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout, IMessageQueueTransaction transaction)
{
    IsMatch correlationFilter = ByCorrelationId (id);
    TxReceiver.DoReceive receiver = Receiver (timeout, correlationFilter);
    return Run (transaction, receiver);
}
/// <summary>
/// Receives, with acknowledgment, a message whose CorrelationId equals
/// <paramref name="id"/>, using the given timeout and the transaction
/// handling selected by <paramref name="transactionType"/>.
/// </summary>
public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout, MessageQueueTransactionType transactionType)
{
    IsMatch correlationFilter = ByCorrelationId (id);
    TxReceiver.DoReceive receiver = Receiver (timeout, correlationFilter);
    return Run (transactionType, receiver);
}
/// <summary>
/// Creates an enumerator over this queue, built from the message factory
/// and the current queue reference.
/// </summary>
public IMessageEnumerator GetMessageEnumerator ()
{
    IMessageEnumerator enumerator = new RabbitMQMessageEnumerator (helper, QRef);
    return enumerator;
}
// Runs receiver r using the transaction handling selected by transactionType.
// NOTE(review): the commit/abort handling around `success`, the body of the
// None case and the label preceding the final throw are not visible in this
// excerpt; the comments below cover only what is shown.
460 private IMessage Run (MessageQueueTransactionType transactionType,
461 TxReceiver.DoReceive r)
463 switch (transactionType) {
// Single: a transaction is obtained from GetTx, disposed when the using
// block ends, and the receive is delegated to the transaction-taking Run
// overload.
464 case MessageQueueTransactionType.Single:
465 using (RabbitMQMessageQueueTransaction tx = GetTx ()) {
466 bool success = false;
468 IMessage msg = Run (tx, r);
478 case MessageQueueTransactionType.None:
// The message names the rejected transaction type.
482 throw new NotSupportedException(transactionType + " not supported");
/// <summary>
/// Runs receiver <paramref name="r"/> inside <paramref name="transaction"/>:
/// wraps it in a <c>TxReceiver</c> bound to this queue and passes that
/// wrapper's <c>ReceiveInContext</c> to the transaction's <c>RunReceive</c>.
/// </summary>
/// <param name="transaction">
/// Transaction to receive within; cast to RabbitMQMessageQueueTransaction.
/// </param>
/// <param name="r">Receive operation to execute.</param>
private IMessage Run (IMessageQueueTransaction transaction, TxReceiver.DoReceive r)
{
    TxReceiver contextReceiver = new TxReceiver (this, r);
    RabbitMQMessageQueueTransaction rabbitTx =
        (RabbitMQMessageQueueTransaction) transaction;

    IMessage received = rabbitTx.RunReceive (contextReceiver.ReceiveInContext);
    return received;
}
/// <summary>
/// Runs receiver <paramref name="r"/> outside any transaction, on a
/// connection and channel opened just for this call.
/// </summary>
/// <param name="r">Receive operation to execute.</param>
private IMessage Run (TxReceiver.DoReceive r)
{
    ConnectionFactory factory = new ConnectionFactory ();
    using (IConnection connection = factory.CreateConnection (QRef.Host))
    using (IModel channel = connection.CreateModel ()) {
        IMessage received = r (this, channel);
        return received;
    }
}
// Receive callback working on transaction-owned state: host, cn and model
// are passed by reference, so values assigned here are visible to the caller.
// The receive itself uses a timeout of -1 with acknowledgment.
// NOTE(review): the conditions guarding the assignments below are not
// visible in this excerpt; presumably the connection and channel are created
// only when the incoming references are null - confirm. No caller of this
// method is visible in this excerpt either (Run uses
// TxReceiver.ReceiveInContext).
505 private IMessage ReceiveInContext (ref string host, ref IConnection cn,
506 ref IModel model, string txId)
// The context host must match this queue's host.
510 else if (host != QRef.Host)
511 throw new MonoMessagingException ("Transactions can not span multiple hosts");
514 ConnectionFactory cf = new ConnectionFactory ();
515 cn = cf.CreateConnection (host);
519 model = cn.CreateModel ();
523 return Receive (model, -1, true);
// Binds a queue to a DoReceive delegate so the pair can be handed to a
// transaction as a single ReceiveInContext callback.
526 private class TxReceiver
528 private readonly DoReceive doReceive;
529 private readonly RabbitMQMessageQueue q;
// NOTE(review): the assignment of `q` is not visible in this excerpt.
531 public TxReceiver(RabbitMQMessageQueue q, DoReceive doReceive) {
533 this.doReceive = doReceive;
// Signature of a receive operation: given the queue and an open channel,
// return a message.
536 public delegate IMessage DoReceive (RabbitMQMessageQueue q, IModel model);
// Callback run by the transaction. host, cn and model are passed by
// reference, so values assigned here are visible to the caller.
// NOTE(review): the conditions guarding the assignments below are not
// visible in this excerpt; presumably the connection and channel are created
// only when the incoming references are null - confirm.
538 public IMessage ReceiveInContext (ref string host, ref IConnection cn,
539 ref IModel model, string txId)
// The context host must match the bound queue's host.
543 else if (host != q.QRef.Host)
544 throw new MonoMessagingException ("Transactions can not span multiple hosts");
547 ConnectionFactory cf = new ConnectionFactory ();
548 cn = cf.CreateConnection (host);
552 model = cn.CreateModel ();
// Hands over to the wrapped receive operation.
556 return doReceive (q, model);
// Bundles a timeout, an optional matcher and an ack flag, and exposes
// DoReceive, a method compatible with the TxReceiver.DoReceive delegate.
560 private class DoReceiveWithTimeout
562 private readonly int timeout;
563 private readonly IsMatch matcher;
564 private readonly bool ack;
// Two-argument form: acknowledgment defaults to true.
566 public DoReceiveWithTimeout (int timeout, IsMatch matcher)
567 : this (timeout, matcher, true)
571 public DoReceiveWithTimeout (int timeout, IsMatch matcher, bool ack)
// Special case for "matcher supplied and timeout of -1".
// NOTE(review): the statement this condition guards, and the assignment of
// `ack`, are not visible in this excerpt.
573 if (matcher != null && timeout == -1)
576 this.timeout = timeout;
577 this.matcher = matcher;
// Performs the receive on q through one of its two private Receive overloads.
// NOTE(review): the condition choosing between the two returns is not
// visible; presumably the matcher overload is used only when a matcher was
// supplied - confirm.
581 public IMessage DoReceive (RabbitMQMessageQueue q, IModel model)
584 return q.Receive (model, timeout, ack);
586 return q.Receive (model, timeout, ack, matcher);
/// <summary>
/// Builds a receive operation (acknowledging) with the given timeout and
/// matcher. The timeout is converted with
/// <c>MessageFactory.TimeSpanToInt32</c>.
/// </summary>
private static TxReceiver.DoReceive Receiver (TimeSpan timeout, IsMatch matcher)
{
    DoReceiveWithTimeout worker =
        new DoReceiveWithTimeout (MessageFactory.TimeSpanToInt32 (timeout), matcher);
    return worker.DoReceive;
}
/// <summary>
/// Builds a receive operation (acknowledging) with the given matcher and
/// a requested timeout of -1.
/// </summary>
private static TxReceiver.DoReceive Receiver (IsMatch matcher)
{
    DoReceiveWithTimeout worker = new DoReceiveWithTimeout (-1, matcher);
    return worker.DoReceive;
}
/// <summary>
/// Builds a receive operation (acknowledging) with the given timeout and
/// no matcher. The timeout is converted with
/// <c>MessageFactory.TimeSpanToInt32</c>.
/// </summary>
private static TxReceiver.DoReceive Receiver (TimeSpan timeout)
{
    DoReceiveWithTimeout worker =
        new DoReceiveWithTimeout (MessageFactory.TimeSpanToInt32 (timeout), null);
    return worker.DoReceive;
}
/// <summary>
/// Builds a receive operation (acknowledging) with a timeout of -1 and no
/// matcher.
/// </summary>
private TxReceiver.DoReceive Receiver ()
{
    DoReceiveWithTimeout worker = new DoReceiveWithTimeout (-1, null);
    return worker.DoReceive;
}
/// <summary>
/// Builds a peek operation with a timeout of -1 and no matcher.
/// </summary>
/// <remarks>
/// The ack flag is passed explicitly as <c>false</c>. The two-argument
/// <c>DoReceiveWithTimeout</c> constructor defaults it to <c>true</c>,
/// which would make this "peek" acknowledge the message it reads, unlike
/// every other Peeker overload.
/// </remarks>
private TxReceiver.DoReceive Peeker ()
{
    return new DoReceiveWithTimeout (-1, null, false).DoReceive;
}
/// <summary>
/// Builds a peek operation (no acknowledgment) with the given timeout and
/// no matcher. The timeout is converted with
/// <c>MessageFactory.TimeSpanToInt32</c>.
/// </summary>
private TxReceiver.DoReceive Peeker (TimeSpan timeout)
{
    DoReceiveWithTimeout worker =
        new DoReceiveWithTimeout (MessageFactory.TimeSpanToInt32 (timeout), null, false);
    return worker.DoReceive;
}
/// <summary>
/// Builds a peek operation (no acknowledgment) with the given matcher and
/// a requested timeout of -1.
/// </summary>
private TxReceiver.DoReceive Peeker (IsMatch matcher)
{
    DoReceiveWithTimeout worker = new DoReceiveWithTimeout (-1, matcher, false);
    return worker.DoReceive;
}
/// <summary>
/// Builds a peek operation (no acknowledgment) with the given timeout and
/// matcher. The timeout is converted with
/// <c>MessageFactory.TimeSpanToInt32</c>.
/// </summary>
private TxReceiver.DoReceive Peeker (TimeSpan timeout, IsMatch matcher)
{
    DoReceiveWithTimeout worker =
        new DoReceiveWithTimeout (MessageFactory.TimeSpanToInt32 (timeout), matcher, false);
    return worker.DoReceive;
}
// Predicate over a delivered message: returns true when the delivery is the
// one being looked for (see IdMatcher and CorrelationIdMatcher).
635 delegate bool IsMatch (BasicDeliverEventArgs result);
/// <summary>
/// Matches deliveries whose <c>MessageId</c> property equals a fixed id.
/// </summary>
private class IdMatcher
{
    private readonly string id;

    /// <summary>Creates a matcher for the given message id.</summary>
    public IdMatcher (string id)
    {
        this.id = id;
    }

    /// <summary>
    /// Returns true when the delivery's MessageId equals the stored id.
    /// </summary>
    public bool MatchById (BasicDeliverEventArgs result)
    {
        string candidate = result.BasicProperties.MessageId;
        return candidate == id;
    }
}
/// <summary>
/// Builds an <c>IsMatch</c> predicate that accepts deliveries whose
/// MessageId equals <paramref name="id"/>.
/// </summary>
private static IsMatch ById (string id)
{
    IdMatcher matcher = new IdMatcher (id);
    return matcher.MatchById;
}
/// <summary>
/// Matches deliveries whose <c>CorrelationId</c> property equals a fixed
/// correlation id.
/// </summary>
private class CorrelationIdMatcher
{
    private readonly string correlationId;

    /// <summary>Creates a matcher for the given correlation id.</summary>
    public CorrelationIdMatcher (string correlationId)
    {
        this.correlationId = correlationId;
    }

    /// <summary>
    /// Returns true when the delivery's CorrelationId equals the stored
    /// correlation id.
    /// </summary>
    public bool MatchById (BasicDeliverEventArgs result)
    {
        string candidate = result.BasicProperties.CorrelationId;
        return candidate == correlationId;
    }
}
/// <summary>
/// Builds an <c>IsMatch</c> predicate that accepts deliveries whose
/// CorrelationId equals <paramref name="correlationId"/>.
/// </summary>
private static IsMatch ByCorrelationId (string correlationId)
{
    CorrelationIdMatcher matcher = new CorrelationIdMatcher (correlationId);
    return matcher.MatchById;
}
// Reads the next delivery from the queue on the given channel: declares the
// queue, opens a Subscription on the declared name, waits for one delivery
// using `timeout`, and converts it with helper.ReadMessage.
// NOTE(review): the handling of doAck, the return statement and the branch
// that owns the throw are not visible in this excerpt; presumably the
// delivery is acknowledged when doAck is true, and the exception is thrown
// when sub.Next yields nothing - confirm.
675 private IMessage Receive (IModel model, int timeout, bool doAck)
677 string finalName = model.QueueDeclare (QRef.Queue, false);
// The subscription is disposed when the using block ends.
679 using (Subscription sub = new Subscription (model, finalName)) {
680 BasicDeliverEventArgs result;
681 if (sub.Next (timeout, out result)) {
682 IMessage m = helper.ReadMessage (QRef, result);
687 throw new MonoMessagingException ("No Message Available");
// Reads deliveries from the queue on the given channel until one satisfies
// `matcher`: declares the queue, opens a Subscription on the declared name,
// and keeps calling sub.Next with `timeout` while it succeeds. A matching
// delivery is converted with helper.ReadMessage.
// NOTE(review): the handling of doAck and the return statement are not
// visible in this excerpt; presumably MessageUnavailableException is thrown
// once sub.Next stops yielding deliveries without a match - confirm.
692 private IMessage Receive (IModel model, int timeout,
693 bool doAck, IsMatch matcher)
695 string finalName = model.QueueDeclare (QRef.Queue, false);
// The subscription is disposed when the using block ends.
697 using (Subscription sub = new Subscription (model, finalName)) {
698 BasicDeliverEventArgs result;
699 while (sub.Next (timeout, out result)) {
701 if (matcher (result)) {
702 IMessage m = helper.ReadMessage (QRef, result);
709 throw new MessageUnavailableException ("Message not available");
/// <summary>
/// Creates a new transaction through the provider and returns it as the
/// RabbitMQ-specific transaction type.
/// </summary>
private RabbitMQMessageQueueTransaction GetTx ()
{
    IMessageQueueTransaction created = provider.CreateMessageQueueTransaction ();
    return (RabbitMQMessageQueueTransaction) created;
}