Few 4.0 profile corcompare changes
[mono.git] / mcs / class / corlib / System.Threading.Tasks / ThreadWorker.cs
1 // ThreadWorker.cs
2 //
3 // Copyright (c) 2008 Jérémie "Garuma" Laval
4 //
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to deal
7 // in the Software without restriction, including without limitation the rights
8 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
11 //
12 // The above copyright notice and this permission notice shall be included in
13 // all copies or substantial portions of the Software.
14 //
15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21 // THE SOFTWARE.
22 //
23 //
24
25 #if NET_4_0 || MOBILE
26 using System;
27 using System.Threading;
28 using System.Collections.Concurrent;
29
30 #if INSIDE_MONO_PARALLEL
31 using System.Threading.Tasks;
32 using Watch = System.Diagnostics.Stopwatch;
33
34 namespace Mono.Threading.Tasks
35 #else
36 namespace System.Threading.Tasks
37 #endif
38 {
39 #if INSIDE_MONO_PARALLEL
40         public
41 #endif
42         class ThreadWorker : IDisposable
43         {
44                 Thread workerThread;
45
46                 /* This field is used when a TheadWorker have to call Task.Wait
47                  * which bring him back here with the static WorkerMethod although
48                  * it's more optimized for him to continue calling its own WorkerMethod
49                  */
50                 [ThreadStatic]
51                 static ThreadWorker autoReference;
52                 
53                 readonly IConcurrentDeque<Task> dDeque;
54                 readonly ThreadWorker[]         others;
55                 readonly ManualResetEvent       waitHandle;
56                 readonly IProducerConsumerCollection<Task> sharedWorkQueue;
57                 readonly ThreadPriority         threadPriority;
58
59                 // Flag to tell if workerThread is running
60                 int started = 0; 
61                 
62                 readonly int  workerLength;
63                 readonly int  workerPosition;
64                 const    int  maxRetry = 3;
65                 
66                 const int sleepThreshold = 100;
67                 int deepSleepTime = 8;
68                 readonly Action<Task> adder;
69
70                 Task currentTask;
71                 
72                 public ThreadWorker (ThreadWorker[] others,
73                                      int workerPosition,
74                                      IProducerConsumerCollection<Task> sharedWorkQueue,
75                                      IConcurrentDeque<Task> dDeque,
76                                      ThreadPriority priority,
77                                      ManualResetEvent handle)
78                 {
79                         this.others          = others;
80                         this.dDeque          = dDeque;
81                         this.sharedWorkQueue = sharedWorkQueue;
82                         this.workerLength    = others.Length;
83                         this.workerPosition  = workerPosition;
84                         this.waitHandle      = handle;
85                         this.threadPriority  = priority;
86                         this.adder           = new Action<Task> (ChildWorkAdder);
87
88                         InitializeUnderlyingThread ();
89                 }
90
#if INSIDE_MONO_PARALLEL
                protected virtual
#endif
                // Replaces workerThread with a fresh, not yet started, background thread
                // that will run WorkerMethodWrapper at the configured priority.
                void InitializeUnderlyingThread ()
                {
                        workerThread = new Thread (WorkerMethodWrapper);

                        Thread created = workerThread;
                        created.IsBackground = true;
                        created.Priority = threadPriority;
                        created.Name = "ParallelFxThreadWorker";
                }
102
#if INSIDE_MONO_PARALLEL
                virtual
#endif
                // Asks the worker loop to stop, then aborts the underlying thread if it
                // is still executing.
                public void Dispose ()
                {
                        Stop ();

                        // ThreadState is a [Flags] enumeration, so comparing it for equality
                        // with ThreadState.Stopped also matches threads that were never
                        // started or that already ended with extra bits set. IsAlive is true
                        // only for a thread that has started and has not terminated, which
                        // is the only case where an abort is meaningful.
                        Thread thread = workerThread;
                        if (thread != null && thread.IsAlive)
                                thread.Abort ();
                }
112
#if INSIDE_MONO_PARALLEL
                virtual
#endif
                // Starts the worker thread if the worker is not already marked as started.
                // Does nothing when the worker is running or when another caller wins the
                // race to start it.
                public void Pulse ()
                {
                        // Cheap early exit; the exchange below is the authoritative check
                        if (started == 1)
                                return;

                        // Only the caller that flips the flag from 0 to 1 goes on
                        if (Interlocked.Exchange (ref started, 1) != 0)
                                return;

                        // ThreadState is a [Flags] enumeration: the thread prepared by
                        // InitializeUnderlyingThread is a background thread, so its state is
                        // Unstarted combined with Background and never compares equal to
                        // ThreadState.Unstarted alone. Test the bit so that a thread which
                        // was never started is reused, and only a thread that has already
                        // been started once is replaced by a new one.
                        if ((workerThread.ThreadState & ThreadState.Unstarted) == 0)
                                InitializeUnderlyingThread ();

                        workerThread.Start ();
                }
132
#if INSIDE_MONO_PARALLEL
                virtual
#endif
                // Requests the worker loop to end. Nothing is interrupted here: the loop in
                // WorkerMethodWrapper exits the next time it checks the flag.
                public void Stop ()
                {
                        this.started = 0;
                }
142                 
#if INSIDE_MONO_PARALLEL
                protected virtual
#endif
                // Entry point of the worker thread: runs WorkerMethod for as long as the
                // worker is started, and backs off with growing waits when idle.
                void WorkerMethodWrapper ()
                {
                        // Let static code running on this thread find its worker
                        autoReference = this;

                        int idleRounds = 0;
                        bool signaled = false;

                        while (started == 1) {
                                bool didWork = WorkerMethod ();

                                // We were signaled but found nothing to do: clear the event
                                if (signaled && !didWork)
                                        waitHandle.Reset ();
                                signaled = false;

                                Thread.Yield ();

                                if (didWork) {
                                        // Productive round, restart the back-off from scratch
                                        deepSleepTime = 8;
                                        idleRounds = 0;
                                        continue;
                                }

                                // Keep spinning until we have been idle for a while and the
                                // shared queue is really empty
                                idleRounds++;
                                if (idleRounds <= sleepThreshold || sharedWorkQueue.Count != 0)
                                        continue;

                                // Deeper sleep: the timeout doubles each time, capped at 0x4000 ms
                                if (deepSleepTime < 0x4000)
                                        deepSleepTime <<= 1;
                                else
                                        deepSleepTime = 0x4000;

                                signaled = waitHandle.WaitOne (deepSleepTime);
                        }

                        started = 0;
                }
178
#if INSIDE_MONO_PARALLEL
                protected virtual
#endif
                // One round of work: drain the shared queue through our own deque, then try
                // to steal from the other workers. Returns true if at least one task was
                // executed during the round.
                bool WorkerMethod ()
                {
                        bool executedAny = false;
                        bool stoleSomething;

                        do {
                                stoleSomething = false;

                                Task task;

                                // Move shared work into our deque and run it, until the
                                // shared queue is seen empty
                                while (sharedWorkQueue.Count > 0) {
                                        waitHandle.Set ();

                                        while (sharedWorkQueue.TryTake (out task))
                                                dDeque.PushBottom (task);

                                        while (dDeque.PopBottom (out task) == PopResult.Succeed) {
                                                waitHandle.Set ();
                                                ExecuteTask (task, ref executedAny);
                                        }
                                }

                                // Nothing left for us: go through the other workers a few times
                                for (int attempt = 0; attempt < maxRetry; ++attempt) {
                                        // Visit every other slot once, beginning with the worker
                                        // right after us, to minimize contention
                                        for (int offset = 1; offset < workerLength; ++offset) {
                                                ThreadWorker victim = others [(workerPosition + offset) % workerLength];
                                                if (victim == null || victim == this)
                                                        continue;

                                                // Items are stolen one at a time from the top
                                                while (victim.dDeque.PopTop (out task) == PopResult.Succeed) {
                                                        // Signal once, on the first successful steal of the round
                                                        if (!stoleSomething)
                                                                waitHandle.Set ();

                                                        stoleSomething = true;
                                                        ExecuteTask (task, ref executedAny);
                                                }
                                        }
                                }
                        } while (sharedWorkQueue.Count > 0 || stoleSomething);

                        return executedAny;
                }
234
235                 void ExecuteTask (Task value, ref bool result)
236                 {
237                         if (value == null)
238                                 return;
239
240                         var saveCurrent = currentTask;
241                         currentTask = value;
242                         value.Execute (adder);
243                         result = true;
244                         currentTask = saveCurrent;
245                 }
246
247 #if !INSIDE_MONO_PARALLEL
248                 // Almost same as above but with an added predicate and treating one item at a time. 
249                 // It's used by Scheduler Participate(...) method for special waiting case like
250                 // Task.WaitAll(someTasks) or Task.WaitAny(someTasks)
251                 // Predicate should be really fast and not blocking as it is called a good deal of time
252                 // Also, the method skip tasks that are LongRunning to avoid blocking (Task are not LongRunning by default)
253                 public static void ParticipativeWorkerMethod (Task self,
254                                                               ManualResetEventSlim predicateEvt,
255                                                               int millisecondsTimeout,
256                                                               IProducerConsumerCollection<Task> sharedWorkQueue,
257                                                               ThreadWorker[] others,
258                                                               ManualResetEvent evt)
259                 {
260                         const int stage1 = 5, stage2 = 0;
261                         int tries = 50;
262                         WaitHandle[] handles = null;
263                         Watch watch = Watch.StartNew ();
264                         if (millisecondsTimeout == -1)
265                                 millisecondsTimeout = int.MaxValue;
266                         bool aggressive = false;
267                         bool hasAutoReference = autoReference != null;
268                         Action<Task> adder = null;
269
270                         while (!predicateEvt.IsSet && watch.ElapsedMilliseconds < millisecondsTimeout && !self.IsCompleted) {
271                                 // We try to execute the self task as it may be the simplest way to unlock
272                                 // the situation
273                                 if (self.Status == TaskStatus.WaitingToRun) {
274                                         self.Execute (hasAutoReference ? autoReference.adder : (Action<Task>)null);
275                                         if (predicateEvt.IsSet || watch.ElapsedMilliseconds > millisecondsTimeout)
276                                                 return;
277                                 }
278
279                                 Task value;
280                                 
281                                 // If we are in fact a normal ThreadWorker, use our own deque
282                                 if (hasAutoReference) {
283                                         var enumerable = autoReference.dDeque.GetEnumerable ();
284                                         if (adder == null)
285                                                 adder = hasAutoReference ? autoReference.adder : (Action<Task>)null;
286
287                                         if (enumerable != null) {
288                                                 foreach (var t in enumerable) {
289                                                         if (t == null)
290                                                                 continue;
291
292                                                         if (CheckTaskFitness (self, t))
293                                                                 t.Execute (adder);
294
295                                                         if (predicateEvt.IsSet || watch.ElapsedMilliseconds > millisecondsTimeout)
296                                                                 return;
297                                                 }
298                                         }
299                                 }
300
301                                 int count = sharedWorkQueue.Count;
302
303                                 // Dequeue only one item as we have restriction
304                                 while (--count >= 0 && sharedWorkQueue.TryTake (out value) && value != null) {
305                                         evt.Set ();
306                                         if (CheckTaskFitness (self, value) || aggressive)
307                                                 value.Execute (null);
308                                         else {
309                                                 if (autoReference == null)
310                                                         sharedWorkQueue.TryAdd (value);
311                                                 else
312                                                         autoReference.dDeque.PushBottom (value);
313                                                 evt.Set ();
314                                         }
315
316                                         if (predicateEvt.IsSet || watch.ElapsedMilliseconds > millisecondsTimeout)
317                                                 return;
318                                 }
319
320                                 // First check to see if we comply to predicate
321                                 if (predicateEvt.IsSet || watch.ElapsedMilliseconds > millisecondsTimeout)
322                                         return;
323                                 
324                                 // Try to complete other work by stealing since our desired tasks may be in other worker
325                                 ThreadWorker other;
326                                 for (int i = 0; i < others.Length; i++) {
327                                         if ((other = others [i]) == autoReference || other == null)
328                                                 continue;
329
330                                         if (other.dDeque.PopTop (out value) == PopResult.Succeed && value != null) {
331                                                 evt.Set ();
332                                                 if (CheckTaskFitness (self, value) || aggressive)
333                                                         value.Execute (null);
334                                                 else {
335                                                         if (autoReference == null)
336                                                                 sharedWorkQueue.TryAdd (value);
337                                                         else
338                                                                 autoReference.dDeque.PushBottom (value);
339                                                         evt.Set ();
340                                                 }
341                                         }
342
343                                         if (predicateEvt.IsSet || watch.ElapsedMilliseconds > millisecondsTimeout)
344                                                 return;
345                                 }
346
347                                 /* Waiting is split in 4 phases
348                                  *   - until stage 1 we simply yield the thread to let others add data
349                                  *   - between stage 1 and stage2 we use ManualResetEventSlim light waiting mechanism
350                                  *   - after stage2 we fall back to the heavier WaitHandle waiting mechanism
351                                  *   - if really the situation isn't evolving after a couple of sleep, we disable
352                                  *     task fitness check altogether
353                                  */
354                                 if (--tries > stage1)
355                                         Thread.Yield ();
356                                 else if (tries >= stage2)
357                                         predicateEvt.Wait (ComputeTimeout (5, millisecondsTimeout, watch));
358                                 else {
359                                         if (tries == stage2 - 1)
360                                                 handles = new [] { predicateEvt.WaitHandle, evt };
361                                         WaitHandle.WaitAny (handles, ComputeTimeout (1000, millisecondsTimeout, watch));
362                                         if (tries == stage2 - 10)
363                                                 aggressive = true;
364                                 }
365                         }
366                 }
367
368                 static bool CheckTaskFitness (Task self, Task t)
369                 {
370                         return ((t.CreationOptions & TaskCreationOptions.LongRunning) == 0 && t.Id < self.Id)
371                                 || t.Parent == self
372                                 || t == self
373                                 || (autoReference != null && autoReference.currentTask != null && autoReference.currentTask == t.Parent);
374                 }
375 #else
376                 public static ThreadWorker AutoReference {
377                         get {
378                                 return autoReference;
379                         }
380                         set {
381                                 autoReference = value;
382                         }
383                 }
384
385                 protected IConcurrentDeque<Task> Deque {
386                         get {
387                                 return dDeque;
388                         }
389                 }
390
391                 protected ThreadWorker[] Others {
392                         get {
393                                 return others;
394                         }
395                 }
396
397                 protected ManualResetEvent WaitHandle {
398                         get {
399                                 return waitHandle;
400                         }
401                 }
402
403                 protected ThreadPriority Priority {
404                         get {
405                                 return threadPriority;
406                         }
407                 }
408
409                 protected int WorkerPosition {
410                         get {
411                                 return workerPosition;
412                         }
413                 }
414 #endif
415
#if INSIDE_MONO_PARALLEL
                protected virtual
#endif
                // Callback handed to Task.Execute: puts t at the bottom of this worker's
                // deque, then sets the wait handle.
                internal void ChildWorkAdder (Task t)
                {
                        this.dDeque.PushBottom (t);
                        this.waitHandle.Set ();
                }
424
425                 static int ComputeTimeout (int proposed, int timeout, Watch watch)
426                 {
427                         return timeout == int.MaxValue ? proposed : System.Math.Min (proposed, System.Math.Max (0, (int)(timeout - watch.ElapsedMilliseconds)));
428                 }
429                 
430                 public bool Finished {
431                         get {
432                                 return started == 0;
433                         }
434                 }
435
436                 public int Id {
437                         get {
438                                 return workerThread.ManagedThreadId;
439                         }
440                 }
441                 
442                 public virtual bool Equals (ThreadWorker other)
443                 {
444                         return (other == null) ? false : object.ReferenceEquals (this.dDeque, other.dDeque);    
445                 }
446                 
447                 public override bool Equals (object obj)
448                 {
449                         ThreadWorker temp = obj as ThreadWorker;
450                         return temp == null ? false : Equals (temp);
451                 }
452                 
453                 public override int GetHashCode ()
454                 {
455                         return workerThread.ManagedThreadId.GetHashCode ();
456                 }
457         }
458 }
459 #endif