1 // This source code is dual-licensed under the Apache License, version
2 // 2.0, and the Mozilla Public License, version 1.1.
6 //---------------------------------------------------------------------------
7 // Copyright (C) 2007-2009 LShift Ltd., Cohesive Financial
8 // Technologies LLC., and Rabbit Technologies Ltd.
10 // Licensed under the Apache License, Version 2.0 (the "License");
11 // you may not use this file except in compliance with the License.
12 // You may obtain a copy of the License at
14 // http://www.apache.org/licenses/LICENSE-2.0
16 // Unless required by applicable law or agreed to in writing, software
17 // distributed under the License is distributed on an "AS IS" BASIS,
18 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19 // See the License for the specific language governing permissions and
20 // limitations under the License.
21 //---------------------------------------------------------------------------
25 //---------------------------------------------------------------------------
26 // The contents of this file are subject to the Mozilla Public License
27 // Version 1.1 (the "License"); you may not use this file except in
28 // compliance with the License. You may obtain a copy of the License at
29 // http://www.rabbitmq.com/mpl.html
31 // Software distributed under the License is distributed on an "AS IS"
32 // basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
33 // License for the specific language governing rights and limitations
36 // The Original Code is The RabbitMQ .NET Client.
38 // The Initial Developers of the Original Code are LShift Ltd,
39 // Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
41 // Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
42 // Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
43 // are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
44 // Technologies LLC, and Rabbit Technologies Ltd.
46 // Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
47 // Ltd. Portions created by Cohesive Financial Technologies LLC are
48 // Copyright (C) 2007-2009 Cohesive Financial Technologies
49 // LLC. Portions created by Rabbit Technologies Ltd are Copyright
50 // (C) 2007-2009 Rabbit Technologies Ltd.
52 // All Rights Reserved.
54 // Contributor(s): ______________________________________.
56 //---------------------------------------------------------------------------
59 using System.Collections;
61 using RabbitMQ.Client;
62 using RabbitMQ.Client.Exceptions;
63 using RabbitMQ.Client.Events;
66 namespace RabbitMQ.Client.MessagePatterns {
67 ///<summary>Manages a subscription to a queue or exchange.</summary>
70 /// This convenience class abstracts away from much of the detail
71 /// involved in receiving messages from a queue or an exchange.
74 /// Once created, the Subscription consumes from a queue (using a
75 /// QueueingBasicConsumer). Received deliveries can be retrieved
76 /// by calling Next(), or by using the Subscription as an
77 /// IEnumerator in, for example, a foreach loop.
80 /// See the documentation for Bind() and for the various overloads
81 /// of the constructor for the various styles of binding and
82 /// subscription that are available.
85 /// Note that if the "noAck" option is enabled (which it is by
86 /// default), then received deliveries are automatically acked
87 /// within the server before they are even transmitted across the
88 /// network to us. Calling Ack() on received events will always do
89 /// the right thing: if "noAck" is enabled, nothing is done on an
90 /// Ack() call, and if "noAck" is disabled, IModel.BasicAck() is
91 /// called with the correct parameters.
94 public class Subscription: IEnumerable, IEnumerator, IDisposable {
95 protected IModel m_model;
97 ///<summary>Retrieve the IModel our subscription is carried by.</summary>
98 public IModel Model { get { return m_model; } }
100 protected string m_queueName;
101 protected QueueingBasicConsumer m_consumer;
102 protected string m_consumerTag;
103 protected bool m_noAck;
104 protected bool m_shouldDelete;
106 ///<summary>Retrieve the queue name we have subscribed to. May
107 ///be a server-generated name, depending on how the
108 ///Subscription was constructed.</summary>
109 public string QueueName { get { return m_queueName; } }
110 ///<summary>Retrieve the IBasicConsumer that is receiving the
111 ///messages from the server for us. Normally, you will not
112 ///need to access this property - use Next() and friends
113 ///instead.</summary>
114 public IBasicConsumer Consumer { get { return m_consumer; } }
115 ///<summary>Retrieve the consumer-tag that this subscription
116 ///is using. Will usually be a server-generated
118 public string ConsumerTag { get { return m_consumerTag; } }
119 ///<summary>Returns true if we are in "noAck" mode, where
120 ///calls to Ack() will be no-ops, and where the server acks
121 ///messages before they are delivered to us. Returns false if
122 ///we are in a mode where calls to Ack() are required, and
123 ///where such calls will actually send an acknowledgement
124 ///message across the network to the server.</summary>
125 public bool NoAck { get { return m_noAck; } }
127 protected BasicDeliverEventArgs m_latestEvent;
129 ///<summary>Returns the most recent value returned by Next(),
130 ///or null when either no values have been retrieved yet, or
131 ///the most recent value has already been Ack()ed. See also
132 ///the documentation for Ack().</summary>
133 public BasicDeliverEventArgs LatestEvent { get { return m_latestEvent; } }
135 ///<summary>Creates a new Subscription in "noAck" mode,
136 ///consuming from a fresh, autodelete, anonymous queue. The
137 ///name of the queue can be retrieved using the QueueName
138 ///property of the Subscription. After creating the queue, the
139 ///queue is bound to the named exchange, using Bind() with the
140 ///given routingKey bind parameter.</summary>
141 public Subscription(IModel model, string exchangeName,
142 string exchangeType, string routingKey)
145 Bind(exchangeName, exchangeType, routingKey);
148 ///<summary>Creates a new Subscription in "noAck" mode,
149 ///consuming from a fresh, autodelete, anonymous queue. The
150 ///name of the queue can be retrieved using the QueueName
151 ///property of the Subscription.</summary>
152 public Subscription(IModel model)
153 : this(model, null) {}
155 ///<summary>Creates a new Subscription in "noAck" mode,
156 ///consuming from a named queue. If the queueName parameter is
157 ///null or the empty-string, creates a fresh, autodelete,
158 ///anonymous queue; otherwise, the queue is declared using
159 ///IModel.QueueDeclare() before IModel.BasicConsume() is
160 ///called. After declaring the queue and starting the
161 ///consumer, the queue is bound to the named exchange, using
162 ///Bind() with the given routingKey bind parameter.</summary>
163 public Subscription(IModel model, string queueName, string exchangeName,
164 string exchangeType, string routingKey)
165 : this(model, queueName)
167 Bind(exchangeName, exchangeType, routingKey);
170 ///<summary>Creates a new Subscription in "noAck" mode,
171 ///consuming from a named queue. If the queueName parameter is
172 ///null or the empty-string, creates a fresh, autodelete,
173 ///anonymous queue; otherwise, the queue is declared using
174 ///IModel.QueueDeclare() before IModel.BasicConsume() is
176 public Subscription(IModel model, string queueName)
177 : this(model, queueName, true) {}
179 ///<summary>Creates a new Subscription, with full control over
180 ///both "noAck" mode and the name of the queue (which, if null
181 ///or the empty-string, will be a fresh autodelete queue, as
182 ///for the other constructor overloads). After declaring the
183 ///queue and starting the consumer, the queue is bound to the
184 ///named exchange, using Bind() with the given routingKey bind
185 ///parameter.</summary>
186 public Subscription(IModel model, string queueName, bool noAck,
187 string exchangeName, string exchangeType, string routingKey)
188 : this(model, queueName, noAck)
190 Bind(exchangeName, exchangeType, routingKey);
193 ///<summary>Creates a new Subscription, with full control over
194 ///both "noAck" mode and the name of the queue (which, if null
195 ///or the empty-string, will be a fresh autodelete queue, as
196 ///for the other constructor overloads).</summary>
197 public Subscription(IModel model, string queueName, bool noAck)
200 if (queueName == null || queueName.Equals("")) {
201 m_queueName = m_model.QueueDeclare();
202 m_shouldDelete = true;
204 m_queueName = m_model.QueueDeclare(queueName);
205 m_shouldDelete = false;
207 m_consumer = new QueueingBasicConsumer(m_model);
208 m_consumerTag = m_model.BasicConsume(m_queueName, m_noAck, null, m_consumer);
209 m_latestEvent = null;
212 ///<summary>Closes this Subscription, cancelling the consumer
213 ///record in the server. If an anonymous, autodelete queue
214 ///(i.e., one with a server-generated name) was created during
215 ///construction of the Subscription, this method also deletes
216 ///the created queue (which is an optimisation: autodelete
217 ///queues will be deleted when the IModel closes in any
222 if (m_consumer != null) {
223 m_model.BasicCancel(m_consumerTag);
225 if (m_shouldDelete) {
226 m_shouldDelete = false;
227 // We set m_shouldDelete false before attempting
228 // the delete, because trying twice is worse than
229 // trying once and failing.
230 m_model.QueueDelete(m_queueName, false, false, false);
232 } catch (OperationInterruptedException) {
233 // We don't mind, here.
236 m_consumerTag = null;
239 ///<summary>Causes the queue to which we have subscribed to be
240 ///bound to an exchange. Uses IModel.ExchangeDeclare and
241 ///IModel.QueueBind to (a) ensure the exchange exists, and (b)
242 ///link the exchange to our queue.</summary>
245 /// This method is called by some of the overloads of the
246 /// Subscription constructor.
249 /// Calling Bind() multiple times to bind to multiple
250 /// exchanges, or to bind to a single exchange more than once
251 /// with a different routingKey, is perfectly
252 /// acceptable. Calling Bind() twice with exactly the same
253 /// arguments is permitted and idempotent. For details, see
254 /// the AMQP specification.
257 public void Bind(string exchangeName, string exchangeType, string routingKey)
259 m_model.ExchangeDeclare(exchangeName, exchangeType);
260 m_model.QueueBind(m_queueName, exchangeName, routingKey, false, null);
263 ///<summary>If LatestEvent is non-null, passes it to
264 ///Ack(BasicDeliverEventArgs). Causes LatestEvent to become
268 if (m_latestEvent != null) {
273 ///<summary>If we are not in "noAck" mode, calls
274 ///IModel.BasicAck with the delivery-tag from the passed in
275 ///event; otherwise, sends nothing to the server. In both
276 ///cases, if the passed-in event is the same as LatestEvent
277 ///(by pointer comparison), sets LatestEvent to
280 /// Make sure that this method is only called with events that
281 /// originated from this Subscription - other usage will have
282 /// unpredictable results.
284 public void Ack(BasicDeliverEventArgs evt)
291 m_model.BasicAck(evt.DeliveryTag, false);
294 if (evt == m_latestEvent) {
295 m_latestEvent = null;
299 ///<summary>Retrieves the next incoming delivery in our
300 ///subscription queue.</summary>
303 /// Returns null when the end of the stream is reached and on
304 /// every subsequent call. End-of-stream can arise through the
305 /// action of the Subscription.Close() method, or through the
306 /// closure of the IModel or its underlying IConnection.
309 /// Updates LatestEvent to the value returned.
312 /// Does not acknowledge any deliveries at all (but in "noAck"
313 /// mode, the server will have auto-acknowledged each event
314 /// before it is even sent across the wire to us).
317 public BasicDeliverEventArgs Next()
320 if (m_consumer == null) {
322 throw new InvalidOperationException();
324 m_latestEvent = (BasicDeliverEventArgs) m_consumer.Queue.Dequeue();
325 } catch (EndOfStreamException) {
326 m_latestEvent = null;
328 return m_latestEvent;
331 ///<summary>Retrieves the next incoming delivery in our
332 ///subscription queue, or times out after a specified number
333 ///of milliseconds.</summary>
336 /// Returns false only if the timeout expires before either a
337 /// delivery appears or the end-of-stream is reached. If false
338 /// is returned, the out parameter "result" is set to null,
339 /// but LatestEvent is not updated.
342 /// Returns true to indicate a delivery or the end-of-stream.
345 /// If a delivery is already waiting in the queue, or one
346 /// arrives before the timeout expires, it is removed from the
347 /// queue and placed in the "result" out parameter. If the
348 /// end-of-stream is detected before the timeout expires,
349 /// "result" is set to null.
352 /// Whenever this method returns true, it updates LatestEvent
353 /// to the value placed in "result" before returning.
356 /// End-of-stream can arise through the action of the
357 /// Subscription.Close() method, or through the closure of the
358 /// IModel or its underlying IConnection.
361 /// This method does not acknowledge any deliveries at all
362 /// (but in "noAck" mode, the server will have
363 /// auto-acknowledged each event before it is even sent across
367 /// A timeout of -1 (i.e. System.Threading.Timeout.Infinite)
368 /// will be interpreted as a command to wait for an
369 /// indefinitely long period of time for an item or the end of
370 /// the stream to become available. Usage of such a timeout is
371 /// equivalent to calling Next() with no arguments (modulo
372 /// predictable method signature differences).
375 public bool Next(int millisecondsTimeout, out BasicDeliverEventArgs result)
378 if (m_consumer == null) {
380 throw new InvalidOperationException();
383 if (!m_consumer.Queue.Dequeue(millisecondsTimeout, out qValue)) {
387 m_latestEvent = (BasicDeliverEventArgs) qValue;
388 } catch (EndOfStreamException) {
389 m_latestEvent = null;
391 result = m_latestEvent;
395 ///<summary>Implementation of the IEnumerable interface, for
396 ///permitting Subscription to be used in foreach
398 IEnumerator IEnumerable.GetEnumerator()
403 ///<summary>Implementation of the IEnumerator interface, for
404 ///permitting Subscription to be used in foreach
408 /// As per the IEnumerator interface definition, throws
409 /// InvalidOperationException if LatestEvent is null.
412 /// Does not acknowledge any deliveries at all. Ack() must be
413 /// called explicitly on received deliveries.
416 object IEnumerator.Current {
418 if (m_latestEvent == null) {
419 throw new InvalidOperationException();
421 return m_latestEvent;
425 ///<summary>Implementation of the IEnumerator interface, for
426 ///permitting Subscription to be used in foreach
430 /// Does not acknowledge any deliveries at all. Ack() must be
431 /// called explicitly on received deliveries.
434 bool IEnumerator.MoveNext()
436 return Next() != null;
439 ///<summary>Dummy implementation of the IEnumerator interface,
440 ///for permitting Subscription to be used in foreach loops;
441 ///Reset()ting a Subscription doesn't make sense, so this
442 ///method always throws InvalidOperationException.</summary>
443 void IEnumerator.Reset()
445 // It really doesn't make sense to try to reset a subscription.
446 throw new InvalidOperationException("Subscription.Reset() does not make sense");
449 ///<summary>Implementation of the IDisposable interface,
450 ///permitting Subscription to be used in using
451 ///statements. Simply calls Close().</summary>
452 void IDisposable.Dispose()