Refactored code related to JoinTarget
authorPetr Onderka <gsvick@gmail.com>
Mon, 25 Jun 2012 17:28:47 +0000 (19:28 +0200)
committerPetr Onderka <gsvick@gmail.com>
Sun, 19 Aug 2012 21:36:18 +0000 (23:36 +0200)
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow-net_4_5.csproj
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinBlock.cs
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinBlock`3.cs
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinTarget.cs [new file with mode: 0644]
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/MessageBox.cs

index 10eb9161b398cf1d1999765fb3cbd2714d30a9f1..65ee4b5eb6772a9f9f44eedc68bda324c731e880 100644 (file)
@@ -1,4 +1,4 @@
-<?xml version="1.0" encoding="utf-8"?>\r
+<?xml version="1.0" encoding="utf-8"?>\r
 <Project ToolsVersion="4.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">\r
   <PropertyGroup>\r
     <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>\r
@@ -58,6 +58,7 @@
    <Compile Include="System.Threading.Tasks.Dataflow\ISourceBlock.cs" />
    <Compile Include="System.Threading.Tasks.Dataflow\ITargetBlock.cs" />
    <Compile Include="System.Threading.Tasks.Dataflow\CompletionHelper.cs" />
+    <Compile Include="System.Threading.Tasks.Dataflow\JoinTarget.cs" />\r
    <Compile Include="System.Threading.Tasks.Dataflow\MessageBox.cs" />
    <Compile Include="System.Threading.Tasks.Dataflow\MessageOutgoingQueue.cs" />
    <Compile Include="System.Threading.Tasks.Dataflow\MessageVault.cs" />
   </ItemGroup>\r
   \r
 </Project>\r
-
index 3fc6b8f2c7d3c3e3a3fa8b8bdae676f2281e5a60..67963e9760919e4967908bf0a192ae3f6de0f0ba 100644 (file)
@@ -26,7 +26,6 @@
 using System;
 using System.Threading.Tasks;
 using System.Collections.Generic;
-using System.Collections.Concurrent;
 
 namespace System.Threading.Tasks.Dataflow
 {
@@ -40,8 +39,8 @@ namespace System.Threading.Tasks.Dataflow
                MessageVault<Tuple<T1, T2>> vault = new MessageVault<Tuple<T1, T2>> ();
                MessageOutgoingQueue<Tuple<T1, T2>> outgoing;
 
-               JoinTarget<T1> target1;
-               JoinTarget<T2> target2;
+               readonly JoinTarget<T1> target1;
+               readonly JoinTarget<T2> target2;
 
                DataflowMessageHeader headers;
 
@@ -56,9 +55,9 @@ namespace System.Threading.Tasks.Dataflow
                                throw new ArgumentNullException ("dataflowBlockOptions");
 
                        this.dataflowBlockOptions = dataflowBlockOptions;
-                       this.target1 = new JoinTarget<T1> (this, SignalArrivalTarget1, new BlockingCollection<T1> (), compHelper);
-                       this.target2 = new JoinTarget<T2> (this, SignalArrivalTarget2, new BlockingCollection<T2> (), compHelper);
-                       this.outgoing = new MessageOutgoingQueue<Tuple<T1, T2>> (compHelper, () => target1.Buffer.IsCompleted || target2.Buffer.IsCompleted);
+                       target1 = new JoinTarget<T1> (this, SignalArrivalTarget1, compHelper, () => outgoing.IsCompleted);
+                       target2 = new JoinTarget<T2> (this, SignalArrivalTarget2, compHelper, () => outgoing.IsCompleted);
+                       outgoing = new MessageOutgoingQueue<Tuple<T1, T2>> (compHelper, () => target1.Buffer.IsCompleted || target2.Buffer.IsCompleted);
                }
 
                public IDisposable LinkTo (ITargetBlock<Tuple<T1, T2>> target, bool unlinkAfterOne)
@@ -145,56 +144,6 @@ namespace System.Threading.Tasks.Dataflow
                                outgoing.ProcessForTarget (target, this, false, ref headers);
                }
 
-               class JoinTarget<TTarget> : MessageBox<TTarget>, ITargetBlock<TTarget>
-               {
-                       JoinBlock<T1, T2> joinBlock;
-                       BlockingCollection<TTarget> buffer;
-                       Action signal;
-
-                       public JoinTarget (JoinBlock<T1, T2> joinBlock, Action signal, BlockingCollection<TTarget> buffer, CompletionHelper helper)
-                       : base (buffer, helper, () => joinBlock.outgoing.IsCompleted)
-                       {
-                               this.joinBlock = joinBlock;
-                               this.buffer = buffer;
-                               this.signal = signal;
-                       }
-
-                       protected override void EnsureProcessing ()
-                       {
-                               signal ();
-                       }
-
-                       public BlockingCollection<TTarget> Buffer {
-                               get {
-                                       return buffer;
-                               }
-                       }
-
-                       DataflowMessageStatus ITargetBlock<TTarget>.OfferMessage (DataflowMessageHeader messageHeader,
-                                                                                 TTarget messageValue,
-                                                                                 ISourceBlock<TTarget> source,
-                                                                                 bool consumeToAccept)
-                       {
-                               return OfferMessage (this, messageHeader, messageValue, source, consumeToAccept);
-                       }
-
-                       void IDataflowBlock.Complete ()
-                       {
-                               Complete ();
-                       }
-
-                       Task IDataflowBlock.Completion {
-                               get {
-                                       return joinBlock.Completion;
-                               }
-                       }
-
-                       void IDataflowBlock.Fault (Exception e)
-                       {
-                               joinBlock.Fault (e);
-                       }
-               }
-
                public ITargetBlock<T1> Target1 {
                        get {
                                return target1;
index db6487f90aeaf8faa56874a169d6c31df9430493..ad72232e18ba006ea87706351e2d56c11f1b588f 100644 (file)
@@ -26,7 +26,6 @@
 using System;
 using System.Threading.Tasks;
 using System.Collections.Generic;
-using System.Collections.Concurrent;
 
 namespace System.Threading.Tasks.Dataflow
 {
@@ -40,9 +39,9 @@ namespace System.Threading.Tasks.Dataflow
                MessageVault<Tuple<T1, T2, T3>> vault = new MessageVault<Tuple<T1, T2, T3>> ();
                MessageOutgoingQueue<Tuple<T1, T2, T3>> outgoing;
 
-               JoinTarget<T1> target1;
-               JoinTarget<T2> target2;
-               JoinTarget<T3> target3;
+               readonly JoinTarget<T1> target1;
+               readonly JoinTarget<T2> target2;
+               readonly JoinTarget<T3> target3;
 
                SpinLock targetLock = new SpinLock (false);
 
@@ -64,10 +63,11 @@ namespace System.Threading.Tasks.Dataflow
                        Func<bool> checker2 = () => target1.Buffer.Count == 0 || target3.Buffer.Count == 0;
                        Func<bool> checker3 = () => target1.Buffer.Count == 0 || target2.Buffer.Count == 0;
 
-                       this.target1 = new JoinTarget<T1> (this, () => SignalArrivalTargetImpl (checker1), new BlockingCollection<T1> (), compHelper);
-                       this.target2 = new JoinTarget<T2> (this, () => SignalArrivalTargetImpl (checker2), new BlockingCollection<T2> (), compHelper);
-                       this.target3 = new JoinTarget<T3> (this, () => SignalArrivalTargetImpl (checker3), new BlockingCollection<T3> (), compHelper);
-                       this.outgoing =
+                       Func<bool> joinTargetCompleteTester = () => outgoing.IsCompleted;
+                       target1 = new JoinTarget<T1> (this, () => SignalArrivalTargetImpl (checker1), compHelper, joinTargetCompleteTester);
+                       target2 = new JoinTarget<T2> (this, () => SignalArrivalTargetImpl (checker2), compHelper, joinTargetCompleteTester);
+                       target3 = new JoinTarget<T3> (this, () => SignalArrivalTargetImpl (checker3), compHelper, joinTargetCompleteTester);
+                       outgoing =
                                new MessageOutgoingQueue<Tuple<T1, T2, T3>> (compHelper,
                                                                             () => target1.Buffer.IsCompleted || target2.Buffer.IsCompleted || target3.Buffer.IsCompleted);
                }
@@ -161,56 +161,6 @@ namespace System.Threading.Tasks.Dataflow
                                outgoing.ProcessForTarget (target, this, false, ref headers);
                }
 
-               class JoinTarget<TTarget> : MessageBox<TTarget>, ITargetBlock<TTarget>
-               {
-                       JoinBlock<T1, T2, T3> joinBlock;
-                       BlockingCollection<TTarget> buffer;
-                       Action signal;
-
-                       public JoinTarget (JoinBlock<T1, T2, T3> joinBlock, Action signal, BlockingCollection<TTarget> buffer, CompletionHelper helper)
-                       : base (buffer, helper, () => joinBlock.outgoing.IsCompleted)
-                       {
-                               this.joinBlock = joinBlock;
-                               this.buffer = buffer;
-                               this.signal = signal;
-                       }
-
-                       protected override void EnsureProcessing ()
-                       {
-                               signal ();
-                       }
-
-                       public BlockingCollection<TTarget> Buffer {
-                               get {
-                                       return buffer;
-                               }
-                       }
-
-                       DataflowMessageStatus ITargetBlock<TTarget>.OfferMessage (DataflowMessageHeader messageHeader,
-                                                                                 TTarget messageValue,
-                                                                                 ISourceBlock<TTarget> source,
-                                                                                 bool consumeToAccept)
-                       {
-                               return OfferMessage (this, messageHeader, messageValue, source, consumeToAccept);
-                       }
-
-                       void IDataflowBlock.Complete ()
-                       {
-                               Complete ();
-                       }
-
-                       Task IDataflowBlock.Completion {
-                               get {
-                                       return joinBlock.Completion;
-                               }
-                       }
-
-                       void IDataflowBlock.Fault (Exception e)
-                       {
-                               joinBlock.Fault (e);
-                       }
-               }
-
                public ITargetBlock<T1> Target1 {
                        get {
                                return target1;
diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinTarget.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinTarget.cs
new file mode 100644 (file)
index 0000000..1d6118b
--- /dev/null
@@ -0,0 +1,76 @@
+// JoinBlock.cs
+//
+// Copyright (c) 2011 Jérémie "garuma" Laval
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+//
+//
+
+using System.Collections.Concurrent;
+
+namespace System.Threading.Tasks.Dataflow {
+       internal class JoinTarget<TTarget> : MessageBox<TTarget>, ITargetBlock<TTarget>
+       {
+               readonly IDataflowBlock joinBlock;
+               readonly Action signal;
+
+               public JoinTarget (IDataflowBlock joinBlock, Action signal, CompletionHelper helper,
+                                  Func<bool> externalCompleteTester)
+                       : base (new BlockingCollection<TTarget> (), helper, externalCompleteTester)
+               {
+                       this.joinBlock = joinBlock;
+                       this.signal = signal;
+               }
+
+               protected override void EnsureProcessing ()
+               {
+                       signal ();
+               }
+
+               public BlockingCollection<TTarget> Buffer {
+                       get {
+                               return MessageQueue;
+                       }
+               }
+
+               DataflowMessageStatus ITargetBlock<TTarget>.OfferMessage (DataflowMessageHeader messageHeader,
+                                                                         TTarget messageValue,
+                                                                         ISourceBlock<TTarget> source,
+                                                                         bool consumeToAccept)
+               {
+                       return OfferMessage (this, messageHeader, messageValue, source, consumeToAccept);
+               }
+
+               void IDataflowBlock.Complete ()
+               {
+                       Complete ();
+               }
+
+               Task IDataflowBlock.Completion {
+                       get {
+                               return joinBlock.Completion;
+                       }
+               }
+
+               void IDataflowBlock.Fault (Exception e)
+               {
+                       joinBlock.Fault (e);
+               }
+       }
+}
\ No newline at end of file
index 178669fb07c7c5acfcf4a33226b38914ff307fc1..d83abbe3cfb9a4903094480ad0a1e8e2982d1c52 100644 (file)
 //
 //
 
-
-using System;
-using System.Threading.Tasks;
-using System.Collections.Generic;
 using System.Collections.Concurrent;
 
 namespace System.Threading.Tasks.Dataflow
@@ -36,13 +32,14 @@ namespace System.Threading.Tasks.Dataflow
        internal class MessageBox<TInput>
        {
                readonly CompletionHelper compHelper;
-               readonly BlockingCollection<TInput> messageQueue = new BlockingCollection<TInput> ();
                readonly Func<bool> externalCompleteTester;
 
+               protected BlockingCollection<TInput> MessageQueue { get; private set; }
+
                public MessageBox (BlockingCollection<TInput> messageQueue, CompletionHelper compHelper, Func<bool> externalCompleteTester)
                {
                        this.compHelper = compHelper;
-                       this.messageQueue = messageQueue;
+                       this.MessageQueue = messageQueue;
                        this.externalCompleteTester = externalCompleteTester;
                }
 
@@ -54,7 +51,7 @@ namespace System.Threading.Tasks.Dataflow
                {
                        if (!messageHeader.IsValid)
                                return DataflowMessageStatus.Declined;
-                       if (messageQueue.IsAddingCompleted)
+                       if (MessageQueue.IsAddingCompleted)
                                return DataflowMessageStatus.DecliningPermanently;
 
                        if (consumeToAccept) {
@@ -67,7 +64,7 @@ namespace System.Threading.Tasks.Dataflow
                        }
 
                        try {
-                               messageQueue.Add (messageValue);
+                               MessageQueue.Add (messageValue);
                        } catch (InvalidOperationException) {
                                // This is triggered either if the underlying collection didn't accept the item
                                // or if the messageQueue has been marked complete, either way it corresponds to a false
@@ -82,19 +79,18 @@ namespace System.Threading.Tasks.Dataflow
 
                protected virtual void EnsureProcessing ()
                {
-
                }
 
                public void Complete ()
                {
                        // Make message queue complete
-                       messageQueue.CompleteAdding ();
+                       MessageQueue.CompleteAdding ();
                        VerifyCompleteness ();
                }
 
                void VerifyCompleteness ()
                {
-                       if (messageQueue.IsCompleted && externalCompleteTester ())
+                       if (MessageQueue.IsCompleted && externalCompleteTester ())
                                compHelper.Complete ();
                }
        }