Merge branch 'xml-fixes' of https://github.com/myeisha/mono into myeisha-xml-fixes
[mono.git] / mcs / class / corlib / System.Threading.Tasks / ThreadWorker.cs
1 // ThreadWorker.cs
2 //
3 // Copyright (c) 2008 Jérémie "Garuma" Laval
4 //
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to deal
7 // in the Software without restriction, including without limitation the rights
8 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
11 //
12 // The above copyright notice and this permission notice shall be included in
13 // all copies or substantial portions of the Software.
14 //
15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21 // THE SOFTWARE.
22 //
23 //
24
25 #if NET_4_0 || BOOTSTRAP_NET_4_0
26 using System;
27 using System.Threading;
28 using System.Collections.Concurrent;
29
namespace System.Threading.Tasks
{
        // A worker owned by a scheduler. Each worker has its own thread and its own
        // work deque. It repeatedly moves tasks from the shared work queue into its
        // deque, executes them, and when nothing is left tries to take tasks from the
        // deques of the other workers.
        internal class ThreadWorker : IDisposable
        {
                // Replaced by InitializeUnderlyingThread () when the worker is restarted
                // after its previous thread has already been started once.
                Thread workerThread;

                readonly IDequeOperations<Task> dDeque;
                readonly ThreadWorker[]         others;
                readonly EventWaitHandle        waitHandle;
                readonly IProducerConsumerCollection<Task> sharedWorkQueue;
                readonly IScheduler             sched;
                readonly ThreadPriority         threadPriority;

                // 1 while the worker loop is (or is about to be) running, 0 otherwise.
                // Claimed atomically in Pulse () and cleared by Stop () / at loop exit.
                int started = 0;

                readonly int  workerLength;
                readonly int  workerPosition;

                // Number of stealing passes over the other workers per WorkerMethod () round
                const    int  maxRetry = 5;

                // Number of consecutive idle iterations after which the loop starts
                // waiting on waitHandle, and the timeout (in milliseconds) of that wait.
                const int sleepThreshold = 100;
                const int deepSleepTime = 10;

                // Creates a worker and its (not yet started) thread.
                //   sched           - scheduler pulsed whenever a child task is queued
                //   others          - array of all workers, used as stealing targets
                //   workerPosition  - index used as the starting point when stealing
                //   sharedWorkQueue - queue from which new tasks are taken
                //   priority        - priority given to the underlying thread
                //   handle          - handle waited on when the worker has been idle for a while
                // Throws ArgumentNullException if others is null.
                public ThreadWorker (IScheduler sched,
                                     ThreadWorker[] others,
                                     int workerPosition,
                                     IProducerConsumerCollection<Task> sharedWorkQueue,
                                     ThreadPriority priority,
                                     EventWaitHandle handle)
                {
                        if (others == null)
                                throw new ArgumentNullException ("others");

                        this.others          = others;
                        this.dDeque          = new CyclicDeque<Task> ();
                        this.sched           = sched;
                        this.sharedWorkQueue = sharedWorkQueue;
                        this.workerLength    = others.Length;
                        this.workerPosition  = workerPosition;
                        this.waitHandle      = handle;
                        this.threadPriority  = priority;

                        InitializeUnderlyingThread ();
                }

                // (Re)creates the background thread that will run WorkerMethodWrapper.
                // The thread is only created here, it is started by Pulse ().
                void InitializeUnderlyingThread ()
                {
                        this.workerThread = new Thread (WorkerMethodWrapper);

                        this.workerThread.IsBackground = true;
                        this.workerThread.Priority = threadPriority;
                        this.workerThread.Name = "ParallelFxThreadWorker";
                }

                // Asks the worker loop to stop and aborts the thread unless it has
                // already terminated.
                public void Dispose ()
                {
                        Stop ();

                        // ThreadState is a [Flags] enum: other bits (e.g. Background) may be set
                        // together with Stopped, so test the bit instead of comparing for equality.
                        if ((workerThread.ThreadState & ThreadState.Stopped) == 0)
                                workerThread.Abort ();
                }

                // Starts the worker thread if the worker is not currently marked as running.
                // Safe to call concurrently: only the caller that flips started from 0 to 1
                // starts the thread.
                // NOTE(review): if Pulse () follows Stop () before the previous thread has
                // observed started == 0, the previous thread keeps looping while a new one is
                // started on the same deque - confirm callers cannot trigger this.
                public void Pulse ()
                {
                        // Cheap early exit before paying for the interlocked operation
                        if (started == 1)
                                return;

                        // If the thread was stopped then set it in use and restart it
                        if (Interlocked.Exchange (ref started, 1) != 0)
                                return;

                        // A Thread can be started only once. The thread is created with
                        // IsBackground = true, so its state may carry the Background bit in
                        // addition to Unstarted: test the bit rather than comparing for equality,
                        // otherwise a perfectly usable fresh thread is thrown away and recreated.
                        if ((workerThread.ThreadState & ThreadState.Unstarted) == 0)
                                InitializeUnderlyingThread ();

                        workerThread.Start ();
                }

                // Clears the running flag so that the loop in WorkerMethodWrapper exits
                // the next time it checks it. Does not wait for the thread to finish.
                public void Stop ()
                {
                        started = 0;
                }

                // This is the actual method called in the Thread
                void WorkerMethodWrapper ()
                {
                        // Counts consecutive iterations since work was last executed
                        int sleepTime = 0;
                        SpinWait wait = new SpinWait ();

                        // Main loop
                        while (started == 1) {
                                // Some work was executed: start again from a short spin
                                if (WorkerMethod ()) {
                                        sleepTime = 0;
                                        wait = new SpinWait ();
                                }

                                // Back off a little before looking for work again
                                wait.SpinOnce ();

                                // If we are spinning too much, have a deeper sleep
                                if (sleepTime++ > sleepThreshold)
                                        waitHandle.WaitOne (deepSleepTime);
                        }

                        started = 0;
                }

                // Main method, used to do all the logic of retrieving, processing and stealing work.
                // Returns true if at least one task was executed during the call.
                // NOTE(review): children queued (through ChildWorkAdder) by a stolen task stay in
                // our deque until the shared queue is non-empty again or another worker takes them.
                bool WorkerMethod ()
                {
                        bool result = false;
                        bool hasStolenFromOther;
                        do {
                                hasStolenFromOther = false;

                                Task value;

                                // We fill up our work deque concurrently with other ThreadWorker
                                while (sharedWorkQueue.Count > 0) {
                                        while (sharedWorkQueue.TryTake (out value)) {
                                                dDeque.PushBottom (value);
                                        }

                                        // Now we process our work
                                        while (dDeque.PopBottom (out value) == PopResult.Succeed) {
                                                if (value != null) {
                                                        value.Execute (ChildWorkAdder);
                                                        result = true;
                                                }
                                        }
                                }

                                // When we have finished, steal from other worker
                                ThreadWorker other;

                                // Repeat the operation a little so that we can let other things process.
                                for (int j = 0; j < maxRetry; ++j) {
                                        int len = workerLength + workerPosition;
                                        // Start stealing with the ThreadWorker at our right to minimize contention
                                        for (int it = workerPosition + 1; it < len; ++it) {
                                                int i = it % workerLength;
                                                if ((other = others [i]) == null || other == this)
                                                        continue;

                                                // Maybe make this steal more than one item at a time, see TODO.
                                                if (other.dDeque.PopTop (out value) == PopResult.Succeed) {
                                                        hasStolenFromOther = true;
                                                        if (value != null) {
                                                                value.Execute (ChildWorkAdder);
                                                                result = true;
                                                        }
                                                }
                                        }
                                }
                        } while (sharedWorkQueue.Count > 0 || hasStolenFromOther);

                        return result;
                }

                // Almost same as above but with an added predicate and treating one item at a time.
                // It's used by Scheduler Participate(...) method for special waiting case like
                // Task.WaitAll(someTasks) or Task.WaitAny(someTasks)
                // Predicate should be really fast and not blocking as it is called a good deal of time
                // Also, the method skips tasks that are LongRunning to avoid blocking (Tasks are not
                // LongRunning by default): such tasks are put back into the shared work queue.
                public static void WorkerMethod (Func<bool> predicate, IProducerConsumerCollection<Task> sharedWorkQueue,
                                                 ThreadWorker[] others)
                {
                        SpinWait wait = new SpinWait ();

                        while (!predicate ()) {
                                Task value;

                                // Dequeue only one item as we have restriction
                                if (sharedWorkQueue.TryTake (out value)) {
                                        if (value != null) {
                                                if (CheckTaskFitness (value))
                                                        value.Execute (null);
                                                else
                                                        sharedWorkQueue.TryAdd (value);
                                        }
                                }

                                // First check to see if we comply to predicate
                                if (predicate ())
                                        return;

                                // Try to complete other work by stealing since our desired tasks may be in other worker
                                ThreadWorker other;
                                for (int i = 0; i < others.Length; i++) {
                                        if ((other = others [i]) == null)
                                                continue;

                                        if (other.dDeque.PopTop (out value) == PopResult.Succeed) {
                                                if (value != null) {
                                                        if (CheckTaskFitness (value))
                                                                value.Execute (null);
                                                        else
                                                                sharedWorkQueue.TryAdd (value);
                                                }
                                        }

                                        if (predicate ())
                                                return;
                                }

                                wait.SpinOnce ();
                        }
                }

                // Callback handed to Task.Execute: queues a child task on this worker's
                // deque and pulses the scheduler.
                internal void ChildWorkAdder (Task t)
                {
                        dDeque.PushBottom (t);
                        sched.PulseAll ();
                }

                // Tells whether a task may be executed inline by a waiting thread, i.e.
                // whether it is NOT flagged LongRunning.
                // The flag has to be tested with a bitwise AND: OR-ing it in always yields
                // a non-zero value, which made every task (LongRunning included) look fit.
                static bool CheckTaskFitness (Task t)
                {
                        return (t.CreationOptions & TaskCreationOptions.LongRunning) == 0;
                }

                // True when the worker is not marked as running
                public bool Finished {
                        get {
                                return started == 0;
                        }
                }

                // Managed id of the current underlying thread. The value changes when
                // the thread is recreated by Pulse ().
                public int Id {
                        get {
                                return workerThread.ManagedThreadId;
                        }
                }

                // Two workers are considered equal when they share the same deque instance
                public bool Equals (ThreadWorker other)
                {
                        return other != null && object.ReferenceEquals (this.dDeque, other.dDeque);
                }

                public override bool Equals (object obj)
                {
                        return Equals (obj as ThreadWorker);
                }

                // Based on the deque, which is what Equals compares and which never changes
                // for a given worker. The thread id is unsuitable as workerThread can be
                // replaced during the lifetime of the worker.
                public override int GetHashCode ()
                {
                        return dDeque.GetHashCode ();
                }
        }
}
282 #endif