[System] Fixes UdpClient.Receive with IPv6 endpoint
[mono.git] / mcs / class / System.Threading.Tasks.Dataflow / System.Threading.Tasks.Dataflow / BroadcastBlock.cs
index 0152f8c34d96ccbd8ca42e471384b6b6a6da43b0..b51d2b37c57dc6db627b4bd7dfce7247cf4370d3 100644 (file)
@@ -1,6 +1,7 @@
 // BroadcastBlock.cs
 //
 // Copyright (c) 2011 Jérémie "garuma" Laval
+// Copyright (c) 2012 Petr Onderka
 //
 // Permission is hereby granted, free of charge, to any person obtaining a copy
 // of this software and associated documentation files (the "Software"), to deal
 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 // THE SOFTWARE.
-//
-//
 
 using System.Collections.Generic;
 using System.Collections.Concurrent;
 
-namespace System.Threading.Tasks.Dataflow
-{
-       public sealed class BroadcastBlock<T> : IPropagatorBlock<T, T>, ITargetBlock<T>, IDataflowBlock, ISourceBlock<T>, IReceivableSourceBlock<T>
-       {
-               static readonly DataflowBlockOptions defaultOptions = new DataflowBlockOptions ();
-
-               CompletionHelper compHelper;
-               BlockingCollection<T> messageQueue = new BlockingCollection<T> ();
-               MessageBox<T> messageBox;
-               MessageVault<T> vault;
-               DataflowBlockOptions dataflowBlockOptions;
-               readonly Func<T, T> cloner;
-               MessageOutgoingQueue<T> outgoing;
-               TargetBuffer<T> targets = new TargetBuffer<T> ();
-               DataflowMessageHeader headers = DataflowMessageHeader.NewValid ();
-
-               public BroadcastBlock (Func<T, T> cloner) : this (cloner, defaultOptions)
+namespace System.Threading.Tasks.Dataflow {
+       public sealed class BroadcastBlock<T> : IPropagatorBlock<T, T>, IReceivableSourceBlock<T> {
+               readonly CompletionHelper compHelper;
+               readonly BlockingCollection<T> messageQueue = new BlockingCollection<T> ();
+               readonly MessageBox<T> messageBox;
+               readonly DataflowBlockOptions dataflowBlockOptions;
+               readonly Func<T, T> cloningFunction;
+               readonly BroadcastOutgoingQueue<T> outgoing;
+
+               public BroadcastBlock (Func<T, T> cloningFunction)
+                       : this (cloningFunction, DataflowBlockOptions.Default)
                {
-
                }
 
-               public BroadcastBlock (Func<T, T> cloner, DataflowBlockOptions dataflowBlockOptions)
+               public BroadcastBlock (Func<T, T> cloningFunction,
+                                      DataflowBlockOptions dataflowBlockOptions)
                {
                        if (dataflowBlockOptions == null)
                                throw new ArgumentNullException ("dataflowBlockOptions");
 
-                       this.cloner = cloner;
+                       this.cloningFunction = cloningFunction;
                        this.dataflowBlockOptions = dataflowBlockOptions;
                        this.compHelper = CompletionHelper.GetNew (dataflowBlockOptions);
-                       this.messageBox = new PassingMessageBox<T> (messageQueue, compHelper, () => outgoing.IsCompleted, BroadcastProcess, dataflowBlockOptions);
-                       this.outgoing = new MessageOutgoingQueue<T> (compHelper, () => messageQueue.IsCompleted);
-                       this.vault = new MessageVault<T> ();
+                       this.messageBox = new PassingMessageBox<T> (this, messageQueue, compHelper,
+                               () => outgoing.IsCompleted, _ => BroadcastProcess (), dataflowBlockOptions);
+                       this.outgoing = new BroadcastOutgoingQueue<T> (this, compHelper,
+                               () => messageQueue.IsCompleted, messageBox.DecreaseCount,
+                               dataflowBlockOptions, cloningFunction != null);
                }
 
-               public DataflowMessageStatus OfferMessage (DataflowMessageHeader messageHeader,
-                                                          T messageValue,
-                                                          ISourceBlock<T> source,
-                                                          bool consumeToAccept)
+               DataflowMessageStatus ITargetBlock<T>.OfferMessage (
+                       DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source,
+                       bool consumeToAccept)
                {
-                       return messageBox.OfferMessage (this, messageHeader, messageValue, source, consumeToAccept);
+                       return messageBox.OfferMessage (messageHeader, messageValue, source,
+                               consumeToAccept);
                }
 
-               public IDisposable LinkTo (ITargetBlock<T> target, bool unlinkAfterOne)
+               public IDisposable LinkTo (ITargetBlock<T> target, DataflowLinkOptions linkOptions)
                {
-                       return targets.AddTarget (target, unlinkAfterOne);
+                       if (linkOptions == null)
+                               throw new ArgumentNullException("linkOptions");
+
+                       return outgoing.AddTarget (target, linkOptions);
                }
 
-               public T ConsumeMessage (DataflowMessageHeader messageHeader, ITargetBlock<T> target, out bool messageConsumed)
+               T ISourceBlock<T>.ConsumeMessage (DataflowMessageHeader messageHeader,
+                                                 ITargetBlock<T> target,
+                                                 out bool messageConsumed)
                {
-                       return cloner(vault.ConsumeMessage (messageHeader, target, out messageConsumed));
+                       T message = outgoing.ConsumeMessage (
+                               messageHeader, target, out messageConsumed);
+                       if (messageConsumed && cloningFunction != null)
+                               message = cloningFunction (message);
+                       return message;
                }
 
-               public void ReleaseReservation (DataflowMessageHeader messageHeader, ITargetBlock<T> target)
+               bool ISourceBlock<T>.ReserveMessage (DataflowMessageHeader messageHeader,
+                                                    ITargetBlock<T> target)
                {
-                       vault.ReleaseReservation (messageHeader, target);
+                       return outgoing.ReserveMessage (messageHeader, target);
                }
 
-               public bool ReserveMessage (DataflowMessageHeader messageHeader, ITargetBlock<T> target)
+               void ISourceBlock<T>.ReleaseReservation (DataflowMessageHeader messageHeader,
+                                                        ITargetBlock<T> target)
                {
-                       return vault.ReserveMessage (messageHeader, target);
+                       outgoing.ReleaseReservation (messageHeader, target);
                }
 
                public bool TryReceive (Predicate<T> filter, out T item)
                {
-                       return outgoing.TryReceive (filter, out item);
+                       var received = outgoing.TryReceive (filter, out item);
+                       if (received && cloningFunction != null)
+                               item = cloningFunction (item);
+                       return received;
                }
 
-               public bool TryReceiveAll (out IList<T> items)
+               bool IReceivableSourceBlock<T>.TryReceiveAll (out IList<T> items)
                {
-                       return outgoing.TryReceiveAll (out items);
+                       T item;
+                       if (!TryReceive (null, out item)) {
+                               items = null;
+                               return false;
+                       }
+
+                       items = new[] { item };
+                       return true;
                }
 
+               /// <summary>
+               /// Moves items from the input queue to the output queue.
+               /// </summary>
                void BroadcastProcess ()
                {
-                       T input;
-
-                       if (!messageQueue.TryTake (out input) || targets.Current == null)
-                               return;
-
-                       foreach (var target in targets) {
-                               DataflowMessageHeader header = headers.Increment ();
-                               if (cloner != null)
-                                       vault.StoreMessage (header, input);
-                               target.OfferMessage (header, input, this, cloner != null);
-                               // TODO: verify if it's the correct semantic
-                               T save = input;
-                               if (!messageQueue.TryTake (out input))
-                                       input = save;
-                       }
+                       T item;
+                       while (messageQueue.TryTake (out item))
+                               outgoing.AddData (item);
                }
 
                public void Complete ()
                {
                        messageBox.Complete ();
+                       outgoing.Complete ();
                }
 
-               public void Fault (Exception ex)
+               void IDataflowBlock.Fault (Exception exception)
                {
-                       compHelper.RequestFault (ex);
+                       compHelper.RequestFault (exception);
                }
 
                public Task Completion {
-                       get {
-                               return compHelper.Completion;
-                       }
+                       get { return compHelper.Completion; }
                }
 
                public override string ToString ()