2 // Mono.Messaging.RabbitMQ
5 // Michael Barker (mike@middlesoft.co.uk)
7 // (C) 2008 Michael Barker
11 // Permission is hereby granted, free of charge, to any person obtaining
12 // a copy of this software and associated documentation files (the
13 // "Software"), to deal in the Software without restriction, including
14 // without limitation the rights to use, copy, modify, merge, publish,
15 // distribute, sublicense, and/or sell copies of the Software, and to
16 // permit persons to whom the Software is furnished to do so, subject to
17 // the following conditions:
19 // The above copyright notice and this permission notice shall be
20 // included in all copies or substantial portions of the Software.
22 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
23 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
24 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
25 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
26 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
27 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
32 using System.Collections;
33 using System.ComponentModel;
37 using RabbitMQ.Client;
38 using RabbitMQ.Client.Content;
39 using RabbitMQ.Client.Events;
40 using RabbitMQ.Client.Exceptions;
41 using RabbitMQ.Client.MessagePatterns;
44 namespace Mono.Messaging.RabbitMQ {
47 /// RabbitMQ Implementation of a message queue. Currrently this implementation
48 /// attempts to be as stateless as possible. Connection the AMQP server
49 /// are only created as needed.
51 public class RabbitMQMessageQueue : MessageQueueBase, IMessageQueue {
53 private readonly RabbitMQMessagingProvider provider;
54 private readonly MessageFactory helper;
55 private readonly bool transactional;
56 private readonly TimeSpan noTime = new TimeSpan(0, 0, 0, 0, 500);
58 private bool authenticate = false;
59 private short basePriority = 0;
60 private Guid category = Guid.Empty;
61 private bool denySharedReceive = false;
62 private EncryptionRequired encryptionRequired;
63 private long maximumJournalSize = -1;
64 private long maximumQueueSize = -1;
65 private ISynchronizeInvoke synchronizingObject = null;
66 private bool useJournalQueue = false;
67 private QueueReference qRef = QueueReference.DEFAULT;
69 public RabbitMQMessageQueue (RabbitMQMessagingProvider provider,
71 : this (provider, QueueReference.DEFAULT, transactional)
75 public RabbitMQMessageQueue (RabbitMQMessagingProvider provider,
79 this.provider = provider;
80 this.helper = new MessageFactory (provider);
82 this.transactional = transactional;
85 protected override IMessageQueue Queue {
89 public bool Authenticate {
90 get { return authenticate; }
91 set { authenticate = value; }
94 public short BasePriority {
95 get { return basePriority; }
96 set { basePriority = value; }
100 get { throw new NotImplementedException (); }
103 public bool CanWrite {
104 get { throw new NotImplementedException (); }
107 public Guid Category {
108 get { return category; }
109 set { category = value; }
112 public DateTime CreateTime {
113 get { throw new NotImplementedException (); }
116 public bool DenySharedReceive {
117 get { return denySharedReceive; }
118 set { denySharedReceive = value; }
121 public EncryptionRequired EncryptionRequired {
122 get { return encryptionRequired; }
123 set { encryptionRequired = value; }
127 get { throw new NotImplementedException (); }
130 public DateTime LastModifyTime {
131 get { throw new NotImplementedException (); }
134 public long MaximumJournalSize {
135 get { return maximumJournalSize; }
136 set { maximumJournalSize = value; }
139 public long MaximumQueueSize {
140 get { return maximumQueueSize; }
141 set { maximumQueueSize = value; }
144 public IntPtr ReadHandle {
145 get { throw new NotImplementedException (); }
148 public ISynchronizeInvoke SynchronizingObject {
149 get { return synchronizingObject; }
150 set { synchronizingObject = value; }
153 public bool Transactional {
154 get { return transactional; }
157 public bool UseJournalQueue {
158 get { return useJournalQueue; }
159 set { useJournalQueue = value; }
162 public IntPtr WriteHandle {
163 get { throw new NotImplementedException (); }
166 public QueueReference QRef {
168 set { qRef = value; }
171 private void SetDeliveryInfo (IMessage msg, string transactionId)
173 msg.SetDeliveryInfo (Acknowledgment.None,
176 Guid.NewGuid ().ToString () + "\\0",
189 public static void Delete (QueueReference qRef)
191 RabbitMQMessagingProvider provider = (RabbitMQMessagingProvider) MessagingProviderLocator.GetProvider ();
192 using (IMessagingContext context = provider.CreateContext (qRef.Host)) {
193 context.Delete (qRef);
197 public void Send (IMessage msg)
199 if (QRef == QueueReference.DEFAULT)
200 throw new MonoMessagingException ("Path has not been specified");
202 if (msg.BodyStream == null)
203 throw new ArgumentException ("BodyStream is null, Message is not serialized properly");
205 using (IMessagingContext context = CurrentContext) {
207 SetDeliveryInfo (msg, null);
208 context.Send (QRef, msg);
209 } catch (BrokerUnreachableException e) {
210 throw new ConnectionException (QRef, e);
215 public void Send (IMessage msg, IMessageQueueTransaction transaction)
217 if (QRef == QueueReference.DEFAULT)
218 throw new MonoMessagingException ("Path has not been specified");
220 if (msg.BodyStream == null)
221 throw new ArgumentException ("Message is not serialized properly");
223 RabbitMQMessageQueueTransaction tx = (RabbitMQMessageQueueTransaction) transaction;
225 SetDeliveryInfo (msg, tx.Id);
229 public void Send (IMessage msg, MessageQueueTransactionType transactionType)
231 switch (transactionType) {
232 case MessageQueueTransactionType.Single:
233 using (IMessageQueueTransaction tx = NewTx ()) {
237 } catch (Exception e) {
239 throw new MonoMessagingException(e.Message, e);
244 case MessageQueueTransactionType.None:
248 case MessageQueueTransactionType.Automatic:
249 throw new NotSupportedException("Automatic transaction types not supported");
255 using (IMessagingContext context = CurrentContext) {
256 context.Purge (QRef);
260 public IMessage Peek ()
262 return DoReceive (TimeSpan.MaxValue, null, false);
265 public IMessage Peek (TimeSpan timeout)
267 return DoReceive (timeout, null, false);
270 public IMessage PeekById (string id)
272 return DoReceive (noTime, ById (id), false);
275 public IMessage PeekById (string id, TimeSpan timeout)
277 return DoReceive (timeout, ById (id), false);
280 public IMessage PeekByCorrelationId (string id)
282 return DoReceive (noTime, ByCorrelationId (id), false);
285 public IMessage PeekByCorrelationId (string id, TimeSpan timeout)
287 return DoReceive (timeout, ByCorrelationId (id), false);
290 public IMessage Receive ()
292 return DoReceive (TimeSpan.MaxValue, null, true);
295 public IMessage Receive (TimeSpan timeout)
297 return DoReceive (timeout, null, true);
300 public IMessage Receive (TimeSpan timeout,
301 IMessageQueueTransaction transaction)
303 return DoReceive (transaction, timeout, null, true);
306 public IMessage Receive (TimeSpan timeout,
307 MessageQueueTransactionType transactionType)
309 return DoReceive (transactionType, timeout, null, true);
312 public IMessage Receive (IMessageQueueTransaction transaction)
314 return DoReceive (transaction, TimeSpan.MaxValue, null, true);
317 public IMessage Receive (MessageQueueTransactionType transactionType)
319 return DoReceive (transactionType, TimeSpan.MaxValue, null, true);
322 public IMessage ReceiveById (string id)
324 return DoReceive (noTime, ById (id), true);
327 public IMessage ReceiveById (string id, TimeSpan timeout)
329 return DoReceive (timeout, ById (id), true);
332 public IMessage ReceiveById (string id,
333 IMessageQueueTransaction transaction)
335 return DoReceive (transaction, noTime, ById (id), true);
338 public IMessage ReceiveById (string id,
339 MessageQueueTransactionType transactionType)
341 return DoReceive (transactionType, noTime, ById (id), true);
344 public IMessage ReceiveById (string id, TimeSpan timeout,
345 IMessageQueueTransaction transaction)
347 return DoReceive (transaction, timeout, ById (id), true);
350 public IMessage ReceiveById (string id, TimeSpan timeout,
351 MessageQueueTransactionType transactionType)
353 return DoReceive (transactionType, timeout, ById (id), true);
356 public IMessage ReceiveByCorrelationId (string id)
358 return DoReceive (noTime, ByCorrelationId (id), true);
361 public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout)
363 return DoReceive (timeout, ByCorrelationId (id), true);
366 public IMessage ReceiveByCorrelationId (string id,
367 IMessageQueueTransaction transaction)
369 return DoReceive (transaction, noTime, ByCorrelationId (id), true);
372 public IMessage ReceiveByCorrelationId (string id,
373 MessageQueueTransactionType transactionType)
375 return DoReceive (transactionType, noTime, ByCorrelationId (id), true);
378 public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout,
379 IMessageQueueTransaction transaction)
381 return DoReceive (transaction, timeout, ByCorrelationId (id), true);
384 public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout,
385 MessageQueueTransactionType transactionType)
387 return DoReceive (transactionType, timeout, ByCorrelationId (id), true);
390 public IMessageEnumerator GetMessageEnumerator ()
392 return new RabbitMQMessageEnumerator (helper, QRef);
395 private IMessage DoReceive (MessageQueueTransactionType transactionType,
396 TimeSpan timeout, IsMatch matcher, bool ack)
398 switch (transactionType) {
399 case MessageQueueTransactionType.Single:
400 using (RabbitMQMessageQueueTransaction tx = NewTx ()) {
401 bool success = false;
403 IMessage msg = DoReceive ((IMessagingContext) tx, timeout, matcher, ack);
413 case MessageQueueTransactionType.None:
414 return DoReceive (timeout, matcher, true);
417 throw new NotSupportedException (transactionType + " not supported");
421 private IMessage DoReceive (IMessageQueueTransaction transaction,
422 TimeSpan timeout, IsMatch matcher, bool ack)
424 RabbitMQMessageQueueTransaction tx = (RabbitMQMessageQueueTransaction) transaction;
425 return DoReceive ((IMessagingContext) tx, timeout, matcher, ack);
428 private IMessage DoReceive (TimeSpan timeout, IsMatch matcher, bool ack)
430 using (IMessagingContext context = CurrentContext) {
431 return DoReceive (context, timeout, matcher, ack);
435 private IMessage DoReceive (IMessagingContext context, TimeSpan timeout,
436 IsMatch matcher, bool ack)
438 return context.Receive (QRef, timeout, matcher, ack);
441 private IMessagingContext CurrentContext {
443 return provider.CreateContext (qRef.Host);
447 private class IdMatcher
449 private readonly string id;
450 public IdMatcher (string id)
455 public bool MatchById (BasicDeliverEventArgs result)
457 return result.BasicProperties.MessageId == id;
461 private static IsMatch ById (string id)
463 return new IdMatcher (id).MatchById;
466 private class CorrelationIdMatcher
468 private readonly string correlationId;
469 public CorrelationIdMatcher (string correlationId)
471 this.correlationId = correlationId;
474 public bool MatchById (BasicDeliverEventArgs result)
476 return result.BasicProperties.CorrelationId == correlationId;
480 private static IsMatch ByCorrelationId (string correlationId)
482 return new CorrelationIdMatcher (correlationId).MatchById;
485 private RabbitMQMessageQueueTransaction NewTx ()
487 return (RabbitMQMessageQueueTransaction) provider.CreateMessageQueueTransaction ();