[System] Fixes UdpClient.Receive with IPv6 endpoint
[mono.git] / mcs / class / System.Threading.Tasks.Dataflow / System.Threading.Tasks.Dataflow / JoinBlock.cs
1 // JoinBlock.cs
2 //
3 // Copyright (c) 2011 Jérémie "garuma" Laval
4 // Copyright (c) 2012 Petr Onderka
5 //
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
7 // of this software and associated documentation files (the "Software"), to deal
8 // in the Software without restriction, including without limitation the rights
9 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 // copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
12 //
13 // The above copyright notice and this permission notice shall be included in
14 // all copies or substantial portions of the Software.
15 //
16 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22 // THE SOFTWARE.
23
24 using System.Collections.Generic;
25
26 namespace System.Threading.Tasks.Dataflow
27 {
28         public sealed class JoinBlock<T1, T2> : IReceivableSourceBlock<Tuple<T1, T2>>
29         {
30                 readonly CompletionHelper compHelper;
31                 readonly GroupingDataflowBlockOptions dataflowBlockOptions;
32                 readonly OutgoingQueue<Tuple<T1, T2>> outgoing;
33
34                 readonly JoinTarget<T1> target1;
35                 readonly JoinTarget<T2> target2;
36
37                 SpinLock targetLock = new SpinLock(false);
38                 readonly AtomicBoolean nonGreedyProcessing = new AtomicBoolean ();
39
40                 long target1Count;
41                 long target2Count;
42                 long numberOfGroups;
43
44                 public JoinBlock () : this (GroupingDataflowBlockOptions.Default)
45                 {
46                 }
47
48                 public JoinBlock (GroupingDataflowBlockOptions dataflowBlockOptions)
49                 {
50                         if (dataflowBlockOptions == null)
51                                 throw new ArgumentNullException ("dataflowBlockOptions");
52
53                         this.dataflowBlockOptions = dataflowBlockOptions;
54                         compHelper = CompletionHelper.GetNew (dataflowBlockOptions);
55                         target1 = new JoinTarget<T1> (this, SignalArrivalTarget, compHelper,
56                                 () => outgoing.IsCompleted, dataflowBlockOptions,
57                                 dataflowBlockOptions.Greedy, TryAdd1);
58                         target2 = new JoinTarget<T2> (this, SignalArrivalTarget, compHelper,
59                                 () => outgoing.IsCompleted, dataflowBlockOptions,
60                                 dataflowBlockOptions.Greedy, TryAdd2);
61                         outgoing = new OutgoingQueue<Tuple<T1, T2>> (this, compHelper,
62                                 () => target1.Buffer.IsCompleted || target2.Buffer.IsCompleted,
63                                 _ =>
64                                 {
65                                         target1.DecreaseCount ();
66                                         target2.DecreaseCount ();
67                                 }, dataflowBlockOptions);
68                 }
69
70                 public IDisposable LinkTo (ITargetBlock<Tuple<T1, T2>> target, DataflowLinkOptions linkOptions)
71                 {
72                         return outgoing.AddTarget (target, linkOptions);
73                 }
74
75                 public bool TryReceive (Predicate<Tuple<T1, T2>> filter, out Tuple<T1, T2> item)
76                 {
77                         return outgoing.TryReceive (filter, out item);
78                 }
79
80                 public bool TryReceiveAll (out IList<Tuple<T1, T2>> items)
81                 {
82                         return outgoing.TryReceiveAll (out items);
83                 }
84
85                 Tuple<T1, T2> ISourceBlock<Tuple<T1, T2>>.ConsumeMessage (
86                         DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target,
87                         out bool messageConsumed)
88                 {
89                         return outgoing.ConsumeMessage (messageHeader, target, out messageConsumed);
90                 }
91
92                 void ISourceBlock<Tuple<T1, T2>>.ReleaseReservation (
93                         DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target)
94                 {
95                         outgoing.ReleaseReservation (messageHeader, target);
96                 }
97
98                 bool ISourceBlock<Tuple<T1, T2>>.ReserveMessage (
99                         DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target)
100                 {
101                         return outgoing.ReserveMessage (messageHeader, target);
102                 }
103
104                 public void Complete ()
105                 {
106                         target1.Complete ();
107                         target2.Complete ();
108                         outgoing.Complete ();
109                 }
110
111                 void IDataflowBlock.Fault (Exception exception)
112                 {
113                         compHelper.RequestFault (exception);
114                 }
115
116                 public Task Completion {
117                         get { return compHelper.Completion; }
118                 }
119
120                 /// <summary>
121                 /// Returns whether a new item can be accepted by the first target,
122                 /// and increments a counter if it can.
123                 /// </summary>
124                 bool TryAdd1 ()
125                 {
126                         return dataflowBlockOptions.MaxNumberOfGroups == -1
127                                || Interlocked.Increment (ref target1Count)
128                                <= dataflowBlockOptions.MaxNumberOfGroups;
129                 }
130
131                 /// <summary>
132                 /// Returns whether a new item can be accepted by the second target,
133                 /// and increments a counter if it can.
134                 /// </summary>
135                 bool TryAdd2 ()
136                 {
137                         return dataflowBlockOptions.MaxNumberOfGroups == -1
138                                || Interlocked.Increment (ref target2Count)
139                                <= dataflowBlockOptions.MaxNumberOfGroups;
140                 }
141
142                 /// <summary>
143                 /// Decides whether to create a new tuple or not.
144                 /// </summary>
145                 void SignalArrivalTarget ()
146                 {
147                         if (dataflowBlockOptions.Greedy) {
148                                 bool taken = false;
149                                 T1 value1;
150                                 T2 value2;
151
152                                 try {
153                                         targetLock.Enter (ref taken);
154
155                                         if (target1.Buffer.Count == 0 || target2.Buffer.Count == 0)
156                                                 return;
157
158                                         value1 = target1.Buffer.Take ();
159                                         value2 = target2.Buffer.Take ();
160                                 } finally {
161                                         if (taken)
162                                                 targetLock.Exit ();
163                                 }
164
165                                 TriggerMessage (value1, value2);
166                         } else {
167                                 if (ShouldProcessNonGreedy ())
168                                         EnsureNonGreedyProcessing ();
169                         }
170                 }
171
172                 /// <summary>
173                 /// Returns whether non-greedy creation of a tuple should be started.
174                 /// </summary>
175                 bool ShouldProcessNonGreedy ()
176                 {
177                         return target1.PostponedMessagesCount >= 1
178                                && target2.PostponedMessagesCount >= 1
179                                && (dataflowBlockOptions.BoundedCapacity == -1
180                                    || outgoing.Count < dataflowBlockOptions.BoundedCapacity);
181                 }
182
183                 /// <summary>
184                 /// Starts non-greedy creation of tuples, if one doesn't already run.
185                 /// </summary>
186                 void EnsureNonGreedyProcessing ()
187                 {
188                         if (nonGreedyProcessing.TrySet ())
189                                 Task.Factory.StartNew (NonGreedyProcess,
190                                         dataflowBlockOptions.CancellationToken,
191                                         TaskCreationOptions.PreferFairness,
192                                         dataflowBlockOptions.TaskScheduler);
193                 }
194
195                 /// <summary>
196                 /// Creates tuples in non-greedy mode,
197                 /// making sure the whole tuple is available by using reservations.
198                 /// </summary>
199                 void NonGreedyProcess()
200                 {
201                         while (ShouldProcessNonGreedy ()) {
202                                 var reservation1 = target1.ReserveMessage ();
203
204                                 if (reservation1 == null)
205                                         break;
206
207                                 var reservation2 = target2.ReserveMessage ();
208                                 if (reservation2 == null) {
209                                         target1.RelaseReservation (reservation1);
210                                         break;
211                                 }
212
213                                 var value1 = target1.ConsumeReserved (reservation1);
214                                 var value2 = target2.ConsumeReserved (reservation2);
215
216                                 TriggerMessage (value1, value2);
217                         }
218
219                         nonGreedyProcessing.Value = false;
220
221                         if (ShouldProcessNonGreedy ())
222                                 EnsureNonGreedyProcessing ();
223                 }
224
225
226                 /// <summary>
227                 /// Creates a tuple from the given values and adds the result to the output queue.
228                 /// </summary>
229                 void TriggerMessage (T1 val1, T2 val2)
230                 {
231                         outgoing.AddData (Tuple.Create (val1, val2));
232
233                         if (dataflowBlockOptions.MaxNumberOfGroups != -1
234                             && Interlocked.Increment (ref numberOfGroups)
235                             >= dataflowBlockOptions.MaxNumberOfGroups)
236                                 Complete ();
237                 }
238
239                 public ITargetBlock<T1> Target1 {
240                         get { return target1; }
241                 }
242
243                 public ITargetBlock<T2> Target2 {
244                         get { return target2; }
245                 }
246
247                 public int OutputCount {
248                         get { return outgoing.Count; }
249                 }
250
251                 public override string ToString ()
252                 {
253                         return NameHelper.GetName (this, dataflowBlockOptions);
254                 }
255         }
256 }