2010-03-12 Jb Evain <jbevain@novell.com>
[mono.git] / mcs / class / corlib / System.Threading.Tasks / Internal / ThreadWorker.cs
1 #if NET_4_0
2 // ThreadWorker.cs
3 //
4 // Copyright (c) 2008 Jérémie "Garuma" Laval
5 //
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
7 // of this software and associated documentation files (the "Software"), to deal
8 // in the Software without restriction, including without limitation the rights
9 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 // copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
12 //
13 // The above copyright notice and this permission notice shall be included in
14 // all copies or substantial portions of the Software.
15 //
16 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22 // THE SOFTWARE.
23 //
24 //
25
26 using System;
27 using System.Threading;
28 using System.Collections.Concurrent;
29
30 namespace System.Threading.Tasks
31 {
// A single worker of the task scheduler. Each worker owns a private deque of
// tasks, refills it from the scheduler-wide shared queue, and, when it runs
// out of work, tries to take tasks from the deques of the other workers.
internal class ThreadWorker : IDisposable
{
        // Random source used only to pick the index at which stealing starts.
        static Random r = new Random ();

        // Thread running WorkerMethodWrapper; for a local (participant) worker
        // this is the thread that constructed the instance.
        Thread workerThread;

        readonly          ThreadWorker[]                    others;
        internal readonly IDequeOperations<Task>            dDeque;
        readonly          IProducerConsumerCollection<Task> sharedWorkQueue;
        readonly          Action<Task>                      childWorkAdder;

        // Flag to tell if workerThread is running (1) or stopped (0)
        int started = 0;

        readonly bool isLocal;
        readonly int  workerLength;
        readonly int  stealingStart;
        const    int  maxRetry = 5;

        #region Sleep related fields
        // Must NOT be readonly: SpinWait is a mutable struct, and calling
        // SpinOnce () on a readonly field would only mutate a temporary copy,
        // so the spin state would never advance.
        SpinWait wait = new SpinWait ();
        const int sleepThreshold = 100000;
        #endregion

        // (Re)creates the underlying thread object; invoked once from the
        // constructor and again by Pulse () when a finished thread is restarted.
        Action threadInitializer;

        // Creates a worker backed by its own dedicated thread.
        public ThreadWorker (IScheduler sched, ThreadWorker[] others, IProducerConsumerCollection<Task> sharedWorkQueue,
                             int maxStackSize, ThreadPriority priority)
                : this (sched, others, sharedWorkQueue, true, maxStackSize, priority)
        {
        }

        // Creates a worker. When createThread is false the worker is "local":
        // it never starts a thread of its own and is bound to the calling thread.
        // A maxStackSize of 0 selects the default thread stack size.
        public ThreadWorker (IScheduler sched, ThreadWorker[] others, IProducerConsumerCollection<Task> sharedWorkQueue,
                             bool createThread, int maxStackSize, ThreadPriority priority)
        {
                this.others          = others;
                this.dDeque          = new CyclicDeque<Task> ();
                this.sharedWorkQueue = sharedWorkQueue;
                this.workerLength    = others.Length;
                this.isLocal         = !createThread;

                // Handed to every executed task so that the work it spawns lands
                // in this worker's deque and the scheduler gets pulsed.
                this.childWorkAdder = delegate (Task t) {
                        dDeque.PushBottom (t);
                        sched.PulseAll ();
                };

                // Find the stealing start index randomly (then the traversal
                // will be done in Round-Robin fashion). With an empty worker
                // array there is nothing to pick from: keep the default index 0
                // instead of indexing out of range.
                if (workerLength > 0) {
                        do {
                                this.stealingStart = r.Next (0, workerLength);
                        } while (others [stealingStart] == this);
                }

                InitializeUnderlyingThread (maxStackSize, priority);
        }

        // Builds the thread initializer closure and runs it once so that
        // workerThread is never null after construction.
        void InitializeUnderlyingThread (int maxStackSize, ThreadPriority priority)
        {
                threadInitializer = delegate {
                        // Special case of the participant ThreadWorker
                        if (isLocal) {
                                this.workerThread = Thread.CurrentThread;
                                return;
                        }

                        if (maxStackSize == 0)
                                this.workerThread = new Thread (WorkerMethodWrapper);
                        else
                                this.workerThread = new Thread (WorkerMethodWrapper, maxStackSize);

                        this.workerThread.IsBackground = true;
                        this.workerThread.Priority = priority;
                };
                threadInitializer ();
        }

        // Stops the worker loop and aborts the dedicated thread if it has not
        // already terminated.
        public void Dispose ()
        {
                Stop ();
                // ThreadState is a flags enum (e.g. Background | Stopped), so the
                // Stopped bit has to be tested instead of comparing for equality.
                if (!isLocal && (workerThread.ThreadState & ThreadState.Stopped) == 0)
                        workerThread.Abort ();
        }

        // Wakes the worker up: if it was stopped, marks it as running and
        // (for non-local workers) starts its thread.
        public void Pulse ()
        {
                // If the thread was stopped then set it in use and restart it
                if (Interlocked.Exchange (ref started, 1) != 0)
                        return;
                if (isLocal)
                        return;

                // A Thread object cannot be started twice, so a fresh one is
                // created unless the current one was never started. The Unstarted
                // bit is tested because the state of a background thread also
                // carries the Background flag.
                if ((workerThread.ThreadState & ThreadState.Unstarted) == 0)
                        threadInitializer ();

                workerThread.Start ();
        }

        public void Stop ()
        {
                // Set the flag to stop so that the while in the thread will stop
                // doing its infinite loop.
                started = 0;
        }

        // This is the actual method called in the Thread
        void WorkerMethodWrapper ()
        {
                // Number of consecutive iterations that found no work.
                int sleepTime = 0;

                // Main loop
                while (started == 1) {
                        bool result = false;
                        try {
                                result = WorkerMethod ();
                        } catch (Exception e) {
                                // Best effort: report and keep the worker alive.
                                Console.WriteLine (e.ToString ());
                        }

                        // Wait a little and if the Thread has been more sleeping than working shut it down
                        wait.SpinOnce ();
                        if (result)
                                sleepTime = 0;
                        if (sleepTime++ > sleepThreshold)
                                break;
                }

                started = 0;
        }

        // Main method, used to do all the logic of retrieving, processing and stealing work.
        // Returns true if at least one task was executed during the call.
        bool WorkerMethod ()
        {
                bool result = false;
                bool hasStolenFromOther;
                do {
                        hasStolenFromOther = false;

                        Task value;

                        // We fill up our work deque concurrently with other ThreadWorker
                        while (sharedWorkQueue.Count > 0) {
                                while (sharedWorkQueue.TryTake (out value)) {
                                        dDeque.PushBottom (value);
                                }

                                // Now we process our work
                                while (dDeque.PopBottom (out value) == PopResult.Succeed) {
                                        if (value != null) {
                                                value.Execute (childWorkAdder);
                                                result = true;
                                        }
                                }
                        }

                        // When we have finished, steal from other worker
                        ThreadWorker other;

                        // Repeat the operation a little so that we can let other things process.
                        for (int j = 0; j < maxRetry; j++) {
                                // Start stealing with the ThreadWorker at our right to minimize contention
                                for (int it = stealingStart; it < stealingStart + workerLength; it++) {
                                        int i = it % workerLength;
                                        if ((other = others [i]) == null || other == this)
                                                continue;

                                        // Maybe make this steal more than one item at a time, see TODO.
                                        if (other.dDeque.PopTop (out value) == PopResult.Succeed) {
                                                hasStolenFromOther = true;
                                                if (value != null) {
                                                        value.Execute (childWorkAdder);
                                                        result = true;
                                                }
                                        }
                                }
                        }
                } while (sharedWorkQueue.Count > 0 || hasStolenFromOther);

                return result;
        }

        // Almost same as above but with an added predicate and treating one item at a time.
        // It's used by Scheduler Participate(...) method for special waiting case like
        // Task.WaitAll(someTasks) or Task.WaitAny(someTasks)
        // Predicate should be really fast and not blocking as it is called a good deal of time
        // Also, the method skips tasks that are LongRunning to avoid blocking (Tasks are not LongRunning by default)
        public static void WorkerMethod (Func<bool> predicate, IProducerConsumerCollection<Task> sharedWorkQueue,
                                         ThreadWorker[] others)
        {
                while (!predicate ()) {
                        Task value;

                        // Dequeue only one item as we have restriction
                        if (sharedWorkQueue.TryTake (out value))
                                ExecuteOrRequeue (value, sharedWorkQueue);

                        // First check to see if we comply to predicate
                        if (predicate ())
                                return;

                        // Try to complete other work by stealing since our desired tasks may be in other worker
                        for (int i = 0; i < others.Length; i++) {
                                ThreadWorker other = others [i];
                                if (other == null)
                                        continue;

                                if (other.dDeque.PopTop (out value) == PopResult.Succeed)
                                        ExecuteOrRequeue (value, sharedWorkQueue);

                                if (predicate ())
                                        return;
                        }
                }
        }

        // Runs the task inline when it is fit for that, otherwise hands it
        // back to the shared queue. Null tasks are ignored.
        static void ExecuteOrRequeue (Task value, IProducerConsumerCollection<Task> sharedWorkQueue)
        {
                if (value == null)
                        return;

                if (CheckTaskFitness (value))
                        value.Execute (null);
                else
                        sharedWorkQueue.TryAdd (value);
        }

        // A task is fit for inline execution by a waiting thread only when it
        // is NOT flagged LongRunning. The flag is tested with '&': OR-ing it in
        // would make the check succeed for every task.
        static bool CheckTaskFitness (Task t)
        {
                return (t.CreationOptions & TaskCreationOptions.LongRunning) == 0;
        }

        // True when the worker loop is not (or no longer) marked as running.
        public bool Finished {
                get {
                        return started == 0;
                }
        }

        // True when this worker has no dedicated thread of its own.
        public bool IsLocal {
                get {
                        return isLocal;
                }
        }

        // Managed id of the thread currently associated with this worker.
        public int Id {
                get {
                        return workerThread.ManagedThreadId;
                }
        }

        // Two workers are equal when they share the very same deque instance.
        public bool Equals (ThreadWorker other)
        {
                return other != null && object.ReferenceEquals (this.dDeque, other.dDeque);
        }

        public override bool Equals (object obj)
        {
                return Equals (obj as ThreadWorker);
        }

        // Hash on the deque identity so the value agrees with Equals and stays
        // stable when Pulse () replaces the underlying thread object.
        public override int GetHashCode ()
        {
                return System.Runtime.CompilerServices.RuntimeHelpers.GetHashCode (dDeque);
        }
}
298 }
299 #endif