2 // Mono.Messaging.RabbitMQ
5 // Michael Barker (mike@middlesoft.co.uk)
7 // (C) 2008 Michael Barker
11 // Permission is hereby granted, free of charge, to any person obtaining
12 // a copy of this software and associated documentation files (the
13 // "Software"), to deal in the Software without restriction, including
14 // without limitation the rights to use, copy, modify, merge, publish,
15 // distribute, sublicense, and/or sell copies of the Software, and to
16 // permit persons to whom the Software is furnished to do so, subject to
17 // the following conditions:
19 // The above copyright notice and this permission notice shall be
20 // included in all copies or substantial portions of the Software.
22 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
23 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
24 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
25 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
26 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
27 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
32 using System.Collections;
33 using System.ComponentModel;
37 using RabbitMQ.Client;
38 using RabbitMQ.Client.Content;
39 using RabbitMQ.Client.Events;
40 using RabbitMQ.Client.Exceptions;
41 using RabbitMQ.Client.MessagePatterns;
44 namespace Mono.Messaging.RabbitMQ {
// IMessageQueue implementation that talks to a RabbitMQ broker.
46 public class RabbitMQMessageQueue : IMessageQueue {
// Backing fields for the simple get/set properties declared further down.
48 private bool authenticate = false;
49 private short basePriority = 0;
50 private Guid category = Guid.Empty;
51 private bool denySharedReceive = false;
52 private EncryptionRequired encryptionRequired;
53 private long maximumJournalSize = -1;
54 private long maximumQueueSize = -1;
55 private ISynchronizeInvoke synchronizingObject = null;
56 private bool useJournalQueue = false;
// Queue this instance targets; starts out as QueueReference.DEFAULT, which
// the public Send overloads reject with "Path has not been specified".
57 private QueueReference qRef = QueueReference.DEFAULT;
// Read-only collaborators and settings.
58 private readonly RabbitMQMessagingProvider provider;
// Used to convert between IMessage and the RabbitMQ wire representation
// (see WriteMessage / ReadMessage call sites).
59 private readonly MessageFactory helper;
// Realm name passed to IModel.AccessRequest when purging and receiving.
60 private readonly string realm;
61 private readonly bool transactional;
// Convenience constructor: delegates to the overload taking a queue
// reference, passing QueueReference.DEFAULT.
63 public RabbitMQMessageQueue (RabbitMQMessagingProvider provider,
65 : this (provider, QueueReference.DEFAULT, transactional)
// Convenience constructor: delegates to the full constructor with the
// realm fixed to "/data".
69 public RabbitMQMessageQueue (RabbitMQMessagingProvider provider,
72 : this (provider, "/data", qRef, transactional)
// Full constructor: stores the provider and the transactional flag and
// creates the MessageFactory helper from the provider.
76 public RabbitMQMessageQueue (RabbitMQMessagingProvider provider,
77 string realm, QueueReference qRef,
80 this.provider = provider;
81 this.helper = new MessageFactory (provider);
84 this.transactional = transactional;
// Gets or sets the value held in the `authenticate` backing field.
87 public bool Authenticate {
88 get { return authenticate; }
89 set { authenticate = value; }
// Gets or sets the value held in the `basePriority` backing field.
92 public short BasePriority {
93 get { return basePriority; }
94 set { basePriority = value; }
// Not implemented: this getter always throws NotImplementedException.
// NOTE(review): the declaration of the owning property is not visible here.
98 get { throw new NotImplementedException (); }
// Not implemented: reading this property always throws NotImplementedException.
101 public bool CanWrite {
102 get { throw new NotImplementedException (); }
// Gets or sets the value held in the `category` backing field
// (initialised to Guid.Empty).
105 public Guid Category {
106 get { return category; }
107 set { category = value; }
// Not implemented: reading this property always throws NotImplementedException.
110 public DateTime CreateTime {
111 get { throw new NotImplementedException (); }
// Gets or sets the value held in the `denySharedReceive` backing field.
114 public bool DenySharedReceive {
115 get { return denySharedReceive; }
116 set { denySharedReceive = value; }
// Gets or sets the value held in the `encryptionRequired` backing field.
119 public EncryptionRequired EncryptionRequired {
120 get { return encryptionRequired; }
121 set { encryptionRequired = value; }
// Not implemented: this getter always throws NotImplementedException.
// NOTE(review): the declaration of the owning property is not visible here.
125 get { throw new NotImplementedException (); }
// Not implemented: reading this property always throws NotImplementedException.
128 public DateTime LastModifyTime {
129 get { throw new NotImplementedException (); }
// Gets or sets the value held in the `maximumJournalSize` backing field
// (initialised to -1).
132 public long MaximumJournalSize {
133 get { return maximumJournalSize; }
134 set { maximumJournalSize = value; }
// Gets or sets the value held in the `maximumQueueSize` backing field
// (initialised to -1).
137 public long MaximumQueueSize {
138 get { return maximumQueueSize; }
139 set { maximumQueueSize = value; }
// Not implemented: reading this property always throws NotImplementedException.
142 public IntPtr ReadHandle {
143 get { throw new NotImplementedException (); }
// Gets or sets the value held in the `synchronizingObject` backing field
// (null by default).
146 public ISynchronizeInvoke SynchronizingObject {
147 get { return synchronizingObject; }
148 set { synchronizingObject = value; }
// Gets the transactional flag that was supplied to the constructor.
151 public bool Transactional {
152 get { return transactional; }
// Gets or sets the value held in the `useJournalQueue` backing field.
155 public bool UseJournalQueue {
156 get { return useJournalQueue; }
157 set { useJournalQueue = value; }
// Not implemented: reading this property always throws NotImplementedException.
160 public IntPtr WriteHandle {
161 get { throw new NotImplementedException (); }
// Reference to the queue this instance sends to and receives from.
// The setter stores the value in the `qRef` field.
164 public QueueReference QRef {
166 set { qRef = value; }
// Packs the protocol version of the given connection into a single long:
// the major version is shifted into the upper 32 bits and the minor
// version is added to the lower bits.
169 private static long GetVersion (IConnection cn)
171 long version = cn.Protocol.MajorVersion;
172 version = version << 32;
173 version += cn.Protocol.MinorVersion;
// Stamps delivery metadata onto msg by calling IMessage.SetDeliveryInfo.
// The visible arguments are Acknowledgment.None and a freshly generated
// GUID string with "\\0" appended.
// NOTE(review): how senderVersion and transactionId are passed on is not
// visible here — confirm against the full argument list.
177 private void SetDeliveryInfo (IMessage msg, long senderVersion,
178 string transactionId)
180 msg.SetDeliveryInfo (Acknowledgment.None,
183 Guid.NewGuid ().ToString () + "\\0",
194 // No-op (queues are currently stateless)
// Deletes the queue named by qRef.Queue on the broker at qRef.Host.
// Opens a connection and a model, requests an access ticket for the given
// realm and issues QueueDelete with all three boolean flags set to false.
// Both the connection and the model are scoped by using statements.
197 public static void Delete (string realm, QueueReference qRef)
199 ConnectionFactory cf = new ConnectionFactory ();
201 using (IConnection cn = cf.CreateConnection (qRef.Host)) {
202 using (IModel model = cn.CreateModel ()) {
203 ushort ticket = model.AccessRequest (realm);
204 model.QueueDelete (ticket, qRef.Queue, false, false, false);
// Sends msg outside of any transaction.
// Throws MonoMessagingException when QRef is still QueueReference.DEFAULT
// and ArgumentException when msg.BodyStream is null.
// A BrokerUnreachableException is rethrown as a ConnectionException
// carrying QRef and the original exception.
209 public void Send (IMessage msg)
211 if (QRef == QueueReference.DEFAULT)
212 throw new MonoMessagingException ("Path has not been specified");
214 if (msg.BodyStream == null)
215 throw new ArgumentException ("Message is not serialized properly");
217 ConnectionFactory cf = new ConnectionFactory ();
220 using (IConnection cn = cf.CreateConnection (QRef.Host)) {
// Delivery info is stamped with the connection's protocol version and
// a null transaction id.
221 SetDeliveryInfo (msg, GetVersion (cn), null);
222 using (IModel ch = cn.CreateModel ()) {
226 } catch (BrokerUnreachableException e) {
227 throw new ConnectionException (QRef, e);
// Sends msg as part of the supplied transaction.
// Performs the same QRef / BodyStream validation as the non-transactional
// overload, then hands SendInContext to the transaction's RunSend.
// The transaction is cast directly to RabbitMQMessageQueueTransaction, so
// any other IMessageQueueTransaction implementation fails the cast.
231 public void Send (IMessage msg, IMessageQueueTransaction transaction)
233 if (QRef == QueueReference.DEFAULT)
234 throw new MonoMessagingException ("Path has not been specified");
236 if (msg.BodyStream == null)
237 throw new ArgumentException ("Message is not serialized properly");
239 RabbitMQMessageQueueTransaction tx = (RabbitMQMessageQueueTransaction) transaction;
241 tx.RunSend (SendInContext, msg);
// Sends msg according to the requested transaction type.
//  - Single: a transaction is created from the provider and scoped by a
//    using statement; an exception caught there is wrapped in a
//    MonoMessagingException that keeps the original message and inner exception.
//  - Automatic: throws NotSupportedException.
//  - None: handled by its own case (body not shown in this view).
244 public void Send (IMessage msg, MessageQueueTransactionType transactionType)
246 switch (transactionType) {
247 case MessageQueueTransactionType.Single:
248 using (IMessageQueueTransaction tx = provider.CreateMessageQueueTransaction ()) {
252 } catch (Exception e) {
254 throw new MonoMessagingException(e.Message, e);
259 case MessageQueueTransactionType.None:
263 case MessageQueueTransactionType.Automatic:
264 throw new NotSupportedException("Automatic transaction types not supported");
// Callback handed to RabbitMQMessageQueueTransaction.RunSend.
// host, cn and model are ref parameters: a connection and model created
// here are assigned back so the caller can keep them.
// Throws MonoMessagingException when the host already in use differs from
// QRef.Host, because a transaction may not span multiple hosts.
268 private void SendInContext (ref string host, ref IConnection cn,
269 ref IModel model, IMessage msg, string txId)
273 else if (host != QRef.Host)
274 throw new MonoMessagingException ("Transactions can not span multiple hosts");
277 ConnectionFactory cf = new ConnectionFactory ();
278 cn = cf.CreateConnection (host);
282 model = cn.CreateModel ();
// Delivery info carries the connection's protocol version and the
// transaction id supplied by the caller.
286 SetDeliveryInfo (msg, GetVersion (cn), txId);
// Publishes msg to the queue named by QRef.Queue through the given model.
// An access ticket is requested for this queue's configured realm, the
// queue is declared, the message is converted with the MessageFactory
// helper, and the result is published to the default ("") exchange using
// the declared queue name as routing key.
private void Send (IModel model, IMessage msg)
{
	// Use the realm this queue was constructed with rather than a
	// hard-coded "/data", so sending agrees with the purge and receive
	// paths, which already request their ticket for `realm`.
	ushort ticket = model.AccessRequest (realm);
	string finalName = model.QueueDeclare (ticket, QRef.Queue, true);
	IMessageBuilder mb = helper.WriteMessage (model, msg);

	model.BasicPublish (ticket, "", finalName,
	                    (IBasicProperties) mb.GetContentHeader(),
	                    mb.GetContentBody ());
}
// Purges QRef.Queue on the broker at QRef.Host: opens a connection and a
// model, requests an access ticket for `realm` and calls QueuePurge.
// NOTE(review): the enclosing method declaration is not visible here —
// presumably this is the body of the queue's purge operation.
303 ConnectionFactory cf = new ConnectionFactory ();
305 using (IConnection cn = cf.CreateConnection (QRef.Host)) {
306 using (IModel model = cn.CreateModel ()) {
307 ushort ticket = model.AccessRequest (realm);
308 model.QueuePurge (ticket, QRef.Queue, false);
// Reads a message without acknowledging it (doAck = false), using a
// timeout of -1. Opens its own connection and model to QRef.Host and
// calls the private Receive directly rather than going through Run/Peeker.
313 public IMessage Peek ()
315 ConnectionFactory cf = new ConnectionFactory ();
317 using (IConnection cn = cf.CreateConnection (QRef.Host)) {
318 using (IModel ch = cn.CreateModel ()) {
319 return Receive (ch, -1, false);
// Reads a message without acknowledging it, waiting up to timeout.
// Delegates to Run with a Peeker built from the timeout.
324 public IMessage Peek (TimeSpan timeout)
326 return Run (Peeker (timeout));
// NOTE(review): the commented-out code below looks like an earlier inline
// implementation that Run/Peeker replaced; consider deleting it.
327 // ConnectionFactory cf = new ConnectionFactory ();
329 // using (IConnection cn = cf.CreateConnection (QRef.Host)) {
330 // using (IModel ch = cn.CreateModel ()) {
331 // if (timeout == TimeSpan.MaxValue) {
332 // return Receive (ch, -1, false);
334 // return Receive (ch, (int) timeout.TotalMilliseconds, false);
// Reads, without acknowledging, a message whose MessageId equals id.
// Delegates to Run with a Peeker built from the ById matcher.
340 public IMessage PeekById (string id)
342 return Run (Peeker (ById (id)));
// NOTE(review): the commented-out code below looks like an earlier inline
// implementation that Run/Peeker replaced; consider deleting it.
343 // ConnectionFactory cf = new ConnectionFactory ();
345 // using (IConnection cn = cf.CreateConnection (QRef.Host)) {
346 // using (IModel ch = cn.CreateModel ()) {
347 // return Receive (ch, 500, true, new IdMatcher (id).MatchById);
// Reads, without acknowledging, a message whose MessageId equals id,
// using the supplied timeout.
352 public IMessage PeekById (string id, TimeSpan timeout)
354 return Run (Peeker (timeout, ById (id)));
// Reads, without acknowledging, a message whose CorrelationId equals id.
// Delegates to Run with a Peeker built from the ByCorrelationId matcher.
357 public IMessage PeekByCorrelationId (string id)
359 return Run (Peeker (ByCorrelationId (id)));
// NOTE(review): the commented-out code below looks like an earlier inline
// implementation that Run/Peeker replaced; consider deleting it.
360 // ConnectionFactory cf = new ConnectionFactory ();
362 // using (IConnection cn = cf.CreateConnection (QRef.Host)) {
363 // using (IModel ch = cn.CreateModel ()) {
364 // return Receive (ch, 500, false,
365 // new CorrelationIdMatcher (id).MatchById);
// Reads, without acknowledging, a message whose CorrelationId equals id,
// using the supplied timeout.
370 public IMessage PeekByCorrelationId (string id, TimeSpan timeout)
372 return Run (Peeker (timeout, ByCorrelationId (id)));
// Public Receive overloads. Each one builds a receive delegate with
// Receiver (optionally with a timeout) and passes it to the Run overload
// matching the transaction argument: none, an explicit transaction, or a
// MessageQueueTransactionType.

// Receives a message with no timeout argument and no transaction.
375 public IMessage Receive ()
377 return Run (Receiver ());
// Receives a message using the supplied timeout, no transaction.
380 public IMessage Receive (TimeSpan timeout)
382 return Run (Receiver (timeout));
// Receives a message using the supplied timeout inside the given transaction.
385 public IMessage Receive (TimeSpan timeout,
386 IMessageQueueTransaction transaction)
388 return Run (transaction, Receiver (timeout));
// Receives a message using the supplied timeout and transaction type.
391 public IMessage Receive (TimeSpan timeout,
392 MessageQueueTransactionType transactionType)
394 return Run (transactionType, Receiver (timeout));
// Receives a message inside the given transaction.
397 public IMessage Receive (IMessageQueueTransaction transaction)
399 return Run (transaction, Receiver());
// Receives a message using the given transaction type.
402 public IMessage Receive (MessageQueueTransactionType transactionType)
404 return Run (transactionType, Receiver ());
// Public ReceiveById overloads. Each one builds a receive delegate with
// Receiver and the ById matcher (which compares BasicProperties.MessageId
// with id) and passes it to the Run overload matching the transaction
// argument.

// By id; no timeout argument, no transaction.
407 public IMessage ReceiveById (string id)
409 return Run (Receiver (ById (id)));
// By id with a timeout; no transaction.
412 public IMessage ReceiveById (string id, TimeSpan timeout)
414 return Run (Receiver (timeout, ById (id)));
// By id inside the given transaction.
417 public IMessage ReceiveById (string id,
418 IMessageQueueTransaction transaction)
420 return Run (transaction, Receiver (ById (id)));
// By id using the given transaction type.
423 public IMessage ReceiveById (string id,
424 MessageQueueTransactionType transactionType)
426 return Run (transactionType, Receiver (ById (id)));
// By id with a timeout inside the given transaction.
429 public IMessage ReceiveById (string id, TimeSpan timeout,
430 IMessageQueueTransaction transaction)
432 return Run (transaction, Receiver (timeout, ById (id)));
// By id with a timeout using the given transaction type.
435 public IMessage ReceiveById (string id, TimeSpan timeout,
436 MessageQueueTransactionType transactionType)
438 return Run (transactionType, Receiver (timeout, ById (id)));
// Public ReceiveByCorrelationId overloads. Each one builds a receive
// delegate with Receiver and the ByCorrelationId matcher (which compares
// BasicProperties.CorrelationId with id) and passes it to the Run overload
// matching the transaction argument.

// By correlation id; no timeout argument, no transaction.
441 public IMessage ReceiveByCorrelationId (string id)
443 return Run (Receiver (ByCorrelationId (id)));
// By correlation id with a timeout; no transaction.
446 public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout)
448 return Run (Receiver (timeout, ByCorrelationId (id)));
// By correlation id inside the given transaction.
451 public IMessage ReceiveByCorrelationId (string id,
452 IMessageQueueTransaction transaction)
454 return Run (transaction, Receiver (ByCorrelationId (id)));
// By correlation id using the given transaction type.
457 public IMessage ReceiveByCorrelationId (string id,
458 MessageQueueTransactionType transactionType)
460 return Run (transactionType, Receiver (ByCorrelationId (id)));
// By correlation id with a timeout inside the given transaction.
463 public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout,
464 IMessageQueueTransaction transaction)
466 return Run (transaction, Receiver (timeout, ByCorrelationId (id)));
// By correlation id with a timeout using the given transaction type.
469 public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout,
470 MessageQueueTransactionType transactionType)
472 return Run (transactionType, Receiver (timeout, ByCorrelationId (id)));
// Creates a new RabbitMQMessageEnumerator for QRef, sharing this queue's
// MessageFactory helper.
475 public IMessageEnumerator GetMessageEnumerator ()
477 return new RabbitMQMessageEnumerator (helper, QRef);
// Runs the receive delegate r according to the transaction type.
//  - Single: obtains a transaction via GetTx, scoped by a using statement,
//    and runs r inside it through Run (tx, r); a `success` flag is tracked
//    (its later use is not shown in this view).
//  - None: handled by its own case (body not shown in this view).
//  - Anything else: throws NotSupportedException naming the transaction type.
480 private IMessage Run (MessageQueueTransactionType transactionType,
481 TxReceiver.DoReceive r)
483 switch (transactionType) {
484 case MessageQueueTransactionType.Single:
485 using (RabbitMQMessageQueueTransaction tx = GetTx ()) {
486 bool success = false;
488 IMessage msg = Run (tx, r);
498 case MessageQueueTransactionType.None:
502 throw new NotSupportedException(transactionType + " not supported");
// Runs the receive delegate r inside the supplied transaction.
// r is wrapped in a TxReceiver bound to this queue, and the wrapper's
// ReceiveInContext is handed to the transaction's RunReceive.
// The transaction is cast directly to RabbitMQMessageQueueTransaction, so
// any other IMessageQueueTransaction implementation fails the cast.
506 private IMessage Run (IMessageQueueTransaction transaction,
507 TxReceiver.DoReceive r)
509 TxReceiver txr = new TxReceiver (this, r);
510 RabbitMQMessageQueueTransaction tx =
511 (RabbitMQMessageQueueTransaction) transaction;
512 return tx.RunReceive (txr.ReceiveInContext);
// Runs the receive delegate r outside any transaction: opens a connection
// to QRef.Host and a model, invokes r with this queue and the model, and
// returns its result. Both resources are scoped by using statements.
515 private IMessage Run (TxReceiver.DoReceive r)
517 ConnectionFactory cf = new ConnectionFactory ();
518 using (IConnection cn = cf.CreateConnection (QRef.Host)) {
519 using (IModel model = cn.CreateModel ()) {
520 return r (this, model);
// Transaction-context receive: assigns a connection and model to the ref
// parameters when it creates them, and rejects a host different from
// QRef.Host with MonoMessagingException. Finishes with an acknowledging
// receive using timeout -1.
// NOTE(review): no caller is visible in this file — Run (transaction, r)
// uses TxReceiver.ReceiveInContext instead; confirm whether this is dead code.
525 private IMessage ReceiveInContext (ref string host, ref IConnection cn,
526 ref IModel model, string txId)
530 else if (host != QRef.Host)
531 throw new MonoMessagingException ("Transactions can not span multiple hosts");
534 ConnectionFactory cf = new ConnectionFactory ();
535 cn = cf.CreateConnection (host);
539 model = cn.CreateModel ();
543 return Receive (model, -1, true);
// Binds a receive delegate to a queue so that the pair can be executed
// through RabbitMQMessageQueueTransaction.RunReceive.
546 private class TxReceiver
548 private readonly DoReceive doReceive;
549 private readonly RabbitMQMessageQueue q;
// Captures the queue and the delegate to run against it.
551 public TxReceiver(RabbitMQMessageQueue q, DoReceive doReceive) {
553 this.doReceive = doReceive;
// Signature of a receive operation: given the queue and an open model,
// produce a message.
556 public delegate IMessage DoReceive (RabbitMQMessageQueue q, IModel model);
// Callback for RunReceive. Assigns a connection and model to the ref
// parameters when it creates them, rejects a host different from the
// queue's QRef.Host, then invokes the captured delegate.
558 public IMessage ReceiveInContext (ref string host, ref IConnection cn,
559 ref IModel model, string txId)
563 else if (host != q.QRef.Host)
564 throw new MonoMessagingException ("Transactions can not span multiple hosts");
567 ConnectionFactory cf = new ConnectionFactory ();
568 cn = cf.CreateConnection (host);
572 model = cn.CreateModel ();
576 return doReceive (q, model);
// Captures a timeout, an optional matcher and an ack flag, and exposes a
// DoReceive method compatible with the TxReceiver.DoReceive delegate.
580 private class DoReceiveWithTimeout
582 private readonly int timeout;
583 private readonly IsMatch matcher;
584 private readonly bool ack;
// Two-argument form: acknowledging receive (ack defaults to true).
586 public DoReceiveWithTimeout (int timeout, IsMatch matcher)
587 : this (timeout, matcher, true)
591 public DoReceiveWithTimeout (int timeout, IsMatch matcher, bool ack)
// Special case when a matcher is combined with timeout -1.
// NOTE(review): the statement guarded by this condition is not visible
// here — confirm what timeout is substituted.
593 if (matcher != null && timeout == -1)
596 this.timeout = timeout;
597 this.matcher = matcher;
// Dispatches to the queue's private Receive: the three-argument overload
// in one branch and the matcher overload in the other.
601 public IMessage DoReceive (RabbitMQMessageQueue q, IModel model)
604 return q.Receive (model, timeout, ack);
606 return q.Receive (model, timeout, ack, matcher);
// Factory helpers producing acknowledging receive delegates: each uses the
// two-argument DoReceiveWithTimeout constructor, which sets ack to true.

// Timeout (converted with TimeSpanToInt32) plus a matcher.
610 private static TxReceiver.DoReceive Receiver (TimeSpan timeout,
613 int to = TimeSpanToInt32 (timeout);
614 return new DoReceiveWithTimeout (to, matcher).DoReceive;
// Matcher only; timeout is passed as -1.
617 private static TxReceiver.DoReceive Receiver (IsMatch matcher)
619 return new DoReceiveWithTimeout (-1, matcher).DoReceive;
// Timeout only; no matcher.
622 private static TxReceiver.DoReceive Receiver (TimeSpan timeout)
624 int to = TimeSpanToInt32 (timeout);
625 return new DoReceiveWithTimeout (to, null).DoReceive;
// Neither timeout nor matcher; timeout is passed as -1.
628 private TxReceiver.DoReceive Receiver ()
630 return new DoReceiveWithTimeout (-1, null).DoReceive;
// Factory helpers producing non-acknowledging ("peek") receive delegates.
// Every overload passes ack = false to DoReceiveWithTimeout.

// No timeout argument (passed as -1) and no matcher.
private TxReceiver.DoReceive Peeker ()
{
	// ack must be passed explicitly: the two-argument constructor defaults
	// it to true, which would make this peek acknowledge the message,
	// unlike every other Peeker overload.
	return new DoReceiveWithTimeout (-1, null, false).DoReceive;
}

// Timeout (converted with TimeSpanToInt32), no matcher.
private TxReceiver.DoReceive Peeker (TimeSpan timeout)
{
	int to = TimeSpanToInt32 (timeout);
	return new DoReceiveWithTimeout (to, null, false).DoReceive;
}

// Matcher only; timeout is passed as -1.
private TxReceiver.DoReceive Peeker (IsMatch matcher)
{
	return new DoReceiveWithTimeout (-1, matcher, false).DoReceive;
}

// Timeout (converted with TimeSpanToInt32) plus a matcher.
private TxReceiver.DoReceive Peeker (TimeSpan timeout, IsMatch matcher)
{
	int to = TimeSpanToInt32 (timeout);
	return new DoReceiveWithTimeout (to, matcher, false).DoReceive;
}
// Predicate over a delivered message; used to select deliveries by
// message id or correlation id.
655 delegate bool IsMatch (BasicDeliverEventArgs result);
// Holds a message id and exposes an IsMatch-compatible predicate for it.
657 private class IdMatcher
659 private readonly string id;
660 public IdMatcher (string id)
// True when the delivery's BasicProperties.MessageId equals the stored id.
665 public bool MatchById (BasicDeliverEventArgs result)
667 return result.BasicProperties.MessageId == id;
// Builds an IsMatch predicate that selects deliveries by message id.
671 private static IsMatch ById (string id)
673 return new IdMatcher (id).MatchById;
// Holds a correlation id and exposes an IsMatch-compatible predicate for it.
676 private class CorrelationIdMatcher
678 private readonly string correlationId;
679 public CorrelationIdMatcher (string correlationId)
681 this.correlationId = correlationId;
// True when the delivery's BasicProperties.CorrelationId equals the
// stored correlation id.
684 public bool MatchById (BasicDeliverEventArgs result)
686 return result.BasicProperties.CorrelationId == correlationId;
// Builds an IsMatch predicate that selects deliveries by correlation id.
690 private static IsMatch ByCorrelationId (string correlationId)
692 return new CorrelationIdMatcher (correlationId).MatchById;
// Receives one message from QRef.Queue through the given model.
// Requests an access ticket for `realm`, declares the queue, opens a
// Subscription on it and asks for the next delivery with the given
// timeout; a delivery is converted with helper.ReadMessage.
// Throws MonoMessagingException ("No Message Available") — presumably when
// Next returns false; the branch structure is not fully visible here.
// NOTE(review): how doAck is applied is not visible here — confirm.
695 private IMessage Receive (IModel model, int timeout, bool doAck)
// NOTE(review): writes timeout and doAck to stdout on every call; this
// looks like leftover debug output.
697 Console.WriteLine ("{0}, {1}", timeout, doAck);
699 ushort ticket = model.AccessRequest (realm);
700 string finalName = model.QueueDeclare (ticket, QRef.Queue, false);
702 using (Subscription sub = new Subscription (model, ticket, finalName)) {
703 BasicDeliverEventArgs result;
704 if (sub.Next (timeout, out result)) {
705 IMessage m = helper.ReadMessage (QRef, result);
710 throw new MonoMessagingException ("No Message Available");
// Receives the first message from QRef.Queue accepted by matcher.
// Requests an access ticket for `realm`, declares the queue, opens a
// Subscription and keeps asking for the next delivery (each call using
// the given timeout) while Next returns true; a delivery accepted by
// matcher is converted with helper.ReadMessage.
// Throws MessageUnavailableException ("Message not available") —
// presumably once the loop ends without a match.
// NOTE(review): how doAck is applied, and what happens to deliveries the
// matcher rejects, is not visible here — confirm.
715 private IMessage Receive (IModel model, int timeout,
716 bool doAck, IsMatch matcher)
// NOTE(review): writes timeout and doAck to stdout on every call; this
// looks like leftover debug output.
718 Console.WriteLine ("{0}, {1}", timeout, doAck);
720 ushort ticket = model.AccessRequest (realm);
721 string finalName = model.QueueDeclare (ticket, QRef.Queue, false);
723 using (Subscription sub = new Subscription (model, ticket, finalName)) {
724 BasicDeliverEventArgs result;
725 while (sub.Next (timeout, out result)) {
727 if (matcher (result)) {
728 IMessage m = helper.ReadMessage (QRef, result);
735 throw new MessageUnavailableException ("Message not available");
// Creates a new transaction from the provider and casts it to the
// RabbitMQ-specific transaction type.
739 private RabbitMQMessageQueueTransaction GetTx ()
741 return (RabbitMQMessageQueueTransaction) provider.CreateMessageQueueTransaction ();
// Converts a TimeSpan into a millisecond count for the receive calls.
// TimeSpan.MaxValue is special-cased (the value returned for it is not
// visible here); any other value is its TotalMilliseconds cast to int.
// NOTE(review): the cast does not guard against spans longer than
// int.MaxValue milliseconds — confirm callers never pass such values.
744 private static int TimeSpanToInt32 (TimeSpan timespan)
746 if (timespan == TimeSpan.MaxValue)
749 return (int) timespan.TotalMilliseconds;