1 #if NET_4_0 || BOOTSTRAP_NET_4_0
4 // Copyright (c) 2008 Jérémie "Garuma" Laval
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
7 // of this software and associated documentation files (the "Software"), to deal
8 // in the Software without restriction, including without limitation the rights
9 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 // copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
13 // The above copyright notice and this permission notice shall be included in
14 // all copies or substantial portions of the Software.
16 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
27 using System.Threading;
28 using System.Collections.Concurrent;
30 namespace System.Threading.Tasks
32 internal class ThreadWorker : IDisposable
// Random source shared by all workers; the constructor draws stealingStart from it.
// NOTE(review): System.Random instances are not thread-safe — confirm workers are never constructed concurrently.
34 static Random r = new Random ();
// Worker array that is scanned when looking for a deque to steal from.
38 readonly ThreadWorker[] others;
// This worker's own deque; other workers steal from it through PopTop.
39 internal readonly IDequeOperations<Task> dDeque;
// Queue shared with the other workers; tasks are pulled out of it with TryTake.
40 readonly IProducerConsumerCollection<Task> sharedWorkQueue;
// Callback passed to Task.Execute; it pushes the task it receives onto dDeque.
41 readonly Action<Task> childWorkAdder;
43 // Flag to tell if workerThread is running
46 readonly bool isLocal; // set to !createThread by the constructor
47 readonly int workerLength; // others.Length, cached by the constructor
48 readonly int stealingStart; // index in others where the steal traversal begins
49 const int maxRetry = 5; // upper bound of the steal retry loop
51 const int sleepThreshold = 100000; // bound compared with the sleepTime counter in WorkerMethodWrapper
// Delegate that (re)creates workerThread; assigned in InitializeUnderlyingThread.
53 Action threadInitializer;
// Convenience constructor: forwards every argument to the six-parameter
// constructor and passes true for its createThread parameter.
public ThreadWorker (IScheduler sched,
                     ThreadWorker[] others,
                     IProducerConsumerCollection<Task> sharedWorkQueue,
                     int maxStackSize,
                     ThreadPriority priority)
	: this (sched, others, sharedWorkQueue, true, maxStackSize, priority)
{
}
// Main constructor. Stores the shared queue, creates this worker's own deque,
// records whether the worker is local (createThread == false), picks a random
// start index for stealing and finally sets up the underlying thread.
61 public ThreadWorker (IScheduler sched, ThreadWorker[] others, IProducerConsumerCollection<Task> sharedWorkQueue,
62 bool createThread, int maxStackSize, ThreadPriority priority)
// Every worker owns a private deque.
66 this.dDeque = new CyclicDeque<Task> ();
68 this.sharedWorkQueue = sharedWorkQueue;
69 this.workerLength = others.Length;
70 this.isLocal = !createThread;
// Callback that queues the task it is given on this worker's own deque.
72 this.childWorkAdder = delegate (Task t) {
73 dDeque.PushBottom (t);
77 // Find the stealing start index randomly (then the traversal
78 // will be done in Round-Robin fashion)
// The upper bound of r.Next is exclusive, so the index stays inside others.
// NOTE(review): for an empty others array r.Next (0, 0) returns 0 and the
// indexing in the loop condition would throw — confirm callers never pass one.
80 this.stealingStart = r.Next(0, workerLength);
81 } while (others[stealingStart] == this);
83 InitializeUnderlyingThread (maxStackSize, priority);
// Builds the threadInitializer delegate, which assigns workerThread, and runs it once.
// maxStackSize: 0 selects the Thread constructor without an explicit stack size.
// priority: assigned to the Priority of the created thread.
86 void InitializeUnderlyingThread (int maxStackSize, ThreadPriority priority)
88 threadInitializer = delegate {
89 // Special case of the participant ThreadWorker
91 this.workerThread = Thread.CurrentThread;
95 this.workerThread = (maxStackSize == 0) ? new Thread (WorkerMethodWrapper) :
96 new Thread (WorkerMethodWrapper, maxStackSize);
// A background thread does not keep the process alive.
98 this.workerThread.IsBackground = true;
99 this.workerThread.Priority = priority;
100 this.workerThread.Name = "ParallelFxThreadWorker";
// Run the initializer right away; it stays in a field so it can be invoked again later.
102 threadInitializer ();
// Releases the worker: when the worker is not local and its thread has not
// reached the Stopped state, the thread is aborted.
// The thread of a local worker is never aborted.
105 public void Dispose ()
108 if (!isLocal && workerThread.ThreadState != ThreadState.Stopped)
109 workerThread.Abort ();
114 // If the thread was stopped then set it in use and restart it
115 int result = Interlocked.Exchange (ref started, 1);
119 if (this.workerThread.ThreadState != ThreadState.Unstarted) {
120 threadInitializer ();
122 workerThread.Start ();
128 // Set the flag to stop so that the while in the thread will stop
129 // doing its infinite loop.
133 // This is the actual method called in the Thread
// (it is the delegate handed to the Thread constructor). It keeps calling
// WorkerMethod for as long as the started flag equals 1.
134 void WorkerMethodWrapper ()
137 SpinWait wait = new SpinWait ();
140 while (started == 1) {
143 result = WorkerMethod ();
145 // Wait a little and, if the thread has spent more time sleeping than working, shut it down
// sleepTime is post-incremented on every check and compared with sleepThreshold.
149 if (sleepTime++ > sleepThreshold)
156 // Main method, used to do all the logic of retrieving, processing and stealing work.
// Tracks whether the current pass managed to steal a task from another worker.
160 bool hasStolenFromOther;
162 hasStolenFromOther = false;
166 // We fill up our work deque concurrently with other ThreadWorker
167 while (sharedWorkQueue.Count > 0) {
168 while (sharedWorkQueue.TryTake (out value)) {
169 dDeque.PushBottom (value);
172 // Now we process our work
173 while (dDeque.PopBottom (out value) == PopResult.Succeed) {
// childWorkAdder pushes any task it is handed onto this worker's own deque.
175 value.Execute (childWorkAdder);
181 // When we have finished, steal from other workers
184 // Repeat the operation a little so that we can let other things process.
185 for (int j = 0; j < maxRetry; j++) {
186 // Start stealing with the ThreadWorker at our right to minimize contention
// Visits workerLength consecutive slots starting at stealingStart; the modulo wraps the index around.
187 for (int it = stealingStart; it < stealingStart + workerLength; it++) {
188 int i = it % workerLength;
// Tests for an empty slot or this worker's own entry.
189 if ((other = others [i]) == null || other == this)
192 // Maybe make this steal more than one item at a time, see TODO.
// Stealing takes from the top of the victim's deque; the owner pops from the bottom.
193 if (other.dDeque.PopTop (out value) == PopResult.Succeed) {
194 hasStolenFromOther = true;
196 value.Execute (childWorkAdder);
// Go around again while the shared queue still holds items or something was stolen.
202 } while (sharedWorkQueue.Count > 0 || hasStolenFromOther);
207 // Almost the same as above, but with an added predicate and processing one item at a time.
208 // It's used by the Scheduler Participate(...) method for special waiting cases like
209 // Task.WaitAll(someTasks) or Task.WaitAny(someTasks).
210 // The predicate should be fast and non-blocking because it is called very often.
211 // Also, the method is meant to skip tasks that are LongRunning to avoid blocking (Tasks are not LongRunning by default).
// predicate: evaluated at the top of the loop; the loop runs until it returns true.
// sharedWorkQueue: queue from which single tasks are taken and to which tasks are given back.
// others: workers whose deques are probed with PopTop; entries are tested for null before use.
212 public static void WorkerMethod (Func<bool> predicate, IProducerConsumerCollection<Task> sharedWorkQueue,
213 ThreadWorker[] others)
215 while (!predicate ()) {
218 // Dequeue only one item as we have restriction
219 if (sharedWorkQueue.TryTake (out value)) {
// Tasks accepted by CheckTaskFitness run here with a null child-work callback.
221 if (CheckTaskFitness (value))
222 value.Execute (null);
// Give the task back to the shared queue.
224 sharedWorkQueue.TryAdd (value);
228 // First check to see if we comply to predicate
233 // Try to complete other work by stealing since our desired tasks may be in other worker
235 for (int i = 0; i < others.Length; i++) {
236 if ((other = others [i]) == null)
239 if (other.dDeque.PopTop (out value) == PopResult.Succeed) {
241 if (CheckTaskFitness (value))
242 value.Execute (null);
// A stolen task is handed to the shared queue rather than back to the deque it came from.
244 sharedWorkQueue.TryAdd (value);
// Tells whether a task may be executed inline by a thread that is waiting
// inside the predicate-driven WorkerMethod.
// t: the task to inspect.
// Returns true when the task was NOT created with TaskCreationOptions.LongRunning,
// false when that flag is set (such a task could block the waiter for a long time).
// The flag must be tested with a bitwise AND: OR-ing LongRunning into the options
// always produces a non-zero value, which would report every task as fit.
static bool CheckTaskFitness (Task t)
{
	return (t.CreationOptions & TaskCreationOptions.LongRunning) == 0;
}
260 public bool Finished {
266 public bool IsLocal {
274 return workerThread.ManagedThreadId;
// Typed equality: two workers are considered equal when they share the very
// same deque instance. A null argument is never equal.
public bool Equals (ThreadWorker other)
{
	if (other == null)
		return false;

	return object.ReferenceEquals (this.dDeque, other.dDeque);
}
// Untyped equality: anything that is not a ThreadWorker (including null) is
// unequal; otherwise the decision is delegated to Equals (ThreadWorker).
public override bool Equals (object obj)
{
	ThreadWorker candidate = obj as ThreadWorker;
	if (candidate == null)
		return false;

	return Equals (candidate);
}
// Hash code that agrees with Equals: workers are equal when they share the same
// dDeque instance, so the hash is the identity hash of that deque.
// dDeque is readonly, which keeps the value stable for the worker's lifetime;
// workerThread, by contrast, is reassigned whenever threadInitializer runs, so a
// hash based on its ManagedThreadId could change while the worker is stored in a
// hash-based collection.
public override int GetHashCode ()
{
	return System.Runtime.CompilerServices.RuntimeHelpers.GetHashCode (dDeque);
}