2009-07-11 Michael Barker <mike@middlesoft.co.uk>
[mono.git] / mcs / class / Mono.Messaging.RabbitMQ / Mono.Messaging.RabbitMQ / RabbitMQMessageQueue.cs
1 //
2 // Mono.Messaging.RabbitMQ
3 //
4 // Authors:
5 //        Michael Barker (mike@middlesoft.co.uk)
6 //
7 // (C) 2008 Michael Barker
8 //
9
10 //
11 // Permission is hereby granted, free of charge, to any person obtaining
12 // a copy of this software and associated documentation files (the
13 // "Software"), to deal in the Software without restriction, including
14 // without limitation the rights to use, copy, modify, merge, publish,
15 // distribute, sublicense, and/or sell copies of the Software, and to
16 // permit persons to whom the Software is furnished to do so, subject to
17 // the following conditions:
18 // 
19 // The above copyright notice and this permission notice shall be
20 // included in all copies or substantial portions of the Software.
21 // 
22 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
23 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
24 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
25 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
26 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
27 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 //
30
31 using System;
32 using System.Collections;
33 using System.ComponentModel;
34 using System.IO;
35 using System.Text;
36
37 using RabbitMQ.Client;
38 using RabbitMQ.Client.Content;
39 using RabbitMQ.Client.Events;
40 using RabbitMQ.Client.Exceptions;
41 using RabbitMQ.Client.MessagePatterns;
42 using RabbitMQ.Util;
43
namespace Mono.Messaging.RabbitMQ {

        /// <summary>
        /// RabbitMQ implementation of a message queue.  Currently this implementation
        /// attempts to be as stateless as possible: connections to the AMQP server
        /// are only created as needed.
        /// </summary>
        public class RabbitMQMessageQueue : MessageQueueBase, IMessageQueue {

                // Timeout, in milliseconds, that replaces an infinite wait (-1)
                // whenever a receive/peek is filtered by a matcher.
                private const int DefaultMatchTimeout = 500;

                // Timeout value meaning "no timeout was requested".
                private const int NoTimeout = -1;

                private bool authenticate = false;
                private short basePriority = 0;
                private Guid category = Guid.Empty;
                private bool denySharedReceive = false;
                private EncryptionRequired encryptionRequired;
                private long maximumJournalSize = -1;
                private long maximumQueueSize = -1;
                private ISynchronizeInvoke synchronizingObject = null;
                private bool useJournalQueue = false;
                private QueueReference qRef = QueueReference.DEFAULT;
                private readonly RabbitMQMessagingProvider provider;
                private readonly MessageFactory helper;
                private readonly bool transactional;

                /// <summary>
                /// Creates a queue that is not yet bound to a path
                /// (<see cref="QRef"/> is <c>QueueReference.DEFAULT</c>).
                /// </summary>
                public RabbitMQMessageQueue (RabbitMQMessagingProvider provider,
                                             bool transactional)
                        : this (provider, QueueReference.DEFAULT, transactional)
                {
                }

                /// <summary>
                /// Creates a queue bound to <paramref name="qRef"/>.
                /// </summary>
                /// <param name="provider">Provider used to create transactions and the message factory.</param>
                /// <param name="qRef">Host/queue this instance talks to.</param>
                /// <param name="transactional">Value reported by <see cref="Transactional"/>.</param>
                public RabbitMQMessageQueue (RabbitMQMessagingProvider provider,
                                             QueueReference qRef,
                                             bool transactional)
                {
                        this.provider = provider;
                        this.helper = new MessageFactory (provider);
                        this.qRef = qRef;
                        this.transactional = transactional;
                }

                protected override IMessageQueue Queue {
                        get { return this; }
                }

                // The settable properties below are plain value holders: within this
                // class they are only stored and returned, never sent to the broker.

                public bool Authenticate {
                        get { return authenticate; }
                        set { authenticate = value; }
                }

                public short BasePriority {
                        get { return basePriority; }
                        set { basePriority = value; }
                }

                public bool CanRead {
                        get { throw new NotImplementedException (); }
                }

                public bool CanWrite {
                        get { throw new NotImplementedException (); }
                }

                public Guid Category {
                        get { return category; }
                        set { category = value; }
                }

                public DateTime CreateTime {
                        get { throw new NotImplementedException (); }
                }

                public bool DenySharedReceive {
                        get { return denySharedReceive; }
                        set { denySharedReceive = value; }
                }

                public EncryptionRequired EncryptionRequired {
                        get { return encryptionRequired; }
                        set { encryptionRequired = value; }
                }

                public Guid Id {
                        get { throw new NotImplementedException (); }
                }

                public DateTime LastModifyTime {
                        get { throw new NotImplementedException (); }
                }

                public long MaximumJournalSize {
                        get { return maximumJournalSize; }
                        set { maximumJournalSize = value; }
                }

                public long MaximumQueueSize {
                        get { return maximumQueueSize; }
                        set { maximumQueueSize = value; }
                }

                public IntPtr ReadHandle {
                        get { throw new NotImplementedException (); }
                }

                public ISynchronizeInvoke SynchronizingObject {
                        get { return synchronizingObject; }
                        set { synchronizingObject = value; }
                }

                public bool Transactional {
                        get { return transactional; }
                }

                public bool UseJournalQueue {
                        get { return useJournalQueue; }
                        set { useJournalQueue = value; }
                }

                public IntPtr WriteHandle {
                        get { throw new NotImplementedException (); }
                }

                /// <summary>Host and queue name this instance operates on.</summary>
                public QueueReference QRef {
                        get { return qRef; }
                        set { qRef = value; }
                }

                /// <summary>
                /// Packs the connection's protocol version into a single value:
                /// major version in the upper 32 bits, minor version added below.
                /// </summary>
                private static long GetVersion (IConnection cn)
                {
                        long major = cn.Protocol.MajorVersion;
                        return (major << 32) + cn.Protocol.MinorVersion;
                }

                /// <summary>
                /// Stamps <paramref name="msg"/> with delivery information: a freshly
                /// generated id (a new GUID followed by a backslash and "0"), this queue
                /// as destination, the current UTC time as sent time and the supplied
                /// sender version / transaction id.
                /// </summary>
                private void SetDeliveryInfo (IMessage msg, long senderVersion,
                                              string transactionId)
                {
                        string messageId = Guid.NewGuid ().ToString () + "\\0";

                        msg.SetDeliveryInfo (Acknowledgment.None,
                                             DateTime.MinValue,
                                             this,
                                             messageId,
                                             MessageType.Normal,
                                             new byte[0],
                                             senderVersion,
                                             DateTime.UtcNow,
                                             null,
                                             transactionId);
                }

                /// <summary>
                /// Casts a generic transaction to the RabbitMQ implementation.
                /// </summary>
                /// <exception cref="ArgumentNullException">
                /// <paramref name="transaction"/> is null.
                /// </exception>
                private static RabbitMQMessageQueueTransaction ToRabbitTx (IMessageQueueTransaction transaction)
                {
                        if (transaction == null)
                                throw new ArgumentNullException ("transaction");

                        return (RabbitMQMessageQueueTransaction) transaction;
                }

                /// <summary>
                /// Lazily fills in the host, connection and channel shared by the
                /// operations of one transaction.  A newly created channel is switched
                /// to transactional mode with <c>TxSelect</c>.
                /// </summary>
                /// <exception cref="MonoMessagingException">
                /// <paramref name="host"/> is already set and differs from the host of
                /// <paramref name="qRef"/>.
                /// </exception>
                private static void EnsureContext (QueueReference qRef, ref string host,
                                                   ref IConnection cn, ref IModel model)
                {
                        if (host == null)
                                host = qRef.Host;
                        else if (host != qRef.Host)
                                throw new MonoMessagingException ("Transactions can not span multiple hosts");

                        if (cn == null) {
                                ConnectionFactory cf = new ConnectionFactory ();
                                cn = cf.CreateConnection (host);
                        }

                        if (model == null) {
                                model = cn.CreateModel ();
                                model.TxSelect ();
                        }
                }

                public void Close ()
                {
                        // No-op (queues are currently stateless)
                }

                /// <summary>
                /// Deletes the queue named by <paramref name="qRef"/> on its host.
                /// </summary>
                public static void Delete (QueueReference qRef)
                {
                        ConnectionFactory cf = new ConnectionFactory ();

                        using (IConnection cn = cf.CreateConnection (qRef.Host)) {
                                using (IModel model = cn.CreateModel ()) {
                                        model.QueueDelete (qRef.Queue, false, false, false);
                                }
                        }
                }

                /// <summary>
                /// Checks the preconditions shared by the public Send overloads.
                /// </summary>
                /// <exception cref="MonoMessagingException">No queue path has been set.</exception>
                /// <exception cref="ArgumentException">The message has no body stream.</exception>
                private void ValidateForSend (IMessage msg)
                {
                        if (QRef == QueueReference.DEFAULT)
                                throw new MonoMessagingException ("Path has not been specified");

                        if (msg.BodyStream == null)
                                throw new ArgumentException ("Message is not serialized properly");
                }

                /// <summary>
                /// Sends <paramref name="msg"/> outside of any transaction, using a
                /// connection and channel that live only for this call.
                /// </summary>
                /// <exception cref="ConnectionException">The broker could not be reached.</exception>
                public void Send (IMessage msg)
                {
                        ValidateForSend (msg);

                        ConnectionFactory cf = new ConnectionFactory ();

                        try {
                                using (IConnection cn = cf.CreateConnection (QRef.Host)) {
                                        SetDeliveryInfo (msg, GetVersion (cn), null);
                                        using (IModel ch = cn.CreateModel ()) {
                                                Send (ch, msg);
                                        }
                                }
                        } catch (BrokerUnreachableException e) {
                                throw new ConnectionException (QRef, e);
                        }
                }

                /// <summary>
                /// Sends <paramref name="msg"/> as part of <paramref name="transaction"/>.
                /// </summary>
                /// <exception cref="ArgumentNullException"><paramref name="transaction"/> is null.</exception>
                public void Send (IMessage msg, IMessageQueueTransaction transaction)
                {
                        ValidateForSend (msg);

                        RabbitMQMessageQueueTransaction tx = ToRabbitTx (transaction);
                        tx.RunSend (SendInContext, msg);
                }

                /// <summary>
                /// Sends <paramref name="msg"/> using the requested transaction style.
                /// <c>Single</c> wraps the send in its own transaction (any failure
                /// aborts it and is rethrown as <see cref="MonoMessagingException"/>);
                /// <c>None</c> sends without a transaction.
                /// </summary>
                /// <exception cref="NotSupportedException">
                /// Any other transaction type was requested.
                /// </exception>
                public void Send (IMessage msg, MessageQueueTransactionType transactionType)
                {
                        switch (transactionType) {
                        case MessageQueueTransactionType.Single:
                                using (IMessageQueueTransaction tx = provider.CreateMessageQueueTransaction ()) {
                                        try {
                                                Send (msg, tx);
                                                tx.Commit ();
                                        } catch (Exception e) {
                                                tx.Abort ();
                                                throw new MonoMessagingException (e.Message, e);
                                        }
                                }
                                break;

                        case MessageQueueTransactionType.None:
                                Send (msg);
                                break;

                        case MessageQueueTransactionType.Automatic:
                                throw new NotSupportedException ("Automatic transaction types not supported");

                        default:
                                // Never drop a message silently for an unrecognised type.
                                throw new NotSupportedException (transactionType + " not supported");
                        }
                }

                /// <summary>
                /// Transaction callback: prepares the shared connection state and
                /// publishes <paramref name="msg"/> on the transactional channel.
                /// </summary>
                private void SendInContext (ref string host, ref IConnection cn,
                                            ref IModel model, IMessage msg, string txId)
                {
                        EnsureContext (QRef, ref host, ref cn, ref model);

                        SetDeliveryInfo (msg, GetVersion (cn), txId);
                        Send (model, msg);
                }

                /// <summary>
                /// Declares the target queue on <paramref name="model"/> and publishes
                /// the message to the default exchange ("") with the declared queue
                /// name as routing key.
                /// </summary>
                private void Send (IModel model, IMessage msg)
                {
                        string finalName = model.QueueDeclare (QRef.Queue, true);
                        IMessageBuilder mb = helper.WriteMessage (model, msg);

                        model.BasicPublish ("", finalName,
                                            (IBasicProperties) mb.GetContentHeader (),
                                            mb.GetContentBody ());
                }

                /// <summary>Purges the queue referenced by <see cref="QRef"/>.</summary>
                public void Purge ()
                {
                        ConnectionFactory cf = new ConnectionFactory ();

                        using (IConnection cn = cf.CreateConnection (QRef.Host)) {
                                using (IModel model = cn.CreateModel ()) {
                                        model.QueuePurge (QRef.Queue, false);
                                }
                        }
                }

                // Peek overloads read a message without acknowledging it.

                public IMessage Peek ()
                {
                        return Run (Peeker ());
                }

                public IMessage Peek (TimeSpan timeout)
                {
                        return Run (Peeker (timeout));
                }

                public IMessage PeekById (string id)
                {
                        return Run (Peeker (ById (id)));
                }

                public IMessage PeekById (string id, TimeSpan timeout)
                {
                        return Run (Peeker (timeout, ById (id)));
                }

                public IMessage PeekByCorrelationId (string id)
                {
                        return Run (Peeker (ByCorrelationId (id)));
                }

                public IMessage PeekByCorrelationId (string id, TimeSpan timeout)
                {
                        return Run (Peeker (timeout, ByCorrelationId (id)));
                }

                // Receive overloads acknowledge the message they return.

                public IMessage Receive ()
                {
                        return Run (Receiver ());
                }

                public IMessage Receive (TimeSpan timeout)
                {
                        return Run (Receiver (timeout));
                }

                public IMessage Receive (TimeSpan timeout,
                                         IMessageQueueTransaction transaction)
                {
                        return Run (transaction, Receiver (timeout));
                }

                public IMessage Receive (TimeSpan timeout,
                                         MessageQueueTransactionType transactionType)
                {
                        return Run (transactionType, Receiver (timeout));
                }

                public IMessage Receive (IMessageQueueTransaction transaction)
                {
                        return Run (transaction, Receiver ());
                }

                public IMessage Receive (MessageQueueTransactionType transactionType)
                {
                        return Run (transactionType, Receiver ());
                }

                public IMessage ReceiveById (string id)
                {
                        return Run (Receiver (ById (id)));
                }

                public IMessage ReceiveById (string id, TimeSpan timeout)
                {
                        return Run (Receiver (timeout, ById (id)));
                }

                public IMessage ReceiveById (string id,
                                             IMessageQueueTransaction transaction)
                {
                        return Run (transaction, Receiver (ById (id)));
                }

                public IMessage ReceiveById (string id,
                                             MessageQueueTransactionType transactionType)
                {
                        return Run (transactionType, Receiver (ById (id)));
                }

                public IMessage ReceiveById (string id, TimeSpan timeout,
                                             IMessageQueueTransaction transaction)
                {
                        return Run (transaction, Receiver (timeout, ById (id)));
                }

                public IMessage ReceiveById (string id, TimeSpan timeout,
                                             MessageQueueTransactionType transactionType)
                {
                        return Run (transactionType, Receiver (timeout, ById (id)));
                }

                public IMessage ReceiveByCorrelationId (string id)
                {
                        return Run (Receiver (ByCorrelationId (id)));
                }

                public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout)
                {
                        return Run (Receiver (timeout, ByCorrelationId (id)));
                }

                public IMessage ReceiveByCorrelationId (string id,
                                                        IMessageQueueTransaction transaction)
                {
                        return Run (transaction, Receiver (ByCorrelationId (id)));
                }

                public IMessage ReceiveByCorrelationId (string id,
                                                        MessageQueueTransactionType transactionType)
                {
                        return Run (transactionType, Receiver (ByCorrelationId (id)));
                }

                public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout,
                                                        IMessageQueueTransaction transaction)
                {
                        return Run (transaction, Receiver (timeout, ByCorrelationId (id)));
                }

                public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout,
                                                        MessageQueueTransactionType transactionType)
                {
                        return Run (transactionType, Receiver (timeout, ByCorrelationId (id)));
                }

                public IMessageEnumerator GetMessageEnumerator ()
                {
                        return new RabbitMQMessageEnumerator (helper, QRef);
                }

                /// <summary>
                /// Runs <paramref name="r"/> with the requested transaction style.
                /// <c>Single</c> creates a transaction that is committed on success and
                /// aborted if anything throws; <c>None</c> runs without a transaction.
                /// </summary>
                /// <exception cref="NotSupportedException">
                /// Any other transaction type was requested.
                /// </exception>
                private IMessage Run (MessageQueueTransactionType transactionType,
                                      TxReceiver.DoReceive r)
                {
                        switch (transactionType) {
                        case MessageQueueTransactionType.Single:
                                using (RabbitMQMessageQueueTransaction tx = GetTx ()) {
                                        bool committed = false;
                                        try {
                                                IMessage msg = Run (tx, r);
                                                tx.Commit ();
                                                committed = true;
                                                return msg;
                                        } finally {
                                                // Reached with committed == false only when
                                                // the receive or the commit threw.
                                                if (!committed)
                                                        tx.Abort ();
                                        }
                                }

                        case MessageQueueTransactionType.None:
                                return Run (r);

                        default:
                                throw new NotSupportedException (transactionType + " not supported");
                        }
                }

                /// <summary>
                /// Runs <paramref name="r"/> inside <paramref name="transaction"/>.
                /// </summary>
                /// <exception cref="ArgumentNullException"><paramref name="transaction"/> is null.</exception>
                private IMessage Run (IMessageQueueTransaction transaction,
                                      TxReceiver.DoReceive r)
                {
                        RabbitMQMessageQueueTransaction tx = ToRabbitTx (transaction);
                        TxReceiver txr = new TxReceiver (this, r);
                        return tx.RunReceive (txr.ReceiveInContext);
                }

                /// <summary>
                /// Runs <paramref name="r"/> on a connection and channel that live only
                /// for this call.
                /// </summary>
                private IMessage Run (TxReceiver.DoReceive r)
                {
                        ConnectionFactory cf = new ConnectionFactory ();

                        using (IConnection cn = cf.CreateConnection (QRef.Host)) {
                                using (IModel model = cn.CreateModel ()) {
                                        return r (this, model);
                                }
                        }
                }

                /// <summary>
                /// Adapts a receive operation to the callback shape expected by
                /// <c>RabbitMQMessageQueueTransaction.RunReceive</c>.
                /// </summary>
                private class TxReceiver
                {
                        private readonly DoReceive doReceive;
                        private readonly RabbitMQMessageQueue q;

                        public TxReceiver (RabbitMQMessageQueue q, DoReceive doReceive)
                        {
                                this.q = q;
                                this.doReceive = doReceive;
                        }

                        public delegate IMessage DoReceive (RabbitMQMessageQueue q, IModel model);

                        /// <summary>
                        /// Prepares the shared connection state, then performs the
                        /// wrapped receive on the transactional channel.
                        /// </summary>
                        public IMessage ReceiveInContext (ref string host, ref IConnection cn,
                                                          ref IModel model, string txId)
                        {
                                EnsureContext (q.QRef, ref host, ref cn, ref model);

                                return doReceive (q, model);
                        }
                }

                /// <summary>
                /// Captures the timeout, optional matcher and ack flag of one
                /// receive/peek so it can be invoked later through
                /// <see cref="TxReceiver.DoReceive"/>.
                /// </summary>
                private class DoReceiveWithTimeout
                {
                        private readonly int timeout;
                        private readonly IsMatch matcher;
                        private readonly bool ack;

                        public DoReceiveWithTimeout (int timeout, IsMatch matcher, bool ack)
                        {
                                // A filtered receive with no explicit timeout falls
                                // back to DefaultMatchTimeout instead of -1.
                                if (matcher != null && timeout == NoTimeout)
                                        this.timeout = DefaultMatchTimeout;
                                else
                                        this.timeout = timeout;

                                this.matcher = matcher;
                                this.ack = ack;
                        }

                        public IMessage DoReceive (RabbitMQMessageQueue q, IModel model)
                        {
                                if (matcher == null)
                                        return q.Receive (model, timeout, ack);

                                return q.Receive (model, timeout, ack, matcher);
                        }
                }

                /// <summary>
                /// Single factory behind every Receiver/Peeker overload.
                /// </summary>
                private static TxReceiver.DoReceive MakeReceiver (int timeout, IsMatch matcher,
                                                                  bool ack)
                {
                        return new DoReceiveWithTimeout (timeout, matcher, ack).DoReceive;
                }

                private static TxReceiver.DoReceive Receiver (TimeSpan timeout,
                                                              IsMatch matcher)
                {
                        return MakeReceiver (MessageFactory.TimeSpanToInt32 (timeout), matcher, true);
                }

                private static TxReceiver.DoReceive Receiver (IsMatch matcher)
                {
                        return MakeReceiver (NoTimeout, matcher, true);
                }

                private static TxReceiver.DoReceive Receiver (TimeSpan timeout)
                {
                        return MakeReceiver (MessageFactory.TimeSpanToInt32 (timeout), null, true);
                }

                private static TxReceiver.DoReceive Receiver ()
                {
                        return MakeReceiver (NoTimeout, null, true);
                }

                private static TxReceiver.DoReceive Peeker ()
                {
                        // A peek must not acknowledge the delivery.
                        return MakeReceiver (NoTimeout, null, false);
                }

                private static TxReceiver.DoReceive Peeker (TimeSpan timeout)
                {
                        return MakeReceiver (MessageFactory.TimeSpanToInt32 (timeout), null, false);
                }

                private static TxReceiver.DoReceive Peeker (IsMatch matcher)
                {
                        return MakeReceiver (NoTimeout, matcher, false);
                }

                private static TxReceiver.DoReceive Peeker (TimeSpan timeout, IsMatch matcher)
                {
                        return MakeReceiver (MessageFactory.TimeSpanToInt32 (timeout), matcher, false);
                }

                /// <summary>Predicate applied to each delivery of a filtered receive.</summary>
                delegate bool IsMatch (BasicDeliverEventArgs result);

                /// <summary>Matches deliveries whose message id equals a given id.</summary>
                private class IdMatcher
                {
                        private readonly string id;

                        public IdMatcher (string id)
                        {
                                this.id = id;
                        }

                        public bool MatchById (BasicDeliverEventArgs result)
                        {
                                return result.BasicProperties.MessageId == id;
                        }
                }

                private static IsMatch ById (string id)
                {
                        return new IdMatcher (id).MatchById;
                }

                /// <summary>Matches deliveries whose correlation id equals a given id.</summary>
                private class CorrelationIdMatcher
                {
                        private readonly string correlationId;

                        public CorrelationIdMatcher (string correlationId)
                        {
                                this.correlationId = correlationId;
                        }

                        public bool MatchByCorrelationId (BasicDeliverEventArgs result)
                        {
                                return result.BasicProperties.CorrelationId == correlationId;
                        }
                }

                private static IsMatch ByCorrelationId (string correlationId)
                {
                        return new CorrelationIdMatcher (correlationId).MatchByCorrelationId;
                }

                /// <summary>
                /// Reads the next delivery from the queue.
                /// </summary>
                /// <param name="model">Channel to subscribe on.</param>
                /// <param name="timeout">Timeout in milliseconds passed to <c>Subscription.Next</c>.</param>
                /// <param name="doAck">When true the delivery is acknowledged before returning.</param>
                /// <exception cref="MonoMessagingException">No delivery was obtained.</exception>
                private IMessage Receive (IModel model, int timeout, bool doAck)
                {
                        string finalName = model.QueueDeclare (QRef.Queue, false);

                        using (Subscription sub = new Subscription (model, finalName)) {
                                BasicDeliverEventArgs result;

                                // Per the RabbitMQ client documentation, Next may return
                                // true with a null delivery when the subscription has
                                // ended; treat that the same as a timeout.
                                if (sub.Next (timeout, out result) && result != null) {
                                        IMessage m = helper.ReadMessage (QRef, result);
                                        if (doAck)
                                                sub.Ack (result);
                                        return m;
                                }

                                throw new MonoMessagingException ("No Message Available");
                        }
                }

                /// <summary>
                /// Reads deliveries until one satisfies <paramref name="matcher"/>.
                /// Non-matching deliveries are skipped without being acknowledged.
                /// </summary>
                /// <param name="model">Channel to subscribe on.</param>
                /// <param name="timeout">Timeout in milliseconds applied to each wait for a delivery.</param>
                /// <param name="doAck">When true the matching delivery is acknowledged before returning.</param>
                /// <param name="matcher">Predicate selecting the wanted delivery.</param>
                /// <exception cref="MessageUnavailableException">
                /// No matching delivery arrived before a wait timed out or the
                /// subscription ended.
                /// </exception>
                private IMessage Receive (IModel model, int timeout,
                                          bool doAck, IsMatch matcher)
                {
                        string finalName = model.QueueDeclare (QRef.Queue, false);

                        using (Subscription sub = new Subscription (model, finalName)) {
                                BasicDeliverEventArgs result;

                                while (sub.Next (timeout, out result)) {
                                        // A null delivery signals the subscription has
                                        // ended (see RabbitMQ client docs); stop searching.
                                        if (result == null)
                                                break;

                                        if (matcher (result)) {
                                                IMessage m = helper.ReadMessage (QRef, result);
                                                if (doAck)
                                                        sub.Ack (result);
                                                return m;
                                        }
                                }

                                throw new MessageUnavailableException ("Message not available");
                        }
                }

                private RabbitMQMessageQueueTransaction GetTx ()
                {
                        return (RabbitMQMessageQueueTransaction) provider.CreateMessageQueueTransaction ();
                }
        }
}