Merge branch 'master' of github.com:tgiphil/mono
[mono.git] / mcs / class / corlib / System.Threading.Tasks / Internal / ThreadWorker.cs
1 #if NET_4_0 || BOOTSTRAP_NET_4_0
2 // ThreadWorker.cs
3 //
4 // Copyright (c) 2008 Jérémie "Garuma" Laval
5 //
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
7 // of this software and associated documentation files (the "Software"), to deal
8 // in the Software without restriction, including without limitation the rights
9 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 // copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
12 //
13 // The above copyright notice and this permission notice shall be included in
14 // all copies or substantial portions of the Software.
15 //
16 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22 // THE SOFTWARE.
23 //
24 //
25
26 using System;
27 using System.Threading;
28 using System.Collections.Concurrent;
29
30 namespace System.Threading.Tasks
31 {
32         internal class ThreadWorker : IDisposable
33         {
34                 static Random r = new Random ();
35                 
36                 Thread workerThread;
37                 
38                 readonly          ThreadWorker[]        others;
39                 internal readonly IDequeOperations<Task>    dDeque;
40                 readonly          IProducerConsumerCollection<Task> sharedWorkQueue;
41                 readonly          Action<Task>          childWorkAdder;
42                 
43                 // Flag to tell if workerThread is running
44                 int started = 0; 
45                 
46                 readonly bool isLocal;
47                 readonly int  workerLength;
48                 readonly int  stealingStart;
49                 const    int  maxRetry = 5;
50                 
51                 const int sleepThreshold = 100000;
52                 
53                 Action threadInitializer;
54                 
55                 public ThreadWorker (IScheduler sched, ThreadWorker[] others, IProducerConsumerCollection<Task> sharedWorkQueue,
56                                      int maxStackSize, ThreadPriority priority)
57                         : this (sched, others, sharedWorkQueue, true, maxStackSize, priority)
58                 {
59                 }
60                 
61                 public ThreadWorker (IScheduler sched, ThreadWorker[] others, IProducerConsumerCollection<Task> sharedWorkQueue,
62                                      bool createThread, int maxStackSize, ThreadPriority priority)
63                 {
64                         this.others          = others;
65
66                         this.dDeque = new CyclicDeque<Task> ();
67                         
68                         this.sharedWorkQueue = sharedWorkQueue;
69                         this.workerLength    = others.Length;
70                         this.isLocal         = !createThread;
71                         
72                         this.childWorkAdder = delegate (Task t) { 
73                                 dDeque.PushBottom (t);
74                                 sched.PulseAll ();
75                         };
76                         
77                         // Find the stealing start index randomly (then the traversal
78                         // will be done in Round-Robin fashion)
79                         do {
80                                 this.stealingStart = r.Next(0, workerLength);
81                         } while (others[stealingStart] == this);
82                         
83                         InitializeUnderlyingThread (maxStackSize, priority);
84                 }
85                 
86                 void InitializeUnderlyingThread (int maxStackSize, ThreadPriority priority)
87                 {
88                         threadInitializer = delegate {
89                                 // Special case of the participant ThreadWorker
90                                 if (isLocal) {                  
91                                         this.workerThread = Thread.CurrentThread;
92                                         return;
93                                 }
94                                 
95                                 this.workerThread = (maxStackSize == 0) ? new Thread (WorkerMethodWrapper) :
96                                         new Thread (WorkerMethodWrapper, maxStackSize);
97         
98                                 this.workerThread.IsBackground = true;
99                                 this.workerThread.Priority = priority;
100                                 this.workerThread.Name = "ParallelFxThreadWorker";
101                         };
102                         threadInitializer ();
103                 }
104
105                 public void Dispose ()
106                 {
107                         Stop ();
108                         if (!isLocal && workerThread.ThreadState != ThreadState.Stopped)
109                                 workerThread.Abort ();
110                 }
111                 
112                 public void Pulse ()
113                 {
114                         // If the thread was stopped then set it in use and restart it
115                         int result = Interlocked.Exchange (ref started, 1);
116                         if (result != 0)
117                                 return;
118                         if (!isLocal) {
119                                 if (this.workerThread.ThreadState != ThreadState.Unstarted) {
120                                         threadInitializer ();
121                                 }
122                                 workerThread.Start ();
123                         }
124                 }
125                 
126                 public void Stop ()
127                 {
128                         // Set the flag to stop so that the while in the thread will stop
129                         // doing its infinite loop.
130                         started = 0;
131                 }
132                 
133                 // This is the actual method called in the Thread
134                 void WorkerMethodWrapper ()
135                 {
136                         int sleepTime = 0;
137                         SpinWait wait = new SpinWait ();
138                         
139                         // Main loop
140                         while (started == 1) {
141                                 bool result = false;
142
143                                 result = WorkerMethod ();
144                                 
145                                 // Wait a little and if the Thread has been more sleeping than working shut it down
146                                 wait.SpinOnce ();
147                                 if (result)
148                                         sleepTime = 0;
149                                 if (sleepTime++ > sleepThreshold) 
150                                         break;
151                         }
152
153                         started = 0;
154                 }
155                 
156                 // Main method, used to do all the logic of retrieving, processing and stealing work.
157                 bool WorkerMethod ()
158                 {               
159                         bool result = false;
160                         bool hasStolenFromOther;
161                         do {
162                                 hasStolenFromOther = false;
163                                 
164                                 Task value;
165                                 
166                                 // We fill up our work deque concurrently with other ThreadWorker
167                                 while (sharedWorkQueue.Count > 0) {
168                                         while (sharedWorkQueue.TryTake (out value)) {
169                                                 dDeque.PushBottom (value);
170                                         }
171                                         
172                                         // Now we process our work
173                                         while (dDeque.PopBottom (out value) == PopResult.Succeed) {
174                                                 if (value != null) {
175                                                         value.Execute (childWorkAdder);
176                                                         result = true;
177                                                 }
178                                         }
179                                 }
180                                 
181                                 // When we have finished, steal from other worker
182                                 ThreadWorker other;
183                                 
184                                 // Repeat the operation a little so that we can let other things process.
185                                 for (int j = 0; j < maxRetry; j++) {
186                                         // Start stealing with the ThreadWorker at our right to minimize contention
187                                         for (int it = stealingStart; it < stealingStart + workerLength; it++) {
188                                                 int i = it % workerLength;
189                                                 if ((other = others [i]) == null || other == this)
190                                                         continue;
191                                                 
192                                                 // Maybe make this steal more than one item at a time, see TODO.
193                                                 if (other.dDeque.PopTop (out value) == PopResult.Succeed) {
194                                                         hasStolenFromOther = true;
195                                                         if (value != null) {
196                                                                 value.Execute (childWorkAdder);
197                                                                 result = true;
198                                                         }
199                                                 }
200                                         }
201                                 }
202                         } while (sharedWorkQueue.Count > 0 || hasStolenFromOther);
203                         
204                         return result;
205                 }
206                 
207                 // Almost same as above but with an added predicate and treating one item at a time. 
208                 // It's used by Scheduler Participate(...) method for special waiting case like
209                 // Task.WaitAll(someTasks) or Task.WaitAny(someTasks)
210                 // Predicate should be really fast and not blocking as it is called a good deal of time
211                 // Also, the method skip tasks that are LongRunning to avoid blocking (Task are not LongRunning by default)
212                 public static void WorkerMethod (Func<bool> predicate, IProducerConsumerCollection<Task> sharedWorkQueue,
213                                                  ThreadWorker[] others)
214                 {
215                         while (!predicate ()) {
216                                 Task value;
217                                 
218                                 // Dequeue only one item as we have restriction
219                                 if (sharedWorkQueue.TryTake (out value)) {
220                                         if (value != null) {
221                                                 if (CheckTaskFitness (value))
222                                                         value.Execute (null);
223                                                 else
224                                                         sharedWorkQueue.TryAdd (value);
225                                         }
226                                 }
227                                 
228                                 // First check to see if we comply to predicate
229                                 if (predicate ()) {
230                                         return;
231                                 }
232                                 
233                                 // Try to complete other work by stealing since our desired tasks may be in other worker
234                                 ThreadWorker other;
235                                 for (int i = 0; i < others.Length; i++) {
236                                         if ((other = others [i]) == null)
237                                                 continue;
238                                         
239                                         if (other.dDeque.PopTop (out value) == PopResult.Succeed) {
240                                                 if (value != null) {
241                                                         if (CheckTaskFitness (value))
242                                                                 value.Execute (null);
243                                                         else
244                                                                 sharedWorkQueue.TryAdd (value);
245                                                 }
246                                         }
247                                         
248                                         if (predicate ()) {
249                                                 return;
250                                         }
251                                 }
252                         }
253                 }
254                 
255                 static bool CheckTaskFitness (Task t)
256                 {
257                         return (t.CreationOptions | TaskCreationOptions.LongRunning) > 0;
258                 }
259                 
260                 public bool Finished {
261                         get {
262                                 return started == 0;
263                         }
264                 }
265                 
266                 public bool IsLocal {
267                         get {
268                                 return isLocal;
269                         }
270                 }
271                 
272                 public int Id {
273                         get {
274                                 return workerThread.ManagedThreadId;
275                         }
276                 }
277                 
278                 public bool Equals (ThreadWorker other)
279                 {
280                         return (other == null) ? false : object.ReferenceEquals (this.dDeque, other.dDeque);    
281                 }
282                 
283                 public override bool Equals (object obj)
284                 {
285                         ThreadWorker temp = obj as ThreadWorker;
286                         return temp == null ? false : Equals (temp);
287                 }
288                 
289                 public override int GetHashCode ()
290                 {
291                         return workerThread.ManagedThreadId.GetHashCode ();
292                 }
293         }
294 }
295 #endif