Updates referencesource to .NET 4.7
[mono.git] / mcs / class / referencesource / System.Core / System / Linq / Parallel / Channels / AsynchronousChannel.cs
1 // ==++==
2 //
3 //   Copyright (c) Microsoft Corporation.  All rights reserved.
4 // 
5 // ==--==
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
7 //
8 // AsynchronousOneToOneChannel.cs
9 //
10 // <OWNER>Microsoft</OWNER>
11 //
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
13
14 using System.Threading;
15 using System.Diagnostics.Contracts;
16
17 namespace System.Linq.Parallel
18 {
19     /// <summary>
20     /// This is a bounded channel meant for single-producer/single-consumer scenarios. 
21     /// </summary>
22     /// <typeparam name="T">Specifies the type of data in the channel.</typeparam>
23     internal sealed class AsynchronousChannel<T> : IDisposable
24     {
25         // The producer will be blocked once the channel reaches a capacity, and unblocked
26         // as soon as a consumer makes room. A consumer can block waiting until a producer
27         // enqueues a new element. We use a chunking scheme to adjust the granularity and
28         // frequency of synchronization, e.g. by enqueueing/dequeueing N elements at a time.
29         // Because there is only ever a single producer and consumer, we are able to acheive
30         // efficient and low-overhead synchronization.
31         //
32         // In general, the buffer has four logical states:
33         //     FULL <--> OPEN <--> EMPTY <--> DONE
34         //
35         // Here is a summary of the state transitions and what they mean:
36         //     * OPEN:
37         //         A buffer starts in the OPEN state. When the buffer is in the READY state,
38         //         a consumer and producer can dequeue and enqueue new elements.
39         //     * OPEN->FULL:
40         //         A producer transitions the buffer from OPEN->FULL when it enqueues a chunk
41         //         that causes the buffer to reach capacity; a producer can no longer enqueue
42         //         new chunks when this happens, causing it to block.
43         //     * FULL->OPEN:
44         //         When the consumer takes a chunk from a FULL buffer, it transitions back from
45         //         FULL->OPEN and the producer is woken up.
46         //     * OPEN->EMPTY:
47         //         When the consumer takes the last chunk from a buffer, the buffer is
48         //         transitioned from OPEN->EMPTY; a consumer can no longer take new chunks,
49         //         causing it to block.
50         //     * EMPTY->OPEN:
51         //         Lastly, when the producer enqueues an item into an EMPTY buffer, it
52         //         transitions to the OPEN state. This causes any waiting consumers to wake up.
53         //     * EMPTY->DONE:
54         //         If the buffer is empty, and the producer is done enqueueing new
55         //         items, the buffer is DONE. There will be no more consumption or production.
56         //
57         // Assumptions:
58         //   There is only ever one producer and one consumer operating on this channel
59         //   concurrently. The internal synchronization cannot handle anything else.
60         //
61         //   ** WARNING ** WARNING ** WARNING ** WARNING ** WARNING ** WARNING ** WARNING **
62         //   VVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVV
63         //
64         //   There... got your attention now... just in case you didn't read the comments
65         //   very carefully above, this channel will deadlock, become corrupt, and generally
66         //   make you an unhappy camper if you try to use more than 1 producer or more than
67         //   1 consumer thread to access this thing concurrently. It's been carefully designed
68         //   to avoid locking, but only because of this restriction... 
69
70         private T[][] m_buffer;              // The buffer of chunks.
71         private readonly int m_index;            // Index of this channel
72         private volatile int m_producerBufferIndex;   // Producer's current index, i.e. where to put the next chunk.
73         private volatile int m_consumerBufferIndex;   // Consumer's current index, i.e. where to get the next chunk.
74
75         private volatile bool m_done;        // Set to true once the producer is done.
76
77         private T[] m_producerChunk;         // The temporary chunk being generated by the producer.
78         private int m_producerChunkIndex;    // A producer's index into its temporary chunk.
79         private T[] m_consumerChunk;         // The temporary chunk being enumerated by the consumer.
80         private int m_consumerChunkIndex;    // A consumer's index into its temporary chunk.
81
82         private int m_chunkSize;             // The number of elements that comprise a chunk.
83
84         // These events are used to signal a waiting producer when the consumer dequeues, and to signal a
85         // waiting consumer when the producer enqueues.
86         private ManualResetEventSlim m_producerEvent;
87         private IntValueEvent m_consumerEvent;
88
89         // These two-valued ints track whether a producer or consumer _might_ be waiting. They are marked
90         // volatile because they are used in synchronization critical regions of code (see usage below).
91         private volatile int m_producerIsWaiting;
92         private volatile int m_consumerIsWaiting;
93         private CancellationToken m_cancellationToken;
94
95         //-----------------------------------------------------------------------------------
96         // Initializes a new channel with the specific capacity and chunk size.
97         //
98         // Arguments:
99         //     orderingHelper - the ordering helper to use for order preservation
100         //     capacity   - the maximum number of elements before a producer blocks
101         //     chunkSize  - the granularity of chunking on enqueue/dequeue. 0 means default size.
102         //
103         // Notes:
104         //     The capacity represents the maximum number of chunks a channel can hold. That
105         //     means producers will actually block after enqueueing capacity*chunkSize
106         //     individual elements.
107         //
108
109         internal AsynchronousChannel(int index, int chunkSize, CancellationToken cancellationToken, IntValueEvent consumerEvent) :
110             this(index, Scheduling.DEFAULT_BOUNDED_BUFFER_CAPACITY, chunkSize, cancellationToken, consumerEvent)
111         {
112         }
113
114         internal AsynchronousChannel(int index, int capacity, int chunkSize, CancellationToken cancellationToken, IntValueEvent consumerEvent)
115         {
116             if (chunkSize == 0) chunkSize = Scheduling.GetDefaultChunkSize<T>();
117
118             Contract.Assert(chunkSize > 0, "chunk size must be greater than 0");
119             Contract.Assert(capacity > 1, "this impl doesn't support capacity of 1 or 0");
120
121             // Initialize a buffer with enough space to hold 'capacity' elements.
122             // We need one extra unused element as a sentinel to detect a full buffer,
123             // thus we add one to the capacity requested.
124             m_index = index;
125             m_buffer = new T[capacity + 1][];
126             m_producerBufferIndex = 0;
127             m_consumerBufferIndex = 0;
128
129             m_producerEvent = new ManualResetEventSlim();
130             m_consumerEvent = consumerEvent;
131             m_chunkSize = chunkSize;
132             m_producerChunk = new T[chunkSize];
133             m_producerChunkIndex = 0;
134             m_cancellationToken = cancellationToken;
135         }
136
137         //-----------------------------------------------------------------------------------
138         // Checks whether the buffer is full. If the consumer is calling this, they can be
139         // assured that a true value won't change before the consumer has a chance to dequeue
140         // elements. That's because only one consumer can run at once. A producer might see
141         // a true value, however, and then a consumer might transition to non-full, so it's
142         // not stable for them. Lastly, it's of course possible to see a false value when
143         // there really is a full queue, it's all dependent on small race conditions.
144         //
145
146         internal bool IsFull
147         {
148             get
149             {
150                 // Read the fields once. One of these is always stable, since the only threads
151                 // that call this are the 1 producer/1 consumer threads.
152                 int producerIndex = m_producerBufferIndex;
153                 int consumerIndex = m_consumerBufferIndex;
154
155
156                 // Two cases:
157                 //     1) Is the producer index one less than the consumer?
158                 //     2) The producer is at the end of the buffer and the consumer at the beginning.
159
160                 return (producerIndex == consumerIndex - 1) ||
161                     (consumerIndex == 0 && producerIndex == m_buffer.Length - 1);
162
163                 // Note to readers: you might have expected us to consider the case where
164                 // m_producerBufferIndex == m_buffer.Length && m_consumerBufferIndex == 1.
165                 // That is, a producer has gone off the end of the array, but is about to
166                 // wrap around to the 0th element again. We don't need this for a subtle
167                 // reason. It is SAFE for a consumer to think we are non-full when we
168                 // actually are full; it is NOT for a producer; but thankfully, there is
169                 // only one producer, and hence the producer will never see this seemingly
170                 // invalid state. Hence, we're fine producing a false negative. It's all
171                 // based on a race condition we have to deal with anyway.
172             }
173         }
174
175         //-----------------------------------------------------------------------------------
176         // Checks whether the buffer is empty. If the producer is calling this, they can be
177         // assured that a true value won't change before the producer has a chance to enqueue
178         // an item. That's because only one producer can run at once. A consumer might see
179         // a true value, however, and then a producer might transition to non-empty.
180         //
181
182         internal bool IsChunkBufferEmpty
183         {
184             get
185             {
186                 // The queue is empty when the producer and consumer are at the same index.
187                 return m_producerBufferIndex == m_consumerBufferIndex;
188             }
189         }
190
191         //-----------------------------------------------------------------------------------
192         // Checks whether the producer is done enqueueing new elements.
193         //
194
195         internal bool IsDone
196         {
197             get { return m_done; }
198         }
199
200
201         //-----------------------------------------------------------------------------------
202         // Used by a producer to flush out any internal buffers that have been accumulating
203         // data, but which hasn't yet been published to the consumer.
204         
205         internal void FlushBuffers()
206         {
207             TraceHelpers.TraceInfo("tid {0}: AsynchronousChannel<T>::FlushBuffers() called",
208                                    Thread.CurrentThread.ManagedThreadId);
209
210             // Ensure that a partially filled chunk is made available to the consumer.
211             FlushCachedChunk();
212         }
213
214         //-----------------------------------------------------------------------------------
215         // Used by a producer to signal that it is done producing new elements. This will
216         // also wake up any consumers that have gone to sleep.
217         //
218
219         internal void SetDone()
220         {
221             TraceHelpers.TraceInfo("tid {0}: AsynchronousChannel<T>::SetDone() called",
222                                    Thread.CurrentThread.ManagedThreadId);
223
224             // This is set with a volatile write to ensure that, after the consumer
225             // sees done, they can re-read the enqueued chunks and see the last one we
226             // enqueued just above.
227             m_done = true;
228
229             // We set the event to ensure consumers that may have waited or are
230             // considering waiting will notice that the producer is done. This is done
231             // after setting the done flag to facilitate a Dekker-style check/recheck.
232             //
233             // Because we can ---- with threads trying to Dispose of the event, we must 
234             // acquire a lock around our setting, and double-check that the event isn't null.
235             //
236             // Update 8/2/2011: Dispose() should never be called with SetDone() concurrently,
237             // but in order to reduce churn late in the product cycle, we decided not to 
238             // remove the lock.
239             lock (this)
240             {
241                 if (m_consumerEvent != null)
242                 {
243                     m_consumerEvent.Set(m_index);
244                 }
245             }
246         }
247         //-----------------------------------------------------------------------------------
248         // Enqueues a new element to the buffer, possibly blocking in the process.
249         //
250         // Arguments:
251         //     item                - the new element to enqueue
252         //     timeoutMilliseconds - a timeout (or -1 for no timeout) used in case the buffer
253         //                           is full; we return false if it expires
254         //
255         // Notes:
256         //     This API will block until the buffer is non-full. This internally buffers
257         //     elements up into chunks, so elements are not immediately available to consumers.
258         //
259
260         internal void Enqueue(T item)
261         {
262             // Store the element into our current chunk.
263             int producerChunkIndex = m_producerChunkIndex;
264             m_producerChunk[producerChunkIndex] = item;
265
266             // And lastly, if we have filled a chunk, make it visible to consumers.
267             if (producerChunkIndex == m_chunkSize - 1)
268             {
269                 EnqueueChunk(m_producerChunk);
270                 m_producerChunk = new T[m_chunkSize];
271             }
272
273             m_producerChunkIndex = (producerChunkIndex + 1) % m_chunkSize;
274         }
275
276         //-----------------------------------------------------------------------------------
277         // Internal helper to queue a real chunk, not just an element.
278         //
279         // Arguments:
280         //     chunk               - the chunk to make visible to consumers
281         //     timeoutMilliseconds - an optional timeout; we return false if it expires
282         //
283         // Notes:
284         //     This API will block if the buffer is full. A chunk must contain only valid
285         //     elements; if the chunk wasn't filled, it should be trimmed to size before
286         //     enqueueing it for consumers to observe.
287         //
288
289         private void EnqueueChunk(T[] chunk)
290         {
291             Contract.Assert(chunk != null);
292             Contract.Assert(!m_done, "can't continue producing after the production is over");
293
294             if (IsFull)
295                 WaitUntilNonFull();
296             Contract.Assert(!IsFull, "expected a non-full buffer");
297
298             // We can safely store into the current producer index because we know no consumers
299             // will be reading from it concurrently.
300             int bufferIndex = m_producerBufferIndex;
301             m_buffer[bufferIndex] = chunk;
302
303             // Increment the producer index, taking into count wrapping back to 0. This is a shared
304             // write; the CLR 2.0 memory model ensures the write won't move before the write to the
305             // corresponding element, so a consumer won't see the new index but the corresponding
306             // element in the array as empty.
307 #pragma warning disable 0420
308             Interlocked.Exchange(ref m_producerBufferIndex, (bufferIndex + 1) % m_buffer.Length);
309 #pragma warning restore 0420
310
311             // (If there is a consumer waiting, we have to ensure to signal the event. Unfortunately,
312             // this requires that we issue a memory barrier: We need to guarantee that the write to
313             // our producer index doesn't pass the read of the consumer waiting flags; the CLR memory
314             // model unfortunately permits this reordering. That is handled by using a CAS above.)
315
316             if (m_consumerIsWaiting == 1 && !IsChunkBufferEmpty)
317             {
318                 TraceHelpers.TraceInfo("AsynchronousChannel::EnqueueChunk - producer waking consumer");
319                 m_consumerIsWaiting = 0;
320                 m_consumerEvent.Set(m_index);
321             }
322         }
323
324         //-----------------------------------------------------------------------------------
325         // Just waits until the queue is non-full.
326         //
327
328         private void WaitUntilNonFull()
329         {
330             // We must loop; sometimes the producer event will have been set
331             // prematurely due to the way waiting flags are managed.  By looping,
332             // we will only return from this method when space is truly available.
333             do
334             {
335                 // If the queue is full, we have to wait for a consumer to make room.
336                 // Reset the event to unsignaled state before waiting.
337                 m_producerEvent.Reset();
338
339                 // We have to handle the case where a producer and consumer are racing to
340                 // wait simultaneously. For instance, a producer might see a full queue (by
341                 // reading IsFull just above), but meanwhile a consumer might drain the queue
342                 // very quickly, suddenly seeing an empty queue. This would lead to deadlock
343                 // if we aren't careful. Therefore we check the empty/full state AGAIN after
344                 // setting our flag to see if a real wait is warranted.
345 #pragma warning disable 0420
346                 Interlocked.Exchange(ref m_producerIsWaiting, 1);
347 #pragma warning restore 0420
348
349                 // (We have to prevent the reads that go into determining whether the buffer
350                 // is full from moving before the write to the producer-wait flag. Hence the CAS.)
351
352                 // Because we might be racing with a consumer that is transitioning the
353                 // buffer from full to non-full, we must check that the queue is full once
354                 // more. Otherwise, we might decide to wait and never be woken up (since
355                 // we just reset the event).
356                 if (IsFull)
357                 {
358                     // Assuming a consumer didn't make room for us, we can wait on the event.
359                     TraceHelpers.TraceInfo("AsynchronousChannel::EnqueueChunk - producer waiting, buffer full");
360                     m_producerEvent.Wait(m_cancellationToken);
361                 }
362                 else
363                 {
364                     // Reset the flags, we don't actually have to wait after all.
365                     m_producerIsWaiting = 0;
366                 }
367             }
368             while (IsFull);
369         }
370
371         //-----------------------------------------------------------------------------------
372         // Flushes any built up elements that haven't been made available to a consumer yet.
373         // Only safe to be called by a producer.
374         //
375         // Notes:
376         //     This API can block if the channel is currently full.
377         //
378
379         private void FlushCachedChunk()
380         {
381             // If the producer didn't fill their temporary working chunk, flushing forces an enqueue
382             // so that a consumer will see the partially filled chunk of elements.
383             if (m_producerChunk != null && m_producerChunkIndex != 0)
384             {
385                 // Trim the partially-full chunk to an array just big enough to hold it.
386                 Contract.Assert(1 <= m_producerChunkIndex && m_producerChunkIndex <= m_chunkSize);
387                 T[] leftOverChunk = new T[m_producerChunkIndex];
388                 Array.Copy(m_producerChunk, leftOverChunk, m_producerChunkIndex);
389
390                 // And enqueue the right-sized temporary chunk, possibly blocking if it's full.
391                 EnqueueChunk(leftOverChunk);
392                 m_producerChunk = null;
393             }
394         }
395
396         //-----------------------------------------------------------------------------------
397         // Dequeues the next element in the queue.
398         //
399         // Arguments:
400         //     item - a byref to the location into which we'll store the dequeued element
401         //
402         // Return Value:
403         //     True if an item was found, false otherwise.
404         //
405
406         internal bool TryDequeue(ref T item)
407         {
408             // Ensure we have a chunk to work with.
409             if (m_consumerChunk == null)
410             {
411                 if (!TryDequeueChunk(ref m_consumerChunk))
412                 {
413                     Contract.Assert(m_consumerChunk == null);
414                     return false;
415                 }
416
417                 m_consumerChunkIndex = 0;
418             }
419
420             // Retrieve the current item in the chunk.
421             Contract.Assert(m_consumerChunk != null, "consumer chunk is null");
422             Contract.Assert(0 <= m_consumerChunkIndex && m_consumerChunkIndex < m_consumerChunk.Length, "chunk index out of bounds");
423             item = m_consumerChunk[m_consumerChunkIndex];
424
425             // And lastly, if we have consumed the chunk, null it out so we'll get the
426             // next one when dequeue is called again.
427             ++m_consumerChunkIndex;
428             if (m_consumerChunkIndex == m_consumerChunk.Length)
429             {
430                 m_consumerChunk = null;
431             }
432
433             return true;
434         }
435
436         //-----------------------------------------------------------------------------------
437         // Internal helper method to dequeue a whole chunk.
438         //
439         // Arguments:
440         //     chunk - a byref to the location into which we'll store the chunk
441         //
442         // Return Value:
443         //     True if a chunk was found, false otherwise.
444         //
445
446         private bool TryDequeueChunk(ref T[] chunk)
447         {
448             // This is the non-blocking version of dequeue. We first check to see
449             // if the queue is empty. If the caller chooses to wait later, they can
450             // call the overload with an event.
451             if (IsChunkBufferEmpty)
452             {
453                 return false;
454             }
455
456             chunk = InternalDequeueChunk();
457             return true;
458         }
459
460         //-----------------------------------------------------------------------------------
461         // Blocking dequeue for the next element. This version of the API is used when the
462         // caller will possibly wait for a new chunk to be enqueued.
463         //
464         // Arguments:
465         //     item      - a byref for the returned element
466         //     waitEvent - a byref for the event used to signal blocked consumers
467         //
468         // Return Value:
469         //     True if an element was found, false otherwise.
470         //
471         // Notes:
472         //     If the return value is false, it doesn't always mean waitEvent will be non-
473         //     null. If the producer is done enqueueing, the return will be false and the
474         //     event will remain null. A caller must check for this condition.
475         //
476         //     If the return value is false and an event is returned, there have been
477         //     side-effects on the channel. Namely, the flag telling producers a consumer
478         //     might be waiting will have been set. DequeueEndAfterWait _must_ be called
479         //     eventually regardless of whether the caller actually waits or not.
480         //
481
482         internal bool TryDequeue(ref T item, ref bool isDone)
483         {
484             isDone = false;
485
486             // Ensure we have a buffer to work with.
487             if (m_consumerChunk == null)
488             {
489                 if (!TryDequeueChunk(ref m_consumerChunk, ref isDone))
490                 {
491                     Contract.Assert(m_consumerChunk == null);
492                     return false;
493                 }
494
495                 m_consumerChunkIndex = 0;
496             }
497
498             // Retrieve the current item in the chunk.
499             Contract.Assert(m_consumerChunk != null, "consumer chunk is null");
500             Contract.Assert(0 <= m_consumerChunkIndex && m_consumerChunkIndex < m_consumerChunk.Length, "chunk index out of bounds");
501             item = m_consumerChunk[m_consumerChunkIndex];
502
503             // And lastly, if we have consumed the chunk, null it out.
504             ++m_consumerChunkIndex;
505             if (m_consumerChunkIndex == m_consumerChunk.Length)
506             {
507                 m_consumerChunk = null;
508             }
509
510             return true;
511         }
512
513         //-----------------------------------------------------------------------------------
514         // Internal helper method to dequeue a whole chunk. This version of the API is used
515         // when the caller will wait for a new chunk to be enqueued.
516         //
517         // Arguments:
518         //     chunk     - a byref for the dequeued chunk
519         //     waitEvent - a byref for the event used to signal blocked consumers
520         //
521         // Return Value:
522         //     True if a chunk was found, false otherwise.
523         //
524         // Notes:
525         //     If the return value is false, it doesn't always mean waitEvent will be non-
526         //     null. If the producer is done enqueueing, the return will be false and the
527         //     event will remain null. A caller must check for this condition.
528         //
529         //     If the return value is false and an event is returned, there have been
530         //     side-effects on the channel. Namely, the flag telling producers a consumer
531         //     might be waiting will have been set. DequeueEndAfterWait _must_ be called
532         //     eventually regardless of whether the caller actually waits or not.
533         //
534
535         private bool TryDequeueChunk(ref T[] chunk, ref bool isDone)
536         {
537             isDone = false;
538
539             // We will register our interest in waiting, and then return an event
540             // that the caller can use to wait.
541             while (IsChunkBufferEmpty)
542             {
543                 // If the producer is done and we've drained the queue, we can bail right away.
544                 if (IsDone)
545                 {
546                     // We have to see if the buffer is empty AFTER we've seen that it's done.
547                     // Otherwise, we would possibly miss the elements enqueued before the
548                     // producer signaled that it's done. This is done with a volatile load so
549                     // that the read of empty doesn't move before the read of done.
550                     if (IsChunkBufferEmpty)
551                     {
552                         // Return isDone=true so callers know not to wait
553                         isDone = true;
554                         return false;
555                     }
556                 }
557
558                 // We have to handle the case where a producer and consumer are racing to
559                 // wait simultaneously. For instance, a consumer might see an empty queue (by
560                 // reading IsChunkBufferEmpty just above), but meanwhile a producer might fill the queue
561                 // very quickly, suddenly seeing a full queue. This would lead to deadlock
562                 // if we aren't careful. Therefore we check the empty/full state AGAIN after
563                 // setting our flag to see if a real wait is warranted.
564 #pragma warning disable 0420
565                 Interlocked.Exchange(ref m_consumerIsWaiting, 1);
566 #pragma warning restore 0420
567
568                 // (We have to prevent the reads that go into determining whether the buffer
569                 // is full from moving before the write to the producer-wait flag. Hence the CAS.)
570
571                 // Because we might be racing with a producer that is transitioning the
572                 // buffer from empty to non-full, we must check that the queue is empty once
573                 // more. Similarly, if the queue has been marked as done, we must not wait
574                 // because we just reset the event, possibly losing as signal. In both cases,
575                 // we would otherwise decide to wait and never be woken up (i.e. deadlock).
576                 if (IsChunkBufferEmpty && !IsDone)
577                 {
578                     // Note that the caller must eventually call DequeueEndAfterWait to set the
579                     // flags back to a state where no consumer is waiting, whether they choose
580                     // to wait or not.
581                     TraceHelpers.TraceInfo("AsynchronousChannel::DequeueChunk - consumer possibly waiting");
582                     return false;
583                 }
584                 else
585                 {
586                     // Reset the wait flags, we don't need to wait after all. We loop back around
587                     // and recheck that the queue isn't empty, done, etc.
588                     m_consumerIsWaiting = 0;
589                 }
590             }
591
592             Contract.Assert(!IsChunkBufferEmpty, "single-consumer should never witness an empty queue here");
593
594             chunk = InternalDequeueChunk();
595             return true;
596         }
597
598         //-----------------------------------------------------------------------------------
599         // Internal helper method that dequeues a chunk after we've verified that there is
600         // a chunk available to dequeue.
601         //
602         // Return Value:
603         //     The dequeued chunk.
604         //
605         // Assumptions:
606         //     The caller has verified that a chunk is available, i.e. the queue is non-empty.
607         //
608
609         private T[] InternalDequeueChunk()
610         {
611             Contract.Assert(!IsChunkBufferEmpty);
612
613             // We can safely read from the consumer index because we know no producers
614             // will write concurrently.
615             int consumerBufferIndex = m_consumerBufferIndex;
616             T[] chunk = m_buffer[consumerBufferIndex];
617
618             // Zero out contents to avoid holding on to memory for longer than necessary. This
619             // ensures the entire chunk is eligible for GC sooner. (More important for big chunks.)
620             m_buffer[consumerBufferIndex] = null;
621
622             // Increment the consumer index, taking into count wrapping back to 0. This is a shared
623             // write; the CLR 2.0 memory model ensures the write won't move before the write to the
624             // corresponding element, so a consumer won't see the new index but the corresponding
625             // element in the array as empty.
626 #pragma warning disable 0420
627             Interlocked.Exchange(ref m_consumerBufferIndex, (consumerBufferIndex + 1) % m_buffer.Length);
628 #pragma warning restore 0420
629
630             // (Unfortunately, this whole sequence requires a memory barrier: We need to guarantee
631             // that the write to m_consumerBufferIndex doesn't pass the read of the wait-flags; the CLR memory
632             // model sadly permits this reordering. Hence the CAS above.)
633
634             if (m_producerIsWaiting == 1 && !IsFull)
635             {
636                 TraceHelpers.TraceInfo("BoundedSingleLockFreeChannel::DequeueChunk - consumer waking producer");
637                 m_producerIsWaiting = 0;
638                 m_producerEvent.Set();
639             }
640
641             return chunk;
642         }
643
644         //-----------------------------------------------------------------------------------
645         // Clears the flag set when a blocking Dequeue is called, letting producers know
646         // the consumer is no longer waiting.
647         //
648
649         internal void DoneWithDequeueWait()
650         {
651             // On our way out, be sure to reset the flags.
652             m_consumerIsWaiting = 0;
653         }
654
655         //-----------------------------------------------------------------------------------
656         // Closes Win32 events possibly allocated during execution.
657         //
658
659         public void Dispose()
660         {
661             // We need to take a lock to deal with consumer threads racing to call Dispose
662             // and producer threads racing inside of SetDone.
663             //
664             // Update 8/2/2011: Dispose() should never be called with SetDone() concurrently,
665             // but in order to reduce churn late in the product cycle, we decided not to 
666             // remove the lock.
667             lock (this)
668             {
669                 Contract.Assert(m_done, "Expected channel to be done before disposing");
670                 Contract.Assert(m_producerEvent != null);
671                 Contract.Assert(m_consumerEvent != null);
672                 m_producerEvent.Dispose();
673                 m_producerEvent = null;
674                 m_consumerEvent = null;
675             }
676         }
677
678     }
679 }