Merge pull request #2228 from lambdageek/dev/sgen-timeouts
[mono.git] / mcs / class / System.Threading.Tasks.Dataflow / System.Threading.Tasks.Dataflow / JoinBlock`3.cs
// JoinBlock`3.cs
//
// Copyright (c) 2011 Jérémie "garuma" Laval
// Copyright (c) 2012 Petr Onderka
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
23
24 using System.Collections.Generic;
25
namespace System.Threading.Tasks.Dataflow
{
        /// <summary>
        /// Source block that combines one item from each of its three targets
        /// into a <c>Tuple&lt;T1, T2, T3&gt;</c> and offers the tuples through an outgoing queue.
        /// </summary>
        /// <remarks>
        /// When <c>dataflowBlockOptions.Greedy</c> is set, values are taken from the
        /// targets' buffers as soon as all three buffers are non-empty. Otherwise tuples
        /// are built on a separate task from postponed messages, using reservations so
        /// that a tuple is only consumed when all three parts could be reserved.
        /// </remarks>
        public sealed class JoinBlock<T1, T2, T3> : IReceivableSourceBlock<Tuple<T1, T2, T3>>
        {
                readonly CompletionHelper compHelper;
                readonly GroupingDataflowBlockOptions dataflowBlockOptions;
                readonly OutgoingQueue<Tuple<T1, T2, T3>> outgoing;

                readonly JoinTarget<T1> target1;
                readonly JoinTarget<T2> target2;
                readonly JoinTarget<T3> target3;

                // SpinLock is a mutable struct, so this field must NOT be readonly:
                // calls on a readonly struct field would operate on a copy.
                SpinLock targetLock = new SpinLock (false);
                // Set while a non-greedy processing task is scheduled or running.
                readonly AtomicBoolean nonGreedyProcessing = new AtomicBoolean ();

                // Per-target counters, incremented by TryAdd1/2/3
                // whenever MaxNumberOfGroups is limited.
                long target1Count;
                long target2Count;
                long target3Count;
                // Number of tuples produced so far; only maintained
                // when MaxNumberOfGroups is limited (see TriggerMessage).
                long numberOfGroups;

                /// <summary>
                /// Creates the block using <c>GroupingDataflowBlockOptions.Default</c>.
                /// </summary>
                public JoinBlock () : this (GroupingDataflowBlockOptions.Default)
                {
                }

                /// <summary>
                /// Creates the block using the given options.
                /// </summary>
                /// <param name="dataflowBlockOptions">Options used by the block, its targets and its outgoing queue.</param>
                /// <exception cref="ArgumentNullException">
                /// <paramref name="dataflowBlockOptions"/> is <c>null</c>.
                /// </exception>
                public JoinBlock (GroupingDataflowBlockOptions dataflowBlockOptions)
                {
                        if (dataflowBlockOptions == null)
                                throw new ArgumentNullException ("dataflowBlockOptions");

                        this.dataflowBlockOptions = dataflowBlockOptions;
                        this.compHelper = CompletionHelper.GetNew (dataflowBlockOptions);

                        // The lambdas below read 'outgoing' and the targets lazily, so it is fine
                        // that 'outgoing' is only assigned after the targets have been created.
                        target1 = new JoinTarget<T1> (this, SignalArrivalTarget, compHelper,
                                () => outgoing.IsCompleted, dataflowBlockOptions,
                                dataflowBlockOptions.Greedy, TryAdd1);
                        target2 = new JoinTarget<T2> (this, SignalArrivalTarget, compHelper,
                                () => outgoing.IsCompleted, dataflowBlockOptions,
                                dataflowBlockOptions.Greedy, TryAdd2);
                        target3 = new JoinTarget<T3> (this, SignalArrivalTarget, compHelper,
                                () => outgoing.IsCompleted, dataflowBlockOptions,
                                dataflowBlockOptions.Greedy, TryAdd3);
                        outgoing = new OutgoingQueue<Tuple<T1, T2, T3>> (
                                this, compHelper,
                                // True as soon as any one of the target buffers is completed.
                                () => target1.Buffer.IsCompleted || target2.Buffer.IsCompleted
                                      || target3.Buffer.IsCompleted,
                                // Each tuple accounts for one item of every target.
                                _ =>
                                {
                                        target1.DecreaseCount ();
                                        target2.DecreaseCount ();
                                        target3.DecreaseCount ();
                                }, dataflowBlockOptions);
                }

                /// <summary>
                /// Links this block to <paramref name="target"/> by adding it to the outgoing queue.
                /// </summary>
                /// <returns>The object returned by the outgoing queue for this link.</returns>
                public IDisposable LinkTo (ITargetBlock<Tuple<T1, T2, T3>> target, DataflowLinkOptions linkOptions)
                {
                        return outgoing.AddTarget (target, linkOptions);
                }

                /// <summary>
                /// Tries to receive a single tuple from the outgoing queue.
                /// </summary>
                public bool TryReceive (Predicate<Tuple<T1, T2, T3>> filter, out Tuple<T1, T2, T3> item)
                {
                        return outgoing.TryReceive (filter, out item);
                }

                /// <summary>
                /// Tries to receive all tuples from the outgoing queue.
                /// </summary>
                public bool TryReceiveAll (out IList<Tuple<T1, T2, T3>> items)
                {
                        return outgoing.TryReceiveAll (out items);
                }

                // The three ISourceBlock members below simply delegate to the outgoing queue.

                Tuple<T1, T2, T3> ISourceBlock<Tuple<T1, T2, T3>>.ConsumeMessage (
                        DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2, T3>> target,
                        out bool messageConsumed)
                {
                        return outgoing.ConsumeMessage (messageHeader, target, out messageConsumed);
                }

                void ISourceBlock<Tuple<T1, T2, T3>>.ReleaseReservation (
                        DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2, T3>> target)
                {
                        outgoing.ReleaseReservation (messageHeader, target);
                }

                bool ISourceBlock<Tuple<T1, T2, T3>>.ReserveMessage (
                        DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2, T3>> target)
                {
                        return outgoing.ReserveMessage (messageHeader, target);
                }

                /// <summary>
                /// Completes all three targets and then the outgoing queue.
                /// </summary>
                public void Complete ()
                {
                        target1.Complete ();
                        target2.Complete ();
                        target3.Complete ();
                        outgoing.Complete ();
                }

                /// <summary>
                /// Requests that the block be faulted with <paramref name="exception"/>.
                /// </summary>
                void IDataflowBlock.Fault (Exception exception)
                {
                        compHelper.RequestFault (exception);
                }

                /// <summary>
                /// The completion task exposed by the block's completion helper.
                /// </summary>
                public Task Completion {
                        get { return compHelper.Completion; }
                }

                /// <summary>
                /// Shared implementation of <see cref="TryAdd1"/>, <see cref="TryAdd2"/>
                /// and <see cref="TryAdd3"/>.
                /// </summary>
                /// <param name="count">The per-target counter to increment.</param>
                /// <returns>
                /// <c>true</c> if <c>MaxNumberOfGroups</c> is -1 (no limit), or if the
                /// incremented counter does not exceed <c>MaxNumberOfGroups</c>.
                /// </returns>
                /// <remarks>
                /// While a limit is configured, the counter is incremented on every call,
                /// including calls that return <c>false</c>.
                /// </remarks>
                bool TryIncrementCount (ref long count)
                {
                        return dataflowBlockOptions.MaxNumberOfGroups == -1
                               || Interlocked.Increment (ref count)
                               <= dataflowBlockOptions.MaxNumberOfGroups;
                }

                /// <summary>
                /// Returns whether a new item can be accepted by the first target,
                /// counting the attempt when the number of groups is limited.
                /// </summary>
                bool TryAdd1 ()
                {
                        return TryIncrementCount (ref target1Count);
                }

                /// <summary>
                /// Returns whether a new item can be accepted by the second target,
                /// counting the attempt when the number of groups is limited.
                /// </summary>
                bool TryAdd2 ()
                {
                        return TryIncrementCount (ref target2Count);
                }

                /// <summary>
                /// Returns whether a new item can be accepted by the third target,
                /// counting the attempt when the number of groups is limited.
                /// </summary>
                bool TryAdd3 ()
                {
                        return TryIncrementCount (ref target3Count);
                }

                /// <summary>
                /// Decides whether to create a new tuple or not.
                /// Passed to each target as its arrival callback.
                /// </summary>
                void SignalArrivalTarget ()
                {
                        if (!dataflowBlockOptions.Greedy) {
                                if (ShouldProcessNonGreedy ())
                                        EnsureNonGreedyProcessing ();
                                return;
                        }

                        bool taken = false;
                        T1 value1;
                        T2 value2;
                        T3 value3;

                        try {
                                targetLock.Enter (ref taken);

                                // The emptiness check and the three Take calls happen under
                                // the same lock, so a tuple is only started when every
                                // buffer has an item for it.
                                if (target1.Buffer.Count == 0 || target2.Buffer.Count == 0
                                    || target3.Buffer.Count == 0)
                                        return;

                                value1 = target1.Buffer.Take ();
                                value2 = target2.Buffer.Take ();
                                value3 = target3.Buffer.Take ();
                        } finally {
                                if (taken)
                                        targetLock.Exit ();
                        }

                        // Publishing the tuple is done outside the lock.
                        TriggerMessage (value1, value2, value3);
                }

                /// <summary>
                /// Returns whether non-greedy creation of a tuple should be started:
                /// every target has at least one postponed message and the outgoing
                /// queue is either unbounded (BoundedCapacity == -1) or below its capacity.
                /// </summary>
                bool ShouldProcessNonGreedy ()
                {
                        return target1.PostponedMessagesCount >= 1
                               && target2.PostponedMessagesCount >= 1
                               && target3.PostponedMessagesCount >= 1
                               && (dataflowBlockOptions.BoundedCapacity == -1
                                   || outgoing.Count < dataflowBlockOptions.BoundedCapacity);
                }

                /// <summary>
                /// Starts non-greedy creation of tuples, if one doesn't already run.
                /// </summary>
                void EnsureNonGreedyProcessing ()
                {
                        // TrySet only succeeds for the caller that flips the flag,
                        // so at most one processing task is started at a time.
                        if (nonGreedyProcessing.TrySet ())
                                Task.Factory.StartNew (NonGreedyProcess,
                                        dataflowBlockOptions.CancellationToken,
                                        TaskCreationOptions.PreferFairness,
                                        dataflowBlockOptions.TaskScheduler);
                }

                /// <summary>
                /// Creates tuples in non-greedy mode,
                /// making sure the whole tuple is available by using reservations.
                /// </summary>
                void NonGreedyProcess ()
                {
                        try {
                                while (ShouldProcessNonGreedy ()) {
                                        var reservation1 = target1.ReserveMessage ();
                                        if (reservation1 == null)
                                                break;

                                        var reservation2 = target2.ReserveMessage ();
                                        if (reservation2 == null) {
                                                // Could not get the second part: give the first one back.
                                                target1.RelaseReservation (reservation1);
                                                break;
                                        }

                                        var reservation3 = target3.ReserveMessage ();
                                        if (reservation3 == null) {
                                                // Could not get the third part: give back the first two.
                                                target1.RelaseReservation (reservation1);
                                                target2.RelaseReservation (reservation2);
                                                break;
                                        }

                                        var value1 = target1.ConsumeReserved (reservation1);
                                        var value2 = target2.ConsumeReserved (reservation2);
                                        var value3 = target3.ConsumeReserved (reservation3);

                                        TriggerMessage (value1, value2, value3);
                                }
                        } finally {
                                // Always clear the flag, even when a reserve/consume call throws.
                                // Otherwise EnsureNonGreedyProcessing could never start another
                                // task and non-greedy joining would stop permanently.
                                nonGreedyProcessing.Value = false;
                        }

                        // Messages may have been postponed between the last loop check
                        // and clearing the flag; re-check so they are not left waiting.
                        if (ShouldProcessNonGreedy ())
                                EnsureNonGreedyProcessing ();
                }

                /// <summary>
                /// Creates a tuple from the given values and adds the result to the output queue.
                /// When the number of groups is limited and the limit has been reached,
                /// the block is completed.
                /// </summary>
                void TriggerMessage (T1 val1, T2 val2, T3 val3)
                {
                        outgoing.AddData (Tuple.Create (val1, val2, val3));

                        if (dataflowBlockOptions.MaxNumberOfGroups != -1
                            && Interlocked.Increment (ref numberOfGroups)
                            >= dataflowBlockOptions.MaxNumberOfGroups)
                                Complete ();
                }

                /// <summary>
                /// Target receiving the first component of each tuple.
                /// </summary>
                public ITargetBlock<T1> Target1 {
                        get { return target1; }
                }

                /// <summary>
                /// Target receiving the second component of each tuple.
                /// </summary>
                public ITargetBlock<T2> Target2 {
                        get { return target2; }
                }

                /// <summary>
                /// Target receiving the third component of each tuple.
                /// </summary>
                public ITargetBlock<T3> Target3 {
                        get { return target3; }
                }

                /// <summary>
                /// Number of tuples currently held by the outgoing queue.
                /// </summary>
                public int OutputCount {
                        get { return outgoing.Count; }
                }

                /// <summary>
                /// Returns the name produced by <c>NameHelper.GetName</c> for this block and its options.
                /// </summary>
                public override string ToString ()
                {
                        return NameHelper.GetName (this, dataflowBlockOptions);
                }
        }
}