Apply RabbitMQ support patch by Michael Barker, on bug #457089.
[mono.git] / mcs / class / Mono.Messaging.RabbitMQ / Mono.Messaging.RabbitMQ / RabbitMQMessageQueue.cs
1 //
2 // Mono.Messaging.RabbitMQ
3 //
4 // Authors:
5 //        Michael Barker (mike@middlesoft.co.uk)
6 //
7 // (C) 2008 Michael Barker
8 //
9
10 //
11 // Permission is hereby granted, free of charge, to any person obtaining
12 // a copy of this software and associated documentation files (the
13 // "Software"), to deal in the Software without restriction, including
14 // without limitation the rights to use, copy, modify, merge, publish,
15 // distribute, sublicense, and/or sell copies of the Software, and to
16 // permit persons to whom the Software is furnished to do so, subject to
17 // the following conditions:
18 // 
19 // The above copyright notice and this permission notice shall be
20 // included in all copies or substantial portions of the Software.
21 // 
22 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
23 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
24 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
25 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
26 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
27 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 //
30
31 using System;
32 using System.Collections;
33 using System.ComponentModel;
34 using System.IO;
35 using System.Text;
36
37 using RabbitMQ.Client;
38 using RabbitMQ.Client.Content;
39 using RabbitMQ.Client.Events;
40 using RabbitMQ.Client.Exceptions;
41 using RabbitMQ.Client.MessagePatterns;
42 using RabbitMQ.Util;
43
44 namespace Mono.Messaging.RabbitMQ {
45
46         public class RabbitMQMessageQueue : IMessageQueue {
47                 
48                 private bool authenticate = false;
49                 private short basePriority = 0;
50                 private Guid category = Guid.Empty;
51                 private bool denySharedReceive = false;
52                 private EncryptionRequired encryptionRequired;
53                 private long maximumJournalSize = -1;
54                 private long maximumQueueSize = -1;
55                 private ISynchronizeInvoke synchronizingObject = null;
56                 private bool useJournalQueue = false;
57                 private QueueReference qRef = QueueReference.DEFAULT;
58                 private readonly RabbitMQMessagingProvider provider;
59                 private readonly MessageFactory helper;
60                 private readonly string realm;
61                 private readonly bool transactional;
62                 
63                 public RabbitMQMessageQueue (RabbitMQMessagingProvider provider,
64                                              bool transactional)
65                         : this (provider, QueueReference.DEFAULT, transactional)
66                 {
67                 }
68                 
69                 public RabbitMQMessageQueue (RabbitMQMessagingProvider provider,
70                                              QueueReference qRef, 
71                                              bool transactional)
72                         : this (provider, "/data", qRef, transactional)
73                 {
74                 }
75                 
76                 public RabbitMQMessageQueue (RabbitMQMessagingProvider provider,
77                                              string realm, QueueReference qRef,
78                                              bool transactional)
79                 {
80                         this.provider = provider;
81                         this.helper = new MessageFactory (provider);
82                         this.realm = realm;
83                         this.qRef = qRef;
84                         this.transactional = transactional;
85                 }
86
87                 public bool Authenticate {
88                         get { return authenticate; }
89                         set { authenticate = value; }
90                 }
91
92                 public short BasePriority {
93                         get { return basePriority; }
94                         set { basePriority = value; }
95                 }
96
97                 public bool CanRead {
98                         get { throw new NotImplementedException (); }
99                 }
100                 
101                 public bool CanWrite {
102                         get { throw new NotImplementedException (); }
103                 }
104                 
105                 public Guid Category {
106                         get { return category; }
107                         set { category = value; }
108                 }
109                 
110                 public DateTime CreateTime {
111                         get { throw new NotImplementedException (); }
112                 }
113                 
114                 public bool DenySharedReceive {
115                         get { return denySharedReceive; }
116                         set { denySharedReceive = value; }
117                 }
118                 
119                 public EncryptionRequired EncryptionRequired {
120                         get { return encryptionRequired; }
121                         set { encryptionRequired = value; }
122                 }
123                 
124                 public Guid Id {
125                         get { throw new NotImplementedException (); }
126                 }
127                 
128                 public DateTime LastModifyTime {
129                         get { throw new NotImplementedException (); }
130                 }
131                 
132                 public long MaximumJournalSize {
133                         get { return maximumJournalSize; }
134                         set { maximumJournalSize = value; }
135                 }
136                 
137                 public long MaximumQueueSize {
138                         get { return maximumQueueSize; }
139                         set { maximumQueueSize = value; }
140                 }
141                 
142                 public IntPtr ReadHandle {
143                         get { throw new NotImplementedException (); }
144                 }
145                 
146                 public ISynchronizeInvoke SynchronizingObject {
147                         get { return synchronizingObject; }
148                         set { synchronizingObject = value; }
149                 }
150                 
151                 public bool Transactional {
152                         get { return transactional; }
153                 }
154                 
155                 public bool UseJournalQueue {
156                         get { return useJournalQueue; }
157                         set { useJournalQueue = value; }
158                 }
159                 
160                 public IntPtr WriteHandle {
161                         get { throw new NotImplementedException (); }
162                 }
163                 
164                 public QueueReference QRef {
165                         get { return qRef; }
166                         set { qRef = value; }
167                 }
168                 
169                 private static long GetVersion (IConnection cn)
170                 {
171                         long version = cn.Protocol.MajorVersion;
172                         version = version << 32;
173                         version += cn.Protocol.MinorVersion;
174                         return version;
175                 }
176                 
177                 private void SetDeliveryInfo (IMessage msg, long senderVersion,
178                                               string transactionId)
179                 {
180                         msg.SetDeliveryInfo (Acknowledgment.None,
181                                              DateTime.MinValue,
182                                              this,
183                                              Guid.NewGuid ().ToString () + "\\0",
184                                              MessageType.Normal,
185                                              new byte[0],
186                                              senderVersion,
187                                              DateTime.UtcNow,
188                                              null,
189                                              transactionId);
190                 }
191                 
192                 public void Close ()
193                 {
194                         // No-op (Queue are currently stateless)
195                 }
196                 
197                 public static void Delete (string realm, QueueReference qRef)
198                 {
199                         ConnectionFactory cf = new ConnectionFactory ();
200                         
201                         using (IConnection cn = cf.CreateConnection (qRef.Host)) {
202                                 using (IModel model = cn.CreateModel ()) {
203                                         ushort ticket = model.AccessRequest (realm);
204                                         model.QueueDelete (ticket, qRef.Queue, false, false, false);
205                                 }
206                         }
207                 }                       
208                 
209                 public void Send (IMessage msg)
210                 {
211                         if (QRef == QueueReference.DEFAULT)
212                                 throw new MonoMessagingException ("Path has not been specified");
213                         
214                         if (msg.BodyStream == null)
215                                 throw new ArgumentException ("Message is not serialized properly");
216                 
217                         ConnectionFactory cf = new ConnectionFactory ();
218                         
219                         try {
220                                 using (IConnection cn = cf.CreateConnection (QRef.Host)) {
221                                         SetDeliveryInfo (msg, GetVersion (cn), null);
222                                         using (IModel ch = cn.CreateModel ()) {
223                                                 Send (ch, msg);
224                                         }
225                                 }
226                         } catch (BrokerUnreachableException e) {
227                                 throw new ConnectionException (QRef, e);
228                         }
229                 }
230                 
231                 public void Send (IMessage msg, IMessageQueueTransaction transaction)
232                 {
233                         if (QRef == QueueReference.DEFAULT)
234                                 throw new MonoMessagingException ("Path has not been specified");
235                         
236                         if (msg.BodyStream == null)
237                                 throw new ArgumentException ("Message is not serialized properly");
238                         
239                         RabbitMQMessageQueueTransaction tx = (RabbitMQMessageQueueTransaction) transaction;
240                         
241                         tx.RunSend (SendInContext, msg);
242                 }
243                 
244                 public void Send (IMessage msg, MessageQueueTransactionType transactionType)
245                 {
246                         switch (transactionType) {
247                         case MessageQueueTransactionType.Single:
248                                 using (IMessageQueueTransaction tx = provider.CreateMessageQueueTransaction ()) {
249                                         try {
250                                                 Send (msg, tx);
251                                                 tx.Commit ();
252                                         } catch (Exception e) {
253                                                 tx.Abort ();
254                                                 throw new MonoMessagingException(e.Message, e);
255                                         }
256                                 }
257                                 break;
258
259                         case MessageQueueTransactionType.None:
260                                 Send (msg);
261                                 break;
262
263                         case MessageQueueTransactionType.Automatic:
264                                 throw new NotSupportedException("Automatic transaction types not supported");
265                         }
266                 }
267                 
268                 private void SendInContext (ref string host, ref IConnection cn, 
269                                             ref IModel model, IMessage msg, string txId)
270                 {
271                         if (host == null)
272                                 host = QRef.Host;
273                         else if (host != QRef.Host)
274                                 throw new MonoMessagingException ("Transactions can not span multiple hosts");
275                         
276                         if (cn == null) {
277                                 ConnectionFactory cf = new ConnectionFactory ();
278                                 cn = cf.CreateConnection (host);
279                         }
280                         
281                         if (model == null) {
282                                 model = cn.CreateModel ();
283                                 model.TxSelect ();
284                         }
285                         
286                         SetDeliveryInfo (msg, GetVersion (cn), txId);
287                         Send (model, msg);
288                 }
289                 
290                 private void Send (IModel model, IMessage msg)
291                 {
292                         ushort ticket = model.AccessRequest ("/data");
293                         string finalName = model.QueueDeclare (ticket, QRef.Queue, true);
294                         IMessageBuilder mb = helper.WriteMessage (model, msg);
295
296                         model.BasicPublish (ticket, "", finalName,
297                                             (IBasicProperties) mb.GetContentHeader(),
298                                             mb.GetContentBody ());
299                 }
300                 
301                 public void Purge ()
302                 {
303                         ConnectionFactory cf = new ConnectionFactory ();
304
305                         using (IConnection cn = cf.CreateConnection (QRef.Host)) {
306                                 using (IModel model = cn.CreateModel ()) {
307                                         ushort ticket = model.AccessRequest (realm);
308                                         model.QueuePurge (ticket, QRef.Queue, false);
309                                 }
310                         }
311                 }
312                 
313                 public IMessage Peek ()
314                 {
315                         ConnectionFactory cf = new ConnectionFactory ();
316
317                         using (IConnection cn = cf.CreateConnection (QRef.Host)) {
318                                 using (IModel ch = cn.CreateModel ()) {
319                                         return Receive (ch, -1, false);
320                                 }
321                         }
322                 }
323                 
324                 public IMessage Peek (TimeSpan timeout)
325                 {
326                         return Run (Peeker (timeout));
327 //                      ConnectionFactory cf = new ConnectionFactory ();
328 //
329 //                      using (IConnection cn = cf.CreateConnection (QRef.Host)) {
330 //                              using (IModel ch = cn.CreateModel ()) {
331 //                                      if (timeout == TimeSpan.MaxValue) {
332 //                                              return Receive (ch, -1, false);
333 //                                      } else {
334 //                                              return Receive (ch, (int) timeout.TotalMilliseconds, false);
335 //                                      }
336 //                              }
337 //                      }
338                 }
339                 
340                 public IMessage PeekById (string id)
341                 {
342                         return Run (Peeker (ById (id)));
343 //                      ConnectionFactory cf = new ConnectionFactory ();
344 //
345 //                      using (IConnection cn = cf.CreateConnection (QRef.Host)) {
346 //                              using (IModel ch = cn.CreateModel ()) {
347 //                                      return Receive (ch, 500, true, new IdMatcher (id).MatchById);
348 //                              }
349 //                      }
350                 }
351
352                 public IMessage PeekById (string id, TimeSpan timeout)
353                 {
354                         return Run (Peeker (timeout, ById (id)));
355                 }
356                 
357                 public IMessage PeekByCorrelationId (string id)
358                 {
359                         return Run (Peeker (ByCorrelationId (id)));
360 //                      ConnectionFactory cf = new ConnectionFactory ();
361 //
362 //                      using (IConnection cn = cf.CreateConnection (QRef.Host)) {
363 //                              using (IModel ch = cn.CreateModel ()) {
364 //                                      return Receive (ch, 500, false, 
365 //                                                      new CorrelationIdMatcher (id).MatchById);
366 //                              }
367 //                      }
368                 }
369
370                 public IMessage PeekByCorrelationId (string id, TimeSpan timeout)
371                 {
372                         return Run (Peeker (timeout, ByCorrelationId (id)));
373                 }
374                 
375                 public IMessage Receive ()
376                 {
377                         return Run (Receiver ());
378                 }
379                 
380                 public IMessage Receive (TimeSpan timeout)
381                 {
382                         return Run (Receiver (timeout));
383                 }
384                 
385                 public IMessage Receive (TimeSpan timeout,
386                                          IMessageQueueTransaction transaction)
387                 {
388                         return Run (transaction, Receiver (timeout));
389                 }
390                 
391                 public IMessage Receive (TimeSpan timeout,
392                                          MessageQueueTransactionType transactionType)
393                 {
394                         return Run (transactionType, Receiver (timeout));
395                 }
396                 
397                 public IMessage Receive (IMessageQueueTransaction transaction)
398                 {
399                         return Run (transaction, Receiver());
400                 }
401                 
402                 public IMessage Receive (MessageQueueTransactionType transactionType)
403                 {
404                         return Run (transactionType, Receiver ());
405                 }               
406
407                 public IMessage ReceiveById (string id)
408                 {
409                         return Run (Receiver (ById (id)));
410                 }
411
412                 public IMessage ReceiveById (string id, TimeSpan timeout)
413                 {
414                         return Run (Receiver (timeout, ById (id)));
415                 }
416                 
417                 public IMessage ReceiveById (string id,
418                                              IMessageQueueTransaction transaction)
419                 {
420                         return Run (transaction, Receiver (ById (id)));
421                 }
422                 
423                 public IMessage ReceiveById (string id,
424                                              MessageQueueTransactionType transactionType)
425                 {
426                         return Run (transactionType, Receiver (ById (id)));
427                 }
428                 
429                 public IMessage ReceiveById (string id, TimeSpan timeout,
430                                              IMessageQueueTransaction transaction)
431                 {
432                         return Run (transaction, Receiver (timeout, ById (id)));
433                 }
434                 
435                 public IMessage ReceiveById (string id, TimeSpan timeout,
436                                              MessageQueueTransactionType transactionType)
437                 {
438                         return Run (transactionType, Receiver (timeout, ById (id)));
439                 }
440                 
441                 public IMessage ReceiveByCorrelationId (string id)
442                 {
443                         return Run (Receiver (ByCorrelationId (id)));
444                 }
445                 
446                 public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout)
447                 {
448                         return Run (Receiver (timeout, ByCorrelationId (id)));
449                 }
450                 
451                 public IMessage ReceiveByCorrelationId (string id,
452                                                         IMessageQueueTransaction transaction)
453                 {
454                         return Run (transaction, Receiver (ByCorrelationId (id)));
455                 }
456                 
457                 public IMessage ReceiveByCorrelationId (string id,
458                                                         MessageQueueTransactionType transactionType)
459                 {
460                         return Run (transactionType, Receiver (ByCorrelationId (id)));
461                 }
462                 
463                 public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout,
464                                                         IMessageQueueTransaction transaction)
465                 {
466                         return Run (transaction, Receiver (timeout, ByCorrelationId (id)));
467                 }
468                 
469                 public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout,
470                                                         MessageQueueTransactionType transactionType)
471                 {
472                         return Run (transactionType, Receiver (timeout, ByCorrelationId (id)));
473                 }
474                 
475                 public IMessageEnumerator GetMessageEnumerator ()
476                 {
477                         return new RabbitMQMessageEnumerator (helper, QRef);
478                 }
479                 
480                 private IMessage Run (MessageQueueTransactionType transactionType,
481                                       TxReceiver.DoReceive r)
482                 {
483                         switch (transactionType) {
484                         case MessageQueueTransactionType.Single:
485                                 using (RabbitMQMessageQueueTransaction tx = GetTx ()) {
486                                         bool success = false;
487                                         try {
488                                                 IMessage msg = Run (tx, r);
489                                                 tx.Commit ();
490                                                 success = true;
491                                                 return msg;
492                                         } finally {
493                                                 if (!success)
494                                                         tx.Abort ();
495                                         }
496                                 }
497
498                         case MessageQueueTransactionType.None:
499                                 return Run (r);
500
501                         default:
502                                 throw new NotSupportedException(transactionType + " not supported");
503                         }
504                 }               
505
506                 private IMessage Run (IMessageQueueTransaction transaction,
507                                       TxReceiver.DoReceive r)
508                 {
509                         TxReceiver txr = new TxReceiver (this, r);
510                         RabbitMQMessageQueueTransaction tx = 
511                                 (RabbitMQMessageQueueTransaction) transaction;
512                         return tx.RunReceive (txr.ReceiveInContext);                    
513                 }
514                 
515                 private IMessage Run (TxReceiver.DoReceive r)
516                 {
517                         ConnectionFactory cf = new ConnectionFactory ();
518                         using (IConnection cn = cf.CreateConnection (QRef.Host)) {
519                                 using (IModel model = cn.CreateModel ()) {
520                                         return r (this, model);
521                                 }
522                         }
523                 }
524                 
525                 private IMessage ReceiveInContext (ref string host, ref IConnection cn, 
526                                                    ref IModel model, string txId)
527                 {
528                         if (host == null)
529                                 host = QRef.Host;
530                         else if (host != QRef.Host)
531                                 throw new MonoMessagingException ("Transactions can not span multiple hosts");
532                         
533                         if (cn == null) {
534                                 ConnectionFactory cf = new ConnectionFactory ();
535                                 cn = cf.CreateConnection (host);
536                         }
537                         
538                         if (model == null) {
539                                 model = cn.CreateModel ();
540                                 model.TxSelect ();
541                         }
542                         
543                         return Receive (model, -1, true);
544                 }               
545
546                 private class TxReceiver
547                 {
548                         private readonly DoReceive doReceive;
549                         private readonly RabbitMQMessageQueue q;
550                         
551                         public TxReceiver(RabbitMQMessageQueue q, DoReceive doReceive) {
552                                 this.q = q;
553                                 this.doReceive = doReceive;
554                         }
555                         
556                         public delegate IMessage DoReceive (RabbitMQMessageQueue q, IModel model);
557                         
558                         public IMessage ReceiveInContext (ref string host, ref IConnection cn, 
559                                                           ref IModel model, string txId)
560                         {
561                                 if (host == null)
562                                         host = q.QRef.Host;
563                                 else if (host != q.QRef.Host)
564                                         throw new MonoMessagingException ("Transactions can not span multiple hosts");
565                                 
566                                 if (cn == null) {
567                                         ConnectionFactory cf = new ConnectionFactory ();
568                                         cn = cf.CreateConnection (host);
569                                 }
570                                 
571                                 if (model == null) {
572                                         model = cn.CreateModel ();
573                                         model.TxSelect ();
574                                 }
575                                 
576                                 return doReceive (q, model);
577                         }
578                 }
579                 
580                 private class DoReceiveWithTimeout
581                 {
582                         private readonly int timeout;
583                         private readonly IsMatch matcher;
584                         private readonly bool ack;
585                         
586                         public DoReceiveWithTimeout (int timeout, IsMatch matcher)
587                                 : this (timeout, matcher, true)
588                         {
589                         }
590                         
591                         public DoReceiveWithTimeout (int timeout, IsMatch matcher, bool ack)
592                         {
593                                 if (matcher != null && timeout == -1)
594                                         this.timeout = 500;
595                                 else 
596                                         this.timeout = timeout;
597                                 this.matcher = matcher;
598                                 this.ack = ack;
599                         }
600                         
601                         public IMessage DoReceive (RabbitMQMessageQueue q, IModel model)
602                         {
603                                 if (matcher == null)
604                                         return q.Receive (model, timeout, ack);
605                                 else
606                                         return q.Receive (model, timeout, ack, matcher);
607                         }
608                 }
609                 
610                 private static TxReceiver.DoReceive Receiver (TimeSpan timeout,
611                                                               IsMatch matcher)
612                 {
613                         int to = TimeSpanToInt32 (timeout);
614                         return new DoReceiveWithTimeout (to, matcher).DoReceive;
615                 }
616                 
617                 private static TxReceiver.DoReceive Receiver (IsMatch matcher)
618                 {
619                         return new DoReceiveWithTimeout (-1, matcher).DoReceive;
620                 }
621                 
622                 private static TxReceiver.DoReceive Receiver (TimeSpan timeout)
623                 {
624                         int to = TimeSpanToInt32 (timeout);
625                         return new DoReceiveWithTimeout (to, null).DoReceive;
626                 }
627
628                 private TxReceiver.DoReceive Receiver ()
629                 {
630                         return new DoReceiveWithTimeout (-1, null).DoReceive;
631                 }               
632                 
633                 private TxReceiver.DoReceive Peeker ()
634                 {
635                         return new DoReceiveWithTimeout (-1, null).DoReceive;
636                 }               
637                 
638                 private TxReceiver.DoReceive Peeker (TimeSpan timeout)
639                 {
640                         int to = TimeSpanToInt32 (timeout);
641                         return new DoReceiveWithTimeout (to, null, false).DoReceive;
642                 }               
643                 
644                 private TxReceiver.DoReceive Peeker (IsMatch matcher)
645                 {
646                         return new DoReceiveWithTimeout (-1, matcher, false).DoReceive;
647                 }               
648                 
649                 private TxReceiver.DoReceive Peeker (TimeSpan timeout, IsMatch matcher)
650                 {
651                         int to = TimeSpanToInt32 (timeout);
652                         return new DoReceiveWithTimeout (to, matcher, false).DoReceive;
653                 }
654                 
655                 delegate bool IsMatch (BasicDeliverEventArgs result);           
656                 
657                 private class IdMatcher
658                 {
659                         private readonly string id;
660                         public IdMatcher (string id)
661                         {
662                                 this.id = id;
663                         }
664                         
665                         public bool MatchById (BasicDeliverEventArgs result)
666                         {
667                                 return result.BasicProperties.MessageId == id;
668                         }
669                 }
670                 
671                 private static IsMatch ById (string id)
672                 {
673                         return new IdMatcher (id).MatchById;
674                 }
675                 
676                 private class CorrelationIdMatcher
677                 {
678                         private readonly string correlationId;
679                         public CorrelationIdMatcher (string correlationId)
680                         {
681                                 this.correlationId = correlationId;
682                         }
683                         
684                         public bool MatchById (BasicDeliverEventArgs result)
685                         {
686                                 return result.BasicProperties.CorrelationId == correlationId;
687                         }
688                 }
689                 
690                 private static IsMatch ByCorrelationId (string correlationId)
691                 {
692                         return new CorrelationIdMatcher (correlationId).MatchById;
693                 }
694                 
695                 private IMessage Receive (IModel model, int timeout, bool doAck)
696                 {
697                         Console.WriteLine ("{0}, {1}", timeout, doAck);
698                         
699                         ushort ticket = model.AccessRequest (realm);
700                         string finalName = model.QueueDeclare (ticket, QRef.Queue, false);
701                         
702                         using (Subscription sub = new Subscription (model, ticket, finalName)) {
703                                 BasicDeliverEventArgs result;
704                                 if (sub.Next (timeout, out result)) {
705                                         IMessage m = helper.ReadMessage (QRef, result);
706                                         if (doAck)
707                                                 sub.Ack (result);
708                                         return m;
709                                 } else {
710                                         throw new MonoMessagingException ("No Message Available");
711                                 }
712                         }
713                 }
714                                 
715                 private IMessage Receive (IModel model, int timeout, 
716                                           bool doAck, IsMatch matcher)
717                 {
718                         Console.WriteLine ("{0}, {1}", timeout, doAck);
719                         
720                         ushort ticket = model.AccessRequest (realm);
721                         string finalName = model.QueueDeclare (ticket, QRef.Queue, false);
722                         
723                         using (Subscription sub = new Subscription (model, ticket, finalName)) {
724                                 BasicDeliverEventArgs result;
725                                 while (sub.Next (timeout, out result)) {
726                                         
727                                         if (matcher (result)) {
728                                                 IMessage m = helper.ReadMessage (QRef, result);
729                                                 if (doAck)
730                                                         sub.Ack (result);
731                                                 return m;
732                                         }
733                                 }
734                                 
735                                 throw new MessageUnavailableException ("Message not available");
736                         }
737                 }
738                 
739                 private RabbitMQMessageQueueTransaction GetTx ()
740                 {
741                         return (RabbitMQMessageQueueTransaction) provider.CreateMessageQueueTransaction ();
742                 }               
743                 
744                 private static int TimeSpanToInt32 (TimeSpan timespan)
745                 {
746                         if (timespan == TimeSpan.MaxValue)
747                                 return -1;
748                         else
749                                 return (int) timespan.TotalMilliseconds;
750                 }
751         }
752 }