956fb360ecd2ff647846c2341ac13575c1351256
[mono.git] / mcs / class / RabbitMQ.Client / src / client / messagepatterns / Subscription.cs
1 // This source code is dual-licensed under the Apache License, version
2 // 2.0, and the Mozilla Public License, version 1.1.
3 //
4 // The APL v2.0:
5 //
6 //---------------------------------------------------------------------------
7 //   Copyright (C) 2007-2009 LShift Ltd., Cohesive Financial
8 //   Technologies LLC., and Rabbit Technologies Ltd.
9 //
10 //   Licensed under the Apache License, Version 2.0 (the "License");
11 //   you may not use this file except in compliance with the License.
12 //   You may obtain a copy of the License at
13 //
14 //       http://www.apache.org/licenses/LICENSE-2.0
15 //
16 //   Unless required by applicable law or agreed to in writing, software
17 //   distributed under the License is distributed on an "AS IS" BASIS,
18 //   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19 //   See the License for the specific language governing permissions and
20 //   limitations under the License.
21 //---------------------------------------------------------------------------
22 //
23 // The MPL v1.1:
24 //
25 //---------------------------------------------------------------------------
26 //   The contents of this file are subject to the Mozilla Public License
27 //   Version 1.1 (the "License"); you may not use this file except in
28 //   compliance with the License. You may obtain a copy of the License at
29 //   http://www.rabbitmq.com/mpl.html
30 //
31 //   Software distributed under the License is distributed on an "AS IS"
32 //   basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
33 //   License for the specific language governing rights and limitations
34 //   under the License.
35 //
36 //   The Original Code is The RabbitMQ .NET Client.
37 //
38 //   The Initial Developers of the Original Code are LShift Ltd,
39 //   Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
40 //
41 //   Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
42 //   Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
43 //   are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
44 //   Technologies LLC, and Rabbit Technologies Ltd.
45 //
46 //   Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
47 //   Ltd. Portions created by Cohesive Financial Technologies LLC are
48 //   Copyright (C) 2007-2009 Cohesive Financial Technologies
49 //   LLC. Portions created by Rabbit Technologies Ltd are Copyright
50 //   (C) 2007-2009 Rabbit Technologies Ltd.
51 //
52 //   All Rights Reserved.
53 //
54 //   Contributor(s): ______________________________________.
55 //
56 //---------------------------------------------------------------------------
57 using System;
58 using System.IO;
59 using System.Collections;
60
61 using RabbitMQ.Client;
62 using RabbitMQ.Client.Exceptions;
63 using RabbitMQ.Client.Events;
64 using RabbitMQ.Util;
65
66 namespace RabbitMQ.Client.MessagePatterns {
67     ///<summary>Manages a subscription to a queue or exchange.</summary>
68     ///<remarks>
69     ///<para>
70     /// This convenience class abstracts away from much of the detail
71     /// involved in receiving messages from a queue or an exchange.
72     ///</para>
73     ///<para>
74     /// Once created, the Subscription consumes from a queue (using a
75     /// QueueingBasicConsumer). Received deliveries can be retrieved
76     /// by calling Next(), or by using the Subscription as an
77     /// IEnumerator in, for example, a foreach loop.
78     ///</para>
79     ///<para>
80     /// See the documentation for Bind() and for the various overloads
81     /// of the constructor for the various styles of binding and
82     /// subscription that are available.
83     ///</para>
84     ///<para>
85     /// Note that if the "noAck" option is enabled (which it is by
86     /// default), then received deliveries are automatically acked
87     /// within the server before they are even transmitted across the
88     /// network to us. Calling Ack() on received events will always do
89     /// the right thing: if "noAck" is enabled, nothing is done on an
90     /// Ack() call, and if "noAck" is disabled, IModel.BasicAck() is
91     /// called with the correct parameters.
92     ///</para>
93     ///</remarks>
94     public class Subscription: IEnumerable, IEnumerator, IDisposable {
95         protected IModel m_model;
96
97         ///<summary>Retrieve the IModel our subscription is carried by.</summary>
98         public IModel Model { get { return m_model; } }
99
100         protected string m_queueName;
101         protected QueueingBasicConsumer m_consumer;
102         protected string m_consumerTag;
103         protected bool m_noAck;
104         protected bool m_shouldDelete;
105
106         ///<summary>Retrieve the queue name we have subscribed to. May
107         ///be a server-generated name, depending on how the
108         ///Subscription was constructed.</summary>
109         public string QueueName { get { return m_queueName; } }
110         ///<summary>Retrieve the IBasicConsumer that is receiving the
111         ///messages from the server for us. Normally, you will not
112         ///need to access this property - use Next() and friends
113         ///instead.</summary>
114         public IBasicConsumer Consumer { get { return m_consumer; } }
115         ///<summary>Retrieve the consumer-tag that this subscription
116         ///is using. Will usually be a server-generated
117         ///name.</summary>
118         public string ConsumerTag { get { return m_consumerTag; } }
119         ///<summary>Returns true if we are in "noAck" mode, where
120         ///calls to Ack() will be no-ops, and where the server acks
121         ///messages before they are delivered to us. Returns false if
122         ///we are in a mode where calls to Ack() are required, and
123         ///where such calls will actually send an acknowledgement
124         ///message across the network to the server.</summary>
125         public bool NoAck { get { return m_noAck; } }
126
127         protected BasicDeliverEventArgs m_latestEvent;
128
129         ///<summary>Returns the most recent value returned by Next(),
130         ///or null when either no values have been retrieved yet, or
131         ///the most recent value has already been Ack()ed. See also
132         ///the documentation for Ack().</summary>
133         public BasicDeliverEventArgs LatestEvent { get { return m_latestEvent; } }
134
135         ///<summary>Creates a new Subscription in "noAck" mode,
136         ///consuming from a fresh, autodelete, anonymous queue. The
137         ///name of the queue can be retrieved using the QueueName
138         ///property of the Subscription. After creating the queue, the
139         ///queue is bound to the named exchange, using Bind() with the
140         ///given routingKey bind parameter.</summary>
141         public Subscription(IModel model, string exchangeName,
142                             string exchangeType, string routingKey)
143             : this(model)
144         {
145             Bind(exchangeName, exchangeType, routingKey);
146         }
147
148         ///<summary>Creates a new Subscription in "noAck" mode,
149         ///consuming from a fresh, autodelete, anonymous queue. The
150         ///name of the queue can be retrieved using the QueueName
151         ///property of the Subscription.</summary>
152         public Subscription(IModel model)
153             : this(model, null) {}
154
155         ///<summary>Creates a new Subscription in "noAck" mode,
156         ///consuming from a named queue. If the queueName parameter is
157         ///null or the empty-string, creates a fresh, autodelete,
158         ///anonymous queue; otherwise, the queue is declared using
159         ///IModel.QueueDeclare() before IModel.BasicConsume() is
160         ///called. After declaring the queue and starting the
161         ///consumer, the queue is bound to the named exchange, using
162         ///Bind() with the given routingKey bind parameter.</summary>
163         public Subscription(IModel model, string queueName, string exchangeName,
164                             string exchangeType, string routingKey)
165             : this(model, queueName)
166         {
167             Bind(exchangeName, exchangeType, routingKey);
168         }
169
170         ///<summary>Creates a new Subscription in "noAck" mode,
171         ///consuming from a named queue. If the queueName parameter is
172         ///null or the empty-string, creates a fresh, autodelete,
173         ///anonymous queue; otherwise, the queue is declared using
174         ///IModel.QueueDeclare() before IModel.BasicConsume() is
175         ///called.</summary>
176         public Subscription(IModel model, string queueName)
177             : this(model, queueName, true) {}
178
179         ///<summary>Creates a new Subscription, with full control over
180         ///both "noAck" mode and the name of the queue (which, if null
181         ///or the empty-string, will be a fresh autodelete queue, as
182         ///for the other constructor overloads). After declaring the
183         ///queue and starting the consumer, the queue is bound to the
184         ///named exchange, using Bind() with the given routingKey bind
185         ///parameter.</summary>
186         public Subscription(IModel model, string queueName, bool noAck,
187                             string exchangeName, string exchangeType, string routingKey)
188             : this(model, queueName, noAck)
189         {
190             Bind(exchangeName, exchangeType, routingKey);
191         }
192
193         ///<summary>Creates a new Subscription, with full control over
194         ///both "noAck" mode and the name of the queue (which, if null
195         ///or the empty-string, will be a fresh autodelete queue, as
196         ///for the other constructor overloads).</summary>
197         public Subscription(IModel model, string queueName, bool noAck)
198         {
199             m_model = model;
200             if (queueName == null || queueName.Equals("")) {
201                 m_queueName = m_model.QueueDeclare();
202                 m_shouldDelete = true;
203             } else {
204                 m_queueName = m_model.QueueDeclare(queueName);
205                 m_shouldDelete = false;
206             }
207             m_consumer = new QueueingBasicConsumer(m_model);
208             m_consumerTag = m_model.BasicConsume(m_queueName, m_noAck, null, m_consumer);
209             m_latestEvent = null;
210         }
211
212         ///<summary>Closes this Subscription, cancelling the consumer
213         ///record in the server. If an anonymous, autodelete queue
214         ///(i.e., one with a server-generated name) was created during
215         ///construction of the Subscription, this method also deletes
216         ///the created queue (which is an optimisation: autodelete
217         ///queues will be deleted when the IModel closes in any
218         ///case).</summary>
219         public void Close()
220         {
221             try {
222                 if (m_consumer != null) {
223                     m_model.BasicCancel(m_consumerTag);
224                 }
225                 if (m_shouldDelete) {
226                     m_shouldDelete = false;
227                     // We set m_shouldDelete false before attempting
228                     // the delete, because trying twice is worse than
229                     // trying once and failing.
230                     m_model.QueueDelete(m_queueName, false, false, false);
231                 }
232             } catch (OperationInterruptedException) {
233                 // We don't mind, here.
234             }
235             m_consumer = null;
236             m_consumerTag = null;
237         }
238
239         ///<summary>Causes the queue to which we have subscribed to be
240         ///bound to an exchange. Uses IModel.ExchangeDeclare and
241         ///IModel.QueueBind to (a) ensure the exchange exists, and (b)
242         ///link the exchange to our queue.</summary>
243         ///<remarks>
244         ///<para>
245         /// This method is called by some of the overloads of the
246         /// Subscription constructor.
247         ///</para>
248         ///<para>
249         /// Calling Bind() multiple times to bind to multiple
250         /// exchanges, or to bind to a single exchange more than once
251         /// with a different routingKey, is perfectly
252         /// acceptable. Calling Bind() twice with exactly the same
253         /// arguments is permitted and idempotent. For details, see
254         /// the AMQP specification.
255         ///</para>
256         ///</remarks>
257         public void Bind(string exchangeName, string exchangeType, string routingKey)
258         {
259             m_model.ExchangeDeclare(exchangeName, exchangeType);
260             m_model.QueueBind(m_queueName, exchangeName, routingKey, false, null);
261         }
262
263         ///<summary>If LatestEvent is non-null, passes it to
264         ///Ack(BasicDeliverEventArgs). Causes LatestEvent to become
265         ///null.</summary>
266         public void Ack()
267         {
268             if (m_latestEvent != null) {
269                 Ack(m_latestEvent);
270             }
271         }
272
273         ///<summary>If we are not in "noAck" mode, calls
274         ///IModel.BasicAck with the delivery-tag from the passed in
275         ///event; otherwise, sends nothing to the server. In both
276         ///cases, if the passed-in event is the same as LatestEvent
277         ///(by pointer comparison), sets LatestEvent to
278         ///null.</summary>
279         ///<remarks>
280         /// Make sure that this method is only called with events that
281         /// originated from this Subscription - other usage will have
282         /// unpredictable results.
283         ///</remarks>
284         public void Ack(BasicDeliverEventArgs evt)
285         {
286             if (evt == null) {
287                 return;
288             }
289
290             if (!m_noAck) {
291                 m_model.BasicAck(evt.DeliveryTag, false);
292             }
293
294             if (evt == m_latestEvent) {
295                 m_latestEvent = null;
296             }
297         }
298
299         ///<summary>Retrieves the next incoming delivery in our
300         ///subscription queue.</summary>
301         ///<remarks>
302         ///<para>
303         /// Returns null when the end of the stream is reached and on
304         /// every subsequent call. End-of-stream can arise through the
305         /// action of the Subscription.Close() method, or through the
306         /// closure of the IModel or its underlying IConnection.
307         ///</para>
308         ///<para>
309         /// Updates LatestEvent to the value returned.
310         ///</para>
311         ///<para>
312         /// Does not acknowledge any deliveries at all (but in "noAck"
313         /// mode, the server will have auto-acknowledged each event
314         /// before it is even sent across the wire to us).
315         ///</para>
316         ///</remarks>
317         public BasicDeliverEventArgs Next()
318         {
319             try {
320                 if (m_consumer == null) {
321                     // Closed!
322                     throw new InvalidOperationException();
323                 }
324                 m_latestEvent = (BasicDeliverEventArgs) m_consumer.Queue.Dequeue();
325             } catch (EndOfStreamException) {
326                 m_latestEvent = null;
327             }
328             return m_latestEvent;
329         }
330
331         ///<summary>Retrieves the next incoming delivery in our
332         ///subscription queue, or times out after a specified number
333         ///of milliseconds.</summary>
334         ///<remarks>
335         ///<para>
336         /// Returns false only if the timeout expires before either a
337         /// delivery appears or the end-of-stream is reached. If false
338         /// is returned, the out parameter "result" is set to null,
339         /// but LatestEvent is not updated.
340         ///</para>
341         ///<para>
342         /// Returns true to indicate a delivery or the end-of-stream.
343         ///</para>
344         ///<para>
345         /// If a delivery is already waiting in the queue, or one
346         /// arrives before the timeout expires, it is removed from the
347         /// queue and placed in the "result" out parameter. If the
348         /// end-of-stream is detected before the timeout expires,
349         /// "result" is set to null.
350         ///</para>
351         ///<para>
352         /// Whenever this method returns true, it updates LatestEvent
353         /// to the value placed in "result" before returning.
354         ///</para>
355         ///<para>
356         /// End-of-stream can arise through the action of the
357         /// Subscription.Close() method, or through the closure of the
358         /// IModel or its underlying IConnection.
359         ///</para>
360         ///<para>
361         /// This method does not acknowledge any deliveries at all
362         /// (but in "noAck" mode, the server will have
363         /// auto-acknowledged each event before it is even sent across
364         /// the wire to us).
365         ///</para>
366         ///<para>
367         /// A timeout of -1 (i.e. System.Threading.Timeout.Infinite)
368         /// will be interpreted as a command to wait for an
369         /// indefinitely long period of time for an item or the end of
370         /// the stream to become available. Usage of such a timeout is
371         /// equivalent to calling Next() with no arguments (modulo
372         /// predictable method signature differences).
373         ///</para>
374         ///</remarks>
375         public bool Next(int millisecondsTimeout, out BasicDeliverEventArgs result)
376         {
377             try {
378                 if (m_consumer == null) {
379                     // Closed!
380                     throw new InvalidOperationException();
381                 }
382                 object qValue;
383                 if (!m_consumer.Queue.Dequeue(millisecondsTimeout, out qValue)) {
384                     result = null;
385                     return false;
386                 }
387                 m_latestEvent = (BasicDeliverEventArgs) qValue;
388             } catch (EndOfStreamException) {
389                 m_latestEvent = null;
390             }
391             result = m_latestEvent;
392             return true;
393         }
394
395         ///<summary>Implementation of the IEnumerable interface, for
396         ///permitting Subscription to be used in foreach
397         ///loops.</summary>
398         IEnumerator IEnumerable.GetEnumerator()
399         {
400             return this;
401         }
402
403         ///<summary>Implementation of the IEnumerator interface, for
404         ///permitting Subscription to be used in foreach
405         ///loops.</summary>
406         ///<remarks>
407         ///<para>
408         /// As per the IEnumerator interface definition, throws
409         /// InvalidOperationException if LatestEvent is null.
410         ///</para>
411         ///<para>
412         /// Does not acknowledge any deliveries at all. Ack() must be
413         /// called explicitly on received deliveries.
414         ///</para>
415         ///</remarks>
416         object IEnumerator.Current {
417             get {
418                 if (m_latestEvent == null) {
419                     throw new InvalidOperationException();
420                 }
421                 return m_latestEvent;
422             }
423         }
424
425         ///<summary>Implementation of the IEnumerator interface, for
426         ///permitting Subscription to be used in foreach
427         ///loops.</summary>
428         ///<remarks>
429         ///<para>
430         /// Does not acknowledge any deliveries at all. Ack() must be
431         /// called explicitly on received deliveries.
432         ///</para>
433         ///</remarks>
434         bool IEnumerator.MoveNext()
435         {
436             return Next() != null;
437         }
438
439         ///<summary>Dummy implementation of the IEnumerator interface,
440         ///for permitting Subscription to be used in foreach loops;
441         ///Reset()ting a Subscription doesn't make sense, so this
442         ///method always throws InvalidOperationException.</summary>
443         void IEnumerator.Reset()
444         {
445             // It really doesn't make sense to try to reset a subscription.
446             throw new InvalidOperationException("Subscription.Reset() does not make sense");
447         }
448
449         ///<summary>Implementation of the IDisposable interface,
450         ///permitting Subscription to be used in using
451         ///statements. Simply calls Close().</summary>
452         void IDisposable.Dispose()
453         {
454             Close();
455         }
456     }
457 }