From 746c068fd0445145d034f68b3a68e1aaa0dccd77 Mon Sep 17 00:00:00 2001 From: Petr Onderka Date: Tue, 3 Jul 2012 00:40:25 +0200 Subject: [PATCH] Corrected handling of exceptions Also fixed some race conditions. --- .../Assembly/AssemblyInfo.cs | 1 + ...eading.Tasks.Dataflow-tests-net_4_5.csproj | 6 +- .../ActionBlock.cs | 13 ++- .../BatchBlock.cs | 2 +- .../BatchedJoinBlock.cs | 2 +- .../BatchedJoinBlock`3.cs | 2 +- .../BroadcastBlock.cs | 2 +- .../BufferBlock.cs | 2 +- .../CompletionHelper.cs | 66 +++++++++-- .../ExecutingMessageBox.cs | 83 ++++++++++---- .../JoinBlock.cs | 2 +- .../JoinBlock`3.cs | 2 +- .../MessageBox.cs | 10 +- .../PropagatorWrapperBlock.cs | 2 +- .../TransformBlock.cs | 20 ++-- .../TransformManyBlock.cs | 35 +++--- .../WriteOnceBlock.cs | 2 +- .../ActionBlockTest.cs | 8 +- .../CompletionHelperTest.cs | 2 +- .../CompletionTest.cs | 44 ++++---- .../ExecutionBlocksTest.cs | 106 ++++++++++++++++++ .../OptionsTest.cs | 31 +++-- 22 files changed, 324 insertions(+), 119 deletions(-) create mode 100644 mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/ExecutionBlocksTest.cs diff --git a/mcs/class/System.Threading.Tasks.Dataflow/Assembly/AssemblyInfo.cs b/mcs/class/System.Threading.Tasks.Dataflow/Assembly/AssemblyInfo.cs index 36e10796e70..725c7900181 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/Assembly/AssemblyInfo.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/Assembly/AssemblyInfo.cs @@ -57,3 +57,4 @@ using System.Runtime.InteropServices; [assembly: ComVisible (false)] +[assembly: InternalsVisibleTo("System.Threading.Tasks.Dataflow_test_net_4_5")] diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow-tests-net_4_5.csproj b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow-tests-net_4_5.csproj index ec030ba7300..fa3c44d625a 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow-tests-net_4_5.csproj +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow-tests-net_4_5.csproj @@ -1,4 +1,4 @@ - + Debug @@ -42,11 +42,11 @@ + - @@ -91,4 +91,4 @@ - + \ No newline at end of file diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ActionBlock.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ActionBlock.cs index b83dcd3e8fc..27c2b2ca39b 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ActionBlock.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ActionBlock.cs @@ -56,7 +56,8 @@ namespace System.Threading.Tasks.Dataflow this.action = action; this.dataflowBlockOptions = dataflowBlockOptions; this.compHelper = CompletionHelper.GetNew (dataflowBlockOptions); - this.messageBox = new ExecutingMessageBox (messageQueue, compHelper, () => true, ProcessQueue, dataflowBlockOptions); + this.messageBox = new ExecutingMessageBox (messageQueue, compHelper, + () => true, ProcessItem, () => { }, dataflowBlockOptions); } [MonoTODO] @@ -79,13 +80,13 @@ namespace System.Threading.Tasks.Dataflow return messageBox.OfferMessage (this, messageHeader, messageValue, source, consumeToAccept); } - void ProcessQueue (int maxMessages) + bool ProcessItem () { - int i = 0; TInput data; - while ((maxMessages == DataflowBlockOptions.Unbounded || i++ < maxMessages) - && messageQueue.TryTake (out data)) + bool dequeued = messageQueue.TryTake (out data); + if (dequeued) action (data); + return dequeued; } public void Complete () @@ -95,7 +96,7 @@ namespace System.Threading.Tasks.Dataflow public void Fault (Exception ex) { - compHelper.Fault (ex); + compHelper.RequestFault (ex); } public Task Completion { diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BatchBlock.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BatchBlock.cs index 5ce17b416cb..450f49fff76 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BatchBlock.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BatchBlock.cs @@ -166,7 +166,7 @@ namespace System.Threading.Tasks.Dataflow public void Fault (Exception ex) { - compHelper.Fault (ex); + compHelper.RequestFault (ex); } public Task Completion { diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BatchedJoinBlock.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BatchedJoinBlock.cs index f406f00a76b..270da44c2a7 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BatchedJoinBlock.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BatchedJoinBlock.cs @@ -145,7 +145,7 @@ namespace System.Threading.Tasks.Dataflow { void IDataflowBlock.Fault (Exception exception) { - completionHelper.Fault (exception); + completionHelper.RequestFault (exception); } Tuple, IList> ISourceBlock, IList>>.ConsumeMessage ( diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BatchedJoinBlock`3.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BatchedJoinBlock`3.cs index e42c1590d36..425c6d137de 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BatchedJoinBlock`3.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BatchedJoinBlock`3.cs @@ -166,7 +166,7 @@ namespace System.Threading.Tasks.Dataflow { void IDataflowBlock.Fault (Exception exception) { - completionHelper.Fault (exception); + completionHelper.RequestFault (exception); } Tuple, IList, IList> diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BroadcastBlock.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BroadcastBlock.cs index ecd03b294ee..0152f8c34d9 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BroadcastBlock.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BroadcastBlock.cs @@ -123,7 +123,7 @@ namespace System.Threading.Tasks.Dataflow public void Fault (Exception ex) { - compHelper.Fault (ex); + compHelper.RequestFault (ex); } public Task Completion { diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BufferBlock.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BufferBlock.cs index 6ec4cba9e83..53cf1c09206 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BufferBlock.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BufferBlock.cs @@ -125,7 +125,7 @@ namespace System.Threading.Tasks.Dataflow public void Fault (Exception ex) { - compHelper.Fault (ex); + compHelper.RequestFault (ex); } public Task Completion { diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/CompletionHelper.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/CompletionHelper.cs index a78e0668174..e68779e1391 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/CompletionHelper.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/CompletionHelper.cs @@ -27,12 +27,20 @@ namespace System.Threading.Tasks.Dataflow { /// /// This is used to implement a default behavior for Dataflow completion tracking - /// that is the Completion property and Complete/Fault method combo + /// that is the Completion property, Complete/Fault method combo + /// and the CancellationToken option. /// - internal struct CompletionHelper + internal class CompletionHelper { TaskCompletionSource source; + private readonly AtomicBoolean canFaultOrCancelImmediatelly = + new AtomicBoolean { Value = true }; + private readonly AtomicBoolean requestedFaultOrCancel = + new AtomicBoolean { Value = false }; + + Exception requestedException; + public static CompletionHelper GetNew (DataflowBlockOptions options) { var completionHelper = new CompletionHelper { source = new TaskCompletionSource () }; @@ -45,23 +53,65 @@ namespace System.Threading.Tasks.Dataflow get { return source.Task; } } + public bool CanFaultOrCancelImmediatelly { + get { return canFaultOrCancelImmediatelly.Value; } + set { + if (value) { + if (canFaultOrCancelImmediatelly.TrySet () && requestedFaultOrCancel.Value) { + if (requestedException == null) + Cancel (); + else + Fault (requestedException); + } + } else + canFaultOrCancelImmediatelly.Value = false; + } + } + + public bool CanRun { + get { + return source.Task.Status == TaskStatus.WaitingForActivation + && !requestedFaultOrCancel.Value; + } + } + public void Complete () { source.TrySetResult (null); } - public void Fault (Exception ex) + public void RequestFault (Exception ex) + { + if (CanFaultOrCancelImmediatelly) + Fault (ex); + else { + Interlocked.CompareExchange (ref requestedException, ex, null); + requestedFaultOrCancel.Value = true; + } + } + + void Fault (Exception ex) { source.TrySetException (ex); } + void RequestCancel () + { + if (CanFaultOrCancelImmediatelly) + Cancel(); + else + requestedFaultOrCancel.Value = true; + } + + void Cancel () + { + source.TrySetCanceled (); + } + void SetOptions (DataflowBlockOptions options) { - // source can't be used in a lambda directly - var sourceTmp = source; if (options.CancellationToken != CancellationToken.None) - options.CancellationToken.Register ( - () => sourceTmp.TrySetCanceled ()); + options.CancellationToken.Register (RequestCancel); } } -} +} \ No newline at end of file diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ExecutingMessageBox.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ExecutingMessageBox.cs index 355eb57e175..cdc74f4837c 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ExecutingMessageBox.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ExecutingMessageBox.cs @@ -27,64 +27,101 @@ namespace System.Threading.Tasks.Dataflow internal class ExecutingMessageBox : MessageBox { readonly ExecutionDataflowBlockOptions options; - readonly Action processQueue; - CompletionHelper compHelper; + readonly Func processItem; + readonly Action outgoingQueueComplete; + readonly CompletionHelper compHelper; - readonly AtomicBoolean waitingTask = new AtomicBoolean (); - int degreeOfParallelism; + // even number: Task is waiting to run + // odd number: Task is not waiting to run + // invariant: dop / 2 Tasks are running or waiting + int degreeOfParallelism = 1; public ExecutingMessageBox ( BlockingCollection messageQueue, CompletionHelper compHelper, - Func externalCompleteTester, Action processQueue, + Func externalCompleteTester, Func processItem, Action outgoingQueueComplete, ExecutionDataflowBlockOptions options) : base (messageQueue, compHelper, externalCompleteTester) { this.options = options; - this.processQueue = processQueue; + this.processItem = processItem; + this.outgoingQueueComplete = outgoingQueueComplete; this.compHelper = compHelper; } protected override void EnsureProcessing () { - if ((options.MaxDegreeOfParallelism != DataflowBlockOptions.Unbounded - && Thread.VolatileRead (ref degreeOfParallelism) >= options.MaxDegreeOfParallelism) || - !waitingTask.TrySet ()) - return; - StartProcessing (); } void StartProcessing () { + // atomically increase degreeOfParallelism by 1 only if it's odd + // and low enough + int startDegreeOfParallelism; + int currentDegreeOfParallelism = degreeOfParallelism; + do { + startDegreeOfParallelism = currentDegreeOfParallelism; + if (startDegreeOfParallelism % 2 == 0 + || (options.MaxDegreeOfParallelism != DataflowBlockOptions.Unbounded + && startDegreeOfParallelism / 2 >= options.MaxDegreeOfParallelism)) + return; + currentDegreeOfParallelism = + Interlocked.CompareExchange (ref degreeOfParallelism, + startDegreeOfParallelism + 1, startDegreeOfParallelism); + } while (startDegreeOfParallelism != currentDegreeOfParallelism); + Task.Factory.StartNew (ProcessQueue, TaskCreationOptions.PreferFairness); } void ProcessQueue () { + compHelper.CanFaultOrCancelImmediatelly = false; + int incrementedDegreeOfParallelism = Interlocked.Increment (ref degreeOfParallelism); if ((options.MaxDegreeOfParallelism == DataflowBlockOptions.Unbounded - || incrementedDegreeOfParallelism < options.MaxDegreeOfParallelism) - && (MessageQueue.Count > 0)) - StartProcessing(); - else - waitingTask.Value = false; + || incrementedDegreeOfParallelism / 2 < options.MaxDegreeOfParallelism) + && MessageQueue.Count > 0 && compHelper.CanRun) + StartProcessing (); try { - processQueue (options.MaxMessagesPerTask); + int i = 0; + while (compHelper.CanRun + && (options.MaxMessagesPerTask == DataflowBlockOptions.Unbounded + || i++ < options.MaxMessagesPerTask)) { + if (!processItem ()) + break; + } } catch (Exception e) { - compHelper.Fault (e); + compHelper.RequestFault (e); } int decrementedDegreeOfParallelism = - Interlocked.Decrement (ref degreeOfParallelism); + Interlocked.Add (ref degreeOfParallelism, -2); - if (!waitingTask.Value) { - if (decrementedDegreeOfParallelism == 0 && MessageQueue.IsCompleted) - compHelper.Complete (); - else if (MessageQueue.Count > 0) + if (decrementedDegreeOfParallelism % 2 == 1) { + if (decrementedDegreeOfParallelism == 1) { + compHelper.CanFaultOrCancelImmediatelly = true; + base.VerifyCompleteness (); + if (MessageQueue.IsCompleted) + outgoingQueueComplete (); + } + if (MessageQueue.Count > 0) EnsureProcessing (); } } + + protected override void OutgoingQueueComplete () + { + if (MessageQueue.IsCompleted + && Thread.VolatileRead (ref degreeOfParallelism) == 1) + outgoingQueueComplete (); + } + + protected override void VerifyCompleteness () + { + if (Thread.VolatileRead (ref degreeOfParallelism) == 1) + base.VerifyCompleteness (); + } } } \ No newline at end of file diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinBlock.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinBlock.cs index bb46f602d1f..02134597238 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinBlock.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinBlock.cs @@ -102,7 +102,7 @@ namespace System.Threading.Tasks.Dataflow public void Fault (Exception ex) { - compHelper.Fault (ex); + compHelper.RequestFault (ex); } public Task Completion { diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinBlock`3.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinBlock`3.cs index ca90da00715..97da7415874 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinBlock`3.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinBlock`3.cs @@ -107,7 +107,7 @@ namespace System.Threading.Tasks.Dataflow public void Fault (Exception ex) { - compHelper.Fault (ex); + compHelper.RequestFault (ex); } public Task Completion { diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/MessageBox.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/MessageBox.cs index d83abbe3cfb..fd6b7572ada 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/MessageBox.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/MessageBox.cs @@ -85,14 +85,18 @@ namespace System.Threading.Tasks.Dataflow { // Make message queue complete MessageQueue.CompleteAdding (); + OutgoingQueueComplete (); VerifyCompleteness (); } - void VerifyCompleteness () + protected virtual void OutgoingQueueComplete () + { + } + + protected virtual void VerifyCompleteness () { if (MessageQueue.IsCompleted && externalCompleteTester ()) compHelper.Complete (); } } -} - +} \ No newline at end of file diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/PropagatorWrapperBlock.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/PropagatorWrapperBlock.cs index f01441cb51f..644e75016b5 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/PropagatorWrapperBlock.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/PropagatorWrapperBlock.cs @@ -79,7 +79,7 @@ namespace System.Threading.Tasks.Dataflow public void Fault (Exception ex) { - compHelper.Fault (ex); + compHelper.RequestFault (ex); source.Fault (ex); target.Fault (ex); } diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TransformBlock.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TransformBlock.cs index fcea6ba211f..6c385c17ab8 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TransformBlock.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TransformBlock.cs @@ -58,11 +58,10 @@ namespace System.Threading.Tasks.Dataflow this.transformer = transformer; this.dataflowBlockOptions = dataflowBlockOptions; this.compHelper = CompletionHelper.GetNew (dataflowBlockOptions); - this.messageBox = new ExecutingMessageBox (messageQueue, - compHelper, - () => outgoing.IsCompleted, - TransformProcess, - dataflowBlockOptions); + this.messageBox = new ExecutingMessageBox ( + messageQueue, compHelper, + () => outgoing.IsCompleted, TransformProcess, () => outgoing.Complete (), + dataflowBlockOptions); this.outgoing = new MessageOutgoingQueue (compHelper, () => messageQueue.IsCompleted); this.vault = new MessageVault (); } @@ -107,14 +106,13 @@ namespace System.Threading.Tasks.Dataflow return outgoing.TryReceiveAll (out items); } - void TransformProcess (int maxMessages) + bool TransformProcess () { - int i = 0; ITargetBlock target; TInput input; - while ((maxMessages == DataflowBlockOptions.Unbounded || i++ < maxMessages) - && messageQueue.TryTake (out input)) { + var dequeued = messageQueue.TryTake (out input); + if (dequeued) { TOutput output = transformer (input); if ((target = targets.Current) != null) @@ -125,6 +123,8 @@ namespace System.Threading.Tasks.Dataflow if (!outgoing.IsEmpty && (target = targets.Current) != null) outgoing.ProcessForTarget (target, this, false, ref headers); + + return dequeued; } public void Complete () @@ -134,7 +134,7 @@ namespace System.Threading.Tasks.Dataflow public void Fault (Exception ex) { - compHelper.Fault (ex); + compHelper.RequestFault (ex); } public Task Completion { diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TransformManyBlock.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TransformManyBlock.cs index 3ef07969691..76af87312b2 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TransformManyBlock.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TransformManyBlock.cs @@ -58,11 +58,10 @@ namespace System.Threading.Tasks.Dataflow this.transformer = transformer; this.dataflowBlockOptions = dataflowBlockOptions; this.compHelper = CompletionHelper.GetNew (dataflowBlockOptions); - this.messageBox = new ExecutingMessageBox (messageQueue, - compHelper, - () => outgoing.IsCompleted, - TransformProcess, - dataflowBlockOptions); + this.messageBox = new ExecutingMessageBox ( + messageQueue, compHelper, + () => outgoing.IsCompleted, TransformProcess, () => outgoing.Complete (), + dataflowBlockOptions); this.outgoing = new MessageOutgoingQueue (compHelper, () => messageQueue.IsCompleted); this.vault = new MessageVault (); } @@ -107,29 +106,29 @@ namespace System.Threading.Tasks.Dataflow return outgoing.TryReceiveAll (out items); } - void TransformProcess (int maxMessages) + bool TransformProcess () { - int i = 0; ITargetBlock target; TInput input; - while ((maxMessages == DataflowBlockOptions.Unbounded || i++ < maxMessages) - && messageQueue.TryTake (out input)) { + var dequeued = messageQueue.TryTake (out input); + if (dequeued) { var result = transformer (input); - if (result == null) - continue; - - foreach (var item in result) { - if ((target = targets.Current) != null) - target.OfferMessage (headers.Increment (), item, this, false); - else - outgoing.AddData (item); + if (result != null) { + foreach (var item in result) { + if ((target = targets.Current) != null) + target.OfferMessage (headers.Increment (), item, this, false); + else + outgoing.AddData (item); + } } } if (!outgoing.IsEmpty && (target = targets.Current) != null) outgoing.ProcessForTarget (target, this, false, ref headers); + + return dequeued; } public void Complete () @@ -139,7 +138,7 @@ namespace System.Threading.Tasks.Dataflow public void Fault (Exception ex) { - compHelper.Fault (ex); + compHelper.RequestFault (ex); } public Task Completion { diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/WriteOnceBlock.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/WriteOnceBlock.cs index c8e4310fc81..84e4c5add8e 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/WriteOnceBlock.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/WriteOnceBlock.cs @@ -156,7 +156,7 @@ namespace System.Threading.Tasks.Dataflow public void Fault (Exception ex) { - compHelper.Fault (ex); + compHelper.RequestFault (ex); } public Task Completion { diff --git a/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/ActionBlockTest.cs b/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/ActionBlockTest.cs index 856d703f5d2..84ece0aa355 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/ActionBlockTest.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/ActionBlockTest.cs @@ -24,10 +24,8 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -using System; using System.Linq; using System.Threading; -using System.Threading.Tasks; using System.Threading.Tasks.Dataflow; using NUnit.Framework; @@ -41,13 +39,13 @@ namespace MonoTests.System.Threading.Tasks.Dataflow public void BasicUsageTest () { bool[] array = new bool[3]; - CountdownEvent evt = new CountdownEvent (array.Length); - ActionBlock block = new ActionBlock ((i) => { array[i] = true; evt.Signal (); }); + var evt = new CountdownEvent (array.Length); + var block = new ActionBlock (i => { array[i] = true; evt.Signal (); }); for (int i = 0; i < array.Length; ++i) Assert.IsTrue (block.Post (i), "Not accepted"); - evt.Wait (); + Assert.IsTrue (evt.Wait (500)); Assert.IsTrue (array.All (b => b), "Some false"); } diff --git a/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/CompletionHelperTest.cs b/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/CompletionHelperTest.cs index f4a8690bd16..ca8309e21b2 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/CompletionHelperTest.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/CompletionHelperTest.cs @@ -57,7 +57,7 @@ namespace MonoTests.System.Threading.Tasks.Dataflow public void FaultedTest () { Exception ex = new ApplicationException ("Foobar"); - helper.Fault (ex); + helper.RequestFault (ex); Task completed = helper.Completion; Assert.IsNotNull (completed); diff --git a/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/CompletionTest.cs b/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/CompletionTest.cs index be0c6b130ab..13e5af3b2f2 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/CompletionTest.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/CompletionTest.cs @@ -37,41 +37,38 @@ namespace MonoTests.System.Threading.Tasks.Dataflow [TestFixture] public class CompletionTest { - [Test] - public void WithNoElements () - { - var block = new BufferBlock (); - block.Post (42); - block.Complete (); - Console.WriteLine (block.Completion.IsCompleted); - Console.WriteLine (block.Completion.Status); - block.Receive (); - Console.WriteLine (block.Completion.IsCompleted); - Console.WriteLine (block.Completion.Status); - } - [Test] public void WithElementsStillLingering () { var block = new BufferBlock (); - block.Post (42); + Assert.IsTrue (block.Post (42)); block.Complete (); - Console.WriteLine (block.Completion.IsCompleted); - Console.WriteLine (block.Completion.Status); - block.Receive (); - Console.WriteLine (block.Completion.IsCompleted); - Console.WriteLine (block.Completion.Status); + + Assert.IsFalse (block.Completion.Wait (100)); + Assert.IsFalse (block.Completion.IsCompleted); + Assert.AreEqual (TaskStatus.WaitingForActivation, block.Completion.Status); + + Assert.AreEqual (42, block.Receive ()); + + Assert.IsTrue (block.Completion.Wait (100)); + Assert.IsTrue (block.Completion.IsCompleted); + Assert.AreEqual (TaskStatus.RanToCompletion, block.Completion.Status); } [Test] public void EmptyAfterReceive () { var block = new BufferBlock (); - block.Post (42); + Assert.IsTrue (block.Post (42)); block.Complete (); + + Assert.IsFalse (block.Completion.Wait (100)); Assert.IsFalse (block.Completion.IsCompleted); Assert.AreEqual (TaskStatus.WaitingForActivation, block.Completion.Status); + block.Receive (); + + Assert.IsTrue (block.Completion.Wait (100)); Assert.IsTrue (block.Completion.IsCompleted); Assert.AreEqual (TaskStatus.RanToCompletion, block.Completion.Status); } @@ -80,10 +77,13 @@ namespace MonoTests.System.Threading.Tasks.Dataflow public void WithElementsStillLingeringButFaulted () { var block = new BufferBlock (); - block.Post (42); + Assert.IsTrue (block.Post (42)); ((IDataflowBlock)block).Fault (new Exception ()); + + Assert.Throws (() => block.Completion.Wait (100)); Assert.IsTrue (block.Completion.IsCompleted); Assert.AreEqual (TaskStatus.Faulted, block.Completion.Status); + Assert.IsFalse (block.Post (43)); } } -} +} \ No newline at end of file diff --git a/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/ExecutionBlocksTest.cs b/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/ExecutionBlocksTest.cs new file mode 100644 index 00000000000..68c27f7b50c --- /dev/null +++ b/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/ExecutionBlocksTest.cs @@ -0,0 +1,106 @@ +// ExecutionBlocksTest.cs +// +// Copyright (c) 2012 Petr Onderka +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks.Dataflow; +using NUnit.Framework; + +namespace MonoTests.System.Threading.Tasks.Dataflow { + [TestFixture] + public class ExecutionBlocksTest { + static IEnumerable> GetExecutionBlocksWithAction(Action action) + { + yield return new ActionBlock (i => action()); + yield return new TransformBlock (i => + { + action (); + return i; + }); + yield return new TransformManyBlock (i => + { + action (); + return new int[0]; + }); + } + + [Test] + public void ExceptionTest () + { + var blocks = GetExecutionBlocksWithAction (() => { throw new Exception (); }); + foreach (var block in blocks) { + Assert.IsFalse (block.Completion.Wait (100)); + + block.Post (1); + + var ae = + Assert.Throws (() => block.Completion.Wait (100)); + Assert.AreEqual (1, ae.InnerExceptions.Count); + Assert.AreEqual (typeof(Exception), ae.InnerException.GetType ()); + } + } + + [Test] + public void NoProcessingAfterFaultTest () + { + int shouldRun = 1; + int ranAfterFault = 0; + var evt = new ManualResetEventSlim (); + + var blocks = GetExecutionBlocksWithAction (() => + { + if (Thread.VolatileRead (ref shouldRun) == 0) { + ranAfterFault++; + return; + } + + evt.Wait (); + }); + + foreach (var block in blocks) { + shouldRun = 1; + ranAfterFault = 0; + evt.Reset (); + + Assert.IsTrue (block.Post (1)); + Assert.IsTrue (block.Post (2)); + + Assert.IsFalse (block.Completion.Wait (100)); + Assert.AreEqual (0, ranAfterFault); + + block.Fault (new Exception ()); + + Assert.IsFalse (block.Completion.Wait (100)); + + shouldRun = 0; + evt.Set (); + + Assert.Throws (() => block.Completion.Wait (100)); + + Thread.Sleep (100); + + Assert.AreEqual (0, Thread.VolatileRead (ref ranAfterFault)); + } + } + } +} \ No newline at end of file diff --git a/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/OptionsTest.cs b/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/OptionsTest.cs index fe1a8a15307..86beb1bdefe 100644 --- a/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/OptionsTest.cs +++ b/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/OptionsTest.cs @@ -125,7 +125,7 @@ namespace MonoTests.System.Threading.Tasks.Dataflow { q => new TransformManyBlock (i => { q.Enqueue (Tuple.Create (i, Task.CurrentId.Value)); - return new int[0]; + return new[] { i }; }, options) }; @@ -139,7 +139,14 @@ namespace MonoTests.System.Threading.Tasks.Dataflow { block.Post (i); block.Complete (); - Assert.IsTrue (block.Completion.Wait (500), queue.Count.ToString()); + + var source = block as ISourceBlock; + if (source != null) { + Assert.IsFalse (block.Completion.Wait (100)); + + source.LinkTo (new BufferBlock ()); + } + Assert.IsTrue (block.Completion.Wait (500)); CollectionAssert.AreEquivalent ( Enumerable.Range (0, 100), queue.Select (t => t.Item1)); @@ -174,7 +181,7 @@ namespace MonoTests.System.Threading.Tasks.Dataflow { foreach (var last in lasts) times [last.Value] = Tuple.Create ( - times [last.Value].Item2, last.Key); + times [last.Value].Item1, last.Key); int maxDop = 0; int dop = 0; @@ -195,23 +202,25 @@ namespace MonoTests.System.Threading.Tasks.Dataflow { [Test] public void MaxDegreeOfParallelismTest() { - for (int i = 0; i < 10;i++) + // loop to better test for race conditions + // some that showed in this test were quite rare + for (int i = 0; i < 10; i++) { - var options = new ExecutionDataflowBlockOptions(); + var options = new ExecutionDataflowBlockOptions (); foreach (var taskIds in GetTaskIdsForExecutionsOptions(options)) - Assert.AreEqual(1, CalculateDegreeOfParallelism (taskIds)); + Assert.AreEqual (1, CalculateDegreeOfParallelism (taskIds)); options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 }; - foreach (var taskIds in GetTaskIdsForExecutionsOptions(options)) + foreach (var taskIds in GetTaskIdsForExecutionsOptions (options)) Assert.LessOrEqual (CalculateDegreeOfParallelism (taskIds), 2); options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 }; - foreach (var taskIds in GetTaskIdsForExecutionsOptions(options)) - Assert.LessOrEqual(CalculateDegreeOfParallelism (taskIds), 4); + foreach (var taskIds in GetTaskIdsForExecutionsOptions (options)) + Assert.LessOrEqual (CalculateDegreeOfParallelism (taskIds), 4); options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = -1 }; - foreach (var taskIds in GetTaskIdsForExecutionsOptions(options)) - Assert.LessOrEqual(CalculateDegreeOfParallelism (taskIds), taskIds.Length); + foreach (var taskIds in GetTaskIdsForExecutionsOptions (options)) + Assert.LessOrEqual (CalculateDegreeOfParallelism (taskIds), taskIds.Length); } } -- 2.25.1