From 3acedb6df8c93408cbd67ec9d4d6274ad56f4949 Mon Sep 17 00:00:00 2001 From: Petr Onderka Date: Mon, 25 Jun 2012 23:07:19 +0200 Subject: [PATCH] Implemented BatchedJoinBlock --- ...em.Threading.Tasks.Dataflow-net_4_5.csproj | 3 +- ...eading.Tasks.Dataflow-tests-net_4_5.csproj | 4 +- .../BatchBlock.cs | 40 ++-- .../BatchedJoinBlock.cs | 190 ++++++++++++++++++ .../GroupingDataflowBlockOptions.cs | 10 +- .../BatchBlockTest.cs | 2 +- .../BatchedJoinBlockTest.cs | 95 +++++++++ 7 files changed, 326 insertions(+), 18 deletions(-) create mode 100644 mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BatchedJoinBlock.cs create mode 100644 mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/BatchedJoinBlockTest.cs diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow-net_4_5.csproj b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow-net_4_5.csproj index 65ee4b5eb67..13799825d77 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow-net_4_5.csproj +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow-net_4_5.csproj @@ -46,6 +46,7 @@ + @@ -115,4 +116,4 @@ - + \ No newline at end of file diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow-tests-net_4_5.csproj b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow-tests-net_4_5.csproj index dd28630b1ea..297a4feda97 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow-tests-net_4_5.csproj +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow-tests-net_4_5.csproj @@ -12,7 +12,7 @@ true Properties - + MonoTests System.Threading.Tasks.Dataflow_test_net_4_5 v4.5 512 @@ -42,6 +42,7 @@ + @@ -89,4 +90,3 @@ - diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BatchBlock.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BatchBlock.cs index 963024709d1..7c1d612af21 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BatchBlock.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BatchBlock.cs @@ -44,6 +44,7 @@ namespace System.Threading.Tasks.Dataflow MessageOutgoingQueue outgoing; TargetBuffer targets = new TargetBuffer (); DataflowMessageHeader headers = DataflowMessageHeader.NewValid (); + SpinLock batchLock; public BatchBlock (int batchSize) : this (batchSize, defaultOptions) { @@ -112,30 +113,45 @@ namespace System.Threading.Tasks.Dataflow return; } while (Interlocked.CompareExchange (ref batchCount, 0, earlyBatchSize) != earlyBatchSize); - MakeBatch (targets.Current, earlyBatchSize); + MakeBatch (earlyBatchSize); } - // TODO: there can be out-of-order processing of message elements if two collections - // are triggered and work side by side. See if it's a problem or not. void BatchProcess () { - ITargetBlock target = targets.Current; - int current = Interlocked.Increment (ref batchCount); + // has to deal correctly with concurrent TriggerBatch - if (current % batchSize != 0) - return; + int current; + int previousCount; + do { + previousCount = batchCount; + current = previousCount + 1; - Interlocked.Add (ref batchCount, -current); + if (current == batchSize) + current = 0; + } while (Interlocked.CompareExchange (ref batchCount, current, previousCount) + != previousCount); - MakeBatch (target, batchSize); + if (current == 0) + MakeBatch (batchSize); } - void MakeBatch (ITargetBlock target, int size) + void MakeBatch (int size) { T[] batch = new T[size]; - for (int i = 0; i < size; ++i) - messageQueue.TryTake (out batch[i]); + // lock is necessary here to make sure items are in the correct order + bool taken = false; + try { + batchLock.Enter (ref taken); + + for (int i = 0; i < size; ++i) + messageQueue.TryTake (out batch[i]); + } finally { + if (taken) + batchLock.Exit(); + } + + var target = targets.Current; if (target == null) outgoing.AddData (batch); else diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BatchedJoinBlock.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BatchedJoinBlock.cs new file mode 100644 index 00000000000..bd42827a911 --- /dev/null +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BatchedJoinBlock.cs @@ -0,0 +1,190 @@ +// BatchedJoinBlock.cs +// +// Copyright (c) 2012 Petr Onderka +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. +// +// + +using System.Collections.Concurrent; +using System.Collections.Generic; + +namespace System.Threading.Tasks.Dataflow { + public sealed class BatchedJoinBlock : + IReceivableSourceBlock, IList>> { + GroupingDataflowBlockOptions options; + + CompletionHelper completionHelper = CompletionHelper.GetNew(); + readonly MessageOutgoingQueue, IList>> outgoing; + readonly MessageVault, IList>> vault = new MessageVault, IList>>(); + readonly TargetBuffer, IList>> targets = new TargetBuffer, IList>>(); + DataflowMessageHeader headers; + SpinLock batchLock; + + readonly JoinTarget target1; + readonly JoinTarget target2; + + int batchCount; + + public BatchedJoinBlock (int batchSize) + : this (batchSize, GroupingDataflowBlockOptions.Default) + { + } + + public BatchedJoinBlock (int batchSize, + GroupingDataflowBlockOptions dataflowBlockOptions) + { + if (batchSize <= 0) + throw new ArgumentOutOfRangeException ( + "batchSize", batchSize, "The batchSize must be positive."); + if (dataflowBlockOptions == null) + throw new ArgumentNullException ("dataflowBlockOptions"); + + BatchSize = batchSize; + options = dataflowBlockOptions; + + target1 = new JoinTarget (this, SignalTarget, completionHelper, () => outgoing.IsCompleted); + target2 = new JoinTarget (this, SignalTarget, completionHelper, () => outgoing.IsCompleted); + + outgoing = new MessageOutgoingQueue, IList>> ( + completionHelper, + () => target1.Buffer.IsCompleted || target2.Buffer.IsCompleted); + } + + public int BatchSize { get; private set; } + + public ITargetBlock Target1 { + get { return target1; } + } + + public ITargetBlock Target2 { + get { return target2; } + } + + private void SignalTarget() + { + int current = Interlocked.Increment (ref batchCount); + + if (current % BatchSize != 0) + return; + + Interlocked.Add (ref batchCount, -current); + + MakeBatch (BatchSize); + } + + void MakeBatch (int batchSize) + { + var list1 = new List (); + var list2 = new List (); + + // lock is necessary here to make sure items are in the correct order + bool taken = false; + try { + batchLock.Enter (ref taken); + + int i = 0; + + T1 item1; + while (i < batchSize && target1.Buffer.TryTake (out item1)) { + list1.Add (item1); + i++; + } + + T2 item2; + while (i < batchSize && target2.Buffer.TryTake (out item2)) { + list2.Add (item2); + i++; + } + + if (i < batchSize) + throw new InvalidOperationException("Unexpected count of items."); + } finally { + if (taken) + batchLock.Exit (); + } + + var batch = Tuple.Create, IList> (list1, list2); + + var target = targets.Current; + if (target == null) + outgoing.AddData(batch); + else + target.OfferMessage (headers.Increment (), batch, this, false); + + if (!outgoing.IsEmpty && targets.Current != null) + outgoing.ProcessForTarget (targets.Current, this, false, ref headers); + } + + public Task Completion { + get { return completionHelper.Completion; } + } + + public void Complete () + { + outgoing.Complete (); + } + + void IDataflowBlock.Fault (Exception exception) + { + completionHelper.Fault (exception); + } + + Tuple, IList> ISourceBlock, IList>>.ConsumeMessage ( + DataflowMessageHeader messageHeader, + ITargetBlock, IList>> target, + out bool messageConsumed) + { + return vault.ConsumeMessage (messageHeader, target, out messageConsumed); + } + + public IDisposable LinkTo (ITargetBlock, IList>> target, + bool unlinkAfterOne) + { + var result = targets.AddTarget(target, unlinkAfterOne); + outgoing.ProcessForTarget(target, this, false, ref headers); + return result; + } + + void ISourceBlock, IList>>.ReleaseReservation ( + DataflowMessageHeader messageHeader, + ITargetBlock, IList>> target) + { + vault.ReleaseReservation (messageHeader, target); + } + + bool ISourceBlock, IList>>.ReserveMessage ( + DataflowMessageHeader messageHeader, + ITargetBlock, IList>> target) + { + return vault.ReserveMessage (messageHeader, target); + } + + public bool TryReceive (Predicate, IList>> filter, + out Tuple, IList> item) + { + return outgoing.TryReceive (filter, out item); + } + + public bool TryReceiveAll (out IList, IList>> items) + { + return outgoing.TryReceiveAll (out items); + } + } +} \ No newline at end of file diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/GroupingDataflowBlockOptions.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/GroupingDataflowBlockOptions.cs index ac6bfbd19fe..9ab701efaaa 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/GroupingDataflowBlockOptions.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/GroupingDataflowBlockOptions.cs @@ -28,8 +28,14 @@ using System.Threading.Tasks; namespace System.Threading.Tasks.Dataflow { - public class GroupingDataflowBlockOptions : DataflowBlockOptions - { + public class GroupingDataflowBlockOptions : DataflowBlockOptions { + static readonly GroupingDataflowBlockOptions DefaultOptions = + new GroupingDataflowBlockOptions (); + + internal static GroupingDataflowBlockOptions Default { + get { return DefaultOptions; } + } + public GroupingDataflowBlockOptions () { MaxNumberOfGroups = -1; diff --git a/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/BatchBlockTest.cs b/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/BatchBlockTest.cs index 79b675da3c4..3f17718fbf4 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/BatchBlockTest.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/BatchBlockTest.cs @@ -58,7 +58,7 @@ namespace MonoTests.System.Threading.Tasks.Dataflow evt.Wait (); Assert.IsNotNull (array); - CollectionAssert.AreEquivalent (new int[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 42 }, array); + CollectionAssert.AreEqual (new int[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 42 }, array); } [Test] diff --git a/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/BatchedJoinBlockTest.cs b/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/BatchedJoinBlockTest.cs new file mode 100644 index 00000000000..c088fe88ca6 --- /dev/null +++ b/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/BatchedJoinBlockTest.cs @@ -0,0 +1,95 @@ +// +// BatchedJoinBlockTest.cs +// +// Author: +// Petr Onderka +// +// Copyright (c) 2012 Petr Onderka +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks.Dataflow; +using NUnit.Framework; + +namespace MonoTests.System.Threading.Tasks.Dataflow { + [TestFixture] + public class BatchedJoinBlockTest { + [Test] + public void BasicUsageTest() + { + Tuple, IList> result = null; + var evt = new ManualResetEventSlim (false); + + var actionBlock = new ActionBlock, IList>> (r => + { + result = r; + evt.Set (); + }); + var block = new BatchedJoinBlock (2); + + block.LinkTo (actionBlock); + + // both targets once + Assert.IsTrue (block.Target1.Post (1)); + + Assert.IsNull (result); + + Assert.IsTrue (block.Target2.Post (2)); + + Assert.IsTrue (evt.Wait (100)); + + Assert.IsNotNull (result); + CollectionAssert.AreEqual (new[] { 1 }, result.Item1); + CollectionAssert.AreEqual (new[] { 2 }, result.Item2); + + result = null; + evt.Reset (); + + // target 1 twice + Assert.IsTrue (block.Target1.Post (3)); + + Assert.IsNull (result); + + Assert.IsTrue (block.Target1.Post (4)); + Assert.IsTrue (evt.Wait (100)); + + Assert.IsNotNull (result); + CollectionAssert.AreEqual (new[] { 3, 4 }, result.Item1); + CollectionAssert.IsEmpty (result.Item2); + + result = null; + evt.Reset (); + + // target 2 twice + Assert.IsTrue (block.Target2.Post (5)); + + Assert.IsNull (result); + + Assert.IsTrue (block.Target2.Post (6)); + Assert.IsTrue (evt.Wait (100)); + + Assert.IsNotNull (result); + CollectionAssert.IsEmpty (result.Item1); + CollectionAssert.AreEqual (new[] { 5, 6 }, result.Item2); + } + } +} \ No newline at end of file -- 2.25.1