1 // This source code is dual-licensed under the Apache License, version
2 // 2.0, and the Mozilla Public License, version 1.1.
6 //---------------------------------------------------------------------------
7 // Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial
8 // Technologies LLC., and Rabbit Technologies Ltd.
10 // Licensed under the Apache License, Version 2.0 (the "License");
11 // you may not use this file except in compliance with the License.
12 // You may obtain a copy of the License at
14 // http://www.apache.org/licenses/LICENSE-2.0
16 // Unless required by applicable law or agreed to in writing, software
17 // distributed under the License is distributed on an "AS IS" BASIS,
18 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19 // See the License for the specific language governing permissions and
20 // limitations under the License.
21 //---------------------------------------------------------------------------
25 //---------------------------------------------------------------------------
26 // The contents of this file are subject to the Mozilla Public License
27 // Version 1.1 (the "License"); you may not use this file except in
28 // compliance with the License. You may obtain a copy of the License at
29 // http://www.rabbitmq.com/mpl.html
31 // Software distributed under the License is distributed on an "AS IS"
32 // basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
33 // License for the specific language governing rights and limitations
36 // The Original Code is The RabbitMQ .NET Client.
38 // The Initial Developers of the Original Code are LShift Ltd,
39 // Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
41 // Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
42 // Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
43 // are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
44 // Technologies LLC, and Rabbit Technologies Ltd.
46 // Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
47 // Ltd. Portions created by Cohesive Financial Technologies LLC are
48 // Copyright (C) 2007-2010 Cohesive Financial Technologies
49 // LLC. Portions created by Rabbit Technologies Ltd are Copyright
50 // (C) 2007-2010 Rabbit Technologies Ltd.
52 // All Rights Reserved.
54 // Contributor(s): ______________________________________.
56 //---------------------------------------------------------------------------
59 using System.Collections;
60 using System.Threading;
62 namespace RabbitMQ.Util {
63 ///<summary>A thread-safe shared queue implementation.</summary>
64 public class SharedQueue : IEnumerable {
65 ///<summary>The shared queue.</summary>
67 ///Subclasses must ensure appropriate locking discipline when
68 ///accessing this field. See the implementation of Enqueue,
71 protected Queue m_queue = new Queue();
73 ///<summary>Flag holding our current status.</summary>
74 protected bool m_isOpen = true;
76 ///<summary>Construct a fresh, empty SharedQueue.</summary>
77 public SharedQueue() {
80 ///<summary>Close the queue. Causes all further Enqueue()
81 ///operations to throw EndOfStreamException, and all pending
82 ///or subsequent Dequeue() operations to throw an
83 ///EndOfStreamException once the queue is empty.</summary>
87 Monitor.PulseAll(m_queue);
91 ///<summary>Call only when the lock on m_queue is held.</summary>
92 /// <exception cref="EndOfStreamException" />
93 private void EnsureIsOpen() {
95 throw new EndOfStreamException("SharedQueue closed");
99 ///<summary>Place an item at the end of the queue.</summary>
101 ///If there is a thread waiting for an item to arrive, the
102 ///waiting thread will be woken, and the newly Enqueued item
103 ///will be passed to it. If the queue is closed on entry to
104 ///this method, EndOfStreamException will be thrown.
106 public void Enqueue(object o) {
110 Monitor.Pulse(m_queue);
114 ///<summary>Retrieve the first item from the queue, or block if none available</summary>
116 ///Callers of Dequeue() will block if no items are available
117 ///until some other thread calls Enqueue() or the queue is
118 ///closed. In the latter case this method will throw
119 ///EndOfStreamException.
121 public object Dequeue() {
123 while (m_queue.Count == 0) {
125 Monitor.Wait(m_queue);
127 return m_queue.Dequeue();
131 ///<summary>Retrieve the first item from the queue, or return
132 ///defaultValue immediately if no items are
133 ///available</summary>
136 /// If one or more objects are present in the queue at the
137 /// time of the call, the first item is removed from the queue
138 /// and returned. Otherwise, the defaultValue that was passed
139 /// in is returned immediately. This defaultValue may be null,
140 /// or in cases where null is part of the range of the queue,
141 /// may be some other sentinel object. The difference between
142 /// DequeueNoWait() and Dequeue() is that DequeueNoWait() will
143 /// not block when no items are available in the queue,
144 /// whereas Dequeue() will.
147 /// If at the time of call the queue is empty and in a
148 /// closed state (following a call to Close()), then this
149 /// method will throw EndOfStreamException.
152 public object DequeueNoWait(object defaultValue) {
154 if (m_queue.Count == 0) {
158 return m_queue.Dequeue();
163 ///<summary>Retrieve the first item from the queue, or return
164 ///nothing if no items are available after the given
168 /// If one or more items are present on the queue at the time
169 /// the call is made, the call will return
170 /// immediately. Otherwise, the calling thread blocks until
171 /// either an item appears on the queue, or
172 /// millisecondsTimeout milliseconds have elapsed.
175 /// Returns true in the case that an item was available before
176 /// the timeout, in which case the out parameter "result" is
177 /// set to the item itself.
180 /// If no items were available before the timeout, returns
181 /// false, and sets "result" to null.
184 /// A timeout of -1 (i.e. System.Threading.Timeout.Infinite)
185 /// will be interpreted as a command to wait for an
186 /// indefinitely long period of time for an item to become
187 /// available. Usage of such a timeout is equivalent to
188 /// calling Dequeue() with no arguments. See also the MSDN
189 /// documentation for
190 /// System.Threading.Monitor.Wait(object,int).
193 /// If no items are present and the queue is in a closed
194 /// state, or if at any time while waiting the queue
195 /// transitions to a closed state (by a call to Close()), this
196 /// method will throw EndOfStreamException.
199 public bool Dequeue(int millisecondsTimeout, out object result) {
200 if (millisecondsTimeout == Timeout.Infinite) {
205 DateTime startTime = DateTime.Now;
207 while (m_queue.Count == 0) {
209 int elapsedTime = (int) ((DateTime.Now - startTime).TotalMilliseconds);
210 int remainingTime = millisecondsTimeout - elapsedTime;
211 if (remainingTime <= 0) {
216 Monitor.Wait(m_queue, remainingTime);
219 result = m_queue.Dequeue();
224 ///<summary>Implementation of the IEnumerable interface, for
225 ///permitting SharedQueue to be used in foreach
227 IEnumerator IEnumerable.GetEnumerator() {
228 return new SharedQueueEnumerator(this);
233 ///<summary>Implementation of the IEnumerator interface, for
234 ///permitting SharedQueue to be used in foreach loops.</summary>
235 public class SharedQueueEnumerator : IEnumerator {
237 protected SharedQueue m_queue;
238 protected object m_current;
240 ///<summary>Construct an enumerator for the given
241 ///SharedQueue.</summary>
242 public SharedQueueEnumerator(SharedQueue queue) {
246 object IEnumerator.Current {
248 if (m_current == null) {
249 throw new InvalidOperationException();
255 bool IEnumerator.MoveNext() {
257 m_current = m_queue.Dequeue();
259 } catch (EndOfStreamException) {
265 ///<summary>Reset()ting a SharedQueue doesn't make sense, so
266 ///this method always throws
267 ///InvalidOperationException.</summary>
268 void IEnumerator.Reset() {
269 throw new InvalidOperationException("SharedQueue.Reset() does not make sense");