3 // Copyright (c) 2008 Jérémie "Garuma" Laval
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to deal
7 // in the Software without restriction, including without limitation the rights
8 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
12 // The above copyright notice and this permission notice shall be included in
13 // all copies or substantial portions of the Software.
15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
27 using System.Threading;
28 using System.Collections.Concurrent;
29 using System.Threading.Tasks;
30 using Watch = System.Diagnostics.Stopwatch;
32 namespace Mono.Threading.Tasks
34 public class ThreadWorker : IDisposable
38 /* This field is used when a ThreadWorker has to call Task.Wait,
39 * which brings it back here through the static WorkerMethod, although
40 * it is more efficient for it to keep calling its own WorkerMethod
43 static ThreadWorker autoReference;

// This worker's own deque: WorkerMethod pushes and pops at the bottom, while
// other workers take from the top when they steal.
45 readonly IConcurrentDeque<Task> dDeque;
// Workers scanned when stealing; null slots and this worker itself are skipped.
46 readonly ThreadWorker[] others;
// Event the wrapper loop blocks on (with a timeout) once it has spun past sleepThreshold.
47 readonly ManualResetEvent waitHandle;
// Shared queue that WorkerMethod drains into dDeque before processing.
48 readonly IProducerConsumerCollection<Task> sharedWorkQueue;
// Priority applied to the underlying thread every time it is created.
49 readonly ThreadPriority threadPriority;

51 // Flag to tell if workerThread is running

// others.Length, captured in the constructor.
54 readonly int workerLength;
// Starting point for stealing: the scan begins at workerPosition + 1 and wraps modulo workerLength.
55 readonly int workerPosition;
// Number of stealing sweeps over `others` per pass of WorkerMethod.
56 const int maxRetry = 3;
// sleepTime value above which the wrapper loop may block on waitHandle, provided the shared queue is empty.
58 const int sleepThreshold = 100;
// Timeout handed to waitHandle.WaitOne; doubled before each wait and capped at 0x4000.
59 int deepSleepTime = 8;
// Cached delegate for ChildWorkAdder, passed to Task.Execute.
60 readonly Action<Task> adder;
// Builds a worker and prepares (but does not start) its underlying thread.
//   others          - workers to steal from; must not be null because others.Length is read here
//   sharedWorkQueue - shared queue the worker drains into its own deque
//   dDeque          - deque type used for this worker's local work
//   priority        - priority applied to the underlying thread
//   handle          - event the worker waits on when it goes into a deep sleep
64 public ThreadWorker (ThreadWorker[] others,
66 IProducerConsumerCollection<Task> sharedWorkQueue,
67 IConcurrentDeque<Task> dDeque,
68 ThreadPriority priority,
69 ManualResetEvent handle)
73 this.sharedWorkQueue = sharedWorkQueue;
74 this.workerLength = others.Length;
75 this.workerPosition = workerPosition;
76 this.waitHandle = handle;
77 this.threadPriority = priority;
// Cache the delegate once so that it is not re-allocated for every executed task.
78 this.adder = new Action<Task> (ChildWorkAdder);
// InitializeUnderlyingThread is virtual, so an override runs here before the
// derived class's own constructor body has executed.
80 InitializeUnderlyingThread ();
// Creates a new Thread that will run WorkerMethodWrapper and configures it
// (background, configured priority, fixed name). The thread is not started
// here; Pulse calls Start on it, and calls this method again when the
// current Thread object is no longer in the Unstarted state.
83 protected virtual void InitializeUnderlyingThread ()
85 this.workerThread = new Thread (WorkerMethodWrapper);
// Background threads do not keep the process alive on their own.
87 this.workerThread.IsBackground = true;
88 this.workerThread.Priority = threadPriority;
89 this.workerThread.Name = "ParallelFxThreadWorker";
// Tears the worker down by aborting the underlying thread unless it is
// reported as already stopped.
// NOTE(review): ThreadState is a [Flags] enum and the thread is marked
// IsBackground, so an exact comparison against ThreadState.Stopped may not
// match a stopped background thread - confirm.
// NOTE(review): Thread.Abort is not supported on .NET Core / .NET 5+ and
// throws PlatformNotSupportedException there.
92 public virtual void Dispose ()
95 if (workerThread.ThreadState != ThreadState.Stopped)
96 workerThread.Abort ();
// Wakes the worker up: marks it as started and starts the underlying thread,
// re-creating the Thread object first when the current one is not Unstarted
// (a Thread object cannot be started twice).
99 public virtual void Pulse ()
104 // If the thread was stopped then set it in use and restart it
// Atomically sets started to 1; result receives the previous value.
// NOTE(review): how result is used afterwards is not shown here - confirm.
105 int result = Interlocked.Exchange (ref started, 1);
// NOTE(review): ThreadState is a [Flags] enum; a background thread that was
// never started may report Unstarted combined with Background, in which case
// this exact comparison always re-creates the thread - confirm.
109 if (this.workerThread.ThreadState != ThreadState.Unstarted) {
110 InitializeUnderlyingThread ();
113 workerThread.Start ();
// Asks the worker to stop. The loop in WorkerMethodWrapper only keeps
// running while started == 1.
116 public virtual void Stop ()
118 // Set the flag to stop so that the while loop in the thread stops
119 // iterating.
123 // This is the actual method called in the Thread
// Entry point of the underlying thread (see InitializeUnderlyingThread).
// Registers this worker in autoReference, then calls WorkerMethod repeatedly
// for as long as started == 1, backing off on waitHandle when idle.
124 protected virtual void WorkerMethodWrapper ()
// Lets ParticipativeWorkerMethod find this worker through the static field.
127 autoReference = this;
// Holds the result of the last WaitOne: true if the handle was signalled,
// false if the wait timed out.
128 bool wasWokenUp = false;
131 while (started == 1) {
134 result = WorkerMethod ();
135 if (!result && wasWokenUp)
147 // If we are spinning too much, have a deeper sleep
// Only sleep when the shared queue is also empty.
148 if (++sleepTime > sleepThreshold && sharedWorkQueue.Count == 0) {
// The timeout doubles before each wait until it reaches the 0x4000 cap.
149 wasWokenUp = waitHandle.WaitOne ((deepSleepTime = deepSleepTime >= 0x4000 ? 0x4000 : deepSleepTime << 1));
156 // Main method, used to do all the logic of retrieving, processing and stealing work.
// One pass consists of three phases, repeated while the shared queue still
// holds items or something was stolen during the pass:
//   1. move everything from sharedWorkQueue into this worker's deque,
//   2. execute the tasks of the local deque (popping from the bottom),
//   3. sweep the other workers up to maxRetry times, executing tasks taken
//      from the top of their deques.
// The bool passed by reference to ExecuteTask carries the outcome of the pass.
// NOTE(review): the exact meaning of the return value is not shown here - confirm.
157 protected virtual bool WorkerMethod ()
160 bool hasStolenFromOther;
// Reset for each pass; it is part of the loop condition at the bottom.
163 hasStolenFromOther = false;
167 // We fill up our work deque concurrently with other ThreadWorker
168 while (sharedWorkQueue.Count > 0) {
171 while (sharedWorkQueue.TryTake (out value)) {
172 dDeque.PushBottom (value);
175 // Now we process our work
176 while (dDeque.PopBottom (out value) == PopResult.Succeed) {
178 ExecuteTask (value, ref result);
182 // When we have finished, steal from other worker
185 // Repeat the operation a little so that we can let other things process.
186 for (int j = 0; j < maxRetry; ++j) {
// it runs over workerPosition + 1 .. workerPosition + workerLength - 1, so
// after the modulo every index except workerPosition is visited once.
187 int len = workerLength + workerPosition;
188 // Start stealing with the ThreadWorker at our right to minimize contention
189 for (int it = workerPosition + 1; it < len; ++it) {
190 int i = it % workerLength;
// Empty slots and this worker itself are not stolen from.
191 if ((other = others [i]) == null || other == this)
194 // Maybe make this steal more than one item at a time, see TODO.
// Stealing takes from the top, the opposite end to the owner's PopBottom.
195 while (other.dDeque.PopTop (out value) == PopResult.Succeed) {
196 if (!hasStolenFromOther)
199 hasStolenFromOther = true;
200 ExecuteTask (value, ref result);
204 } while (sharedWorkQueue.Count > 0 || hasStolenFromOther);
// Runs one task on behalf of this worker, handing it this worker's adder
// delegate, and restores the previously recorded currentTask afterwards.
//   value  - the task to execute
//   result - flag owned by WorkerMethod and passed through by reference
209 void ExecuteTask (Task value, ref bool result)
// Remember the outer task so that nested execution does not lose it.
214 var saveCurrent = currentTask;
216 value.Execute (adder);
218 currentTask = saveCurrent;
221 // Almost the same as above but with an added predicate and treating one item at a time.
222 // It is used by the Scheduler Participate(...) method for special waiting cases like
223 // Task.WaitAll(someTasks) or Task.WaitAny(someTasks).
224 // The predicate should be really fast and non-blocking as it is called many times.
225 // Also, the method skips tasks that are LongRunning to avoid blocking (Tasks are not LongRunning by default).
//
//   self                - task being waited on; it is executed inline while its status is
//                         WaitingToRun, and the loop ends once it is completed
//   predicateEvt        - the loop ends as soon as this event is set
//   millisecondsTimeout - time budget; -1 is treated as "no deadline"
//   sharedWorkQueue     - queue from which candidate tasks are taken
//   others              - workers whose deques are stolen from
//   evt                 - waited on together with predicateEvt.WaitHandle in the last waiting stage
//   checkTaskFitness    - called as checkTaskFitness (self, candidate) to decide whether a
//                         candidate may be executed here; bypassed when `aggressive` is true
226 public static void ParticipativeWorkerMethod (Task self,
227 ManualResetEventSlim predicateEvt,
228 int millisecondsTimeout,
229 IProducerConsumerCollection<Task> sharedWorkQueue,
230 ThreadWorker[] others,
231 ManualResetEvent evt,
232 Func<Task, Task, bool> checkTaskFitness)
234 const int stage1 = 5, stage2 = 0;
236 WaitHandle[] handles = null;
237 Watch watch = Watch.StartNew ();
// int.MaxValue is the value ComputeTimeout recognises as "no deadline".
238 if (millisecondsTimeout == -1)
239 millisecondsTimeout = int.MaxValue;
240 bool aggressive = false;
// Snapshot of whether a ThreadWorker is registered in autoReference.
241 bool hasAutoReference = autoReference != null;
242 Action<Task> adder = null;
244 while (!predicateEvt.IsSet && watch.ElapsedMilliseconds < millisecondsTimeout && !self.IsCompleted) {
245 // We try to execute the self task as it may be the simplest way to unlock
247 if (self.Status == TaskStatus.WaitingToRun) {
248 self.Execute (hasAutoReference ? autoReference.adder : (Action<Task>)null);
249 if (predicateEvt.IsSet || watch.ElapsedMilliseconds > millisecondsTimeout)
255 // If we are in fact a normal ThreadWorker, use our own deque
256 if (hasAutoReference) {
257 var enumerable = autoReference.dDeque.GetEnumerable ();
259 adder = hasAutoReference ? autoReference.adder : (Action<Task>)null;
261 if (enumerable != null) {
262 foreach (var t in enumerable) {
266 if (checkTaskFitness (self, t))
269 if (predicateEvt.IsSet || watch.ElapsedMilliseconds > millisecondsTimeout)
// The count is snapshotted so that at most that many items are taken in this pass.
275 int count = sharedWorkQueue.Count;
277 // Dequeue only one item as we have restriction
278 while (--count >= 0 && sharedWorkQueue.TryTake (out value) && value != null) {
280 if (checkTaskFitness (self, value) || aggressive)
281 value.Execute (null);
// The task is handed back: to the shared queue when no worker is registered,
// to the registered worker's deque otherwise.
283 if (autoReference == null)
284 sharedWorkQueue.TryAdd (value);
286 autoReference.dDeque.PushBottom (value);
290 if (predicateEvt.IsSet || watch.ElapsedMilliseconds > millisecondsTimeout)
294 // First check to see if we comply to predicate
295 if (predicateEvt.IsSet || watch.ElapsedMilliseconds > millisecondsTimeout)
298 // Try to complete other work by stealing since our desired tasks may be in other worker
300 for (int i = 0; i < others.Length; i++) {
// The registered worker itself and empty slots are not stolen from.
301 if ((other = others [i]) == autoReference || other == null)
304 if (other.dDeque.PopTop (out value) == PopResult.Succeed && value != null) {
306 if (checkTaskFitness (self, value) || aggressive)
307 value.Execute (null);
309 if (autoReference == null)
310 sharedWorkQueue.TryAdd (value);
312 autoReference.dDeque.PushBottom (value);
317 if (predicateEvt.IsSet || watch.ElapsedMilliseconds > millisecondsTimeout)
321 /* Waiting is split in 4 phases
322 * - until stage 1 we simply yield the thread to let others add data
323 * - between stage 1 and stage2 we use ManualResetEventSlim light waiting mechanism
324 * - after stage2 we fall back to the heavier WaitHandle waiting mechanism
325 * - if really the situation isn't evolving after a couple of sleep, we disable
326 * task fitness check altogether
328 if (--tries > stage1)
330 else if (tries >= stage2)
331 predicateEvt.Wait (ComputeTimeout (5, millisecondsTimeout, watch));
333 if (tries == stage2 - 1)
334 handles = new [] { predicateEvt.WaitHandle, evt };
335 System.Threading.WaitHandle.WaitAny (handles, ComputeTimeout (1000, millisecondsTimeout, watch));
336 if (tries == stage2 - 10)
// Reads and writes the static autoReference field, i.e. the ThreadWorker
// that WorkerMethodWrapper registers when a worker thread starts running.
342 public static ThreadWorker AutoReference {
344 return autoReference;
347 autoReference = value;
// Accessor for subclasses. NOTE(review): presumably exposes dDeque - confirm.
351 protected IConcurrentDeque<Task> Deque {
// Accessor for subclasses. NOTE(review): presumably exposes others - confirm.
357 protected ThreadWorker[] Others {
// Accessor for subclasses. NOTE(review): presumably exposes waitHandle - confirm.
363 protected ManualResetEvent WaitHandle {
// Priority that is applied to the underlying thread whenever it is created.
protected ThreadPriority Priority {
	get { return threadPriority; }
}
// Position of this worker; stealing starts with the worker right after it.
protected int WorkerPosition {
	get { return workerPosition; }
}
// Target of the cached `adder` delegate that is handed to Task.Execute:
// pushes the given task onto the bottom of this worker's own deque.
381 protected virtual void ChildWorkAdder (Task t)
383 dDeque.PushBottom (t);
// Picks the duration of a single wait step.
//   proposed - preferred duration of the step
//   timeout  - overall time budget; int.MaxValue means "no deadline"
//   watch    - stopwatch measuring the time already spent
// Returns `proposed` when there is no deadline; otherwise the smaller of
// `proposed` and the time left in the budget, never less than zero.
static int ComputeTimeout (int proposed, int timeout, Watch watch)
{
	if (timeout == int.MaxValue)
		return proposed;

	// Time left before the deadline, clamped so an expired budget yields 0.
	int remaining = (int)(timeout - watch.ElapsedMilliseconds);
	if (remaining < 0)
		remaining = 0;

	return remaining < proposed ? remaining : proposed;
}
// NOTE(review): the getter of Finished is not shown here - confirm what it reports.
392 public bool Finished {
// Yields the managed thread id of the current underlying thread.
// NOTE(review): this int-valued return cannot belong to the bool Finished
// property above; it presumably sits in a separate property - confirm.
400 return workerThread.ManagedThreadId;
// Two workers are considered equal when they operate on the very same
// deque instance; a null argument is never equal.
public virtual bool Equals (ThreadWorker other)
{
	if (other == null)
		return false;

	return object.ReferenceEquals (this.dDeque, other.dDeque);
}
// Object-typed equality: anything that is not a ThreadWorker (including
// null) is unequal; otherwise defers to Equals (ThreadWorker).
public override bool Equals (object obj)
{
	ThreadWorker candidate = obj as ThreadWorker;
	if (candidate == null)
		return false;

	return Equals (candidate);
}
// Hash code consistent with Equals, which compares workers by the identity
// of their deque. The previous implementation hashed the managed id of the
// underlying thread instead: two workers that Equals reports as equal could
// hash differently, and the value changed whenever the thread object was
// re-created, which breaks hash-based collections.
public override int GetHashCode ()
{
	// Identity-based hash of the deque, mirroring the ReferenceEquals test
	// in Equals; a missing deque hashes to 0.
	return dDeque == null
		? 0
		: System.Runtime.CompilerServices.RuntimeHelpers.GetHashCode (dDeque);
}