Merge pull request #3626 from lateralusX/jlorenss/win-api-family-support-eglib
[mono.git] / mcs / class / referencesource / System.Core / System / Linq / Parallel / Scheduling / OrderPreservingPipeliningSpoolingTask.cs
1 // ==++==
2 //
3 //   Copyright (c) Microsoft Corporation.  All rights reserved.
4 // 
5 // ==--==
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
7 //
8 // OrderPreservingPipeliningSpoolingTask.cs
9 //
10 // <OWNER>[....]</OWNER>
11 //
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
13
14 using System;
15 using System.Collections.Generic;
16 using System.Linq;
17 using System.Linq.Parallel;
18 using System.Text;
19 using System.Threading;
20 using System.Threading.Tasks;
21 using System.Diagnostics.Contracts;
22
23 namespace System.Linq.Parallel
24 {
25     class OrderPreservingPipeliningSpoolingTask<TOutput, TKey> : SpoolingTaskBase
26     {
27         private readonly QueryTaskGroupState m_taskGroupState; // State shared among tasks.
28 #if !MONO
29         private readonly TaskScheduler m_taskScheduler; // The task manager to execute the query.
30 #endif
31         private readonly QueryOperatorEnumerator<TOutput, TKey> m_partition; // The source partition.
32         private readonly bool[] m_consumerWaiting; // Whether a consumer is waiting on a particular producer
33         private readonly bool[] m_producerWaiting; // Whether a particular producer is waiting on the consumer
34         private readonly bool[] m_producerDone; // Whether each producer is done
35         private readonly int m_partitionIndex; // Index of the partition owned by this task.
36
37         private readonly Queue<Pair<TKey, TOutput>>[] m_buffers; // The buffer for the results
38         private readonly object m_bufferLock; // A lock for the buffer
39
40         /// <summary>
41         /// Whether the producer is allowed to buffer up elements before handing a chunk to the consumer.
42         /// If false, the producer will make each result available to the consumer immediately after it is
43         /// produced.
44         /// </summary>
45         private readonly bool m_autoBuffered;
46
47         /// <summary>
48         /// The number of elements to accumulate on the producer before copying the elements to the 
49         /// producer-consumer buffer. This constant is only used in the AutoBuffered mode.
50         /// 
51         /// Experimentally, 16 appears to be sufficient buffer size to compensate for the synchronization
52         /// cost.
53         /// </summary>
54         private const int PRODUCER_BUFFER_AUTO_SIZE = 16;
55
56         /// <summary>
57         /// Constructor
58         /// </summary>
59         internal OrderPreservingPipeliningSpoolingTask(
60             QueryOperatorEnumerator<TOutput, TKey> partition, 
61             QueryTaskGroupState taskGroupState,
62             bool[] consumerWaiting,
63             bool[] producerWaiting,
64             bool[] producerDone,
65             int partitionIndex,
66             Queue<Pair<TKey, TOutput>>[] buffers,
67             object bufferLock,
68             TaskScheduler taskScheduler,
69             bool autoBuffered)
70             :base(partitionIndex, taskGroupState)
71         {
72             Contract.Requires(partition != null);
73             Contract.Requires(taskGroupState != null);
74             Contract.Requires(consumerWaiting != null);
75             Contract.Requires(producerWaiting != null && producerWaiting.Length == consumerWaiting.Length);
76             Contract.Requires(producerDone != null && producerDone.Length == consumerWaiting.Length);
77             Contract.Requires(buffers != null && buffers.Length == consumerWaiting.Length);
78             Contract.Requires(partitionIndex >= 0 && partitionIndex < consumerWaiting.Length);
79
80             m_partition = partition;
81             m_taskGroupState = taskGroupState;
82             m_producerDone = producerDone;
83             m_consumerWaiting = consumerWaiting;
84             m_producerWaiting = producerWaiting;
85             m_partitionIndex = partitionIndex;
86             m_buffers = buffers;
87             m_bufferLock = bufferLock;
88 #if !MONO
89             m_taskScheduler = taskScheduler;
90 #endif
91             m_autoBuffered = autoBuffered;
92         }
93
94         /// <summary>
95         /// This method is responsible for enumerating results and enqueueing them to
96         /// the output buffer as appropriate.  Each base class implements its own.
97         /// </summary>
98         protected override void SpoolingWork()
99         {
100             TOutput element = default(TOutput);
101             TKey key = default(TKey);
102
103             int chunkSize = m_autoBuffered ? PRODUCER_BUFFER_AUTO_SIZE : 1;
104             Pair<TKey,TOutput>[] chunk = new Pair<TKey,TOutput>[chunkSize];
105             var partition = m_partition;
106             CancellationToken cancelToken = m_taskGroupState.CancellationState.MergedCancellationToken;
107
108             int lastChunkSize;
109             do
110             {
111                 lastChunkSize = 0;
112                 while (lastChunkSize < chunkSize && partition.MoveNext(ref element, ref key))
113                 {
114                     chunk[lastChunkSize] = new Pair<TKey, TOutput>(key, element);
115                     lastChunkSize++;
116                 }
117
118                 if (lastChunkSize == 0) break;
119
120                 lock (m_bufferLock)
121                 {
122                     // Check if the query has been cancelled.
123                     if (cancelToken.IsCancellationRequested)
124                     {
125                         break;
126                     }
127
128                     for (int i = 0; i < lastChunkSize; i++)
129                     {
130                         m_buffers[m_partitionIndex].Enqueue(chunk[i]);
131                     }
132
133                     if (m_consumerWaiting[m_partitionIndex])
134                     {
135                         Monitor.Pulse(m_bufferLock);
136                         m_consumerWaiting[m_partitionIndex] = false;
137                     }
138
139                     // If the producer buffer is too large, wait.
140                     // Note: we already checked for cancellation after acquiring the lock on this producer.
141                     // That guarantees that the consumer will eventually wake up the producer.
142                     if (m_buffers[m_partitionIndex].Count >= OrderPreservingPipeliningMergeHelper<TOutput, TKey>.MAX_BUFFER_SIZE)
143                     {
144                         m_producerWaiting[m_partitionIndex] = true;
145                         Monitor.Wait(m_bufferLock);
146                     }
147                 }
148             } while (lastChunkSize == chunkSize);
149         }
150
151
152         /// <summary>
153         /// Creates and begins execution of a new set of spooling tasks.
154         /// </summary>
155         public static void Spool(
156             QueryTaskGroupState groupState, PartitionedStream<TOutput, TKey> partitions,
157             bool[] consumerWaiting, bool[] producerWaiting, bool[] producerDone, 
158             Queue<Pair<TKey,TOutput>>[] buffers, object[] bufferLocks,
159             TaskScheduler taskScheduler, bool autoBuffered)
160         {
161             Contract.Requires(groupState != null);
162             Contract.Requires(partitions != null);
163             Contract.Requires(producerDone != null && producerDone.Length == partitions.PartitionCount);
164             Contract.Requires(buffers != null && buffers.Length == partitions.PartitionCount);
165             Contract.Requires(bufferLocks != null);
166
167             int degreeOfParallelism = partitions.PartitionCount;
168
169             // Initialize the buffers and buffer locks.
170             for (int i = 0; i < degreeOfParallelism; i++)
171             {
172                 buffers[i] = new Queue<Pair<TKey, TOutput>>(OrderPreservingPipeliningMergeHelper<TOutput, TKey>.INITIAL_BUFFER_SIZE);
173                 bufferLocks[i] = new object();
174             }
175
176             // Ensure all tasks in this query are parented under a common root. Because this
177             // is a pipelined query, we detach it from the parent (to avoid blocking the calling
178             // thread), and run the query on a separate thread.
179             Task rootTask = new Task(
180                 () =>
181                 {
182                     for (int i = 0; i < degreeOfParallelism; i++)
183                     {
184                         QueryTask asyncTask = new OrderPreservingPipeliningSpoolingTask<TOutput, TKey>(
185                             partitions[i], groupState, consumerWaiting, producerWaiting, 
186                             producerDone, i, buffers, bufferLocks[i], taskScheduler, autoBuffered);
187                         asyncTask.RunAsynchronously(taskScheduler);
188                     }
189                 });
190
191             // Begin the query on the calling thread.
192             groupState.QueryBegin(rootTask);
193
194             // And schedule it for execution.  This is done after beginning to ensure no thread tries to
195             // end the query before its root task has been recorded properly.
196             rootTask.Start(taskScheduler);
197
198             // We don't call QueryEnd here; when we return, the query is still executing, and the
199             // last enumerator to be disposed of will call QueryEnd for us.
200         }
201
202         /// <summary>
203         /// Dispose the underlying enumerator and wake up the consumer if necessary.
204         /// </summary>
205         protected override void SpoolingFinally()
206         {
207             // Let the consumer know that this producer is done.
208             lock (m_bufferLock)
209             {
210                 m_producerDone[m_partitionIndex] = true;
211                 if (m_consumerWaiting[m_partitionIndex])
212                 {
213                     Monitor.Pulse(m_bufferLock);
214                     m_consumerWaiting[m_partitionIndex] = false;
215                 }
216             }
217
218             // Call the base implementation.
219             base.SpoolingFinally();
220
221             // Dispose of the source enumerator *after* signaling that the task is done.
222             // We call Dispose() last to ensure that if it throws an exception, we will not cause a deadlock.
223             m_partition.Dispose();
224         }
225     }
226 }