[System] Fixes UdpClient.Receive with IPv6 endpoint
[mono.git] / mcs / class / System.Threading.Tasks.Dataflow / System.Threading.Tasks.Dataflow / BatchedJoinBlock.cs
index 9778169ea4a91ec85e7680687df6009c2b7636d0..cfafa02c14fa438eda23a4f843daa6f46c7630c8 100644 (file)
 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 // THE SOFTWARE.
-//
-//
 
 using System.Collections.Generic;
 
 namespace System.Threading.Tasks.Dataflow {
        public sealed class BatchedJoinBlock<T1, T2> :
                IReceivableSourceBlock<Tuple<IList<T1>, IList<T2>>> {
-               GroupingDataflowBlockOptions options;
+               readonly GroupingDataflowBlockOptions options;
 
-               CompletionHelper completionHelper;
-               readonly MessageOutgoingQueue<Tuple<IList<T1>, IList<T2>>> outgoing;
-               DataflowMessageHeader headers;
+               readonly CompletionHelper completionHelper;
+               readonly OutgoingQueue<Tuple<IList<T1>, IList<T2>>> outgoing;
                SpinLock batchLock;
 
                readonly JoinTarget<T1> target1;
                readonly JoinTarget<T2> target2;
 
                int batchCount;
+               long numberOfGroups;
+               SpinLock batchCountLock;
 
                public BatchedJoinBlock (int batchSize)
                        : this (batchSize, GroupingDataflowBlockOptions.Default)
@@ -52,19 +51,33 @@ namespace System.Threading.Tasks.Dataflow {
                                        "batchSize", batchSize, "The batchSize must be positive.");
                        if (dataflowBlockOptions == null)
                                throw new ArgumentNullException ("dataflowBlockOptions");
+                       if (!dataflowBlockOptions.Greedy)
+                               throw new ArgumentException (
+                                       "Greedy must be true for this dataflow block.", "dataflowBlockOptions");
+                       if (dataflowBlockOptions.BoundedCapacity != DataflowBlockOptions.Unbounded)
+                               throw new ArgumentException (
+                                       "BoundedCapacity must be Unbounded or -1 for this dataflow block.",
+                                       "dataflowBlockOptions");
 
                        BatchSize = batchSize;
                        options = dataflowBlockOptions;
                        completionHelper = CompletionHelper.GetNew (dataflowBlockOptions);
 
                        target1 = new JoinTarget<T1> (
-                               this, SignalTarget, completionHelper, () => outgoing.IsCompleted);
+                               this, SignalTarget, completionHelper, () => outgoing.IsCompleted,
+                               dataflowBlockOptions, true, TryAdd);
                        target2 = new JoinTarget<T2> (
-                               this, SignalTarget, completionHelper, () => outgoing.IsCompleted);
+                               this, SignalTarget, completionHelper, () => outgoing.IsCompleted,
+                               dataflowBlockOptions, true, TryAdd);
 
-                       outgoing = new MessageOutgoingQueue<Tuple<IList<T1>, IList<T2>>> (
+                       outgoing = new OutgoingQueue<Tuple<IList<T1>, IList<T2>>> (
                                this, completionHelper,
-                               () => target1.Buffer.IsCompleted || target2.Buffer.IsCompleted, options);
+                               () => target1.Buffer.IsCompleted || target2.Buffer.IsCompleted,
+                               _ =>
+                               {
+                                       target1.DecreaseCount ();
+                                       target2.DecreaseCount ();
+                               }, options);
                }
 
                public int BatchSize { get; private set; }
@@ -77,20 +90,57 @@ namespace System.Threading.Tasks.Dataflow {
                        get { return target2; }
                }
 
-               private void SignalTarget()
+               /// <summary>
+               /// Returns whether a new item can be accepted, and increments a counter if it can.
+               /// </summary>
+               bool TryAdd ()
                {
-                       int current = Interlocked.Increment (ref batchCount);
+                       bool lockTaken = false;
+                       try {
+                               batchCountLock.Enter (ref lockTaken);
 
-                       if (current % BatchSize != 0)
-                               return;
+                               if (options.MaxNumberOfGroups != -1
+                                   && numberOfGroups + batchCount / BatchSize >= options.MaxNumberOfGroups)
+                                       return false;
 
-                       Interlocked.Add (ref batchCount, -current);
+                               batchCount++;
+                               return true;
+                       } finally {
+                               if (lockTaken)
+                                       batchCountLock.Exit();
+                       }
+               }
+
+               /// <summary>
+               /// Decides whether to create a new batch or not.
+               /// </summary>
+               void SignalTarget ()
+               {
+                       bool lockTaken = false;
+                       try {
+                               batchCountLock.Enter (ref lockTaken);
+
+                               if (batchCount < BatchSize)
+                                       return;
+
+                               batchCount -= BatchSize;
+                               numberOfGroups++;
+                       } finally {
+                               if (lockTaken)
+                                       batchCountLock.Exit();
+                       }
 
                        MakeBatch (BatchSize);
                }
 
+               /// <summary>
+               /// Creates a batch of the given size and adds the resulting batch to the output queue.
+               /// </summary>
                void MakeBatch (int batchSize)
                {
+                       if (batchSize == 0)
+                               return;
+
                        var list1 = new List<T1> ();
                        var list2 = new List<T2> ();
                        
@@ -123,6 +173,33 @@ namespace System.Threading.Tasks.Dataflow {
                        var batch = Tuple.Create<IList<T1>, IList<T2>> (list1, list2);
 
                        outgoing.AddData (batch);
+
+                       VerifyMaxNumberOfGroups ();
+               }
+
+               /// <summary>
+               /// Verifies whether <see cref="GroupingDataflowBlockOptions.MaxNumberOfGroups"/>
+               /// has been reached. If it did, <see cref="Complete"/>s the block.
+               /// </summary>
+               void VerifyMaxNumberOfGroups ()
+               {
+                       if (options.MaxNumberOfGroups == -1)
+                               return;
+
+                       bool shouldComplete;
+
+                       bool lockTaken = false;
+                       try {
+                               batchCountLock.Enter (ref lockTaken);
+
+                               shouldComplete = numberOfGroups >= options.MaxNumberOfGroups;
+                       } finally {
+                               if (lockTaken)
+                                       batchCountLock.Exit ();
+                       }
+
+                       if (shouldComplete)
+                               Complete ();
                }
 
                public Task Completion {
@@ -131,6 +208,9 @@ namespace System.Threading.Tasks.Dataflow {
 
                public void Complete ()
                {
+                       target1.Complete ();
+                       target2.Complete ();
+                       MakeBatch (batchCount);
                        outgoing.Complete ();
                }
 
@@ -178,6 +258,10 @@ namespace System.Threading.Tasks.Dataflow {
                        return outgoing.TryReceiveAll (out items);
                }
 
+               public int OutputCount {
+                       get { return outgoing.Count; }
+               }
+
                public override string ToString ()
                {
                        return NameHelper.GetName (this, options);