Merge pull request #5714 from alexischr/update_bockbuild
[mono.git] / mcs / class / Mono.Messaging.RabbitMQ / Mono.Messaging.RabbitMQ / RabbitMQMessageQueue.cs
1 //
2 // Mono.Messaging.RabbitMQ
3 //
4 // Authors:
5 //        Michael Barker (mike@middlesoft.co.uk)
6 //
7 // (C) 2008 Michael Barker
8 //
9
10 //
11 // Permission is hereby granted, free of charge, to any person obtaining
12 // a copy of this software and associated documentation files (the
13 // "Software"), to deal in the Software without restriction, including
14 // without limitation the rights to use, copy, modify, merge, publish,
15 // distribute, sublicense, and/or sell copies of the Software, and to
16 // permit persons to whom the Software is furnished to do so, subject to
17 // the following conditions:
18 // 
19 // The above copyright notice and this permission notice shall be
20 // included in all copies or substantial portions of the Software.
21 // 
22 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
23 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
24 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
25 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
26 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
27 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 //
30
31 using System;
32 using System.Collections;
33 using System.ComponentModel;
34 using System.IO;
35 using System.Text;
36
37 using RabbitMQ.Client;
38 using RabbitMQ.Client.Content;
39 using RabbitMQ.Client.Events;
40 using RabbitMQ.Client.Exceptions;
41 using RabbitMQ.Client.MessagePatterns;
42 using RabbitMQ.Util;
43
44 namespace Mono.Messaging.RabbitMQ {
45
46         /// <summary>
47         /// RabbitMQ Implementation of a message queue.  Currrently this implementation
48         /// attempts to be as stateless as possible.  Connection the AMQP server
49         /// are only created as needed.
50         /// </summary>
51         public class RabbitMQMessageQueue : MessageQueueBase, IMessageQueue {
52                 
53                 private readonly RabbitMQMessagingProvider provider;
54                 private readonly MessageFactory helper;
55                 private readonly bool transactional;
56                 private readonly TimeSpan noTime = new TimeSpan(0, 0, 0, 0, 500);
57                 
58                 private bool authenticate = false;
59                 private short basePriority = 0;
60                 private Guid category = Guid.Empty;
61                 private bool denySharedReceive = false;
62                 private EncryptionRequired encryptionRequired;
63                 private long maximumJournalSize = -1;
64                 private long maximumQueueSize = -1;
65                 private ISynchronizeInvoke synchronizingObject = null;
66                 private bool useJournalQueue = false;
67                 private QueueReference qRef = QueueReference.DEFAULT;
68                 
69                 public RabbitMQMessageQueue (RabbitMQMessagingProvider provider,
70                                              bool transactional)
71                         : this (provider, QueueReference.DEFAULT, transactional)
72                 {
73                 }
74                 
75                 public RabbitMQMessageQueue (RabbitMQMessagingProvider provider,
76                                              QueueReference qRef, 
77                                              bool transactional)
78                 {
79                         this.provider = provider;
80                         this.helper = new MessageFactory (provider);
81                         this.qRef = qRef;
82                         this.transactional = transactional;
83                 }
84                 
85                 protected override IMessageQueue Queue {
86                         get { return this; }
87                 }
88
89                 public bool Authenticate {
90                         get { return authenticate; }
91                         set { authenticate = value; }
92                 }
93
94                 public short BasePriority {
95                         get { return basePriority; }
96                         set { basePriority = value; }
97                 }
98
99                 public bool CanRead {
100                         get { throw new NotImplementedException (); }
101                 }
102                 
103                 public bool CanWrite {
104                         get { throw new NotImplementedException (); }
105                 }
106                 
107                 public Guid Category {
108                         get { return category; }
109                         set { category = value; }
110                 }
111                 
112                 public DateTime CreateTime {
113                         get { throw new NotImplementedException (); }
114                 }
115                 
116                 public bool DenySharedReceive {
117                         get { return denySharedReceive; }
118                         set { denySharedReceive = value; }
119                 }
120                 
121                 public EncryptionRequired EncryptionRequired {
122                         get { return encryptionRequired; }
123                         set { encryptionRequired = value; }
124                 }
125                 
126                 public Guid Id {
127                         get { throw new NotImplementedException (); }
128                 }
129                 
130                 public DateTime LastModifyTime {
131                         get { throw new NotImplementedException (); }
132                 }
133                 
134                 public long MaximumJournalSize {
135                         get { return maximumJournalSize; }
136                         set { maximumJournalSize = value; }
137                 }
138                 
139                 public long MaximumQueueSize {
140                         get { return maximumQueueSize; }
141                         set { maximumQueueSize = value; }
142                 }
143                 
144                 public IntPtr ReadHandle {
145                         get { throw new NotImplementedException (); }
146                 }
147                 
148                 public ISynchronizeInvoke SynchronizingObject {
149                         get { return synchronizingObject; }
150                         set { synchronizingObject = value; }
151                 }
152                 
153                 public bool Transactional {
154                         get { return transactional; }
155                 }
156                 
157                 public bool UseJournalQueue {
158                         get { return useJournalQueue; }
159                         set { useJournalQueue = value; }
160                 }
161                 
162                 public IntPtr WriteHandle {
163                         get { throw new NotImplementedException (); }
164                 }
165                 
166                 public QueueReference QRef {
167                         get { return qRef; }
168                         set { qRef = value; }
169                 }
170                 
171                 private void SetDeliveryInfo (IMessage msg, string transactionId)
172                 {
173                         msg.SetDeliveryInfo (Acknowledgment.None,
174                                              DateTime.MinValue,
175                                              this,
176                                              Guid.NewGuid ().ToString () + "\\0",
177                                              MessageType.Normal,
178                                              new byte[0],
179                                              0,
180                                              DateTime.UtcNow,
181                                              null,
182                                              transactionId);
183                 }
184                 
185                 public void Close ()
186                 {
187                 }
188                 
189                 public static void Delete (QueueReference qRef)
190                 {
191                         RabbitMQMessagingProvider provider = (RabbitMQMessagingProvider) MessagingProviderLocator.GetProvider ();
192                         using (IMessagingContext context = provider.CreateContext (qRef.Host)) {
193                                 context.Delete (qRef);
194                         }
195                 }                       
196                 
197                 public void Send (IMessage msg)
198                 {
199                         if (QRef == QueueReference.DEFAULT)
200                                 throw new MonoMessagingException ("Path has not been specified");
201                         
202                         if (msg.BodyStream == null)
203                                 throw new ArgumentException ("BodyStream is null, Message is not serialized properly");
204                         
205                         using (IMessagingContext context = CurrentContext) {
206                                 try {
207                                         SetDeliveryInfo (msg, null);
208                                         context.Send (QRef, msg);
209                                 } catch (BrokerUnreachableException e) {
210                                         throw new ConnectionException (QRef, e);
211                                 }
212                         }
213                 }
214                 
215                 public void Send (IMessage msg, IMessageQueueTransaction transaction)
216                 {
217                         if (QRef == QueueReference.DEFAULT)
218                                 throw new MonoMessagingException ("Path has not been specified");
219                         
220                         if (msg.BodyStream == null)
221                                 throw new ArgumentException ("Message is not serialized properly");
222                         
223                         RabbitMQMessageQueueTransaction tx = (RabbitMQMessageQueueTransaction) transaction;
224                         
225                         SetDeliveryInfo (msg, tx.Id);
226                         tx.Send (QRef, msg);
227                 }
228                 
229                 public void Send (IMessage msg, MessageQueueTransactionType transactionType)
230                 {
231                         switch (transactionType) {
232                         case MessageQueueTransactionType.Single:
233                                 using (IMessageQueueTransaction tx = NewTx ()) {
234                                         try {
235                                                 Send (msg, tx);
236                                                 tx.Commit ();
237                                         } catch (Exception e) {
238                                                 tx.Abort ();
239                                                 throw new MonoMessagingException(e.Message, e);
240                                         }
241                                 }
242                                 break;
243
244                         case MessageQueueTransactionType.None:
245                                 Send (msg);
246                                 break;
247
248                         case MessageQueueTransactionType.Automatic:
249                                 throw new NotSupportedException("Automatic transaction types not supported");
250                         }
251                 }
252                 
253                 public void Purge ()
254                 {
255                         using (IMessagingContext context = CurrentContext) {
256                                 context.Purge (QRef);
257                         }
258                 }
259                 
260                 public IMessage Peek ()
261                 {
262                         return DoReceive (TimeSpan.MaxValue, null, false);
263                 }
264                 
265                 public IMessage Peek (TimeSpan timeout)
266                 {
267                         return DoReceive (timeout, null, false);
268                 }
269                 
270                 public IMessage PeekById (string id)
271                 {
272                         return DoReceive (noTime, ById (id), false);
273                 }
274
275                 public IMessage PeekById (string id, TimeSpan timeout)
276                 {
277                         return DoReceive (timeout, ById (id), false);
278                 }
279                 
280                 public IMessage PeekByCorrelationId (string id)
281                 {
282                         return DoReceive (noTime, ByCorrelationId (id), false);
283                 }
284
285                 public IMessage PeekByCorrelationId (string id, TimeSpan timeout)
286                 {
287                         return DoReceive (timeout, ByCorrelationId (id), false);
288                 }
289                 
290                 public IMessage Receive ()
291                 {
292                         return DoReceive (TimeSpan.MaxValue, null, true);
293                 }
294                 
295                 public IMessage Receive (TimeSpan timeout)
296                 {
297                         return DoReceive (timeout, null, true);
298                 }
299                 
300                 public IMessage Receive (TimeSpan timeout,
301                                          IMessageQueueTransaction transaction)
302                 {
303                         return DoReceive (transaction, timeout, null, true);
304                 }
305                 
306                 public IMessage Receive (TimeSpan timeout,
307                                          MessageQueueTransactionType transactionType)
308                 {
309                         return DoReceive (transactionType, timeout, null, true);
310                 }
311                 
312                 public IMessage Receive (IMessageQueueTransaction transaction)
313                 {
314                         return DoReceive (transaction, TimeSpan.MaxValue, null, true);
315                 }
316                 
317                 public IMessage Receive (MessageQueueTransactionType transactionType)
318                 {
319                         return DoReceive (transactionType, TimeSpan.MaxValue, null, true);
320                 }
321
322                 public IMessage ReceiveById (string id)
323                 {
324                         return DoReceive (noTime, ById (id), true);
325                 }
326
327                 public IMessage ReceiveById (string id, TimeSpan timeout)
328                 {
329                         return DoReceive (timeout, ById (id), true);
330                 }
331                 
332                 public IMessage ReceiveById (string id,
333                                              IMessageQueueTransaction transaction)
334                 {
335                         return DoReceive (transaction, noTime, ById (id), true);
336                 }
337                 
338                 public IMessage ReceiveById (string id,
339                                              MessageQueueTransactionType transactionType)
340                 {
341                         return DoReceive (transactionType, noTime, ById (id), true);
342                 }
343                 
344                 public IMessage ReceiveById (string id, TimeSpan timeout,
345                                              IMessageQueueTransaction transaction)
346                 {
347                         return DoReceive (transaction, timeout, ById (id), true);
348                 }
349                 
350                 public IMessage ReceiveById (string id, TimeSpan timeout,
351                                              MessageQueueTransactionType transactionType)
352                 {
353                         return DoReceive (transactionType, timeout, ById (id), true);
354                 }
355                 
356                 public IMessage ReceiveByCorrelationId (string id)
357                 {
358                         return DoReceive (noTime, ByCorrelationId (id), true);
359                 }
360                 
361                 public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout)
362                 {
363                         return DoReceive (timeout, ByCorrelationId (id), true);
364                 }
365                 
366                 public IMessage ReceiveByCorrelationId (string id,
367                                                         IMessageQueueTransaction transaction)
368                 {
369                         return DoReceive (transaction, noTime, ByCorrelationId (id), true);
370                 }
371                 
372                 public IMessage ReceiveByCorrelationId (string id,
373                                                         MessageQueueTransactionType transactionType)
374                 {
375                         return DoReceive (transactionType, noTime, ByCorrelationId (id), true);
376                 }
377                 
378                 public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout,
379                                                         IMessageQueueTransaction transaction)
380                 {
381                         return DoReceive (transaction, timeout, ByCorrelationId (id), true);
382                 }
383                 
384                 public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout,
385                                                         MessageQueueTransactionType transactionType)
386                 {
387                         return DoReceive (transactionType, timeout, ByCorrelationId (id), true);
388                 }
389                 
390                 public IMessageEnumerator GetMessageEnumerator ()
391                 {
392                         return new RabbitMQMessageEnumerator (helper, QRef);
393                 }
394                 
395                 private IMessage DoReceive (MessageQueueTransactionType transactionType,
396                                                                         TimeSpan timeout, IsMatch matcher, bool ack)
397                 {
398                         switch (transactionType) {
399                         case MessageQueueTransactionType.Single:
400                                 using (RabbitMQMessageQueueTransaction tx = NewTx ()) {
401                                         bool success = false;
402                                         try {
403                                                 IMessage msg = DoReceive ((IMessagingContext) tx, timeout, matcher, ack);
404                                                 tx.Commit ();
405                                                 success = true;
406                                                 return msg;
407                                         } finally {
408                                                 if (!success)
409                                                         tx.Abort ();
410                                         }
411                                 }
412
413                         case MessageQueueTransactionType.None:
414                                 return DoReceive (timeout, matcher, true);
415
416                         default:
417                                 throw new NotSupportedException (transactionType + " not supported");
418                         }
419                 }
420                 
421                 private IMessage DoReceive (IMessageQueueTransaction transaction,
422                                                                         TimeSpan timeout, IsMatch matcher, bool ack)
423                 {
424                         RabbitMQMessageQueueTransaction tx = (RabbitMQMessageQueueTransaction) transaction;
425                         return DoReceive ((IMessagingContext) tx, timeout, matcher, ack);
426                 }
427                 
428                 private IMessage DoReceive (TimeSpan timeout, IsMatch matcher, bool ack)
429                 {
430                         using (IMessagingContext context = CurrentContext) {
431                                 return DoReceive (context, timeout, matcher, ack);
432                         }
433                 }
434                 
435                 private IMessage DoReceive (IMessagingContext context, TimeSpan timeout,
436                                                                         IsMatch matcher, bool ack)
437                 {
438                         return context.Receive (QRef, timeout, matcher, ack);
439                 }
440                 
441                 private IMessagingContext CurrentContext {
442                         get {
443                                 return provider.CreateContext (qRef.Host);
444                         }
445                 }
446                 
447                 private class IdMatcher
448                 {
449                         private readonly string id;
450                         public IdMatcher (string id)
451                         {
452                                 this.id = id;
453                         }
454                         
455                         public bool MatchById (BasicDeliverEventArgs result)
456                         {
457                                 return result.BasicProperties.MessageId == id;
458                         }
459                 }
460                 
461                 private static IsMatch ById (string id)
462                 {
463                         return new IdMatcher (id).MatchById;
464                 }
465                 
466                 private class CorrelationIdMatcher
467                 {
468                         private readonly string correlationId;
469                         public CorrelationIdMatcher (string correlationId)
470                         {
471                                 this.correlationId = correlationId;
472                         }
473                         
474                         public bool MatchById (BasicDeliverEventArgs result)
475                         {
476                                 return result.BasicProperties.CorrelationId == correlationId;
477                         }
478                 }
479                 
480                 private static IsMatch ByCorrelationId (string correlationId)
481                 {
482                         return new CorrelationIdMatcher (correlationId).MatchById;
483                 }
484                 
485                 private RabbitMQMessageQueueTransaction NewTx ()
486                 {
487                         return (RabbitMQMessageQueueTransaction) provider.CreateMessageQueueTransaction ();
488                 }
489         }
490 }