Merge pull request #980 from StephenMcConnel/bug-18638
[mono.git] / mcs / class / System.Threading.Tasks.Dataflow / System.Threading.Tasks.Dataflow / ExecutingMessageBoxBase.cs
1 // ExecutingMessageBoxBase.cs
2 //
3 // Copyright (c) 2011 Jérémie "garuma" Laval
4 // Copyright (c) 2012 Petr Onderka
5 //
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
7 // of this software and associated documentation files (the "Software"), to deal
8 // in the Software without restriction, including without limitation the rights
9 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 // copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
12 //
13 // The above copyright notice and this permission notice shall be included in
14 // all copies or substantial portions of the Software.
15 //
16 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22 // THE SOFTWARE.
23
24 using System.Collections.Concurrent;
25
namespace System.Threading.Tasks.Dataflow {
        /// <summary>
        /// Common message box for execution blocks, both synchronous and asynchronous.
        /// </summary>
        /// <typeparam name="TInput">Type of the items processed by the block.</typeparam>
        abstract class ExecutingMessageBoxBase<TInput> : MessageBox<TInput> {
                // Callback invoked when the outgoing queue should be completed.
                readonly Action outgoingQueueComplete;

                // Packed state of the processing tasks:
                //   even value: a Task has been scheduled and is waiting to run
                //   odd value:  no Task is waiting to run
                //   invariant:  value / 2 Tasks are running or waiting
                int degreeOfParallelism = 1;

                /// <summary>
                /// Options this message box was created with.
                /// </summary>
                protected ExecutionDataflowBlockOptions Options { get; private set; }

                protected ExecutingMessageBoxBase (
                        ITargetBlock<TInput> target, BlockingCollection<TInput> messageQueue,
                        CompletionHelper compHelper, Func<bool> externalCompleteTester,
                        Action outgoingQueueComplete, ExecutionDataflowBlockOptions options)
                        : base (target, messageQueue, compHelper, externalCompleteTester, options)
                {
                        this.outgoingQueueComplete = outgoingQueueComplete;
                        this.Options = options;
                }

                /// <summary>
                /// Makes sure the input queue gets processed.
                /// </summary>
                /// <param name="newItem">Was a new item just added? Not used by this implementation.</param>
                protected override void EnsureProcessing (bool newItem)
                {
                        StartProcessing ();
                }

                /// <summary>
                /// Schedules a task that processes the queue, unless a task is already
                /// waiting to run or
                /// <see cref="ExecutionDataflowBlockOptions.MaxDegreeOfParallelism"/>
                /// has been reached.
                /// </summary>
                void StartProcessing ()
                {
                        // Atomically add 1 to the counter, but only while it is odd
                        // (no task waiting) and below the configured limit.
                        int observed = degreeOfParallelism;
                        for (;;) {
                                if (observed % 2 == 0)
                                        return;

                                if (Options.MaxDegreeOfParallelism != DataflowBlockOptions.Unbounded
                                    && observed / 2 >= Options.MaxDegreeOfParallelism)
                                        return;

                                int previous = Interlocked.CompareExchange (
                                        ref degreeOfParallelism, observed + 1, observed);
                                if (previous == observed)
                                        break;

                                // The counter changed under us; retry with the fresh value.
                                observed = previous;
                        }

                        Task.Factory.StartNew (ProcessQueue, CancellationToken.None,
                                TaskCreationOptions.PreferFairness, Options.TaskScheduler);
                }

                /// <summary>
                /// Processes the input queue of the block.
                /// </summary>
                /// <remarks>
                /// Implementations should call <see cref="StartProcessQueue"/> first,
                /// then process the queue, and call <see cref="FinishProcessQueue"/> last.
                /// </remarks>
                protected abstract void ProcessQueue ();

                /// <summary>
                /// Records that a scheduled processing task has begun executing.
                /// Should be called right at the start of <see cref="ProcessQueue"/>.
                /// </summary>
                protected void StartProcessQueue ()
                {
                        CompHelper.CanFaultOrCancelImmediatelly = false;

                        // Waiting -> running: the counter becomes odd again.
                        int current = Interlocked.Increment (ref degreeOfParallelism);

                        bool belowLimit =
                                Options.MaxDegreeOfParallelism == DataflowBlockOptions.Unbounded
                                || current / 2 < Options.MaxDegreeOfParallelism;
                        if (!belowLimit)
                                return;

                        // Only ask for another task when more than one item is queued.
                        if (MessageQueue.Count <= 1 || !CompHelper.CanRun)
                                return;

                        StartProcessing ();
                }

                /// <summary>
                /// Records that a processing task has finished.
                /// Should be called once <see cref="ProcessQueue"/> is done processing.
                /// </summary>
                protected void FinishProcessQueue ()
                {
                        int remaining = Interlocked.Add (ref degreeOfParallelism, -2);

                        // Even value: a task is already waiting to run, nothing to do here.
                        if (remaining % 2 != 1)
                                return;

                        if (remaining == 1) {
                                // No task is running or waiting any more.
                                CompHelper.CanFaultOrCancelImmediatelly = true;
                                // Calls the base implementation directly, skipping the
                                // counter check done by the override in this class.
                                base.VerifyCompleteness ();
                                if (MessageQueue.IsCompleted)
                                        outgoingQueueComplete ();
                        }

                        if (MessageQueue.Count > 0 && CompHelper.CanRun)
                                StartProcessing ();
                }

                /// <summary>
                /// Completes the outgoing queue, provided the input queue is completed
                /// and no processing task is running or waiting.
                /// </summary>
                protected override void OutgoingQueueComplete ()
                {
                        if (!MessageQueue.IsCompleted)
                                return;

                        if (Volatile.Read (ref degreeOfParallelism) != 1)
                                return;

                        outgoingQueueComplete ();
                }

                /// <summary>
                /// Runs the base completeness check, but only when no processing task
                /// is running or waiting.
                /// </summary>
                protected override void VerifyCompleteness ()
                {
                        bool noTasks = Volatile.Read (ref degreeOfParallelism) == 1;
                        if (noTasks)
                                base.VerifyCompleteness ();
                }

                /// <summary>
                /// Tells a processing task whether it may continue executing.
                /// </summary>
                /// <param name="iteration">Number of the task's current iteration, starting from 0.</param>
                /// <returns>
                /// <c>true</c> when the completion helper allows running and the task has
                /// not reached <see cref="DataflowBlockOptions.MaxMessagesPerTask"/>.
                /// </returns>
                protected bool CanRun (int iteration)
                {
                        if (!CompHelper.CanRun)
                                return false;

                        if (Options.MaxMessagesPerTask == DataflowBlockOptions.Unbounded)
                                return true;

                        return iteration < Options.MaxMessagesPerTask;
                }
        }
}