2009-06-12 Bill Holmes <billholmes54@gmail.com>
[mono.git] / mcs / class / Mono.Messaging.RabbitMQ / Mono.Messaging.RabbitMQ / RabbitMQMessageQueue.cs
1 //
2 // Mono.Messaging.RabbitMQ
3 //
4 // Authors:
5 //        Michael Barker (mike@middlesoft.co.uk)
6 //
7 // (C) 2008 Michael Barker
8 //
9
10 //
11 // Permission is hereby granted, free of charge, to any person obtaining
12 // a copy of this software and associated documentation files (the
13 // "Software"), to deal in the Software without restriction, including
14 // without limitation the rights to use, copy, modify, merge, publish,
15 // distribute, sublicense, and/or sell copies of the Software, and to
16 // permit persons to whom the Software is furnished to do so, subject to
17 // the following conditions:
18 // 
19 // The above copyright notice and this permission notice shall be
20 // included in all copies or substantial portions of the Software.
21 // 
22 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
23 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
24 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
25 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
26 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
27 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 //
30
31 using System;
32 using System.Collections;
33 using System.ComponentModel;
34 using System.IO;
35 using System.Text;
36
37 using RabbitMQ.Client;
38 using RabbitMQ.Client.Content;
39 using RabbitMQ.Client.Events;
40 using RabbitMQ.Client.Exceptions;
41 using RabbitMQ.Client.MessagePatterns;
42 using RabbitMQ.Util;
43
44 namespace Mono.Messaging.RabbitMQ {
45
46         /// <summary>
47         /// RabbitMQ Implementation of a message queue.  Currrently this implementation
48         /// attempts to be as stateless as possible.  Connection the AMQP server
49         /// are only created as needed.
50         /// </summary>
51         public class RabbitMQMessageQueue : MessageQueueBase, IMessageQueue {
52                 
53                 private bool authenticate = false;
54                 private short basePriority = 0;
55                 private Guid category = Guid.Empty;
56                 private bool denySharedReceive = false;
57                 private EncryptionRequired encryptionRequired;
58                 private long maximumJournalSize = -1;
59                 private long maximumQueueSize = -1;
60                 private ISynchronizeInvoke synchronizingObject = null;
61                 private bool useJournalQueue = false;
62                 private QueueReference qRef = QueueReference.DEFAULT;
63                 private readonly RabbitMQMessagingProvider provider;
64                 private readonly MessageFactory helper;
65                 private readonly string realm;
66                 private readonly bool transactional;
67                 
68                 public RabbitMQMessageQueue (RabbitMQMessagingProvider provider,
69                                              bool transactional)
70                         : this (provider, QueueReference.DEFAULT, transactional)
71                 {
72                 }
73                 
74                 public RabbitMQMessageQueue (RabbitMQMessagingProvider provider,
75                                              QueueReference qRef, 
76                                              bool transactional)
77                         : this (provider, "/data", qRef, transactional)
78                 {
79                 }
80                 
81                 public RabbitMQMessageQueue (RabbitMQMessagingProvider provider,
82                                              string realm, QueueReference qRef,
83                                              bool transactional)
84                 {
85                         this.provider = provider;
86                         this.helper = new MessageFactory (provider);
87                         this.realm = realm;
88                         this.qRef = qRef;
89                         this.transactional = transactional;
90                 }
91                 
92                 protected override IMessageQueue Queue {
93                         get { return this; }
94                 }
95
96                 public bool Authenticate {
97                         get { return authenticate; }
98                         set { authenticate = value; }
99                 }
100
101                 public short BasePriority {
102                         get { return basePriority; }
103                         set { basePriority = value; }
104                 }
105
106                 public bool CanRead {
107                         get { throw new NotImplementedException (); }
108                 }
109                 
110                 public bool CanWrite {
111                         get { throw new NotImplementedException (); }
112                 }
113                 
114                 public Guid Category {
115                         get { return category; }
116                         set { category = value; }
117                 }
118                 
119                 public DateTime CreateTime {
120                         get { throw new NotImplementedException (); }
121                 }
122                 
123                 public bool DenySharedReceive {
124                         get { return denySharedReceive; }
125                         set { denySharedReceive = value; }
126                 }
127                 
128                 public EncryptionRequired EncryptionRequired {
129                         get { return encryptionRequired; }
130                         set { encryptionRequired = value; }
131                 }
132                 
133                 public Guid Id {
134                         get { throw new NotImplementedException (); }
135                 }
136                 
137                 public DateTime LastModifyTime {
138                         get { throw new NotImplementedException (); }
139                 }
140                 
141                 public long MaximumJournalSize {
142                         get { return maximumJournalSize; }
143                         set { maximumJournalSize = value; }
144                 }
145                 
146                 public long MaximumQueueSize {
147                         get { return maximumQueueSize; }
148                         set { maximumQueueSize = value; }
149                 }
150                 
151                 public IntPtr ReadHandle {
152                         get { throw new NotImplementedException (); }
153                 }
154                 
155                 public ISynchronizeInvoke SynchronizingObject {
156                         get { return synchronizingObject; }
157                         set { synchronizingObject = value; }
158                 }
159                 
160                 public bool Transactional {
161                         get { return transactional; }
162                 }
163                 
164                 public bool UseJournalQueue {
165                         get { return useJournalQueue; }
166                         set { useJournalQueue = value; }
167                 }
168                 
169                 public IntPtr WriteHandle {
170                         get { throw new NotImplementedException (); }
171                 }
172                 
173                 public QueueReference QRef {
174                         get { return qRef; }
175                         set { qRef = value; }
176                 }
177                 
178                 private static long GetVersion (IConnection cn)
179                 {
180                         long version = cn.Protocol.MajorVersion;
181                         version = version << 32;
182                         version += cn.Protocol.MinorVersion;
183                         return version;
184                 }
185                 
186                 private void SetDeliveryInfo (IMessage msg, long senderVersion,
187                                               string transactionId)
188                 {
189                         msg.SetDeliveryInfo (Acknowledgment.None,
190                                              DateTime.MinValue,
191                                              this,
192                                              Guid.NewGuid ().ToString () + "\\0",
193                                              MessageType.Normal,
194                                              new byte[0],
195                                              senderVersion,
196                                              DateTime.UtcNow,
197                                              null,
198                                              transactionId);
199                 }
200                 
201                 public void Close ()
202                 {
203                         // No-op (Queue are currently stateless)
204                 }
205                 
206                 public static void Delete (string realm, QueueReference qRef)
207                 {
208                         ConnectionFactory cf = new ConnectionFactory ();
209                         
210                         using (IConnection cn = cf.CreateConnection (qRef.Host)) {
211                                 using (IModel model = cn.CreateModel ()) {
212                                         model.QueueDelete (qRef.Queue, false, false, false);
213                                 }
214                         }
215                 }                       
216                 
217                 public void Send (IMessage msg)
218                 {
219                         if (QRef == QueueReference.DEFAULT)
220                                 throw new MonoMessagingException ("Path has not been specified");
221                         
222                         if (msg.BodyStream == null)
223                                 throw new ArgumentException ("Message is not serialized properly");
224                 
225                         ConnectionFactory cf = new ConnectionFactory ();
226                         
227                         try {
228                                 using (IConnection cn = cf.CreateConnection (QRef.Host)) {
229                                         SetDeliveryInfo (msg, GetVersion (cn), null);
230                                         using (IModel ch = cn.CreateModel ()) {
231                                                 Send (ch, msg);
232                                         }
233                                 }
234                         } catch (BrokerUnreachableException e) {
235                                 throw new ConnectionException (QRef, e);
236                         }
237                 }
238                 
239                 public void Send (IMessage msg, IMessageQueueTransaction transaction)
240                 {
241                         if (QRef == QueueReference.DEFAULT)
242                                 throw new MonoMessagingException ("Path has not been specified");
243                         
244                         if (msg.BodyStream == null)
245                                 throw new ArgumentException ("Message is not serialized properly");
246                         
247                         RabbitMQMessageQueueTransaction tx = (RabbitMQMessageQueueTransaction) transaction;
248                         
249                         tx.RunSend (SendInContext, msg);
250                 }
251                 
252                 public void Send (IMessage msg, MessageQueueTransactionType transactionType)
253                 {
254                         switch (transactionType) {
255                         case MessageQueueTransactionType.Single:
256                                 using (IMessageQueueTransaction tx = provider.CreateMessageQueueTransaction ()) {
257                                         try {
258                                                 Send (msg, tx);
259                                                 tx.Commit ();
260                                         } catch (Exception e) {
261                                                 tx.Abort ();
262                                                 throw new MonoMessagingException(e.Message, e);
263                                         }
264                                 }
265                                 break;
266
267                         case MessageQueueTransactionType.None:
268                                 Send (msg);
269                                 break;
270
271                         case MessageQueueTransactionType.Automatic:
272                                 throw new NotSupportedException("Automatic transaction types not supported");
273                         }
274                 }
275                 
276                 private void SendInContext (ref string host, ref IConnection cn, 
277                                             ref IModel model, IMessage msg, string txId)
278                 {
279                         if (host == null)
280                                 host = QRef.Host;
281                         else if (host != QRef.Host)
282                                 throw new MonoMessagingException ("Transactions can not span multiple hosts");
283                         
284                         if (cn == null) {
285                                 ConnectionFactory cf = new ConnectionFactory ();
286                                 cn = cf.CreateConnection (host);
287                         }
288                         
289                         if (model == null) {
290                                 model = cn.CreateModel ();
291                                 model.TxSelect ();
292                         }
293                         
294                         SetDeliveryInfo (msg, GetVersion (cn), txId);
295                         Send (model, msg);
296                 }
297                 
298                 private void Send (IModel model, IMessage msg)
299                 {
300                         string finalName = model.QueueDeclare (QRef.Queue, true);
301                         IMessageBuilder mb = helper.WriteMessage (model, msg);
302
303                         model.BasicPublish ("", finalName,
304                                             (IBasicProperties) mb.GetContentHeader(),
305                                             mb.GetContentBody ());
306                 }
307                 
308                 public void Purge ()
309                 {
310                         ConnectionFactory cf = new ConnectionFactory ();
311
312                         using (IConnection cn = cf.CreateConnection (QRef.Host)) {
313                                 using (IModel model = cn.CreateModel ()) {
314                                         model.QueuePurge (QRef.Queue, false);
315                                 }
316                         }
317                 }
318                 
319                 public IMessage Peek ()
320                 {
321                         ConnectionFactory cf = new ConnectionFactory ();
322
323                         using (IConnection cn = cf.CreateConnection (QRef.Host)) {
324                                 using (IModel ch = cn.CreateModel ()) {
325                                         return Receive (ch, -1, false);
326                                 }
327                         }
328                 }
329                 
330                 public IMessage Peek (TimeSpan timeout)
331                 {
332                         return Run (Peeker (timeout));
333                 }
334                 
335                 public IMessage PeekById (string id)
336                 {
337                         return Run (Peeker (ById (id)));
338                 }
339
340                 public IMessage PeekById (string id, TimeSpan timeout)
341                 {
342                         return Run (Peeker (timeout, ById (id)));
343                 }
344                 
345                 public IMessage PeekByCorrelationId (string id)
346                 {
347                         return Run (Peeker (ByCorrelationId (id)));
348                 }
349
350                 public IMessage PeekByCorrelationId (string id, TimeSpan timeout)
351                 {
352                         return Run (Peeker (timeout, ByCorrelationId (id)));
353                 }
354                 
355                 public IMessage Receive ()
356                 {
357                         return Run (Receiver ());
358                 }
359                 
360                 public IMessage Receive (TimeSpan timeout)
361                 {
362                         return Run (Receiver (timeout));
363                 }
364                 
365                 public IMessage Receive (TimeSpan timeout,
366                                          IMessageQueueTransaction transaction)
367                 {
368                         return Run (transaction, Receiver (timeout));
369                 }
370                 
371                 public IMessage Receive (TimeSpan timeout,
372                                          MessageQueueTransactionType transactionType)
373                 {
374                         return Run (transactionType, Receiver (timeout));
375                 }
376                 
377                 public IMessage Receive (IMessageQueueTransaction transaction)
378                 {
379                         return Run (transaction, Receiver());
380                 }
381                 
382                 public IMessage Receive (MessageQueueTransactionType transactionType)
383                 {
384                         return Run (transactionType, Receiver ());
385                 }               
386
387                 public IMessage ReceiveById (string id)
388                 {
389                         return Run (Receiver (ById (id)));
390                 }
391
392                 public IMessage ReceiveById (string id, TimeSpan timeout)
393                 {
394                         return Run (Receiver (timeout, ById (id)));
395                 }
396                 
397                 public IMessage ReceiveById (string id,
398                                              IMessageQueueTransaction transaction)
399                 {
400                         return Run (transaction, Receiver (ById (id)));
401                 }
402                 
403                 public IMessage ReceiveById (string id,
404                                              MessageQueueTransactionType transactionType)
405                 {
406                         return Run (transactionType, Receiver (ById (id)));
407                 }
408                 
409                 public IMessage ReceiveById (string id, TimeSpan timeout,
410                                              IMessageQueueTransaction transaction)
411                 {
412                         return Run (transaction, Receiver (timeout, ById (id)));
413                 }
414                 
415                 public IMessage ReceiveById (string id, TimeSpan timeout,
416                                              MessageQueueTransactionType transactionType)
417                 {
418                         return Run (transactionType, Receiver (timeout, ById (id)));
419                 }
420                 
421                 public IMessage ReceiveByCorrelationId (string id)
422                 {
423                         return Run (Receiver (ByCorrelationId (id)));
424                 }
425                 
426                 public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout)
427                 {
428                         return Run (Receiver (timeout, ByCorrelationId (id)));
429                 }
430                 
431                 public IMessage ReceiveByCorrelationId (string id,
432                                                         IMessageQueueTransaction transaction)
433                 {
434                         return Run (transaction, Receiver (ByCorrelationId (id)));
435                 }
436                 
437                 public IMessage ReceiveByCorrelationId (string id,
438                                                         MessageQueueTransactionType transactionType)
439                 {
440                         return Run (transactionType, Receiver (ByCorrelationId (id)));
441                 }
442                 
443                 public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout,
444                                                         IMessageQueueTransaction transaction)
445                 {
446                         return Run (transaction, Receiver (timeout, ByCorrelationId (id)));
447                 }
448                 
449                 public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout,
450                                                         MessageQueueTransactionType transactionType)
451                 {
452                         return Run (transactionType, Receiver (timeout, ByCorrelationId (id)));
453                 }
454                 
455                 public IMessageEnumerator GetMessageEnumerator ()
456                 {
457                         return new RabbitMQMessageEnumerator (helper, QRef);
458                 }
459                 
460                 private IMessage Run (MessageQueueTransactionType transactionType,
461                                       TxReceiver.DoReceive r)
462                 {
463                         switch (transactionType) {
464                         case MessageQueueTransactionType.Single:
465                                 using (RabbitMQMessageQueueTransaction tx = GetTx ()) {
466                                         bool success = false;
467                                         try {
468                                                 IMessage msg = Run (tx, r);
469                                                 tx.Commit ();
470                                                 success = true;
471                                                 return msg;
472                                         } finally {
473                                                 if (!success)
474                                                         tx.Abort ();
475                                         }
476                                 }
477
478                         case MessageQueueTransactionType.None:
479                                 return Run (r);
480
481                         default:
482                                 throw new NotSupportedException(transactionType + " not supported");
483                         }
484                 }               
485
486                 private IMessage Run (IMessageQueueTransaction transaction,
487                                       TxReceiver.DoReceive r)
488                 {
489                         TxReceiver txr = new TxReceiver (this, r);
490                         RabbitMQMessageQueueTransaction tx = 
491                                 (RabbitMQMessageQueueTransaction) transaction;
492                         return tx.RunReceive (txr.ReceiveInContext);                    
493                 }
494                 
495                 private IMessage Run (TxReceiver.DoReceive r)
496                 {
497                         ConnectionFactory cf = new ConnectionFactory ();
498                         using (IConnection cn = cf.CreateConnection (QRef.Host)) {
499                                 using (IModel model = cn.CreateModel ()) {
500                                         return r (this, model);
501                                 }
502                         }
503                 }
504                 
505                 private IMessage ReceiveInContext (ref string host, ref IConnection cn, 
506                                                    ref IModel model, string txId)
507                 {
508                         if (host == null)
509                                 host = QRef.Host;
510                         else if (host != QRef.Host)
511                                 throw new MonoMessagingException ("Transactions can not span multiple hosts");
512                         
513                         if (cn == null) {
514                                 ConnectionFactory cf = new ConnectionFactory ();
515                                 cn = cf.CreateConnection (host);
516                         }
517                         
518                         if (model == null) {
519                                 model = cn.CreateModel ();
520                                 model.TxSelect ();
521                         }
522                         
523                         return Receive (model, -1, true);
524                 }               
525
526                 private class TxReceiver
527                 {
528                         private readonly DoReceive doReceive;
529                         private readonly RabbitMQMessageQueue q;
530                         
531                         public TxReceiver(RabbitMQMessageQueue q, DoReceive doReceive) {
532                                 this.q = q;
533                                 this.doReceive = doReceive;
534                         }
535                         
536                         public delegate IMessage DoReceive (RabbitMQMessageQueue q, IModel model);
537                         
538                         public IMessage ReceiveInContext (ref string host, ref IConnection cn, 
539                                                           ref IModel model, string txId)
540                         {
541                                 if (host == null)
542                                         host = q.QRef.Host;
543                                 else if (host != q.QRef.Host)
544                                         throw new MonoMessagingException ("Transactions can not span multiple hosts");
545                                 
546                                 if (cn == null) {
547                                         ConnectionFactory cf = new ConnectionFactory ();
548                                         cn = cf.CreateConnection (host);
549                                 }
550                                 
551                                 if (model == null) {
552                                         model = cn.CreateModel ();
553                                         model.TxSelect ();
554                                 }
555                                 
556                                 return doReceive (q, model);
557                         }
558                 }
559                 
560                 private class DoReceiveWithTimeout
561                 {
562                         private readonly int timeout;
563                         private readonly IsMatch matcher;
564                         private readonly bool ack;
565                         
566                         public DoReceiveWithTimeout (int timeout, IsMatch matcher)
567                                 : this (timeout, matcher, true)
568                         {
569                         }
570                         
571                         public DoReceiveWithTimeout (int timeout, IsMatch matcher, bool ack)
572                         {
573                                 if (matcher != null && timeout == -1)
574                                         this.timeout = 500;
575                                 else 
576                                         this.timeout = timeout;
577                                 this.matcher = matcher;
578                                 this.ack = ack;
579                         }
580                         
581                         public IMessage DoReceive (RabbitMQMessageQueue q, IModel model)
582                         {
583                                 if (matcher == null)
584                                         return q.Receive (model, timeout, ack);
585                                 else
586                                         return q.Receive (model, timeout, ack, matcher);
587                         }
588                 }
589                 
590                 private static TxReceiver.DoReceive Receiver (TimeSpan timeout,
591                                                               IsMatch matcher)
592                 {
593                         int to = MessageFactory.TimeSpanToInt32 (timeout);
594                         return new DoReceiveWithTimeout (to, matcher).DoReceive;
595                 }
596                 
597                 private static TxReceiver.DoReceive Receiver (IsMatch matcher)
598                 {
599                         return new DoReceiveWithTimeout (-1, matcher).DoReceive;
600                 }
601                 
602                 private static TxReceiver.DoReceive Receiver (TimeSpan timeout)
603                 {
604                         int to = MessageFactory.TimeSpanToInt32 (timeout);
605                         return new DoReceiveWithTimeout (to, null).DoReceive;
606                 }
607
608                 private TxReceiver.DoReceive Receiver ()
609                 {
610                         return new DoReceiveWithTimeout (-1, null).DoReceive;
611                 }               
612                 
613                 private TxReceiver.DoReceive Peeker ()
614                 {
615                         return new DoReceiveWithTimeout (-1, null).DoReceive;
616                 }               
617                 
618                 private TxReceiver.DoReceive Peeker (TimeSpan timeout)
619                 {
620                         int to = MessageFactory.TimeSpanToInt32 (timeout);
621                         return new DoReceiveWithTimeout (to, null, false).DoReceive;
622                 }               
623                 
624                 private TxReceiver.DoReceive Peeker (IsMatch matcher)
625                 {
626                         return new DoReceiveWithTimeout (-1, matcher, false).DoReceive;
627                 }               
628                 
629                 private TxReceiver.DoReceive Peeker (TimeSpan timeout, IsMatch matcher)
630                 {
631                         int to = MessageFactory.TimeSpanToInt32 (timeout);
632                         return new DoReceiveWithTimeout (to, matcher, false).DoReceive;
633                 }
634                 
635                 delegate bool IsMatch (BasicDeliverEventArgs result);           
636                 
637                 private class IdMatcher
638                 {
639                         private readonly string id;
640                         public IdMatcher (string id)
641                         {
642                                 this.id = id;
643                         }
644                         
645                         public bool MatchById (BasicDeliverEventArgs result)
646                         {
647                                 return result.BasicProperties.MessageId == id;
648                         }
649                 }
650                 
651                 private static IsMatch ById (string id)
652                 {
653                         return new IdMatcher (id).MatchById;
654                 }
655                 
656                 private class CorrelationIdMatcher
657                 {
658                         private readonly string correlationId;
659                         public CorrelationIdMatcher (string correlationId)
660                         {
661                                 this.correlationId = correlationId;
662                         }
663                         
664                         public bool MatchById (BasicDeliverEventArgs result)
665                         {
666                                 return result.BasicProperties.CorrelationId == correlationId;
667                         }
668                 }
669                 
670                 private static IsMatch ByCorrelationId (string correlationId)
671                 {
672                         return new CorrelationIdMatcher (correlationId).MatchById;
673                 }
674                 
675                 private IMessage Receive (IModel model, int timeout, bool doAck)
676                 {
677                         string finalName = model.QueueDeclare (QRef.Queue, false);
678                         
679                         using (Subscription sub = new Subscription (model, finalName)) {
680                                 BasicDeliverEventArgs result;
681                                 if (sub.Next (timeout, out result)) {
682                                         IMessage m = helper.ReadMessage (QRef, result);
683                                         if (doAck)
684                                                 sub.Ack (result);
685                                         return m;
686                                 } else {
687                                         throw new MonoMessagingException ("No Message Available");
688                                 }
689                         }
690                 }
691                                 
692                 private IMessage Receive (IModel model, int timeout, 
693                                           bool doAck, IsMatch matcher)
694                 {
695                         string finalName = model.QueueDeclare (QRef.Queue, false);
696                         
697                         using (Subscription sub = new Subscription (model, finalName)) {
698                                 BasicDeliverEventArgs result;
699                                 while (sub.Next (timeout, out result)) {
700                                         
701                                         if (matcher (result)) {
702                                                 IMessage m = helper.ReadMessage (QRef, result);
703                                                 if (doAck)
704                                                         sub.Ack (result);
705                                                 return m;
706                                         }
707                                 }
708                                 
709                                 throw new MessageUnavailableException ("Message not available");
710                         }
711                 }
712                 
713                 private RabbitMQMessageQueueTransaction GetTx ()
714                 {
715                         return (RabbitMQMessageQueueTransaction) provider.CreateMessageQueueTransaction ();
716                 }               
717         }
718 }