1 // This source code is dual-licensed under the Apache License, version
2 // 2.0, and the Mozilla Public License, version 1.1.
6 //---------------------------------------------------------------------------
7 // Copyright (C) 2007, 2008 LShift Ltd., Cohesive Financial
8 // Technologies LLC., and Rabbit Technologies Ltd.
10 // Licensed under the Apache License, Version 2.0 (the "License");
11 // you may not use this file except in compliance with the License.
12 // You may obtain a copy of the License at
14 // http://www.apache.org/licenses/LICENSE-2.0
16 // Unless required by applicable law or agreed to in writing, software
17 // distributed under the License is distributed on an "AS IS" BASIS,
18 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19 // See the License for the specific language governing permissions and
20 // limitations under the License.
21 //---------------------------------------------------------------------------
25 //---------------------------------------------------------------------------
26 // The contents of this file are subject to the Mozilla Public License
27 // Version 1.1 (the "License"); you may not use this file except in
28 // compliance with the License. You may obtain a copy of the License at
29 // http://www.rabbitmq.com/mpl.html
31 // Software distributed under the License is distributed on an "AS IS"
32 // basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
33 // License for the specific language governing rights and limitations
36 // The Original Code is The RabbitMQ .NET Client.
38 // The Initial Developers of the Original Code are LShift Ltd.,
39 // Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.
41 // Portions created by LShift Ltd., Cohesive Financial Technologies
42 // LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007, 2008
43 // LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit
46 // All Rights Reserved.
48 // Contributor(s): ______________________________________.
50 //---------------------------------------------------------------------------
53 using System.Collections;
55 using RabbitMQ.Client;
56 using RabbitMQ.Client.Exceptions;
57 using RabbitMQ.Client.Events;
60 namespace RabbitMQ.Client.MessagePatterns {
61 ///<summary>Manages a subscription to a queue or exchange.</summary>
64 /// This convenience class abstracts away from much of the detail
65 /// involved in receiving messages from a queue or an exchange.
68 /// Once created, the Subscription consumes from a queue (using a
69 /// QueueingBasicConsumer). Received deliveries can be retrieved
70 /// by calling Next(), or by using the Subscription as an
71 /// IEnumerator in, for example, a foreach loop.
74 /// See the documentation for Bind() and for the various overloads
75 /// of the constructor for the various styles of binding and
76 /// subscription that are available.
79 /// Note that if the "noAck" option is enabled (which it is by
80 /// default), then received deliveries are automatically acked
81 /// within the server before they are even transmitted across the
82 /// network to us. Calling Ack() on received events will always do
83 /// the right thing: if "noAck" is enabled, nothing is done on an
84 /// Ack() call, and if "noAck" is disabled, IModel.BasicAck() is
85 /// called with the correct parameters.
88 public class Subscription: IEnumerable, IEnumerator, IDisposable {
89 protected IModel m_model;
90 protected ushort m_ticket;
92 ///<summary>Retrieve the IModel our subscription is carried by.</summary>
93 public IModel Model { get { return m_model; } }
94 ///<summary>Retrieve the ticket we use for all our server operations.</summary>
95 public ushort Ticket { get { return m_ticket; } }
97 protected string m_queueName;
98 protected QueueingBasicConsumer m_consumer;
99 protected string m_consumerTag;
100 protected bool m_noAck;
101 protected bool m_shouldDelete;
103 ///<summary>Retrieve the queue name we have subscribed to. May
104 ///be a server-generated name, depending on how the
105 ///Subscription was constructed.</summary>
106 public string QueueName { get { return m_queueName; } }
107 ///<summary>Retrieve the IBasicConsumer that is receiving the
108 ///messages from the server for us. Normally, you will not
109 ///need to access this property - use Next() and friends
110 ///instead.</summary>
111 public IBasicConsumer Consumer { get { return m_consumer; } }
112 ///<summary>Retrieve the consumer-tag that this subscription
113 ///is using. Will usually be a server-generated
115 public string ConsumerTag { get { return m_consumerTag; } }
116 ///<summary>Returns true if we are in "noAck" mode, where
117 ///calls to Ack() will be no-ops, and where the server acks
118 ///messages before they are delivered to us. Returns false if
119 ///we are in a mode where calls to Ack() are required, and
120 ///where such calls will actually send an acknowledgement
121 ///message across the network to the server.</summary>
122 public bool NoAck { get { return m_noAck; } }
124 protected BasicDeliverEventArgs m_latestEvent;
126 ///<summary>Returns the most recent value returned by Next(),
127 ///or null when either no values have been retrieved yet, or
128 ///the most recent value has already been Ack()ed. See also
129 ///the documentation for Ack().</summary>
130 public BasicDeliverEventArgs LatestEvent { get { return m_latestEvent; } }
132 ///<summary>Creates a new Subscription in "noAck" mode,
133 ///consuming from a fresh, autodelete, anonymous queue. The
134 ///name of the queue can be retrieved using the QueueName
135 ///property of the Subscription. After creating the queue, the
136 ///queue is bound to the named exchange, using Bind() with the
137 ///given routingKey bind parameter.</summary>
138 public Subscription(IModel model, ushort ticket,
139 string exchangeName, string exchangeType, string routingKey)
140 : this(model, ticket)
142 Bind(exchangeName, exchangeType, routingKey);
145 ///<summary>Creates a new Subscription in "noAck" mode,
146 ///consuming from a fresh, autodelete, anonymous queue. The
147 ///name of the queue can be retrieved using the QueueName
148 ///property of the Subscription.</summary>
149 public Subscription(IModel model, ushort ticket)
150 : this(model, ticket, null) {}
152 ///<summary>Creates a new Subscription in "noAck" mode,
153 ///consuming from a named queue. If the queueName parameter is
154 ///null or the empty-string, creates a fresh, autodelete,
155 ///anonymous queue; otherwise, the queue is declared using
156 ///IModel.QueueDeclare() before IModel.BasicConsume() is
157 ///called. After declaring the queue and starting the
158 ///consumer, the queue is bound to the named exchange, using
159 ///Bind() with the given routingKey bind parameter.</summary>
160 public Subscription(IModel model, ushort ticket, string queueName,
161 string exchangeName, string exchangeType, string routingKey)
162 : this(model, ticket, queueName)
164 Bind(exchangeName, exchangeType, routingKey);
167 ///<summary>Creates a new Subscription in "noAck" mode,
168 ///consuming from a named queue. If the queueName parameter is
169 ///null or the empty-string, creates a fresh, autodelete,
170 ///anonymous queue; otherwise, the queue is declared using
171 ///IModel.QueueDeclare() before IModel.BasicConsume() is
173 public Subscription(IModel model, ushort ticket, string queueName)
174 : this(model, ticket, queueName, true) {}
176 ///<summary>Creates a new Subscription, with full control over
177 ///both "noAck" mode and the name of the queue (which, if null
178 ///or the empty-string, will be a fresh autodelete queue, as
179 ///for the other constructor overloads). After declaring the
180 ///queue and starting the consumer, the queue is bound to the
181 ///named exchange, using Bind() with the given routingKey bind
182 ///parameter.</summary>
183 public Subscription(IModel model, ushort ticket, string queueName, bool noAck,
184 string exchangeName, string exchangeType, string routingKey)
185 : this(model, ticket, queueName, noAck)
187 Bind(exchangeName, exchangeType, routingKey);
190 ///<summary>Creates a new Subscription, with full control over
191 ///both "noAck" mode and the name of the queue (which, if null
192 ///or the empty-string, will be a fresh autodelete queue, as
193 ///for the other constructor overloads).</summary>
194 public Subscription(IModel model, ushort ticket, string queueName, bool noAck)
198 if (queueName == null || queueName.Equals("")) {
199 m_queueName = m_model.QueueDeclare(ticket);
200 m_shouldDelete = true;
202 m_queueName = m_model.QueueDeclare(ticket, queueName);
203 m_shouldDelete = false;
205 m_consumer = new QueueingBasicConsumer(m_model);
206 m_consumerTag = m_model.BasicConsume(m_ticket, m_queueName, m_noAck, null, m_consumer);
207 m_latestEvent = null;
210 ///<summary>Closes this Subscription, cancelling the consumer
211 ///record in the server. If an anonymous, autodelete queue
212 ///(i.e., one with a server-generated name) was created during
213 ///construction of the Subscription, this method also deletes
214 ///the created queue (which is an optimisation: autodelete
215 ///queues will be deleted when the IModel closes in any
220 if (m_consumer != null) {
221 m_model.BasicCancel(m_consumerTag);
223 if (m_shouldDelete) {
224 m_shouldDelete = false;
225 // We set m_shouldDelete false before attempting
226 // the delete, because trying twice is worse than
227 // trying once and failing.
228 m_model.QueueDelete(m_ticket, m_queueName, false, false, false);
230 } catch (OperationInterruptedException) {
231 // We don't mind, here.
234 m_consumerTag = null;
237 ///<summary>Causes the queue to which we have subscribed to be
238 ///bound to an exchange. Uses IModel.ExchangeDeclare and
239 ///IModel.QueueBind to (a) ensure the exchange exists, and (b)
240 ///link the exchange to our queue.</summary>
243 /// This method is called by some of the overloads of the
244 /// Subscription constructor.
247 /// Calling Bind() multiple times to bind to multiple
248 /// exchanges, or to bind to a single exchange more than once
249 /// with a different routingKey, is perfectly
250 /// acceptable. Calling Bind() twice with exactly the same
251 /// arguments is permitted and idempotent. For details, see
252 /// the AMQP specification.
255 public void Bind(string exchangeName, string exchangeType, string routingKey)
257 m_model.ExchangeDeclare(m_ticket, exchangeName, exchangeType);
258 m_model.QueueBind(m_ticket, m_queueName, exchangeName, routingKey, false, null);
261 ///<summary>If LatestEvent is non-null, passes it to
262 ///Ack(BasicDeliverEventArgs). Causes LatestEvent to become
266 if (m_latestEvent != null) {
271 ///<summary>If we are not in "noAck" mode, calls
272 ///IModel.BasicAck with the delivery-tag from the passed in
273 ///event; otherwise, sends nothing to the server. In both
274 ///cases, if the passed-in event is the same as LatestEvent
275 ///(by pointer comparison), sets LatestEvent to
278 /// Make sure that this method is only called with events that
279 /// originated from this Subscription - other usage will have
280 /// unpredictable results.
282 public void Ack(BasicDeliverEventArgs evt)
289 m_model.BasicAck(evt.DeliveryTag, false);
292 if (evt == m_latestEvent) {
293 m_latestEvent = null;
297 ///<summary>Retrieves the next incoming delivery in our
298 ///subscription queue.</summary>
301 /// Returns null when the end of the stream is reached and on
302 /// every subsequent call. End-of-stream can arise through the
303 /// action of the Subscription.Close() method, or through the
304 /// closure of the IModel or its underlying IConnection.
307 /// Updates LatestEvent to the value returned.
310 /// Does not acknowledge any deliveries at all (but in "noAck"
311 /// mode, the server will have auto-acknowledged each event
312 /// before it is even sent across the wire to us).
315 public BasicDeliverEventArgs Next()
318 if (m_consumer == null) {
320 throw new InvalidOperationException();
322 m_latestEvent = (BasicDeliverEventArgs) m_consumer.Queue.Dequeue();
323 } catch (EndOfStreamException) {
324 m_latestEvent = null;
326 return m_latestEvent;
329 ///<summary>Retrieves the next incoming delivery in our
330 ///subscription queue, or times out after a specified number
331 ///of milliseconds.</summary>
335 /// Returns false only if the timeout expires before either a
336 /// delivery appears or the end-of-stream is reached. If false
337 /// is returned, the out parameter "result" is set to null,
338 /// but LatestEvent is not updated.
341 /// Returns true to indicate a delivery or the end-of-stream.
344 /// If a delivery is already waiting in the queue, or one
345 /// arrives before the timeout expires, it is removed from the
346 /// queue and placed in the "result" out parameter. If the
347 /// end-of-stream is detected before the timeout expires,
348 /// "result" is set to null.
351 /// Whenever this method returns true, it updates LatestEvent
352 /// to the value placed in "result" before returning.
355 /// End-of-stream can arise through the action of the
356 /// Subscription.Close() method, or through the closure of the
357 /// IModel or its underlying IConnection.
360 /// This method does not acknowledge any deliveries at all
361 /// (but in "noAck" mode, the server will have
362 /// auto-acknowledged each event before it is even sent across
366 /// A timeout of -1 (i.e. System.Threading.Timeout.Infinite)
367 /// will be interpreted as a command to wait for an
368 /// indefinitely long period of time for an item or the end of
369 /// the stream to become available. Usage of such a timeout is
370 /// equivalent to calling Next() with no arguments (modulo
371 /// predictable method signature differences).
374 public bool Next(int millisecondsTimeout, out BasicDeliverEventArgs result)
377 if (m_consumer == null) {
379 throw new InvalidOperationException();
382 if (!m_consumer.Queue.Dequeue(millisecondsTimeout, out qValue)) {
386 m_latestEvent = (BasicDeliverEventArgs) qValue;
387 } catch (EndOfStreamException) {
388 m_latestEvent = null;
390 result = m_latestEvent;
394 ///<summary>Implementation of the IEnumerable interface, for
395 ///permitting Subscription to be used in foreach
397 IEnumerator IEnumerable.GetEnumerator()
402 ///<summary>Implementation of the IEnumerator interface, for
403 ///permitting Subscription to be used in foreach
407 /// As per the IEnumerator interface definition, throws
408 /// InvalidOperationException if LatestEvent is null.
411 /// Does not acknowledge any deliveries at all. Ack() must be
412 /// called explicitly on received deliveries.
415 object IEnumerator.Current {
417 if (m_latestEvent == null) {
418 throw new InvalidOperationException();
420 return m_latestEvent;
424 ///<summary>Implementation of the IEnumerator interface, for
425 ///permitting Subscription to be used in foreach
429 /// Does not acknowledge any deliveries at all. Ack() must be
430 /// called explicitly on received deliveries.
433 bool IEnumerator.MoveNext()
435 return Next() != null;
438 ///<summary>Dummy implementation of the IEnumerator interface,
439 ///for permitting Subscription to be used in foreach loops;
440 ///Reset()ting a Subscription doesn't make sense, so this
441 ///method always throws InvalidOperationException.</summary>
442 void IEnumerator.Reset()
444 // It really doesn't make sense to try to reset a subscription.
445 throw new InvalidOperationException("Subscription.Reset() does not make sense");
448 ///<summary>Implementation of the IDisposable interface,
449 ///permitting Subscription to be used in using
450 ///statements. Simply calls Close().</summary>
451 void IDisposable.Dispose()