[sgen] Untag the vtable during concurrent mark
[mono.git] / mcs / class / referencesource / System.Core / System / Linq / Parallel / Scheduling / OrderPreservingPipeliningSpoolingTask.cs
1 // ==++==
2 //
3 //   Copyright (c) Microsoft Corporation.  All rights reserved.
4 // 
5 // ==--==
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
7 //
8 // OrderPreservingPipeliningSpoolingTask.cs
9 //
10 // <OWNER>[....]</OWNER>
11 //
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
13
14 using System;
15 using System.Collections.Generic;
16 using System.Linq;
17 using System.Linq.Parallel;
18 using System.Text;
19 using System.Threading;
20 using System.Threading.Tasks;
21 using System.Diagnostics.Contracts;
22
23 namespace System.Linq.Parallel
24 {
25     class OrderPreservingPipeliningSpoolingTask<TOutput, TKey> : SpoolingTaskBase
26     {
27         private readonly QueryTaskGroupState m_taskGroupState; // State shared among tasks.
28         private readonly TaskScheduler m_taskScheduler; // The task manager to execute the query.
29         private readonly QueryOperatorEnumerator<TOutput, TKey> m_partition; // The source partition.
30         private readonly bool[] m_consumerWaiting; // Whether a consumer is waiting on a particular producer
31         private readonly bool[] m_producerWaiting; // Whether a particular producer is waiting on the consumer
32         private readonly bool[] m_producerDone; // Whether each producer is done
33         private readonly int m_partitionIndex; // Index of the partition owned by this task.
34
35         private readonly Queue<Pair<TKey, TOutput>>[] m_buffers; // The buffer for the results
36         private readonly object m_bufferLock; // A lock for the buffer
37
38         /// <summary>
39         /// Whether the producer is allowed to buffer up elements before handing a chunk to the consumer.
40         /// If false, the producer will make each result available to the consumer immediately after it is
41         /// produced.
42         /// </summary>
43         private readonly bool m_autoBuffered;
44
45         /// <summary>
46         /// The number of elements to accumulate on the producer before copying the elements to the 
47         /// producer-consumer buffer. This constant is only used in the AutoBuffered mode.
48         /// 
49         /// Experimentally, 16 appears to be sufficient buffer size to compensate for the synchronization
50         /// cost.
51         /// </summary>
52         private const int PRODUCER_BUFFER_AUTO_SIZE = 16;
53
54         /// <summary>
55         /// Creates the spooling task for the partition at <paramref name="partitionIndex"/> and
56         /// stores every argument in the matching field.
57         /// </summary>
58         internal OrderPreservingPipeliningSpoolingTask(
59             QueryOperatorEnumerator<TOutput, TKey> partition, QueryTaskGroupState taskGroupState,
60             bool[] consumerWaiting, bool[] producerWaiting, bool[] producerDone,
61             int partitionIndex, Queue<Pair<TKey, TOutput>>[] buffers, object bufferLock,
62             TaskScheduler taskScheduler, bool autoBuffered)
63             : base(partitionIndex, taskGroupState)
64         {
65             // The flag arrays and the buffer array must share one length, and the index must fall inside it.
66             Contract.Requires(partition != null);
67             Contract.Requires(taskGroupState != null);
68             Contract.Requires(consumerWaiting != null);
69             Contract.Requires(producerWaiting != null && producerWaiting.Length == consumerWaiting.Length);
70             Contract.Requires(producerDone != null && producerDone.Length == consumerWaiting.Length);
71             Contract.Requires(buffers != null && buffers.Length == consumerWaiting.Length);
72             Contract.Requires(partitionIndex >= 0 && partitionIndex < consumerWaiting.Length);
73
74             // The source partition and the query-wide settings.
75             this.m_partition = partition;
76             this.m_partitionIndex = partitionIndex;
77             this.m_taskGroupState = taskGroupState;
78             this.m_taskScheduler = taskScheduler;
79             this.m_autoBuffered = autoBuffered;
80
81             // The result queues and the lock that guards this task's queue.
82             this.m_buffers = buffers;
83             this.m_bufferLock = bufferLock;
84             // Signalling flags, indexed by partition.
85             this.m_consumerWaiting = consumerWaiting;
86             this.m_producerWaiting = producerWaiting;
87             this.m_producerDone = producerDone;
88         }
89
90         /// <summary>
91         /// Reads this task's partition in chunks and appends each chunk to the partition's queue in
92         /// m_buffers while holding m_bufferLock.  Stops when the partition is exhausted or the query is cancelled.
93         /// </summary>
94         protected override void SpoolingWork()
95         {
96             TOutput element = default(TOutput);
97             TKey key = default(TKey);
98             // Chunk capacity: PRODUCER_BUFFER_AUTO_SIZE when auto-buffering, otherwise one element per hand-off.
99             int chunkSize = m_autoBuffered ? PRODUCER_BUFFER_AUTO_SIZE : 1;
100             Pair<TKey,TOutput>[] chunk = new Pair<TKey,TOutput>[chunkSize];
101             var partition = m_partition;
102             CancellationToken cancelToken = m_taskGroupState.CancellationState.MergedCancellationToken;
103             // Number of elements read into chunk on the current pass.
104             int lastChunkSize;
105             do
106             {
107                 lastChunkSize = 0;
108                 while (lastChunkSize < chunkSize && partition.MoveNext(ref element, ref key))
109                 {
110                     chunk[lastChunkSize] = new Pair<TKey, TOutput>(key, element);
111                     lastChunkSize++;
112                 }
113                 // Nothing was read on this pass, so there is nothing left to publish.
114                 if (lastChunkSize == 0) break;
115                 // The queue and the waiting flags are only touched below while m_bufferLock is held.
116                 lock (m_bufferLock)
117                 {
118                     // Check if the query has been cancelled; the chunk just read is then dropped.
119                     if (cancelToken.IsCancellationRequested)
120                     {
121                         break;
122                     }
123                     // Publish the chunk in the order it was read.
124                     for (int i = 0; i < lastChunkSize; i++)
125                     {
126                         m_buffers[m_partitionIndex].Enqueue(chunk[i]);
127                     }
128                     // Wake the consumer if it flagged itself as waiting on this partition, and clear the flag.
129                     if (m_consumerWaiting[m_partitionIndex])
130                     {
131                         Monitor.Pulse(m_bufferLock);
132                         m_consumerWaiting[m_partitionIndex] = false;
133                     }
134
135                     // If the producer buffer is too large, wait.
136                     // Note: we already checked for cancellation after acquiring the lock on this producer.
137                     // That guarantees that the consumer will eventually wake up the producer.
138                     if (m_buffers[m_partitionIndex].Count >= OrderPreservingPipeliningMergeHelper<TOutput, TKey>.MAX_BUFFER_SIZE)
139                     {
140                         m_producerWaiting[m_partitionIndex] = true; // Presumably read by the consumer to decide whether to Pulse; consumer code is not in this file.
141                         Monitor.Wait(m_bufferLock); // Releases m_bufferLock while blocked and reacquires it before returning.
142                     }
143                 }
144             } while (lastChunkSize == chunkSize); // A short chunk means MoveNext returned false.
145         }
146
147
148         /// <summary>
149         /// Allocates a result queue and a lock for every partition, then launches one spooling task
150         /// per partition from a root task started on <paramref name="taskScheduler"/>.
151         /// </summary>
152         public static void Spool(
153             QueryTaskGroupState groupState, PartitionedStream<TOutput, TKey> partitions,
154             bool[] consumerWaiting, bool[] producerWaiting, bool[] producerDone,
155             Queue<Pair<TKey, TOutput>>[] buffers, object[] bufferLocks,
156             TaskScheduler taskScheduler, bool autoBuffered)
157         {
158             Contract.Requires(groupState != null);
159             Contract.Requires(partitions != null);
160             Contract.Requires(producerDone != null && producerDone.Length == partitions.PartitionCount);
161             Contract.Requires(buffers != null && buffers.Length == partitions.PartitionCount);
162             Contract.Requires(bufferLocks != null);
163
164             int partitionCount = partitions.PartitionCount;
165
166             // Give every partition its own result queue and its own lock object.
167             int slot = 0;
168             while (slot < partitionCount)
169             {
170                 buffers[slot] = new Queue<Pair<TKey, TOutput>>(OrderPreservingPipeliningMergeHelper<TOutput, TKey>.INITIAL_BUFFER_SIZE);
171                 bufferLocks[slot] = new object();
172                 slot++;
173             }
174
175             // All producers are created and started from inside a single root task, so the work of
176             // launching them runs on the scheduler instead of on the calling thread.
177             Action startProducers = delegate
178             {
179                 for (int index = 0; index < partitionCount; index++)
180                 {
181                     QueryTask producer = new OrderPreservingPipeliningSpoolingTask<TOutput, TKey>(
182                         partitions[index], groupState, consumerWaiting, producerWaiting,
183                         producerDone, index, buffers, bufferLocks[index], taskScheduler, autoBuffered);
184                     producer.RunAsynchronously(taskScheduler);
185                 }
186             };
187             Task rootTask = new Task(startProducers);
188
189             // Record the root task with the group state on the calling thread first; only then start it,
190             // so that no thread can try to end the query before its root task has been recorded.
191             groupState.QueryBegin(rootTask);
192             rootTask.Start(taskScheduler);
193
194             // QueryEnd is deliberately not called here: the query is still executing when this method
195             // returns, and the last enumerator to be disposed of calls QueryEnd.
196         }
197
198         /// <summary>
199         /// Marks this producer as done, wakes the consumer if it is waiting, runs the base cleanup, then disposes the partition.
200         /// </summary>
201         protected override void SpoolingFinally()
202         {
203             // Let the consumer know that this producer is done; the flag is set and the consumer pulsed under one lock.
204             lock (m_bufferLock)
205             {
206                 m_producerDone[m_partitionIndex] = true;
207                 if (m_consumerWaiting[m_partitionIndex]) // Only pulse when the consumer flagged itself as waiting on this partition.
208                 {
209                     Monitor.Pulse(m_bufferLock);
210                     m_consumerWaiting[m_partitionIndex] = false;
211                 }
212             }
213
214             // Call the base implementation.
215             base.SpoolingFinally();
216
217             // Dispose of the source enumerator *after* signaling that the task is done.
218             // We call Dispose() last to ensure that if it throws an exception, we will not cause a deadlock.
219             m_partition.Dispose();
220         }
221     }
222 }