3 // Copyright (c) Microsoft Corporation. All rights reserved.
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
10 // <OWNER>Microsoft</OWNER>
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
14 using System.Threading;
15 using System.Threading.Tasks;
16 using System.Diagnostics.Contracts;
18 namespace System.Linq.Parallel
/// <summary>
/// A factory class to execute spooling logic.
/// </summary>
internal static class SpoolingTask
{
    //-----------------------------------------------------------------------------------
    // Creates and begins execution of a new spooling task. Executes synchronously,
    // and by the time this API has returned all of the results have been produced.
    //
    // Arguments:
    //     groupState    - values for inter-task communication
    //     partitions    - the producer enumerators
    //     channels      - the producer-consumer channels
    //     taskScheduler - the task manager on which to execute
    //

    internal static void SpoolStopAndGo<TInputOutput, TIgnoreKey>(
        QueryTaskGroupState groupState, PartitionedStream<TInputOutput, TIgnoreKey> partitions,
        SynchronousChannel<TInputOutput>[] channels, TaskScheduler taskScheduler)
    {
        Contract.Requires(partitions.PartitionCount == channels.Length);
        Contract.Requires(groupState != null);

        // Ensure all tasks in this query are parented under a common root.
        Task rootTask = new Task(
            () =>
            {
                int maxToRunInParallel = partitions.PartitionCount - 1;

                // A stop-and-go merge uses the current thread for one task and then blocks before
                // returning to the caller, until all results have been accumulated. We do this by
                // running the last partition on the calling thread.
                for (int i = 0; i < maxToRunInParallel; i++)
                {
                    TraceHelpers.TraceInfo("SpoolingTask::Spool: Running partition[{0}] asynchronously", i);

                    QueryTask asyncTask = new StopAndGoSpoolingTask<TInputOutput, TIgnoreKey>(i, groupState, partitions[i], channels[i]);
                    asyncTask.RunAsynchronously(taskScheduler);
                }

                TraceHelpers.TraceInfo("SpoolingTask::Spool: Running partition[{0}] synchronously", maxToRunInParallel);

                // Run one task synchronously on the current thread.
                QueryTask syncTask = new StopAndGoSpoolingTask<TInputOutput, TIgnoreKey>(
                    maxToRunInParallel, groupState, partitions[maxToRunInParallel], channels[maxToRunInParallel]);
                syncTask.RunSynchronously(taskScheduler);
            });

        // Begin the query on the calling thread.
        groupState.QueryBegin(rootTask);

        // We don't want to return until the task is finished. Run it on the calling thread.
        rootTask.RunSynchronously(taskScheduler);

        // Wait for the query to complete, propagate exceptions, and so on.
        // For pipelined queries, this step happens in the async enumerator.
        groupState.QueryEnd(false);
    }

    //-----------------------------------------------------------------------------------
    // Creates and begins execution of a new spooling task. Runs asynchronously.
    //
    // Arguments:
    //     groupState    - values for inter-task communication
    //     partitions    - the producer enumerators
    //     channels      - the producer-consumer channels
    //     taskScheduler - the task manager on which to execute
    //

    internal static void SpoolPipeline<TInputOutput, TIgnoreKey>(
        QueryTaskGroupState groupState, PartitionedStream<TInputOutput, TIgnoreKey> partitions,
        AsynchronousChannel<TInputOutput>[] channels, TaskScheduler taskScheduler)
    {
        Contract.Requires(partitions.PartitionCount == channels.Length);
        Contract.Requires(groupState != null);

        // Ensure all tasks in this query are parented under a common root. Because this
        // is a pipelined query, we detach it from the parent (to avoid blocking the calling
        // thread), and run the query on a separate thread.
        Task rootTask = new Task(
            () =>
            {
                // Create tasks that will enumerate the partitions in parallel. Because we're pipelining,
                // we will begin running these tasks in parallel and then return.
                for (int i = 0; i < partitions.PartitionCount; i++)
                {
                    TraceHelpers.TraceInfo("SpoolingTask::Spool: Running partition[{0}] asynchronously", i);

                    QueryTask asyncTask = new PipelineSpoolingTask<TInputOutput, TIgnoreKey>(i, groupState, partitions[i], channels[i]);
                    asyncTask.RunAsynchronously(taskScheduler);
                }
            });

        // Begin the query on the calling thread.
        groupState.QueryBegin(rootTask);

        // And schedule it for execution. This is done after beginning to ensure no thread tries to
        // end the query before its root task has been recorded properly.
        rootTask.Start(taskScheduler);

        // We don't call QueryEnd here; when we return, the query is still executing, and the
        // last enumerator to be disposed of will call QueryEnd for us.
    }

    //-----------------------------------------------------------------------------------
    // Creates and begins execution of a new spooling task. This is a for-all style
    // execution, meaning that the query will be run fully (for effect) before returning
    // and that there are no channels into which data will be queued.
    //
    // Arguments:
    //     groupState    - values for inter-task communication
    //     partitions    - the producer enumerators
    //     taskScheduler - the task manager on which to execute
    //

    internal static void SpoolForAll<TInputOutput, TIgnoreKey>(
        QueryTaskGroupState groupState, PartitionedStream<TInputOutput, TIgnoreKey> partitions, TaskScheduler taskScheduler)
    {
        Contract.Requires(groupState != null);

        // Ensure all tasks in this query are parented under a common root.
        Task rootTask = new Task(
            () =>
            {
                int maxToRunInParallel = partitions.PartitionCount - 1;

                // Create tasks that will enumerate the partitions in parallel "for effect"; in other words,
                // no data will be placed into any kind of producer-consumer channel.
                for (int i = 0; i < maxToRunInParallel; i++)
                {
                    TraceHelpers.TraceInfo("SpoolingTask::Spool: Running partition[{0}] asynchronously", i);

                    QueryTask asyncTask = new ForAllSpoolingTask<TInputOutput, TIgnoreKey>(i, groupState, partitions[i]);
                    asyncTask.RunAsynchronously(taskScheduler);
                }

                TraceHelpers.TraceInfo("SpoolingTask::Spool: Running partition[{0}] synchronously", maxToRunInParallel);

                // Run one task synchronously on the current thread.
                QueryTask syncTask = new ForAllSpoolingTask<TInputOutput, TIgnoreKey>(maxToRunInParallel, groupState, partitions[maxToRunInParallel]);
                syncTask.RunSynchronously(taskScheduler);
            });

        // Begin the query on the calling thread.
        groupState.QueryBegin(rootTask);

        // We don't want to return until the task is finished. Run it on the calling thread.
        rootTask.RunSynchronously(taskScheduler);

        // Wait for the query to complete, propagate exceptions, and so on.
        // For pipelined queries, this step happens in the async enumerator.
        groupState.QueryEnd(false);
    }
}
/// <summary>
/// A spooling task handles marshaling data from a producer to a consumer. It's given
/// a single enumerator object that contains all of the production algorithms, a single
/// destination channel from which consumers draw results, and (optionally) a
/// synchronization primitive using which to notify asynchronous consumers.
/// </summary>
/// <typeparam name="TInputOutput"></typeparam>
/// <typeparam name="TIgnoreKey"></typeparam>
internal class StopAndGoSpoolingTask<TInputOutput, TIgnoreKey> : SpoolingTaskBase
{
    // The data source from which to pull data.
    private QueryOperatorEnumerator<TInputOutput, TIgnoreKey> m_source;

    // The destination channel into which data is placed. This can be null if we are
    // enumerating "for effect", e.g. forall loop.
    private SynchronousChannel<TInputOutput> m_destination;

    //-----------------------------------------------------------------------------------
    // Creates, but does not execute, a new spooling task.
    //
    // Arguments:
    //     taskIndex   - the unique index of this task
    //     source      - the producer enumerator
    //     destination - the destination channel into which to spool elements
    //
    // Assumptions:
    //     Source cannot be null, although the other arguments may be.
    //

    internal StopAndGoSpoolingTask(
        int taskIndex, QueryTaskGroupState groupState,
        QueryOperatorEnumerator<TInputOutput, TIgnoreKey> source, SynchronousChannel<TInputOutput> destination)
        : base(taskIndex, groupState)
    {
        Contract.Requires(source != null);
        m_source = source;
        m_destination = destination;
    }

    //-----------------------------------------------------------------------------------
    // This method is responsible for enumerating results and enqueueing them to
    // the output channel(s) as appropriate. Each base class implements its own.
    //

    protected override void SpoolingWork()
    {
        // We just enumerate over the entire source data stream, placing each element
        // into the destination channel.
        TInputOutput current = default(TInputOutput);
        TIgnoreKey keyUnused = default(TIgnoreKey);

        QueryOperatorEnumerator<TInputOutput, TIgnoreKey> source = m_source;
        SynchronousChannel<TInputOutput> destination = m_destination;
        CancellationToken cancelToken = m_groupState.CancellationState.MergedCancellationToken;

        destination.Init();
        while (source.MoveNext(ref current, ref keyUnused))
        {
            // If an abort has been requested, stop this worker immediately.
            if (cancelToken.IsCancellationRequested)
            {
                break;
            }

            destination.Enqueue(current);
        }
    }

    //-----------------------------------------------------------------------------------
    // Ensure we signal that the channel is complete.
    //

    protected override void SpoolingFinally()
    {
        // Call the base implementation.
        base.SpoolingFinally();

        // Signal that we are done, in the case of asynchronous consumption.
        if (m_destination != null)
        {
            m_destination.SetDone();
        }

        // Dispose of the source enumerator *after* signaling that the task is done.
        // We call Dispose() last to ensure that if it throws an exception, we will not cause a deadlock.
        m_source.Dispose();
    }
}
/// <summary>
/// A spooling task handles marshaling data from a producer to a consumer. It's given
/// a single enumerator object that contains all of the production algorithms, a single
/// destination channel from which consumers draw results, and (optionally) a
/// synchronization primitive using which to notify asynchronous consumers.
/// </summary>
/// <typeparam name="TInputOutput"></typeparam>
/// <typeparam name="TIgnoreKey"></typeparam>
internal class PipelineSpoolingTask<TInputOutput, TIgnoreKey> : SpoolingTaskBase
{
    // The data source from which to pull data.
    private QueryOperatorEnumerator<TInputOutput, TIgnoreKey> m_source;

    // The destination channel into which data is placed. This can be null if we are
    // enumerating "for effect", e.g. forall loop.
    private AsynchronousChannel<TInputOutput> m_destination;

    //-----------------------------------------------------------------------------------
    // Creates, but does not execute, a new spooling task.
    //
    // Arguments:
    //     taskIndex   - the unique index of this task
    //     source      - the producer enumerator
    //     destination - the destination channel into which to spool elements
    //
    // Assumptions:
    //     Source cannot be null, although the other arguments may be.
    //

    internal PipelineSpoolingTask(
        int taskIndex, QueryTaskGroupState groupState,
        QueryOperatorEnumerator<TInputOutput, TIgnoreKey> source, AsynchronousChannel<TInputOutput> destination)
        : base(taskIndex, groupState)
    {
        Contract.Assert(source != null);
        m_source = source;
        m_destination = destination;
    }

    //-----------------------------------------------------------------------------------
    // This method is responsible for enumerating results and enqueueing them to
    // the output channel(s) as appropriate. Each base class implements its own.
    //

    protected override void SpoolingWork()
    {
        // We just enumerate over the entire source data stream, placing each element
        // into the destination channel.
        TInputOutput current = default(TInputOutput);
        TIgnoreKey keyUnused = default(TIgnoreKey);

        QueryOperatorEnumerator<TInputOutput, TIgnoreKey> source = m_source;
        AsynchronousChannel<TInputOutput> destination = m_destination;
        CancellationToken cancelToken = m_groupState.CancellationState.MergedCancellationToken;

        while (source.MoveNext(ref current, ref keyUnused))
        {
            // If an abort has been requested, stop this worker immediately.
            if (cancelToken.IsCancellationRequested)
            {
                break;
            }

            destination.Enqueue(current);
        }

        // Flush remaining data to the query consumer in preparation for channel shutdown.
        destination.FlushBuffers();
    }

    //-----------------------------------------------------------------------------------
    // Ensure we signal that the channel is complete.
    //

    protected override void SpoolingFinally()
    {
        // Call the base implementation.
        base.SpoolingFinally();

        // Signal that we are done, in the case of asynchronous consumption.
        if (m_destination != null)
        {
            m_destination.SetDone();
        }

        // Dispose of the source enumerator *after* signaling that the task is done.
        // We call Dispose() last to ensure that if it throws an exception, we will not cause a deadlock.
        m_source.Dispose();
    }
}
/// <summary>
/// A spooling task handles marshaling data from a producer to a consumer. It's given
/// a single enumerator object that contains all of the production algorithms, a single
/// destination channel from which consumers draw results, and (optionally) a
/// synchronization primitive using which to notify asynchronous consumers.
/// </summary>
/// <typeparam name="TInputOutput"></typeparam>
/// <typeparam name="TIgnoreKey"></typeparam>
364 internal class ForAllSpoolingTask<TInputOutput, TIgnoreKey> : SpoolingTaskBase
366 // The data source from which to pull data.
367 private QueryOperatorEnumerator<TInputOutput, TIgnoreKey> m_source;
369 //-----------------------------------------------------------------------------------
370 // Creates, but does not execute, a new spooling task.
373 // taskIndex - the unique index of this task
374 // source - the producer enumerator
375 // destination - the destination channel into which to spool elements
378 // Source cannot be null, although the other arguments may be.
// Creates, but does not execute, a new for-all spooling task.
//
// Arguments:
//     taskIndex  - the unique index of this task
//     groupState - values for inter-task communication
//     source     - the producer enumerator (cannot be null)
//
internal ForAllSpoolingTask(
    int taskIndex, QueryTaskGroupState groupState,
    QueryOperatorEnumerator<TInputOutput, TIgnoreKey> source)
    : base(taskIndex, groupState)
{
    Contract.Assert(source != null);
    m_source = source;
}
390 //-----------------------------------------------------------------------------------
391 // This method is responsible for enumerating results and enqueueing them to
392 // the output channel(s) as appropriate. Each base class implements its own.
// Enumerates the entire source stream purely for its side effects; no results
// are captured and no producer-consumer channel is involved.
protected override void SpoolingWork()
{
    // We just enumerate over the entire source data stream for effect.
    TInputOutput currentUnused = default(TInputOutput);
    TIgnoreKey keyUnused = default(TIgnoreKey);

    // Note: this only ever runs with a ForAll operator, and ForAllEnumerator performs
    // cancellation checks, so no explicit cancellation polling is needed in this loop.
    while (m_source.MoveNext(ref currentUnused, ref keyUnused))
        ;
}
406 //-----------------------------------------------------------------------------------
407 // Ensure we signal that the channel is complete.
410 protected override void SpoolingFinally()
412 // Call the base implementation.
413 base.SpoolingFinally();
415 // Dispose of the source enumerator