2010-06-23: Michael Barker <mike@middlesoft.co.uk>
[mono.git] / mcs / class / RabbitMQ.Client / src / util / SharedQueue.cs
1 // This source code is dual-licensed under the Apache License, version
2 // 2.0, and the Mozilla Public License, version 1.1.
3 //
4 // The APL v2.0:
5 //
6 //---------------------------------------------------------------------------
7 //   Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial
8 //   Technologies LLC., and Rabbit Technologies Ltd.
9 //
10 //   Licensed under the Apache License, Version 2.0 (the "License");
11 //   you may not use this file except in compliance with the License.
12 //   You may obtain a copy of the License at
13 //
14 //       http://www.apache.org/licenses/LICENSE-2.0
15 //
16 //   Unless required by applicable law or agreed to in writing, software
17 //   distributed under the License is distributed on an "AS IS" BASIS,
18 //   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19 //   See the License for the specific language governing permissions and
20 //   limitations under the License.
21 //---------------------------------------------------------------------------
22 //
23 // The MPL v1.1:
24 //
25 //---------------------------------------------------------------------------
26 //   The contents of this file are subject to the Mozilla Public License
27 //   Version 1.1 (the "License"); you may not use this file except in
28 //   compliance with the License. You may obtain a copy of the License at
29 //   http://www.rabbitmq.com/mpl.html
30 //
31 //   Software distributed under the License is distributed on an "AS IS"
32 //   basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
33 //   License for the specific language governing rights and limitations
34 //   under the License.
35 //
36 //   The Original Code is The RabbitMQ .NET Client.
37 //
38 //   The Initial Developers of the Original Code are LShift Ltd,
39 //   Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
40 //
41 //   Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
42 //   Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
43 //   are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
44 //   Technologies LLC, and Rabbit Technologies Ltd.
45 //
46 //   Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
47 //   Ltd. Portions created by Cohesive Financial Technologies LLC are
48 //   Copyright (C) 2007-2010 Cohesive Financial Technologies
49 //   LLC. Portions created by Rabbit Technologies Ltd are Copyright
50 //   (C) 2007-2010 Rabbit Technologies Ltd.
51 //
52 //   All Rights Reserved.
53 //
54 //   Contributor(s): ______________________________________.
55 //
56 //---------------------------------------------------------------------------
57 using System;
58 using System.IO;
59 using System.Collections;
60 using System.Threading;
61
62 namespace RabbitMQ.Util {
63     ///<summary>A thread-safe shared queue implementation.</summary>
64     public class SharedQueue : IEnumerable {
65         ///<summary>The shared queue.</summary>
66         ///<remarks>
67         ///Subclasses must ensure appropriate locking discipline when
68         ///accessing this field. See the implementation of Enqueue,
69         ///Dequeue.
70         ///</remarks>
71         protected Queue m_queue = new Queue();
72
73         ///<summary>Flag holding our current status.</summary>
74         protected bool m_isOpen = true;
75
76         ///<summary>Construct a fresh, empty SharedQueue.</summary>
77         public SharedQueue() {
78         }
79
80         ///<summary>Close the queue. Causes all further Enqueue()
81         ///operations to throw EndOfStreamException, and all pending
82         ///or subsequent Dequeue() operations to throw an
83         ///EndOfStreamException once the queue is empty.</summary>
84         public void Close() {
85             lock (m_queue) {
86                 m_isOpen = false;
87                 Monitor.PulseAll(m_queue);
88             }
89         }
90
91         ///<summary>Call only when the lock on m_queue is held.</summary>
92         /// <exception cref="EndOfStreamException" />
93         private void EnsureIsOpen() {
94             if (!m_isOpen) {
95                 throw new EndOfStreamException("SharedQueue closed");
96             }
97         }
98
99         ///<summary>Place an item at the end of the queue.</summary>
100         ///<remarks>
101         ///If there is a thread waiting for an item to arrive, the
102         ///waiting thread will be woken, and the newly Enqueued item
103         ///will be passed to it. If the queue is closed on entry to
104         ///this method, EndOfStreamException will be thrown.
105         ///</remarks>
106         public void Enqueue(object o) {
107             lock (m_queue) {
108                 EnsureIsOpen();
109                 m_queue.Enqueue(o);
110                 Monitor.Pulse(m_queue);
111             }
112         }
113     
114         ///<summary>Retrieve the first item from the queue, or block if none available</summary>
115         ///<remarks>
116         ///Callers of Dequeue() will block if no items are available
117         ///until some other thread calls Enqueue() or the queue is
118         ///closed. In the latter case this method will throw
119         ///EndOfStreamException.
120         ///</remarks>
121         public object Dequeue() {
122             lock (m_queue) {
123                 while (m_queue.Count == 0) {
124                     EnsureIsOpen();
125                     Monitor.Wait(m_queue);
126                 }
127                 return m_queue.Dequeue();
128             }
129         }
130
131         ///<summary>Retrieve the first item from the queue, or return
132         ///defaultValue immediately if no items are
133         ///available</summary>
134         ///<remarks>
135         ///<para>
136         /// If one or more objects are present in the queue at the
137         /// time of the call, the first item is removed from the queue
138         /// and returned. Otherwise, the defaultValue that was passed
139         /// in is returned immediately. This defaultValue may be null,
140         /// or in cases where null is part of the range of the queue,
141         /// may be some other sentinel object. The difference between
142         /// DequeueNoWait() and Dequeue() is that DequeueNoWait() will
143         /// not block when no items are available in the queue,
144         /// whereas Dequeue() will.
145         ///</para>
146         ///<para>
147         /// If at the time of call the queue is empty and in a
148         /// closed state (following a call to Close()), then this
149         /// method will throw EndOfStreamException.
150         ///</para>
151         ///</remarks>
152         public object DequeueNoWait(object defaultValue) {
153             lock (m_queue) {
154                 if (m_queue.Count == 0) {
155                     EnsureIsOpen();
156                     return defaultValue;
157                 } else {
158                     return m_queue.Dequeue();
159                 }
160             }
161         }
162
163         ///<summary>Retrieve the first item from the queue, or return
164         ///nothing if no items are available after the given
165         ///timeout</summary>
166         ///<remarks>
167         ///<para>
168         /// If one or more items are present on the queue at the time
169         /// the call is made, the call will return
170         /// immediately. Otherwise, the calling thread blocks until
171         /// either an item appears on the queue, or
172         /// millisecondsTimeout milliseconds have elapsed.
173         ///</para>
174         ///<para>
175         /// Returns true in the case that an item was available before
176         /// the timeout, in which case the out parameter "result" is
177         /// set to the item itself.
178         ///</para>
179         ///<para>
180         /// If no items were available before the timeout, returns
181         /// false, and sets "result" to null.
182         ///</para>
183         ///<para>
184         /// A timeout of -1 (i.e. System.Threading.Timeout.Infinite)
185         /// will be interpreted as a command to wait for an
186         /// indefinitely long period of time for an item to become
187         /// available. Usage of such a timeout is equivalent to
188         /// calling Dequeue() with no arguments. See also the MSDN
189         /// documentation for
190         /// System.Threading.Monitor.Wait(object,int).
191         ///</para>
192         ///<para>
193         /// If no items are present and the queue is in a closed
194         /// state, or if at any time while waiting the queue
195         /// transitions to a closed state (by a call to Close()), this
196         /// method will throw EndOfStreamException.
197         ///</para>
198         ///</remarks>
199         public bool Dequeue(int millisecondsTimeout, out object result) {
200             if (millisecondsTimeout == Timeout.Infinite) {
201                 result = Dequeue();
202                 return true;
203             }
204
205             DateTime startTime = DateTime.Now;
206             lock (m_queue) {
207                 while (m_queue.Count == 0) {
208                     EnsureIsOpen();
209                     int elapsedTime = (int) ((DateTime.Now - startTime).TotalMilliseconds);
210                     int remainingTime = millisecondsTimeout - elapsedTime;
211                     if (remainingTime <= 0) {
212                         result = null;
213                         return false;
214                     }
215
216                     Monitor.Wait(m_queue, remainingTime);
217                 }
218
219                 result = m_queue.Dequeue();
220                 return true;
221             }
222         }
223
224         ///<summary>Implementation of the IEnumerable interface, for
225         ///permitting SharedQueue to be used in foreach
226         ///loops.</summary>
227         IEnumerator IEnumerable.GetEnumerator() {
228             return new SharedQueueEnumerator(this);
229         }
230
231     }
232
233     ///<summary>Implementation of the IEnumerator interface, for
234     ///permitting SharedQueue to be used in foreach loops.</summary>
235     public class SharedQueueEnumerator : IEnumerator {
236
237         protected SharedQueue m_queue;
238         protected object m_current;
239
240         ///<summary>Construct an enumerator for the given
241         ///SharedQueue.</summary>
242         public SharedQueueEnumerator(SharedQueue queue) {
243             m_queue = queue;
244         }
245
246         object IEnumerator.Current {
247             get {
248                 if (m_current == null) {
249                     throw new InvalidOperationException();
250                 }
251                 return m_current;
252             }
253         }
254
255         bool IEnumerator.MoveNext() {
256             try {
257                 m_current = m_queue.Dequeue();
258                 return true;
259             } catch (EndOfStreamException) {
260                 m_current = null;
261                 return false;
262             }
263         }
264
265         ///<summary>Reset()ting a SharedQueue doesn't make sense, so
266         ///this method always throws
267         ///InvalidOperationException.</summary>
268         void IEnumerator.Reset() {
269             throw new InvalidOperationException("SharedQueue.Reset() does not make sense");
270         }
271
272     }
273
274 }