2009-05-19 Michael Barker <mike@middlesoft.co.uk>
[mono.git] / mcs / class / Mono.Messaging.RabbitMQ / Mono.Messaging.RabbitMQ / RabbitMQMessageQueue.cs
1 //
2 // Mono.Messaging.RabbitMQ
3 //
4 // Authors:
5 //        Michael Barker (mike@middlesoft.co.uk)
6 //
7 // (C) 2008 Michael Barker
8 //
9
10 //
11 // Permission is hereby granted, free of charge, to any person obtaining
12 // a copy of this software and associated documentation files (the
13 // "Software"), to deal in the Software without restriction, including
14 // without limitation the rights to use, copy, modify, merge, publish,
15 // distribute, sublicense, and/or sell copies of the Software, and to
16 // permit persons to whom the Software is furnished to do so, subject to
17 // the following conditions:
18 // 
19 // The above copyright notice and this permission notice shall be
20 // included in all copies or substantial portions of the Software.
21 // 
22 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
23 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
24 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
25 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
26 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
27 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 //
30
31 using System;
32 using System.Collections;
33 using System.ComponentModel;
34 using System.IO;
35 using System.Text;
36
37 using RabbitMQ.Client;
38 using RabbitMQ.Client.Content;
39 using RabbitMQ.Client.Events;
40 using RabbitMQ.Client.Exceptions;
41 using RabbitMQ.Client.MessagePatterns;
42 using RabbitMQ.Util;
43
44 namespace Mono.Messaging.RabbitMQ {
45
46         /// <summary>
47         /// RabbitMQ Implementation of a message queue.  Currrently this implementation
48         /// attempts to be as stateless as possible.  Connection the AMQP server
49         /// are only created as needed.
50         /// </summary>
51         public class RabbitMQMessageQueue : MessageQueueBase, IMessageQueue {
52                 
53                 private bool authenticate = false;
54                 private short basePriority = 0;
55                 private Guid category = Guid.Empty;
56                 private bool denySharedReceive = false;
57                 private EncryptionRequired encryptionRequired;
58                 private long maximumJournalSize = -1;
59                 private long maximumQueueSize = -1;
60                 private ISynchronizeInvoke synchronizingObject = null;
61                 private bool useJournalQueue = false;
62                 private QueueReference qRef = QueueReference.DEFAULT;
63                 private readonly RabbitMQMessagingProvider provider;
64                 private readonly MessageFactory helper;
65                 private readonly string realm;
66                 private readonly bool transactional;
67                 
68                 public RabbitMQMessageQueue (RabbitMQMessagingProvider provider,
69                                              bool transactional)
70                         : this (provider, QueueReference.DEFAULT, transactional)
71                 {
72                 }
73                 
74                 public RabbitMQMessageQueue (RabbitMQMessagingProvider provider,
75                                              QueueReference qRef, 
76                                              bool transactional)
77                         : this (provider, "/data", qRef, transactional)
78                 {
79                 }
80                 
81                 public RabbitMQMessageQueue (RabbitMQMessagingProvider provider,
82                                              string realm, QueueReference qRef,
83                                              bool transactional)
84                 {
85                         this.provider = provider;
86                         this.helper = new MessageFactory (provider);
87                         this.realm = realm;
88                         this.qRef = qRef;
89                         this.transactional = transactional;
90                 }
91                 
92                 protected override IMessageQueue Queue {
93                         get { return this; }
94                 }
95
96                 public bool Authenticate {
97                         get { return authenticate; }
98                         set { authenticate = value; }
99                 }
100
101                 public short BasePriority {
102                         get { return basePriority; }
103                         set { basePriority = value; }
104                 }
105
106                 public bool CanRead {
107                         get { throw new NotImplementedException (); }
108                 }
109                 
110                 public bool CanWrite {
111                         get { throw new NotImplementedException (); }
112                 }
113                 
114                 public Guid Category {
115                         get { return category; }
116                         set { category = value; }
117                 }
118                 
119                 public DateTime CreateTime {
120                         get { throw new NotImplementedException (); }
121                 }
122                 
123                 public bool DenySharedReceive {
124                         get { return denySharedReceive; }
125                         set { denySharedReceive = value; }
126                 }
127                 
128                 public EncryptionRequired EncryptionRequired {
129                         get { return encryptionRequired; }
130                         set { encryptionRequired = value; }
131                 }
132                 
133                 public Guid Id {
134                         get { throw new NotImplementedException (); }
135                 }
136                 
137                 public DateTime LastModifyTime {
138                         get { throw new NotImplementedException (); }
139                 }
140                 
141                 public long MaximumJournalSize {
142                         get { return maximumJournalSize; }
143                         set { maximumJournalSize = value; }
144                 }
145                 
146                 public long MaximumQueueSize {
147                         get { return maximumQueueSize; }
148                         set { maximumQueueSize = value; }
149                 }
150                 
151                 public IntPtr ReadHandle {
152                         get { throw new NotImplementedException (); }
153                 }
154                 
155                 public ISynchronizeInvoke SynchronizingObject {
156                         get { return synchronizingObject; }
157                         set { synchronizingObject = value; }
158                 }
159                 
160                 public bool Transactional {
161                         get { return transactional; }
162                 }
163                 
164                 public bool UseJournalQueue {
165                         get { return useJournalQueue; }
166                         set { useJournalQueue = value; }
167                 }
168                 
169                 public IntPtr WriteHandle {
170                         get { throw new NotImplementedException (); }
171                 }
172                 
173                 public QueueReference QRef {
174                         get { return qRef; }
175                         set { qRef = value; }
176                 }
177                 
178                 private static long GetVersion (IConnection cn)
179                 {
180                         long version = cn.Protocol.MajorVersion;
181                         version = version << 32;
182                         version += cn.Protocol.MinorVersion;
183                         return version;
184                 }
185                 
186                 private void SetDeliveryInfo (IMessage msg, long senderVersion,
187                                               string transactionId)
188                 {
189                         msg.SetDeliveryInfo (Acknowledgment.None,
190                                              DateTime.MinValue,
191                                              this,
192                                              Guid.NewGuid ().ToString () + "\\0",
193                                              MessageType.Normal,
194                                              new byte[0],
195                                              senderVersion,
196                                              DateTime.UtcNow,
197                                              null,
198                                              transactionId);
199                 }
200                 
201                 public void Close ()
202                 {
203                         // No-op (Queue are currently stateless)
204                 }
205                 
206                 public static void Delete (string realm, QueueReference qRef)
207                 {
208                         ConnectionFactory cf = new ConnectionFactory ();
209                         
210                         using (IConnection cn = cf.CreateConnection (qRef.Host)) {
211                                 using (IModel model = cn.CreateModel ()) {
212                                         ushort ticket = model.AccessRequest (realm);
213                                         model.QueueDelete (ticket, qRef.Queue, false, false, false);
214                                 }
215                         }
216                 }                       
217                 
218                 public void Send (IMessage msg)
219                 {
220                         if (QRef == QueueReference.DEFAULT)
221                                 throw new MonoMessagingException ("Path has not been specified");
222                         
223                         if (msg.BodyStream == null)
224                                 throw new ArgumentException ("Message is not serialized properly");
225                 
226                         ConnectionFactory cf = new ConnectionFactory ();
227                         
228                         try {
229                                 using (IConnection cn = cf.CreateConnection (QRef.Host)) {
230                                         SetDeliveryInfo (msg, GetVersion (cn), null);
231                                         using (IModel ch = cn.CreateModel ()) {
232                                                 Send (ch, msg);
233                                         }
234                                 }
235                         } catch (BrokerUnreachableException e) {
236                                 throw new ConnectionException (QRef, e);
237                         }
238                 }
239                 
240                 public void Send (IMessage msg, IMessageQueueTransaction transaction)
241                 {
242                         if (QRef == QueueReference.DEFAULT)
243                                 throw new MonoMessagingException ("Path has not been specified");
244                         
245                         if (msg.BodyStream == null)
246                                 throw new ArgumentException ("Message is not serialized properly");
247                         
248                         RabbitMQMessageQueueTransaction tx = (RabbitMQMessageQueueTransaction) transaction;
249                         
250                         tx.RunSend (SendInContext, msg);
251                 }
252                 
253                 public void Send (IMessage msg, MessageQueueTransactionType transactionType)
254                 {
255                         switch (transactionType) {
256                         case MessageQueueTransactionType.Single:
257                                 using (IMessageQueueTransaction tx = provider.CreateMessageQueueTransaction ()) {
258                                         try {
259                                                 Send (msg, tx);
260                                                 tx.Commit ();
261                                         } catch (Exception e) {
262                                                 tx.Abort ();
263                                                 throw new MonoMessagingException(e.Message, e);
264                                         }
265                                 }
266                                 break;
267
268                         case MessageQueueTransactionType.None:
269                                 Send (msg);
270                                 break;
271
272                         case MessageQueueTransactionType.Automatic:
273                                 throw new NotSupportedException("Automatic transaction types not supported");
274                         }
275                 }
276                 
277                 private void SendInContext (ref string host, ref IConnection cn, 
278                                             ref IModel model, IMessage msg, string txId)
279                 {
280                         if (host == null)
281                                 host = QRef.Host;
282                         else if (host != QRef.Host)
283                                 throw new MonoMessagingException ("Transactions can not span multiple hosts");
284                         
285                         if (cn == null) {
286                                 ConnectionFactory cf = new ConnectionFactory ();
287                                 cn = cf.CreateConnection (host);
288                         }
289                         
290                         if (model == null) {
291                                 model = cn.CreateModel ();
292                                 model.TxSelect ();
293                         }
294                         
295                         SetDeliveryInfo (msg, GetVersion (cn), txId);
296                         Send (model, msg);
297                 }
298                 
299                 private void Send (IModel model, IMessage msg)
300                 {
301                         ushort ticket = model.AccessRequest ("/data");
302                         string finalName = model.QueueDeclare (ticket, QRef.Queue, true);
303                         IMessageBuilder mb = helper.WriteMessage (model, msg);
304
305                         model.BasicPublish (ticket, "", finalName,
306                                             (IBasicProperties) mb.GetContentHeader(),
307                                             mb.GetContentBody ());
308                 }
309                 
310                 public void Purge ()
311                 {
312                         ConnectionFactory cf = new ConnectionFactory ();
313
314                         using (IConnection cn = cf.CreateConnection (QRef.Host)) {
315                                 using (IModel model = cn.CreateModel ()) {
316                                         ushort ticket = model.AccessRequest (realm);
317                                         model.QueuePurge (ticket, QRef.Queue, false);
318                                 }
319                         }
320                 }
321                 
322                 public IMessage Peek ()
323                 {
324                         ConnectionFactory cf = new ConnectionFactory ();
325
326                         using (IConnection cn = cf.CreateConnection (QRef.Host)) {
327                                 using (IModel ch = cn.CreateModel ()) {
328                                         return Receive (ch, -1, false);
329                                 }
330                         }
331                 }
332                 
333                 public IMessage Peek (TimeSpan timeout)
334                 {
335                         return Run (Peeker (timeout));
336                 }
337                 
338                 public IMessage PeekById (string id)
339                 {
340                         return Run (Peeker (ById (id)));
341                 }
342
343                 public IMessage PeekById (string id, TimeSpan timeout)
344                 {
345                         return Run (Peeker (timeout, ById (id)));
346                 }
347                 
348                 public IMessage PeekByCorrelationId (string id)
349                 {
350                         return Run (Peeker (ByCorrelationId (id)));
351                 }
352
353                 public IMessage PeekByCorrelationId (string id, TimeSpan timeout)
354                 {
355                         return Run (Peeker (timeout, ByCorrelationId (id)));
356                 }
357                 
358                 public IMessage Receive ()
359                 {
360                         return Run (Receiver ());
361                 }
362                 
363                 public IMessage Receive (TimeSpan timeout)
364                 {
365                         return Run (Receiver (timeout));
366                 }
367                 
368                 public IMessage Receive (TimeSpan timeout,
369                                          IMessageQueueTransaction transaction)
370                 {
371                         return Run (transaction, Receiver (timeout));
372                 }
373                 
374                 public IMessage Receive (TimeSpan timeout,
375                                          MessageQueueTransactionType transactionType)
376                 {
377                         return Run (transactionType, Receiver (timeout));
378                 }
379                 
380                 public IMessage Receive (IMessageQueueTransaction transaction)
381                 {
382                         return Run (transaction, Receiver());
383                 }
384                 
385                 public IMessage Receive (MessageQueueTransactionType transactionType)
386                 {
387                         return Run (transactionType, Receiver ());
388                 }               
389
390                 public IMessage ReceiveById (string id)
391                 {
392                         return Run (Receiver (ById (id)));
393                 }
394
395                 public IMessage ReceiveById (string id, TimeSpan timeout)
396                 {
397                         return Run (Receiver (timeout, ById (id)));
398                 }
399                 
400                 public IMessage ReceiveById (string id,
401                                              IMessageQueueTransaction transaction)
402                 {
403                         return Run (transaction, Receiver (ById (id)));
404                 }
405                 
406                 public IMessage ReceiveById (string id,
407                                              MessageQueueTransactionType transactionType)
408                 {
409                         return Run (transactionType, Receiver (ById (id)));
410                 }
411                 
412                 public IMessage ReceiveById (string id, TimeSpan timeout,
413                                              IMessageQueueTransaction transaction)
414                 {
415                         return Run (transaction, Receiver (timeout, ById (id)));
416                 }
417                 
418                 public IMessage ReceiveById (string id, TimeSpan timeout,
419                                              MessageQueueTransactionType transactionType)
420                 {
421                         return Run (transactionType, Receiver (timeout, ById (id)));
422                 }
423                 
424                 public IMessage ReceiveByCorrelationId (string id)
425                 {
426                         return Run (Receiver (ByCorrelationId (id)));
427                 }
428                 
429                 public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout)
430                 {
431                         return Run (Receiver (timeout, ByCorrelationId (id)));
432                 }
433                 
434                 public IMessage ReceiveByCorrelationId (string id,
435                                                         IMessageQueueTransaction transaction)
436                 {
437                         return Run (transaction, Receiver (ByCorrelationId (id)));
438                 }
439                 
440                 public IMessage ReceiveByCorrelationId (string id,
441                                                         MessageQueueTransactionType transactionType)
442                 {
443                         return Run (transactionType, Receiver (ByCorrelationId (id)));
444                 }
445                 
446                 public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout,
447                                                         IMessageQueueTransaction transaction)
448                 {
449                         return Run (transaction, Receiver (timeout, ByCorrelationId (id)));
450                 }
451                 
452                 public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout,
453                                                         MessageQueueTransactionType transactionType)
454                 {
455                         return Run (transactionType, Receiver (timeout, ByCorrelationId (id)));
456                 }
457                 
458                 public IMessageEnumerator GetMessageEnumerator ()
459                 {
460                         return new RabbitMQMessageEnumerator (helper, QRef);
461                 }
462                 
463                 private IMessage Run (MessageQueueTransactionType transactionType,
464                                       TxReceiver.DoReceive r)
465                 {
466                         switch (transactionType) {
467                         case MessageQueueTransactionType.Single:
468                                 using (RabbitMQMessageQueueTransaction tx = GetTx ()) {
469                                         bool success = false;
470                                         try {
471                                                 IMessage msg = Run (tx, r);
472                                                 tx.Commit ();
473                                                 success = true;
474                                                 return msg;
475                                         } finally {
476                                                 if (!success)
477                                                         tx.Abort ();
478                                         }
479                                 }
480
481                         case MessageQueueTransactionType.None:
482                                 return Run (r);
483
484                         default:
485                                 throw new NotSupportedException(transactionType + " not supported");
486                         }
487                 }               
488
489                 private IMessage Run (IMessageQueueTransaction transaction,
490                                       TxReceiver.DoReceive r)
491                 {
492                         TxReceiver txr = new TxReceiver (this, r);
493                         RabbitMQMessageQueueTransaction tx = 
494                                 (RabbitMQMessageQueueTransaction) transaction;
495                         return tx.RunReceive (txr.ReceiveInContext);                    
496                 }
497                 
498                 private IMessage Run (TxReceiver.DoReceive r)
499                 {
500                         ConnectionFactory cf = new ConnectionFactory ();
501                         using (IConnection cn = cf.CreateConnection (QRef.Host)) {
502                                 using (IModel model = cn.CreateModel ()) {
503                                         return r (this, model);
504                                 }
505                         }
506                 }
507                 
508                 private IMessage ReceiveInContext (ref string host, ref IConnection cn, 
509                                                    ref IModel model, string txId)
510                 {
511                         if (host == null)
512                                 host = QRef.Host;
513                         else if (host != QRef.Host)
514                                 throw new MonoMessagingException ("Transactions can not span multiple hosts");
515                         
516                         if (cn == null) {
517                                 ConnectionFactory cf = new ConnectionFactory ();
518                                 cn = cf.CreateConnection (host);
519                         }
520                         
521                         if (model == null) {
522                                 model = cn.CreateModel ();
523                                 model.TxSelect ();
524                         }
525                         
526                         return Receive (model, -1, true);
527                 }               
528
529                 private class TxReceiver
530                 {
531                         private readonly DoReceive doReceive;
532                         private readonly RabbitMQMessageQueue q;
533                         
534                         public TxReceiver(RabbitMQMessageQueue q, DoReceive doReceive) {
535                                 this.q = q;
536                                 this.doReceive = doReceive;
537                         }
538                         
539                         public delegate IMessage DoReceive (RabbitMQMessageQueue q, IModel model);
540                         
541                         public IMessage ReceiveInContext (ref string host, ref IConnection cn, 
542                                                           ref IModel model, string txId)
543                         {
544                                 if (host == null)
545                                         host = q.QRef.Host;
546                                 else if (host != q.QRef.Host)
547                                         throw new MonoMessagingException ("Transactions can not span multiple hosts");
548                                 
549                                 if (cn == null) {
550                                         ConnectionFactory cf = new ConnectionFactory ();
551                                         cn = cf.CreateConnection (host);
552                                 }
553                                 
554                                 if (model == null) {
555                                         model = cn.CreateModel ();
556                                         model.TxSelect ();
557                                 }
558                                 
559                                 return doReceive (q, model);
560                         }
561                 }
562                 
563                 private class DoReceiveWithTimeout
564                 {
565                         private readonly int timeout;
566                         private readonly IsMatch matcher;
567                         private readonly bool ack;
568                         
569                         public DoReceiveWithTimeout (int timeout, IsMatch matcher)
570                                 : this (timeout, matcher, true)
571                         {
572                         }
573                         
574                         public DoReceiveWithTimeout (int timeout, IsMatch matcher, bool ack)
575                         {
576                                 if (matcher != null && timeout == -1)
577                                         this.timeout = 500;
578                                 else 
579                                         this.timeout = timeout;
580                                 this.matcher = matcher;
581                                 this.ack = ack;
582                         }
583                         
584                         public IMessage DoReceive (RabbitMQMessageQueue q, IModel model)
585                         {
586                                 if (matcher == null)
587                                         return q.Receive (model, timeout, ack);
588                                 else
589                                         return q.Receive (model, timeout, ack, matcher);
590                         }
591                 }
592                 
593                 private static TxReceiver.DoReceive Receiver (TimeSpan timeout,
594                                                               IsMatch matcher)
595                 {
596                         int to = TimeSpanToInt32 (timeout);
597                         return new DoReceiveWithTimeout (to, matcher).DoReceive;
598                 }
599                 
600                 private static TxReceiver.DoReceive Receiver (IsMatch matcher)
601                 {
602                         return new DoReceiveWithTimeout (-1, matcher).DoReceive;
603                 }
604                 
605                 private static TxReceiver.DoReceive Receiver (TimeSpan timeout)
606                 {
607                         int to = TimeSpanToInt32 (timeout);
608                         return new DoReceiveWithTimeout (to, null).DoReceive;
609                 }
610
611                 private TxReceiver.DoReceive Receiver ()
612                 {
613                         return new DoReceiveWithTimeout (-1, null).DoReceive;
614                 }               
615                 
616                 private TxReceiver.DoReceive Peeker ()
617                 {
618                         return new DoReceiveWithTimeout (-1, null).DoReceive;
619                 }               
620                 
621                 private TxReceiver.DoReceive Peeker (TimeSpan timeout)
622                 {
623                         int to = TimeSpanToInt32 (timeout);
624                         return new DoReceiveWithTimeout (to, null, false).DoReceive;
625                 }               
626                 
627                 private TxReceiver.DoReceive Peeker (IsMatch matcher)
628                 {
629                         return new DoReceiveWithTimeout (-1, matcher, false).DoReceive;
630                 }               
631                 
632                 private TxReceiver.DoReceive Peeker (TimeSpan timeout, IsMatch matcher)
633                 {
634                         int to = TimeSpanToInt32 (timeout);
635                         return new DoReceiveWithTimeout (to, matcher, false).DoReceive;
636                 }
637                 
638                 delegate bool IsMatch (BasicDeliverEventArgs result);           
639                 
640                 private class IdMatcher
641                 {
642                         private readonly string id;
643                         public IdMatcher (string id)
644                         {
645                                 this.id = id;
646                         }
647                         
648                         public bool MatchById (BasicDeliverEventArgs result)
649                         {
650                                 return result.BasicProperties.MessageId == id;
651                         }
652                 }
653                 
654                 private static IsMatch ById (string id)
655                 {
656                         return new IdMatcher (id).MatchById;
657                 }
658                 
659                 private class CorrelationIdMatcher
660                 {
661                         private readonly string correlationId;
662                         public CorrelationIdMatcher (string correlationId)
663                         {
664                                 this.correlationId = correlationId;
665                         }
666                         
667                         public bool MatchById (BasicDeliverEventArgs result)
668                         {
669                                 return result.BasicProperties.CorrelationId == correlationId;
670                         }
671                 }
672                 
673                 private static IsMatch ByCorrelationId (string correlationId)
674                 {
675                         return new CorrelationIdMatcher (correlationId).MatchById;
676                 }
677                 
678                 private IMessage Receive (IModel model, int timeout, bool doAck)
679                 {
680                         ushort ticket = model.AccessRequest (realm);
681                         string finalName = model.QueueDeclare (ticket, QRef.Queue, false);
682                         
683                         using (Subscription sub = new Subscription (model, ticket, finalName)) {
684                                 BasicDeliverEventArgs result;
685                                 if (sub.Next (timeout, out result)) {
686                                         IMessage m = helper.ReadMessage (QRef, result);
687                                         if (doAck)
688                                                 sub.Ack (result);
689                                         return m;
690                                 } else {
691                                         throw new MonoMessagingException ("No Message Available");
692                                 }
693                         }
694                 }
695                                 
696                 private IMessage Receive (IModel model, int timeout, 
697                                           bool doAck, IsMatch matcher)
698                 {
699                         ushort ticket = model.AccessRequest (realm);
700                         string finalName = model.QueueDeclare (ticket, QRef.Queue, false);
701                         
702                         using (Subscription sub = new Subscription (model, ticket, finalName)) {
703                                 BasicDeliverEventArgs result;
704                                 while (sub.Next (timeout, out result)) {
705                                         
706                                         if (matcher (result)) {
707                                                 IMessage m = helper.ReadMessage (QRef, result);
708                                                 if (doAck)
709                                                         sub.Ack (result);
710                                                 return m;
711                                         }
712                                 }
713                                 
714                                 throw new MessageUnavailableException ("Message not available");
715                         }
716                 }
717                 
718                 private RabbitMQMessageQueueTransaction GetTx ()
719                 {
720                         return (RabbitMQMessageQueueTransaction) provider.CreateMessageQueueTransaction ();
721                 }               
722                 
723                 private static int TimeSpanToInt32 (TimeSpan timespan)
724                 {
725                         if (timespan == TimeSpan.MaxValue)
726                                 return -1;
727                         else
728                                 return (int) timespan.TotalMilliseconds;
729                 }
730         }
731 }