Verify arguments in public methods
[mono.git] / mcs / class / System.Threading.Tasks.Dataflow / System.Threading.Tasks.Dataflow / TransformBlock.cs
index 69cbc789c7aee79dc828917dc6822180260b7d11..cab66733112cc542f45d03c05907b4f6a4db4e39 100644 (file)
 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 // THE SOFTWARE.
-//
-//
-
 
-using System;
-using System.Threading.Tasks;
 using System.Collections.Generic;
 using System.Collections.Concurrent;
 
 namespace System.Threading.Tasks.Dataflow
 {
        public sealed class TransformBlock<TInput, TOutput> :
-               IPropagatorBlock<TInput, TOutput>, ITargetBlock<TInput>, IDataflowBlock, ISourceBlock<TOutput>, IReceivableSourceBlock<TOutput>
+               IPropagatorBlock<TInput, TOutput>, IReceivableSourceBlock<TOutput>
        {
                static readonly ExecutionDataflowBlockOptions defaultOptions = new ExecutionDataflowBlockOptions ();
 
-               ExecutionDataflowBlockOptions dataflowBlockOptions;
-               CompletionHelper compHelper = CompletionHelper.GetNew ();
-               BlockingCollection<TInput> messageQueue = new BlockingCollection<TInput> ();
-               MessageBox<TInput> messageBox;
-               MessageVault<TOutput> vault;
-               MessageOutgoingQueue<TOutput> outgoing;
-               TargetBuffer<TOutput> targets = new TargetBuffer<TOutput> ();
-               DataflowMessageHeader headers = DataflowMessageHeader.NewValid ();
+               readonly ExecutionDataflowBlockOptions dataflowBlockOptions;
+               readonly CompletionHelper compHelper;
+               readonly BlockingCollection<TInput> messageQueue = new BlockingCollection<TInput> ();
+               readonly MessageBox<TInput> messageBox;
+               readonly MessageOutgoingQueue<TOutput> outgoing;
                readonly Func<TInput, TOutput> transformer;
 
                public TransformBlock (Func<TInput, TOutput> transformer) : this (transformer, defaultOptions)
@@ -52,18 +44,21 @@ namespace System.Threading.Tasks.Dataflow
 
                public TransformBlock (Func<TInput, TOutput> transformer, ExecutionDataflowBlockOptions dataflowBlockOptions)
                {
+                       if (transformer == null)
+                               throw new ArgumentNullException("transformer");
                        if (dataflowBlockOptions == null)
                                throw new ArgumentNullException ("dataflowBlockOptions");
 
                        this.transformer = transformer;
                        this.dataflowBlockOptions = dataflowBlockOptions;
-                       this.messageBox = new ExecutingMessageBox<TInput> (messageQueue,
-                                                                          compHelper,
-                                                                          () => outgoing.IsCompleted,
-                                                                          TransformProcess,
-                                                                          dataflowBlockOptions);
-                       this.outgoing = new MessageOutgoingQueue<TOutput> (compHelper, () => messageQueue.IsCompleted);
-                       this.vault = new MessageVault<TOutput> ();
+                       this.compHelper = CompletionHelper.GetNew (dataflowBlockOptions);
+                       this.messageBox = new ExecutingMessageBox<TInput> (
+                               this, messageQueue, compHelper,
+                               () => outgoing.IsCompleted, TransformProcess, () => outgoing.Complete (),
+                               dataflowBlockOptions);
+                       this.outgoing = new MessageOutgoingQueue<TOutput> (this, compHelper,
+                               () => messageQueue.IsCompleted, messageBox.DecreaseCount,
+                               dataflowBlockOptions);
                }
 
                public DataflowMessageStatus OfferMessage (DataflowMessageHeader messageHeader,
@@ -71,29 +66,27 @@ namespace System.Threading.Tasks.Dataflow
                                                           ISourceBlock<TInput> source,
                                                           bool consumeToAccept)
                {
-                       return messageBox.OfferMessage (this, messageHeader, messageValue, source, consumeToAccept);
+                       return messageBox.OfferMessage (messageHeader, messageValue, source, consumeToAccept);
                }
 
-               public IDisposable LinkTo (ITargetBlock<TOutput> target, bool unlinkAfterOne)
+               public IDisposable LinkTo (ITargetBlock<TOutput> target, DataflowLinkOptions linkOptions)
                {
-                       var result = targets.AddTarget (target, unlinkAfterOne);
-                       outgoing.ProcessForTarget (target, this, false, ref headers);
-                       return result;
+                       return outgoing.AddTarget (target, linkOptions);
                }
 
                public TOutput ConsumeMessage (DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target, out bool messageConsumed)
                {
-                       return vault.ConsumeMessage (messageHeader, target, out messageConsumed);
+                       return outgoing.ConsumeMessage (messageHeader, target, out messageConsumed);
                }
 
                public void ReleaseReservation (DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target)
                {
-                       vault.ReleaseReservation (messageHeader, target);
+                       outgoing.ReleaseReservation (messageHeader, target);
                }
 
                public bool ReserveMessage (DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target)
                {
-                       return vault.ReserveMessage (messageHeader, target);
+                       return outgoing.ReserveMessage (messageHeader, target);
                }
 
                public bool TryReceive (Predicate<TOutput> filter, out TOutput item)
@@ -106,22 +99,15 @@ namespace System.Threading.Tasks.Dataflow
                        return outgoing.TryReceiveAll (out items);
                }
 
-               void TransformProcess ()
+               bool TransformProcess ()
                {
-                       ITargetBlock<TOutput> target;
                        TInput input;
 
-                       while (messageQueue.TryTake (out input)) {
-                               TOutput output = transformer (input);
-
-                               if ((target = targets.Current) != null)
-                                       target.OfferMessage (headers.Increment (), output, this, false);
-                               else
-                                       outgoing.AddData (output);
-                       }
+                       var dequeued = messageQueue.TryTake (out input);
+                       if (dequeued)
+                               outgoing.AddData (transformer (input));
 
-                       if (!outgoing.IsEmpty && (target = targets.Current) != null)
-                               outgoing.ProcessForTarget (target, this, false, ref headers);
+                       return dequeued;
                }
 
                public void Complete ()
@@ -129,9 +115,9 @@ namespace System.Threading.Tasks.Dataflow
                        messageBox.Complete ();
                }
 
-               public void Fault (Exception ex)
+               public void Fault (Exception exception)
                {
-                       compHelper.Fault (ex);
+                       compHelper.RequestFault (exception);
                }
 
                public Task Completion {
@@ -151,6 +137,10 @@ namespace System.Threading.Tasks.Dataflow
                                return messageQueue.Count;
                        }
                }
-       }
-}
 
+               public override string ToString ()
+               {
+                       return NameHelper.GetName (this, dataflowBlockOptions);
+               }
+       }
+}
\ No newline at end of file