2 // Mono.Messaging.RabbitMQ
5 // Michael Barker (mike@middlesoft.co.uk)
7 // (C) 2008 Michael Barker
11 // Permission is hereby granted, free of charge, to any person obtaining
12 // a copy of this software and associated documentation files (the
13 // "Software"), to deal in the Software without restriction, including
14 // without limitation the rights to use, copy, modify, merge, publish,
15 // distribute, sublicense, and/or sell copies of the Software, and to
16 // permit persons to whom the Software is furnished to do so, subject to
17 // the following conditions:
19 // The above copyright notice and this permission notice shall be
20 // included in all copies or substantial portions of the Software.
22 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
23 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
24 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
25 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
26 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
27 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
32 using System.Collections;
33 using System.ComponentModel;
37 using RabbitMQ.Client;
38 using RabbitMQ.Client.Content;
39 using RabbitMQ.Client.Events;
40 using RabbitMQ.Client.Exceptions;
41 using RabbitMQ.Client.MessagePatterns;
44 namespace Mono.Messaging.RabbitMQ {
47 /// RabbitMQ implementation of a message queue. Currently this implementation
48 /// attempts to be as stateless as possible. Connections to the AMQP server
49 /// are only created as needed.
51 public class RabbitMQMessageQueue : MessageQueueBase, IMessageQueue {
// Values backing the simple get/set properties below. Fields without an
// initializer start at the CLR default (false / 0 / null / Guid.Empty's
// explicit value is kept because it is a named constant).
private bool authenticate;
private short basePriority;
private Guid category = Guid.Empty;
private bool denySharedReceive;
private EncryptionRequired encryptionRequired;
private long maximumJournalSize = -1;
private long maximumQueueSize = -1;
private ISynchronizeInvoke synchronizingObject;
private bool useJournalQueue;

// Reference of the queue this instance operates on.
private QueueReference qRef = QueueReference.DEFAULT;

// Collaborators and flags fixed at construction time.
private readonly RabbitMQMessagingProvider provider;
private readonly MessageFactory helper;
private readonly bool transactional;
/// <summary>
/// Creates a queue for the given provider by chaining to the constructor
/// overload that also takes a queue reference, passing
/// <c>QueueReference.DEFAULT</c> for it.
/// </summary>
/// <param name="provider">Messaging provider forwarded to the chained constructor.</param>
67 public RabbitMQMessageQueue (RabbitMQMessagingProvider provider,
// NOTE(review): the rest of the parameter list is not visible in this view;
// the initializer below forwards a value named 'transactional'.
69 : this (provider, QueueReference.DEFAULT, transactional)
/// <summary>
/// Creates a queue for the given provider: stores the provider, builds the
/// <c>MessageFactory</c> helper from it and records the transactional flag.
/// </summary>
/// <param name="provider">Messaging provider kept by this queue and handed to the message factory.</param>
73 public RabbitMQMessageQueue (RabbitMQMessagingProvider provider,
// NOTE(review): the remaining parameters and at least one statement of the
// body are not visible in this view (the other constructor passes a
// QueueReference and 'transactional' here) - confirm against the full file.
77 this.provider = provider;
78 this.helper = new MessageFactory (provider);
80 this.transactional = transactional;
/// <summary>
/// Overrides the protected <c>Queue</c> property inherited from a base type
/// (presumably <c>MessageQueueBase</c> - confirm).
/// </summary>
// NOTE(review): the accessor body is not visible in this view.
83 protected override IMessageQueue Queue {
/// <summary>Gets or sets the authenticate flag stored on this queue object.</summary>
public bool Authenticate {
    get { return this.authenticate; }
    set { this.authenticate = value; }
}
/// <summary>Gets or sets the base priority value stored on this queue object.</summary>
public short BasePriority {
    get { return this.basePriority; }
    set { this.basePriority = value; }
}
// Getter of a property whose declaration line is not visible in this view.
// Reading it always throws NotImplementedException.
98 get { throw new NotImplementedException (); }
/// <summary>Not implemented.</summary>
/// <exception cref="NotImplementedException">Always thrown when the value is read.</exception>
public bool CanWrite {
    get {
        throw new NotImplementedException ();
    }
}
/// <summary>Gets or sets the category identifier stored on this queue object.</summary>
public Guid Category {
    get { return this.category; }
    set { this.category = value; }
}
/// <summary>Not implemented.</summary>
/// <exception cref="NotImplementedException">Always thrown when the value is read.</exception>
public DateTime CreateTime {
    get {
        throw new NotImplementedException ();
    }
}
/// <summary>Gets or sets the deny-shared-receive flag stored on this queue object.</summary>
public bool DenySharedReceive {
    get { return this.denySharedReceive; }
    set { this.denySharedReceive = value; }
}
/// <summary>Gets or sets the encryption requirement stored on this queue object.</summary>
public EncryptionRequired EncryptionRequired {
    get { return this.encryptionRequired; }
    set { this.encryptionRequired = value; }
}
// Getter of a property whose declaration line is not visible in this view.
// Reading it always throws NotImplementedException.
125 get { throw new NotImplementedException (); }
/// <summary>Not implemented.</summary>
/// <exception cref="NotImplementedException">Always thrown when the value is read.</exception>
public DateTime LastModifyTime {
    get {
        throw new NotImplementedException ();
    }
}
/// <summary>Gets or sets the maximum journal size stored on this queue object (initially -1).</summary>
public long MaximumJournalSize {
    get { return this.maximumJournalSize; }
    set { this.maximumJournalSize = value; }
}
/// <summary>Gets or sets the maximum queue size stored on this queue object (initially -1).</summary>
public long MaximumQueueSize {
    get { return this.maximumQueueSize; }
    set { this.maximumQueueSize = value; }
}
/// <summary>Not implemented.</summary>
/// <exception cref="NotImplementedException">Always thrown when the value is read.</exception>
public IntPtr ReadHandle {
    get {
        throw new NotImplementedException ();
    }
}
/// <summary>Gets or sets the synchronizing object stored on this queue object.</summary>
public ISynchronizeInvoke SynchronizingObject {
    get { return this.synchronizingObject; }
    set { this.synchronizingObject = value; }
}
/// <summary>Gets the transactional flag that was fixed when this queue object was constructed.</summary>
public bool Transactional {
    get { return this.transactional; }
}
/// <summary>Gets or sets the use-journal-queue flag stored on this queue object.</summary>
public bool UseJournalQueue {
    get { return this.useJournalQueue; }
    set { this.useJournalQueue = value; }
}
/// <summary>Not implemented.</summary>
/// <exception cref="NotImplementedException">Always thrown when the value is read.</exception>
public IntPtr WriteHandle {
    get {
        throw new NotImplementedException ();
    }
}
/// <summary>
/// Reference of the queue this instance operates on; other members of this
/// class read its <c>Host</c> and <c>Queue</c> values.
/// </summary>
164 public QueueReference QRef {
// NOTE(review): the getter is not visible in this view; presumably it returns qRef.
166 set { qRef = value; }
/// <summary>
/// Packs the protocol version of <paramref name="cn"/> into one 64-bit value:
/// the major version shifted left by 32 bits, plus the minor version.
/// </summary>
/// <param name="cn">Connection whose <c>Protocol</c> version is read.</param>
/// <returns>The combined version number.</returns>
private static long GetVersion (IConnection cn)
{
    long major = cn.Protocol.MajorVersion;
    return (major << 32) + cn.Protocol.MinorVersion;
}
/// <summary>
/// Stamps delivery information onto <paramref name="msg"/> by calling the
/// message's own <c>SetDeliveryInfo</c> method.
/// </summary>
/// <param name="msg">Message to update.</param>
/// <param name="senderVersion">Sender version; callers in this class pass the result of <c>GetVersion</c>.</param>
/// <param name="transactionId">Transaction id; the non-transactional send path passes null.</param>
177 private void SetDeliveryInfo (IMessage msg, long senderVersion,
178 string transactionId)
// NOTE(review): most arguments of this call are not visible in this view.
180 msg.SetDeliveryInfo (Acknowledgment.None,
// A fresh GUID followed by a literal backslash and the character '0'
// (presumably the message id - confirm against the full argument list).
183 Guid.NewGuid ().ToString () + "\\0",
194 // No-op (queues are currently stateless)
/// <summary>
/// Deletes the queue named <c>qRef.Queue</c>, using a connection and channel
/// that are opened for this call and disposed afterwards.
/// </summary>
/// <param name="qRef">Reference naming the host to connect to and the queue to delete.</param>
public static void Delete (QueueReference qRef)
{
    using (IConnection connection = CreateConnection (qRef))
    using (IModel channel = connection.CreateModel ()) {
        // All three option flags of QueueDelete are passed as false.
        channel.QueueDelete (qRef.Queue, false, false, false);
    }
}
/// <summary>
/// Non-transactional send entry point: validates the queue path and the
/// message, then works on a connection and channel opened for this call.
/// </summary>
/// <param name="msg">Message to send; its <c>BodyStream</c> must already be set.</param>
/// <exception cref="MonoMessagingException"><c>QRef</c> is still <c>QueueReference.DEFAULT</c>.</exception>
/// <exception cref="ArgumentException"><c>msg.BodyStream</c> is null.</exception>
/// <exception cref="ConnectionException">A <c>BrokerUnreachableException</c> was caught.</exception>
206 public void Send (IMessage msg)
208 if (QRef == QueueReference.DEFAULT)
209 throw new MonoMessagingException ("Path has not been specified");
211 if (msg.BodyStream == null)
212 throw new ArgumentException ("Message is not serialized properly");
215 using (IConnection cn = CreateConnection (QRef)) {
// No transaction id on this path.
216 SetDeliveryInfo (msg, GetVersion (cn), null);
217 using (IModel ch = cn.CreateModel ()) {
// NOTE(review): the statement that uses 'ch' (presumably the publish via the
// private Send (IModel, IMessage) overload) is not visible in this view.
221 } catch (BrokerUnreachableException e) {
// Rethrow as ConnectionException, carrying the queue reference and the original exception.
222 throw new ConnectionException (QRef, e);
/// <summary>
/// Sends <paramref name="msg"/> as part of <paramref name="transaction"/>.
/// The work is handed to the transaction's <c>RunSend</c>, with
/// <c>SendInContext</c> passed as the callback.
/// </summary>
/// <param name="msg">Message to send; its <c>BodyStream</c> must already be set.</param>
/// <param name="transaction">Transaction to send under; must be a <c>RabbitMQMessageQueueTransaction</c>.</param>
/// <exception cref="MonoMessagingException"><c>QRef</c> is still <c>QueueReference.DEFAULT</c>.</exception>
/// <exception cref="ArgumentNullException"><paramref name="msg"/> or <paramref name="transaction"/> is null.</exception>
/// <exception cref="ArgumentException"><c>msg.BodyStream</c> is null.</exception>
/// <exception cref="InvalidCastException"><paramref name="transaction"/> is not a <c>RabbitMQMessageQueueTransaction</c>.</exception>
public void Send (IMessage msg, IMessageQueueTransaction transaction)
{
    // Checked first so the existing "no path" failure keeps its precedence.
    if (QRef == QueueReference.DEFAULT)
        throw new MonoMessagingException ("Path has not been specified");

    // Report null arguments explicitly instead of letting them surface as a
    // NullReferenceException further down.
    if (msg == null)
        throw new ArgumentNullException ("msg");
    if (transaction == null)
        throw new ArgumentNullException ("transaction");

    if (msg.BodyStream == null)
        throw new ArgumentException ("Message is not serialized properly");

    RabbitMQMessageQueueTransaction tx = (RabbitMQMessageQueueTransaction) transaction;
    tx.RunSend (SendInContext, msg);
}
/// <summary>
/// Sends <paramref name="msg"/> according to the requested transaction mode.
/// </summary>
/// <param name="msg">Message to send.</param>
/// <param name="transactionType">Transaction mode; the switch has cases for Single, None and Automatic.</param>
/// <exception cref="MonoMessagingException">Wraps any exception caught in the Single case.</exception>
/// <exception cref="NotSupportedException"><paramref name="transactionType"/> is Automatic.</exception>
239 public void Send (IMessage msg, MessageQueueTransactionType transactionType)
241 switch (transactionType) {
242 case MessageQueueTransactionType.Single:
// A transaction is created for this call and disposed when the using block exits.
243 using (IMessageQueueTransaction tx = provider.CreateMessageQueueTransaction ()) {
// NOTE(review): the guarded statements and the first statement of the catch
// body are not visible in this view.
247 } catch (Exception e) {
// The original exception is kept as the inner exception.
249 throw new MonoMessagingException(e.Message, e);
// NOTE(review): the body of this case is not visible in this view.
254 case MessageQueueTransactionType.None:
258 case MessageQueueTransactionType.Automatic:
259 throw new NotSupportedException("Automatic transaction types not supported");
/// <summary>
/// Send callback passed to a transaction's <c>RunSend</c> by
/// Send (IMessage, IMessageQueueTransaction). It works on the host,
/// connection and channel handed in by reference.
/// </summary>
/// <param name="host">Host associated with the caller's context; a value that differs from <c>QRef.Host</c> is rejected.</param>
/// <param name="cn">Connection slot; assigned from <c>CreateConnection (QRef)</c> here.</param>
/// <param name="model">Channel slot; assigned from <c>cn.CreateModel ()</c> here.</param>
/// <param name="msg">Message being sent.</param>
/// <param name="txId">Transaction id stamped onto the message.</param>
/// <exception cref="MonoMessagingException">The host differs from <c>QRef.Host</c>.</exception>
263 private void SendInContext (ref string host, ref IConnection cn,
264 ref IModel model, IMessage msg, string txId)
// NOTE(review): the 'if' branch preceding this 'else if' is not visible in this view.
268 else if (host != QRef.Host)
269 throw new MonoMessagingException ("Transactions can not span multiple hosts");
// NOTE(review): the guards around the two assignments below are not visible;
// presumably they only run when cn / model are still null - confirm.
272 cn = CreateConnection (QRef);
275 model = cn.CreateModel ();
279 SetDeliveryInfo (msg, GetVersion (cn), txId);
/// <summary>
/// Declares the target queue on <paramref name="model"/>, serializes
/// <paramref name="msg"/> through the message factory and publishes it to the
/// exchange named "" with the declared queue name as routing key.
/// </summary>
/// <param name="model">Channel used for the declare and the publish.</param>
/// <param name="msg">Message to publish.</param>
private void Send (IModel model, IMessage msg)
{
    string routingKey = model.QueueDeclare (QRef.Queue, true);
    IMessageBuilder builder = helper.WriteMessage (model, msg);
    IBasicProperties properties = (IBasicProperties) builder.GetContentHeader();

    model.BasicPublish ("", routingKey, properties, builder.GetContentBody ());
}
// Body of a method whose declaration line is not visible in this view: opens a
// connection and channel for QRef and purges the queue named QRef.Queue.
295 using (IConnection cn = CreateConnection (QRef)) {
296 using (IModel model = cn.CreateModel ()) {
297 model.QueuePurge (QRef.Queue, false);
/// <summary>
/// Reads the next message with acknowledgement turned off, over a connection
/// and channel opened for this call.
/// </summary>
/// <returns>The result of the private Receive overload called with timeout -1 and doAck false.</returns>
public IMessage Peek ()
{
    using (IConnection connection = CreateConnection (QRef))
    using (IModel channel = connection.CreateModel ()) {
        return Receive (channel, -1, false);
    }
}
/// <summary>Peeks at the next message, using <paramref name="timeout"/> for the wait.</summary>
public IMessage Peek (TimeSpan timeout)
{
    RecieveDelegate peek = Peeker (timeout);
    return Run (peek);
}
/// <summary>Peeks at the message whose message id equals <paramref name="id"/>.</summary>
public IMessage PeekById (string id)
{
    IsMatch matcher = ById (id);
    RecieveDelegate peek = Peeker (matcher);
    return Run (peek);
}
/// <summary>Peeks at the message whose message id equals <paramref name="id"/>, using <paramref name="timeout"/> for the wait.</summary>
public IMessage PeekById (string id, TimeSpan timeout)
{
    IsMatch matcher = ById (id);
    RecieveDelegate peek = Peeker (timeout, matcher);
    return Run (peek);
}
/// <summary>Peeks at the message whose correlation id equals <paramref name="id"/>.</summary>
public IMessage PeekByCorrelationId (string id)
{
    IsMatch matcher = ByCorrelationId (id);
    RecieveDelegate peek = Peeker (matcher);
    return Run (peek);
}
/// <summary>Peeks at the message whose correlation id equals <paramref name="id"/>, using <paramref name="timeout"/> for the wait.</summary>
public IMessage PeekByCorrelationId (string id, TimeSpan timeout)
{
    IsMatch matcher = ByCorrelationId (id);
    RecieveDelegate peek = Peeker (timeout, matcher);
    return Run (peek);
}
/// <summary>Receives the next message outside of any transaction.</summary>
public IMessage Receive ()
{
    RecieveDelegate receive = Receiver ();
    return Run (receive);
}
/// <summary>Receives the next message outside of any transaction, using <paramref name="timeout"/> for the wait.</summary>
public IMessage Receive (TimeSpan timeout)
{
    RecieveDelegate receive = Receiver (timeout);
    return Run (receive);
}
/// <summary>Receives the next message under <paramref name="transaction"/>, using <paramref name="timeout"/> for the wait.</summary>
public IMessage Receive (TimeSpan timeout,
                         IMessageQueueTransaction transaction)
{
    RecieveDelegate receive = Receiver (timeout);
    return Run (transaction, receive);
}
/// <summary>Receives the next message under the given transaction mode, using <paramref name="timeout"/> for the wait.</summary>
public IMessage Receive (TimeSpan timeout,
                         MessageQueueTransactionType transactionType)
{
    RecieveDelegate receive = Receiver (timeout);
    return Run (transactionType, receive);
}
/// <summary>Receives the next message under <paramref name="transaction"/>.</summary>
public IMessage Receive (IMessageQueueTransaction transaction)
{
    RecieveDelegate receive = Receiver ();
    return Run (transaction, receive);
}
/// <summary>Receives the next message under the given transaction mode.</summary>
public IMessage Receive (MessageQueueTransactionType transactionType)
{
    RecieveDelegate receive = Receiver ();
    return Run (transactionType, receive);
}
/// <summary>Receives the message whose message id equals <paramref name="id"/>.</summary>
public IMessage ReceiveById (string id)
{
    IsMatch matcher = ById (id);
    RecieveDelegate receive = Receiver (matcher);
    return Run (receive);
}
/// <summary>Receives the message whose message id equals <paramref name="id"/>, using <paramref name="timeout"/> for the wait.</summary>
public IMessage ReceiveById (string id, TimeSpan timeout)
{
    IsMatch matcher = ById (id);
    RecieveDelegate receive = Receiver (timeout, matcher);
    return Run (receive);
}
/// <summary>Receives the message whose message id equals <paramref name="id"/> under <paramref name="transaction"/>.</summary>
public IMessage ReceiveById (string id,
                             IMessageQueueTransaction transaction)
{
    IsMatch matcher = ById (id);
    RecieveDelegate receive = Receiver (matcher);
    return Run (transaction, receive);
}
/// <summary>Receives the message whose message id equals <paramref name="id"/> under the given transaction mode.</summary>
public IMessage ReceiveById (string id,
                             MessageQueueTransactionType transactionType)
{
    IsMatch matcher = ById (id);
    RecieveDelegate receive = Receiver (matcher);
    return Run (transactionType, receive);
}
/// <summary>
/// Receives the message whose message id equals <paramref name="id"/> under
/// <paramref name="transaction"/>, using <paramref name="timeout"/> for the wait.
/// </summary>
public IMessage ReceiveById (string id, TimeSpan timeout,
                             IMessageQueueTransaction transaction)
{
    IsMatch matcher = ById (id);
    RecieveDelegate receive = Receiver (timeout, matcher);
    return Run (transaction, receive);
}
/// <summary>
/// Receives the message whose message id equals <paramref name="id"/> under the
/// given transaction mode, using <paramref name="timeout"/> for the wait.
/// </summary>
public IMessage ReceiveById (string id, TimeSpan timeout,
                             MessageQueueTransactionType transactionType)
{
    IsMatch matcher = ById (id);
    RecieveDelegate receive = Receiver (timeout, matcher);
    return Run (transactionType, receive);
}
/// <summary>Receives the message whose correlation id equals <paramref name="id"/>.</summary>
public IMessage ReceiveByCorrelationId (string id)
{
    IsMatch matcher = ByCorrelationId (id);
    RecieveDelegate receive = Receiver (matcher);
    return Run (receive);
}
/// <summary>Receives the message whose correlation id equals <paramref name="id"/>, using <paramref name="timeout"/> for the wait.</summary>
public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout)
{
    IsMatch matcher = ByCorrelationId (id);
    RecieveDelegate receive = Receiver (timeout, matcher);
    return Run (receive);
}
/// <summary>Receives the message whose correlation id equals <paramref name="id"/> under <paramref name="transaction"/>.</summary>
public IMessage ReceiveByCorrelationId (string id,
                                        IMessageQueueTransaction transaction)
{
    IsMatch matcher = ByCorrelationId (id);
    RecieveDelegate receive = Receiver (matcher);
    return Run (transaction, receive);
}
/// <summary>Receives the message whose correlation id equals <paramref name="id"/> under the given transaction mode.</summary>
public IMessage ReceiveByCorrelationId (string id,
                                        MessageQueueTransactionType transactionType)
{
    IsMatch matcher = ByCorrelationId (id);
    RecieveDelegate receive = Receiver (matcher);
    return Run (transactionType, receive);
}
/// <summary>
/// Receives the message whose correlation id equals <paramref name="id"/> under
/// <paramref name="transaction"/>, using <paramref name="timeout"/> for the wait.
/// </summary>
public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout,
                                        IMessageQueueTransaction transaction)
{
    IsMatch matcher = ByCorrelationId (id);
    RecieveDelegate receive = Receiver (timeout, matcher);
    return Run (transaction, receive);
}
/// <summary>
/// Receives the message whose correlation id equals <paramref name="id"/> under
/// the given transaction mode, using <paramref name="timeout"/> for the wait.
/// </summary>
public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout,
                                        MessageQueueTransactionType transactionType)
{
    IsMatch matcher = ByCorrelationId (id);
    RecieveDelegate receive = Receiver (timeout, matcher);
    return Run (transactionType, receive);
}
/// <summary>
/// Creates a new <c>RabbitMQMessageEnumerator</c> built from this queue's
/// message factory and queue reference.
/// </summary>
public IMessageEnumerator GetMessageEnumerator ()
{
    IMessageEnumerator enumerator = new RabbitMQMessageEnumerator (helper, QRef);
    return enumerator;
}
/// <summary>
/// Shape of the receive callbacks used by the Run overloads: given a queue
/// (and further arguments), produce a message.
/// </summary>
// NOTE(review): the rest of the parameter list is not visible in this view;
// call sites in this class invoke it with (queue, IModel).
441 private delegate IMessage RecieveDelegate (RabbitMQMessageQueue q,
/// <summary>
/// Runs a receive callback under the requested transaction mode.
/// </summary>
/// <param name="transactionType">Transaction mode; cases for Single and None are visible.</param>
/// <exception cref="NotSupportedException">Thrown with the message "&lt;transactionType&gt; not supported".</exception>
444 private IMessage Run (MessageQueueTransactionType transactionType,
// NOTE(review): the rest of the parameter list is not visible in this view;
// the body uses a callback named 'r'.
447 switch (transactionType) {
448 case MessageQueueTransactionType.Single:
// A transaction is obtained for this call and disposed when the using block exits.
449 using (RabbitMQMessageQueueTransaction tx = GetTx ()) {
450 bool success = false;
// NOTE(review): the statements around this call (including how 'success' is
// used) are not visible in this view.
452 IMessage msg = Run (tx, r);
// NOTE(review): the body of this case is not visible in this view.
462 case MessageQueueTransactionType.None:
// NOTE(review): the label guarding this throw is not visible; presumably the default case.
466 throw new NotSupportedException(transactionType + " not supported");
/// <summary>
/// Runs a receive under <paramref name="transaction"/>: pairs this queue with
/// the callback in a <c>TxReceiver</c> and passes its <c>ReceiveInContext</c>
/// method to the transaction's <c>RunReceive</c>.
/// </summary>
/// <param name="transaction">Cast to <c>RabbitMQMessageQueueTransaction</c>; any other implementation fails the cast.</param>
/// <returns>Whatever <c>RunReceive</c> returns.</returns>
470 private IMessage Run (IMessageQueueTransaction transaction,
// NOTE(review): the rest of the parameter list is not visible in this view;
// the body uses a callback named 'r'.
473 TxReceiver txr = new TxReceiver (this, r);
474 RabbitMQMessageQueueTransaction tx =
475 (RabbitMQMessageQueueTransaction) transaction;
476 return tx.RunReceive (txr.ReceiveInContext);
/// <summary>
/// Runs the receive callback <paramref name="r"/> outside of any transaction,
/// on a connection and channel opened for this call and disposed afterwards.
/// </summary>
/// <param name="r">Callback invoked with this queue and the new channel.</param>
/// <returns>The message returned by <paramref name="r"/>.</returns>
private IMessage Run (RecieveDelegate r)
{
    using (IConnection connection = CreateConnection (QRef))
    using (IModel channel = connection.CreateModel ()) {
        return r (this, channel);
    }
}
/// <summary>
/// Pairs a queue with a receive callback and exposes
/// <c>ReceiveInContext</c>, which Run (IMessageQueueTransaction, ...) passes to
/// a transaction's <c>RunReceive</c>.
/// </summary>
488 private class TxReceiver
490 private readonly RecieveDelegate doReceive;
491 private readonly RabbitMQMessageQueue q;
493 public TxReceiver(RabbitMQMessageQueue q, RecieveDelegate doReceive) {
// NOTE(review): the assignment of the 'q' field is not visible in this view.
495 this.doReceive = doReceive;
/// <summary>
/// Performs the receive using the host, connection and channel handed in by
/// reference, then invokes the stored callback with the queue and channel.
/// </summary>
/// <exception cref="MonoMessagingException">The host differs from the queue's <c>QRef.Host</c>.</exception>
498 public IMessage ReceiveInContext (ref string host, ref IConnection cn,
499 ref IModel model, string txId)
// NOTE(review): the 'if' branch preceding this 'else if' is not visible in this view.
503 else if (host != q.QRef.Host)
504 throw new MonoMessagingException ("Transactions can not span multiple hosts");
// NOTE(review): the guards around the two assignments below are not visible;
// presumably they only run when cn / model are still null - confirm.
507 cn = CreateConnection (q.QRef);
510 model = cn.CreateModel ();
514 return doReceive (q, model);
/// <summary>
/// Captures a timeout, an optional matcher and an acknowledge flag, and exposes
/// a method with the <c>RecieveDelegate</c> shape that receives with those settings.
/// </summary>
518 private class RecieveDelegateFactory
520 private readonly int timeout;
521 private readonly IsMatch matcher;
522 private readonly bool ack;
// Two-argument form: chains to the three-argument constructor with ack = true.
524 public RecieveDelegateFactory (int timeout, IsMatch matcher)
525 : this (timeout, matcher, true)
529 public RecieveDelegateFactory (int timeout, IsMatch matcher, bool ack)
// NOTE(review): the statement executed when a matcher is combined with a
// timeout of -1 is not visible in this view, nor is the assignment of 'ack'.
531 if (matcher != null && timeout == -1)
534 this.timeout = timeout;
535 this.matcher = matcher;
/// <summary>
/// Calls one of the queue's two private Receive overloads (with or without
/// the matcher) using the captured settings.
/// </summary>
// NOTE(review): the condition choosing between the two calls is not visible in this view.
539 public IMessage RecieveDelegate (RabbitMQMessageQueue q, IModel model)
542 return q.Receive (model, timeout, ack);
544 return q.Receive (model, timeout, ack, matcher);
/// <summary>
/// Builds a receive callback from a timeout (converted with
/// <c>MessageFactory.TimeSpanToInt32</c>) and a matcher, using the
/// two-argument <c>RecieveDelegateFactory</c> constructor.
/// </summary>
548 private static RecieveDelegate Receiver (TimeSpan timeout,
// NOTE(review): the rest of the parameter list is not visible in this view;
// the body uses a value named 'matcher'.
551 int to = MessageFactory.TimeSpanToInt32 (timeout);
552 return new RecieveDelegateFactory (to, matcher).RecieveDelegate;
/// <summary>Builds a receive callback for <paramref name="matcher"/>, passing -1 as the timeout.</summary>
private static RecieveDelegate Receiver (IsMatch matcher)
{
    RecieveDelegateFactory factory = new RecieveDelegateFactory (-1, matcher);
    return factory.RecieveDelegate;
}
/// <summary>
/// Builds a receive callback without a matcher; <paramref name="timeout"/> is
/// converted with <c>MessageFactory.TimeSpanToInt32</c>.
/// </summary>
private static RecieveDelegate Receiver (TimeSpan timeout)
{
    int timeoutValue = MessageFactory.TimeSpanToInt32 (timeout);
    RecieveDelegateFactory factory = new RecieveDelegateFactory (timeoutValue, null);
    return factory.RecieveDelegate;
}
/// <summary>Builds a receive callback without a matcher, passing -1 as the timeout.</summary>
private RecieveDelegate Receiver ()
{
    RecieveDelegateFactory factory = new RecieveDelegateFactory (-1, null);
    return factory.RecieveDelegate;
}
/// <summary>
/// Builds a peek callback (acknowledge flag false) without a matcher;
/// <paramref name="timeout"/> is converted with <c>MessageFactory.TimeSpanToInt32</c>.
/// </summary>
private RecieveDelegate Peeker (TimeSpan timeout)
{
    int timeoutValue = MessageFactory.TimeSpanToInt32 (timeout);
    RecieveDelegateFactory factory = new RecieveDelegateFactory (timeoutValue, null, false);
    return factory.RecieveDelegate;
}
/// <summary>Builds a peek callback (acknowledge flag false) for <paramref name="matcher"/>, passing -1 as the timeout.</summary>
private RecieveDelegate Peeker (IsMatch matcher)
{
    RecieveDelegateFactory factory = new RecieveDelegateFactory (-1, matcher, false);
    return factory.RecieveDelegate;
}
/// <summary>
/// Builds a peek callback (acknowledge flag false) for <paramref name="matcher"/>;
/// <paramref name="timeout"/> is converted with <c>MessageFactory.TimeSpanToInt32</c>.
/// </summary>
private RecieveDelegate Peeker (TimeSpan timeout, IsMatch matcher)
{
    int timeoutValue = MessageFactory.TimeSpanToInt32 (timeout);
    RecieveDelegateFactory factory = new RecieveDelegateFactory (timeoutValue, matcher, false);
    return factory.RecieveDelegate;
}
/// <summary>Predicate deciding whether a delivery is the message being looked for.</summary>
/// <param name="result">Delivery to test.</param>
/// <returns>True when the delivery matches.</returns>
private delegate bool IsMatch (BasicDeliverEventArgs result);
/// <summary>Holds a message id and provides a predicate that matches deliveries against it.</summary>
590 private class IdMatcher
592 private readonly string id;
// NOTE(review): the constructor body is not visible in this view; presumably
// it stores the argument in the 'id' field.
593 public IdMatcher (string id)
/// <summary>Returns true when the delivery's <c>BasicProperties.MessageId</c> equals the stored id.</summary>
598 public bool MatchById (BasicDeliverEventArgs result)
600 return result.BasicProperties.MessageId == id;
/// <summary>Creates a predicate that matches deliveries whose message id equals <paramref name="id"/>.</summary>
private static IsMatch ById (string id)
{
    IdMatcher matcher = new IdMatcher (id);
    return matcher.MatchById;
}
/// <summary>Holds a correlation id and provides a predicate that matches deliveries against it.</summary>
private class CorrelationIdMatcher
{
    private readonly string expected;

    public CorrelationIdMatcher (string correlationId)
    {
        expected = correlationId;
    }

    /// <summary>Returns true when the delivery's <c>BasicProperties.CorrelationId</c> equals the stored id.</summary>
    public bool MatchById (BasicDeliverEventArgs result)
    {
        string candidate = result.BasicProperties.CorrelationId;
        return candidate == expected;
    }
}
/// <summary>Creates a predicate that matches deliveries whose correlation id equals <paramref name="correlationId"/>.</summary>
private static IsMatch ByCorrelationId (string correlationId)
{
    CorrelationIdMatcher matcher = new CorrelationIdMatcher (correlationId);
    return matcher.MatchById;
}
/// <summary>
/// Declares the queue on <paramref name="model"/>, subscribes to it and asks
/// the subscription for the next delivery, which is converted to an
/// <c>IMessage</c> by the message factory.
/// </summary>
/// <param name="model">Channel to declare and subscribe on.</param>
/// <param name="timeout">Timeout handed to <c>Subscription.Next</c>.</param>
/// <param name="doAck">Acknowledge flag; NOTE(review): the statements using it are not visible in this view.</param>
/// <exception cref="MonoMessagingException">"No Message Available"; presumably when <c>Next</c> returns false - confirm.</exception>
628 private IMessage Receive (IModel model, int timeout, bool doAck)
630 string finalName = model.QueueDeclare (QRef.Queue, false);
// The subscription is disposed when the using block exits.
632 using (Subscription sub = new Subscription (model, finalName)) {
633 BasicDeliverEventArgs result;
634 if (sub.Next (timeout, out result)) {
635 IMessage m = helper.ReadMessage (QRef, result);
// NOTE(review): the statements that acknowledge and return 'm', and the branch
// keyword before the throw below, are not visible in this view.
640 throw new MonoMessagingException ("No Message Available");
/// <summary>
/// Declares the queue on <paramref name="model"/>, subscribes to it and keeps
/// asking the subscription for deliveries until one satisfies
/// <paramref name="matcher"/>; that delivery is converted to an
/// <c>IMessage</c> by the message factory.
/// </summary>
/// <param name="model">Channel to declare and subscribe on.</param>
/// <param name="timeout">Timeout handed to each <c>Subscription.Next</c> call.</param>
/// <param name="doAck">Acknowledge flag; NOTE(review): the statements using it are not visible in this view.</param>
/// <param name="matcher">Predicate selecting the wanted delivery.</param>
/// <exception cref="MessageUnavailableException">"Message not available"; presumably once <c>Next</c> returns false without a match - confirm.</exception>
645 private IMessage Receive (IModel model, int timeout,
646 bool doAck, IsMatch matcher)
648 string finalName = model.QueueDeclare (QRef.Queue, false);
// The subscription is disposed when the using block exits.
650 using (Subscription sub = new Subscription (model, finalName)) {
651 BasicDeliverEventArgs result;
// The timeout applies to each Next call separately, not to the loop as a whole.
652 while (sub.Next (timeout, out result)) {
654 if (matcher (result)) {
655 IMessage m = helper.ReadMessage (QRef, result);
// NOTE(review): the statements that acknowledge and return 'm' are not visible in this view.
662 throw new MessageUnavailableException ("Message not available");
/// <summary>
/// Asks the provider for a new message queue transaction and returns it as a
/// <c>RabbitMQMessageQueueTransaction</c>.
/// </summary>
/// <exception cref="InvalidCastException">The provider returned a different transaction implementation.</exception>
private RabbitMQMessageQueueTransaction GetTx ()
{
    IMessageQueueTransaction created = provider.CreateMessageQueueTransaction ();
    return (RabbitMQMessageQueueTransaction) created;
}
/// <summary>
/// Opens a new connection through a fresh <c>ConnectionFactory</c> whose
/// <c>Address</c> is set to <c>qRef.Host</c>. The caller owns the returned
/// connection; callers in this class dispose it with <c>using</c>.
/// </summary>
/// <param name="qRef">Queue reference supplying the host.</param>
/// <returns>The connection created by the factory.</returns>
private static IConnection CreateConnection (QueueReference qRef)
{
    ConnectionFactory factory = new ConnectionFactory ();
    factory.Address = qRef.Host;

    IConnection connection = factory.CreateConnection ();
    return connection;
}