../../build/common/MonoTODOAttribute.cs
Assembly/AssemblyInfo.cs
System.Threading.Tasks/ConcurrentExclusiveSchedulerPair.cs
+System.Threading.Tasks.Dataflow/ExecutingMessageBox.cs
System.Threading.Tasks.Dataflow/DataflowBlockOptions.cs
System.Threading.Tasks.Dataflow/DataflowMessageHeader.cs
System.Threading.Tasks.Dataflow/DataflowMessageStatus.cs
System.Threading.Tasks.Dataflow/IReceivableSourceBlock.cs
System.Threading.Tasks.Dataflow/ISourceBlock.cs
System.Threading.Tasks.Dataflow/ITargetBlock.cs
+System.Threading.Tasks.Dataflow/CompletionHelper.cs
+System.Threading.Tasks.Dataflow/MessageBox.cs
+System.Threading.Tasks.Dataflow/MessageOutgoingQueue.cs
+System.Threading.Tasks.Dataflow/MessageVault.cs
+System.Threading.Tasks.Dataflow/PassingMessageBox.cs
+System.Threading.Tasks.Dataflow/TargetBuffer.cs
+../corlib/System.Threading/AtomicBoolean.cs
--- /dev/null
+// ActionBlock.cs
+//
+// Copyright (c) 2011 Jérémie "garuma" Laval
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+//
+//
+
+#if NET_4_0 || MOBILE
+
+using System;
+using System.Threading.Tasks;
+
+namespace System.Threading.Tasks.Dataflow
+{
+ /* This is used to implement a default behavior for Dataflow completion tracking
+ * that is the Completion property and Complete/Fault method combo
+ */
+ internal struct CompletionHelper
+ {
+ TaskCompletionSource<object> source;
+
+ public static CompletionHelper GetNew ()
+ {
+ CompletionHelper temp = new CompletionHelper ();
+ temp.source = new TaskCompletionSource<object> ();
+ return temp;
+ }
+
+ public Task Completion {
+ get {
+ return source.Task;
+ }
+ }
+
+ public void Complete ()
+ {
+ source.TrySetResult (null);
+ }
+
+ public void Fault (Exception ex)
+ {
+ source.SetException (ex);
+ }
+ }
+}
+
+#endif
\ No newline at end of file
--- /dev/null
+// ExecutingMessageBox.cs
+//
+// Copyright (c) 2011 Jérémie "garuma" Laval
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+//
+//
+
+#if NET_4_0 || MOBILE
+
+using System;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+using System.Collections.Concurrent;
+
+namespace System.Threading.Tasks.Dataflow
+{
+ internal class ExecutingMessageBox<TInput> : MessageBox<TInput>
+ {
+ readonly ExecutionDataflowBlockOptions dataflowBlockOptions;
+ readonly BlockingCollection<TInput> messageQueue;
+ readonly Action processQueue;
+ readonly CompletionHelper compHelper;
+
+ AtomicBoolean started = new AtomicBoolean ();
+
+ public ExecutingMessageBox (BlockingCollection<TInput> messageQueue,
+ CompletionHelper compHelper,
+ Func<bool> externalCompleteTester,
+ Action processQueue,
+ ExecutionDataflowBlockOptions dataflowBlockOptions) : base (messageQueue, compHelper, externalCompleteTester)
+ {
+ this.messageQueue = messageQueue;
+ this.dataflowBlockOptions = dataflowBlockOptions;
+ this.processQueue = processQueue;
+ this.compHelper = compHelper;
+ }
+
+ protected override void EnsureProcessing ()
+ {
+ if (!started.TryRelaxedSet ())
+ return;
+
+ Task[] tasks = new Task[dataflowBlockOptions.MaxDegreeOfParallelism];
+ for (int i = 0; i < tasks.Length; ++i)
+ tasks[i] = Task.Factory.StartNew (processQueue);
+ Task.Factory.ContinueWhenAll (tasks, (_) => {
+ started.Value = false;
+ // Re-run ourselves in case of a race when data is available in the end
+ if (messageQueue.Count > 0)
+ EnsureProcessing ();
+ else if (messageQueue.IsCompleted)
+ compHelper.Complete ();
+ });
+ }
+ }
+}
+
+#endif
--- /dev/null
+// MessageBox.cs
+//
+// Copyright (c) 2011 Jérémie "garuma" Laval
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+//
+//
+
+#if NET_4_0 || MOBILE
+
+using System;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+using System.Collections.Concurrent;
+
+namespace System.Threading.Tasks.Dataflow
+{
+ /* In MessageBox we store message that have been offered to us so that they can be
+ * later processed
+ */
+ internal class MessageBox<TInput>
+ {
+ readonly CompletionHelper compHelper;
+ readonly BlockingCollection<TInput> messageQueue = new BlockingCollection<TInput> ();
+ readonly Func<bool> externalCompleteTester;
+
+ public MessageBox (BlockingCollection<TInput> messageQueue, CompletionHelper compHelper, Func<bool> externalCompleteTester)
+ {
+ this.compHelper = compHelper;
+ this.messageQueue = messageQueue;
+ this.externalCompleteTester = externalCompleteTester;
+ }
+
+ public DataflowMessageStatus OfferMessage (ITargetBlock<TInput> target,
+ DataflowMessageHeader messageHeader,
+ TInput messageValue,
+ ISourceBlock<TInput> source,
+ bool consumeToAccept)
+ {
+ if (!messageHeader.IsValid)
+ return DataflowMessageStatus.Declined;
+ if (messageQueue.IsAddingCompleted)
+ return DataflowMessageStatus.DecliningPermanently;
+
+ if (consumeToAccept) {
+ bool consummed;
+ if (!source.ReserveMessage (messageHeader, target))
+ return DataflowMessageStatus.NotAvailable;
+ messageValue = source.ConsumeMessage (messageHeader, target, out consummed);
+ if (!consummed)
+ return DataflowMessageStatus.NotAvailable;
+ }
+
+ try {
+ messageQueue.Add (messageValue);
+ } catch (InvalidOperationException) {
+ // This is triggered either if the underlying collection didn't accept the item
+ // or if the messageQueue has been marked complete, either way it corresponds to a false
+ return DataflowMessageStatus.DecliningPermanently;
+ }
+ EnsureProcessing ();
+
+ VerifyCompleteness ();
+
+ return DataflowMessageStatus.Accepted;
+ }
+
+ protected virtual void EnsureProcessing ()
+ {
+
+ }
+
+ public void Complete ()
+ {
+ // Make message queue complete
+ messageQueue.CompleteAdding ();
+ VerifyCompleteness ();
+ }
+
+ void VerifyCompleteness ()
+ {
+ if (messageQueue.IsCompleted && externalCompleteTester ())
+ compHelper.Complete ();
+ }
+ }
+}
+
+#endif
--- /dev/null
+// MessageOutgoingQueue.cs
+//
+// Copyright (c) 2011 Jérémie "garuma" Laval
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+//
+//
+
+#if NET_4_0 || MOBILE
+
+using System;
+using System.Threading;
+using System.Collections.Generic;
+using System.Collections.Concurrent;
+
+namespace System.Threading.Tasks.Dataflow
+{
+ /* This class handles outgoing message that get queued when there is no
+ * block on the other end to proces it. It also allows receive operations.
+ */
+ internal class MessageOutgoingQueue<T>
+ {
+ readonly ConcurrentQueue<T> store = new ConcurrentQueue<T> ();
+ readonly BlockingCollection<T> outgoing;
+ readonly CompletionHelper compHelper;
+ readonly Func<bool> externalCompleteTester;
+
+ public MessageOutgoingQueue (CompletionHelper compHelper, Func<bool> externalCompleteTester)
+ {
+ this.outgoing = new BlockingCollection<T> (store);
+ this.compHelper = compHelper;
+ this.externalCompleteTester = externalCompleteTester;
+ }
+
+ public void AddData (T data)
+ {
+ try {
+ outgoing.Add (data);
+ } catch (InvalidOperationException) {
+ VerifyCompleteness ();
+ }
+ }
+
+ IEnumerable<T> GetNonBlockingConsumingEnumerable ()
+ {
+ T temp;
+ while (outgoing.TryTake (out temp))
+ yield return temp;
+ }
+
+ public void ProcessForTarget (ITargetBlock<T> target, ISourceBlock<T> source, bool consumeToAccept, ref DataflowMessageHeader headers)
+ {
+ if (target == null)
+ return;
+
+ foreach (var output in GetNonBlockingConsumingEnumerable ())
+ target.OfferMessage (headers.Increment (), output, source, consumeToAccept);
+ }
+
+ public bool TryReceive (Predicate<T> filter, out T item)
+ {
+ item = default (T);
+
+ T result;
+ bool success = false;
+ if (store.TryPeek (out result) && (filter == null || filter (result)))
+ success = outgoing.TryTake (out item);
+
+ VerifyCompleteness ();
+
+ return success;
+ }
+
+ public bool TryReceiveAll (out IList<T> items)
+ {
+ items = null;
+
+ if (store.IsEmpty)
+ return false;
+
+ List<T> list = new List<T> (outgoing.Count);
+ if (list.Count == 0)
+ return false;
+
+ list.AddRange (GetNonBlockingConsumingEnumerable ());
+ items = list;
+
+ VerifyCompleteness ();
+
+ return list.Count > 0;
+ }
+
+ public void Complete ()
+ {
+ outgoing.CompleteAdding ();
+ VerifyCompleteness ();
+ }
+
+ void VerifyCompleteness ()
+ {
+ if (outgoing.IsCompleted && externalCompleteTester ())
+ compHelper.Complete ();
+ }
+
+ public bool IsEmpty {
+ get {
+ return store.IsEmpty;
+ }
+ }
+
+ public int Count {
+ get {
+ return store.Count;
+ }
+ }
+
+ public bool IsCompleted {
+ get {
+ return outgoing.IsCompleted;
+ }
+ }
+ }
+}
+
+#endif
--- /dev/null
+// MessageVault.cs
+//
+// Copyright (c) 2011 Jérémie "garuma" Laval
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+//
+//
+
+#if NET_4_0 || MOBILE
+
+using System;
+using System.Threading;
+using System.Collections.Generic;
+using System.Collections.Concurrent;
+
+namespace System.Threading.Tasks.Dataflow
+{
+ /* MessageVault is used to store message value & header that have been proposed via OfferMessage
+ * so that target block can then Consume them
+ */
+ internal class MessageVault<T>
+ {
+ class ReservationSlot
+ {
+ public T Data;
+ public IDataflowBlock Reserved;
+
+ public ReservationSlot (T data)
+ {
+ Data = data;
+ Reserved = null;
+ }
+ }
+
+ ConcurrentDictionary<DataflowMessageHeader, ReservationSlot> store = new ConcurrentDictionary<DataflowMessageHeader, ReservationSlot> ();
+
+ public bool StoreMessage (DataflowMessageHeader header, T data)
+ {
+ if (!header.IsValid)
+ throw new ArgumentException ("header", "Header is is not valid");
+
+ return store.TryAdd (header, new ReservationSlot (data));
+ }
+
+ public T ConsumeMessage (DataflowMessageHeader header, IDataflowBlock target, out bool messageConsummed)
+ {
+ messageConsummed = false;
+ if (!header.IsValid)
+ throw new ArgumentException ("header", "Header is is not valid");
+ if (target == null)
+ throw new ArgumentNullException ("target");
+
+ ReservationSlot slot;
+ if (!store.TryRemove (header, out slot) || slot.Reserved != target)
+ return default (T);
+
+ messageConsummed = true;
+ return slot.Data;
+ }
+
+ public bool ReserveMessage (DataflowMessageHeader header, IDataflowBlock target)
+ {
+ if (!header.IsValid)
+ throw new ArgumentException ("header", "Header is is not valid");
+ if (target == null)
+ throw new ArgumentNullException ("target");
+
+ ReservationSlot slot;
+ if (!store.TryGetValue (header, out slot))
+ return false;
+
+ return Interlocked.CompareExchange (ref slot.Reserved, target, null) == null;
+ }
+
+ public void ReleaseReservation (DataflowMessageHeader header, IDataflowBlock target)
+ {
+ if (!header.IsValid)
+ throw new ArgumentException ("header", "Header is is not valid");
+ if (target == null)
+ throw new ArgumentNullException ("target");
+
+ ReservationSlot slot;
+ if (!store.TryGetValue (header, out slot))
+ return;
+
+ if (Interlocked.CompareExchange (ref slot.Reserved, null, target) != target)
+ throw new InvalidOperationException ("The target did not have the message reserved");
+ }
+ }
+}
+
+#endif
--- /dev/null
+// PassingMessageBox.cs
+//
+// Copyright (c) 2011 Jérémie "garuma" Laval
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+//
+//
+
+#if NET_4_0 || MOBILE
+
+using System;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+using System.Collections.Concurrent;
+
+namespace System.Threading.Tasks.Dataflow
+{
+ internal class PassingMessageBox<TInput> : MessageBox<TInput>
+ {
+ readonly DataflowBlockOptions dataflowBlockOptions;
+ readonly Action processQueue;
+
+ public PassingMessageBox (BlockingCollection<TInput> messageQueue,
+ CompletionHelper compHelper,
+ Func<bool> externalCompleteTester,
+ Action processQueue,
+ DataflowBlockOptions dataflowBlockOptions) : base (messageQueue, compHelper, externalCompleteTester)
+ {
+ this.dataflowBlockOptions = dataflowBlockOptions;
+ this.processQueue = processQueue;
+ }
+
+ protected override void EnsureProcessing ()
+ {
+ processQueue ();
+ }
+ }
+}
+
+#endif
--- /dev/null
+// TargetBuffer.cs
+//
+// Copyright (c) 2011 Jérémie "garuma" Laval
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+//
+//
+
+#if NET_4_0 || MOBILE
+
+using System;
+using System.Linq;
+using System.Threading.Tasks;
+using System.Collections;
+using System.Collections.Generic;
+using System.Collections.Concurrent;
+
+namespace System.Threading.Tasks.Dataflow
+{
+ internal class TargetBuffer<T> : IEnumerable<ITargetBlock<T>>
+ {
+ ConcurrentQueue<TargetWaiter> targetWaiters = new ConcurrentQueue<TargetWaiter> ();
+
+ class TargetWaiter : IDisposable
+ {
+ public volatile bool Disabled;
+ public readonly ITargetBlock<T> Target;
+ public readonly bool UnlinkAfterOne;
+
+ ConcurrentQueue<TargetWaiter> queue;
+ AtomicBooleanValue removed;
+
+ public TargetWaiter (ITargetBlock<T> target, bool unlinkAfterOne, ConcurrentQueue<TargetWaiter> queue)
+ {
+ Target = target;
+ UnlinkAfterOne = unlinkAfterOne;
+ this.queue = queue;
+ }
+
+ public void Dispose ()
+ {
+ TargetWaiter t;
+ Disabled = true;
+
+ Thread.MemoryBarrier ();
+
+ if (queue.TryPeek (out t) && t == this && removed.TryRelaxedSet ()) {
+ queue.TryDequeue (out t);
+ } else {
+ SpinWait wait = new SpinWait ();
+ while (queue.TryPeek (out t) && t == this)
+ wait.SpinOnce ();
+ }
+ }
+ }
+
+ public IDisposable AddTarget (ITargetBlock<T> block, bool unlinkAfterOne)
+ {
+ TargetWaiter w = new TargetWaiter (block, unlinkAfterOne, targetWaiters);
+ targetWaiters.Enqueue (w);
+
+ return w;
+ }
+
+ public ITargetBlock<T> Current {
+ get {
+ TargetWaiter w;
+
+ while (true) {
+ if (!targetWaiters.TryPeek (out w))
+ return null;
+
+ if (w.Disabled == true) {
+ w.Dispose ();
+ continue;
+ } else if (w.UnlinkAfterOne) {
+ w.Dispose ();
+ }
+
+ return w.Target;
+ }
+ }
+ }
+
+ public IEnumerator<ITargetBlock<T>> GetEnumerator ()
+ {
+ return targetWaiters.Select (w => w.Target).GetEnumerator ();
+ }
+
+ IEnumerator IEnumerable.GetEnumerator ()
+ {
+ return targetWaiters.Select (w => w.Target).GetEnumerator ();
+ }
+ }
+}
+
+#endif
System.Threading.Tasks/ConcurrentExclusiveSchedulerPairTest.cs
System.Threading.Tasks.Dataflow/DataflowMessageHeaderTest.cs
+System.Threading.Tasks.Dataflow/CompletionHelperTest.cs
+../System.Threading.Tasks.Dataflow/CompletionHelper.cs
+
--- /dev/null
+#if NET_4_0
+//
+// CompletionHelperTest.cs
+//
+// Author:
+// Jérémie "garuma" Laval <jeremie.laval@gmail.com>
+//
+// Copyright (c) 2011 Jérémie "garuma" Laval
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+using System;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Threading.Tasks.Dataflow;
+
+using NUnit.Framework;
+
+namespace MonoTests.System.Threading.Tasks.Dataflow
+{
+ [TestFixture]
+ public class CompletionHelperTest
+ {
+ CompletionHelper helper;
+
+ [SetUp]
+ public void Setup ()
+ {
+ helper = CompletionHelper.GetNew ();
+ }
+
+ [Test]
+ public void InitialStateTest ()
+ {
+ Task completed = helper.Completion;
+
+ Assert.IsNotNull (completed);
+ Assert.IsFalse (completed.IsCompleted);
+ }
+
+ [Test]
+ public void FaultedTest ()
+ {
+ Exception ex = new ApplicationException ("Foobar");
+ helper.Fault (ex);
+ Task completed = helper.Completion;
+
+ Assert.IsNotNull (completed);
+ Assert.IsTrue (completed.IsCompleted);
+ Assert.AreEqual (TaskStatus.Faulted, completed.Status);
+ Assert.AreEqual (ex, completed.Exception.InnerExceptions.First ());
+ }
+
+ [Test]
+ public void CompleteTest ()
+ {
+ helper.Complete ();
+ Task completed = helper.Completion;
+
+ Assert.IsNotNull (completed);
+ Assert.IsTrue (completed.IsCompleted);
+ Assert.IsFalse (completed.IsFaulted);
+ Assert.IsFalse (completed.IsCanceled);
+ }
+ }
+}
+#endif