2010-06-23: Michael Barker <mike@middlesoft.co.uk>
[mono.git] / mcs / class / Mono.Messaging.RabbitMQ / Mono.Messaging.RabbitMQ / RabbitMQMessageQueue.cs
1 //
2 // Mono.Messaging.RabbitMQ
3 //
4 // Authors:
5 //        Michael Barker (mike@middlesoft.co.uk)
6 //
7 // (C) 2008 Michael Barker
8 //
9
10 //
11 // Permission is hereby granted, free of charge, to any person obtaining
12 // a copy of this software and associated documentation files (the
13 // "Software"), to deal in the Software without restriction, including
14 // without limitation the rights to use, copy, modify, merge, publish,
15 // distribute, sublicense, and/or sell copies of the Software, and to
16 // permit persons to whom the Software is furnished to do so, subject to
17 // the following conditions:
18 // 
19 // The above copyright notice and this permission notice shall be
20 // included in all copies or substantial portions of the Software.
21 // 
22 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
23 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
24 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
25 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
26 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
27 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 //
30
31 using System;
32 using System.Collections;
33 using System.ComponentModel;
34 using System.IO;
35 using System.Text;
36
37 using RabbitMQ.Client;
38 using RabbitMQ.Client.Content;
39 using RabbitMQ.Client.Events;
40 using RabbitMQ.Client.Exceptions;
41 using RabbitMQ.Client.MessagePatterns;
42 using RabbitMQ.Util;
43
namespace Mono.Messaging.RabbitMQ {

        /// <summary>
        /// RabbitMQ implementation of a message queue.  Currently this implementation
        /// attempts to be as stateless as possible.  Connections to the AMQP server
        /// are only created as needed.
        /// </summary>
        public class RabbitMQMessageQueue : MessageQueueBase, IMessageQueue {

                // Timeout handed to Subscription.Next by the overloads that take no TimeSpan.
                // NOTE(review): presumably interpreted by the RabbitMQ client as
                // "wait indefinitely" - confirm against the client library.
                private const int NoTimeout = -1;

                // Timeout substituted for NoTimeout when a matcher is supplied.
                // NOTE(review): presumably milliseconds, chosen so that a search for a
                // specific message ends once the queue stops delivering - confirm.
                private const int DefaultMatchTimeout = 500;

                private bool authenticate = false;
                private short basePriority = 0;
                private Guid category = Guid.Empty;
                private bool denySharedReceive = false;
                private EncryptionRequired encryptionRequired;
                private long maximumJournalSize = -1;
                private long maximumQueueSize = -1;
                private ISynchronizeInvoke synchronizingObject = null;
                private bool useJournalQueue = false;
                private QueueReference qRef = QueueReference.DEFAULT;
                private readonly RabbitMQMessagingProvider provider;
                private readonly MessageFactory helper;
                private readonly bool transactional;

                /// <summary>
                /// Creates a queue that is not yet bound to a path
                /// (its QRef is QueueReference.DEFAULT).
                /// </summary>
                public RabbitMQMessageQueue (RabbitMQMessagingProvider provider,
                                             bool transactional)
                        : this (provider, QueueReference.DEFAULT, transactional)
                {
                }

                /// <summary>
                /// Creates a queue bound to the given queue reference.
                /// </summary>
                public RabbitMQMessageQueue (RabbitMQMessagingProvider provider,
                                             QueueReference qRef,
                                             bool transactional)
                {
                        this.provider = provider;
                        this.helper = new MessageFactory (provider);
                        this.qRef = qRef;
                        this.transactional = transactional;
                }

                protected override IMessageQueue Queue {
                        get { return this; }
                }

                // The settable properties below simply store their value in a field;
                // nothing else in this class reads those fields.  The read-only
                // properties that throw are not implemented by this provider.

                public bool Authenticate {
                        get { return authenticate; }
                        set { authenticate = value; }
                }

                public short BasePriority {
                        get { return basePriority; }
                        set { basePriority = value; }
                }

                public bool CanRead {
                        get { throw new NotImplementedException (); }
                }

                public bool CanWrite {
                        get { throw new NotImplementedException (); }
                }

                public Guid Category {
                        get { return category; }
                        set { category = value; }
                }

                public DateTime CreateTime {
                        get { throw new NotImplementedException (); }
                }

                public bool DenySharedReceive {
                        get { return denySharedReceive; }
                        set { denySharedReceive = value; }
                }

                public EncryptionRequired EncryptionRequired {
                        get { return encryptionRequired; }
                        set { encryptionRequired = value; }
                }

                public Guid Id {
                        get { throw new NotImplementedException (); }
                }

                public DateTime LastModifyTime {
                        get { throw new NotImplementedException (); }
                }

                public long MaximumJournalSize {
                        get { return maximumJournalSize; }
                        set { maximumJournalSize = value; }
                }

                public long MaximumQueueSize {
                        get { return maximumQueueSize; }
                        set { maximumQueueSize = value; }
                }

                public IntPtr ReadHandle {
                        get { throw new NotImplementedException (); }
                }

                public ISynchronizeInvoke SynchronizingObject {
                        get { return synchronizingObject; }
                        set { synchronizingObject = value; }
                }

                public bool Transactional {
                        get { return transactional; }
                }

                public bool UseJournalQueue {
                        get { return useJournalQueue; }
                        set { useJournalQueue = value; }
                }

                public IntPtr WriteHandle {
                        get { throw new NotImplementedException (); }
                }

                /// <summary>
                /// The host/queue this instance sends to and receives from.
                /// </summary>
                public QueueReference QRef {
                        get { return qRef; }
                        set { qRef = value; }
                }

                /// <summary>
                /// Packs the connection's protocol version into a single long:
                /// major version in the upper 32 bits, minor version added below.
                /// </summary>
                private static long GetVersion (IConnection cn)
                {
                        long version = cn.Protocol.MajorVersion;
                        version = version << 32;
                        version += cn.Protocol.MinorVersion;
                        return version;
                }

                /// <summary>
                /// Stamps an outgoing message with its delivery information: this queue,
                /// a freshly generated id of the form "&lt;guid&gt;\0", the sender
                /// version, the current UTC time and the (possibly null) transaction id.
                /// </summary>
                private void SetDeliveryInfo (IMessage msg, long senderVersion,
                                              string transactionId)
                {
                        msg.SetDeliveryInfo (Acknowledgment.None,
                                             DateTime.MinValue,
                                             this,
                                             Guid.NewGuid ().ToString () + "\\0",
                                             MessageType.Normal,
                                             new byte[0],
                                             senderVersion,
                                             DateTime.UtcNow,
                                             null,
                                             transactionId);
                }

                public void Close ()
                {
                        // No-op (queues are currently stateless)
                }

                /// <summary>
                /// Deletes the referenced queue on the broker using a short-lived connection.
                /// </summary>
                public static void Delete (QueueReference qRef)
                {
                        using (IConnection cn = CreateConnection (qRef)) {
                                using (IModel model = cn.CreateModel ()) {
                                        model.QueueDelete (qRef.Queue, false, false, false);
                                }
                        }
                }

                /// <summary>
                /// Sends a message outside of any transaction on a short-lived connection.
                /// </summary>
                /// <exception cref="MonoMessagingException">No path has been set on this queue.</exception>
                /// <exception cref="ArgumentNullException"><paramref name="msg"/> is null.</exception>
                /// <exception cref="ArgumentException">The message has no body stream.</exception>
                /// <exception cref="ConnectionException">The broker could not be reached.</exception>
                public void Send (IMessage msg)
                {
                        ValidateForSend (msg);

                        try {
                                using (IConnection cn = CreateConnection (QRef)) {
                                        SetDeliveryInfo (msg, GetVersion (cn), null);
                                        using (IModel ch = cn.CreateModel ()) {
                                                Send (ch, msg);
                                        }
                                }
                        } catch (BrokerUnreachableException e) {
                                throw new ConnectionException (QRef, e);
                        }
                }

                /// <summary>
                /// Sends a message as part of the supplied transaction.  The transaction
                /// must be a <see cref="RabbitMQMessageQueueTransaction"/>.
                /// </summary>
                /// <exception cref="MonoMessagingException">No path has been set on this queue.</exception>
                /// <exception cref="ArgumentNullException"><paramref name="msg"/> or <paramref name="transaction"/> is null.</exception>
                /// <exception cref="ArgumentException">The message has no body stream.</exception>
                public void Send (IMessage msg, IMessageQueueTransaction transaction)
                {
                        ValidateForSend (msg);

                        if (transaction == null)
                                throw new ArgumentNullException ("transaction");

                        RabbitMQMessageQueueTransaction tx = (RabbitMQMessageQueueTransaction) transaction;

                        tx.RunSend (SendInContext, msg);
                }

                /// <summary>
                /// Sends a message using the requested transaction behaviour.
                /// Single wraps the send in its own transaction (any failure aborts it and
                /// is rethrown wrapped in a MonoMessagingException); None sends directly.
                /// </summary>
                /// <exception cref="NotSupportedException">
                /// The transaction type is Automatic or not a recognised value.
                /// </exception>
                public void Send (IMessage msg, MessageQueueTransactionType transactionType)
                {
                        switch (transactionType) {
                        case MessageQueueTransactionType.Single:
                                using (IMessageQueueTransaction tx = provider.CreateMessageQueueTransaction ()) {
                                        try {
                                                Send (msg, tx);
                                                tx.Commit ();
                                        } catch (Exception e) {
                                                tx.Abort ();
                                                throw new MonoMessagingException(e.Message, e);
                                        }
                                }
                                break;

                        case MessageQueueTransactionType.None:
                                Send (msg);
                                break;

                        case MessageQueueTransactionType.Automatic:
                                throw new NotSupportedException("Automatic transaction types not supported");

                        default:
                                // Previously an unrecognised value fell out of the switch and
                                // the message was silently dropped; fail loudly instead, as
                                // the receive path already does.
                                throw new NotSupportedException(transactionType + " not supported");
                        }
                }

                /// <summary>
                /// Shared precondition checks for the public Send overloads.  The path
                /// check comes first so an unbound queue is reported before the message
                /// is inspected.
                /// </summary>
                private void ValidateForSend (IMessage msg)
                {
                        if (QRef == QueueReference.DEFAULT)
                                throw new MonoMessagingException ("Path has not been specified");

                        if (msg == null)
                                throw new ArgumentNullException ("msg");

                        if (msg.BodyStream == null)
                                throw new ArgumentException ("Message is not serialized properly");
                }

                /// <summary>
                /// Callback handed to the transaction: makes sure the transaction's
                /// host/connection/model are set up for this queue, then publishes.
                /// </summary>
                private void SendInContext (ref string host, ref IConnection cn,
                                            ref IModel model, IMessage msg, string txId)
                {
                        EnsureTxContext (QRef, ref host, ref cn, ref model);

                        SetDeliveryInfo (msg, GetVersion (cn), txId);
                        Send (model, msg);
                }

                /// <summary>
                /// Lazily initialises the state a transaction carries between operations.
                /// The first operation fixes the host; later ones must target the same
                /// host.  The connection and the model (put into transactional mode with
                /// TxSelect) are created only if the transaction does not have them yet.
                /// </summary>
                /// <exception cref="MonoMessagingException">
                /// The transaction is already bound to a different host.
                /// </exception>
                private static void EnsureTxContext (QueueReference qRef, ref string host,
                                                     ref IConnection cn, ref IModel model)
                {
                        if (host == null)
                                host = qRef.Host;
                        else if (host != qRef.Host)
                                throw new MonoMessagingException ("Transactions can not span multiple hosts");

                        if (cn == null)
                                cn = CreateConnection (qRef);

                        if (model == null) {
                                model = cn.CreateModel ();
                                model.TxSelect ();
                        }
                }

                /// <summary>
                /// Declares the queue on the given model and publishes the message to it
                /// via the default ("") exchange.
                /// </summary>
                private void Send (IModel model, IMessage msg)
                {
                        // NOTE(review): the queue is declared with 'true' here but with
                        // 'false' on the receive path - confirm the mismatch is intended.
                        string finalName = model.QueueDeclare (QRef.Queue, true);
                        IMessageBuilder mb = helper.WriteMessage (model, msg);

                        model.BasicPublish ("", finalName,
                                            (IBasicProperties) mb.GetContentHeader(),
                                            mb.GetContentBody ());
                }

                /// <summary>
                /// Purges the referenced queue on the broker using a short-lived connection.
                /// </summary>
                public void Purge ()
                {
                        using (IConnection cn = CreateConnection (QRef)) {
                                using (IModel model = cn.CreateModel ()) {
                                        model.QueuePurge (QRef.Queue, false);
                                }
                        }
                }

                // Peek* overloads read a message without acknowledging it; Receive*
                // overloads acknowledge the message they return.  Every overload builds
                // a ReceiveDelegate describing the read and hands it to one of the
                // Run overloads, which supplies the connection and model.

                public IMessage Peek ()
                {
                        return Run (Peeker ());
                }

                public IMessage Peek (TimeSpan timeout)
                {
                        return Run (Peeker (timeout));
                }

                public IMessage PeekById (string id)
                {
                        return Run (Peeker (ById (id)));
                }

                public IMessage PeekById (string id, TimeSpan timeout)
                {
                        return Run (Peeker (timeout, ById (id)));
                }

                public IMessage PeekByCorrelationId (string id)
                {
                        return Run (Peeker (ByCorrelationId (id)));
                }

                public IMessage PeekByCorrelationId (string id, TimeSpan timeout)
                {
                        return Run (Peeker (timeout, ByCorrelationId (id)));
                }

                public IMessage Receive ()
                {
                        return Run (Receiver ());
                }

                public IMessage Receive (TimeSpan timeout)
                {
                        return Run (Receiver (timeout));
                }

                public IMessage Receive (TimeSpan timeout,
                                         IMessageQueueTransaction transaction)
                {
                        return Run (transaction, Receiver (timeout));
                }

                public IMessage Receive (TimeSpan timeout,
                                         MessageQueueTransactionType transactionType)
                {
                        return Run (transactionType, Receiver (timeout));
                }

                public IMessage Receive (IMessageQueueTransaction transaction)
                {
                        return Run (transaction, Receiver ());
                }

                public IMessage Receive (MessageQueueTransactionType transactionType)
                {
                        return Run (transactionType, Receiver ());
                }

                public IMessage ReceiveById (string id)
                {
                        return Run (Receiver (ById (id)));
                }

                public IMessage ReceiveById (string id, TimeSpan timeout)
                {
                        return Run (Receiver (timeout, ById (id)));
                }

                public IMessage ReceiveById (string id,
                                             IMessageQueueTransaction transaction)
                {
                        return Run (transaction, Receiver (ById (id)));
                }

                public IMessage ReceiveById (string id,
                                             MessageQueueTransactionType transactionType)
                {
                        return Run (transactionType, Receiver (ById (id)));
                }

                public IMessage ReceiveById (string id, TimeSpan timeout,
                                             IMessageQueueTransaction transaction)
                {
                        return Run (transaction, Receiver (timeout, ById (id)));
                }

                public IMessage ReceiveById (string id, TimeSpan timeout,
                                             MessageQueueTransactionType transactionType)
                {
                        return Run (transactionType, Receiver (timeout, ById (id)));
                }

                public IMessage ReceiveByCorrelationId (string id)
                {
                        return Run (Receiver (ByCorrelationId (id)));
                }

                public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout)
                {
                        return Run (Receiver (timeout, ByCorrelationId (id)));
                }

                public IMessage ReceiveByCorrelationId (string id,
                                                        IMessageQueueTransaction transaction)
                {
                        return Run (transaction, Receiver (ByCorrelationId (id)));
                }

                public IMessage ReceiveByCorrelationId (string id,
                                                        MessageQueueTransactionType transactionType)
                {
                        return Run (transactionType, Receiver (ByCorrelationId (id)));
                }

                public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout,
                                                        IMessageQueueTransaction transaction)
                {
                        return Run (transaction, Receiver (timeout, ByCorrelationId (id)));
                }

                public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout,
                                                        MessageQueueTransactionType transactionType)
                {
                        return Run (transactionType, Receiver (timeout, ByCorrelationId (id)));
                }

                public IMessageEnumerator GetMessageEnumerator ()
                {
                        return new RabbitMQMessageEnumerator (helper, QRef);
                }

                // A read operation that only needs a queue and an open model to run.
                private delegate IMessage ReceiveDelegate (RabbitMQMessageQueue q,
                                                           IModel model);

                /// <summary>
                /// Runs a read using the requested transaction behaviour.  Single wraps
                /// the read in its own transaction, committing on success and aborting
                /// if anything throws; None reads directly.
                /// </summary>
                /// <exception cref="NotSupportedException">Any other transaction type.</exception>
                private IMessage Run (MessageQueueTransactionType transactionType,
                                      ReceiveDelegate r)
                {
                        switch (transactionType) {
                        case MessageQueueTransactionType.Single:
                                using (RabbitMQMessageQueueTransaction tx = GetTx ()) {
                                        bool success = false;
                                        try {
                                                IMessage msg = Run (tx, r);
                                                tx.Commit ();
                                                success = true;
                                                return msg;
                                        } finally {
                                                // Abort on any failure without swallowing
                                                // or rewrapping the original exception.
                                                if (!success)
                                                        tx.Abort ();
                                        }
                                }

                        case MessageQueueTransactionType.None:
                                return Run (r);

                        default:
                                throw new NotSupportedException(transactionType + " not supported");
                        }
                }

                /// <summary>
                /// Runs a read inside the supplied transaction, which must be a
                /// <see cref="RabbitMQMessageQueueTransaction"/>.
                /// </summary>
                /// <exception cref="ArgumentNullException"><paramref name="transaction"/> is null.</exception>
                private IMessage Run (IMessageQueueTransaction transaction,
                                      ReceiveDelegate r)
                {
                        if (transaction == null)
                                throw new ArgumentNullException ("transaction");

                        TxReceiver txr = new TxReceiver (this, r);
                        RabbitMQMessageQueueTransaction tx =
                                (RabbitMQMessageQueueTransaction) transaction;
                        return tx.RunReceive (txr.ReceiveInContext);
                }

                /// <summary>
                /// Runs a read on a short-lived connection and model, both disposed
                /// before returning.
                /// </summary>
                private IMessage Run (ReceiveDelegate r)
                {
                        using (IConnection cn = CreateConnection (QRef)) {
                                using (IModel model = cn.CreateModel ()) {
                                        return r (this, model);
                                }
                        }
                }

                // Binds a queue and a read operation together so they can be passed
                // to the transaction as a single callback.
                private class TxReceiver
                {
                        private readonly ReceiveDelegate doReceive;
                        private readonly RabbitMQMessageQueue q;

                        public TxReceiver(RabbitMQMessageQueue q, ReceiveDelegate doReceive) {
                                this.q = q;
                                this.doReceive = doReceive;
                        }

                        // Makes sure the transaction's host/connection/model are set up
                        // for the queue, then performs the read on the transactional model.
                        public IMessage ReceiveInContext (ref string host, ref IConnection cn,
                                                          ref IModel model, string txId)
                        {
                                EnsureTxContext (q.QRef, ref host, ref cn, ref model);

                                return doReceive (q, model);
                        }
                }

                // Captures the timeout, optional matcher and ack flag of a read and
                // exposes them as a ReceiveDelegate (closure written out by hand).
                private class ReceiveDelegateFactory
                {
                        private readonly int timeout;
                        private readonly IsMatch matcher;
                        private readonly bool ack;

                        public ReceiveDelegateFactory (int timeout, IsMatch matcher)
                                : this (timeout, matcher, true)
                        {
                        }

                        public ReceiveDelegateFactory (int timeout, IsMatch matcher, bool ack)
                        {
                                // A matching read with no timeout falls back to
                                // DefaultMatchTimeout instead of NoTimeout.
                                if (matcher != null && timeout == NoTimeout)
                                        this.timeout = DefaultMatchTimeout;
                                else
                                        this.timeout = timeout;
                                this.matcher = matcher;
                                this.ack = ack;
                        }

                        public IMessage Receive (RabbitMQMessageQueue q, IModel model)
                        {
                                if (matcher == null)
                                        return q.Receive (model, timeout, ack);
                                else
                                        return q.Receive (model, timeout, ack, matcher);
                        }
                }

                // Receiver (...) builds acknowledging reads, Peeker (...) builds
                // non-acknowledging ones.  None of them touch instance state.

                private static ReceiveDelegate Receiver (TimeSpan timeout,
                                                         IsMatch matcher)
                {
                        int to = MessageFactory.TimeSpanToInt32 (timeout);
                        return new ReceiveDelegateFactory (to, matcher).Receive;
                }

                private static ReceiveDelegate Receiver (IsMatch matcher)
                {
                        return new ReceiveDelegateFactory (NoTimeout, matcher).Receive;
                }

                private static ReceiveDelegate Receiver (TimeSpan timeout)
                {
                        int to = MessageFactory.TimeSpanToInt32 (timeout);
                        return new ReceiveDelegateFactory (to, null).Receive;
                }

                private static ReceiveDelegate Receiver ()
                {
                        return new ReceiveDelegateFactory (NoTimeout, null).Receive;
                }

                private static ReceiveDelegate Peeker ()
                {
                        return new ReceiveDelegateFactory (NoTimeout, null, false).Receive;
                }

                private static ReceiveDelegate Peeker (TimeSpan timeout)
                {
                        int to = MessageFactory.TimeSpanToInt32 (timeout);
                        return new ReceiveDelegateFactory (to, null, false).Receive;
                }

                private static ReceiveDelegate Peeker (IsMatch matcher)
                {
                        return new ReceiveDelegateFactory (NoTimeout, matcher, false).Receive;
                }

                private static ReceiveDelegate Peeker (TimeSpan timeout, IsMatch matcher)
                {
                        int to = MessageFactory.TimeSpanToInt32 (timeout);
                        return new ReceiveDelegateFactory (to, matcher, false).Receive;
                }

                // Predicate deciding whether a delivered message is the one wanted.
                private delegate bool IsMatch (BasicDeliverEventArgs result);

                // Matches deliveries whose MessageId property equals the given id.
                private class IdMatcher
                {
                        private readonly string id;
                        public IdMatcher (string id)
                        {
                                this.id = id;
                        }

                        public bool MatchById (BasicDeliverEventArgs result)
                        {
                                return result.BasicProperties.MessageId == id;
                        }
                }

                private static IsMatch ById (string id)
                {
                        return new IdMatcher (id).MatchById;
                }

                // Matches deliveries whose CorrelationId property equals the given id.
                private class CorrelationIdMatcher
                {
                        private readonly string correlationId;
                        public CorrelationIdMatcher (string correlationId)
                        {
                                this.correlationId = correlationId;
                        }

                        public bool MatchByCorrelationId (BasicDeliverEventArgs result)
                        {
                                return result.BasicProperties.CorrelationId == correlationId;
                        }
                }

                private static IsMatch ByCorrelationId (string correlationId)
                {
                        return new CorrelationIdMatcher (correlationId).MatchByCorrelationId;
                }

                /// <summary>
                /// Reads the next message delivered for the queue, acknowledging it
                /// only when <paramref name="doAck"/> is true.
                /// </summary>
                /// <exception cref="MonoMessagingException">
                /// Nothing was delivered within the timeout.
                /// </exception>
                private IMessage Receive (IModel model, int timeout, bool doAck)
                {
                        string finalName = model.QueueDeclare (QRef.Queue, false);

                        using (Subscription sub = new Subscription (model, finalName)) {
                                BasicDeliverEventArgs result;
                                if (sub.Next (timeout, out result)) {
                                        IMessage m = helper.ReadMessage (QRef, result);
                                        if (doAck)
                                                sub.Ack (result);
                                        return m;
                                } else {
                                        // NOTE(review): the matcher overload below throws
                                        // MessageUnavailableException for the same condition
                                        // - confirm whether the difference is intentional.
                                        throw new MonoMessagingException ("No Message Available");
                                }
                        }
                }

                /// <summary>
                /// Reads deliveries for the queue until one satisfies
                /// <paramref name="matcher"/>, acknowledging only that message and only
                /// when <paramref name="doAck"/> is true.  The timeout applies to each
                /// individual wait for the next delivery.
                /// </summary>
                /// <exception cref="MessageUnavailableException">
                /// Deliveries stopped arriving before a match was found.
                /// </exception>
                private IMessage Receive (IModel model, int timeout,
                                          bool doAck, IsMatch matcher)
                {
                        string finalName = model.QueueDeclare (QRef.Queue, false);

                        using (Subscription sub = new Subscription (model, finalName)) {
                                BasicDeliverEventArgs result;
                                while (sub.Next (timeout, out result)) {

                                        // Non-matching deliveries are skipped without an ack.
                                        if (matcher (result)) {
                                                IMessage m = helper.ReadMessage (QRef, result);
                                                if (doAck)
                                                        sub.Ack (result);
                                                return m;
                                        }
                                }

                                throw new MessageUnavailableException ("Message not available");
                        }
                }

                private RabbitMQMessageQueueTransaction GetTx ()
                {
                        return (RabbitMQMessageQueueTransaction) provider.CreateMessageQueueTransaction ();
                }

                /// <summary>
                /// Opens a new connection to the host named by the queue reference.
                /// </summary>
                private static IConnection CreateConnection (QueueReference qRef)
                {
                        ConnectionFactory cf = new ConnectionFactory ();
                        cf.Address = qRef.Host;
                        return cf.CreateConnection ();
                }
        }
}