1 // ConcurrentExclusiveSchedulerPair.cs
3 // Copyright (c) 2011 Jérémie "garuma" Laval
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to deal
7 // in the Software without restriction, including without limitation the rights
8 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
12 // The above copyright notice and this permission notice shall be included in
13 // all copies or substantial portions of the Software.
15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26 using System.Threading;
27 using System.Collections.Generic;
28 using System.Collections.Concurrent;
30 namespace System.Threading.Tasks
32 public class ConcurrentExclusiveSchedulerPair : IDisposable
34 readonly int maxConcurrencyLevel;
35 readonly int maxItemsPerTask;
37 readonly TaskScheduler target;
38 readonly TaskFactory factory;
39 readonly Action taskHandler;
41 readonly ConcurrentQueue<Task> concurrentTasks = new ConcurrentQueue<Task> ();
42 readonly ConcurrentQueue<Task> exclusiveTasks = new ConcurrentQueue<Task> ();
44 readonly ReaderWriterLockSlim rwl = new ReaderWriterLockSlim ();
45 readonly TaskCompletionSource<object> completion = new TaskCompletionSource<object> ();
46 readonly InnerTaskScheduler concurrent;
47 readonly InnerTaskScheduler exclusive;
51 class InnerTaskScheduler : TaskScheduler
53 readonly ConcurrentExclusiveSchedulerPair scheduler;
54 readonly ConcurrentQueue<Task> queue;
56 public InnerTaskScheduler (ConcurrentExclusiveSchedulerPair scheduler,
57 ConcurrentQueue<Task> queue)
59 this.scheduler = scheduler;
63 public override int MaximumConcurrencyLevel {
65 return scheduler.maxConcurrencyLevel;
69 protected override void QueueTask (Task t)
71 scheduler.DoQueue (t, queue);
74 protected override bool TryExecuteTaskInline (Task task, bool taskWasPreviouslyQueued)
76 if (task.Status != TaskStatus.Created)
79 task.RunSynchronously (scheduler.target);
83 public void Execute (Task t)
88 [MonoTODO ("Only useful for debugger support")]
89 protected override IEnumerable<Task> GetScheduledTasks ()
91 throw new NotImplementedException ();
95 public ConcurrentExclusiveSchedulerPair () : this (TaskScheduler.Current)
99 public ConcurrentExclusiveSchedulerPair (TaskScheduler taskScheduler) : this (taskScheduler, taskScheduler.MaximumConcurrencyLevel)
103 public ConcurrentExclusiveSchedulerPair (TaskScheduler taskScheduler, int maxConcurrencyLevel)
104 : this (taskScheduler, maxConcurrencyLevel, -1)
108 public ConcurrentExclusiveSchedulerPair (TaskScheduler taskScheduler, int maxConcurrencyLevel, int maxItemsPerTask)
110 this.target = taskScheduler;
111 this.maxConcurrencyLevel = maxConcurrencyLevel;
112 this.maxItemsPerTask = maxItemsPerTask;
113 this.factory = new TaskFactory (taskScheduler);
114 this.taskHandler = InternalTaskProcesser;
115 this.concurrent = new InnerTaskScheduler (this, concurrentTasks);
116 this.exclusive = new InnerTaskScheduler (this, exclusiveTasks);
119 public void Complete ()
121 completion.SetResult (null);
124 public TaskScheduler ConcurrentScheduler {
130 public TaskScheduler ExclusiveScheduler {
136 public Task Completion {
138 return completion.Task;
142 public void Dispose ()
148 protected virtual void Dispose (bool disposing)
152 void DoQueue (Task task, ConcurrentQueue<Task> queue)
154 queue.Enqueue (task);
158 void InternalTaskProcesser ()
162 const int lockWaitTime = 2;
164 while (!concurrentTasks.IsEmpty || !exclusiveTasks.IsEmpty) {
165 if (maxItemsPerTask != -1 && ++times == maxItemsPerTask)
171 if (!concurrentTasks.IsEmpty && rwl.TryEnterReadLock (lockWaitTime)) {
173 while (concurrentTasks.TryDequeue (out task)) {
185 if (!exclusiveTasks.IsEmpty && rwl.TryEnterWriteLock (lockWaitTime)) {
187 while (exclusiveTasks.TryDequeue (out task)) {
193 rwl.ExitWriteLock ();
197 // TODO: there's a race here, task adding + spinup check may be done while here
198 Interlocked.Decrement (ref numTask);
203 int currentTaskNumber;
205 currentTaskNumber = numTask;
206 if (currentTaskNumber >= maxConcurrencyLevel)
208 } while (Interlocked.CompareExchange (ref numTask, currentTaskNumber + 1, currentTaskNumber) != currentTaskNumber);
210 factory.StartNew (taskHandler);
213 void RunTask (Task task)
215 concurrent.Execute (task);