Fixed completion of JoinTargets
[mono.git] / mcs / class / System.Threading.Tasks.Dataflow / System.Threading.Tasks.Dataflow / JoinBlock.cs
1 // JoinBlock.cs
2 //
3 // Copyright (c) 2011 Jérémie "garuma" Laval
4 // Copyright (c) 2012 Petr Onderka
5 //
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
7 // of this software and associated documentation files (the "Software"), to deal
8 // in the Software without restriction, including without limitation the rights
9 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 // copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
12 //
13 // The above copyright notice and this permission notice shall be included in
14 // all copies or substantial portions of the Software.
15 //
16 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22 // THE SOFTWARE.
23
24 using System.Collections.Generic;
25
namespace System.Threading.Tasks.Dataflow
{
	/// <summary>
	/// Source block that pairs one item taken from <see cref="Target1"/> with one item
	/// taken from <see cref="Target2"/> and publishes the pair as a tuple.
	/// </summary>
	/// <typeparam name="T1">Type of the items accepted by <see cref="Target1"/>.</typeparam>
	/// <typeparam name="T2">Type of the items accepted by <see cref="Target2"/>.</typeparam>
	public sealed class JoinBlock<T1, T2> : IReceivableSourceBlock<Tuple<T1, T2>>
	{
		// Shared options instance used by the parameterless constructor.
		static readonly GroupingDataflowBlockOptions defaultOptions = new GroupingDataflowBlockOptions ();

		// One completion helper is shared by both targets and the outgoing queue.
		readonly CompletionHelper compHelper;
		readonly GroupingDataflowBlockOptions dataflowBlockOptions;
		// Queue holding the joined tuples; every ISourceBlock member forwards to it.
		readonly MessageOutgoingQueue<Tuple<T1, T2>> outgoing;

		readonly JoinTarget<T1> target1;
		readonly JoinTarget<T2> target2;

		// Deliberately NOT readonly: SpinLock is a mutable struct, and calling Enter/Exit
		// through a readonly field would operate on a copy. Owner tracking is disabled (false).
		SpinLock targetLock = new SpinLock (false);
		// Set while a non-greedy processing task is scheduled or running.
		readonly AtomicBoolean nonGreedyProcessing = new AtomicBoolean ();

		/// <summary>
		/// Creates a join block configured with default grouping options.
		/// </summary>
		public JoinBlock () : this (defaultOptions)
		{
		}

		/// <summary>
		/// Creates a join block configured with the given options.
		/// </summary>
		/// <param name="dataflowBlockOptions">Options applied to both targets and to the output queue.</param>
		/// <exception cref="ArgumentNullException"><paramref name="dataflowBlockOptions"/> is null.</exception>
		public JoinBlock (GroupingDataflowBlockOptions dataflowBlockOptions)
		{
			if (dataflowBlockOptions == null)
				throw new ArgumentNullException ("dataflowBlockOptions");

			this.dataflowBlockOptions = dataflowBlockOptions;
			compHelper = CompletionHelper.GetNew (dataflowBlockOptions);

			// The lambdas below read the 'outgoing' field only when they are invoked, so it is
			// fine that the field is assigned after the targets are created.
			// NOTE(review): this assumes JoinTarget does not invoke the delegate from its
			// constructor - confirm against JoinTarget.
			target1 = new JoinTarget<T1> (this, SignalArrivalTargetImpl, compHelper,
				() => outgoing.IsCompleted, dataflowBlockOptions,
				dataflowBlockOptions.Greedy);
			target2 = new JoinTarget<T2> (this, SignalArrivalTargetImpl, compHelper,
				() => outgoing.IsCompleted, dataflowBlockOptions,
				dataflowBlockOptions.Greedy);

			// The queue is given a predicate that is true once either target buffer is
			// completed, and a callback that decreases the count of both targets.
			// NOTE(review): presumably the callback runs when a tuple leaves the queue -
			// confirm against MessageOutgoingQueue.
			outgoing = new MessageOutgoingQueue<Tuple<T1, T2>> (this, compHelper,
				() => target1.Buffer.IsCompleted || target2.Buffer.IsCompleted,
				_ =>
				{
					target1.DecreaseCount ();
					target2.DecreaseCount ();
				}, dataflowBlockOptions);
		}

		/// <summary>
		/// Links this block to <paramref name="target"/> by registering it with the outgoing queue.
		/// </summary>
		/// <param name="target">Block that should receive the joined tuples.</param>
		/// <param name="linkOptions">Options describing the link.</param>
		/// <returns>The disposable returned by the outgoing queue for this link.</returns>
		public IDisposable LinkTo (ITargetBlock<Tuple<T1, T2>> target, DataflowLinkOptions linkOptions)
		{
			return outgoing.AddTarget (target, linkOptions);
		}

		/// <summary>
		/// Tries to receive a single tuple from the outgoing queue.
		/// </summary>
		/// <param name="filter">Predicate forwarded to the outgoing queue.</param>
		/// <param name="item">Receives the tuple when the call succeeds.</param>
		/// <returns>The result reported by the outgoing queue.</returns>
		public bool TryReceive (Predicate<Tuple<T1, T2>> filter, out Tuple<T1, T2> item)
		{
			return outgoing.TryReceive (filter, out item);
		}

		/// <summary>
		/// Tries to receive all tuples currently held by the outgoing queue.
		/// </summary>
		/// <param name="items">Receives the tuples when the call succeeds.</param>
		/// <returns>The result reported by the outgoing queue.</returns>
		public bool TryReceiveAll (out IList<Tuple<T1, T2>> items)
		{
			return outgoing.TryReceiveAll (out items);
		}

		/// <summary>
		/// Forwards a consume request from a linked target to the outgoing queue.
		/// </summary>
		/// <param name="messageHeader">Header of the message being consumed.</param>
		/// <param name="target">Target that consumes the message.</param>
		/// <param name="messageConsumed">Set by the outgoing queue.</param>
		/// <returns>The value returned by the outgoing queue.</returns>
		public Tuple<T1, T2> ConsumeMessage (DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target, out bool messageConsumed)
		{
			return outgoing.ConsumeMessage (messageHeader, target, out messageConsumed);
		}

		/// <summary>
		/// Forwards a reservation release from a linked target to the outgoing queue.
		/// </summary>
		/// <param name="messageHeader">Header of the reserved message.</param>
		/// <param name="target">Target that held the reservation.</param>
		public void ReleaseReservation (DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target)
		{
			outgoing.ReleaseReservation (messageHeader, target);
		}

		/// <summary>
		/// Forwards a reservation request from a linked target to the outgoing queue.
		/// </summary>
		/// <param name="messageHeader">Header of the message to reserve.</param>
		/// <param name="target">Target requesting the reservation.</param>
		/// <returns>The result reported by the outgoing queue.</returns>
		public bool ReserveMessage (DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target)
		{
			return outgoing.ReserveMessage (messageHeader, target);
		}

		/// <summary>
		/// Completes both targets and then the outgoing queue.
		/// </summary>
		public void Complete ()
		{
			target1.Complete ();
			target2.Complete ();
			outgoing.Complete ();
		}

		/// <summary>
		/// Requests that the block be faulted with the given exception.
		/// </summary>
		/// <param name="ex">Exception that caused the fault.</param>
		/// <exception cref="ArgumentNullException"><paramref name="ex"/> is null.</exception>
		public void Fault (Exception ex)
		{
			// Reject null up front, as the IDataflowBlock.Fault contract documents,
			// instead of handing an unusable fault reason to the completion helper.
			if (ex == null)
				throw new ArgumentNullException ("ex");

			compHelper.RequestFault (ex);
		}

		/// <summary>
		/// Task exposed by the shared completion helper.
		/// </summary>
		public Task Completion {
			get {
				return compHelper.Completion;
			}
		}

		/// <summary>
		/// Callback handed to both join targets. In greedy mode it joins one buffered pair,
		/// if both buffers hold an item; in non-greedy mode it starts postponed-message
		/// processing when the conditions for it are met.
		/// </summary>
		void SignalArrivalTargetImpl ()
		{
			if (dataflowBlockOptions.Greedy) {
				bool taken = false;
				T1 value1;
				T2 value2;

				try {
					targetLock.Enter (ref taken);

					// A pair can only be formed when both buffers hold at least one item.
					if (target1.Buffer.Count == 0 || target2.Buffer.Count == 0)
						return;

					value1 = target1.Buffer.Take ();
					value2 = target2.Buffer.Take ();
				} finally {
					if (taken)
						targetLock.Exit ();
				}

				// Published after the lock has been released; the lock only guards the
				// count check and the two Take calls.
				TriggerMessage (value1, value2);
			} else {
				if (ShouldProcessNonGreedy ())
					EnsureNonGreedyProcessing ();
			}
		}

		/// <summary>
		/// Tells whether non-greedy processing can make progress: both targets have at least
		/// one postponed message and the outgoing queue is below the bounded capacity.
		/// </summary>
		/// <returns>true when a pair of postponed messages may be joined.</returns>
		bool ShouldProcessNonGreedy ()
		{
			// -1 is the value the TPL Dataflow API documents as DataflowBlockOptions.Unbounded.
			return target1.PostponedMessagesCount >= 1
			       && target2.PostponedMessagesCount >= 1
			       && (dataflowBlockOptions.BoundedCapacity == -1
			           || outgoing.Count < dataflowBlockOptions.BoundedCapacity);
		}

		/// <summary>
		/// Starts a task running <see cref="NonGreedyProcess"/> unless the
		/// <c>nonGreedyProcessing</c> flag is already set.
		/// </summary>
		void EnsureNonGreedyProcessing ()
		{
			if (nonGreedyProcessing.TrySet ())
				Task.Factory.StartNew (NonGreedyProcess,
					dataflowBlockOptions.CancellationToken,
					TaskCreationOptions.PreferFairness,
					dataflowBlockOptions.TaskScheduler);
		}

		/// <summary>
		/// Body of the non-greedy processing task: reserves one postponed message on each
		/// target, consumes both and publishes the pair, repeating while that is possible.
		/// </summary>
		void NonGreedyProcess ()
		{
			try {
				while (ShouldProcessNonGreedy ()) {
					var reservation1 = target1.ReserveMessage ();

					if (reservation1 == null)
						break;

					var reservation2 = target2.ReserveMessage ();
					if (reservation2 == null) {
						// No partner available: give the first message back.
						target1.RelaseReservation (reservation1);
						break;
					}

					var value1 = target1.ConsumeReserved (reservation1);
					var value2 = target2.ConsumeReserved (reservation2);

					TriggerMessage (value1, value2);
				}
			} finally {
				// Clear the flag even when reserving, consuming or publishing throws.
				// Otherwise EnsureNonGreedyProcessing could never start another task and
				// the join would stall for good.
				nonGreedyProcessing.Value = false;
			}

			// Messages may have been postponed between the last loop check and the flag
			// reset; re-check so they are not left waiting.
			if (ShouldProcessNonGreedy ())
				EnsureNonGreedyProcessing ();
		}

		/// <summary>
		/// Adds the pair to the outgoing queue as a tuple.
		/// </summary>
		/// <param name="val1">Item taken from the first target.</param>
		/// <param name="val2">Item taken from the second target.</param>
		void TriggerMessage (T1 val1, T2 val2)
		{
			outgoing.AddData (Tuple.Create (val1, val2));
		}

		/// <summary>
		/// Target that accepts the first element of each pair.
		/// </summary>
		public ITargetBlock<T1> Target1 {
			get {
				return target1;
			}
		}

		/// <summary>
		/// Target that accepts the second element of each pair.
		/// </summary>
		public ITargetBlock<T2> Target2 {
			get {
				return target2;
			}
		}

		/// <summary>
		/// Returns the name produced by <c>NameHelper.GetName</c> for this block and its options.
		/// </summary>
		public override string ToString ()
		{
			return NameHelper.GetName (this, dataflowBlockOptions);
		}
	}
}