3 // Copyright (c) 2008 Jérémie "Garuma" Laval
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to deal
7 // in the Software without restriction, including without limitation the rights
8 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
12 // The above copyright notice and this permission notice shall be included in
13 // all copies or substantial portions of the Software.
15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
25 #if NET_4_0 || BOOTSTRAP_NET_4_0
27 using System.Threading;
28 using System.Collections.Concurrent;
30 namespace System.Threading.Tasks
32 internal class ThreadWorker : IDisposable
36 readonly IDequeOperations<Task> dDeque;
37 readonly ThreadWorker[] others;
38 readonly EventWaitHandle waitHandle;
39 readonly IProducerConsumerCollection<Task> sharedWorkQueue;
40 readonly IScheduler sched;
41 readonly ThreadPriority threadPriority;
43 // Flag to tell if workerThread is running
46 readonly int workerLength;
47 readonly int workerPosition;
48 const int maxRetry = 5;
50 const int sleepThreshold = 100;
51 const int deepSleepTime = 10;
53 public ThreadWorker (IScheduler sched,
54 ThreadWorker[] others,
56 IProducerConsumerCollection<Task> sharedWorkQueue,
57 ThreadPriority priority,
58 EventWaitHandle handle)
61 this.dDeque = new CyclicDeque<Task> ();
63 this.sharedWorkQueue = sharedWorkQueue;
64 this.workerLength = others.Length;
65 this.workerPosition = workerPosition;
66 this.waitHandle = handle;
67 this.threadPriority = priority;
69 InitializeUnderlyingThread ();
72 void InitializeUnderlyingThread ()
74 this.workerThread = new Thread (WorkerMethodWrapper);
76 this.workerThread.IsBackground = true;
77 this.workerThread.Priority = threadPriority;
78 this.workerThread.Name = "ParallelFxThreadWorker";
81 public void Dispose ()
84 if (workerThread.ThreadState != ThreadState.Stopped)
85 workerThread.Abort ();
93 // If the thread was stopped then set it in use and restart it
94 int result = Interlocked.Exchange (ref started, 1);
98 if (this.workerThread.ThreadState != ThreadState.Unstarted) {
99 InitializeUnderlyingThread ();
102 workerThread.Start ();
107 // Set the flag to stop so that the while in the thread will stop
108 // doing its infinite loop.
112 // This is the actual method called in the Thread
113 void WorkerMethodWrapper ()
116 SpinWait wait = new SpinWait ();
119 while (started == 1) {
122 result = WorkerMethod ();
126 wait = new SpinWait ();
129 // Wait a little and if the Thread has been more sleeping than working shut it down
132 // If we are spinning too much, have a deeper sleep
133 if (sleepTime++ > sleepThreshold)
134 waitHandle.WaitOne (deepSleepTime);
140 // Main method, used to do all the logic of retrieving, processing and stealing work.
144 bool hasStolenFromOther;
146 hasStolenFromOther = false;
150 // We fill up our work deque concurrently with other ThreadWorker
151 while (sharedWorkQueue.Count > 0) {
152 while (sharedWorkQueue.TryTake (out value)) {
153 dDeque.PushBottom (value);
156 // Now we process our work
157 while (dDeque.PopBottom (out value) == PopResult.Succeed) {
159 value.Execute (ChildWorkAdder);
165 // When we have finished, steal from other worker
168 // Repeat the operation a little so that we can let other things process.
169 for (int j = 0; j < maxRetry; ++j) {
170 int len = workerLength + workerPosition;
171 // Start stealing with the ThreadWorker at our right to minimize contention
172 for (int it = workerPosition + 1; it < len; ++it) {
173 int i = it % workerLength;
174 if ((other = others [i]) == null || other == this)
177 // Maybe make this steal more than one item at a time, see TODO.
178 if (other.dDeque.PopTop (out value) == PopResult.Succeed) {
179 hasStolenFromOther = true;
181 value.Execute (ChildWorkAdder);
187 } while (sharedWorkQueue.Count > 0 || hasStolenFromOther);
192 // Almost same as above but with an added predicate and treating one item at a time.
193 // It's used by Scheduler Participate(...) method for special waiting case like
194 // Task.WaitAll(someTasks) or Task.WaitAny(someTasks)
195 // Predicate should be really fast and not blocking as it is called a good deal of time
196 // Also, the method skip tasks that are LongRunning to avoid blocking (Task are not LongRunning by default)
197 public static void WorkerMethod (Func<bool> predicate, IProducerConsumerCollection<Task> sharedWorkQueue,
198 ThreadWorker[] others)
200 SpinWait wait = new SpinWait ();
202 while (!predicate ()) {
205 // Dequeue only one item as we have restriction
206 if (sharedWorkQueue.TryTake (out value)) {
208 if (CheckTaskFitness (value))
209 value.Execute (null);
211 sharedWorkQueue.TryAdd (value);
215 // First check to see if we comply to predicate
219 // Try to complete other work by stealing since our desired tasks may be in other worker
221 for (int i = 0; i < others.Length; i++) {
222 if ((other = others [i]) == null)
225 if (other.dDeque.PopTop (out value) == PopResult.Succeed) {
227 if (CheckTaskFitness (value))
228 value.Execute (null);
230 sharedWorkQueue.TryAdd (value);
242 internal void ChildWorkAdder (Task t)
244 dDeque.PushBottom (t);
248 static bool CheckTaskFitness (Task t)
250 return (t.CreationOptions | TaskCreationOptions.LongRunning) > 0;
253 public bool Finished {
261 return workerThread.ManagedThreadId;
265 public bool Equals (ThreadWorker other)
267 return (other == null) ? false : object.ReferenceEquals (this.dDeque, other.dDeque);
270 public override bool Equals (object obj)
272 ThreadWorker temp = obj as ThreadWorker;
273 return temp == null ? false : Equals (temp);
276 public override int GetHashCode ()
278 return workerThread.ManagedThreadId.GetHashCode ();