// Copyright (c) Microsoft Corporation. All rights reserved.
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// OrderPreservingPipeliningSpoolingTask.cs
//
// <OWNER>[....]</OWNER>
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
15 using System.Collections.Generic;
17 using System.Linq.Parallel;
19 using System.Threading;
20 using System.Threading.Tasks;
21 using System.Diagnostics.Contracts;
namespace System.Linq.Parallel
{
    /// <summary>
    /// A producer task for order-preserving pipelined merges. Each task owns one source
    /// partition, enumerates its (key, element) results, and publishes them into a
    /// per-partition queue that the consumer drains. Producer and consumer coordinate
    /// through a shared per-partition lock plus Monitor.Wait/Pulse handshaking.
    /// </summary>
    /// <typeparam name="TOutput">The type of the elements produced by the query.</typeparam>
    /// <typeparam name="TKey">The type of the order keys.</typeparam>
    class OrderPreservingPipeliningSpoolingTask<TOutput, TKey> : SpoolingTaskBase
    {
        private readonly QueryTaskGroupState m_taskGroupState;               // State shared among tasks.
        private readonly TaskScheduler m_taskScheduler;                      // The task manager to execute the query.
        private readonly QueryOperatorEnumerator<TOutput, TKey> m_partition; // The source partition.
        private readonly bool[] m_consumerWaiting; // Per producer: whether a consumer is waiting on that producer.
        private readonly bool[] m_producerWaiting; // Per producer: whether that producer is waiting on the consumer.
        private readonly bool[] m_producerDone;    // Per producer: whether that producer is done.
        private readonly int m_partitionIndex;     // Index of the partition owned by this task.

        private readonly Queue<Pair<TKey, TOutput>>[] m_buffers; // The per-partition buffers for the results.
        private readonly object m_bufferLock;                    // Guards this producer's buffer and waiting flags.

        /// <summary>
        /// Whether the producer is allowed to buffer up elements before handing a chunk to the
        /// consumer. If false, the producer will make each result available to the consumer
        /// immediately after it is produced.
        /// </summary>
        private readonly bool m_autoBuffered;

        /// <summary>
        /// The number of elements to accumulate on the producer before copying the elements to the
        /// producer-consumer buffer. This constant is only used in the AutoBuffered mode.
        ///
        /// Experimentally, 16 appears to be a sufficient buffer size to compensate for the
        /// synchronization cost.
        /// </summary>
        private const int PRODUCER_BUFFER_AUTO_SIZE = 16;

        /// <summary>
        /// Constructs a new spooling task for one partition of an order-preserving
        /// pipelined query.
        /// </summary>
        /// <param name="partition">The source partition this task will enumerate.</param>
        /// <param name="taskGroupState">State shared among all tasks in the query.</param>
        /// <param name="consumerWaiting">Per-producer flags: consumer waiting on producer.</param>
        /// <param name="producerWaiting">Per-producer flags: producer waiting on consumer.</param>
        /// <param name="producerDone">Per-producer flags: producer finished.</param>
        /// <param name="partitionIndex">Index of the partition owned by this task.</param>
        /// <param name="buffers">The per-partition producer-consumer buffers.</param>
        /// <param name="bufferLock">The lock guarding this partition's buffer.</param>
        /// <param name="taskScheduler">Scheduler on which the query tasks execute.</param>
        /// <param name="autoBuffered">Whether the producer batches results before publishing.</param>
        internal OrderPreservingPipeliningSpoolingTask(
            QueryOperatorEnumerator<TOutput, TKey> partition,
            QueryTaskGroupState taskGroupState,
            bool[] consumerWaiting,
            bool[] producerWaiting,
            bool[] producerDone,
            int partitionIndex,
            Queue<Pair<TKey, TOutput>>[] buffers,
            object bufferLock,
            TaskScheduler taskScheduler,
            bool autoBuffered)
            : base(partitionIndex, taskGroupState)
        {
            Contract.Requires(partition != null);
            Contract.Requires(taskGroupState != null);
            Contract.Requires(consumerWaiting != null);
            Contract.Requires(producerWaiting != null && producerWaiting.Length == consumerWaiting.Length);
            Contract.Requires(producerDone != null && producerDone.Length == consumerWaiting.Length);
            Contract.Requires(buffers != null && buffers.Length == consumerWaiting.Length);
            Contract.Requires(partitionIndex >= 0 && partitionIndex < consumerWaiting.Length);

            m_partition = partition;
            m_taskGroupState = taskGroupState;
            m_producerDone = producerDone;
            m_consumerWaiting = consumerWaiting;
            m_producerWaiting = producerWaiting;
            m_partitionIndex = partitionIndex;
            m_buffers = buffers;
            m_bufferLock = bufferLock;
            m_taskScheduler = taskScheduler;
            m_autoBuffered = autoBuffered;
        }

        /// <summary>
        /// Enumerates results from the source partition and enqueues them into this
        /// producer's output buffer, chunk by chunk, pulsing a waiting consumer and
        /// throttling when the buffer grows too large.
        /// </summary>
        protected override void SpoolingWork()
        {
            TOutput currentElement = default(TOutput);
            TKey currentKey = default(TKey);

            // In AutoBuffered mode, batch several results locally before taking the lock;
            // otherwise hand over each result immediately (chunk of one).
            int chunkCapacity = m_autoBuffered ? PRODUCER_BUFFER_AUTO_SIZE : 1;
            Pair<TKey, TOutput>[] localChunk = new Pair<TKey, TOutput>[chunkCapacity];
            QueryOperatorEnumerator<TOutput, TKey> source = m_partition;
            CancellationToken mergedToken = m_taskGroupState.CancellationState.MergedCancellationToken;

            int filled;
            do
            {
                // Accumulate up to chunkCapacity results without holding the lock.
                filled = 0;
                while (filled < chunkCapacity && source.MoveNext(ref currentElement, ref currentKey))
                {
                    localChunk[filled] = new Pair<TKey, TOutput>(currentKey, currentElement);
                    filled++;
                }

                // An empty chunk means the source is exhausted.
                if (filled == 0)
                {
                    break;
                }

                lock (m_bufferLock)
                {
                    // Check if the query has been cancelled.
                    if (mergedToken.IsCancellationRequested)
                    {
                        break;
                    }

                    // Publish the chunk into this partition's buffer.
                    for (int idx = 0; idx < filled; idx++)
                    {
                        m_buffers[m_partitionIndex].Enqueue(localChunk[idx]);
                    }

                    // Wake the consumer if it is blocked waiting on this producer.
                    if (m_consumerWaiting[m_partitionIndex])
                    {
                        Monitor.Pulse(m_bufferLock);
                        m_consumerWaiting[m_partitionIndex] = false;
                    }

                    // If the producer buffer is too large, wait.
                    // Note: cancellation was already checked after acquiring the lock on this
                    // producer. That guarantees that the consumer will eventually wake up the
                    // producer.
                    if (m_buffers[m_partitionIndex].Count >= OrderPreservingPipeliningMergeHelper<TOutput, TKey>.MAX_BUFFER_SIZE)
                    {
                        m_producerWaiting[m_partitionIndex] = true;
                        Monitor.Wait(m_bufferLock);
                    }
                }
            } while (filled == chunkCapacity); // A short chunk means the source ran dry.
        }

        /// <summary>
        /// Creates and begins execution of a new set of spooling tasks, one per partition.
        /// </summary>
        public static void Spool(
            QueryTaskGroupState groupState, PartitionedStream<TOutput, TKey> partitions,
            bool[] consumerWaiting, bool[] producerWaiting, bool[] producerDone,
            Queue<Pair<TKey,TOutput>>[] buffers, object[] bufferLocks,
            TaskScheduler taskScheduler, bool autoBuffered)
        {
            Contract.Requires(groupState != null);
            Contract.Requires(partitions != null);
            Contract.Requires(producerDone != null && producerDone.Length == partitions.PartitionCount);
            Contract.Requires(buffers != null && buffers.Length == partitions.PartitionCount);
            Contract.Requires(bufferLocks != null);

            int partitionCount = partitions.PartitionCount;

            // Initialize the buffers and buffer locks up front, before any task can run.
            for (int i = 0; i < partitionCount; i++)
            {
                buffers[i] = new Queue<Pair<TKey, TOutput>>(OrderPreservingPipeliningMergeHelper<TOutput, TKey>.INITIAL_BUFFER_SIZE);
                bufferLocks[i] = new object();
            }

            // Ensure all tasks in this query are parented under a common root. Because this
            // is a pipelined query, we detach it from the parent (to avoid blocking the calling
            // thread), and run the query on a separate thread.
            Task rootTask = new Task(
                () =>
                {
                    for (int i = 0; i < partitionCount; i++)
                    {
                        QueryTask asyncTask = new OrderPreservingPipeliningSpoolingTask<TOutput, TKey>(
                            partitions[i], groupState, consumerWaiting, producerWaiting,
                            producerDone, i, buffers, bufferLocks[i], taskScheduler, autoBuffered);
                        asyncTask.RunAsynchronously(taskScheduler);
                    }
                });

            // Begin the query on the calling thread.
            groupState.QueryBegin(rootTask);

            // And schedule the root task for execution. This is done after beginning to ensure
            // no thread tries to end the query before its root task has been recorded properly.
            rootTask.Start(taskScheduler);

            // We don't call QueryEnd here; when we return, the query is still executing, and the
            // last enumerator to be disposed of will call QueryEnd for us.
        }

        /// <summary>
        /// Marks this producer as done and wakes the consumer if necessary, then disposes
        /// the underlying source enumerator.
        /// </summary>
        protected override void SpoolingFinally()
        {
            // Let the consumer know that this producer is done.
            lock (m_bufferLock)
            {
                m_producerDone[m_partitionIndex] = true;
                if (m_consumerWaiting[m_partitionIndex])
                {
                    Monitor.Pulse(m_bufferLock);
                    m_consumerWaiting[m_partitionIndex] = false;
                }
            }

            // Call the base implementation.
            base.SpoolingFinally();

            // Dispose of the source enumerator *after* signaling that the task is done.
            // Dispose() is called last so that if it throws an exception, we will not
            // cause a deadlock.
            m_partition.Dispose();
        }
    }
}