Merge pull request #1074 from esdrubal/bug18421
[mono.git] / mcs / class / Mono.Parallel / Mono.Threading.Tasks / ThreadWorker.cs
1 // ThreadWorker.cs
2 //
3 // Copyright (c) 2008 Jérémie "Garuma" Laval
4 //
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to deal
7 // in the Software without restriction, including without limitation the rights
8 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
11 //
12 // The above copyright notice and this permission notice shall be included in
13 // all copies or substantial portions of the Software.
14 //
15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21 // THE SOFTWARE.
22 //
23 //
24
25 #if NET_4_0
26 using System;
27 using System.Threading;
28 using System.Collections.Concurrent;
29 using System.Threading.Tasks;
30 using Watch = System.Diagnostics.Stopwatch;
31
32 namespace Mono.Threading.Tasks
33 {
// A worker that owns one background thread and one deque of tasks. Its thread
// repeatedly moves tasks from the shared work queue into its deque, executes the
// deque's content and, once that is empty, steals tasks from the deques of the
// other workers.
public class ThreadWorker : IDisposable
{
        Thread workerThread;

        /* Set by WorkerMethodWrapper on the worker's own thread. When a ThreadWorker
         * has to call Task.Wait it comes back here through the static
         * ParticipativeWorkerMethod; this per-thread reference lets that method keep
         * using the calling worker's own deque and child adder.
         */
        [ThreadStatic]
        static ThreadWorker autoReference;

        readonly IConcurrentDeque<Task> dDeque;
        readonly ThreadWorker[]         others;
        readonly ManualResetEvent       waitHandle;
        readonly IProducerConsumerCollection<Task> sharedWorkQueue;
        readonly ThreadPriority         threadPriority;

        // Flag to tell if workerThread is running (1) or stopped / asked to stop (0)
        int started = 0;

        readonly int  workerLength;
        readonly int  workerPosition;
        // Number of stealing passes over the other workers per WorkerMethod round
        const    int  maxRetry = 3;

        // Number of consecutive idle loop iterations before waiting on waitHandle
        const int sleepThreshold = 100;
        // Timeout in milliseconds of the next idle wait; doubled up to 0x4000 and
        // reset to 8 as soon as some work has been executed
        int deepSleepTime = 8;
        readonly Action<Task> adder;

        Task currentTask;

        // Creates the worker and its (not yet started) thread.
        //   others          - all the workers of the scheduler; null entries and
        //                     this instance are skipped when stealing
        //   workerPosition  - index from which stealing starts (position + 1)
        //   sharedWorkQueue - queue the worker pulls new tasks from
        //   dDeque          - this worker's own deque
        //   priority        - priority given to the underlying thread
        //   handle          - event set when work is found and waited on when idle
        public ThreadWorker (ThreadWorker[] others,
                             int workerPosition,
                             IProducerConsumerCollection<Task> sharedWorkQueue,
                             IConcurrentDeque<Task> dDeque,
                             ThreadPriority priority,
                             ManualResetEvent handle)
        {
                this.others          = others;
                this.dDeque          = dDeque;
                this.sharedWorkQueue = sharedWorkQueue;
                this.workerLength    = others.Length;
                this.workerPosition  = workerPosition;
                this.waitHandle      = handle;
                this.threadPriority  = priority;
                this.adder           = new Action<Task> (ChildWorkAdder);

                InitializeUnderlyingThread ();
        }

        // (Re)creates workerThread as a named background thread running
        // WorkerMethodWrapper. The thread is not started here.
        protected virtual void InitializeUnderlyingThread ()
        {
                this.workerThread = new Thread (WorkerMethodWrapper);

                this.workerThread.IsBackground = true;
                this.workerThread.Priority = threadPriority;
                this.workerThread.Name = "ParallelFxThreadWorker";
        }

        // Asks the worker loop to stop and aborts the thread if it has not
        // terminated yet.
        public virtual void Dispose ()
        {
                Stop ();
                // ThreadState is a [Flags] enum: test the Stopped bit instead of
                // comparing for equality so that extra flags don't hide it.
                if ((workerThread.ThreadState & ThreadState.Stopped) == 0)
                        workerThread.Abort ();
        }

        // Starts the worker thread if it isn't already running. Does nothing when
        // the worker is already started.
        public virtual void Pulse ()
        {
                if (started == 1)
                        return;

                // If the thread was stopped then set it in use and restart it
                int result = Interlocked.Exchange (ref started, 1);
                if (result != 0)
                        return;

                // ThreadState is a [Flags] enum and InitializeUnderlyingThread marks the
                // thread as background, so a never started thread reports
                // Unstarted | Background. Ignore the Background bit, otherwise the
                // comparison never matches and a fresh thread is needlessly recreated.
                ThreadState state = this.workerThread.ThreadState & ~ThreadState.Background;
                if (state != ThreadState.Unstarted) {
                        InitializeUnderlyingThread ();
                }

                workerThread.Start ();
        }

        public virtual void Stop ()
        {
                // Set the flag to stop so that the while in the thread will stop
                // doing its infinite loop.
                started = 0;
        }

        // This is the actual method called in the Thread. It loops on WorkerMethod
        // while the worker is started and backs off when no work is found.
        protected virtual void WorkerMethodWrapper ()
        {
                int sleepTime = 0;
                autoReference = this;
                bool wasWokenUp = false;

                // Main loop
                while (started == 1) {
                        bool result = WorkerMethod ();
                        // We were signaled but found nothing to do: clear the signal
                        if (!result && wasWokenUp)
                                waitHandle.Reset ();
                        wasWokenUp = false;

                        Thread.Yield ();

                        if (result) {
                                deepSleepTime = 8;
                                sleepTime = 0;
                                continue;
                        }

                        // If we are spinning too much, have a deeper sleep
                        if (++sleepTime > sleepThreshold && sharedWorkQueue.Count == 0) {
                                // Double the wait each time, capped at 0x4000 ms
                                deepSleepTime = deepSleepTime >= 0x4000 ? 0x4000 : deepSleepTime << 1;
                                wasWokenUp = waitHandle.WaitOne (deepSleepTime);
                        }
                }

                started = 0;
        }

        // Main method, used to do all the logic of retrieving, processing and stealing work.
        // Returns true if at least one task has been executed during the call.
        protected virtual bool WorkerMethod ()
        {
                bool result = false;
                bool hasStolenFromOther;

                do {
                        hasStolenFromOther = false;

                        Task value;

                        // We fill up our work deque concurrently with other ThreadWorker
                        while (sharedWorkQueue.Count > 0) {
                                waitHandle.Set ();

                                while (sharedWorkQueue.TryTake (out value)) {
                                        dDeque.PushBottom (value);
                                }

                                // Now we process our work
                                while (dDeque.PopBottom (out value) == PopResult.Succeed) {
                                        waitHandle.Set ();
                                        ExecuteTask (value, ref result);
                                }
                        }

                        // When we have finished, steal from other worker
                        ThreadWorker other;

                        // Repeat the operation a little so that we can let other things process.
                        for (int j = 0; j < maxRetry; ++j) {
                                int len = workerLength + workerPosition;
                                // Start stealing with the ThreadWorker at our right to minimize contention
                                for (int it = workerPosition + 1; it < len; ++it) {
                                        int i = it % workerLength;
                                        if ((other = others [i]) == null || other == this)
                                                continue;

                                        // Maybe make this steal more than one item at a time, see TODO.
                                        while (other.dDeque.PopTop (out value) == PopResult.Succeed) {
                                                if (!hasStolenFromOther)
                                                        waitHandle.Set ();

                                                hasStolenFromOther = true;
                                                ExecuteTask (value, ref result);
                                        }
                                }
                        }
                } while (sharedWorkQueue.Count > 0 || hasStolenFromOther);

                return result;
        }

        // Executes a task with this worker's child adder and sets result to true
        // once it has run. Null tasks are ignored.
        void ExecuteTask (Task value, ref bool result)
        {
                if (value == null)
                        return;

                var saveCurrent = currentTask;
                currentTask = value;
                try {
                        value.Execute (adder);
                        result = true;
                } finally {
                        // Restore the previous task even if Execute throws
                        currentTask = saveCurrent;
                }
        }

        // Almost same as above but with an added predicate and treating one item at a time.
        // It's used by Scheduler Participate(...) method for special waiting case like
        // Task.WaitAll(someTasks) or Task.WaitAny(someTasks)
        // The method returns as soon as predicateEvt is set, self is completed or
        // millisecondsTimeout (-1 meaning no timeout) has elapsed.
        // checkTaskFitness is called as checkTaskFitness (self, candidate) and decides
        // whether a candidate task may be executed while waiting; it should be really
        // fast and not blocking as it is called a good deal of time. Tasks that are
        // rejected are put back in the caller's deque or in the shared queue.
        public static void ParticipativeWorkerMethod (Task self,
                                                      ManualResetEventSlim predicateEvt,
                                                      int millisecondsTimeout,
                                                      IProducerConsumerCollection<Task> sharedWorkQueue,
                                                      ThreadWorker[] others,
                                                      ManualResetEvent evt,
                                                      Func<Task, Task, bool> checkTaskFitness)
        {
                const int stage1 = 5, stage2 = 0;
                int tries = 50;
                WaitHandle[] handles = null;
                Watch watch = Watch.StartNew ();
                if (millisecondsTimeout == -1)
                        millisecondsTimeout = int.MaxValue;
                bool aggressive = false;
                bool hasAutoReference = autoReference != null;
                Action<Task> adder = null;

                while (!predicateEvt.IsSet && watch.ElapsedMilliseconds < millisecondsTimeout && !self.IsCompleted) {
                        // We try to execute the self task as it may be the simplest way to unlock
                        // the situation
                        if (self.Status == TaskStatus.WaitingToRun) {
                                self.Execute (hasAutoReference ? autoReference.adder : (Action<Task>)null);
                                if (predicateEvt.IsSet || watch.ElapsedMilliseconds > millisecondsTimeout)
                                        return;
                        }

                        Task value;

                        // If we are in fact a normal ThreadWorker, use our own deque
                        if (hasAutoReference) {
                                var enumerable = autoReference.dDeque.GetEnumerable ();
                                if (adder == null)
                                        adder = autoReference.adder;

                                if (enumerable != null) {
                                        foreach (var t in enumerable) {
                                                if (t == null)
                                                        continue;

                                                if (checkTaskFitness (self, t))
                                                        t.Execute (adder);

                                                if (predicateEvt.IsSet || watch.ElapsedMilliseconds > millisecondsTimeout)
                                                        return;
                                        }
                                }
                        }

                        int count = sharedWorkQueue.Count;

                        // Look at no more items than the queue held when we started, as the
                        // items we reject may be added back to that same queue
                        while (--count >= 0 && sharedWorkQueue.TryTake (out value) && value != null) {
                                evt.Set ();
                                if (checkTaskFitness (self, value) || aggressive)
                                        value.Execute (null);
                                else {
                                        if (autoReference == null)
                                                sharedWorkQueue.TryAdd (value);
                                        else
                                                autoReference.dDeque.PushBottom (value);
                                        evt.Set ();
                                }

                                if (predicateEvt.IsSet || watch.ElapsedMilliseconds > millisecondsTimeout)
                                        return;
                        }

                        // First check to see if we comply to predicate
                        if (predicateEvt.IsSet || watch.ElapsedMilliseconds > millisecondsTimeout)
                                return;

                        // Try to complete other work by stealing since our desired tasks may be in other worker
                        ThreadWorker other;
                        for (int i = 0; i < others.Length; i++) {
                                if ((other = others [i]) == autoReference || other == null)
                                        continue;

                                if (other.dDeque.PopTop (out value) == PopResult.Succeed && value != null) {
                                        evt.Set ();
                                        if (checkTaskFitness (self, value) || aggressive)
                                                value.Execute (null);
                                        else {
                                                if (autoReference == null)
                                                        sharedWorkQueue.TryAdd (value);
                                                else
                                                        autoReference.dDeque.PushBottom (value);
                                                evt.Set ();
                                        }
                                }

                                if (predicateEvt.IsSet || watch.ElapsedMilliseconds > millisecondsTimeout)
                                        return;
                        }

                        /* Waiting is split in 4 phases
                         *   - until stage1 we simply yield the thread to let others add data
                         *   - between stage1 and stage2 we use ManualResetEventSlim light waiting mechanism
                         *   - after stage2 we fall back to the heavier WaitHandle waiting mechanism
                         *   - if really the situation isn't evolving after a couple of sleep, we disable
                         *     task fitness check altogether
                         */
                        if (--tries > stage1)
                                Thread.Yield ();
                        else if (tries >= stage2)
                                predicateEvt.Wait (ComputeTimeout (5, millisecondsTimeout, watch));
                        else {
                                // First pass in the heavy phase: build the handle array once
                                if (tries == stage2 - 1)
                                        handles = new [] { predicateEvt.WaitHandle, evt };
                                System.Threading.WaitHandle.WaitAny (handles, ComputeTimeout (1000, millisecondsTimeout, watch));
                                if (tries == stage2 - 10)
                                        aggressive = true;
                        }
                }
        }

        // The ThreadWorker attached to the calling thread, or null if there is none
        public static ThreadWorker AutoReference {
                get {
                        return autoReference;
                }
                set {
                        autoReference = value;
                }
        }

        protected IConcurrentDeque<Task> Deque {
                get {
                        return dDeque;
                }
        }

        protected ThreadWorker[] Others {
                get {
                        return others;
                }
        }

        protected ManualResetEvent WaitHandle {
                get {
                        return waitHandle;
                }
        }

        protected ThreadPriority Priority {
                get {
                        return threadPriority;
                }
        }

        protected int WorkerPosition {
                get {
                        return workerPosition;
                }
        }

        // Pushes a task on this worker's deque and signals the wait handle. This is
        // the adder passed to Task.Execute for the tasks run by this worker.
        protected virtual void ChildWorkAdder (Task t)
        {
                dDeque.PushBottom (t);
                waitHandle.Set ();
        }

        // Returns the wait time to use: proposed when there is no timeout
        // (int.MaxValue), otherwise proposed limited to the time remaining before
        // timeout, never less than 0.
        static int ComputeTimeout (int proposed, int timeout, Watch watch)
        {
                if (timeout == int.MaxValue)
                        return proposed;

                // Clamp while still in long so that a large elapsed time can't wrap
                // around when narrowed to int
                long remaining = System.Math.Max (0L, timeout - watch.ElapsedMilliseconds);
                return (int)System.Math.Min ((long)proposed, remaining);
        }

        // True when the worker is not started (never pulsed, stopped or ended)
        public bool Finished {
                get {
                        return started == 0;
                }
        }

        // Managed thread id of the current underlying thread
        public int Id {
                get {
                        return workerThread.ManagedThreadId;
                }
        }

        // Two workers are equal when they work on the very same deque instance
        public virtual bool Equals (ThreadWorker other)
        {
                return (other == null) ? false : object.ReferenceEquals (this.dDeque, other.dDeque);
        }

        public override bool Equals (object obj)
        {
                ThreadWorker temp = obj as ThreadWorker;
                return temp == null ? false : Equals (temp);
        }

        // Hash on the deque identity to stay consistent with Equals. The thread id
        // can't be used: it differs between equal workers and changes whenever the
        // underlying thread is recreated.
        public override int GetHashCode ()
        {
                return System.Runtime.CompilerServices.RuntimeHelpers.GetHashCode (dDeque);
        }
}
420 }
421 #endif