3 // Copyright (c) 2008 Jérémie "Garuma" Laval
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to deal
7 // in the Software without restriction, including without limitation the rights
8 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
12 // The above copyright notice and this permission notice shall be included in
13 // all copies or substantial portions of the Software.
15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
27 using System.Threading;
28 using System.Collections.Concurrent;
30 #if INSIDE_MONO_PARALLEL
31 using System.Threading.Tasks;
32 using Watch = System.Diagnostics.Stopwatch;
34 namespace Mono.Threading.Tasks
36 namespace System.Threading.Tasks
39 #if INSIDE_MONO_PARALLEL
42 class ThreadWorker : IDisposable
46 /* This field is used when a ThreadWorker has to call Task.Wait,
47 * which brings it back here through the static WorkerMethod, although
48 * it would be more efficient for it to keep calling its own WorkerMethod
51 static ThreadWorker autoReference;
53 readonly IConcurrentDeque<Task> dDeque;
54 readonly ThreadWorker[] others;
55 readonly ManualResetEvent waitHandle;
56 readonly IProducerConsumerCollection<Task> sharedWorkQueue;
57 readonly ThreadPriority threadPriority;
59 // Flag to tell if workerThread is running
62 readonly int workerLength;
63 readonly int workerPosition;
64 const int maxRetry = 3;
66 const int sleepThreshold = 100;
67 int deepSleepTime = 8;
68 readonly Action<Task> adder;
72 public ThreadWorker (ThreadWorker[] others,
74 IProducerConsumerCollection<Task> sharedWorkQueue,
75 IConcurrentDeque<Task> dDeque,
76 ThreadPriority priority,
77 ManualResetEvent handle)
81 this.sharedWorkQueue = sharedWorkQueue;
82 this.workerLength = others.Length;
83 this.workerPosition = workerPosition;
84 this.waitHandle = handle;
85 this.threadPriority = priority;
86 this.adder = new Action<Task> (ChildWorkAdder);
88 InitializeUnderlyingThread ();
91 #if INSIDE_MONO_PARALLEL
94 void InitializeUnderlyingThread ()
96 this.workerThread = new Thread (WorkerMethodWrapper);
98 this.workerThread.IsBackground = true;
99 this.workerThread.Priority = threadPriority;
100 this.workerThread.Name = "ParallelFxThreadWorker";
103 #if INSIDE_MONO_PARALLEL
106 public void Dispose ()
109 if (workerThread.ThreadState != ThreadState.Stopped)
110 workerThread.Abort ();
113 #if INSIDE_MONO_PARALLEL
121 // If the thread was stopped then set it in use and restart it
122 int result = Interlocked.Exchange (ref started, 1);
126 if (this.workerThread.ThreadState != ThreadState.Unstarted) {
127 InitializeUnderlyingThread ();
130 workerThread.Start ();
133 #if INSIDE_MONO_PARALLEL
138 // Set the flag to stop so that the while in the thread will stop
139 // doing its infinite loop.
143 #if INSIDE_MONO_PARALLEL
146 // This is the actual method called in the Thread
147 void WorkerMethodWrapper ()
150 autoReference = this;
151 bool wasWokenUp = false;
154 while (started == 1) {
157 result = WorkerMethod ();
158 if (!result && wasWokenUp)
170 // If we are spinning too much, have a deeper sleep
171 if (++sleepTime > sleepThreshold && sharedWorkQueue.Count == 0) {
172 wasWokenUp = waitHandle.WaitOne ((deepSleepTime = deepSleepTime >= 0x4000 ? 0x4000 : deepSleepTime << 1));
179 #if INSIDE_MONO_PARALLEL
182 // Main method, used to do all the logic of retrieving, processing and stealing work.
186 bool hasStolenFromOther;
189 hasStolenFromOther = false;
193 // We fill up our work deque concurrently with other ThreadWorker
194 while (sharedWorkQueue.Count > 0) {
197 while (sharedWorkQueue.TryTake (out value)) {
198 dDeque.PushBottom (value);
201 // Now we process our work
202 while (dDeque.PopBottom (out value) == PopResult.Succeed) {
204 ExecuteTask (value, ref result);
208 // When we have finished, steal from other worker
211 // Repeat the operation a little so that we can let other things process.
212 for (int j = 0; j < maxRetry; ++j) {
213 int len = workerLength + workerPosition;
214 // Start stealing with the ThreadWorker at our right to minimize contention
215 for (int it = workerPosition + 1; it < len; ++it) {
216 int i = it % workerLength;
217 if ((other = others [i]) == null || other == this)
220 // Maybe make this steal more than one item at a time, see TODO.
221 while (other.dDeque.PopTop (out value) == PopResult.Succeed) {
222 if (!hasStolenFromOther)
225 hasStolenFromOther = true;
226 ExecuteTask (value, ref result);
230 } while (sharedWorkQueue.Count > 0 || hasStolenFromOther);
235 void ExecuteTask (Task value, ref bool result)
240 var saveCurrent = currentTask;
242 value.Execute (adder);
244 currentTask = saveCurrent;
247 #if !INSIDE_MONO_PARALLEL
248 // Almost the same as above, but with an added predicate and treating one item at a time.
249 // It is used by the Scheduler Participate(...) method for special waiting cases like
250 // Task.WaitAll(someTasks) or Task.WaitAny(someTasks).
251 // The predicate should be really fast and non-blocking, as it is called many times.
252 // Also, the method skips tasks that are LongRunning to avoid blocking (Tasks are not LongRunning by default).
253 public static void ParticipativeWorkerMethod (Task self,
254 ManualResetEventSlim predicateEvt,
255 int millisecondsTimeout,
256 IProducerConsumerCollection<Task> sharedWorkQueue,
257 ThreadWorker[] others,
258 ManualResetEvent evt)
260 const int stage1 = 5, stage2 = 0;
262 WaitHandle[] handles = null;
263 Watch watch = Watch.StartNew ();
264 if (millisecondsTimeout == -1)
265 millisecondsTimeout = int.MaxValue;
266 bool aggressive = false;
267 bool hasAutoReference = autoReference != null;
268 Action<Task> adder = null;
270 while (!predicateEvt.IsSet && watch.ElapsedMilliseconds < millisecondsTimeout && !self.IsCompleted) {
271 // We try to execute the self task as it may be the simplest way to unlock
273 if (self.Status == TaskStatus.WaitingToRun) {
274 self.Execute (hasAutoReference ? autoReference.adder : (Action<Task>)null);
275 if (predicateEvt.IsSet || watch.ElapsedMilliseconds > millisecondsTimeout)
281 // If we are in fact a normal ThreadWorker, use our own deque
282 if (hasAutoReference) {
283 var enumerable = autoReference.dDeque.GetEnumerable ();
285 adder = hasAutoReference ? autoReference.adder : (Action<Task>)null;
287 if (enumerable != null) {
288 foreach (var t in enumerable) {
292 if (CheckTaskFitness (self, t))
295 if (predicateEvt.IsSet || watch.ElapsedMilliseconds > millisecondsTimeout)
301 int count = sharedWorkQueue.Count;
303 // Dequeue only one item as we have restriction
304 while (--count >= 0 && sharedWorkQueue.TryTake (out value) && value != null) {
306 if (CheckTaskFitness (self, value) || aggressive)
307 value.Execute (null);
309 if (autoReference == null)
310 sharedWorkQueue.TryAdd (value);
312 autoReference.dDeque.PushBottom (value);
316 if (predicateEvt.IsSet || watch.ElapsedMilliseconds > millisecondsTimeout)
320 // First check to see if we comply with the predicate
321 if (predicateEvt.IsSet || watch.ElapsedMilliseconds > millisecondsTimeout)
324 // Try to complete other work by stealing since our desired tasks may be in other worker
326 for (int i = 0; i < others.Length; i++) {
327 if ((other = others [i]) == autoReference || other == null)
330 if (other.dDeque.PopTop (out value) == PopResult.Succeed && value != null) {
332 if (CheckTaskFitness (self, value) || aggressive)
333 value.Execute (null);
335 if (autoReference == null)
336 sharedWorkQueue.TryAdd (value);
338 autoReference.dDeque.PushBottom (value);
343 if (predicateEvt.IsSet || watch.ElapsedMilliseconds > millisecondsTimeout)
347 /* Waiting is split into 4 phases:
348 * - until stage1 we simply yield the thread to let others add data
349 * - between stage1 and stage2 we use the ManualResetEventSlim light waiting mechanism
350 * - after stage2 we fall back to the heavier WaitHandle waiting mechanism
351 * - if the situation really isn't evolving after a couple of sleeps, we disable
352 * the task fitness check altogether
354 if (--tries > stage1)
356 else if (tries >= stage2)
357 predicateEvt.Wait (ComputeTimeout (5, millisecondsTimeout, watch));
359 if (tries == stage2 - 1)
360 handles = new [] { predicateEvt.WaitHandle, evt };
361 WaitHandle.WaitAny (handles, ComputeTimeout (1000, millisecondsTimeout, watch));
362 if (tries == stage2 - 10)
368 static bool CheckTaskFitness (Task self, Task t)
370 return ((t.CreationOptions & TaskCreationOptions.LongRunning) == 0 && t.Id < self.Id)
373 || (autoReference != null && autoReference.currentTask != null && autoReference.currentTask == t.Parent);
376 public static ThreadWorker AutoReference {
378 return autoReference;
381 autoReference = value;
385 protected IConcurrentDeque<Task> Deque {
391 protected ThreadWorker[] Others {
397 protected ManualResetEvent WaitHandle {
// Read-only view, for derived classes, of the threadPriority value that
// InitializeUnderlyingThread applies to the worker thread.
protected ThreadPriority Priority {
	get { return threadPriority; }
}
// Read-only view, for derived classes, of the workerPosition field, which
// WorkerMethod uses as the starting offset when choosing a worker to steal from.
protected int WorkerPosition {
	get { return workerPosition; }
}
416 #if INSIDE_MONO_PARALLEL
419 internal void ChildWorkAdder (Task t)
421 dDeque.PushBottom (t);
// Picks how long (in milliseconds) a single wait may last.
//   proposed: the wait the caller would like to perform.
//   timeout:  the overall deadline in milliseconds; int.MaxValue means
//             "no deadline" (ParticipativeWorkerMethod maps -1 to it).
//   watch:    stopwatch measuring the time already spent waiting.
// Returns proposed when there is no deadline; otherwise the smaller of
// proposed and the time left before the deadline, never below zero.
static int ComputeTimeout (int proposed, int timeout, Watch watch)
{
	// Without a deadline the stopwatch is not consulted at all.
	if (timeout == int.MaxValue)
		return proposed;

	int remaining = (int)(timeout - watch.ElapsedMilliseconds);
	if (remaining < 0)
		remaining = 0;

	return remaining < proposed ? remaining : proposed;
}
430 public bool Finished {
438 return workerThread.ManagedThreadId;
// Typed equality: two workers are equal when they hold the very same
// dDeque instance (reference comparison). A null argument is never equal.
public virtual bool Equals (ThreadWorker other)
{
	if (other == null)
		return false;

	return object.ReferenceEquals (this.dDeque, other.dDeque);
}
// Untyped equality: anything that is not a ThreadWorker (including null)
// is unequal; otherwise the decision is delegated to Equals (ThreadWorker).
public override bool Equals (object obj)
{
	var candidate = obj as ThreadWorker;
	if (candidate == null)
		return false;

	return Equals (candidate);
}
// Hash code matching Equals (ThreadWorker), which compares workers by the
// identity of their dDeque. Hashing the same object guarantees that equal
// workers produce equal hash codes.
//
// The thread id is deliberately not used: workerThread is replaced by
// InitializeUnderlyingThread when the worker is restarted, which would
// change the hash of a live object and break hash-based collections.
public override int GetHashCode ()
{
	// Identity hash of the deque reference; yields 0 if dDeque is null.
	return System.Runtime.CompilerServices.RuntimeHelpers.GetHashCode (dDeque);
}