Add plumbing internal classes used throughout the framework
authorJérémie Laval <jeremie.laval@gmail.com>
Wed, 10 Aug 2011 16:28:53 +0000 (18:28 +0200)
committerJérémie Laval <jeremie.laval@gmail.com>
Wed, 17 Aug 2011 18:03:46 +0000 (20:03 +0200)
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow.dll.sources
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/CompletionHelper.cs [new file with mode: 0644]
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ExecutingMessageBox.cs [new file with mode: 0644]
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/MessageBox.cs [new file with mode: 0644]
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/MessageOutgoingQueue.cs [new file with mode: 0644]
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/MessageVault.cs [new file with mode: 0644]
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/PassingMessageBox.cs [new file with mode: 0644]
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TargetBuffer.cs [new file with mode: 0644]
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow_test.dll.sources
mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/CompletionHelperTest.cs [new file with mode: 0644]

index 28f039f9520586842efb13e98974f2912c65d6df..64ba8161f0e5250a432dfde1b74831ebf3d6fc4d 100644 (file)
@@ -3,6 +3,7 @@
 ../../build/common/MonoTODOAttribute.cs
 Assembly/AssemblyInfo.cs
 System.Threading.Tasks/ConcurrentExclusiveSchedulerPair.cs
+System.Threading.Tasks.Dataflow/ExecutingMessageBox.cs
 System.Threading.Tasks.Dataflow/DataflowBlockOptions.cs
 System.Threading.Tasks.Dataflow/DataflowMessageHeader.cs
 System.Threading.Tasks.Dataflow/DataflowMessageStatus.cs
@@ -13,3 +14,10 @@ System.Threading.Tasks.Dataflow/IPropagatorBlock.cs
 System.Threading.Tasks.Dataflow/IReceivableSourceBlock.cs
 System.Threading.Tasks.Dataflow/ISourceBlock.cs
 System.Threading.Tasks.Dataflow/ITargetBlock.cs
+System.Threading.Tasks.Dataflow/CompletionHelper.cs
+System.Threading.Tasks.Dataflow/MessageBox.cs
+System.Threading.Tasks.Dataflow/MessageOutgoingQueue.cs
+System.Threading.Tasks.Dataflow/MessageVault.cs
+System.Threading.Tasks.Dataflow/PassingMessageBox.cs
+System.Threading.Tasks.Dataflow/TargetBuffer.cs
+../corlib/System.Threading/AtomicBoolean.cs
diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/CompletionHelper.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/CompletionHelper.cs
new file mode 100644 (file)
index 0000000..3e77856
--- /dev/null
@@ -0,0 +1,64 @@
+// ActionBlock.cs
+//
+// Copyright (c) 2011 Jérémie "garuma" Laval
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+//
+//
+
+#if NET_4_0 || MOBILE
+
+using System;
+using System.Threading.Tasks;
+
+namespace System.Threading.Tasks.Dataflow
+{
+       /* This is used to implement a default behavior for Dataflow completion tracking
+        * that is the Completion property and Complete/Fault method combo
+        */
+       internal struct CompletionHelper
+       {
+               TaskCompletionSource<object> source;
+
+               public static CompletionHelper GetNew ()
+               {
+                       CompletionHelper temp = new CompletionHelper ();
+                       temp.source = new TaskCompletionSource<object> ();
+                       return temp;
+               }
+
+               public Task Completion {
+                       get {
+                               return source.Task;
+                       }
+               }
+
+               public void Complete ()
+               {
+                       source.TrySetResult (null);
+               }
+
+               public void Fault (Exception ex)
+               {
+                       source.SetException (ex);
+               }
+       }
+}
+
+#endif
\ No newline at end of file
diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ExecutingMessageBox.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ExecutingMessageBox.cs
new file mode 100644 (file)
index 0000000..653758a
--- /dev/null
@@ -0,0 +1,75 @@
+// ExecutingMessageBox.cs
+//
+// Copyright (c) 2011 Jérémie "garuma" Laval
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+//
+//
+
+#if NET_4_0 || MOBILE
+
+using System;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+using System.Collections.Concurrent;
+
+namespace System.Threading.Tasks.Dataflow
+{
+       internal class ExecutingMessageBox<TInput> : MessageBox<TInput>
+       {
+               readonly ExecutionDataflowBlockOptions dataflowBlockOptions;
+               readonly BlockingCollection<TInput> messageQueue;
+               readonly Action processQueue;
+               readonly CompletionHelper compHelper;
+
+               AtomicBoolean started = new AtomicBoolean ();
+               
+               public ExecutingMessageBox (BlockingCollection<TInput> messageQueue,
+                                           CompletionHelper compHelper,
+                                           Func<bool> externalCompleteTester,
+                                           Action processQueue,
+                                           ExecutionDataflowBlockOptions dataflowBlockOptions) : base (messageQueue, compHelper, externalCompleteTester)
+               {
+                       this.messageQueue = messageQueue;
+                       this.dataflowBlockOptions = dataflowBlockOptions;
+                       this.processQueue = processQueue;
+                       this.compHelper = compHelper;
+               }
+
+               protected override void EnsureProcessing ()
+               {
+                       if (!started.TryRelaxedSet ())
+                               return;
+
+                       Task[] tasks = new Task[dataflowBlockOptions.MaxDegreeOfParallelism];
+                       for (int i = 0; i < tasks.Length; ++i)
+                               tasks[i] = Task.Factory.StartNew (processQueue);
+                       Task.Factory.ContinueWhenAll (tasks, (_) => {
+                               started.Value = false;
+                               // Re-run ourselves in case of a race when data is available in the end
+                               if (messageQueue.Count > 0)
+                                       EnsureProcessing ();
+                               else if (messageQueue.IsCompleted)
+                                       compHelper.Complete ();
+                       });
+               }
+       }
+}
+
+#endif
diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/MessageBox.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/MessageBox.cs
new file mode 100644 (file)
index 0000000..dfdab7e
--- /dev/null
@@ -0,0 +1,104 @@
+// MessageBox.cs
+//
+// Copyright (c) 2011 Jérémie "garuma" Laval
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+//
+//
+
+#if NET_4_0 || MOBILE
+
+using System;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+using System.Collections.Concurrent;
+
+namespace System.Threading.Tasks.Dataflow
+{
+       /* In MessageBox we store message that have been offered to us so that they can be
+        * later processed
+        */
+       internal class MessageBox<TInput>
+       {
+               readonly CompletionHelper compHelper;
+               readonly BlockingCollection<TInput> messageQueue = new BlockingCollection<TInput> ();
+               readonly Func<bool> externalCompleteTester;
+
+               public MessageBox (BlockingCollection<TInput> messageQueue, CompletionHelper compHelper, Func<bool> externalCompleteTester)
+               {
+                       this.compHelper = compHelper;
+                       this.messageQueue = messageQueue;
+                       this.externalCompleteTester = externalCompleteTester;
+               }
+
+               public DataflowMessageStatus OfferMessage (ITargetBlock<TInput> target,
+                                                          DataflowMessageHeader messageHeader,
+                                                          TInput messageValue,
+                                                          ISourceBlock<TInput> source,
+                                                          bool consumeToAccept)
+               {
+                       if (!messageHeader.IsValid)
+                               return DataflowMessageStatus.Declined;
+                       if (messageQueue.IsAddingCompleted)
+                               return DataflowMessageStatus.DecliningPermanently;
+
+                       if (consumeToAccept) {
+                               bool consummed;
+                               if (!source.ReserveMessage (messageHeader, target))
+                                       return DataflowMessageStatus.NotAvailable;
+                               messageValue = source.ConsumeMessage (messageHeader, target, out consummed);
+                               if (!consummed)
+                                       return DataflowMessageStatus.NotAvailable;
+                       }
+
+                       try {
+                               messageQueue.Add (messageValue);
+                       } catch (InvalidOperationException) {
+                               // This is triggered either if the underlying collection didn't accept the item
+                               // or if the messageQueue has been marked complete, either way it corresponds to a false
+                               return DataflowMessageStatus.DecliningPermanently;
+                       }
+                       EnsureProcessing ();
+
+                       VerifyCompleteness ();
+
+                       return DataflowMessageStatus.Accepted;
+               }
+
+               protected virtual void EnsureProcessing ()
+               {
+
+               }
+
+               public void Complete ()
+               {
+                       // Make message queue complete
+                       messageQueue.CompleteAdding ();
+                       VerifyCompleteness ();
+               }
+
+               void VerifyCompleteness ()
+               {
+                       if (messageQueue.IsCompleted && externalCompleteTester ())
+                               compHelper.Complete ();
+               }
+       }
+}
+
+#endif
diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/MessageOutgoingQueue.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/MessageOutgoingQueue.cs
new file mode 100644 (file)
index 0000000..7567347
--- /dev/null
@@ -0,0 +1,141 @@
+// MessageOutgoingQueue.cs
+//
+// Copyright (c) 2011 Jérémie "garuma" Laval
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+//
+//
+
+#if NET_4_0 || MOBILE
+
+using System;
+using System.Threading;
+using System.Collections.Generic;
+using System.Collections.Concurrent;
+
+namespace System.Threading.Tasks.Dataflow
+{
+       /* This class handles outgoing message that get queued when there is no
+        * block on the other end to proces it. It also allows receive operations.
+        */
+       internal class MessageOutgoingQueue<T>
+       {
+               readonly ConcurrentQueue<T> store = new ConcurrentQueue<T> ();
+               readonly BlockingCollection<T> outgoing;
+               readonly CompletionHelper compHelper;
+               readonly Func<bool> externalCompleteTester;
+
+               public MessageOutgoingQueue (CompletionHelper compHelper, Func<bool> externalCompleteTester)
+               {
+                       this.outgoing = new BlockingCollection<T> (store);
+                       this.compHelper = compHelper;
+                       this.externalCompleteTester = externalCompleteTester;
+               }
+
+               public void AddData (T data)
+               {
+                       try {
+                               outgoing.Add (data);
+                       } catch (InvalidOperationException) {
+                               VerifyCompleteness ();
+                       }
+               }
+
+               IEnumerable<T> GetNonBlockingConsumingEnumerable ()
+               {
+                       T temp;
+                       while (outgoing.TryTake (out temp))
+                               yield return temp;
+               }
+
+               public void ProcessForTarget (ITargetBlock<T> target, ISourceBlock<T> source, bool consumeToAccept, ref DataflowMessageHeader headers)
+               {
+                       if (target == null)
+                               return;
+
+                       foreach (var output in GetNonBlockingConsumingEnumerable ())
+                               target.OfferMessage (headers.Increment (), output, source, consumeToAccept);
+               }
+
+               public bool TryReceive (Predicate<T> filter, out T item)
+               {
+                       item = default (T);
+
+                       T result;
+                       bool success = false;
+                       if (store.TryPeek (out result) && (filter == null || filter (result)))
+                               success = outgoing.TryTake (out item);
+
+                       VerifyCompleteness ();
+
+                       return success;
+               }
+
+               public bool TryReceiveAll (out IList<T> items)
+               {
+                       items = null;
+
+                       if (store.IsEmpty)
+                               return false;
+
+                       List<T> list = new List<T> (outgoing.Count);
+                       if (list.Count == 0)
+                               return false;
+
+                       list.AddRange (GetNonBlockingConsumingEnumerable ());
+                       items = list;
+
+                       VerifyCompleteness ();
+
+                       return list.Count > 0;
+               }
+
+               public void Complete ()
+               {
+                       outgoing.CompleteAdding ();
+                       VerifyCompleteness ();
+               }
+
+               void VerifyCompleteness ()
+               {
+                       if (outgoing.IsCompleted && externalCompleteTester ())
+                               compHelper.Complete ();
+               }
+
+               public bool IsEmpty {
+                       get {
+                               return store.IsEmpty;
+                       }
+               }
+
+               public int Count {
+                       get {
+                               return store.Count;
+                       }
+               }
+
+               public bool IsCompleted {
+                       get {
+                               return outgoing.IsCompleted;
+                       }
+               }
+       }
+}
+
+#endif
diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/MessageVault.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/MessageVault.cs
new file mode 100644 (file)
index 0000000..ef1c350
--- /dev/null
@@ -0,0 +1,108 @@
+// MessageVault.cs
+//
+// Copyright (c) 2011 Jérémie "garuma" Laval
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+//
+//
+
+#if NET_4_0 || MOBILE
+
+using System;
+using System.Threading;
+using System.Collections.Generic;
+using System.Collections.Concurrent;
+
+namespace System.Threading.Tasks.Dataflow
+{
+       /* MessageVault is used to store message value & header that have been proposed via OfferMessage
+        * so that target block can then Consume them
+        */
+       internal class MessageVault<T>
+       {
+               class ReservationSlot
+               {
+                       public T Data;
+                       public IDataflowBlock Reserved;
+
+                       public ReservationSlot (T data)
+                       {
+                               Data = data;
+                               Reserved = null;
+                       }
+               }
+
+               ConcurrentDictionary<DataflowMessageHeader, ReservationSlot> store = new ConcurrentDictionary<DataflowMessageHeader, ReservationSlot> ();
+
+               public bool StoreMessage (DataflowMessageHeader header, T data)
+               {
+                       if (!header.IsValid)
+                               throw new ArgumentException ("header", "Header is is not valid");
+
+                       return store.TryAdd (header, new ReservationSlot (data));
+               }
+
+               public T ConsumeMessage (DataflowMessageHeader header, IDataflowBlock target, out bool messageConsummed)
+               {
+                       messageConsummed = false;
+                       if (!header.IsValid)
+                               throw new ArgumentException ("header", "Header is is not valid");
+                       if (target == null)
+                               throw new ArgumentNullException ("target");
+
+                       ReservationSlot slot;
+                       if (!store.TryRemove (header, out slot) || slot.Reserved != target)
+                               return default (T);
+
+                       messageConsummed = true;
+                       return slot.Data;
+               }
+
+               public bool ReserveMessage (DataflowMessageHeader header, IDataflowBlock target)
+               {
+                       if (!header.IsValid)
+                               throw new ArgumentException ("header", "Header is is not valid");
+                       if (target == null)
+                               throw new ArgumentNullException ("target");
+
+                       ReservationSlot slot;
+                       if (!store.TryGetValue (header, out slot))
+                               return false;
+
+                       return Interlocked.CompareExchange (ref slot.Reserved, target, null) == null;
+               }
+
+               public void ReleaseReservation (DataflowMessageHeader header, IDataflowBlock target)
+               {
+                       if (!header.IsValid)
+                               throw new ArgumentException ("header", "Header is is not valid");
+                       if (target == null)
+                               throw new ArgumentNullException ("target");
+
+                       ReservationSlot slot;
+                       if (!store.TryGetValue (header, out slot))
+                               return;
+
+                       if (Interlocked.CompareExchange (ref slot.Reserved, null, target) != target)
+                               throw new InvalidOperationException ("The target did not have the message reserved");
+               }
+       }
+}
+
+#endif
diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/PassingMessageBox.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/PassingMessageBox.cs
new file mode 100644 (file)
index 0000000..229e5b3
--- /dev/null
@@ -0,0 +1,56 @@
+// PassingMessageBox.cs
+//
+// Copyright (c) 2011 Jérémie "garuma" Laval
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+//
+//
+
+#if NET_4_0 || MOBILE
+
+using System;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+using System.Collections.Concurrent;
+
+namespace System.Threading.Tasks.Dataflow
+{
+       internal class PassingMessageBox<TInput> : MessageBox<TInput>
+       {
+               readonly DataflowBlockOptions dataflowBlockOptions;
+               readonly Action processQueue;
+               
+               public PassingMessageBox (BlockingCollection<TInput> messageQueue,
+                                         CompletionHelper compHelper,
+                                         Func<bool> externalCompleteTester,
+                                         Action processQueue,
+                                         DataflowBlockOptions dataflowBlockOptions) : base (messageQueue, compHelper, externalCompleteTester)
+               {
+                       this.dataflowBlockOptions = dataflowBlockOptions;
+                       this.processQueue = processQueue;
+               }
+
+               protected override void EnsureProcessing ()
+               {
+                       processQueue ();
+               }
+       }
+}
+
+#endif
diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TargetBuffer.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TargetBuffer.cs
new file mode 100644 (file)
index 0000000..4ecd123
--- /dev/null
@@ -0,0 +1,113 @@
+// TargetBuffer.cs
+//
+// Copyright (c) 2011 Jérémie "garuma" Laval
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+//
+//
+
+#if NET_4_0 || MOBILE
+
+using System;
+using System.Linq;
+using System.Threading.Tasks;
+using System.Collections;
+using System.Collections.Generic;
+using System.Collections.Concurrent;
+
+namespace System.Threading.Tasks.Dataflow
+{
+       internal class TargetBuffer<T> : IEnumerable<ITargetBlock<T>>
+       {
+               ConcurrentQueue<TargetWaiter> targetWaiters = new ConcurrentQueue<TargetWaiter> ();
+
+               class TargetWaiter : IDisposable
+               {
+                       public volatile bool Disabled;
+                       public readonly ITargetBlock<T> Target;
+                       public readonly bool UnlinkAfterOne;
+                       
+                       ConcurrentQueue<TargetWaiter> queue;
+                       AtomicBooleanValue removed;
+
+                       public TargetWaiter (ITargetBlock<T> target, bool unlinkAfterOne, ConcurrentQueue<TargetWaiter> queue)
+                       {
+                               Target = target;
+                               UnlinkAfterOne = unlinkAfterOne;
+                               this.queue = queue;
+                       }
+
+                       public void Dispose ()
+                       {
+                               TargetWaiter t;
+                               Disabled = true;
+
+                               Thread.MemoryBarrier ();
+
+                               if (queue.TryPeek (out t) && t == this && removed.TryRelaxedSet ()) {
+                                       queue.TryDequeue (out t);
+                               } else {
+                                       SpinWait wait = new SpinWait ();
+                                       while (queue.TryPeek (out t) && t == this)
+                                               wait.SpinOnce ();
+                               }
+                       }
+               }
+
+               public IDisposable AddTarget (ITargetBlock<T> block, bool unlinkAfterOne)
+               {
+                       TargetWaiter w = new TargetWaiter (block, unlinkAfterOne, targetWaiters);
+                       targetWaiters.Enqueue (w);
+
+                       return w;
+               }
+
+               public ITargetBlock<T> Current {
+                       get {
+                               TargetWaiter w;
+                               
+                               while (true) {
+                                       if (!targetWaiters.TryPeek (out w))
+                                               return null;
+
+                                       if (w.Disabled == true) {
+                                               w.Dispose ();
+                                               continue;
+                                       } else if (w.UnlinkAfterOne) {
+                                               w.Dispose ();
+                                       }
+
+                                       return w.Target;
+                               }
+                       }
+               }
+
+               public IEnumerator<ITargetBlock<T>> GetEnumerator ()
+               {
+                       return targetWaiters.Select (w => w.Target).GetEnumerator ();
+               }
+
+               IEnumerator IEnumerable.GetEnumerator ()
+               {
+                       return targetWaiters.Select (w => w.Target).GetEnumerator ();
+               }
+       }
+}
+
+#endif
index 3fd038196da383db8c305ffe55af496ec10d437f..afe60232f0bf3489bca11aa427d4d63397ed4f78 100644 (file)
@@ -1,2 +1,5 @@
 System.Threading.Tasks/ConcurrentExclusiveSchedulerPairTest.cs
 System.Threading.Tasks.Dataflow/DataflowMessageHeaderTest.cs
+System.Threading.Tasks.Dataflow/CompletionHelperTest.cs
+../System.Threading.Tasks.Dataflow/CompletionHelper.cs
+
diff --git a/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/CompletionHelperTest.cs b/mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/CompletionHelperTest.cs
new file mode 100644 (file)
index 0000000..4dd0a42
--- /dev/null
@@ -0,0 +1,84 @@
+#if NET_4_0
+// 
+// CompletionHelperTest.cs
+//  
+// Author:
+//       Jérémie "garuma" Laval <jeremie.laval@gmail.com>
+// 
+// Copyright (c) 2011 Jérémie "garuma" Laval
+// 
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+// 
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+// 
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+using System;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Threading.Tasks.Dataflow;
+
+using NUnit.Framework;
+
+namespace MonoTests.System.Threading.Tasks.Dataflow
+{
+       [TestFixture]
+       public class CompletionHelperTest
+       {
+               CompletionHelper helper;
+
+               [SetUp]
+               public void Setup ()
+               {
+                       helper = CompletionHelper.GetNew ();
+               }
+
+               [Test]
+               public void InitialStateTest ()
+               {
+                       Task completed = helper.Completion;
+
+                       Assert.IsNotNull (completed);
+                       Assert.IsFalse (completed.IsCompleted);
+               }
+
+               [Test]
+               public void FaultedTest ()
+               {
+                       Exception ex = new ApplicationException ("Foobar");
+                       helper.Fault (ex);
+                       Task completed = helper.Completion;
+
+                       Assert.IsNotNull (completed);
+                       Assert.IsTrue (completed.IsCompleted);
+                       Assert.AreEqual (TaskStatus.Faulted, completed.Status);
+                       Assert.AreEqual (ex, completed.Exception.InnerExceptions.First ());
+               }
+
+               [Test]
+               public void CompleteTest ()
+               {
+                       helper.Complete ();
+                       Task completed = helper.Completion;
+
+                       Assert.IsNotNull (completed);
+                       Assert.IsTrue (completed.IsCompleted);
+                       Assert.IsFalse (completed.IsFaulted);
+                       Assert.IsFalse (completed.IsCanceled);
+               }
+       }
+}
+#endif