Added BatchedJoinBlock`3
[mono.git] / mcs / class / System.Threading.Tasks.Dataflow / System.Threading.Tasks.Dataflow / BatchedJoinBlock`3.cs
1 // BatchedJoinBlock.cs
2 //
3 // Copyright (c) 2012 Petr Onderka
4 //
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to deal
7 // in the Software without restriction, including without limitation the rights
8 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
11 //
12 // The above copyright notice and this permission notice shall be included in
13 // all copies or substantial portions of the Software.
14 //
15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21 // THE SOFTWARE.
22 //
23 //
24
25 using System.Collections.Generic;
26
27 namespace System.Threading.Tasks.Dataflow {
28         public sealed class BatchedJoinBlock<T1, T2, T3> :
29                 IReceivableSourceBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> {
30                 GroupingDataflowBlockOptions options;
31
32                 CompletionHelper completionHelper = CompletionHelper.GetNew ();
33                 readonly MessageOutgoingQueue<Tuple<IList<T1>, IList<T2>, IList<T3>>> outgoing;
34
35                 readonly MessageVault<Tuple<IList<T1>, IList<T2>, IList<T3>>> vault =
36                         new MessageVault<Tuple<IList<T1>, IList<T2>, IList<T3>>> ();
37
38                 readonly TargetBuffer<Tuple<IList<T1>, IList<T2>, IList<T3>>> targets =
39                         new TargetBuffer<Tuple<IList<T1>, IList<T2>, IList<T3>>> ();
40
41                 DataflowMessageHeader headers;
42                 SpinLock batchLock;
43
44                 readonly JoinTarget<T1> target1;
45                 readonly JoinTarget<T2> target2;
46                 readonly JoinTarget<T3> target3;
47
48                 int batchCount;
49
50                 public BatchedJoinBlock (int batchSize)
51                         : this (batchSize, GroupingDataflowBlockOptions.Default)
52                 {
53                 }
54
55                 public BatchedJoinBlock (int batchSize,
56                                          GroupingDataflowBlockOptions dataflowBlockOptions)
57                 {
58                         if (batchSize <= 0)
59                                 throw new ArgumentOutOfRangeException (
60                                         "batchSize", batchSize, "The batchSize must be positive.");
61                         if (dataflowBlockOptions == null)
62                                 throw new ArgumentNullException ("dataflowBlockOptions");
63
64                         BatchSize = batchSize;
65                         options = dataflowBlockOptions;
66
67                         target1 = new JoinTarget<T1> (
68                                 this, SignalTarget, completionHelper, () => outgoing.IsCompleted);
69                         target2 = new JoinTarget<T2> (
70                                 this, SignalTarget, completionHelper, () => outgoing.IsCompleted);
71                         target3 = new JoinTarget<T3>(
72                                 this, SignalTarget, completionHelper, () => outgoing.IsCompleted);
73
74                         outgoing = new MessageOutgoingQueue<Tuple<IList<T1>, IList<T2>, IList<T3>>> (
75                                 completionHelper,
76                                 () => target1.Buffer.IsCompleted || target2.Buffer.IsCompleted || target3.Buffer.IsCompleted);
77                 }
78
79                 public int BatchSize { get; private set; }
80
81                 public ITargetBlock<T1> Target1 {
82                         get { return target1; }
83                 }
84
85                 public ITargetBlock<T2> Target2 {
86                         get { return target2; }
87                 }
88
89                 public ITargetBlock<T3> Target3 {
90                         get { return target3; }
91                 }
92
93                 void SignalTarget ()
94                 {
95                         int current = Interlocked.Increment (ref batchCount);
96
97                         if (current % BatchSize != 0)
98                                 return;
99
100                         Interlocked.Add (ref batchCount, -current);
101
102                         MakeBatch (BatchSize);
103                 }
104
105                 void MakeBatch (int batchSize)
106                 {
107                         var list1 = new List<T1> ();
108                         var list2 = new List<T2> ();
109                         var list3 = new List<T3> ();
110
111                         // lock is necessary here to make sure items are in the correct order
112                         bool taken = false;
113                         try {
114                                 batchLock.Enter (ref taken);
115
116                                 int i = 0;
117
118                                 T1 item1;
119                                 while (i < batchSize && target1.Buffer.TryTake (out item1)) {
120                                         list1.Add (item1);
121                                         i++;
122                                 }
123
124                                 T2 item2;
125                                 while (i < batchSize && target2.Buffer.TryTake (out item2)) {
126                                         list2.Add (item2);
127                                         i++;
128                                 }
129
130                                 T3 item3;
131                                 while (i < batchSize && target3.Buffer.TryTake (out item3)) {
132                                         list3.Add (item3);
133                                         i++;
134                                 }
135
136                                 if (i < batchSize)
137                                         throw new InvalidOperationException ("Unexpected count of items.");
138                         } finally {
139                                 if (taken)
140                                         batchLock.Exit ();
141                         }
142
143                         var batch = Tuple.Create<IList<T1>, IList<T2>, IList<T3>> (list1, list2,
144                                 list3);
145
146                         var target = targets.Current;
147                         if (target == null)
148                                 outgoing.AddData (batch);
149                         else
150                                 target.OfferMessage (headers.Increment (), batch, this, false);
151
152                         if (!outgoing.IsEmpty && targets.Current != null)
153                                 outgoing.ProcessForTarget (targets.Current, this, false, ref headers);
154                 }
155
156                 public Task Completion
157                 {
158                         get { return completionHelper.Completion; }
159                 }
160
161                 public void Complete ()
162                 {
163                         outgoing.Complete ();
164                 }
165
166                 void IDataflowBlock.Fault (Exception exception)
167                 {
168                         completionHelper.Fault (exception);
169                 }
170
171                 Tuple<IList<T1>, IList<T2>, IList<T3>>
172                         ISourceBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>>.ConsumeMessage (
173                         DataflowMessageHeader messageHeader,
174                         ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> target,
175                         out bool messageConsumed)
176                 {
177                         return vault.ConsumeMessage (messageHeader, target, out messageConsumed);
178                 }
179
180                 public IDisposable LinkTo (
181                         ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> target,
182                         bool unlinkAfterOne)
183                 {
184                         var result = targets.AddTarget (target, unlinkAfterOne);
185                         outgoing.ProcessForTarget (target, this, false, ref headers);
186                         return result;
187                 }
188
189                 void ISourceBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>>.ReleaseReservation (
190                         DataflowMessageHeader messageHeader,
191                         ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> target)
192                 {
193                         vault.ReleaseReservation (messageHeader, target);
194                 }
195
196                 bool ISourceBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>>.ReserveMessage (
197                         DataflowMessageHeader messageHeader,
198                         ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> target)
199                 {
200                         return vault.ReserveMessage (messageHeader, target);
201                 }
202
203                 public bool TryReceive (
204                         Predicate<Tuple<IList<T1>, IList<T2>, IList<T3>>> filter,
205                         out Tuple<IList<T1>, IList<T2>, IList<T3>> item)
206                 {
207                         return outgoing.TryReceive (filter, out item);
208                 }
209
210                 public bool TryReceiveAll (
211                         out IList<Tuple<IList<T1>, IList<T2>, IList<T3>>> items)
212                 {
213                         return outgoing.TryReceiveAll (out items);
214                 }
215         }
216 }