202c14a9ec3d592a909f9f89e4c9fb69c71013c6
[mono.git] / mcs / class / RabbitMQ.Client / src / client / messagepatterns / Subscription.cs
1 // This source code is dual-licensed under the Apache License, version
2 // 2.0, and the Mozilla Public License, version 1.1.
3 //
4 // The APL v2.0:
5 //
6 //---------------------------------------------------------------------------
7 //   Copyright (C) 2007, 2008 LShift Ltd., Cohesive Financial
8 //   Technologies LLC., and Rabbit Technologies Ltd.
9 //
10 //   Licensed under the Apache License, Version 2.0 (the "License");
11 //   you may not use this file except in compliance with the License.
12 //   You may obtain a copy of the License at
13 //
14 //       http://www.apache.org/licenses/LICENSE-2.0
15 //
16 //   Unless required by applicable law or agreed to in writing, software
17 //   distributed under the License is distributed on an "AS IS" BASIS,
18 //   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19 //   See the License for the specific language governing permissions and
20 //   limitations under the License.
21 //---------------------------------------------------------------------------
22 //
23 // The MPL v1.1:
24 //
25 //---------------------------------------------------------------------------
26 //   The contents of this file are subject to the Mozilla Public License
27 //   Version 1.1 (the "License"); you may not use this file except in
28 //   compliance with the License. You may obtain a copy of the License at
29 //   http://www.rabbitmq.com/mpl.html
30 //
31 //   Software distributed under the License is distributed on an "AS IS"
32 //   basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
33 //   License for the specific language governing rights and limitations
34 //   under the License.
35 //
36 //   The Original Code is The RabbitMQ .NET Client.
37 //
38 //   The Initial Developers of the Original Code are LShift Ltd.,
39 //   Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.
40 //
41 //   Portions created by LShift Ltd., Cohesive Financial Technologies
42 //   LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007, 2008
43 //   LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit
44 //   Technologies Ltd.;
45 //
46 //   All Rights Reserved.
47 //
48 //   Contributor(s): ______________________________________.
49 //
50 //---------------------------------------------------------------------------
51 using System;
52 using System.IO;
53 using System.Collections;
54
55 using RabbitMQ.Client;
56 using RabbitMQ.Client.Exceptions;
57 using RabbitMQ.Client.Events;
58 using RabbitMQ.Util;
59
60 namespace RabbitMQ.Client.MessagePatterns {
61     ///<summary>Manages a subscription to a queue or exchange.</summary>
62     ///<remarks>
63     ///<para>
64     /// This convenience class abstracts away from much of the detail
65     /// involved in receiving messages from a queue or an exchange.
66     ///</para>
67     ///<para>
68     /// Once created, the Subscription consumes from a queue (using a
69     /// QueueingBasicConsumer). Received deliveries can be retrieved
70     /// by calling Next(), or by using the Subscription as an
71     /// IEnumerator in, for example, a foreach loop.
72     ///</para>
73     ///<para>
74     /// See the documentation for Bind() and for the various overloads
75     /// of the constructor for the various styles of binding and
76     /// subscription that are available.
77     ///</para>
78     ///<para>
79     /// Note that if the "noAck" option is enabled (which it is by
80     /// default), then received deliveries are automatically acked
81     /// within the server before they are even transmitted across the
82     /// network to us. Calling Ack() on received events will always do
83     /// the right thing: if "noAck" is enabled, nothing is done on an
84     /// Ack() call, and if "noAck" is disabled, IModel.BasicAck() is
85     /// called with the correct parameters.
86     ///</para>
87     ///</remarks>
88     public class Subscription: IEnumerable, IEnumerator, IDisposable {
89         protected IModel m_model;
90         protected ushort m_ticket;
91
92         ///<summary>Retrieve the IModel our subscription is carried by.</summary>
93         public IModel Model { get { return m_model; } }
94         ///<summary>Retrieve the ticket we use for all our server operations.</summary>
95         public ushort Ticket { get { return m_ticket; } }
96
97         protected string m_queueName;
98         protected QueueingBasicConsumer m_consumer;
99         protected string m_consumerTag;
100         protected bool m_noAck;
101         protected bool m_shouldDelete;
102
103         ///<summary>Retrieve the queue name we have subscribed to. May
104         ///be a server-generated name, depending on how the
105         ///Subscription was constructed.</summary>
106         public string QueueName { get { return m_queueName; } }
107         ///<summary>Retrieve the IBasicConsumer that is receiving the
108         ///messages from the server for us. Normally, you will not
109         ///need to access this property - use Next() and friends
110         ///instead.</summary>
111         public IBasicConsumer Consumer { get { return m_consumer; } }
112         ///<summary>Retrieve the consumer-tag that this subscription
113         ///is using. Will usually be a server-generated
114         ///name.</summary>
115         public string ConsumerTag { get { return m_consumerTag; } }
116         ///<summary>Returns true if we are in "noAck" mode, where
117         ///calls to Ack() will be no-ops, and where the server acks
118         ///messages before they are delivered to us. Returns false if
119         ///we are in a mode where calls to Ack() are required, and
120         ///where such calls will actually send an acknowledgement
121         ///message across the network to the server.</summary>
122         public bool NoAck { get { return m_noAck; } }
123
124         protected BasicDeliverEventArgs m_latestEvent;
125
126         ///<summary>Returns the most recent value returned by Next(),
127         ///or null when either no values have been retrieved yet, or
128         ///the most recent value has already been Ack()ed. See also
129         ///the documentation for Ack().</summary>
130         public BasicDeliverEventArgs LatestEvent { get { return m_latestEvent; } }
131
132         ///<summary>Creates a new Subscription in "noAck" mode,
133         ///consuming from a fresh, autodelete, anonymous queue. The
134         ///name of the queue can be retrieved using the QueueName
135         ///property of the Subscription. After creating the queue, the
136         ///queue is bound to the named exchange, using Bind() with the
137         ///given routingKey bind parameter.</summary>
138         public Subscription(IModel model, ushort ticket,
139                             string exchangeName, string exchangeType, string routingKey)
140             : this(model, ticket)
141         {
142             Bind(exchangeName, exchangeType, routingKey);
143         }
144
145         ///<summary>Creates a new Subscription in "noAck" mode,
146         ///consuming from a fresh, autodelete, anonymous queue. The
147         ///name of the queue can be retrieved using the QueueName
148         ///property of the Subscription.</summary>
149         public Subscription(IModel model, ushort ticket)
150             : this(model, ticket, null) {}
151
152         ///<summary>Creates a new Subscription in "noAck" mode,
153         ///consuming from a named queue. If the queueName parameter is
154         ///null or the empty-string, creates a fresh, autodelete,
155         ///anonymous queue; otherwise, the queue is declared using
156         ///IModel.QueueDeclare() before IModel.BasicConsume() is
157         ///called. After declaring the queue and starting the
158         ///consumer, the queue is bound to the named exchange, using
159         ///Bind() with the given routingKey bind parameter.</summary>
160         public Subscription(IModel model, ushort ticket, string queueName,
161                             string exchangeName, string exchangeType, string routingKey)
162             : this(model, ticket, queueName)
163         {
164             Bind(exchangeName, exchangeType, routingKey);
165         }
166
167         ///<summary>Creates a new Subscription in "noAck" mode,
168         ///consuming from a named queue. If the queueName parameter is
169         ///null or the empty-string, creates a fresh, autodelete,
170         ///anonymous queue; otherwise, the queue is declared using
171         ///IModel.QueueDeclare() before IModel.BasicConsume() is
172         ///called.</summary>
173         public Subscription(IModel model, ushort ticket, string queueName)
174             : this(model, ticket, queueName, true) {}
175
176         ///<summary>Creates a new Subscription, with full control over
177         ///both "noAck" mode and the name of the queue (which, if null
178         ///or the empty-string, will be a fresh autodelete queue, as
179         ///for the other constructor overloads). After declaring the
180         ///queue and starting the consumer, the queue is bound to the
181         ///named exchange, using Bind() with the given routingKey bind
182         ///parameter.</summary>
183         public Subscription(IModel model, ushort ticket, string queueName, bool noAck,
184                             string exchangeName, string exchangeType, string routingKey)
185             : this(model, ticket, queueName, noAck)
186         {
187             Bind(exchangeName, exchangeType, routingKey);
188         }
189
190         ///<summary>Creates a new Subscription, with full control over
191         ///both "noAck" mode and the name of the queue (which, if null
192         ///or the empty-string, will be a fresh autodelete queue, as
193         ///for the other constructor overloads).</summary>
194         public Subscription(IModel model, ushort ticket, string queueName, bool noAck)
195         {
196             m_model = model;
197             m_ticket = ticket;
198             if (queueName == null || queueName.Equals("")) {
199                 m_queueName = m_model.QueueDeclare(ticket);
200                 m_shouldDelete = true;
201             } else {
202                 m_queueName = m_model.QueueDeclare(ticket, queueName);
203                 m_shouldDelete = false;
204             }
205             m_consumer = new QueueingBasicConsumer(m_model);
206             m_consumerTag = m_model.BasicConsume(m_ticket, m_queueName, m_noAck, null, m_consumer);
207             m_latestEvent = null;
208         }
209
210         ///<summary>Closes this Subscription, cancelling the consumer
211         ///record in the server. If an anonymous, autodelete queue
212         ///(i.e., one with a server-generated name) was created during
213         ///construction of the Subscription, this method also deletes
214         ///the created queue (which is an optimisation: autodelete
215         ///queues will be deleted when the IModel closes in any
216         ///case).</summary>
217         public void Close()
218         {
219             try {
220                 if (m_consumer != null) {
221                     m_model.BasicCancel(m_consumerTag);
222                 }
223                 if (m_shouldDelete) {
224                     m_shouldDelete = false;
225                     // We set m_shouldDelete false before attempting
226                     // the delete, because trying twice is worse than
227                     // trying once and failing.
228                     m_model.QueueDelete(m_ticket, m_queueName, false, false, false);
229                 }
230             } catch (OperationInterruptedException) {
231                 // We don't mind, here.
232             }
233             m_consumer = null;
234             m_consumerTag = null;
235         }
236
237         ///<summary>Causes the queue to which we have subscribed to be
238         ///bound to an exchange. Uses IModel.ExchangeDeclare and
239         ///IModel.QueueBind to (a) ensure the exchange exists, and (b)
240         ///link the exchange to our queue.</summary>
241         ///<remarks>
242         ///<para>
243         /// This method is called by some of the overloads of the
244         /// Subscription constructor.
245         ///</para>
246         ///<para>
247         /// Calling Bind() multiple times to bind to multiple
248         /// exchanges, or to bind to a single exchange more than once
249         /// with a different routingKey, is perfectly
250         /// acceptable. Calling Bind() twice with exactly the same
251         /// arguments is permitted and idempotent. For details, see
252         /// the AMQP specification.
253         ///</para>
254         ///</remarks>
255         public void Bind(string exchangeName, string exchangeType, string routingKey)
256         {
257             m_model.ExchangeDeclare(m_ticket, exchangeName, exchangeType);
258             m_model.QueueBind(m_ticket, m_queueName, exchangeName, routingKey, false, null);
259         }
260
261         ///<summary>If LatestEvent is non-null, passes it to
262         ///Ack(BasicDeliverEventArgs). Causes LatestEvent to become
263         ///null.</summary>
264         public void Ack()
265         {
266             if (m_latestEvent != null) {
267                 Ack(m_latestEvent);
268             }
269         }
270
271         ///<summary>If we are not in "noAck" mode, calls
272         ///IModel.BasicAck with the delivery-tag from the passed in
273         ///event; otherwise, sends nothing to the server. In both
274         ///cases, if the passed-in event is the same as LatestEvent
275         ///(by pointer comparison), sets LatestEvent to
276         ///null.</summary>
277         ///<remarks>
278         /// Make sure that this method is only called with events that
279         /// originated from this Subscription - other usage will have
280         /// unpredictable results.
281         ///</remarks>
282         public void Ack(BasicDeliverEventArgs evt)
283         {
284             if (evt == null) {
285                 return;
286             }
287
288             if (!m_noAck) {
289                 m_model.BasicAck(evt.DeliveryTag, false);
290             }
291
292             if (evt == m_latestEvent) {
293                 m_latestEvent = null;
294             }
295         }
296
297         ///<summary>Retrieves the next incoming delivery in our
298         ///subscription queue.</summary>
299         ///<remarks>
300         ///<para>
301         /// Returns null when the end of the stream is reached and on
302         /// every subsequent call. End-of-stream can arise through the
303         /// action of the Subscription.Close() method, or through the
304         /// closure of the IModel or its underlying IConnection.
305         ///</para>
306         ///<para>
307         /// Updates LatestEvent to the value returned.
308         ///</para>
309         ///<para>
310         /// Does not acknowledge any deliveries at all (but in "noAck"
311         /// mode, the server will have auto-acknowledged each event
312         /// before it is even sent across the wire to us).
313         ///</para>
314         ///</remarks>
315         public BasicDeliverEventArgs Next()
316         {
317             try {
318                 if (m_consumer == null) {
319                     // Closed!
320                     throw new InvalidOperationException();
321                 }
322                 m_latestEvent = (BasicDeliverEventArgs) m_consumer.Queue.Dequeue();
323             } catch (EndOfStreamException) {
324                 m_latestEvent = null;
325             }
326             return m_latestEvent;
327         }
328
329         ///<summary>Retrieves the next incoming delivery in our
330         ///subscription queue, or times out after a specified number
331         ///of milliseconds.</summary>
332
333         ///<remarks>
334         ///<para>
335         /// Returns false only if the timeout expires before either a
336         /// delivery appears or the end-of-stream is reached. If false
337         /// is returned, the out parameter "result" is set to null,
338         /// but LatestEvent is not updated.
339         ///</para>
340         ///<para>
341         /// Returns true to indicate a delivery or the end-of-stream.
342         ///</para>
343         ///<para>
344         /// If a delivery is already waiting in the queue, or one
345         /// arrives before the timeout expires, it is removed from the
346         /// queue and placed in the "result" out parameter. If the
347         /// end-of-stream is detected before the timeout expires,
348         /// "result" is set to null.
349         ///</para>
350         ///<para>
351         /// Whenever this method returns true, it updates LatestEvent
352         /// to the value placed in "result" before returning.
353         ///</para>
354         ///<para>
355         /// End-of-stream can arise through the action of the
356         /// Subscription.Close() method, or through the closure of the
357         /// IModel or its underlying IConnection.
358         ///</para>
359         ///<para>
360         /// This method does not acknowledge any deliveries at all
361         /// (but in "noAck" mode, the server will have
362         /// auto-acknowledged each event before it is even sent across
363         /// the wire to us).
364         ///</para>
365         ///<para>
366         /// A timeout of -1 (i.e. System.Threading.Timeout.Infinite)
367         /// will be interpreted as a command to wait for an
368         /// indefinitely long period of time for an item or the end of
369         /// the stream to become available. Usage of such a timeout is
370         /// equivalent to calling Next() with no arguments (modulo
371         /// predictable method signature differences).
372         ///</para>
373         ///</remarks>
374         public bool Next(int millisecondsTimeout, out BasicDeliverEventArgs result)
375         {
376             try {
377                 if (m_consumer == null) {
378                     // Closed!
379                     throw new InvalidOperationException();
380                 }
381                 object qValue;
382                 if (!m_consumer.Queue.Dequeue(millisecondsTimeout, out qValue)) {
383                     result = null;
384                     return false;
385                 }
386                 m_latestEvent = (BasicDeliverEventArgs) qValue;
387             } catch (EndOfStreamException) {
388                 m_latestEvent = null;
389             }
390             result = m_latestEvent;
391             return true;
392         }
393
394         ///<summary>Implementation of the IEnumerable interface, for
395         ///permitting Subscription to be used in foreach
396         ///loops.</summary>
397         IEnumerator IEnumerable.GetEnumerator()
398         {
399             return this;
400         }
401
402         ///<summary>Implementation of the IEnumerator interface, for
403         ///permitting Subscription to be used in foreach
404         ///loops.</summary>
405         ///<remarks>
406         ///<para>
407         /// As per the IEnumerator interface definition, throws
408         /// InvalidOperationException if LatestEvent is null.
409         ///</para>
410         ///<para>
411         /// Does not acknowledge any deliveries at all. Ack() must be
412         /// called explicitly on received deliveries.
413         ///</para>
414         ///</remarks>
415         object IEnumerator.Current {
416             get {
417                 if (m_latestEvent == null) {
418                     throw new InvalidOperationException();
419                 }
420                 return m_latestEvent;
421             }
422         }
423
424         ///<summary>Implementation of the IEnumerator interface, for
425         ///permitting Subscription to be used in foreach
426         ///loops.</summary>
427         ///<remarks>
428         ///<para>
429         /// Does not acknowledge any deliveries at all. Ack() must be
430         /// called explicitly on received deliveries.
431         ///</para>
432         ///</remarks>
433         bool IEnumerator.MoveNext()
434         {
435             return Next() != null;
436         }
437
438         ///<summary>Dummy implementation of the IEnumerator interface,
439         ///for permitting Subscription to be used in foreach loops;
440         ///Reset()ting a Subscription doesn't make sense, so this
441         ///method always throws InvalidOperationException.</summary>
442         void IEnumerator.Reset()
443         {
444             // It really doesn't make sense to try to reset a subscription.
445             throw new InvalidOperationException("Subscription.Reset() does not make sense");
446         }
447
448         ///<summary>Implementation of the IDisposable interface,
449         ///permitting Subscription to be used in using
450         ///statements. Simply calls Close().</summary>
451         void IDisposable.Dispose()
452         {
453             Close();
454         }
455     }
456 }