Added BatchedJoinBlock`3
authorPetr Onderka <gsvick@gmail.com>
Fri, 29 Jun 2012 18:11:28 +0000 (20:11 +0200)
committerPetr Onderka <gsvick@gmail.com>
Sun, 19 Aug 2012 21:40:06 +0000 (23:40 +0200)
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow-net_4_5.csproj
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow-tests-net_4_5.csproj
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BatchedJoinBlock.cs
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BatchedJoinBlock`3.cs [new file with mode: 0644]
mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/BatchedJoinBlockTest.cs
mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/BatchedJoinBlock`3Test.cs [new file with mode: 0644]

index 13799825d77dd764da8f810da700de54942dda37..e82ffb60e96936f1eb063a3fcd6f67d729a0677f 100644 (file)
@@ -47,6 +47,7 @@
    <Compile Include="..\..\build\common\MonoTODOAttribute.cs" />
    <Compile Include="Assembly\AssemblyInfo.cs" />
     <Compile Include="System.Threading.Tasks.Dataflow\BatchedJoinBlock.cs" />\r
+    <Compile Include="System.Threading.Tasks.Dataflow\BatchedJoinBlock`3.cs" />\r
    <Compile Include="System.Threading.Tasks.Dataflow\ExecutingMessageBox.cs" />
    <Compile Include="System.Threading.Tasks.Dataflow\DataflowBlockOptions.cs" />
    <Compile Include="System.Threading.Tasks.Dataflow\DataflowMessageHeader.cs" />
index 297a4feda9753af8eb98b1c990950300c66df085..3006647ec2bfa104b1566bb7e1bcc9a65229651b 100644 (file)
@@ -43,6 +43,7 @@
   <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />\r
   <ItemGroup>\r
     <Compile Include="Test\System.Threading.Tasks.Dataflow\BatchedJoinBlockTest.cs" />\r
+    <Compile Include="Test\System.Threading.Tasks.Dataflow\BatchedJoinBlock`3Test.cs" />\r
    <Compile Include="Test\System.Threading.Tasks.Dataflow\DataflowMessageHeaderTest.cs" />
    <Compile Include="Test\System.Threading.Tasks.Dataflow\CompletionHelperTest.cs" />
    <Compile Include="System.Threading.Tasks.Dataflow\CompletionHelper.cs" />
@@ -89,4 +90,4 @@
     <Folder Include="Properties\" />\r
   </ItemGroup>\r
   \r
-</Project>\r
+</Project>
index bd42827a911d0713247dddd3b120771ead18f868..6e0e4c3ab05f646e871ddd7d26c46021e27e0516 100644 (file)
@@ -22,7 +22,6 @@
 //
 //
 
-using System.Collections.Concurrent;
 using System.Collections.Generic;
 
 namespace System.Threading.Tasks.Dataflow {
@@ -59,8 +58,10 @@ namespace System.Threading.Tasks.Dataflow {
                        BatchSize = batchSize;
                        options = dataflowBlockOptions;
 
-                       target1 = new JoinTarget<T1> (this, SignalTarget, completionHelper, () => outgoing.IsCompleted);
-                       target2 = new JoinTarget<T2> (this, SignalTarget, completionHelper, () => outgoing.IsCompleted);
+                       target1 = new JoinTarget<T1> (
+                               this, SignalTarget, completionHelper, () => outgoing.IsCompleted);
+                       target2 = new JoinTarget<T2> (
+                               this, SignalTarget, completionHelper, () => outgoing.IsCompleted);
 
                        outgoing = new MessageOutgoingQueue<Tuple<IList<T1>, IList<T2>>> (
                                completionHelper,
diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BatchedJoinBlock`3.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BatchedJoinBlock`3.cs
new file mode 100644 (file)
index 0000000..7889e75
--- /dev/null
@@ -0,0 +1,216 @@
+// BatchedJoinBlock.cs
+//
+// Copyright (c) 2012 Petr Onderka
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+//
+//
+
+using System.Collections.Generic;
+
+namespace System.Threading.Tasks.Dataflow {
+       public sealed class BatchedJoinBlock<T1, T2, T3> :
+               IReceivableSourceBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> {
+               GroupingDataflowBlockOptions options;
+
+               CompletionHelper completionHelper = CompletionHelper.GetNew ();
+               readonly MessageOutgoingQueue<Tuple<IList<T1>, IList<T2>, IList<T3>>> outgoing;
+
+               readonly MessageVault<Tuple<IList<T1>, IList<T2>, IList<T3>>> vault =
+                       new MessageVault<Tuple<IList<T1>, IList<T2>, IList<T3>>> ();
+
+               readonly TargetBuffer<Tuple<IList<T1>, IList<T2>, IList<T3>>> targets =
+                       new TargetBuffer<Tuple<IList<T1>, IList<T2>, IList<T3>>> ();
+
+               DataflowMessageHeader headers;
+               SpinLock batchLock;
+
+               readonly JoinTarget<T1> target1;
+               readonly JoinTarget<T2> target2;
+               readonly JoinTarget<T3> target3;
+
+               int batchCount;
+
+               public BatchedJoinBlock (int batchSize)
+                       : this (batchSize, GroupingDataflowBlockOptions.Default)
+               {
+               }
+
+               public BatchedJoinBlock (int batchSize,
+                                        GroupingDataflowBlockOptions dataflowBlockOptions)
+               {
+                       if (batchSize <= 0)
+                               throw new ArgumentOutOfRangeException (
+                                       "batchSize", batchSize, "The batchSize must be positive.");
+                       if (dataflowBlockOptions == null)
+                               throw new ArgumentNullException ("dataflowBlockOptions");
+
+                       BatchSize = batchSize;
+                       options = dataflowBlockOptions;
+
+                       target1 = new JoinTarget<T1> (
+                               this, SignalTarget, completionHelper, () => outgoing.IsCompleted);
+                       target2 = new JoinTarget<T2> (
+                               this, SignalTarget, completionHelper, () => outgoing.IsCompleted);
+                       target3 = new JoinTarget<T3>(
+                               this, SignalTarget, completionHelper, () => outgoing.IsCompleted);
+
+                       outgoing = new MessageOutgoingQueue<Tuple<IList<T1>, IList<T2>, IList<T3>>> (
+                               completionHelper,
+                               () => target1.Buffer.IsCompleted || target2.Buffer.IsCompleted || target3.Buffer.IsCompleted);
+               }
+
+               public int BatchSize { get; private set; }
+
+               public ITargetBlock<T1> Target1 {
+                       get { return target1; }
+               }
+
+               public ITargetBlock<T2> Target2 {
+                       get { return target2; }
+               }
+
+               public ITargetBlock<T3> Target3 {
+                       get { return target3; }
+               }
+
+               void SignalTarget ()
+               {
+                       int current = Interlocked.Increment (ref batchCount);
+
+                       if (current % BatchSize != 0)
+                               return;
+
+                       Interlocked.Add (ref batchCount, -current);
+
+                       MakeBatch (BatchSize);
+               }
+
+               void MakeBatch (int batchSize)
+               {
+                       var list1 = new List<T1> ();
+                       var list2 = new List<T2> ();
+                       var list3 = new List<T3> ();
+
+                       // lock is necessary here to make sure items are in the correct order
+                       bool taken = false;
+                       try {
+                               batchLock.Enter (ref taken);
+
+                               int i = 0;
+
+                               T1 item1;
+                               while (i < batchSize && target1.Buffer.TryTake (out item1)) {
+                                       list1.Add (item1);
+                                       i++;
+                               }
+
+                               T2 item2;
+                               while (i < batchSize && target2.Buffer.TryTake (out item2)) {
+                                       list2.Add (item2);
+                                       i++;
+                               }
+
+                               T3 item3;
+                               while (i < batchSize && target3.Buffer.TryTake (out item3)) {
+                                       list3.Add (item3);
+                                       i++;
+                               }
+
+                               if (i < batchSize)
+                                       throw new InvalidOperationException ("Unexpected count of items.");
+                       } finally {
+                               if (taken)
+                                       batchLock.Exit ();
+                       }
+
+                       var batch = Tuple.Create<IList<T1>, IList<T2>, IList<T3>> (list1, list2,
+                               list3);
+
+                       var target = targets.Current;
+                       if (target == null)
+                               outgoing.AddData (batch);
+                       else
+                               target.OfferMessage (headers.Increment (), batch, this, false);
+
+                       if (!outgoing.IsEmpty && targets.Current != null)
+                               outgoing.ProcessForTarget (targets.Current, this, false, ref headers);
+               }
+
+               public Task Completion
+               {
+                       get { return completionHelper.Completion; }
+               }
+
+               public void Complete ()
+               {
+                       outgoing.Complete ();
+               }
+
+               void IDataflowBlock.Fault (Exception exception)
+               {
+                       completionHelper.Fault (exception);
+               }
+
+               Tuple<IList<T1>, IList<T2>, IList<T3>>
+                       ISourceBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>>.ConsumeMessage (
+                       DataflowMessageHeader messageHeader,
+                       ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> target,
+                       out bool messageConsumed)
+               {
+                       return vault.ConsumeMessage (messageHeader, target, out messageConsumed);
+               }
+
+               public IDisposable LinkTo (
+                       ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> target,
+                       bool unlinkAfterOne)
+               {
+                       var result = targets.AddTarget (target, unlinkAfterOne);
+                       outgoing.ProcessForTarget (target, this, false, ref headers);
+                       return result;
+               }
+
+               void ISourceBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>>.ReleaseReservation (
+                       DataflowMessageHeader messageHeader,
+                       ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> target)
+               {
+                       vault.ReleaseReservation (messageHeader, target);
+               }
+
+               bool ISourceBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>>.ReserveMessage (
+                       DataflowMessageHeader messageHeader,
+                       ITargetBlock<Tuple<IList<T1>, IList<T2>, IList<T3>>> target)
+               {
+                       return vault.ReserveMessage (messageHeader, target);
+               }
+
+               public bool TryReceive (
+                       Predicate<Tuple<IList<T1>, IList<T2>, IList<T3>>> filter,
+                       out Tuple<IList<T1>, IList<T2>, IList<T3>> item)
+               {
+                       return outgoing.TryReceive (filter, out item);
+               }
+
+               public bool TryReceiveAll (
+                       out IList<Tuple<IList<T1>, IList<T2>, IList<T3>>> items)
+               {
+                       return outgoing.TryReceiveAll (out items);
+               }
+       }
+}
\ No newline at end of file
index c088fe88ca6644d385a19e1e92fd4075e43da896..ec9696cf94bb1c11259d6ee8948503b7eda71129 100644 (file)
@@ -51,6 +51,7 @@ namespace MonoTests.System.Threading.Tasks.Dataflow {
                        // both targets once
                        Assert.IsTrue (block.Target1.Post (1));
 
+                       Assert.IsFalse(evt.Wait(100));
                        Assert.IsNull (result);
 
                        Assert.IsTrue (block.Target2.Post (2));
@@ -67,6 +68,7 @@ namespace MonoTests.System.Threading.Tasks.Dataflow {
                        // target 1 twice
                        Assert.IsTrue (block.Target1.Post (3));
 
+                       Assert.IsFalse(evt.Wait(100));
                        Assert.IsNull (result);
 
                        Assert.IsTrue (block.Target1.Post (4));
@@ -82,6 +84,7 @@ namespace MonoTests.System.Threading.Tasks.Dataflow {
                        // target 2 twice
                        Assert.IsTrue (block.Target2.Post (5));
 
+                       Assert.IsFalse(evt.Wait(100));
                        Assert.IsNull (result);
 
                        Assert.IsTrue (block.Target2.Post (6));
diff --git a/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/BatchedJoinBlock`3Test.cs b/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/BatchedJoinBlock`3Test.cs
new file mode 100644 (file)
index 0000000..1d90bd9
--- /dev/null
@@ -0,0 +1,70 @@
+// 
+// BatchedJoinBlockTest.cs
+//  
+// Author:
+//       Petr Onderka <gsvick@gmail.com>
+// 
+// Copyright (c) 2012 Petr Onderka
+// 
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+// 
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+// 
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks.Dataflow;
+using NUnit.Framework;
+
+namespace MonoTests.System.Threading.Tasks.Dataflow
+{
+       [TestFixture]
+       public class BatchedJoinBlock3Test
+       {
+               [Test]
+               public void BasicUsageTest()
+               {
+                       Tuple<IList<int>, IList<int>, IList<string>> result = null;
+                       var evt = new ManualResetEventSlim(false);
+
+                       var actionBlock = new ActionBlock<Tuple<IList<int>, IList<int>, IList<string>>>(r =>
+                       {
+                               result = r;
+                               evt.Set();
+                       });
+                       var block = new BatchedJoinBlock<int, int, string>(3);
+
+                       block.LinkTo(actionBlock);
+
+                       // all targets once
+                       Assert.IsTrue(block.Target1.Post(1));
+                       Assert.IsTrue(block.Target2.Post(2));
+
+                       Assert.IsFalse (evt.Wait(100));
+                       Assert.IsNull(result);
+
+                       Assert.IsTrue(block.Target3.Post("foo"));
+
+                       Assert.IsTrue(evt.Wait(100));
+
+                       Assert.IsNotNull(result);
+                       CollectionAssert.AreEqual(new[] { 1 }, result.Item1);
+                       CollectionAssert.AreEqual(new[] { 2 }, result.Item2);
+                       CollectionAssert.AreEqual(new[] { "foo" }, result.Item3);
+               }
+       }
+}
\ No newline at end of file