Merge pull request #2247 from ivmai/match-ext-libgc-api
[mono.git] / mcs / class / System.Threading.Tasks.Dataflow / CoreFxSources / Internal / ProducerConsumerQueues.cs
1 // Copyright (c) Microsoft. All rights reserved.
2 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
3
4 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
5 //
6 // ProducerConsumerQueues.cs
7 //
8 //
9 // Specialized producer/consumer queues.
10 //
11 //
12 // ************<IMPORTANT NOTE>*************
13 //
14 // There are two exact copies of this file:
15 //  src\ndp\clr\src\bcl\system\threading\tasks\producerConsumerQueue.cs
16 //  src\ndp\fx\src\dataflow\system\threading\tasks\dataflow\internal\producerConsumerQueue.cs
17 // Keep both of them consistent by changing the other file when you change this one, also avoid:
18 //  1- To reference internal types in mscorlib
19 //  2- To reference any dataflow specific types
20 // This should be fixed post Dev11 when this class becomes public.
21 //
22 // ************</IMPORTANT NOTE>*************
23 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
24
25 using System.Collections;
26 #if CONCURRENT_COLLECTIONS
27 using System.Collections.Concurrent;
28 #else
29 using System.Threading.Tasks.Dataflow.Internal.Collections;
30 #endif
31 using System.Collections.Generic;
32 using System.Diagnostics;
33 using System.Diagnostics.Contracts;
34 using System.Runtime.InteropServices;
35
36 namespace System.Threading.Tasks
37 {
38     /// <summary>Represents a producer/consumer queue used internally by dataflow blocks.</summary>
39     /// <typeparam name="T">Specifies the type of data contained in the queue.</typeparam>
40     internal interface IProducerConsumerQueue<T> : IEnumerable<T>
41     {
42         /// <summary>Enqueues an item into the queue.</summary>
43         /// <param name="item">The item to enqueue.</param>
44         /// <remarks>This method is meant to be thread-safe subject to the particular nature of the implementation.</remarks>
45         void Enqueue(T item);
46
47         /// <summary>Attempts to dequeue an item from the queue.</summary>
48         /// <param name="result">The dequeued item.</param>
49         /// <returns>true if an item could be dequeued; otherwise, false.</returns>
50         /// <remarks>This method is meant to be thread-safe subject to the particular nature of the implementation.</remarks>
51         bool TryDequeue(out T result);
52
53         /// <summary>Gets whether the collection is currently empty.</summary>
54         /// <remarks>This method may or may not be thread-safe.</remarks>
55         bool IsEmpty { get; }
56
57         /// <summary>Gets the number of items in the collection.</summary>
58         /// <remarks>In many implementations, this method will not be thread-safe.</remarks>
59         int Count { get; }
60
61         /// <summary>A thread-safe way to get the number of items in the collection. May synchronize access by locking the provided synchronization object.</summary>
62         /// <param name="syncObj">The sync object used to lock</param>
63         /// <returns>The collection count</returns>
64         int GetCountSafe(object syncObj);
65     }
66
67     /// <summary>
68     /// Provides a producer/consumer queue safe to be used by any number of producers and consumers concurrently.
69     /// </summary>
70     /// <typeparam name="T">Specifies the type of data contained in the queue.</typeparam>
71     [DebuggerDisplay("Count = {Count}")]
72     internal sealed class MultiProducerMultiConsumerQueue<T> : ConcurrentQueue<T>, IProducerConsumerQueue<T>
73     {
74         /// <summary>Enqueues an item into the queue.</summary>
75         /// <param name="item">The item to enqueue.</param>
76         void IProducerConsumerQueue<T>.Enqueue(T item) { base.Enqueue(item); }
77
78         /// <summary>Attempts to dequeue an item from the queue.</summary>
79         /// <param name="result">The dequeued item.</param>
80         /// <returns>true if an item could be dequeued; otherwise, false.</returns>
81         bool IProducerConsumerQueue<T>.TryDequeue(out T result) { return base.TryDequeue(out result); }
82
83         /// <summary>Gets whether the collection is currently empty.</summary>
84         bool IProducerConsumerQueue<T>.IsEmpty { get { return base.IsEmpty; } }
85
86         /// <summary>Gets the number of items in the collection.</summary>
87         int IProducerConsumerQueue<T>.Count { get { return base.Count; } }
88
89         /// <summary>A thread-safe way to get the number of items in the collection. May synchronize access by locking the provided synchronization object.</summary>
90         /// <remarks>ConcurrentQueue.Count is thread safe, no need to acquire the lock.</remarks>
91         int IProducerConsumerQueue<T>.GetCountSafe(object syncObj) { return base.Count; }
92     }
93
94     /// <summary>
95     /// Provides a producer/consumer queue safe to be used by only one producer and one consumer concurrently.
96     /// </summary>
97     /// <typeparam name="T">Specifies the type of data contained in the queue.</typeparam>
98     [DebuggerDisplay("Count = {Count}")]
99     [DebuggerTypeProxy(typeof(SingleProducerSingleConsumerQueue<>.SingleProducerSingleConsumerQueue_DebugView))]
100     internal sealed class SingleProducerSingleConsumerQueue<T> : IProducerConsumerQueue<T>
101     {
102         // Design:
103         //
104         // SingleProducerSingleConsumerQueue (SPSCQueue) is a concurrent queue designed to be used 
105         // by one producer thread and one consumer thread. SPSCQueue does not work correctly when used by 
106         // multiple producer threads concurrently or multiple consumer threads concurrently.
107         // 
108         // SPSCQueue is based on segments that behave like circular buffers. Each circular buffer is represented 
109         // as an array with two indexes: _first and _last. _first is the index of the array slot for the consumer 
110         // to read next, and _last is the slot for the producer to write next. The circular buffer is empty when 
111         // (_first == _last), and full when ((_last+1) % _array.Length == _first).
112         //
113         // Since _first is only ever modified by the consumer thread and _last by the producer, the two indices can 
114         // be updated without interlocked operations. As long as the queue size fits inside a single circular buffer, 
115         // enqueues and dequeues simply advance the corresponding indices around the circular buffer. If an enqueue finds 
116         // that there is no room in the existing buffer, however, a new circular buffer is allocated that is twice as big 
117         // as the old buffer. From then on, the producer will insert values into the new buffer. The consumer will first 
118         // empty out the old buffer and only then follow the producer into the new (larger) buffer.
119         //
120         // As described above, the enqueue operation on the fast path only modifies the _first field of the current segment. 
121         // However, it also needs to read _last in order to verify that there is room in the current segment. Similarly, the 
122         // dequeue operation on the fast path only needs to modify _last, but also needs to read _first to verify that the 
123         // queue is non-empty. This results in true cache line sharing between the producer and the consumer.
124         //
125         // The cache line sharing issue can be mitigating by having a possibly stale copy of _first that is owned by the producer, 
126         // and a possibly stale copy of _last that is owned by the consumer. So, the consumer state is described using 
127         // (_first, _lastCopy) and the producer state using (_firstCopy, _last). The consumer state is separated from 
128         // the producer state by padding, which allows fast-path enqueues and dequeues from hitting shared cache lines. 
129         // _lastCopy is the consumer's copy of _last. Whenever the consumer can tell that there is room in the buffer 
130         // simply by observing _lastCopy, the consumer thread does not need to read _last and thus encounter a cache miss. Only 
131         // when the buffer appears to be empty will the consumer refresh _lastCopy from _last. _firstCopy is used by the producer 
132         // in the same way to avoid reading _first on the hot path.
133
134         /// <summary>The initial size to use for segments (in number of elements).</summary>
135         private const int INIT_SEGMENT_SIZE = 32; // must be a power of 2
136         /// <summary>The maximum size to use for segments (in number of elements).</summary>
137         private const int MAX_SEGMENT_SIZE = 0x1000000; // this could be made as large as Int32.MaxValue / 2
138
139         /// <summary>The head of the linked list of segments.</summary>
140         private volatile Segment _head;
141         /// <summary>The tail of the linked list of segments.</summary>
142         private volatile Segment _tail;
143
144         /// <summary>Initializes the queue.</summary>
145         internal SingleProducerSingleConsumerQueue()
146         {
147             // Validate constants in ctor rather than in an explicit cctor that would cause perf degradation
148             Debug.Assert(INIT_SEGMENT_SIZE > 0, "Initial segment size must be > 0.");
149             Debug.Assert((INIT_SEGMENT_SIZE & (INIT_SEGMENT_SIZE - 1)) == 0, "Initial segment size must be a power of 2");
150             Debug.Assert(INIT_SEGMENT_SIZE <= MAX_SEGMENT_SIZE, "Initial segment size should be <= maximum.");
151             Debug.Assert(MAX_SEGMENT_SIZE < Int32.MaxValue / 2, "Max segment size * 2 must be < Int32.MaxValue, or else overflow could occur.");
152
153             // Initialize the queue
154             _head = _tail = new Segment(INIT_SEGMENT_SIZE);
155         }
156
157         /// <summary>Enqueues an item into the queue.</summary>
158         /// <param name="item">The item to enqueue.</param>
159         public void Enqueue(T item)
160         {
161             Segment segment = _tail;
162             T[] array = segment._array;
163             int last = segment._state._last; // local copy to avoid multiple volatile reads
164
165             // Fast path: there's obviously room in the current segment
166             int tail2 = (last + 1) & (array.Length - 1);
167             if (tail2 != segment._state._firstCopy)
168             {
169                 array[last] = item;
170                 segment._state._last = tail2;
171             }
172             // Slow path: there may not be room in the current segment.
173             else EnqueueSlow(item, ref segment);
174         }
175
176         /// <summary>Enqueues an item into the queue.</summary>
177         /// <param name="item">The item to enqueue.</param>
178         /// <param name="segment">The segment in which to first attempt to store the item.</param>
179         private void EnqueueSlow(T item, ref Segment segment)
180         {
181             Contract.Requires(segment != null, "Expected a non-null segment.");
182
183             if (segment._state._firstCopy != segment._state._first)
184             {
185                 segment._state._firstCopy = segment._state._first;
186                 Enqueue(item); // will only recur once for this enqueue operation
187                 return;
188             }
189
190             int newSegmentSize = _tail._array.Length << 1; // double size
191             Debug.Assert(newSegmentSize > 0, "The max size should always be small enough that we don't overflow.");
192             if (newSegmentSize > MAX_SEGMENT_SIZE) newSegmentSize = MAX_SEGMENT_SIZE;
193
194             var newSegment = new Segment(newSegmentSize);
195             newSegment._array[0] = item;
196             newSegment._state._last = 1;
197             newSegment._state._lastCopy = 1;
198
199             try { }
200             finally
201             {
202                 // Finally block to protect against corruption due to a thread abort 
203                 // between setting _next and setting _tail.
204                 Volatile.Write(ref _tail._next, newSegment); // ensure segment not published until item is fully stored
205                 _tail = newSegment;
206             }
207         }
208
209         /// <summary>Attempts to dequeue an item from the queue.</summary>
210         /// <param name="result">The dequeued item.</param>
211         /// <returns>true if an item could be dequeued; otherwise, false.</returns>
212         public bool TryDequeue(out T result)
213         {
214             Segment segment = _head;
215             T[] array = segment._array;
216             int first = segment._state._first; // local copy to avoid multiple volatile reads
217
218             // Fast path: there's obviously data available in the current segment
219             if (first != segment._state._lastCopy)
220             {
221                 result = array[first];
222                 array[first] = default(T); // Clear the slot to release the element
223                 segment._state._first = (first + 1) & (array.Length - 1);
224                 return true;
225             }
226             // Slow path: there may not be data available in the current segment
227             else return TryDequeueSlow(ref segment, ref array, out result);
228         }
229
230         /// <summary>Attempts to dequeue an item from the queue.</summary>
231         /// <param name="array">The array from which the item was dequeued.</param>
232         /// <param name="segment">The segment from which the item was dequeued.</param>
233         /// <param name="result">The dequeued item.</param>
234         /// <returns>true if an item could be dequeued; otherwise, false.</returns>
235         private bool TryDequeueSlow(ref Segment segment, ref T[] array, out T result)
236         {
237             Contract.Requires(segment != null, "Expected a non-null segment.");
238             Contract.Requires(array != null, "Expected a non-null item array.");
239
240             if (segment._state._last != segment._state._lastCopy)
241             {
242                 segment._state._lastCopy = segment._state._last;
243                 return TryDequeue(out result); // will only recur once for this dequeue operation
244             }
245
246             if (segment._next != null && segment._state._first == segment._state._last)
247             {
248                 segment = segment._next;
249                 array = segment._array;
250                 _head = segment;
251             }
252
253             int first = segment._state._first; // local copy to avoid extraneous volatile reads
254
255             if (first == segment._state._last)
256             {
257                 result = default(T);
258                 return false;
259             }
260
261             result = array[first];
262             array[first] = default(T); // Clear the slot to release the element
263             segment._state._first = (first + 1) & (segment._array.Length - 1);
264             segment._state._lastCopy = segment._state._last; // Refresh _lastCopy to ensure that _first has not passed _lastCopy
265
266             return true;
267         }
268
269         /// <summary>Attempts to peek at an item in the queue.</summary>
270         /// <param name="result">The peeked item.</param>
271         /// <returns>true if an item could be peeked; otherwise, false.</returns>
272         public bool TryPeek(out T result)
273         {
274             Segment segment = _head;
275             T[] array = segment._array;
276             int first = segment._state._first; // local copy to avoid multiple volatile reads
277
278             // Fast path: there's obviously data available in the current segment
279             if (first != segment._state._lastCopy)
280             {
281                 result = array[first];
282                 return true;
283             }
284             // Slow path: there may not be data available in the current segment
285             else return TryPeekSlow(ref segment, ref array, out result);
286         }
287
288         /// <summary>Attempts to peek at an item in the queue.</summary>
289         /// <param name="array">The array from which the item is peeked.</param>
290         /// <param name="segment">The segment from which the item is peeked.</param>
291         /// <param name="result">The peeked item.</param>
292         /// <returns>true if an item could be peeked; otherwise, false.</returns>
293         private bool TryPeekSlow(ref Segment segment, ref T[] array, out T result)
294         {
295             Contract.Requires(segment != null, "Expected a non-null segment.");
296             Contract.Requires(array != null, "Expected a non-null item array.");
297
298             if (segment._state._last != segment._state._lastCopy)
299             {
300                 segment._state._lastCopy = segment._state._last;
301                 return TryPeek(out result); // will only recur once for this peek operation
302             }
303
304             if (segment._next != null && segment._state._first == segment._state._last)
305             {
306                 segment = segment._next;
307                 array = segment._array;
308                 _head = segment;
309             }
310
311             int first = segment._state._first; // local copy to avoid extraneous volatile reads
312
313             if (first == segment._state._last)
314             {
315                 result = default(T);
316                 return false;
317             }
318
319             result = array[first];
320             return true;
321         }
322
323         /// <summary>Attempts to dequeue an item from the queue.</summary>
324         /// <param name="predicate">The predicate that must return true for the item to be dequeued.  If null, all items implicitly return true.</param>
325         /// <param name="result">The dequeued item.</param>
326         /// <returns>true if an item could be dequeued; otherwise, false.</returns>
327         public bool TryDequeueIf(Predicate<T> predicate, out T result)
328         {
329             Segment segment = _head;
330             T[] array = segment._array;
331             int first = segment._state._first; // local copy to avoid multiple volatile reads
332
333             // Fast path: there's obviously data available in the current segment
334             if (first != segment._state._lastCopy)
335             {
336                 result = array[first];
337                 if (predicate == null || predicate(result))
338                 {
339                     array[first] = default(T); // Clear the slot to release the element
340                     segment._state._first = (first + 1) & (array.Length - 1);
341                     return true;
342                 }
343                 else
344                 {
345                     result = default(T);
346                     return false;
347                 }
348             }
349             // Slow path: there may not be data available in the current segment
350             else return TryDequeueIfSlow(predicate, ref segment, ref array, out result);
351         }
352
353         /// <summary>Attempts to dequeue an item from the queue.</summary>
354         /// <param name="predicate">The predicate that must return true for the item to be dequeued.  If null, all items implicitly return true.</param>
355         /// <param name="array">The array from which the item was dequeued.</param>
356         /// <param name="segment">The segment from which the item was dequeued.</param>
357         /// <param name="result">The dequeued item.</param>
358         /// <returns>true if an item could be dequeued; otherwise, false.</returns>
359         private bool TryDequeueIfSlow(Predicate<T> predicate, ref Segment segment, ref T[] array, out T result)
360         {
361             Contract.Requires(segment != null, "Expected a non-null segment.");
362             Contract.Requires(array != null, "Expected a non-null item array.");
363
364             if (segment._state._last != segment._state._lastCopy)
365             {
366                 segment._state._lastCopy = segment._state._last;
367                 return TryDequeueIf(predicate, out result); // will only recur once for this dequeue operation
368             }
369
370             if (segment._next != null && segment._state._first == segment._state._last)
371             {
372                 segment = segment._next;
373                 array = segment._array;
374                 _head = segment;
375             }
376
377             int first = segment._state._first; // local copy to avoid extraneous volatile reads
378
379             if (first == segment._state._last)
380             {
381                 result = default(T);
382                 return false;
383             }
384
385             result = array[first];
386             if (predicate == null || predicate(result))
387             {
388                 array[first] = default(T); // Clear the slot to release the element
389                 segment._state._first = (first + 1) & (segment._array.Length - 1);
390                 segment._state._lastCopy = segment._state._last; // Refresh _lastCopy to ensure that _first has not passed _lastCopy
391                 return true;
392             }
393             else
394             {
395                 result = default(T);
396                 return false;
397             }
398         }
399
400         public void Clear()
401         {
402             T ignored;
403             while (TryDequeue(out ignored)) ;
404         }
405
406         /// <summary>Gets whether the collection is currently empty.</summary>
407         /// <remarks>WARNING: This should not be used concurrently without further vetting.</remarks>
408         public bool IsEmpty
409         {
410             // This implementation is optimized for calls from the consumer.
411             get
412             {
413                 Segment head = _head;
414                 if (head._state._first != head._state._lastCopy) return false; // _first is volatile, so the read of _lastCopy cannot get reordered
415                 if (head._state._first != head._state._last) return false;
416                 return head._next == null;
417             }
418         }
419
420         /// <summary>Gets an enumerable for the collection.</summary>
421         /// <remarks>WARNING: This should only be used for debugging purposes.  It is not safe to be used concurrently.</remarks>
422         public IEnumerator<T> GetEnumerator()
423         {
424             for (Segment segment = _head; segment != null; segment = segment._next)
425             {
426                 for (int pt = segment._state._first;
427                     pt != segment._state._last;
428                     pt = (pt + 1) & (segment._array.Length - 1))
429                 {
430                     yield return segment._array[pt];
431                 }
432             }
433         }
434         /// <summary>Gets an enumerable for the collection.</summary>
435         /// <remarks>WARNING: This should only be used for debugging purposes.  It is not safe to be used concurrently.</remarks>
436         IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); }
437
438         /// <summary>Gets the number of items in the collection.</summary>
439         /// <remarks>WARNING: This should only be used for debugging purposes.  It is not meant to be used concurrently.</remarks>
440         public int Count
441         {
442             get
443             {
444                 int count = 0;
445                 for (Segment segment = _head; segment != null; segment = segment._next)
446                 {
447                     int arraySize = segment._array.Length;
448                     int first, last;
449                     while (true) // Count is not meant to be used concurrently, but this helps to avoid issues if it is
450                     {
451                         first = segment._state._first;
452                         last = segment._state._last;
453                         if (first == segment._state._first) break;
454                     }
455                     count += (last - first) & (arraySize - 1);
456                 }
457                 return count;
458             }
459         }
460
461         /// <summary>A thread-safe way to get the number of items in the collection. May synchronize access by locking the provided synchronization object.</summary>
462         /// <remarks>The Count is not thread safe, so we need to acquire the lock.</remarks>
463         int IProducerConsumerQueue<T>.GetCountSafe(object syncObj)
464         {
465             Debug.Assert(syncObj != null, "The syncObj parameter is null.");
466             lock (syncObj)
467             {
468                 return Count;
469             }
470         }
471
472         /// <summary>A segment in the queue containing one or more items.</summary>
473         [StructLayout(LayoutKind.Sequential)]
474         private sealed class Segment
475         {
476             /// <summary>The next segment in the linked list of segments.</summary>
477             internal Segment _next;
478             /// <summary>The data stored in this segment.</summary>
479             internal readonly T[] _array;
480             /// <summary>Details about the segment.</summary>
481             internal SegmentState _state; // separated out to enable StructLayout attribute to take effect
482
483             /// <summary>Initializes the segment.</summary>
484             /// <param name="size">The size to use for this segment.</param>
485             internal Segment(int size)
486             {
487                 Contract.Requires((size & (size - 1)) == 0, "Size must be a power of 2");
488                 _array = new T[size];
489             }
490         }
491
492         /// <summary>Stores information about a segment.</summary>
493         [StructLayout(LayoutKind.Sequential)] // enforce layout so that padding reduces false sharing
494         private struct SegmentState
495         {
496             /// <summary>Padding to reduce false sharing between the segment's array and _first.</summary>
497             internal PaddingFor32 _pad0;
498
499             /// <summary>The index of the current head in the segment.</summary>
500             internal volatile int _first;
501             /// <summary>A copy of the current tail index.</summary>
502             internal int _lastCopy; // not volatile as read and written by the producer, except for IsEmpty, and there _lastCopy is only read after reading the volatile _first
503
504             /// <summary>Padding to reduce false sharing between the first and last.</summary>
505             internal PaddingFor32 _pad1;
506
507             /// <summary>A copy of the current head index.</summary>
508             internal int _firstCopy; // not volatile as only read and written by the consumer thread
509             /// <summary>The index of the current tail in the segment.</summary>
510             internal volatile int _last;
511
512             /// <summary>Padding to reduce false sharing with the last and what's after the segment.</summary>
513             internal PaddingFor32 _pad2;
514         }
515
516         /// <summary>Debugger type proxy for a SingleProducerSingleConsumerQueue of T.</summary>
517         private sealed class SingleProducerSingleConsumerQueue_DebugView
518         {
519             /// <summary>The queue being visualized.</summary>
520             private readonly SingleProducerSingleConsumerQueue<T> _queue;
521
522             /// <summary>Initializes the debug view.</summary>
523             /// <param name="queue">The queue being debugged.</param>
524             public SingleProducerSingleConsumerQueue_DebugView(SingleProducerSingleConsumerQueue<T> queue)
525             {
526                 Contract.Requires(queue != null, "Expected a non-null queue.");
527                 _queue = queue;
528             }
529
530             /// <summary>Gets the contents of the list.</summary>
531             [DebuggerBrowsable(DebuggerBrowsableState.RootHidden)]
532             public T[] Items
533             {
534                 get
535                 {
536                     List<T> list = new List<T>();
537                     foreach (T item in _queue)
538                         list.Add(item);
539                     return list.ToArray();
540                 }
541             }
542         }
543     }
544
545
546     /// <summary>A placeholder class for common padding constants and eventually routines.</summary>
547     static class PaddingHelpers
548     {
549         /// <summary>A size greater than or equal to the size of the most common CPU cache lines.</summary>
550         internal const int CACHE_LINE_SIZE = 128;
551     }
552
553     /// <summary>Padding structure used to minimize false sharing in SingleProducerSingleConsumerQueue{T}.</summary>
554     [StructLayout(LayoutKind.Explicit, Size = PaddingHelpers.CACHE_LINE_SIZE - sizeof(Int32))] // Based on common case of 64-byte cache lines
555     struct PaddingFor32
556     {
557     }
558 }