Corrected sending messages to targets in most blocks
[mono.git] / mcs / class / System.Threading.Tasks.Dataflow / System.Threading.Tasks.Dataflow / BatchedJoinBlock.cs
1 // BatchedJoinBlock.cs
2 //
3 // Copyright (c) 2012 Petr Onderka
4 //
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to deal
7 // in the Software without restriction, including without limitation the rights
8 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
11 //
12 // The above copyright notice and this permission notice shall be included in
13 // all copies or substantial portions of the Software.
14 //
15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21 // THE SOFTWARE.
22 //
23 //
24
25 using System.Collections.Generic;
26
27 namespace System.Threading.Tasks.Dataflow {
28         public sealed class BatchedJoinBlock<T1, T2> :
29                 IReceivableSourceBlock<Tuple<IList<T1>, IList<T2>>> {
30                 GroupingDataflowBlockOptions options;
31
32                 CompletionHelper completionHelper;
33                 readonly MessageOutgoingQueue<Tuple<IList<T1>, IList<T2>>> outgoing;
34                 DataflowMessageHeader headers;
35                 SpinLock batchLock;
36
37                 readonly JoinTarget<T1> target1;
38                 readonly JoinTarget<T2> target2;
39
40                 int batchCount;
41
42                 public BatchedJoinBlock (int batchSize)
43                         : this (batchSize, GroupingDataflowBlockOptions.Default)
44                 {
45                 }
46
47                 public BatchedJoinBlock (int batchSize,
48                                          GroupingDataflowBlockOptions dataflowBlockOptions)
49                 {
50                         if (batchSize <= 0)
51                                 throw new ArgumentOutOfRangeException (
52                                         "batchSize", batchSize, "The batchSize must be positive.");
53                         if (dataflowBlockOptions == null)
54                                 throw new ArgumentNullException ("dataflowBlockOptions");
55
56                         BatchSize = batchSize;
57                         options = dataflowBlockOptions;
58                         completionHelper = CompletionHelper.GetNew (dataflowBlockOptions);
59
60                         target1 = new JoinTarget<T1> (
61                                 this, SignalTarget, completionHelper, () => outgoing.IsCompleted);
62                         target2 = new JoinTarget<T2> (
63                                 this, SignalTarget, completionHelper, () => outgoing.IsCompleted);
64
65                         outgoing = new MessageOutgoingQueue<Tuple<IList<T1>, IList<T2>>> (
66                                 this, completionHelper,
67                                 () => target1.Buffer.IsCompleted || target2.Buffer.IsCompleted, options);
68                 }
69
70                 public int BatchSize { get; private set; }
71
72                 public ITargetBlock<T1> Target1 {
73                         get { return target1; }
74                 }
75
76                 public ITargetBlock<T2> Target2 {
77                         get { return target2; }
78                 }
79
80                 private void SignalTarget()
81                 {
82                         int current = Interlocked.Increment (ref batchCount);
83
84                         if (current % BatchSize != 0)
85                                 return;
86
87                         Interlocked.Add (ref batchCount, -current);
88
89                         MakeBatch (BatchSize);
90                 }
91
92                 void MakeBatch (int batchSize)
93                 {
94                         var list1 = new List<T1> ();
95                         var list2 = new List<T2> ();
96                         
97                         // lock is necessary here to make sure items are in the correct order
98                         bool taken = false;
99                         try {
100                                 batchLock.Enter (ref taken);
101
102                                 int i = 0;
103
104                                 T1 item1;
105                                 while (i < batchSize && target1.Buffer.TryTake (out item1)) {
106                                         list1.Add (item1);
107                                         i++;
108                                 }
109
110                                 T2 item2;
111                                 while (i < batchSize && target2.Buffer.TryTake (out item2)) {
112                                         list2.Add (item2);
113                                         i++;
114                                 }
115
116                                 if (i < batchSize)
117                                         throw new InvalidOperationException("Unexpected count of items.");
118                         } finally {
119                                 if (taken)
120                                         batchLock.Exit ();
121                         }
122
123                         var batch = Tuple.Create<IList<T1>, IList<T2>> (list1, list2);
124
125                         outgoing.AddData (batch);
126                 }
127
128                 public Task Completion {
129                         get { return completionHelper.Completion; }
130                 }
131
132                 public void Complete ()
133                 {
134                         outgoing.Complete ();
135                 }
136
137                 void IDataflowBlock.Fault (Exception exception)
138                 {
139                         completionHelper.RequestFault (exception);
140                 }
141
142                 Tuple<IList<T1>, IList<T2>> ISourceBlock<Tuple<IList<T1>, IList<T2>>>.ConsumeMessage (
143                         DataflowMessageHeader messageHeader,
144                         ITargetBlock<Tuple<IList<T1>, IList<T2>>> target,
145                         out bool messageConsumed)
146                 {
147                         return outgoing.ConsumeMessage (messageHeader, target, out messageConsumed);
148                 }
149
150                 public IDisposable LinkTo (ITargetBlock<Tuple<IList<T1>, IList<T2>>> target,
151                                            DataflowLinkOptions linkOptions)
152                 {
153                         return outgoing.AddTarget(target, linkOptions);
154                 }
155
156                 void ISourceBlock<Tuple<IList<T1>, IList<T2>>>.ReleaseReservation (
157                         DataflowMessageHeader messageHeader,
158                         ITargetBlock<Tuple<IList<T1>, IList<T2>>> target)
159                 {
160                         outgoing.ReleaseReservation (messageHeader, target);
161                 }
162
163                 bool ISourceBlock<Tuple<IList<T1>, IList<T2>>>.ReserveMessage (
164                         DataflowMessageHeader messageHeader,
165                         ITargetBlock<Tuple<IList<T1>, IList<T2>>> target)
166                 {
167                         return outgoing.ReserveMessage (messageHeader, target);
168                 }
169
170                 public bool TryReceive (Predicate<Tuple<IList<T1>, IList<T2>>> filter,
171                                         out Tuple<IList<T1>, IList<T2>> item)
172                 {
173                         return outgoing.TryReceive (filter, out item);
174                 }
175
176                 public bool TryReceiveAll (out IList<Tuple<IList<T1>, IList<T2>>> items)
177                 {
178                         return outgoing.TryReceiveAll (out items);
179                 }
180
181                 public override string ToString ()
182                 {
183                         return NameHelper.GetName (this, options);
184                 }
185         }
186 }