[System] Fixes UdpClient.Receive with IPv6 endpoint
[mono.git] / mcs / class / System.Threading.Tasks.Dataflow / System.Threading.Tasks.Dataflow / BatchedJoinBlock.cs
index bd42827a911d0713247dddd3b120771ead18f868..cfafa02c14fa438eda23a4f843daa6f46c7630c8 100644 (file)
 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 // THE SOFTWARE.
-//
-//
 
-using System.Collections.Concurrent;
 using System.Collections.Generic;
 
 namespace System.Threading.Tasks.Dataflow {
        public sealed class BatchedJoinBlock<T1, T2> :
                IReceivableSourceBlock<Tuple<IList<T1>, IList<T2>>> {
-               GroupingDataflowBlockOptions options;
+               readonly GroupingDataflowBlockOptions options;
 
-               CompletionHelper completionHelper = CompletionHelper.GetNew();
-               readonly MessageOutgoingQueue<Tuple<IList<T1>, IList<T2>>> outgoing;
-               readonly MessageVault<Tuple<IList<T1>, IList<T2>>> vault = new MessageVault<Tuple<IList<T1>, IList<T2>>>();
-               readonly TargetBuffer<Tuple<IList<T1>, IList<T2>>> targets = new TargetBuffer<Tuple<IList<T1>, IList<T2>>>();
-               DataflowMessageHeader headers;
+               readonly CompletionHelper completionHelper;
+               readonly OutgoingQueue<Tuple<IList<T1>, IList<T2>>> outgoing;
                SpinLock batchLock;
 
                readonly JoinTarget<T1> target1;
                readonly JoinTarget<T2> target2;
 
                int batchCount;
+               long numberOfGroups;
+               SpinLock batchCountLock;
 
                public BatchedJoinBlock (int batchSize)
                        : this (batchSize, GroupingDataflowBlockOptions.Default)
@@ -55,16 +51,33 @@ namespace System.Threading.Tasks.Dataflow {
                                        "batchSize", batchSize, "The batchSize must be positive.");
                        if (dataflowBlockOptions == null)
                                throw new ArgumentNullException ("dataflowBlockOptions");
+                       if (!dataflowBlockOptions.Greedy)
+                               throw new ArgumentException (
+                                       "Greedy must be true for this dataflow block.", "dataflowBlockOptions");
+                       if (dataflowBlockOptions.BoundedCapacity != DataflowBlockOptions.Unbounded)
+                               throw new ArgumentException (
+                                       "BoundedCapacity must be Unbounded or -1 for this dataflow block.",
+                                       "dataflowBlockOptions");
 
                        BatchSize = batchSize;
                        options = dataflowBlockOptions;
-
-                       target1 = new JoinTarget<T1> (this, SignalTarget, completionHelper, () => outgoing.IsCompleted);
-                       target2 = new JoinTarget<T2> (this, SignalTarget, completionHelper, () => outgoing.IsCompleted);
-
-                       outgoing = new MessageOutgoingQueue<Tuple<IList<T1>, IList<T2>>> (
-                               completionHelper,
-                               () => target1.Buffer.IsCompleted || target2.Buffer.IsCompleted);
+                       completionHelper = CompletionHelper.GetNew (dataflowBlockOptions);
+
+                       target1 = new JoinTarget<T1> (
+                               this, SignalTarget, completionHelper, () => outgoing.IsCompleted,
+                               dataflowBlockOptions, true, TryAdd);
+                       target2 = new JoinTarget<T2> (
+                               this, SignalTarget, completionHelper, () => outgoing.IsCompleted,
+                               dataflowBlockOptions, true, TryAdd);
+
+                       outgoing = new OutgoingQueue<Tuple<IList<T1>, IList<T2>>> (
+                               this, completionHelper,
+                               () => target1.Buffer.IsCompleted || target2.Buffer.IsCompleted,
+                               _ =>
+                               {
+                                       target1.DecreaseCount ();
+                                       target2.DecreaseCount ();
+                               }, options);
                }
 
                public int BatchSize { get; private set; }
@@ -77,20 +90,57 @@ namespace System.Threading.Tasks.Dataflow {
                        get { return target2; }
                }
 
-               private void SignalTarget()
+               /// <summary>
+               /// Returns whether a new item can be accepted, and increments a counter if it can.
+               /// </summary>
+               bool TryAdd ()
                {
-                       int current = Interlocked.Increment (ref batchCount);
+                       bool lockTaken = false;
+                       try {
+                               batchCountLock.Enter (ref lockTaken);
 
-                       if (current % BatchSize != 0)
-                               return;
+                               if (options.MaxNumberOfGroups != -1
+                                   && numberOfGroups + batchCount / BatchSize >= options.MaxNumberOfGroups)
+                                       return false;
 
-                       Interlocked.Add (ref batchCount, -current);
+                               batchCount++;
+                               return true;
+                       } finally {
+                               if (lockTaken)
+                                       batchCountLock.Exit();
+                       }
+               }
+
+               /// <summary>
+               /// Decides whether to create a new batch or not.
+               /// </summary>
+               void SignalTarget ()
+               {
+                       bool lockTaken = false;
+                       try {
+                               batchCountLock.Enter (ref lockTaken);
+
+                               if (batchCount < BatchSize)
+                                       return;
+
+                               batchCount -= BatchSize;
+                               numberOfGroups++;
+                       } finally {
+                               if (lockTaken)
+                                       batchCountLock.Exit();
+                       }
 
                        MakeBatch (BatchSize);
                }
 
+               /// <summary>
+               /// Creates a batch of the given size and adds the resulting batch to the output queue.
+               /// </summary>
                void MakeBatch (int batchSize)
                {
+                       if (batchSize == 0)
+                               return;
+
                        var list1 = new List<T1> ();
                        var list2 = new List<T2> ();
                        
@@ -122,14 +172,34 @@ namespace System.Threading.Tasks.Dataflow {
 
                        var batch = Tuple.Create<IList<T1>, IList<T2>> (list1, list2);
 
-                       var target = targets.Current;
-                       if (target == null)
-                               outgoing.AddData(batch);
-                       else
-                               target.OfferMessage (headers.Increment (), batch, this, false);
+                       outgoing.AddData (batch);
 
-                       if (!outgoing.IsEmpty && targets.Current != null)
-                               outgoing.ProcessForTarget (targets.Current, this, false, ref headers);
+                       VerifyMaxNumberOfGroups ();
+               }
+
+               /// <summary>
+               /// Verifies whether <see cref="GroupingDataflowBlockOptions.MaxNumberOfGroups"/>
+               /// has been reached. If it did, <see cref="Complete"/>s the block.
+               /// </summary>
+               void VerifyMaxNumberOfGroups ()
+               {
+                       if (options.MaxNumberOfGroups == -1)
+                               return;
+
+                       bool shouldComplete;
+
+                       bool lockTaken = false;
+                       try {
+                               batchCountLock.Enter (ref lockTaken);
+
+                               shouldComplete = numberOfGroups >= options.MaxNumberOfGroups;
+                       } finally {
+                               if (lockTaken)
+                                       batchCountLock.Exit ();
+                       }
+
+                       if (shouldComplete)
+                               Complete ();
                }
 
                public Task Completion {
@@ -138,12 +208,15 @@ namespace System.Threading.Tasks.Dataflow {
 
                public void Complete ()
                {
+                       target1.Complete ();
+                       target2.Complete ();
+                       MakeBatch (batchCount);
                        outgoing.Complete ();
                }
 
                void IDataflowBlock.Fault (Exception exception)
                {
-                       completionHelper.Fault (exception);
+                       completionHelper.RequestFault (exception);
                }
 
                Tuple<IList<T1>, IList<T2>> ISourceBlock<Tuple<IList<T1>, IList<T2>>>.ConsumeMessage (
@@ -151,29 +224,27 @@ namespace System.Threading.Tasks.Dataflow {
                        ITargetBlock<Tuple<IList<T1>, IList<T2>>> target,
                        out bool messageConsumed)
                {
-                       return vault.ConsumeMessage (messageHeader, target, out messageConsumed);
+                       return outgoing.ConsumeMessage (messageHeader, target, out messageConsumed);
                }
 
                public IDisposable LinkTo (ITargetBlock<Tuple<IList<T1>, IList<T2>>> target,
-                                          bool unlinkAfterOne)
+                                          DataflowLinkOptions linkOptions)
                {
-                       var result = targets.AddTarget(target, unlinkAfterOne);
-                       outgoing.ProcessForTarget(target, this, false, ref headers);
-                       return result;
+                       return outgoing.AddTarget(target, linkOptions);
                }
 
                void ISourceBlock<Tuple<IList<T1>, IList<T2>>>.ReleaseReservation (
                        DataflowMessageHeader messageHeader,
                        ITargetBlock<Tuple<IList<T1>, IList<T2>>> target)
                {
-                       vault.ReleaseReservation (messageHeader, target);
+                       outgoing.ReleaseReservation (messageHeader, target);
                }
 
                bool ISourceBlock<Tuple<IList<T1>, IList<T2>>>.ReserveMessage (
                        DataflowMessageHeader messageHeader,
                        ITargetBlock<Tuple<IList<T1>, IList<T2>>> target)
                {
-                       return vault.ReserveMessage (messageHeader, target);
+                       return outgoing.ReserveMessage (messageHeader, target);
                }
 
                public bool TryReceive (Predicate<Tuple<IList<T1>, IList<T2>>> filter,
@@ -186,5 +257,14 @@ namespace System.Threading.Tasks.Dataflow {
                {
                        return outgoing.TryReceiveAll (out items);
                }
+
+               public int OutputCount {
+                       get { return outgoing.Count; }
+               }
+
+               public override string ToString ()
+               {
+                       return NameHelper.GetName (this, options);
+               }
        }
 }
\ No newline at end of file