System.Drawing: added email to icon and test file headers
[mono.git] / mcs / class / System.Threading.Tasks.Dataflow / System.Threading.Tasks / ConcurrentExclusiveSchedulerPair.cs
// ConcurrentExclusiveSchedulerPair.cs
//
// Copyright (c) 2011 Jérémie "garuma" Laval
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
//
//
24
25 using System;
26 using System.Threading;
27 using System.Collections.Generic;
28 using System.Collections.Concurrent;
29
namespace System.Threading.Tasks
{
        // Exposes two task schedulers that feed a single target scheduler.
        // Tasks queued to ConcurrentScheduler are executed while holding the read
        // side of a ReaderWriterLockSlim; tasks queued to ExclusiveScheduler are
        // executed while holding its write side.
        public class ConcurrentExclusiveSchedulerPair : IDisposable
        {
                // Upper bound on the number of processor tasks alive at once
                // (int.MaxValue when the caller asked for -1).
                readonly int maxConcurrencyLevel;
                // Number of queued tasks one processor task may execute before it
                // yields; -1 means no limit.
                readonly int maxItemsPerTask;

                // Scheduler that actually runs the processor tasks.
                readonly TaskScheduler target;
                readonly TaskFactory factory;
                readonly Action taskHandler;

                readonly ConcurrentQueue<Task> concurrentTasks = new ConcurrentQueue<Task> ();
                readonly ConcurrentQueue<Task> exclusiveTasks = new ConcurrentQueue<Task> ();

                readonly ReaderWriterLockSlim rwl = new ReaderWriterLockSlim ();
                readonly TaskCompletionSource<object> completion = new TaskCompletionSource<object> ();
                readonly InnerTaskScheduler concurrent;
                readonly InnerTaskScheduler exclusive;

                // Number of processor tasks currently accounted for (started and not
                // yet finished); only modified through Interlocked operations.
                int numTask;

                // Scheduler handed out to callers. It only stores tasks in the queue
                // it was given and lets the owning pair decide when they are run.
                class InnerTaskScheduler : TaskScheduler
                {
                        readonly ConcurrentExclusiveSchedulerPair scheduler;
                        readonly ConcurrentQueue<Task> queue;

                        public InnerTaskScheduler (ConcurrentExclusiveSchedulerPair scheduler,
                                                   ConcurrentQueue<Task> queue)
                        {
                                this.scheduler = scheduler;
                                this.queue = queue;
                        }

                        public override int MaximumConcurrencyLevel {
                                get {
                                        return scheduler.maxConcurrencyLevel;
                                }
                        }

                        protected override void QueueTask (Task t)
                        {
                                scheduler.DoQueue (t, queue);
                        }

                        protected override bool TryExecuteTaskInline (Task task, bool taskWasPreviouslyQueued)
                        {
                                // NOTE(review): a task that has already been handed to a
                                // scheduler is normally past the Created state, so this
                                // presumably declines almost every inline request. When it
                                // does run, the task goes straight to the target scheduler
                                // without taking the pair's lock - confirm this is intended.
                                if (task.Status != TaskStatus.Created)
                                        return false;

                                task.RunSynchronously (scheduler.target);
                                return true;
                        }

                        // Runs, on the calling thread, a task that was queued to this scheduler.
                        public void Execute (Task t)
                        {
                                TryExecuteTask (t);
                        }

                        // Returns a snapshot of the tasks still waiting in this scheduler's queue.
                        protected override IEnumerable<Task> GetScheduledTasks ()
                        {
                                return queue.ToArray ();
                        }
                }

                // Uses TaskScheduler.Current as the target scheduler.
                public ConcurrentExclusiveSchedulerPair () : this (TaskScheduler.Current)
                {
                }

                // Uses the target scheduler's own MaximumConcurrencyLevel as the limit.
                // Throws ArgumentNullException when taskScheduler is null.
                public ConcurrentExclusiveSchedulerPair (TaskScheduler taskScheduler)
                        : this (taskScheduler, GetMaximumConcurrencyLevel (taskScheduler))
                {
                }

                // Same as the full constructor with no per-processor item limit (-1).
                public ConcurrentExclusiveSchedulerPair (TaskScheduler taskScheduler, int maxConcurrencyLevel)
                        : this (taskScheduler, maxConcurrencyLevel, -1)
                {
                }

                // taskScheduler: scheduler on which the processor tasks are started.
                // maxConcurrencyLevel: maximum number of processor tasks, or -1 for no limit.
                // maxItemsPerTask: maximum number of queued tasks one processor task
                //   executes before it stops, or -1 for no limit.
                // Throws ArgumentNullException when taskScheduler is null and
                // ArgumentOutOfRangeException when either limit is 0 or less than -1.
                public ConcurrentExclusiveSchedulerPair (TaskScheduler taskScheduler, int maxConcurrencyLevel, int maxItemsPerTask)
                {
                        if (taskScheduler == null)
                                throw new ArgumentNullException ("taskScheduler");
                        if (maxConcurrencyLevel == 0 || maxConcurrencyLevel < -1)
                                throw new ArgumentOutOfRangeException ("maxConcurrencyLevel");
                        if (maxItemsPerTask == 0 || maxItemsPerTask < -1)
                                throw new ArgumentOutOfRangeException ("maxItemsPerTask");

                        this.target = taskScheduler;
                        // SpinUpTasks compares numTask against this value, so -1 has to be
                        // turned into a real upper bound or no processor would ever start.
                        this.maxConcurrencyLevel = maxConcurrencyLevel == -1 ? int.MaxValue : maxConcurrencyLevel;
                        this.maxItemsPerTask = maxItemsPerTask;
                        this.factory = new TaskFactory (taskScheduler);
                        this.taskHandler = InternalTaskProcesser;
                        this.concurrent = new InnerTaskScheduler (this, concurrentTasks);
                        this.exclusive = new InnerTaskScheduler (this, exclusiveTasks);
                }

                // Null-checks the scheduler before its property is read in a
                // constructor initializer, where a plain dereference would surface
                // as a NullReferenceException.
                static int GetMaximumConcurrencyLevel (TaskScheduler taskScheduler)
                {
                        if (taskScheduler == null)
                                throw new ArgumentNullException ("taskScheduler");

                        return taskScheduler.MaximumConcurrencyLevel;
                }

                // Marks the Completion task as finished. Safe to call more than once:
                // TrySetResult ignores the call when the task is already completed.
                public void Complete ()
                {
                        completion.TrySetResult (null);
                }

                public TaskScheduler ConcurrentScheduler {
                        get {
                                return concurrent;
                        }
                }

                public TaskScheduler ExclusiveScheduler {
                        get {
                                return exclusive;
                        }
                }

                // Task that is completed by Complete ().
                public Task Completion {
                        get {
                                return completion.Task;
                        }
                }

                public void Dispose ()
                {
                        Dispose (true);
                        GC.SuppressFinalize (this);
                }

                // Still a no-op: in particular the ReaderWriterLockSlim is not disposed here.
                [MonoTODO]
                protected virtual void Dispose (bool disposing)
                {
                }

                // True while at least one of the two queues still holds a task.
                bool HasPendingTasks {
                        get {
                                return !concurrentTasks.IsEmpty || !exclusiveTasks.IsEmpty;
                        }
                }

                // Stores the task in the queue of the scheduler it was submitted to and
                // makes sure a processor task is available to pick it up.
                void DoQueue (Task task, ConcurrentQueue<Task> queue)
                {
                        queue.Enqueue (task);
                        SpinUpTasks ();
                }

                // True once this processor has executed maxItemsPerTask tasks
                // (never true when the limit is -1).
                bool IsBudgetExhausted (int processed)
                {
                        return maxItemsPerTask != -1 && processed >= maxItemsPerTask;
                }

                // Executes tasks from queue through the scheduler they were queued to
                // until the queue is empty or the item budget is used up. The caller
                // must already hold the matching side of the lock. Returns the updated
                // count of tasks executed by this processor.
                int Drain (ConcurrentQueue<Task> queue, InnerTaskScheduler scheduler, int processed)
                {
                        Task task;

                        // The budget is tested first so a task is never dequeued and then dropped.
                        while (!IsBudgetExhausted (processed) && queue.TryDequeue (out task)) {
                                scheduler.Execute (task);
                                processed++;
                        }

                        return processed;
                }

                // Body of a processor task: alternates between the concurrent queue
                // (read lock) and the exclusive queue (write lock) until both are empty
                // or the item budget is reached.
                void InternalTaskProcesser ()
                {
                        const int lockWaitTime = 2;
                        int processed = 0;

                        try {
                                while (HasPendingTasks && !IsBudgetExhausted (processed)) {
                                        if (!concurrentTasks.IsEmpty && rwl.TryEnterReadLock (lockWaitTime)) {
                                                try {
                                                        processed = Drain (concurrentTasks, concurrent, processed);
                                                } finally {
                                                        rwl.ExitReadLock ();
                                                }
                                        }

                                        if (!exclusiveTasks.IsEmpty && rwl.TryEnterWriteLock (lockWaitTime)) {
                                                try {
                                                        processed = Drain (exclusiveTasks, exclusive, processed);
                                                } finally {
                                                        rwl.ExitWriteLock ();
                                                }
                                        }
                                }
                        } finally {
                                // Always give the slot back, even if the loop above threw.
                                Interlocked.Decrement (ref numTask);
                        }

                        // A task may have been queued after the last emptiness check while
                        // numTask still counted this processor (so SpinUpTasks declined), or
                        // the item budget may have stopped us early. Either way, start a
                        // replacement now that the slot is free.
                        if (HasPendingTasks)
                                SpinUpTasks ();
                }

                // Reserves a processor slot by atomically incrementing numTask and starts
                // a processor task on the target scheduler. Does nothing when
                // maxConcurrencyLevel processors are already accounted for.
                void SpinUpTasks ()
                {
                        int currentTaskNumber;
                        do {
                                currentTaskNumber = numTask;
                                if (currentTaskNumber >= maxConcurrencyLevel)
                                        return;
                        } while (Interlocked.CompareExchange (ref numTask, currentTaskNumber + 1, currentTaskNumber) != currentTaskNumber);

                        factory.StartNew (taskHandler);
                }
        }
}