3 // Copyright (c) Microsoft Corporation. All rights reserved.
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
8 // AsynchronousOneToOneChannel.cs
10 // <OWNER>Microsoft</OWNER>
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
14 using System.Threading;
15 using System.Diagnostics.Contracts;
17 namespace System.Linq.Parallel
20 /// This is a bounded channel meant for single-producer/single-consumer scenarios.
22 /// <typeparam name="T">Specifies the type of data in the channel.</typeparam>
23 internal sealed class AsynchronousChannel<T> : IDisposable
25 // The producer will be blocked once the channel reaches a capacity, and unblocked
26 // as soon as a consumer makes room. A consumer can block waiting until a producer
27 // enqueues a new element. We use a chunking scheme to adjust the granularity and
28 // frequency of synchronization, e.g. by enqueueing/dequeueing N elements at a time.
29 // Because there is only ever a single producer and consumer, we are able to acheive
30 // efficient and low-overhead synchronization.
32 // In general, the buffer has four logical states:
33 // FULL <--> OPEN <--> EMPTY <--> DONE
35 // Here is a summary of the state transitions and what they mean:
37 // A buffer starts in the OPEN state. When the buffer is in the READY state,
38 // a consumer and producer can dequeue and enqueue new elements.
40 // A producer transitions the buffer from OPEN->FULL when it enqueues a chunk
41 // that causes the buffer to reach capacity; a producer can no longer enqueue
42 // new chunks when this happens, causing it to block.
44 // When the consumer takes a chunk from a FULL buffer, it transitions back from
45 // FULL->OPEN and the producer is woken up.
47 // When the consumer takes the last chunk from a buffer, the buffer is
48 // transitioned from OPEN->EMPTY; a consumer can no longer take new chunks,
49 // causing it to block.
51 // Lastly, when the producer enqueues an item into an EMPTY buffer, it
52 // transitions to the OPEN state. This causes any waiting consumers to wake up.
54 // If the buffer is empty, and the producer is done enqueueing new
55 // items, the buffer is DONE. There will be no more consumption or production.
58 // There is only ever one producer and one consumer operating on this channel
59 // concurrently. The internal synchronization cannot handle anything else.
61 // ** WARNING ** WARNING ** WARNING ** WARNING ** WARNING ** WARNING ** WARNING **
62 // VVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVV
64 // There... got your attention now... just in case you didn't read the comments
65 // very carefully above, this channel will deadlock, become corrupt, and generally
66 // make you an unhappy camper if you try to use more than 1 producer or more than
67 // 1 consumer thread to access this thing concurrently. It's been carefully designed
68 // to avoid locking, but only because of this restriction...
70 private T[][] m_buffer; // The buffer of chunks.
71 private readonly int m_index; // Index of this channel
72 private volatile int m_producerBufferIndex; // Producer's current index, i.e. where to put the next chunk.
73 private volatile int m_consumerBufferIndex; // Consumer's current index, i.e. where to get the next chunk.
75 private volatile bool m_done; // Set to true once the producer is done.
77 private T[] m_producerChunk; // The temporary chunk being generated by the producer.
78 private int m_producerChunkIndex; // A producer's index into its temporary chunk.
79 private T[] m_consumerChunk; // The temporary chunk being enumerated by the consumer.
80 private int m_consumerChunkIndex; // A consumer's index into its temporary chunk.
82 private int m_chunkSize; // The number of elements that comprise a chunk.
84 // These events are used to signal a waiting producer when the consumer dequeues, and to signal a
85 // waiting consumer when the producer enqueues.
86 private ManualResetEventSlim m_producerEvent;
87 private IntValueEvent m_consumerEvent;
89 // These two-valued ints track whether a producer or consumer _might_ be waiting. They are marked
90 // volatile because they are used in synchronization critical regions of code (see usage below).
91 private volatile int m_producerIsWaiting;
92 private volatile int m_consumerIsWaiting;
93 private CancellationToken m_cancellationToken;
95 //-----------------------------------------------------------------------------------
96 // Initializes a new channel with the specific capacity and chunk size.
99 // orderingHelper - the ordering helper to use for order preservation
100 // capacity - the maximum number of elements before a producer blocks
101 // chunkSize - the granularity of chunking on enqueue/dequeue. 0 means default size.
104 // The capacity represents the maximum number of chunks a channel can hold. That
105 // means producers will actually block after enqueueing capacity*chunkSize
106 // individual elements.
109 internal AsynchronousChannel(int index, int chunkSize, CancellationToken cancellationToken, IntValueEvent consumerEvent) :
110 this(index, Scheduling.DEFAULT_BOUNDED_BUFFER_CAPACITY, chunkSize, cancellationToken, consumerEvent)
114 internal AsynchronousChannel(int index, int capacity, int chunkSize, CancellationToken cancellationToken, IntValueEvent consumerEvent)
116 if (chunkSize == 0) chunkSize = Scheduling.GetDefaultChunkSize<T>();
118 Contract.Assert(chunkSize > 0, "chunk size must be greater than 0");
119 Contract.Assert(capacity > 1, "this impl doesn't support capacity of 1 or 0");
121 // Initialize a buffer with enough space to hold 'capacity' elements.
122 // We need one extra unused element as a sentinel to detect a full buffer,
123 // thus we add one to the capacity requested.
125 m_buffer = new T[capacity + 1][];
126 m_producerBufferIndex = 0;
127 m_consumerBufferIndex = 0;
129 m_producerEvent = new ManualResetEventSlim();
130 m_consumerEvent = consumerEvent;
131 m_chunkSize = chunkSize;
132 m_producerChunk = new T[chunkSize];
133 m_producerChunkIndex = 0;
134 m_cancellationToken = cancellationToken;
137 //-----------------------------------------------------------------------------------
138 // Checks whether the buffer is full. If the consumer is calling this, they can be
139 // assured that a true value won't change before the consumer has a chance to dequeue
140 // elements. That's because only one consumer can run at once. A producer might see
141 // a true value, however, and then a consumer might transition to non-full, so it's
142 // not stable for them. Lastly, it's of course possible to see a false value when
143 // there really is a full queue, it's all dependent on small race conditions.
150 // Read the fields once. One of these is always stable, since the only threads
151 // that call this are the 1 producer/1 consumer threads.
152 int producerIndex = m_producerBufferIndex;
153 int consumerIndex = m_consumerBufferIndex;
157 // 1) Is the producer index one less than the consumer?
158 // 2) The producer is at the end of the buffer and the consumer at the beginning.
160 return (producerIndex == consumerIndex - 1) ||
161 (consumerIndex == 0 && producerIndex == m_buffer.Length - 1);
163 // Note to readers: you might have expected us to consider the case where
164 // m_producerBufferIndex == m_buffer.Length && m_consumerBufferIndex == 1.
165 // That is, a producer has gone off the end of the array, but is about to
166 // wrap around to the 0th element again. We don't need this for a subtle
167 // reason. It is SAFE for a consumer to think we are non-full when we
168 // actually are full; it is NOT for a producer; but thankfully, there is
169 // only one producer, and hence the producer will never see this seemingly
170 // invalid state. Hence, we're fine producing a false negative. It's all
171 // based on a race condition we have to deal with anyway.
175 //-----------------------------------------------------------------------------------
176 // Checks whether the buffer is empty. If the producer is calling this, they can be
177 // assured that a true value won't change before the producer has a chance to enqueue
178 // an item. That's because only one producer can run at once. A consumer might see
179 // a true value, however, and then a producer might transition to non-empty.
182 internal bool IsChunkBufferEmpty
186 // The queue is empty when the producer and consumer are at the same index.
187 return m_producerBufferIndex == m_consumerBufferIndex;
191 //-----------------------------------------------------------------------------------
192 // Checks whether the producer is done enqueueing new elements.
197 get { return m_done; }
201 //-----------------------------------------------------------------------------------
202 // Used by a producer to flush out any internal buffers that have been accumulating
203 // data, but which hasn't yet been published to the consumer.
205 internal void FlushBuffers()
207 TraceHelpers.TraceInfo("tid {0}: AsynchronousChannel<T>::FlushBuffers() called",
208 Thread.CurrentThread.ManagedThreadId);
210 // Ensure that a partially filled chunk is made available to the consumer.
214 //-----------------------------------------------------------------------------------
215 // Used by a producer to signal that it is done producing new elements. This will
216 // also wake up any consumers that have gone to sleep.
219 internal void SetDone()
221 TraceHelpers.TraceInfo("tid {0}: AsynchronousChannel<T>::SetDone() called",
222 Thread.CurrentThread.ManagedThreadId);
224 // This is set with a volatile write to ensure that, after the consumer
225 // sees done, they can re-read the enqueued chunks and see the last one we
226 // enqueued just above.
229 // We set the event to ensure consumers that may have waited or are
230 // considering waiting will notice that the producer is done. This is done
231 // after setting the done flag to facilitate a Dekker-style check/recheck.
233 // Because we can ---- with threads trying to Dispose of the event, we must
234 // acquire a lock around our setting, and double-check that the event isn't null.
236 // Update 8/2/2011: Dispose() should never be called with SetDone() concurrently,
237 // but in order to reduce churn late in the product cycle, we decided not to
241 if (m_consumerEvent != null)
243 m_consumerEvent.Set(m_index);
247 //-----------------------------------------------------------------------------------
248 // Enqueues a new element to the buffer, possibly blocking in the process.
251 // item - the new element to enqueue
252 // timeoutMilliseconds - a timeout (or -1 for no timeout) used in case the buffer
253 // is full; we return false if it expires
256 // This API will block until the buffer is non-full. This internally buffers
257 // elements up into chunks, so elements are not immediately available to consumers.
260 internal void Enqueue(T item)
262 // Store the element into our current chunk.
263 int producerChunkIndex = m_producerChunkIndex;
264 m_producerChunk[producerChunkIndex] = item;
266 // And lastly, if we have filled a chunk, make it visible to consumers.
267 if (producerChunkIndex == m_chunkSize - 1)
269 EnqueueChunk(m_producerChunk);
270 m_producerChunk = new T[m_chunkSize];
273 m_producerChunkIndex = (producerChunkIndex + 1) % m_chunkSize;
276 //-----------------------------------------------------------------------------------
277 // Internal helper to queue a real chunk, not just an element.
280 // chunk - the chunk to make visible to consumers
281 // timeoutMilliseconds - an optional timeout; we return false if it expires
284 // This API will block if the buffer is full. A chunk must contain only valid
285 // elements; if the chunk wasn't filled, it should be trimmed to size before
286 // enqueueing it for consumers to observe.
289 private void EnqueueChunk(T[] chunk)
291 Contract.Assert(chunk != null);
292 Contract.Assert(!m_done, "can't continue producing after the production is over");
296 Contract.Assert(!IsFull, "expected a non-full buffer");
298 // We can safely store into the current producer index because we know no consumers
299 // will be reading from it concurrently.
300 int bufferIndex = m_producerBufferIndex;
301 m_buffer[bufferIndex] = chunk;
303 // Increment the producer index, taking into count wrapping back to 0. This is a shared
304 // write; the CLR 2.0 memory model ensures the write won't move before the write to the
305 // corresponding element, so a consumer won't see the new index but the corresponding
306 // element in the array as empty.
307 #pragma warning disable 0420
308 Interlocked.Exchange(ref m_producerBufferIndex, (bufferIndex + 1) % m_buffer.Length);
309 #pragma warning restore 0420
311 // (If there is a consumer waiting, we have to ensure to signal the event. Unfortunately,
312 // this requires that we issue a memory barrier: We need to guarantee that the write to
313 // our producer index doesn't pass the read of the consumer waiting flags; the CLR memory
314 // model unfortunately permits this reordering. That is handled by using a CAS above.)
316 if (m_consumerIsWaiting == 1 && !IsChunkBufferEmpty)
318 TraceHelpers.TraceInfo("AsynchronousChannel::EnqueueChunk - producer waking consumer");
319 m_consumerIsWaiting = 0;
320 m_consumerEvent.Set(m_index);
324 //-----------------------------------------------------------------------------------
325 // Just waits until the queue is non-full.
328 private void WaitUntilNonFull()
330 // We must loop; sometimes the producer event will have been set
331 // prematurely due to the way waiting flags are managed. By looping,
332 // we will only return from this method when space is truly available.
335 // If the queue is full, we have to wait for a consumer to make room.
336 // Reset the event to unsignaled state before waiting.
337 m_producerEvent.Reset();
339 // We have to handle the case where a producer and consumer are racing to
340 // wait simultaneously. For instance, a producer might see a full queue (by
341 // reading IsFull just above), but meanwhile a consumer might drain the queue
342 // very quickly, suddenly seeing an empty queue. This would lead to deadlock
343 // if we aren't careful. Therefore we check the empty/full state AGAIN after
344 // setting our flag to see if a real wait is warranted.
345 #pragma warning disable 0420
346 Interlocked.Exchange(ref m_producerIsWaiting, 1);
347 #pragma warning restore 0420
349 // (We have to prevent the reads that go into determining whether the buffer
350 // is full from moving before the write to the producer-wait flag. Hence the CAS.)
352 // Because we might be racing with a consumer that is transitioning the
353 // buffer from full to non-full, we must check that the queue is full once
354 // more. Otherwise, we might decide to wait and never be woken up (since
355 // we just reset the event).
358 // Assuming a consumer didn't make room for us, we can wait on the event.
359 TraceHelpers.TraceInfo("AsynchronousChannel::EnqueueChunk - producer waiting, buffer full");
360 m_producerEvent.Wait(m_cancellationToken);
364 // Reset the flags, we don't actually have to wait after all.
365 m_producerIsWaiting = 0;
371 //-----------------------------------------------------------------------------------
372 // Flushes any built up elements that haven't been made available to a consumer yet.
373 // Only safe to be called by a producer.
376 // This API can block if the channel is currently full.
379 private void FlushCachedChunk()
381 // If the producer didn't fill their temporary working chunk, flushing forces an enqueue
382 // so that a consumer will see the partially filled chunk of elements.
383 if (m_producerChunk != null && m_producerChunkIndex != 0)
385 // Trim the partially-full chunk to an array just big enough to hold it.
386 Contract.Assert(1 <= m_producerChunkIndex && m_producerChunkIndex <= m_chunkSize);
387 T[] leftOverChunk = new T[m_producerChunkIndex];
388 Array.Copy(m_producerChunk, leftOverChunk, m_producerChunkIndex);
390 // And enqueue the right-sized temporary chunk, possibly blocking if it's full.
391 EnqueueChunk(leftOverChunk);
392 m_producerChunk = null;
396 //-----------------------------------------------------------------------------------
397 // Dequeues the next element in the queue.
400 // item - a byref to the location into which we'll store the dequeued element
403 // True if an item was found, false otherwise.
406 internal bool TryDequeue(ref T item)
408 // Ensure we have a chunk to work with.
409 if (m_consumerChunk == null)
411 if (!TryDequeueChunk(ref m_consumerChunk))
413 Contract.Assert(m_consumerChunk == null);
417 m_consumerChunkIndex = 0;
420 // Retrieve the current item in the chunk.
421 Contract.Assert(m_consumerChunk != null, "consumer chunk is null");
422 Contract.Assert(0 <= m_consumerChunkIndex && m_consumerChunkIndex < m_consumerChunk.Length, "chunk index out of bounds");
423 item = m_consumerChunk[m_consumerChunkIndex];
425 // And lastly, if we have consumed the chunk, null it out so we'll get the
426 // next one when dequeue is called again.
427 ++m_consumerChunkIndex;
428 if (m_consumerChunkIndex == m_consumerChunk.Length)
430 m_consumerChunk = null;
436 //-----------------------------------------------------------------------------------
437 // Internal helper method to dequeue a whole chunk.
440 // chunk - a byref to the location into which we'll store the chunk
443 // True if a chunk was found, false otherwise.
446 private bool TryDequeueChunk(ref T[] chunk)
448 // This is the non-blocking version of dequeue. We first check to see
449 // if the queue is empty. If the caller chooses to wait later, they can
450 // call the overload with an event.
451 if (IsChunkBufferEmpty)
456 chunk = InternalDequeueChunk();
460 //-----------------------------------------------------------------------------------
461 // Blocking dequeue for the next element. This version of the API is used when the
462 // caller will possibly wait for a new chunk to be enqueued.
465 // item - a byref for the returned element
466 // waitEvent - a byref for the event used to signal blocked consumers
469 // True if an element was found, false otherwise.
472 // If the return value is false, it doesn't always mean waitEvent will be non-
473 // null. If the producer is done enqueueing, the return will be false and the
474 // event will remain null. A caller must check for this condition.
476 // If the return value is false and an event is returned, there have been
477 // side-effects on the channel. Namely, the flag telling producers a consumer
478 // might be waiting will have been set. DequeueEndAfterWait _must_ be called
479 // eventually regardless of whether the caller actually waits or not.
482 internal bool TryDequeue(ref T item, ref bool isDone)
486 // Ensure we have a buffer to work with.
487 if (m_consumerChunk == null)
489 if (!TryDequeueChunk(ref m_consumerChunk, ref isDone))
491 Contract.Assert(m_consumerChunk == null);
495 m_consumerChunkIndex = 0;
498 // Retrieve the current item in the chunk.
499 Contract.Assert(m_consumerChunk != null, "consumer chunk is null");
500 Contract.Assert(0 <= m_consumerChunkIndex && m_consumerChunkIndex < m_consumerChunk.Length, "chunk index out of bounds");
501 item = m_consumerChunk[m_consumerChunkIndex];
503 // And lastly, if we have consumed the chunk, null it out.
504 ++m_consumerChunkIndex;
505 if (m_consumerChunkIndex == m_consumerChunk.Length)
507 m_consumerChunk = null;
513 //-----------------------------------------------------------------------------------
514 // Internal helper method to dequeue a whole chunk. This version of the API is used
515 // when the caller will wait for a new chunk to be enqueued.
518 // chunk - a byref for the dequeued chunk
519 // waitEvent - a byref for the event used to signal blocked consumers
522 // True if a chunk was found, false otherwise.
525 // If the return value is false, it doesn't always mean waitEvent will be non-
526 // null. If the producer is done enqueueing, the return will be false and the
527 // event will remain null. A caller must check for this condition.
529 // If the return value is false and an event is returned, there have been
530 // side-effects on the channel. Namely, the flag telling producers a consumer
531 // might be waiting will have been set. DequeueEndAfterWait _must_ be called
532 // eventually regardless of whether the caller actually waits or not.
535 private bool TryDequeueChunk(ref T[] chunk, ref bool isDone)
539 // We will register our interest in waiting, and then return an event
540 // that the caller can use to wait.
541 while (IsChunkBufferEmpty)
543 // If the producer is done and we've drained the queue, we can bail right away.
546 // We have to see if the buffer is empty AFTER we've seen that it's done.
547 // Otherwise, we would possibly miss the elements enqueued before the
548 // producer signaled that it's done. This is done with a volatile load so
549 // that the read of empty doesn't move before the read of done.
550 if (IsChunkBufferEmpty)
552 // Return isDone=true so callers know not to wait
558 // We have to handle the case where a producer and consumer are racing to
559 // wait simultaneously. For instance, a consumer might see an empty queue (by
560 // reading IsChunkBufferEmpty just above), but meanwhile a producer might fill the queue
561 // very quickly, suddenly seeing a full queue. This would lead to deadlock
562 // if we aren't careful. Therefore we check the empty/full state AGAIN after
563 // setting our flag to see if a real wait is warranted.
564 #pragma warning disable 0420
565 Interlocked.Exchange(ref m_consumerIsWaiting, 1);
566 #pragma warning restore 0420
568 // (We have to prevent the reads that go into determining whether the buffer
569 // is full from moving before the write to the producer-wait flag. Hence the CAS.)
571 // Because we might be racing with a producer that is transitioning the
572 // buffer from empty to non-full, we must check that the queue is empty once
573 // more. Similarly, if the queue has been marked as done, we must not wait
574 // because we just reset the event, possibly losing as signal. In both cases,
575 // we would otherwise decide to wait and never be woken up (i.e. deadlock).
576 if (IsChunkBufferEmpty && !IsDone)
578 // Note that the caller must eventually call DequeueEndAfterWait to set the
579 // flags back to a state where no consumer is waiting, whether they choose
581 TraceHelpers.TraceInfo("AsynchronousChannel::DequeueChunk - consumer possibly waiting");
586 // Reset the wait flags, we don't need to wait after all. We loop back around
587 // and recheck that the queue isn't empty, done, etc.
588 m_consumerIsWaiting = 0;
592 Contract.Assert(!IsChunkBufferEmpty, "single-consumer should never witness an empty queue here");
594 chunk = InternalDequeueChunk();
598 //-----------------------------------------------------------------------------------
599 // Internal helper method that dequeues a chunk after we've verified that there is
600 // a chunk available to dequeue.
603 // The dequeued chunk.
606 // The caller has verified that a chunk is available, i.e. the queue is non-empty.
609 private T[] InternalDequeueChunk()
611 Contract.Assert(!IsChunkBufferEmpty);
613 // We can safely read from the consumer index because we know no producers
614 // will write concurrently.
615 int consumerBufferIndex = m_consumerBufferIndex;
616 T[] chunk = m_buffer[consumerBufferIndex];
618 // Zero out contents to avoid holding on to memory for longer than necessary. This
619 // ensures the entire chunk is eligible for GC sooner. (More important for big chunks.)
620 m_buffer[consumerBufferIndex] = null;
622 // Increment the consumer index, taking into count wrapping back to 0. This is a shared
623 // write; the CLR 2.0 memory model ensures the write won't move before the write to the
624 // corresponding element, so a consumer won't see the new index but the corresponding
625 // element in the array as empty.
626 #pragma warning disable 0420
627 Interlocked.Exchange(ref m_consumerBufferIndex, (consumerBufferIndex + 1) % m_buffer.Length);
628 #pragma warning restore 0420
630 // (Unfortunately, this whole sequence requires a memory barrier: We need to guarantee
631 // that the write to m_consumerBufferIndex doesn't pass the read of the wait-flags; the CLR memory
632 // model sadly permits this reordering. Hence the CAS above.)
634 if (m_producerIsWaiting == 1 && !IsFull)
636 TraceHelpers.TraceInfo("BoundedSingleLockFreeChannel::DequeueChunk - consumer waking producer");
637 m_producerIsWaiting = 0;
638 m_producerEvent.Set();
644 //-----------------------------------------------------------------------------------
645 // Clears the flag set when a blocking Dequeue is called, letting producers know
646 // the consumer is no longer waiting.
649 internal void DoneWithDequeueWait()
651 // On our way out, be sure to reset the flags.
652 m_consumerIsWaiting = 0;
655 //-----------------------------------------------------------------------------------
656 // Closes Win32 events possibly allocated during execution.
659 public void Dispose()
661 // We need to take a lock to deal with consumer threads racing to call Dispose
662 // and producer threads racing inside of SetDone.
664 // Update 8/2/2011: Dispose() should never be called with SetDone() concurrently,
665 // but in order to reduce churn late in the product cycle, we decided not to
669 Contract.Assert(m_done, "Expected channel to be done before disposing");
670 Contract.Assert(m_producerEvent != null);
671 Contract.Assert(m_consumerEvent != null);
672 m_producerEvent.Dispose();
673 m_producerEvent = null;
674 m_consumerEvent = null;