1 // Copyright (c) Microsoft. All rights reserved.
2 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
4 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
6 // ProducerConsumerQueues.cs
9 // Specialized producer/consumer queues.
12 // ************<IMPORTANT NOTE>*************
14 // There are two exact copies of this file:
15 // src\ndp\clr\src\bcl\system\threading\tasks\producerConsumerQueue.cs
16 // src\ndp\fx\src\dataflow\system\threading\tasks\dataflow\internal\producerConsumerQueue.cs
17 // Keep both of them consistent by changing the other file when you change this one, also avoid:
18 // 1- To reference internal types in mscorlib
19 // 2- To reference any dataflow specific types
20 // This should be fixed post Dev11 when this class becomes public.
22 // ************</IMPORTANT NOTE>*************
23 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
25 using System.Collections;
26 #if CONCURRENT_COLLECTIONS
27 using System.Collections.Concurrent;
29 using System.Threading.Tasks.Dataflow.Internal.Collections;
31 using System.Collections.Generic;
32 using System.Diagnostics;
33 using System.Diagnostics.Contracts;
34 using System.Runtime.InteropServices;
36 namespace System.Threading.Tasks
38 /// <summary>Represents a producer/consumer queue used internally by dataflow blocks.</summary>
39 /// <typeparam name="T">Specifies the type of data contained in the queue.</typeparam>
40 internal interface IProducerConsumerQueue<T> : IEnumerable<T>
42 /// <summary>Enqueues an item into the queue.</summary>
43 /// <param name="item">The item to enqueue.</param>
44 /// <remarks>This method is meant to be thread-safe subject to the particular nature of the implementation.</remarks>
47 /// <summary>Attempts to dequeue an item from the queue.</summary>
48 /// <param name="result">The dequeued item, if one was available.</param>
49 /// <returns>true if an item could be dequeued; otherwise, false.</returns>
50 /// <remarks>This method is meant to be thread-safe subject to the particular nature of the implementation.</remarks>
51 bool TryDequeue(out T result);
53 /// <summary>Gets whether the collection is currently empty.</summary>
54 /// <remarks>This property may or may not be thread-safe, depending on the implementation.</remarks>
57 /// <summary>Gets the number of items in the collection.</summary>
58 /// <remarks>In many implementations, this property will not be thread-safe; use GetCountSafe when a thread-safe count is required.</remarks>
61 /// <summary>A thread-safe way to get the number of items in the collection. May synchronize access by locking the provided synchronization object.</summary>
62 /// <param name="syncObj">The synchronization object an implementation may lock on while counting.</param>
63 /// <returns>The number of items in the collection.</returns>
64 int GetCountSafe(object syncObj);
68 /// Provides a producer/consumer queue safe to be used by any number of producers and consumers concurrently. Each interface member is implemented explicitly and forwards to the ConcurrentQueue base class.
70 /// <typeparam name="T">Specifies the type of data contained in the queue.</typeparam>
71 [DebuggerDisplay("Count = {Count}")]
72 internal sealed class MultiProducerMultiConsumerQueue<T> : ConcurrentQueue<T>, IProducerConsumerQueue<T>
74 /// <summary>Enqueues an item into the queue by forwarding to the base queue's Enqueue.</summary>
75 /// <param name="item">The item to enqueue.</param>
76 void IProducerConsumerQueue<T>.Enqueue(T item) { base.Enqueue(item); }
78 /// <summary>Attempts to dequeue an item from the queue by forwarding to the base queue's TryDequeue.</summary>
79 /// <param name="result">The dequeued item, if one was available.</param>
80 /// <returns>true if an item could be dequeued; otherwise, false.</returns>
81 bool IProducerConsumerQueue<T>.TryDequeue(out T result) { return base.TryDequeue(out result); }
83 /// <summary>Gets whether the collection is currently empty, as reported by the base queue.</summary>
84 bool IProducerConsumerQueue<T>.IsEmpty { get { return base.IsEmpty; } }
86 /// <summary>Gets the number of items in the collection, as reported by the base queue.</summary>
87 int IProducerConsumerQueue<T>.Count { get { return base.Count; } }
89 /// <summary>A thread-safe way to get the number of items in the collection. This implementation does not lock the provided synchronization object.</summary>
90 /// <remarks>ConcurrentQueue.Count is thread safe, so <paramref name="syncObj"/> is ignored and no lock is acquired.</remarks>
91 int IProducerConsumerQueue<T>.GetCountSafe(object syncObj) { return base.Count; }
95 /// Provides a producer/consumer queue safe to be used by only one producer and one consumer concurrently.
97 /// <typeparam name="T">Specifies the type of data contained in the queue.</typeparam>
98 [DebuggerDisplay("Count = {Count}")]
99 [DebuggerTypeProxy(typeof(SingleProducerSingleConsumerQueue<>.SingleProducerSingleConsumerQueue_DebugView))]
100 internal sealed class SingleProducerSingleConsumerQueue<T> : IProducerConsumerQueue<T>
104 // SingleProducerSingleConsumerQueue (SPSCQueue) is a concurrent queue designed to be used
105 // by one producer thread and one consumer thread. SPSCQueue does not work correctly when used by
106 // multiple producer threads concurrently or multiple consumer threads concurrently.
108 // SPSCQueue is based on segments that behave like circular buffers. Each circular buffer is represented
109 // as an array with two indexes: _first and _last. _first is the index of the array slot for the consumer
110 // to read next, and _last is the slot for the producer to write next. The circular buffer is empty when
111 // (_first == _last), and full when ((_last+1) % _array.Length == _first).
113 // Since _first is only ever modified by the consumer thread and _last by the producer, the two indices can
114 // be updated without interlocked operations. As long as the queue size fits inside a single circular buffer,
115 // enqueues and dequeues simply advance the corresponding indices around the circular buffer. If an enqueue finds
116 // that there is no room in the existing buffer, however, a new circular buffer is allocated that is twice as big
117 // as the old buffer. From then on, the producer will insert values into the new buffer. The consumer will first
118 // empty out the old buffer and only then follow the producer into the new (larger) buffer.
120 // As described above, the enqueue operation on the fast path only modifies the _last field of the current segment.
121 // However, it also needs to read _first in order to verify that there is room in the current segment. Similarly, the
122 // dequeue operation on the fast path only needs to modify _first, but also needs to read _last to verify that the
123 // queue is non-empty. This results in true cache line sharing between the producer and the consumer.
125 // The cache line sharing issue can be mitigated by having a possibly stale copy of _first that is owned by the producer,
126 // and a possibly stale copy of _last that is owned by the consumer. So, the consumer state is described using
127 // (_first, _lastCopy) and the producer state using (_firstCopy, _last). The consumer state is separated from
128 // the producer state by padding, which keeps fast-path enqueues and dequeues from hitting shared cache lines.
129 // _lastCopy is the consumer's copy of _last. Whenever the consumer can tell that there is data in the buffer
130 // simply by observing _lastCopy, the consumer thread does not need to read _last and thus encounter a cache miss. Only
131 // when the buffer appears to be empty will the consumer refresh _lastCopy from _last. _firstCopy is used by the producer
132 // in the same way to avoid reading _first on the hot path.
134 /// <summary>The initial size to use for segments (in number of elements).</summary>
135 private const int INIT_SEGMENT_SIZE = 32; // must be a power of 2, because segment indices wrap using "& (array.Length - 1)"
136 /// <summary>The maximum size to use for segments (in number of elements).</summary>
137 private const int MAX_SEGMENT_SIZE = 0x1000000; // 16,777,216 elements; this could be made as large as Int32.MaxValue / 2
139 /// <summary>The head of the linked list of segments; dequeue and peek operations start from this segment.</summary>
140 private volatile Segment _head;
141 /// <summary>The tail of the linked list of segments; enqueue operations start from this segment.</summary>
142 private volatile Segment _tail;
144 /// <summary>Initializes the queue with a single segment of INIT_SEGMENT_SIZE elements that serves as both head and tail.</summary>
145 internal SingleProducerSingleConsumerQueue()
147 // Validate constants in ctor rather than in an explicit cctor that would cause perf degradation
148 Debug.Assert(INIT_SEGMENT_SIZE > 0, "Initial segment size must be > 0.");
149 Debug.Assert((INIT_SEGMENT_SIZE & (INIT_SEGMENT_SIZE - 1)) == 0, "Initial segment size must be a power of 2");
150 Debug.Assert(INIT_SEGMENT_SIZE <= MAX_SEGMENT_SIZE, "Initial segment size should be <= maximum.");
151 Debug.Assert(MAX_SEGMENT_SIZE < Int32.MaxValue / 2, "Max segment size * 2 must be < Int32.MaxValue, or else overflow could occur.");
153 // Initialize the queue: one segment acts as both head and tail
154 _head = _tail = new Segment(INIT_SEGMENT_SIZE);
157 /// <summary>Enqueues an item into the queue. Takes a fast path when the tail segment visibly has room, and otherwise defers to EnqueueSlow.</summary>
158 /// <param name="item">The item to enqueue.</param>
159 public void Enqueue(T item)
161 Segment segment = _tail;
162 T[] array = segment._array;
163 int last = segment._state._last; // local copy to avoid multiple volatile reads
165 // Fast path: there's obviously room in the current segment
166 int tail2 = (last + 1) & (array.Length - 1); // index following the write slot, wrapped with a mask (segment sizes are powers of 2)
167 if (tail2 != segment._state._firstCopy) // checked against the producer's possibly stale copy of _first, not the volatile _first
170 segment._state._last = tail2; // advance the volatile tail index
172 // Slow path: there may not be room in the current segment.
173 else EnqueueSlow(item, ref segment);
176 /// <summary>Slow path of Enqueue: refreshes the producer's cached copy of _first and retries, or else creates a new, larger segment holding the item and links it after the tail.</summary>
177 /// <param name="item">The item to enqueue.</param>
178 /// <param name="segment">The segment in which to first attempt to store the item.</param>
179 private void EnqueueSlow(T item, ref Segment segment)
181 Contract.Requires(segment != null, "Expected a non-null segment.");
183 if (segment._state._firstCopy != segment._state._first) // the cached copy of _first is stale
185 segment._state._firstCopy = segment._state._first; // refresh the cached copy before retrying
186 Enqueue(item); // will only recur once for this enqueue operation
190 int newSegmentSize = _tail._array.Length << 1; // double size
191 Debug.Assert(newSegmentSize > 0, "The max size should always be small enough that we don't overflow.");
192 if (newSegmentSize > MAX_SEGMENT_SIZE) newSegmentSize = MAX_SEGMENT_SIZE; // clamp growth to the maximum segment size
194 var newSegment = new Segment(newSegmentSize);
195 newSegment._array[0] = item; // the new segment starts out holding the item in slot 0
196 newSegment._state._last = 1; // one item stored, so the next write slot is index 1
197 newSegment._state._lastCopy = 1; // the cached copy of _last starts in sync with _last
202 // Finally block to protect against corruption due to a thread abort
203 // between setting _next and setting _tail.
204 Volatile.Write(ref _tail._next, newSegment); // ensure segment not published until item is fully stored
209 /// <summary>Attempts to dequeue an item from the queue. Takes a fast path when the head segment visibly has data, and otherwise defers to TryDequeueSlow.</summary>
210 /// <param name="result">The dequeued item, if one was available.</param>
211 /// <returns>true if an item could be dequeued; otherwise, false.</returns>
212 public bool TryDequeue(out T result)
214 Segment segment = _head;
215 T[] array = segment._array;
216 int first = segment._state._first; // local copy to avoid multiple volatile reads
218 // Fast path: there's obviously data available in the current segment
219 if (first != segment._state._lastCopy) // checked against the consumer's possibly stale copy of _last, not the volatile _last
221 result = array[first];
222 array[first] = default(T); // Clear the slot to release the element
223 segment._state._first = (first + 1) & (array.Length - 1); // advance the volatile head index, wrapping with a mask
226 // Slow path: there may not be data available in the current segment
227 else return TryDequeueSlow(ref segment, ref array, out result);
230 /// <summary>Slow path of TryDequeue: refreshes the consumer's cached copy of _last and retries, or moves on to the next segment when the current one is drained, and then dequeues if an item is present.</summary>
231 /// <param name="segment">The segment to dequeue from; replaced by the next segment when the current one is drained and a successor is linked.</param>
232 /// <param name="array">The item array of <paramref name="segment"/>; replaced together with the segment.</param>
233 /// <param name="result">The dequeued item, if one was available.</param>
234 /// <returns>true if an item could be dequeued; otherwise, false.</returns>
235 private bool TryDequeueSlow(ref Segment segment, ref T[] array, out T result)
237 Contract.Requires(segment != null, "Expected a non-null segment.");
238 Contract.Requires(array != null, "Expected a non-null item array.");
240 if (segment._state._last != segment._state._lastCopy) // the cached copy of _last is stale
242 segment._state._lastCopy = segment._state._last; // refresh the cached copy before retrying
243 return TryDequeue(out result); // will only recur once for this dequeue operation
246 if (segment._next != null && segment._state._first == segment._state._last) // a newer segment is linked and this one is drained
248 segment = segment._next;
249 array = segment._array;
253 int first = segment._state._first; // local copy to avoid extraneous volatile reads
255 if (first == segment._state._last) // _first == _last means the segment is empty
261 result = array[first];
262 array[first] = default(T); // Clear the slot to release the element
263 segment._state._first = (first + 1) & (segment._array.Length - 1);
264 segment._state._lastCopy = segment._state._last; // Refresh _lastCopy to ensure that _first has not passed _lastCopy
269 /// <summary>Attempts to peek at the item at the head of the queue. Takes a fast path when the head segment visibly has data, and otherwise defers to TryPeekSlow.</summary>
270 /// <param name="result">The peeked item, if one was available.</param>
271 /// <returns>true if an item could be peeked; otherwise, false.</returns>
272 public bool TryPeek(out T result)
274 Segment segment = _head;
275 T[] array = segment._array;
276 int first = segment._state._first; // local copy to avoid multiple volatile reads
278 // Fast path: there's obviously data available in the current segment
279 if (first != segment._state._lastCopy) // checked against the consumer's possibly stale copy of _last
281 result = array[first]; // read only: neither the slot nor _first is modified here
284 // Slow path: there may not be data available in the current segment
285 else return TryPeekSlow(ref segment, ref array, out result);
288 /// <summary>Slow path of TryPeek: refreshes the consumer's cached copy of _last and retries, or moves on to the next segment when the current one is drained, and then reads the head item if one is present.</summary>
289 /// <param name="segment">The segment to peek into; replaced by the next segment when the current one is drained and a successor is linked.</param>
290 /// <param name="array">The item array of <paramref name="segment"/>; replaced together with the segment.</param>
291 /// <param name="result">The peeked item, if one was available.</param>
292 /// <returns>true if an item could be peeked; otherwise, false.</returns>
293 private bool TryPeekSlow(ref Segment segment, ref T[] array, out T result)
295 Contract.Requires(segment != null, "Expected a non-null segment.");
296 Contract.Requires(array != null, "Expected a non-null item array.");
298 if (segment._state._last != segment._state._lastCopy) // the cached copy of _last is stale
300 segment._state._lastCopy = segment._state._last; // refresh the cached copy before retrying
301 return TryPeek(out result); // will only recur once for this peek operation
304 if (segment._next != null && segment._state._first == segment._state._last) // a newer segment is linked and this one is drained
306 segment = segment._next;
307 array = segment._array;
311 int first = segment._state._first; // local copy to avoid extraneous volatile reads
313 if (first == segment._state._last) // _first == _last means the segment is empty
319 result = array[first]; // read only: the slot is not cleared and _first is not advanced
323 /// <summary>Attempts to dequeue the item at the head of the queue, removing it only if it satisfies the predicate.</summary>
324 /// <param name="predicate">The predicate that must return true for the item to be dequeued. If null, all items implicitly return true.</param>
325 /// <param name="result">The dequeued item, if one was dequeued.</param>
326 /// <returns>true if an item could be dequeued; otherwise, false.</returns>
327 public bool TryDequeueIf(Predicate<T> predicate, out T result)
329 Segment segment = _head;
330 T[] array = segment._array;
331 int first = segment._state._first; // local copy to avoid multiple volatile reads
333 // Fast path: there's obviously data available in the current segment
334 if (first != segment._state._lastCopy) // checked against the consumer's possibly stale copy of _last
336 result = array[first];
337 if (predicate == null || predicate(result)) // a null predicate accepts every item
339 array[first] = default(T); // Clear the slot to release the element
340 segment._state._first = (first + 1) & (array.Length - 1); // advance the volatile head index, wrapping with a mask
349 // Slow path: there may not be data available in the current segment
350 else return TryDequeueIfSlow(predicate, ref segment, ref array, out result);
353 /// <summary>Slow path of TryDequeueIf: refreshes the consumer's cached copy of _last and retries, or moves on to the next segment when the current one is drained, and then dequeues the head item if it is present and satisfies the predicate.</summary>
354 /// <param name="predicate">The predicate that must return true for the item to be dequeued. If null, all items implicitly return true.</param>
355 /// <param name="segment">The segment to dequeue from; replaced by the next segment when the current one is drained and a successor is linked.</param>
356 /// <param name="array">The item array of <paramref name="segment"/>; replaced together with the segment.</param>
357 /// <param name="result">The dequeued item, if one was dequeued.</param>
358 /// <returns>true if an item could be dequeued; otherwise, false.</returns>
359 private bool TryDequeueIfSlow(Predicate<T> predicate, ref Segment segment, ref T[] array, out T result)
361 Contract.Requires(segment != null, "Expected a non-null segment.");
362 Contract.Requires(array != null, "Expected a non-null item array.");
364 if (segment._state._last != segment._state._lastCopy) // the cached copy of _last is stale
366 segment._state._lastCopy = segment._state._last; // refresh the cached copy before retrying
367 return TryDequeueIf(predicate, out result); // will only recur once for this dequeue operation
370 if (segment._next != null && segment._state._first == segment._state._last) // a newer segment is linked and this one is drained
372 segment = segment._next;
373 array = segment._array;
377 int first = segment._state._first; // local copy to avoid extraneous volatile reads
379 if (first == segment._state._last) // _first == _last means the segment is empty
385 result = array[first];
386 if (predicate == null || predicate(result)) // a null predicate accepts every item
388 array[first] = default(T); // Clear the slot to release the element
389 segment._state._first = (first + 1) & (segment._array.Length - 1);
390 segment._state._lastCopy = segment._state._last; // Refresh _lastCopy to ensure that _first has not passed _lastCopy
403 while (TryDequeue(out ignored)) ;
406 /// <summary>Gets whether the collection is currently empty.</summary>
407 /// <remarks>WARNING: This should not be used concurrently without further vetting.</remarks>
410 // This implementation is optimized for calls from the consumer.
413 Segment head = _head;
414 if (head._state._first != head._state._lastCopy) return false; // _first is volatile, so the read of _lastCopy cannot get reordered
415 if (head._state._first != head._state._last) return false; // re-check against the real tail index in case the cached copy is stale
416 return head._next == null; // the head segment is drained; the queue is empty only if no further segment is linked
420 /// <summary>Gets an enumerator for the collection that walks every segment from the head, yielding the items between each segment's _first and _last.</summary>
421 /// <remarks>WARNING: This should only be used for debugging purposes. It is not safe to be used concurrently.</remarks>
422 public IEnumerator<T> GetEnumerator()
424 for (Segment segment = _head; segment != null; segment = segment._next) // follow the linked list of segments
426 for (int pt = segment._state._first;
427 pt != segment._state._last;
428 pt = (pt + 1) & (segment._array.Length - 1)) // step through the segment, wrapping with a mask
430 yield return segment._array[pt];
434 /// <summary>Gets a non-generic enumerator for the collection by delegating to the generic GetEnumerator.</summary>
435 /// <remarks>WARNING: This should only be used for debugging purposes. It is not safe to be used concurrently.</remarks>
436 IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); }
438 /// <summary>Gets the number of items in the collection by summing the item counts of all segments, starting from the head.</summary>
439 /// <remarks>WARNING: This should only be used for debugging purposes. It is not meant to be used concurrently.</remarks>
445 for (Segment segment = _head; segment != null; segment = segment._next) // follow the linked list of segments
447 int arraySize = segment._array.Length;
449 while (true) // Count is not meant to be used concurrently, but this helps to avoid issues if it is
451 first = segment._state._first;
452 last = segment._state._last;
453 if (first == segment._state._first) break; // accept the pair only if _first did not change while _last was being read
455 count += (last - first) & (arraySize - 1); // masked difference stays correct when _last has wrapped around below _first
461 /// <summary>A thread-safe way to get the number of items in the collection. May synchronize access by locking the provided synchronization object.</summary>
462 /// <remarks>The Count is not thread safe, so we need to acquire the lock.</remarks> <param name="syncObj">The synchronization object; expected to be non-null (checked with Debug.Assert).</param>
463 int IProducerConsumerQueue<T>.GetCountSafe(object syncObj)
465 Debug.Assert(syncObj != null, "The syncObj parameter is null.");
472 /// <summary>A segment in the queue: an item array used as a circular buffer, its index state, and a link to the next segment.</summary>
473 [StructLayout(LayoutKind.Sequential)]
474 private sealed class Segment
476 /// <summary>The next segment in the linked list of segments, or null if none has been linked.</summary>
477 internal Segment _next;
478 /// <summary>The data stored in this segment.</summary>
479 internal readonly T[] _array;
480 /// <summary>Details about the segment: the head and tail indices, their cached copies, and padding.</summary>
481 internal SegmentState _state; // separated out to enable StructLayout attribute to take effect
483 /// <summary>Initializes the segment.</summary>
484 /// <param name="size">The number of elements in this segment's array; must be a power of 2.</param>
485 internal Segment(int size)
487 Contract.Requires((size & (size - 1)) == 0, "Size must be a power of 2");
488 _array = new T[size];
492 /// <summary>Stores information about a segment.</summary>
493 [StructLayout(LayoutKind.Sequential)] // enforce layout so that padding reduces false sharing
494 private struct SegmentState
496 /// <summary>Padding to reduce false sharing between the segment's array and _first.</summary>
497 internal PaddingFor32 _pad0;
499 /// <summary>The index of the current head in the segment.</summary>
500 internal volatile int _first;
501 /// <summary>The consumer's possibly stale copy of the current tail index.</summary>
502 internal int _lastCopy; // not volatile as read and written by the consumer (the producer only initializes it on a new segment before linking it), except for IsEmpty, and there _lastCopy is only read after reading the volatile _first
504 /// <summary>Padding to reduce false sharing between the first and last.</summary>
505 internal PaddingFor32 _pad1;
507 /// <summary>The producer's possibly stale copy of the current head index.</summary>
508 internal int _firstCopy; // not volatile as only read and written by the producer thread
509 /// <summary>The index of the current tail in the segment.</summary>
510 internal volatile int _last;
512 /// <summary>Padding to reduce false sharing with the last and what's after the segment.</summary>
513 internal PaddingFor32 _pad2;
516 /// <summary>Debugger type proxy for a SingleProducerSingleConsumerQueue of T.</summary>
517 private sealed class SingleProducerSingleConsumerQueue_DebugView
519 /// <summary>The queue being visualized.</summary>
520 private readonly SingleProducerSingleConsumerQueue<T> _queue;
522 /// <summary>Initializes the debug view.</summary>
523 /// <param name="queue">The queue being debugged; must not be null.</param>
524 public SingleProducerSingleConsumerQueue_DebugView(SingleProducerSingleConsumerQueue<T> queue)
526 Contract.Requires(queue != null, "Expected a non-null queue.");
530 /// <summary>Gets a snapshot of the queue's contents as an array.</summary>
531 [DebuggerBrowsable(DebuggerBrowsableState.RootHidden)]
536 List<T> list = new List<T>();
537 foreach (T item in _queue) // enumerates the queue through its GetEnumerator
539 return list.ToArray();
546 /// <summary>A placeholder class for common padding constants and eventually routines.</summary>
547 static class PaddingHelpers
549 /// <summary>A size, in bytes, greater than or equal to the size of the most common CPU cache lines.</summary>
550 internal const int CACHE_LINE_SIZE = 128;
553 /// <summary>Padding structure used to minimize false sharing in SingleProducerSingleConsumerQueue{T}.</summary>
554 [StructLayout(LayoutKind.Explicit, Size = PaddingHelpers.CACHE_LINE_SIZE - sizeof(Int32))] // Based on common case of 64-byte cache lines