a80160150889c9dd8008ebcd1a2b283193743486
[mono.git] / mcs / class / referencesource / System.Core / System / Linq / Parallel / Scheduling / SpoolingTask.cs
1 // ==++==
2 //
3 //   Copyright (c) Microsoft Corporation.  All rights reserved.
4 // 
5 // ==--==
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
7 //
8 // SpoolingTask.cs
9 //
10 // <OWNER>Microsoft</OWNER>
11 //
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
13
14 using System.Threading;
15 using System.Threading.Tasks;
16 using System.Diagnostics.Contracts;
17
18 namespace System.Linq.Parallel
19 {
20
21     /// <summary>
22     /// A factory class to execute spooling logic.
23     /// </summary>
24     internal static class SpoolingTask
25     {
26         //-----------------------------------------------------------------------------------
27         // Creates and begins execution of a new spooling task. Executes synchronously,
28         // and by the time this API has returned all of the results have been produced.
29         //
30         // Arguments:
31         //     groupState      - values for inter-task communication
32         //     partitions      - the producer enumerators
33         //     channels        - the producer-consumer channels
34         //     taskScheduler   - the task manager on which to execute
35         //
36
37         internal static void SpoolStopAndGo<TInputOutput, TIgnoreKey>(
38             QueryTaskGroupState groupState, PartitionedStream<TInputOutput, TIgnoreKey> partitions,
39             SynchronousChannel<TInputOutput>[] channels, TaskScheduler taskScheduler)
40         {
41             Contract.Requires(partitions.PartitionCount == channels.Length);
42             Contract.Requires(groupState != null);
43
44             // Ensure all tasks in this query are parented under a common root.
45             Task rootTask = new Task(
46                 () =>
47                 {
48                     int maxToRunInParallel = partitions.PartitionCount - 1;
49
50                     // A stop-and-go merge uses the current thread for one task and then blocks before
51                     // returning to the caller, until all results have been accumulated. We do this by
52                     // running the last partition on the calling thread.
53                     for (int i = 0; i < maxToRunInParallel; i++)
54                     {
55                         TraceHelpers.TraceInfo("SpoolingTask::Spool: Running partition[{0}] asynchronously", i);
56
57                         QueryTask asyncTask = new StopAndGoSpoolingTask<TInputOutput, TIgnoreKey>(i, groupState, partitions[i], channels[i]);
58                         asyncTask.RunAsynchronously(taskScheduler);
59                     }
60
61                     TraceHelpers.TraceInfo("SpoolingTask::Spool: Running partition[{0}] synchronously", maxToRunInParallel);
62
63                     // Run one task synchronously on the current thread.
64                     QueryTask syncTask = new StopAndGoSpoolingTask<TInputOutput, TIgnoreKey>(
65                         maxToRunInParallel, groupState, partitions[maxToRunInParallel], channels[maxToRunInParallel]);
66                     syncTask.RunSynchronously(taskScheduler);
67                 });
68
69             // Begin the query on the calling thread.
70             groupState.QueryBegin(rootTask);
71
72             // We don't want to return until the task is finished.  Run it on the calling thread.
73             rootTask.RunSynchronously(taskScheduler);
74
75             // Wait for the query to complete, propagate exceptions, and so on.
76             // For pipelined queries, this step happens in the async enumerator.
77             groupState.QueryEnd(false);
78         }
79
80         //-----------------------------------------------------------------------------------
81         // Creates and begins execution of a new spooling task. Runs asynchronously.
82         //
83         // Arguments:
84         //     groupState      - values for inter-task communication
85         //     partitions      - the producer enumerators
86         //     channels        - the producer-consumer channels
87         //     taskScheduler   - the task manager on which to execute
88         //
89
90         internal static void SpoolPipeline<TInputOutput, TIgnoreKey>(
91             QueryTaskGroupState groupState, PartitionedStream<TInputOutput, TIgnoreKey> partitions,
92             AsynchronousChannel<TInputOutput>[] channels, TaskScheduler taskScheduler)
93         {
94             Contract.Requires(partitions.PartitionCount == channels.Length);
95             Contract.Requires(groupState != null);
96
97             // Ensure all tasks in this query are parented under a common root. Because this
98             // is a pipelined query, we detach it from the parent (to avoid blocking the calling
99             // thread), and run the query on a separate thread.
100             Task rootTask = new Task(
101                 () =>
102                 {
103                     // Create tasks that will enumerate the partitions in parallel. Because we're pipelining,
104                     // we will begin running these tasks in parallel and then return.
105                     for (int i = 0; i < partitions.PartitionCount; i++)
106                     {
107                         TraceHelpers.TraceInfo("SpoolingTask::Spool: Running partition[{0}] asynchronously", i);
108
109                         QueryTask asyncTask = new PipelineSpoolingTask<TInputOutput, TIgnoreKey>(i, groupState, partitions[i], channels[i]);
110                         asyncTask.RunAsynchronously(taskScheduler);
111                     }
112                 });
113
114             // Begin the query on the calling thread.
115             groupState.QueryBegin(rootTask);
116
117             // And schedule it for execution.  This is done after beginning to ensure no thread tries to
118             // end the query before its root task has been recorded properly.
119             rootTask.Start(taskScheduler);
120
121             // We don't call QueryEnd here; when we return, the query is still executing, and the
122             // last enumerator to be disposed of will call QueryEnd for us.
123         }
124
125         //-----------------------------------------------------------------------------------
126         // Creates and begins execution of a new spooling task. This is a for-all style
127         // execution, meaning that the query will be run fully (for effect) before returning
128         // and that there are no channels into which data will be queued.
129         //
130         // Arguments:
131         //     groupState      - values for inter-task communication
132         //     partitions      - the producer enumerators
133         //     taskScheduler   - the task manager on which to execute
134         //
135
136         internal static void SpoolForAll<TInputOutput, TIgnoreKey>(
137             QueryTaskGroupState groupState, PartitionedStream<TInputOutput, TIgnoreKey> partitions, TaskScheduler taskScheduler)
138         {
139             Contract.Requires(groupState != null);
140
141             // Ensure all tasks in this query are parented under a common root.
142             Task rootTask = new Task(
143                 () =>
144                 {
145                     int maxToRunInParallel = partitions.PartitionCount - 1;
146
147                     // Create tasks that will enumerate the partitions in parallel "for effect"; in other words,
148                     // no data will be placed into any kind of producer-consumer channel.
149                     for (int i = 0; i < maxToRunInParallel; i++)
150                     {
151                         TraceHelpers.TraceInfo("SpoolingTask::Spool: Running partition[{0}] asynchronously", i);
152
153                         QueryTask asyncTask = new ForAllSpoolingTask<TInputOutput, TIgnoreKey>(i, groupState, partitions[i]);
154                         asyncTask.RunAsynchronously(taskScheduler);
155                     }
156
157                     TraceHelpers.TraceInfo("SpoolingTask::Spool: Running partition[{0}] synchronously", maxToRunInParallel);
158
159                     // Run one task synchronously on the current thread.
160                     QueryTask syncTask = new ForAllSpoolingTask<TInputOutput, TIgnoreKey>(maxToRunInParallel, groupState, partitions[maxToRunInParallel]);
161                     syncTask.RunSynchronously(taskScheduler);
162                 });
163
164             // Begin the query on the calling thread.
165             groupState.QueryBegin(rootTask);
166
167             // We don't want to return until the task is finished.  Run it on the calling thread.
168             rootTask.RunSynchronously(taskScheduler);
169
170             // Wait for the query to complete, propagate exceptions, and so on.
171             // For pipelined queries, this step happens in the async enumerator.
172             groupState.QueryEnd(false);
173         }
174     }
175
176     /// <summary>
177     /// A spooling task handles marshaling data from a producer to a consumer. It's given
178     /// a single enumerator object that contains all of the production algorithms, a single
179     /// destination channel from which consumers draw results, and (optionally) a
180     /// synchronization primitive using which to notify asynchronous consumers.
181     /// </summary>
182     /// <typeparam name="TInputOutput"></typeparam>
183     /// <typeparam name="TIgnoreKey"></typeparam>
184     internal class StopAndGoSpoolingTask<TInputOutput, TIgnoreKey> : SpoolingTaskBase
185     {
186         // The data source from which to pull data.
187         private QueryOperatorEnumerator<TInputOutput, TIgnoreKey> m_source;
188
189         // The destination channel into which data is placed. This can be null if we are
190         // enumerating "for effect", e.g. forall loop.
191         private SynchronousChannel<TInputOutput> m_destination;
192
193         //-----------------------------------------------------------------------------------
194         // Creates, but does not execute, a new spooling task.
195         //
196         // Arguments:
197         //     taskIndex   - the unique index of this task
198         //     source      - the producer enumerator
199         //     destination - the destination channel into which to spool elements
200         //
201         // Assumptions:
202         //     Source cannot be null, although the other arguments may be.
203         //
204
205         internal StopAndGoSpoolingTask(
206             int taskIndex, QueryTaskGroupState groupState,
207             QueryOperatorEnumerator<TInputOutput, TIgnoreKey> source, SynchronousChannel<TInputOutput> destination)
208             : base(taskIndex, groupState)
209         {
210             Contract.Requires(source != null);
211             m_source = source;
212             m_destination = destination;
213         }
214
215         //-----------------------------------------------------------------------------------
216         // This method is responsible for enumerating results and enqueueing them to
217         // the output channel(s) as appropriate.  Each base class implements its own.
218         //
219
220         protected override void SpoolingWork()
221         {
222             // We just enumerate over the entire source data stream, placing each element
223             // into the destination channel.
224             TInputOutput current = default(TInputOutput);
225             TIgnoreKey keyUnused = default(TIgnoreKey);
226
227             QueryOperatorEnumerator<TInputOutput, TIgnoreKey> source = m_source;
228             SynchronousChannel<TInputOutput> destination = m_destination;
229             CancellationToken cancelToken = m_groupState.CancellationState.MergedCancellationToken;
230
231             destination.Init();
232             while (source.MoveNext(ref current, ref keyUnused))
233             {
234                 // If an abort has been requested, stop this worker immediately.
235                 if (cancelToken.IsCancellationRequested)
236                 {
237                     break;
238                 }
239
240                 destination.Enqueue(current);
241             }
242         }
243
244         //-----------------------------------------------------------------------------------
245         // Ensure we signal that the channel is complete.
246         //
247
248         protected override void SpoolingFinally()
249         {
250             // Call the base implementation.
251             base.SpoolingFinally();
252
253             // Signal that we are done, in the case of asynchronous consumption.
254             if (m_destination != null)
255             {
256                 m_destination.SetDone();
257             }
258
259             // Dispose of the source enumerator *after* signaling that the task is done.
260             // We call Dispose() last to ensure that if it throws an exception, we will not cause a deadlock.
261             m_source.Dispose();
262         }
263     }
264
265     /// <summary>
266     /// A spooling task handles marshaling data from a producer to a consumer. It's given
267     /// a single enumerator object that contains all of the production algorithms, a single
268     /// destination channel from which consumers draw results, and (optionally) a
269     /// synchronization primitive using which to notify asynchronous consumers.
270     /// </summary>
271     /// <typeparam name="TInputOutput"></typeparam>
272     /// <typeparam name="TIgnoreKey"></typeparam>
273     internal class PipelineSpoolingTask<TInputOutput, TIgnoreKey> : SpoolingTaskBase
274     {
275         // The data source from which to pull data.
276         private QueryOperatorEnumerator<TInputOutput, TIgnoreKey> m_source;
277
278         // The destination channel into which data is placed. This can be null if we are
279         // enumerating "for effect", e.g. forall loop.
280         private AsynchronousChannel<TInputOutput> m_destination;
281
282         //-----------------------------------------------------------------------------------
283         // Creates, but does not execute, a new spooling task.
284         //
285         // Arguments:
286         //     taskIndex   - the unique index of this task
287         //     source      - the producer enumerator
288         //     destination - the destination channel into which to spool elements
289         //
290         // Assumptions:
291         //     Source cannot be null, although the other arguments may be.
292         //
293
294         internal PipelineSpoolingTask(
295             int taskIndex, QueryTaskGroupState groupState,
296             QueryOperatorEnumerator<TInputOutput, TIgnoreKey> source, AsynchronousChannel<TInputOutput> destination)
297             : base(taskIndex, groupState)
298         {
299             Contract.Assert(source != null);
300             m_source = source;
301             m_destination = destination;
302         }
303
304         //-----------------------------------------------------------------------------------
305         // This method is responsible for enumerating results and enqueueing them to
306         // the output channel(s) as appropriate.  Each base class implements its own.
307         //
308
309         protected override void SpoolingWork()
310         {
311             // We just enumerate over the entire source data stream, placing each element
312             // into the destination channel.
313             TInputOutput current = default(TInputOutput);
314             TIgnoreKey keyUnused = default(TIgnoreKey);
315
316             QueryOperatorEnumerator<TInputOutput, TIgnoreKey> source = m_source;
317             AsynchronousChannel<TInputOutput> destination = m_destination;
318             CancellationToken cancelToken = m_groupState.CancellationState.MergedCancellationToken;
319
320             while (source.MoveNext(ref current, ref keyUnused))
321             {
322                 // If an abort has been requested, stop this worker immediately.
323                 if (cancelToken.IsCancellationRequested)
324                 {
325                     break;
326                 }
327
328                 destination.Enqueue(current);
329             }
330
331             // Flush remaining data to the query consumer in preparation for channel shutdown.
332             destination.FlushBuffers();
333         }
334
335         //-----------------------------------------------------------------------------------
336         // Ensure we signal that the channel is complete.
337         //
338
339         protected override void SpoolingFinally()
340         {
341             // Call the base implementation.
342             base.SpoolingFinally();
343
344             // Signal that we are done, in the case of asynchronous consumption.
345             if (m_destination != null)
346             {
347                 m_destination.SetDone();
348             }
349
350             // Dispose of the source enumerator *after* signaling that the task is done.
351             // We call Dispose() last to ensure that if it throws an exception, we will not cause a deadlock.
352             m_source.Dispose();
353         }
354     }
355
356     /// <summary>
357     /// A spooling task handles marshaling data from a producer to a consumer. It's given
358     /// a single enumerator object that contains all of the production algorithms, a single
359     /// destination channel from which consumers draw results, and (optionally) a
360     /// synchronization primitive using which to notify asynchronous consumers.
361     /// </summary>
362     /// <typeparam name="TInputOutput"></typeparam>
363     /// <typeparam name="TIgnoreKey"></typeparam>
364     internal class ForAllSpoolingTask<TInputOutput, TIgnoreKey> : SpoolingTaskBase
365     {
366         // The data source from which to pull data.
367         private QueryOperatorEnumerator<TInputOutput, TIgnoreKey> m_source;
368
369         //-----------------------------------------------------------------------------------
370         // Creates, but does not execute, a new spooling task.
371         //
372         // Arguments:
373         //     taskIndex   - the unique index of this task
374         //     source      - the producer enumerator
375         //     destination - the destination channel into which to spool elements
376         //
377         // Assumptions:
378         //     Source cannot be null, although the other arguments may be.
379         //
380
381         internal ForAllSpoolingTask(
382             int taskIndex, QueryTaskGroupState groupState,
383             QueryOperatorEnumerator<TInputOutput, TIgnoreKey> source)
384             : base(taskIndex, groupState)
385         {
386             Contract.Assert(source != null);
387             m_source = source;
388         }
389
390         //-----------------------------------------------------------------------------------
391         // This method is responsible for enumerating results and enqueueing them to
392         // the output channel(s) as appropriate.  Each base class implements its own.
393         //
394
395         protected override void SpoolingWork()
396         {
397             // We just enumerate over the entire source data stream for effect.
398             TInputOutput currentUnused = default(TInputOutput);
399             TIgnoreKey keyUnused = default(TIgnoreKey);
400
401             //Note: this only ever runs with a ForAll operator, and ForAllEnumerator performs cancellation checks
402             while (m_source.MoveNext(ref currentUnused, ref keyUnused)) 
403                 ;
404         }
405
406         //-----------------------------------------------------------------------------------
407         // Ensure we signal that the channel is complete.
408         //
409
410         protected override void SpoolingFinally()
411         {
412             // Call the base implementation.
413             base.SpoolingFinally();
414
415             // Dispose of the source enumerator
416             m_source.Dispose();
417         }
418     }
419 }