Implemented LinkTo() with predicate
authorPetr Onderka <gsvick@gmail.com>
Fri, 20 Jul 2012 21:45:16 +0000 (23:45 +0200)
committerPetr Onderka <gsvick@gmail.com>
Sun, 19 Aug 2012 22:00:32 +0000 (00:00 +0200)
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow-net_4_5.csproj
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow.dll.sources
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/DataflowBlock.cs
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/PredicateBlock.cs [new file with mode: 0644]
mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/DataflowBlockTest.cs

index f3a551c9e70d8422a37dcdb64bfacf5b7deaf436..c6552c75ba064ea9bab9353688ffb15f18c053b1 100644 (file)
@@ -48,6 +48,7 @@
    <Compile Include="Assembly\AssemblyInfo.cs" />
     <Compile Include="System.Threading.Tasks.Dataflow\BroadcastOutgoingQueue.cs" />\r
     <Compile Include="System.Threading.Tasks.Dataflow\OutgoingQueueBase.cs" />\r
+    <Compile Include="System.Threading.Tasks.Dataflow\PredicateBlock.cs" />\r
     <Compile Include="System.Threading.Tasks.Dataflow\SendBlock.cs" />\r
     <Compile Include="System.Threading.Tasks.Dataflow\BatchedJoinBlock.cs" />\r
     <Compile Include="System.Threading.Tasks.Dataflow\BatchedJoinBlock`3.cs" />\r
index 14bc184f1f9ddef56ff714044a38e6ddb6dafffb..89c0ae6bfeae2478fcfe8453cf4872799c3561d0 100644 (file)
@@ -42,3 +42,4 @@ System.Threading.Tasks.Dataflow/TransformBlock.cs
 System.Threading.Tasks.Dataflow/TransformManyBlock.cs
 System.Threading.Tasks.Dataflow/WriteOnceBlock.cs
 System.Threading.Tasks.Dataflow/SendBlock.cs
+System.Threading.Tasks.Dataflow/PredicateBlock.cs
index 44dc58fe1ea4a44ab71b27793898e1d6b5523e97..5073aacb45a6a98c7d4f0c0208cd8a27fb9be6c5 100644 (file)
@@ -132,7 +132,6 @@ namespace System.Threading.Tasks.Dataflow {
                        return source.LinkTo (target, DataflowLinkOptions.Default, predicate);
                }
 
-               [MonoTODO("Use predicate")]
                public static IDisposable LinkTo<TOutput> (
                        this ISourceBlock<TOutput> source, ITargetBlock<TOutput> target,
                        DataflowLinkOptions linkOptions, Predicate<TOutput> predicate)
@@ -142,7 +141,9 @@ namespace System.Threading.Tasks.Dataflow {
                        if (predicate == null)
                                throw new ArgumentNullException("predicate");
 
-                       return source.LinkTo (target, linkOptions);
+                       var predicateBlock = new PredicateBlock<TOutput> (source, target, predicate);
+
+                       return source.LinkTo (predicateBlock, linkOptions);
                }
 
                [MonoTODO]
diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/PredicateBlock.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/PredicateBlock.cs
new file mode 100644 (file)
index 0000000..60cfdb7
--- /dev/null
@@ -0,0 +1,131 @@
+// PredicateBlock.cs
+//
+// Copyright (c) 2012 Petr Onderka
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+namespace System.Threading.Tasks.Dataflow {
+       /// <summary>
+       /// This block is used by the version of <see cref="DataflowBlock.LinkTo"/>
+       /// that has a predicate to wrap the target block,
+       /// so that the predicate can be checked.
+       /// </summary>
+       class PredicateBlock<T> : ITargetBlock<T> {
+               /// <summary>
+               /// Wraps the source block of the link.
+               /// This is necessary so that the communication from target to source works correctly.
+               /// </summary>
+               class SourceBlock : ISourceBlock<T> {
+                       readonly ISourceBlock<T> actualSource;
+                       readonly PredicateBlock<T> predicateBlock;
+
+                       public SourceBlock (ISourceBlock<T> actualSource,
+                                           PredicateBlock<T> predicateBlock)
+                       {
+                               this.actualSource = actualSource;
+                               this.predicateBlock = predicateBlock;
+                       }
+
+                       public Task Completion
+                       {
+                               get { return actualSource.Completion; }
+                       }
+
+                       public void Complete ()
+                       {
+                               actualSource.Complete ();
+                       }
+
+                       public void Fault (Exception exception)
+                       {
+                               actualSource.Fault (exception);
+                       }
+
+                       public T ConsumeMessage (DataflowMessageHeader messageHeader,
+                                                ITargetBlock<T> target, out bool messageConsumed)
+                       {
+                               return actualSource.ConsumeMessage (messageHeader, predicateBlock,
+                                       out messageConsumed);
+                       }
+
+                       public IDisposable LinkTo (ITargetBlock<T> target,
+                                                  DataflowLinkOptions linkOptions)
+                       {
+                               return actualSource.LinkTo (target, linkOptions);
+                       }
+
+                       public void ReleaseReservation (DataflowMessageHeader messageHeader,
+                                                       ITargetBlock<T> target)
+                       {
+                               actualSource.ReleaseReservation (messageHeader, predicateBlock);
+                       }
+
+                       public bool ReserveMessage (DataflowMessageHeader messageHeader,
+                                                   ITargetBlock<T> target)
+                       {
+                               return actualSource.ReserveMessage (messageHeader, predicateBlock);
+                       }
+               }
+
+               readonly ITargetBlock<T> actualTarget;
+               readonly Predicate<T> predicate;
+               readonly SourceBlock sourceBlock;
+
+               public PredicateBlock (ISourceBlock<T> actualSource,
+                                      ITargetBlock<T> actualTarget, Predicate<T> predicate)
+               {
+                       this.actualTarget = actualTarget;
+                       this.predicate = predicate;
+                       sourceBlock = new SourceBlock (actualSource, this);
+               }
+
+               public DataflowMessageStatus OfferMessage (
+                       DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source,
+                       bool consumeToAccept)
+               {
+                       if (!messageHeader.IsValid)
+                               throw new ArgumentException ("The messageHeader is not valid.",
+                                       "messageHeader");
+                       if (consumeToAccept && source == null)
+                               throw new ArgumentException (
+                                       "consumeToAccept may only be true if provided with a non-null source.",
+                                       "consumeToAccept");
+
+                       if (!predicate(messageValue))
+                               return DataflowMessageStatus.Declined;
+
+                       return actualTarget.OfferMessage (messageHeader, messageValue, sourceBlock,
+                               consumeToAccept);
+               }
+
+               public Task Completion {
+                       get { return actualTarget.Completion; }
+               }
+
+               public void Complete ()
+               {
+                       actualTarget.Complete ();
+               }
+
+               public void Fault (Exception exception)
+               {
+                       actualTarget.Fault (exception);
+               }
+       }
+}
\ No newline at end of file
index b473ded8d5cf67b2c9304c5d9632aed650dc8277..79e54fe627bf5562ea7b1cce3afe8e0b5ae14674 100644 (file)
@@ -237,5 +237,108 @@ namespace MonoTests.System.Threading.Tasks.Dataflow
                        Assert.IsTrue (task.Wait (100));
                        Assert.IsFalse (task.Result);
                }
+
+               [Test]
+               public void LinkToPredicateTest ()
+               {
+                       var scheduler = new TestScheduler ();
+                       var source = new BufferBlock<int> (
+                               new DataflowBlockOptions { TaskScheduler = scheduler });
+                       var target = new BufferBlock<int> ();
+                       source.LinkTo (target, i => i % 2 == 1);
+
+                       Assert.IsTrue (source.Post (1));
+                       Assert.IsTrue (source.Post (2));
+                       Assert.IsTrue (source.Post (3));
+
+                       scheduler.ExecuteAll ();
+
+                       int item;
+                       Assert.IsTrue (target.TryReceive (out item));
+                       Assert.AreEqual (1, item);
+                       Assert.IsFalse (target.TryReceive (out item));
+
+                       Assert.IsTrue (source.TryReceive (out item));
+                       Assert.AreEqual (2, item);
+
+                       scheduler.ExecuteAll ();
+
+                       Assert.IsTrue (target.TryReceive (out item));
+                       Assert.AreEqual (3, item);
+               }
+
+               [Test]
+               public void LinkToPredicateMaxMessagesTest ()
+               {
+                       var scheduler = new TestScheduler ();
+                       var source = new BufferBlock<int> (
+                               new DataflowBlockOptions { TaskScheduler = scheduler });
+                       var target = new BufferBlock<int> ();
+                       source.LinkTo (target, new DataflowLinkOptions { MaxMessages = 1 },
+                               i => i % 2 == 1);
+
+                       Assert.IsTrue (source.Post (2));
+                       Assert.IsTrue (source.Post (1));
+                       Assert.IsTrue (source.Post (3));
+
+                       scheduler.ExecuteAll ();
+
+                       int item;
+                       Assert.IsFalse (target.TryReceive (out item));
+                       Assert.IsTrue (source.TryReceive (out item));
+                       Assert.AreEqual (2, item);
+
+                       scheduler.ExecuteAll ();
+
+                       Assert.IsTrue (target.TryReceive (out item));
+                       Assert.AreEqual (1, item);
+
+                       scheduler.ExecuteAll ();
+                       
+                       Assert.IsFalse (target.TryReceive (out item));
+               }
+
+               [Test]
+               public void LinkToPredicatePostponed ()
+               {
+                       var scheduler = new TestScheduler ();
+                       var source = new BufferBlock<int> (
+                               new DataflowBlockOptions { TaskScheduler = scheduler });
+                       var target = new BufferBlock<int> (
+                               new DataflowBlockOptions { BoundedCapacity = 1, TaskScheduler = scheduler });
+                       source.LinkTo (target, i => true);
+
+                       Assert.IsTrue (target.Post (1));
+                       Assert.IsTrue (source.Post (2));
+
+                       scheduler.ExecuteAll ();
+
+                       int item;
+                       Assert.IsTrue (target.TryReceive (out item));
+                       Assert.AreEqual (1, item);
+
+                       scheduler.ExecuteAll ();
+
+                       Assert.IsTrue (target.TryReceive (out item));
+                       Assert.AreEqual (2, item);
+               }
+
+               [Test]
+               public void LinkToPredicateClonerTest ()
+               {
+                       var scheduler = new TestScheduler ();
+                       var source = new BroadcastBlock<int> (i => i * 10,
+                               new DataflowBlockOptions { TaskScheduler = scheduler });
+                       var target = new BufferBlock<int> ();
+                       source.LinkTo (target, i => i < 10);
+
+                       Assert.IsTrue (source.Post (1));
+
+                       scheduler.ExecuteAll ();
+
+                       int item;
+                       Assert.IsTrue (target.TryReceive (out item));
+                       Assert.AreEqual (10, item);
+               }
        }
 }
\ No newline at end of file