Add a more functional (i.e. fewer-stubs) implementation of System.Data.Linq.
[mono.git] / mcs / class / Mono.Messaging.RabbitMQ / Mono.Messaging.RabbitMQ / RabbitMQMessageQueue.cs
1 //
2 // Mono.Messaging.RabbitMQ
3 //
4 // Authors:
5 //        Michael Barker (mike@middlesoft.co.uk)
6 //
7 // (C) 2008 Michael Barker
8 //
9
10 //
11 // Permission is hereby granted, free of charge, to any person obtaining
12 // a copy of this software and associated documentation files (the
13 // "Software"), to deal in the Software without restriction, including
14 // without limitation the rights to use, copy, modify, merge, publish,
15 // distribute, sublicense, and/or sell copies of the Software, and to
16 // permit persons to whom the Software is furnished to do so, subject to
17 // the following conditions:
18 // 
19 // The above copyright notice and this permission notice shall be
20 // included in all copies or substantial portions of the Software.
21 // 
22 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
23 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
24 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
25 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
26 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
27 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 //
30
31 using System;
32 using System.Collections;
33 using System.ComponentModel;
34 using System.IO;
35 using System.Text;
36
37 using RabbitMQ.Client;
38 using RabbitMQ.Client.Content;
39 using RabbitMQ.Client.Events;
40 using RabbitMQ.Client.Exceptions;
41 using RabbitMQ.Client.MessagePatterns;
42 using RabbitMQ.Util;
43
44 namespace Mono.Messaging.RabbitMQ {
45
46         public class RabbitMQMessageQueue : MessageQueueBase, IMessageQueue {
47                 
48                 private bool authenticate = false;
49                 private short basePriority = 0;
50                 private Guid category = Guid.Empty;
51                 private bool denySharedReceive = false;
52                 private EncryptionRequired encryptionRequired;
53                 private long maximumJournalSize = -1;
54                 private long maximumQueueSize = -1;
55                 private ISynchronizeInvoke synchronizingObject = null;
56                 private bool useJournalQueue = false;
57                 private QueueReference qRef = QueueReference.DEFAULT;
58                 private readonly RabbitMQMessagingProvider provider;
59                 private readonly MessageFactory helper;
60                 private readonly string realm;
61                 private readonly bool transactional;
62                 
63                 public RabbitMQMessageQueue (RabbitMQMessagingProvider provider,
64                                              bool transactional)
65                         : this (provider, QueueReference.DEFAULT, transactional)
66                 {
67                 }
68                 
69                 public RabbitMQMessageQueue (RabbitMQMessagingProvider provider,
70                                              QueueReference qRef, 
71                                              bool transactional)
72                         : this (provider, "/data", qRef, transactional)
73                 {
74                 }
75                 
76                 public RabbitMQMessageQueue (RabbitMQMessagingProvider provider,
77                                              string realm, QueueReference qRef,
78                                              bool transactional)
79                 {
80                         this.provider = provider;
81                         this.helper = new MessageFactory (provider);
82                         this.realm = realm;
83                         this.qRef = qRef;
84                         this.transactional = transactional;
85                 }
86                 
87                 protected override IMessageQueue Queue {
88                         get { return this; }
89                 }
90
91                 public bool Authenticate {
92                         get { return authenticate; }
93                         set { authenticate = value; }
94                 }
95
96                 public short BasePriority {
97                         get { return basePriority; }
98                         set { basePriority = value; }
99                 }
100
101                 public bool CanRead {
102                         get { throw new NotImplementedException (); }
103                 }
104                 
105                 public bool CanWrite {
106                         get { throw new NotImplementedException (); }
107                 }
108                 
109                 public Guid Category {
110                         get { return category; }
111                         set { category = value; }
112                 }
113                 
114                 public DateTime CreateTime {
115                         get { throw new NotImplementedException (); }
116                 }
117                 
118                 public bool DenySharedReceive {
119                         get { return denySharedReceive; }
120                         set { denySharedReceive = value; }
121                 }
122                 
123                 public EncryptionRequired EncryptionRequired {
124                         get { return encryptionRequired; }
125                         set { encryptionRequired = value; }
126                 }
127                 
128                 public Guid Id {
129                         get { throw new NotImplementedException (); }
130                 }
131                 
132                 public DateTime LastModifyTime {
133                         get { throw new NotImplementedException (); }
134                 }
135                 
136                 public long MaximumJournalSize {
137                         get { return maximumJournalSize; }
138                         set { maximumJournalSize = value; }
139                 }
140                 
141                 public long MaximumQueueSize {
142                         get { return maximumQueueSize; }
143                         set { maximumQueueSize = value; }
144                 }
145                 
146                 public IntPtr ReadHandle {
147                         get { throw new NotImplementedException (); }
148                 }
149                 
150                 public ISynchronizeInvoke SynchronizingObject {
151                         get { return synchronizingObject; }
152                         set { synchronizingObject = value; }
153                 }
154                 
155                 public bool Transactional {
156                         get { return transactional; }
157                 }
158                 
159                 public bool UseJournalQueue {
160                         get { return useJournalQueue; }
161                         set { useJournalQueue = value; }
162                 }
163                 
164                 public IntPtr WriteHandle {
165                         get { throw new NotImplementedException (); }
166                 }
167                 
168                 public QueueReference QRef {
169                         get { return qRef; }
170                         set { qRef = value; }
171                 }
172                 
173                 private static long GetVersion (IConnection cn)
174                 {
175                         long version = cn.Protocol.MajorVersion;
176                         version = version << 32;
177                         version += cn.Protocol.MinorVersion;
178                         return version;
179                 }
180                 
181                 private void SetDeliveryInfo (IMessage msg, long senderVersion,
182                                               string transactionId)
183                 {
184                         msg.SetDeliveryInfo (Acknowledgment.None,
185                                              DateTime.MinValue,
186                                              this,
187                                              Guid.NewGuid ().ToString () + "\\0",
188                                              MessageType.Normal,
189                                              new byte[0],
190                                              senderVersion,
191                                              DateTime.UtcNow,
192                                              null,
193                                              transactionId);
194                 }
195                 
196                 public void Close ()
197                 {
198                         // No-op (Queue are currently stateless)
199                 }
200                 
201                 public static void Delete (string realm, QueueReference qRef)
202                 {
203                         ConnectionFactory cf = new ConnectionFactory ();
204                         
205                         using (IConnection cn = cf.CreateConnection (qRef.Host)) {
206                                 using (IModel model = cn.CreateModel ()) {
207                                         ushort ticket = model.AccessRequest (realm);
208                                         model.QueueDelete (ticket, qRef.Queue, false, false, false);
209                                 }
210                         }
211                 }                       
212                 
213                 public void Send (IMessage msg)
214                 {
215                         if (QRef == QueueReference.DEFAULT)
216                                 throw new MonoMessagingException ("Path has not been specified");
217                         
218                         if (msg.BodyStream == null)
219                                 throw new ArgumentException ("Message is not serialized properly");
220                 
221                         ConnectionFactory cf = new ConnectionFactory ();
222                         
223                         try {
224                                 using (IConnection cn = cf.CreateConnection (QRef.Host)) {
225                                         SetDeliveryInfo (msg, GetVersion (cn), null);
226                                         using (IModel ch = cn.CreateModel ()) {
227                                                 Send (ch, msg);
228                                         }
229                                 }
230                         } catch (BrokerUnreachableException e) {
231                                 throw new ConnectionException (QRef, e);
232                         }
233                 }
234                 
235                 public void Send (IMessage msg, IMessageQueueTransaction transaction)
236                 {
237                         if (QRef == QueueReference.DEFAULT)
238                                 throw new MonoMessagingException ("Path has not been specified");
239                         
240                         if (msg.BodyStream == null)
241                                 throw new ArgumentException ("Message is not serialized properly");
242                         
243                         RabbitMQMessageQueueTransaction tx = (RabbitMQMessageQueueTransaction) transaction;
244                         
245                         tx.RunSend (SendInContext, msg);
246                 }
247                 
248                 public void Send (IMessage msg, MessageQueueTransactionType transactionType)
249                 {
250                         switch (transactionType) {
251                         case MessageQueueTransactionType.Single:
252                                 using (IMessageQueueTransaction tx = provider.CreateMessageQueueTransaction ()) {
253                                         try {
254                                                 Send (msg, tx);
255                                                 tx.Commit ();
256                                         } catch (Exception e) {
257                                                 tx.Abort ();
258                                                 throw new MonoMessagingException(e.Message, e);
259                                         }
260                                 }
261                                 break;
262
263                         case MessageQueueTransactionType.None:
264                                 Send (msg);
265                                 break;
266
267                         case MessageQueueTransactionType.Automatic:
268                                 throw new NotSupportedException("Automatic transaction types not supported");
269                         }
270                 }
271                 
272                 private void SendInContext (ref string host, ref IConnection cn, 
273                                             ref IModel model, IMessage msg, string txId)
274                 {
275                         if (host == null)
276                                 host = QRef.Host;
277                         else if (host != QRef.Host)
278                                 throw new MonoMessagingException ("Transactions can not span multiple hosts");
279                         
280                         if (cn == null) {
281                                 ConnectionFactory cf = new ConnectionFactory ();
282                                 cn = cf.CreateConnection (host);
283                         }
284                         
285                         if (model == null) {
286                                 model = cn.CreateModel ();
287                                 model.TxSelect ();
288                         }
289                         
290                         SetDeliveryInfo (msg, GetVersion (cn), txId);
291                         Send (model, msg);
292                 }
293                 
294                 private void Send (IModel model, IMessage msg)
295                 {
296                         ushort ticket = model.AccessRequest ("/data");
297                         string finalName = model.QueueDeclare (ticket, QRef.Queue, true);
298                         IMessageBuilder mb = helper.WriteMessage (model, msg);
299
300                         model.BasicPublish (ticket, "", finalName,
301                                             (IBasicProperties) mb.GetContentHeader(),
302                                             mb.GetContentBody ());
303                 }
304                 
305                 public void Purge ()
306                 {
307                         ConnectionFactory cf = new ConnectionFactory ();
308
309                         using (IConnection cn = cf.CreateConnection (QRef.Host)) {
310                                 using (IModel model = cn.CreateModel ()) {
311                                         ushort ticket = model.AccessRequest (realm);
312                                         model.QueuePurge (ticket, QRef.Queue, false);
313                                 }
314                         }
315                 }
316                 
317                 public IMessage Peek ()
318                 {
319                         ConnectionFactory cf = new ConnectionFactory ();
320
321                         using (IConnection cn = cf.CreateConnection (QRef.Host)) {
322                                 using (IModel ch = cn.CreateModel ()) {
323                                         return Receive (ch, -1, false);
324                                 }
325                         }
326                 }
327                 
328                 public IMessage Peek (TimeSpan timeout)
329                 {
330                         return Run (Peeker (timeout));
331                 }
332                 
333                 public IMessage PeekById (string id)
334                 {
335                         return Run (Peeker (ById (id)));
336                 }
337
338                 public IMessage PeekById (string id, TimeSpan timeout)
339                 {
340                         return Run (Peeker (timeout, ById (id)));
341                 }
342                 
343                 public IMessage PeekByCorrelationId (string id)
344                 {
345                         return Run (Peeker (ByCorrelationId (id)));
346                 }
347
348                 public IMessage PeekByCorrelationId (string id, TimeSpan timeout)
349                 {
350                         return Run (Peeker (timeout, ByCorrelationId (id)));
351                 }
352                 
353                 public IMessage Receive ()
354                 {
355                         return Run (Receiver ());
356                 }
357                 
358                 public IMessage Receive (TimeSpan timeout)
359                 {
360                         return Run (Receiver (timeout));
361                 }
362                 
363                 public IMessage Receive (TimeSpan timeout,
364                                          IMessageQueueTransaction transaction)
365                 {
366                         return Run (transaction, Receiver (timeout));
367                 }
368                 
369                 public IMessage Receive (TimeSpan timeout,
370                                          MessageQueueTransactionType transactionType)
371                 {
372                         return Run (transactionType, Receiver (timeout));
373                 }
374                 
375                 public IMessage Receive (IMessageQueueTransaction transaction)
376                 {
377                         return Run (transaction, Receiver());
378                 }
379                 
380                 public IMessage Receive (MessageQueueTransactionType transactionType)
381                 {
382                         return Run (transactionType, Receiver ());
383                 }               
384
385                 public IMessage ReceiveById (string id)
386                 {
387                         return Run (Receiver (ById (id)));
388                 }
389
390                 public IMessage ReceiveById (string id, TimeSpan timeout)
391                 {
392                         return Run (Receiver (timeout, ById (id)));
393                 }
394                 
395                 public IMessage ReceiveById (string id,
396                                              IMessageQueueTransaction transaction)
397                 {
398                         return Run (transaction, Receiver (ById (id)));
399                 }
400                 
401                 public IMessage ReceiveById (string id,
402                                              MessageQueueTransactionType transactionType)
403                 {
404                         return Run (transactionType, Receiver (ById (id)));
405                 }
406                 
407                 public IMessage ReceiveById (string id, TimeSpan timeout,
408                                              IMessageQueueTransaction transaction)
409                 {
410                         return Run (transaction, Receiver (timeout, ById (id)));
411                 }
412                 
413                 public IMessage ReceiveById (string id, TimeSpan timeout,
414                                              MessageQueueTransactionType transactionType)
415                 {
416                         return Run (transactionType, Receiver (timeout, ById (id)));
417                 }
418                 
419                 public IMessage ReceiveByCorrelationId (string id)
420                 {
421                         return Run (Receiver (ByCorrelationId (id)));
422                 }
423                 
424                 public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout)
425                 {
426                         return Run (Receiver (timeout, ByCorrelationId (id)));
427                 }
428                 
429                 public IMessage ReceiveByCorrelationId (string id,
430                                                         IMessageQueueTransaction transaction)
431                 {
432                         return Run (transaction, Receiver (ByCorrelationId (id)));
433                 }
434                 
435                 public IMessage ReceiveByCorrelationId (string id,
436                                                         MessageQueueTransactionType transactionType)
437                 {
438                         return Run (transactionType, Receiver (ByCorrelationId (id)));
439                 }
440                 
441                 public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout,
442                                                         IMessageQueueTransaction transaction)
443                 {
444                         return Run (transaction, Receiver (timeout, ByCorrelationId (id)));
445                 }
446                 
447                 public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout,
448                                                         MessageQueueTransactionType transactionType)
449                 {
450                         return Run (transactionType, Receiver (timeout, ByCorrelationId (id)));
451                 }
452                 
453                 public IMessageEnumerator GetMessageEnumerator ()
454                 {
455                         return new RabbitMQMessageEnumerator (helper, QRef);
456                 }
457                 
458                 private IMessage Run (MessageQueueTransactionType transactionType,
459                                       TxReceiver.DoReceive r)
460                 {
461                         switch (transactionType) {
462                         case MessageQueueTransactionType.Single:
463                                 using (RabbitMQMessageQueueTransaction tx = GetTx ()) {
464                                         bool success = false;
465                                         try {
466                                                 IMessage msg = Run (tx, r);
467                                                 tx.Commit ();
468                                                 success = true;
469                                                 return msg;
470                                         } finally {
471                                                 if (!success)
472                                                         tx.Abort ();
473                                         }
474                                 }
475
476                         case MessageQueueTransactionType.None:
477                                 return Run (r);
478
479                         default:
480                                 throw new NotSupportedException(transactionType + " not supported");
481                         }
482                 }               
483
484                 private IMessage Run (IMessageQueueTransaction transaction,
485                                       TxReceiver.DoReceive r)
486                 {
487                         TxReceiver txr = new TxReceiver (this, r);
488                         RabbitMQMessageQueueTransaction tx = 
489                                 (RabbitMQMessageQueueTransaction) transaction;
490                         return tx.RunReceive (txr.ReceiveInContext);                    
491                 }
492                 
493                 private IMessage Run (TxReceiver.DoReceive r)
494                 {
495                         ConnectionFactory cf = new ConnectionFactory ();
496                         using (IConnection cn = cf.CreateConnection (QRef.Host)) {
497                                 using (IModel model = cn.CreateModel ()) {
498                                         return r (this, model);
499                                 }
500                         }
501                 }
502                 
503                 private IMessage ReceiveInContext (ref string host, ref IConnection cn, 
504                                                    ref IModel model, string txId)
505                 {
506                         if (host == null)
507                                 host = QRef.Host;
508                         else if (host != QRef.Host)
509                                 throw new MonoMessagingException ("Transactions can not span multiple hosts");
510                         
511                         if (cn == null) {
512                                 ConnectionFactory cf = new ConnectionFactory ();
513                                 cn = cf.CreateConnection (host);
514                         }
515                         
516                         if (model == null) {
517                                 model = cn.CreateModel ();
518                                 model.TxSelect ();
519                         }
520                         
521                         return Receive (model, -1, true);
522                 }               
523
524                 private class TxReceiver
525                 {
526                         private readonly DoReceive doReceive;
527                         private readonly RabbitMQMessageQueue q;
528                         
529                         public TxReceiver(RabbitMQMessageQueue q, DoReceive doReceive) {
530                                 this.q = q;
531                                 this.doReceive = doReceive;
532                         }
533                         
534                         public delegate IMessage DoReceive (RabbitMQMessageQueue q, IModel model);
535                         
536                         public IMessage ReceiveInContext (ref string host, ref IConnection cn, 
537                                                           ref IModel model, string txId)
538                         {
539                                 if (host == null)
540                                         host = q.QRef.Host;
541                                 else if (host != q.QRef.Host)
542                                         throw new MonoMessagingException ("Transactions can not span multiple hosts");
543                                 
544                                 if (cn == null) {
545                                         ConnectionFactory cf = new ConnectionFactory ();
546                                         cn = cf.CreateConnection (host);
547                                 }
548                                 
549                                 if (model == null) {
550                                         model = cn.CreateModel ();
551                                         model.TxSelect ();
552                                 }
553                                 
554                                 return doReceive (q, model);
555                         }
556                 }
557                 
558                 private class DoReceiveWithTimeout
559                 {
560                         private readonly int timeout;
561                         private readonly IsMatch matcher;
562                         private readonly bool ack;
563                         
564                         public DoReceiveWithTimeout (int timeout, IsMatch matcher)
565                                 : this (timeout, matcher, true)
566                         {
567                         }
568                         
569                         public DoReceiveWithTimeout (int timeout, IsMatch matcher, bool ack)
570                         {
571                                 if (matcher != null && timeout == -1)
572                                         this.timeout = 500;
573                                 else 
574                                         this.timeout = timeout;
575                                 this.matcher = matcher;
576                                 this.ack = ack;
577                         }
578                         
579                         public IMessage DoReceive (RabbitMQMessageQueue q, IModel model)
580                         {
581                                 if (matcher == null)
582                                         return q.Receive (model, timeout, ack);
583                                 else
584                                         return q.Receive (model, timeout, ack, matcher);
585                         }
586                 }
587                 
588                 private static TxReceiver.DoReceive Receiver (TimeSpan timeout,
589                                                               IsMatch matcher)
590                 {
591                         int to = TimeSpanToInt32 (timeout);
592                         return new DoReceiveWithTimeout (to, matcher).DoReceive;
593                 }
594                 
595                 private static TxReceiver.DoReceive Receiver (IsMatch matcher)
596                 {
597                         return new DoReceiveWithTimeout (-1, matcher).DoReceive;
598                 }
599                 
600                 private static TxReceiver.DoReceive Receiver (TimeSpan timeout)
601                 {
602                         int to = TimeSpanToInt32 (timeout);
603                         return new DoReceiveWithTimeout (to, null).DoReceive;
604                 }
605
606                 private TxReceiver.DoReceive Receiver ()
607                 {
608                         return new DoReceiveWithTimeout (-1, null).DoReceive;
609                 }               
610                 
611                 private TxReceiver.DoReceive Peeker ()
612                 {
613                         return new DoReceiveWithTimeout (-1, null).DoReceive;
614                 }               
615                 
616                 private TxReceiver.DoReceive Peeker (TimeSpan timeout)
617                 {
618                         int to = TimeSpanToInt32 (timeout);
619                         return new DoReceiveWithTimeout (to, null, false).DoReceive;
620                 }               
621                 
622                 private TxReceiver.DoReceive Peeker (IsMatch matcher)
623                 {
624                         return new DoReceiveWithTimeout (-1, matcher, false).DoReceive;
625                 }               
626                 
627                 private TxReceiver.DoReceive Peeker (TimeSpan timeout, IsMatch matcher)
628                 {
629                         int to = TimeSpanToInt32 (timeout);
630                         return new DoReceiveWithTimeout (to, matcher, false).DoReceive;
631                 }
632                 
633                 delegate bool IsMatch (BasicDeliverEventArgs result);           
634                 
635                 private class IdMatcher
636                 {
637                         private readonly string id;
638                         public IdMatcher (string id)
639                         {
640                                 this.id = id;
641                         }
642                         
643                         public bool MatchById (BasicDeliverEventArgs result)
644                         {
645                                 return result.BasicProperties.MessageId == id;
646                         }
647                 }
648                 
649                 private static IsMatch ById (string id)
650                 {
651                         return new IdMatcher (id).MatchById;
652                 }
653                 
654                 private class CorrelationIdMatcher
655                 {
656                         private readonly string correlationId;
657                         public CorrelationIdMatcher (string correlationId)
658                         {
659                                 this.correlationId = correlationId;
660                         }
661                         
662                         public bool MatchById (BasicDeliverEventArgs result)
663                         {
664                                 return result.BasicProperties.CorrelationId == correlationId;
665                         }
666                 }
667                 
668                 private static IsMatch ByCorrelationId (string correlationId)
669                 {
670                         return new CorrelationIdMatcher (correlationId).MatchById;
671                 }
672                 
673                 private IMessage Receive (IModel model, int timeout, bool doAck)
674                 {
675                         ushort ticket = model.AccessRequest (realm);
676                         string finalName = model.QueueDeclare (ticket, QRef.Queue, false);
677                         
678                         using (Subscription sub = new Subscription (model, ticket, finalName)) {
679                                 BasicDeliverEventArgs result;
680                                 if (sub.Next (timeout, out result)) {
681                                         IMessage m = helper.ReadMessage (QRef, result);
682                                         if (doAck)
683                                                 sub.Ack (result);
684                                         return m;
685                                 } else {
686                                         throw new MonoMessagingException ("No Message Available");
687                                 }
688                         }
689                 }
690                                 
691                 private IMessage Receive (IModel model, int timeout, 
692                                           bool doAck, IsMatch matcher)
693                 {
694                         ushort ticket = model.AccessRequest (realm);
695                         string finalName = model.QueueDeclare (ticket, QRef.Queue, false);
696                         
697                         using (Subscription sub = new Subscription (model, ticket, finalName)) {
698                                 BasicDeliverEventArgs result;
699                                 while (sub.Next (timeout, out result)) {
700                                         
701                                         if (matcher (result)) {
702                                                 IMessage m = helper.ReadMessage (QRef, result);
703                                                 if (doAck)
704                                                         sub.Ack (result);
705                                                 return m;
706                                         }
707                                 }
708                                 
709                                 throw new MessageUnavailableException ("Message not available");
710                         }
711                 }
712                 
713                 private RabbitMQMessageQueueTransaction GetTx ()
714                 {
715                         return (RabbitMQMessageQueueTransaction) provider.CreateMessageQueueTransaction ();
716                 }               
717                 
718                 private static int TimeSpanToInt32 (TimeSpan timespan)
719                 {
720                         if (timespan == TimeSpan.MaxValue)
721                                 return -1;
722                         else
723                                 return (int) timespan.TotalMilliseconds;
724                 }
725         }
726 }