425c6d137def3caad3cc0ab5bcc636e6b1c91cbd
[mono.git] / mcs / class / System.Threading.Tasks.Dataflow / System.Threading.Tasks.Dataflow / BatchedJoinBlock`3.cs
1 // BatchedJoinBlock.cs
2 //
3 // Copyright (c) 2012 Petr Onderka
4 //
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to deal
7 // in the Software without restriction, including without limitation the rights
8 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
11 //
12 // The above copyright notice and this permission notice shall be included in
13 // all copies or substantial portions of the Software.
14 //
15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21 // THE SOFTWARE.
22 //
23 //
24
25 using System.Collections.Generic;
26
27 namespace System.Threading.Tasks.Dataflow {
28         public sealed class BatchedJoinBlock<T1, T2, T3> :
29                 IReceivableSourceBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> {
30                 readonly GroupingDataflowBlockOptions options;
31
32                 CompletionHelper completionHelper;
33                 readonly MessageOutgoingQueue<Tuple<IList<T1>, IList<T2>, IList<T3>>> outgoing;
34
35                 readonly MessageVault<Tuple<IList<T1>, IList<T2>, IList<T3>>> vault =
36                         new MessageVault<Tuple<IList<T1>, IList<T2>, IList<T3>>> ();
37
38                 readonly TargetBuffer<Tuple<IList<T1>, IList<T2>, IList<T3>>> targets =
39                         new TargetBuffer<Tuple<IList<T1>, IList<T2>, IList<T3>>> ();
40
41                 DataflowMessageHeader headers;
42                 SpinLock batchLock;
43
44                 readonly JoinTarget<T1> target1;
45                 readonly JoinTarget<T2> target2;
46                 readonly JoinTarget<T3> target3;
47
48                 int batchCount;
49
50                 public BatchedJoinBlock (int batchSize)
51                         : this (batchSize, GroupingDataflowBlockOptions.Default)
52                 {
53                 }
54
55                 public BatchedJoinBlock (int batchSize,
56                                          GroupingDataflowBlockOptions dataflowBlockOptions)
57                 {
58                         if (batchSize <= 0)
59                                 throw new ArgumentOutOfRangeException (
60                                         "batchSize", batchSize, "The batchSize must be positive.");
61                         if (dataflowBlockOptions == null)
62                                 throw new ArgumentNullException ("dataflowBlockOptions");
63
64                         BatchSize = batchSize;
65                         options = dataflowBlockOptions;
66                         completionHelper = CompletionHelper.GetNew (options);
67
68                         target1 = new JoinTarget<T1> (
69                                 this, SignalTarget, completionHelper, () => outgoing.IsCompleted);
70                         target2 = new JoinTarget<T2> (
71                                 this, SignalTarget, completionHelper, () => outgoing.IsCompleted);
72                         target3 = new JoinTarget<T3>(
73                                 this, SignalTarget, completionHelper, () => outgoing.IsCompleted);
74
75                         outgoing = new MessageOutgoingQueue<Tuple<IList<T1>, IList<T2>, IList<T3>>> (
76                                 completionHelper,
77                                 () => target1.Buffer.IsCompleted || target2.Buffer.IsCompleted || target3.Buffer.IsCompleted);
78                 }
79
80                 public int BatchSize { get; private set; }
81
82                 public ITargetBlock<T1> Target1 {
83                         get { return target1; }
84                 }
85
86                 public ITargetBlock<T2> Target2 {
87                         get { return target2; }
88                 }
89
90                 public ITargetBlock<T3> Target3 {
91                         get { return target3; }
92                 }
93
94                 void SignalTarget ()
95                 {
96                         int current = Interlocked.Increment (ref batchCount);
97
98                         if (current % BatchSize != 0)
99                                 return;
100
101                         Interlocked.Add (ref batchCount, -current);
102
103                         MakeBatch (BatchSize);
104                 }
105
106                 void MakeBatch (int batchSize)
107                 {
108                         var list1 = new List<T1> ();
109                         var list2 = new List<T2> ();
110                         var list3 = new List<T3> ();
111
112                         // lock is necessary here to make sure items are in the correct order
113                         bool taken = false;
114                         try {
115                                 batchLock.Enter (ref taken);
116
117                                 int i = 0;
118
119                                 T1 item1;
120                                 while (i < batchSize && target1.Buffer.TryTake (out item1)) {
121                                         list1.Add (item1);
122                                         i++;
123                                 }
124
125                                 T2 item2;
126                                 while (i < batchSize && target2.Buffer.TryTake (out item2)) {
127                                         list2.Add (item2);
128                                         i++;
129                                 }
130
131                                 T3 item3;
132                                 while (i < batchSize && target3.Buffer.TryTake (out item3)) {
133                                         list3.Add (item3);
134                                         i++;
135                                 }
136
137                                 if (i < batchSize)
138                                         throw new InvalidOperationException ("Unexpected count of items.");
139                         } finally {
140                                 if (taken)
141                                         batchLock.Exit ();
142                         }
143
144                         var batch = Tuple.Create<IList<T1>, IList<T2>, IList<T3>> (list1, list2,
145                                 list3);
146
147                         var target = targets.Current;
148                         if (target == null)
149                                 outgoing.AddData (batch);
150                         else
151                                 target.OfferMessage (headers.Increment (), batch, this, false);
152
153                         if (!outgoing.IsEmpty && targets.Current != null)
154                                 outgoing.ProcessForTarget (targets.Current, this, false, ref headers);
155                 }
156
157                 public Task Completion
158                 {
159                         get { return completionHelper.Completion; }
160                 }
161
162                 public void Complete ()
163                 {
164                         outgoing.Complete ();
165                 }
166
167                 void IDataflowBlock.Fault (Exception exception)
168                 {
169                         completionHelper.RequestFault (exception);
170                 }
171
172                 Tuple<IList<T1>, IList<T2>, IList<T3>>
173                         ISourceBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>>.ConsumeMessage (
174                         DataflowMessageHeader messageHeader,
175                         ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> target,
176                         out bool messageConsumed)
177                 {
178                         return vault.ConsumeMessage (messageHeader, target, out messageConsumed);
179                 }
180
181                 public IDisposable LinkTo (
182                         ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> target,
183                         bool unlinkAfterOne)
184                 {
185                         var result = targets.AddTarget (target, unlinkAfterOne);
186                         outgoing.ProcessForTarget (target, this, false, ref headers);
187                         return result;
188                 }
189
190                 void ISourceBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>>.ReleaseReservation (
191                         DataflowMessageHeader messageHeader,
192                         ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> target)
193                 {
194                         vault.ReleaseReservation (messageHeader, target);
195                 }
196
197                 bool ISourceBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>>.ReserveMessage (
198                         DataflowMessageHeader messageHeader,
199                         ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> target)
200                 {
201                         return vault.ReserveMessage (messageHeader, target);
202                 }
203
204                 public bool TryReceive (
205                         Predicate<Tuple<IList<T1>, IList<T2>, IList<T3>>> filter,
206                         out Tuple<IList<T1>, IList<T2>, IList<T3>> item)
207                 {
208                         return outgoing.TryReceive (filter, out item);
209                 }
210
211                 public bool TryReceiveAll (
212                         out IList<Tuple<IList<T1>, IList<T2>, IList<T3>>> items)
213                 {
214                         return outgoing.TryReceiveAll (out items);
215                 }
216
217                 public override string ToString ()
218                 {
219                         return NameHelper.GetName (this, options);
220                 }
221         }
222 }