Fixed sending from WriteOnceBlock
authorPetr Onderka <gsvick@gmail.com>
Thu, 19 Jul 2012 21:06:40 +0000 (23:06 +0200)
committerPetr Onderka <gsvick@gmail.com>
Sun, 19 Aug 2012 22:00:17 +0000 (00:00 +0200)
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow-net_4_5.csproj
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow.dll.sources
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BroadcastBlock.cs
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BroadcastOutgoingQueue.cs
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/MessageVault.cs [deleted file]
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TargetBuffer.cs [deleted file]
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/WriteOnceBlock.cs
mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/WriteOnceBlockTest.cs

index 451c1121f7348a34b3cc8214115038a05dbc3ad2..f3a551c9e70d8422a37dcdb64bfacf5b7deaf436 100644 (file)
     <Compile Include="System.Threading.Tasks.Dataflow\JoinTarget.cs" />\r
    <Compile Include="System.Threading.Tasks.Dataflow\MessageBox.cs" />
     <Compile Include="System.Threading.Tasks.Dataflow\OutgoingQueue.cs" />\r
-   <Compile Include="System.Threading.Tasks.Dataflow\MessageVault.cs" />
     <Compile Include="System.Threading.Tasks.Dataflow\NameHelper.cs" />\r
    <Compile Include="System.Threading.Tasks.Dataflow\PassingMessageBox.cs" />
-   <Compile Include="System.Threading.Tasks.Dataflow\TargetBuffer.cs" />
    <Compile Include="..\corlib\System.Threading\AtomicBoolean.cs" />
    <Compile Include="System.Threading.Tasks.Dataflow\ActionBlock.cs" />
    <Compile Include="System.Threading.Tasks.Dataflow\BatchBlock.cs" />
index a286d22bb89711b1e8b508cb996b5bd54b5af2ba..14bc184f1f9ddef56ff714044a38e6ddb6dafffb 100644 (file)
@@ -19,9 +19,7 @@ System.Threading.Tasks.Dataflow/MessageBox.cs
 System.Threading.Tasks.Dataflow/OutgoingQueueBase.cs
 System.Threading.Tasks.Dataflow/OutgoingQueue.cs
 System.Threading.Tasks.Dataflow/BroadcastOutgoingQueue.cs
-System.Threading.Tasks.Dataflow/MessageVault.cs
 System.Threading.Tasks.Dataflow/PassingMessageBox.cs
-System.Threading.Tasks.Dataflow/TargetBuffer.cs
 System.Threading.Tasks.Dataflow/NameHelper.cs
 System.Threading.Tasks.Dataflow/TargetCollection.cs
 System.Threading.Tasks.Dataflow/JoinTarget.cs
index 95418759ce8f016bc30d574bf1bba9d32af5a46a..538e33dee30fd0ca88de953828d275a997993100 100644 (file)
@@ -102,14 +102,14 @@ namespace System.Threading.Tasks.Dataflow {
 
                public bool TryReceiveAll (out IList<T> items)
                {
-                       T[] originalItems;
-                       bool received = outgoing.TryReceiveAll (out originalItems);
-                       if (received && cloner != null)
-                               items = Array.ConvertAll (originalItems, new Converter<T, T> (cloner));
-                       else
-                               items = originalItems;
+                       T item;
+                       if (!TryReceive (null, out item)) {
+                               items = null;
+                               return false;
+                       }
 
-                       return received;
+                       items = new[] { item };
+                       return true;
                }
 
                void BroadcastProcess ()
index 3fb8e80fbe50054228261e95c3e9ac54de9037e3..ba345b3b1506fbcdce370e78e9debc88d225dda0 100644 (file)
@@ -79,18 +79,23 @@ namespace System.Threading.Tasks.Dataflow {
                        }
                }
 
+               public void DequeueItem()
+               {
+                       T item;
+                       if (Outgoing.TryTake (out item)) {
+                               DecreaseCounts (item);
+                               targets.SetCurrentItem (item);
+
+                               CurrentItem = item;
+                       }
+               }
+
                protected override void Process ()
                {
                        do {
                                ForceProcessing = false;
 
-                               T item;
-                               if (Outgoing.TryTake (out item)) {
-                                       DecreaseCounts (item);
-                                       targets.SetCurrentItem (item);
-
-                                       CurrentItem = item;
-                               }
+                               DequeueItem ();
 
                                targets.OfferItemToTargets ();
                        } while (!Store.IsEmpty || targets.NeedsProcessing);
@@ -189,17 +194,5 @@ namespace System.Threading.Tasks.Dataflow {
 
                        return false;
                }
-
-               public bool TryReceiveAll (out T[] items)
-               {
-                       T item;
-                       if (TryReceive (null, out item)) {
-                               items = new[] { item };
-                               return true;
-                       }
-
-                       items = null;
-                       return false;
-               }
        }
 }
\ No newline at end of file
diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/MessageVault.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/MessageVault.cs
deleted file mode 100644 (file)
index d2b4e14..0000000
+++ /dev/null
@@ -1,106 +0,0 @@
-// MessageVault.cs
-//
-// Copyright (c) 2011 Jérémie "garuma" Laval
-//
-// Permission is hereby granted, free of charge, to any person obtaining a copy
-// of this software and associated documentation files (the "Software"), to deal
-// in the Software without restriction, including without limitation the rights
-// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-// copies of the Software, and to permit persons to whom the Software is
-// furnished to do so, subject to the following conditions:
-//
-// The above copyright notice and this permission notice shall be included in
-// all copies or substantial portions of the Software.
-//
-// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-// THE SOFTWARE.
-//
-//
-
-
-using System;
-using System.Threading;
-using System.Collections.Generic;
-using System.Collections.Concurrent;
-
-namespace System.Threading.Tasks.Dataflow
-{
-       /* MessageVault is used to store message value & header that have been proposed via OfferMessage
-        * so that target block can then Consume them
-        */
-       internal class MessageVault<T>
-       {
-               class ReservationSlot
-               {
-                       public T Data;
-                       public IDataflowBlock Reserved;
-
-                       public ReservationSlot (T data)
-                       {
-                               Data = data;
-                               Reserved = null;
-                       }
-               }
-
-               ConcurrentDictionary<DataflowMessageHeader, ReservationSlot> store = new ConcurrentDictionary<DataflowMessageHeader, ReservationSlot> ();
-
-               public bool StoreMessage (DataflowMessageHeader header, T data)
-               {
-                       if (!header.IsValid)
-                               throw new ArgumentException ("header", "Header is is not valid");
-
-                       return store.TryAdd (header, new ReservationSlot (data));
-               }
-
-               public T ConsumeMessage (DataflowMessageHeader header, IDataflowBlock target, out bool messageConsummed)
-               {
-                       messageConsummed = false;
-                       if (!header.IsValid)
-                               throw new ArgumentException ("header", "Header is is not valid");
-                       if (target == null)
-                               throw new ArgumentNullException ("target");
-
-                       ReservationSlot slot;
-                       if (!store.TryRemove (header, out slot) || slot.Reserved != target)
-                               return default (T);
-
-                       messageConsummed = true;
-                       return slot.Data;
-               }
-
-               public bool ReserveMessage (DataflowMessageHeader header, IDataflowBlock target)
-               {
-                       if (!header.IsValid)
-                               throw new ArgumentException ("header", "Header is is not valid");
-                       if (target == null)
-                               throw new ArgumentNullException ("target");
-
-                       ReservationSlot slot;
-                       if (!store.TryGetValue (header, out slot))
-                               return false;
-
-                       return Interlocked.CompareExchange (ref slot.Reserved, target, null) == null;
-               }
-
-               public void ReleaseReservation (DataflowMessageHeader header, IDataflowBlock target)
-               {
-                       if (!header.IsValid)
-                               throw new ArgumentException ("header", "Header is is not valid");
-                       if (target == null)
-                               throw new ArgumentNullException ("target");
-
-                       ReservationSlot slot;
-                       if (!store.TryGetValue (header, out slot))
-                               return;
-
-                       if (Interlocked.CompareExchange (ref slot.Reserved, null, target) != target)
-                               throw new InvalidOperationException ("The target did not have the message reserved");
-               }
-       }
-}
-
diff --git a/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TargetBuffer.cs b/mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TargetBuffer.cs
deleted file mode 100644 (file)
index a3db2f7..0000000
+++ /dev/null
@@ -1,114 +0,0 @@
-// TargetBuffer.cs
-//
-// Copyright (c) 2011 Jérémie "garuma" Laval
-//
-// Permission is hereby granted, free of charge, to any person obtaining a copy
-// of this software and associated documentation files (the "Software"), to deal
-// in the Software without restriction, including without limitation the rights
-// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-// copies of the Software, and to permit persons to whom the Software is
-// furnished to do so, subject to the following conditions:
-//
-// The above copyright notice and this permission notice shall be included in
-// all copies or substantial portions of the Software.
-//
-// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-// THE SOFTWARE.
-//
-//
-
-
-using System;
-using System.Linq;
-using System.Threading.Tasks;
-using System.Collections;
-using System.Collections.Generic;
-using System.Collections.Concurrent;
-
-namespace System.Threading.Tasks.Dataflow
-{
-       internal class TargetBuffer<T> : IEnumerable<ITargetBlock<T>>
-       {
-               ConcurrentQueue<TargetWaiter> targetWaiters = new ConcurrentQueue<TargetWaiter> ();
-
-               class TargetWaiter : IDisposable
-               {
-                       public volatile bool Disabled;
-                       public readonly ITargetBlock<T> Target;
-                       public readonly bool UnlinkAfterOne;
-                       
-                       ConcurrentQueue<TargetWaiter> queue;
-                       AtomicBooleanValue removed;
-
-                       public TargetWaiter (ITargetBlock<T> target, bool unlinkAfterOne, ConcurrentQueue<TargetWaiter> queue)
-                       {
-                               Target = target;
-                               UnlinkAfterOne = unlinkAfterOne;
-                               this.queue = queue;
-                       }
-
-                       public void Dispose ()
-                       {
-                               TargetWaiter t;
-                               Disabled = true;
-
-                               Thread.MemoryBarrier ();
-
-                               if (queue.TryPeek (out t) && t == this && removed.TryRelaxedSet ()) {
-                                       queue.TryDequeue (out t);
-                               } else {
-                                       SpinWait wait = new SpinWait ();
-                                       while (queue.TryPeek (out t) && t == this)
-                                               wait.SpinOnce ();
-                               }
-                       }
-               }
-
-               public IDisposable AddTarget (ITargetBlock<T> target, bool unlinkAfterOne)
-               {
-                       if (target == null)
-                               throw new ArgumentNullException("target");
-
-                       TargetWaiter w = new TargetWaiter (target, unlinkAfterOne, targetWaiters);
-                       targetWaiters.Enqueue (w);
-
-                       return w;
-               }
-
-               public ITargetBlock<T> Current {
-                       get {
-                               TargetWaiter w;
-                               
-                               while (true) {
-                                       if (!targetWaiters.TryPeek (out w))
-                                               return null;
-
-                                       if (w.Disabled == true) {
-                                               w.Dispose ();
-                                               continue;
-                                       } else if (w.UnlinkAfterOne) {
-                                               w.Dispose ();
-                                       }
-
-                                       return w.Target;
-                               }
-                       }
-               }
-
-               public IEnumerator<ITargetBlock<T>> GetEnumerator ()
-               {
-                       return targetWaiters.Select (w => w.Target).GetEnumerator ();
-               }
-
-               IEnumerator IEnumerable.GetEnumerator ()
-               {
-                       return targetWaiters.Select (w => w.Target).GetEnumerator ();
-               }
-       }
-}
-
index c4ce4ff548dd6e65809f25bbf9bf1811623adebd..cb2997d061ccad7bd7feac24e1f7795037fa3605 100644 (file)
@@ -1,6 +1,7 @@
 // WriteOnceBlock.cs
 //
 // Copyright (c) 2011 Jérémie "garuma" Laval
+// Copyright (c) 2012 Petr Onderka
 //
 // Permission is hereby granted, free of charge, to any person obtaining a copy
 // of this software and associated documentation files (the "Software"), to deal
 using System.Collections.Generic;
 using System.Collections.Concurrent;
 
-namespace System.Threading.Tasks.Dataflow
-{
-       public sealed class WriteOnceBlock<T> : IPropagatorBlock<T, T>, ITargetBlock<T>, IDataflowBlock, ISourceBlock<T>, IReceivableSourceBlock<T>
-       {
-               static readonly DataflowBlockOptions defaultOptions = new DataflowBlockOptions ();
-
+namespace System.Threading.Tasks.Dataflow {
+       public sealed class WriteOnceBlock<T> : IPropagatorBlock<T, T>, IReceivableSourceBlock<T> {
                readonly CompletionHelper compHelper;
                readonly BlockingCollection<T> messageQueue = new BlockingCollection<T> ();
                readonly MessageBox<T> messageBox;
-               readonly MessageVault<T> vault;
                readonly DataflowBlockOptions dataflowBlockOptions;
                readonly Func<T, T> cloner;
-               TargetBuffer<T> targets = new TargetBuffer<T> ();
-               DataflowMessageHeader headers = DataflowMessageHeader.NewValid ();
-
+               readonly BroadcastOutgoingQueue<T> outgoing;
                AtomicBooleanValue written;
-               bool ready;
-               T finalValue;
 
-               public WriteOnceBlock (Func<T, T> cloner) : this (cloner, defaultOptions)
+               public WriteOnceBlock (Func<T, T> cloner)
+                       : this (cloner, DataflowBlockOptions.Default)
                {
-
                }
 
-               public WriteOnceBlock (Func<T, T> cloner, DataflowBlockOptions dataflowBlockOptions)
+               public WriteOnceBlock (Func<T, T> cloner,
+                                      DataflowBlockOptions dataflowBlockOptions)
                {
                        if (dataflowBlockOptions == null)
                                throw new ArgumentNullException ("dataflowBlockOptions");
@@ -56,108 +49,84 @@ namespace System.Threading.Tasks.Dataflow
                        this.dataflowBlockOptions = dataflowBlockOptions;
                        this.compHelper = CompletionHelper.GetNew (dataflowBlockOptions);
                        this.messageBox = new PassingMessageBox<T> (this, messageQueue, compHelper,
-                               () => true, _ => BroadcastProcess (), dataflowBlockOptions);
-                       this.vault = new MessageVault<T> ();
+                               () => true, _ => BroadcastProcess (), dataflowBlockOptions,
+                               canAccept: () => written.TrySet ());
+                       this.outgoing = new BroadcastOutgoingQueue<T> (this, compHelper,
+                               () => messageQueue.IsCompleted, messageBox.DecreaseCount,
+                               dataflowBlockOptions, cloner != null);
                }
 
                public DataflowMessageStatus OfferMessage (
                        DataflowMessageHeader messageHeader, T messageValue,
                        ISourceBlock<T> source, bool consumeToAccept)
                {
-                       if (!messageHeader.IsValid)
-                               throw new ArgumentException ("The messageHeader is not valid.",
-                                       "messageHeader");
-                       if (consumeToAccept && source == null)
-                               throw new ArgumentException (
-                                       "consumeToAccept may only be true if provided with a non-null source.",
-                                       "consumeToAccept");
-
-                       if (written.TryRelaxedSet ()) {
-                               Thread.MemoryBarrier ();
-                               finalValue = messageValue;
-                               Thread.MemoryBarrier ();
-                               ready = true;
-                               return messageBox.OfferMessage (messageHeader, finalValue, source, consumeToAccept);
-                       } else {
-                               return DataflowMessageStatus.DecliningPermanently;
-                       }
+                       var result = messageBox.OfferMessage (messageHeader, messageValue, source,
+                               consumeToAccept);
+                       if (result == DataflowMessageStatus.Accepted)
+                               messageQueue.CompleteAdding ();
+                       return result;
                }
 
-               public IDisposable LinkTo (ITargetBlock<T> target, DataflowLinkOptions linkOptions)
+               public IDisposable LinkTo (ITargetBlock<T> target,
+                                          DataflowLinkOptions linkOptions)
                {
-                       if (linkOptions == null)
-                               throw new ArgumentNullException("linkOptions");
-
-                       return targets.AddTarget (target, true);
+                       return outgoing.AddTarget (target, linkOptions);
                }
 
-               public T ConsumeMessage (DataflowMessageHeader messageHeader, ITargetBlock<T> target, out bool messageConsumed)
+               public T ConsumeMessage (DataflowMessageHeader messageHeader,
+                                        ITargetBlock<T> target, out bool messageConsumed)
                {
-                       return cloner(vault.ConsumeMessage (messageHeader, target, out messageConsumed));
+                       T message = outgoing.ConsumeMessage (
+                               messageHeader, target, out messageConsumed);
+                       if (messageConsumed && cloner != null)
+                               message = cloner (message);
+                       return message;
                }
 
-               public void ReleaseReservation (DataflowMessageHeader messageHeader, ITargetBlock<T> target)
+               public void ReleaseReservation (DataflowMessageHeader messageHeader,
+                                               ITargetBlock<T> target)
                {
-                       vault.ReleaseReservation (messageHeader, target);
+                       outgoing.ReleaseReservation (messageHeader, target);
                }
 
-               public bool ReserveMessage (DataflowMessageHeader messageHeader, ITargetBlock<T> target)
+               public bool ReserveMessage (DataflowMessageHeader messageHeader,
+                                           ITargetBlock<T> target)
                {
-                       return vault.ReserveMessage (messageHeader, target);
+                       return outgoing.ReserveMessage (messageHeader, target);
                }
 
                public bool TryReceive (Predicate<T> filter, out T item)
                {
-                       item = default (T);
-                       if (!written.Value)
-                               return false;
-
-                       if (!ready) {
-                               SpinWait spin = new SpinWait ();
-                               while (!ready)
-                                       spin.SpinOnce ();
-                       }
-
-                       if (filter == null || filter (finalValue)) {
-                               item = cloner != null ? cloner (finalValue) : finalValue;
-                               return true;
-                       }
-
-                       return false;
+                       var received = outgoing.TryReceive (filter, out item);
+                       if (received && cloner != null)
+                               item = cloner (item);
+                       return received;
                }
 
                public bool TryReceiveAll (out IList<T> items)
                {
-                       items = null;
-                       if (!written.Value)
-                               return false;
-
                        T item;
-                       if (!TryReceive (null, out item))
+                       if (!TryReceive (null, out item)) {
+                               items = null;
                                return false;
+                       }
 
-                       items = new T[] { item };
+                       items = new[] { item };
                        return true;
                }
 
                void BroadcastProcess ()
                {
-                       T input;
-
-                       if (!messageQueue.TryTake (out input))
-                               return;
-
-                       foreach (var target in targets) {
-                               DataflowMessageHeader header = headers.Increment ();
-                               if (cloner != null)
-                                       vault.StoreMessage (header, input);
-                               target.OfferMessage (header, input, this, cloner != null);
-                       }
+                       T item;
+                       if (messageQueue.TryTake (out item))
+                               outgoing.AddData (item);
+                       outgoing.DequeueItem ();
                }
 
                public void Complete ()
                {
                        messageBox.Complete ();
+                       outgoing.Complete ();
                }
 
                public void Fault (Exception exception)
@@ -166,9 +135,7 @@ namespace System.Threading.Tasks.Dataflow
                }
 
                public Task Completion {
-                       get {
-                               return compHelper.Completion;
-                       }
+                       get { return compHelper.Completion; }
                }
 
                public override string ToString ()
index 4eb9f606d399eed40a43761aed348f3cf6c228e4..1d59b64bd611ac5982589b05176bef0d17380084 100644 (file)
@@ -1,10 +1,11 @@
-// 
 // WriteOnceBlockTest.cs
 //  
 // Author:
 //       Jérémie "garuma" Laval <jeremie.laval@gmail.com>
+//       Petr Onderka <gsvick@gmail.com>
 // 
 // Copyright (c) 2011 Jérémie "garuma" Laval
+// Copyright (c) 2012 Petr Onderka
 // 
 // Permission is hereby granted, free of charge, to any person obtaining a copy
 // of this software and associated documentation files (the "Software"), to deal
 // THE SOFTWARE.
 
 using System;
-using System.Linq;
 using System.Threading;
-using System.Threading.Tasks;
 using System.Collections.Generic;
 using System.Threading.Tasks.Dataflow;
-
 using NUnit.Framework;
 
-namespace MonoTests.System.Threading.Tasks.Dataflow
-{
+namespace MonoTests.System.Threading.Tasks.Dataflow {
        [TestFixture]
-       public class WriteOnceBlockTest
-       {
+       public class WriteOnceBlockTest {
                [Test]
                public void BasicUsageTest ()
                {
                        bool act1 = false, act2 = false;
                        var evt = new CountdownEvent (2);
 
-                       var broadcast = new WriteOnceBlock<int> (null);
-                       var action1 = new ActionBlock<int> (i => { act1 = i == 42; evt.Signal (); });
-                       var action2 = new ActionBlock<int> (i => { act2 = i == 42; evt.Signal (); });
+                       var block = new WriteOnceBlock<int> (null);
+                       var action1 = new ActionBlock<int> (i =>
+                       {
+                               act1 = i == 42;
+                               evt.Signal ();
+                       });
+                       var action2 = new ActionBlock<int> (i =>
+                       {
+                               act2 = i == 42;
+                               evt.Signal ();
+                       });
 
-                       broadcast.LinkTo (action1);
-                       broadcast.LinkTo (action2);
+                       block.LinkTo (action1);
+                       block.LinkTo (action2);
 
-                       Assert.IsTrue (broadcast.Post (42));
+                       Assert.IsTrue (block.Post (42));
+                       Assert.IsFalse (block.Post (43));
 
-                       evt.Wait ();
+                       Assert.IsTrue (evt.Wait (100));
 
                        Assert.IsTrue (act1);
                        Assert.IsTrue (act2);
                }
 
+               [Test]
+               public void LinkAfterPostTest ()
+               {
+                       bool act = false;
+                       var evt = new ManualResetEventSlim ();
+
+                       var block = new WriteOnceBlock<int> (null);
+                       var action = new ActionBlock<int> (i =>
+                       {
+                               act = i == 42;
+                               evt.Set ();
+                       });
+
+                       Assert.IsTrue (block.Post (42));
+
+                       block.LinkTo (action);
+
+                       Assert.IsTrue (evt.Wait (100));
+
+                       Assert.IsTrue (act);
+               }
+
+               [Test]
+               public void PostponedTest ()
+               {
+                       var block = new WriteOnceBlock<int> (null);
+                       var target = new BufferBlock<int> (
+                               new DataflowBlockOptions { BoundedCapacity = 1 });
+                       block.LinkTo (target);
+
+                       Assert.IsTrue (target.Post (1));
+
+                       Assert.IsTrue (block.Post (2));
+
+                       Assert.AreEqual (1, target.Receive (TimeSpan.FromMilliseconds (100)));
+                       Assert.AreEqual (2, target.Receive (TimeSpan.FromMilliseconds (100)));
+               }
+
+               [Test]
+               public void QueuedMessageTest ()
+               {
+                       var scheduler = new TestScheduler ();
+                       var block = new WriteOnceBlock<int> (null,
+                               new DataflowBlockOptions { TaskScheduler = scheduler });
+                       var target = new BufferBlock<int> ();
+                       block.LinkTo (target);
+
+                       Assert.IsTrue (block.Post (1));
+
+                       AssertEx.Throws<TimeoutException> (
+                               () => target.Receive (TimeSpan.FromMilliseconds (100)));
+
+                       scheduler.ExecuteAll ();
+
+                       int item;
+                       Assert.IsTrue (target.TryReceive (out item));
+                       Assert.AreEqual (1, item);
+               }
+
                [Test]
                public void CloningTest ()
                {
@@ -66,16 +130,24 @@ namespace MonoTests.System.Threading.Tasks.Dataflow
                        var evt = new CountdownEvent (2);
 
                        object source = new object ();
-                       var broadcast = new WriteOnceBlock<object> (o => new object ());
-                       var action1 = new ActionBlock<object> (i => { act1 = i; evt.Signal (); });
-                       var action2 = new ActionBlock<object> (i => { act2 = i; evt.Signal (); });
+                       var block = new WriteOnceBlock<object> (o => new object ());
+                       var action1 = new ActionBlock<object> (i =>
+                       {
+                               act1 = i;
+                               evt.Signal ();
+                       });
+                       var action2 = new ActionBlock<object> (i =>
+                       {
+                               act2 = i;
+                               evt.Signal ();
+                       });
 
-                       broadcast.LinkTo (action1);
-                       broadcast.LinkTo (action2);
+                       block.LinkTo (action1);
+                       block.LinkTo (action2);
 
-                       Assert.IsTrue (broadcast.Post (source));
+                       Assert.IsTrue (block.Post (source));
 
-                       evt.Wait ();
+                       Assert.IsTrue (evt.Wait (100));
 
                        Assert.IsNotNull (act1);
                        Assert.IsNotNull (act2);
@@ -92,28 +164,36 @@ namespace MonoTests.System.Threading.Tasks.Dataflow
                        var evt = new CountdownEvent (2);
 
                        var broadcast = new WriteOnceBlock<int> (null);
-                       var action1 = new ActionBlock<int> (i => { act1 = i == 42; evt.Signal (); });
-                       var action2 = new ActionBlock<int> (i => { act2 = i == 42; evt.Signal (); });
+                       var action1 = new ActionBlock<int> (i =>
+                       {
+                               act1 = i == 42;
+                               evt.Signal ();
+                       });
+                       var action2 = new ActionBlock<int> (i =>
+                       {
+                               act2 = i == 42;
+                               evt.Signal ();
+                       });
 
                        broadcast.LinkTo (action1);
                        broadcast.LinkTo (action2);
 
                        Assert.IsTrue (broadcast.Post (42));
 
-                       evt.Wait ();
+                       Assert.IsTrue (evt.Wait (100));
 
                        Assert.IsTrue (act1);
                        Assert.IsTrue (act2);
 
                        Assert.IsFalse (broadcast.Post (24));
-                       Thread.Sleep (1600);
+                       Thread.Sleep (300);
 
                        Assert.IsTrue (act1);
                        Assert.IsTrue (act2);
                }
 
                [Test]
-               public void TryReceiveBehavior ()
+               public void TryReceiveBehaviorTest ()
                {
                        var block = new WriteOnceBlock<int> (null);
                        int foo;
@@ -125,9 +205,38 @@ namespace MonoTests.System.Threading.Tasks.Dataflow
                        Assert.IsFalse (block.TryReceive (i => i == 0, out foo));
                        IList<int> bar;
                        Assert.IsTrue (((IReceivableSourceBlock<int>)block).TryReceiveAll (out bar));
-                       Assert.IsNotNull (bar);
-                       Assert.AreEqual (1, bar.Count);
-                       Assert.AreEqual (42, bar[0]);
+                       CollectionAssert.AreEqual (new[] { 42 }, bar);
+               }
+
+               [Test]
+               public void DontOfferTwiceTest ()
+               {
+                       var scheduler = new TestScheduler ();
+                       var block = new WriteOnceBlock<int> (null,
+                               new DataflowBlockOptions { TaskScheduler = scheduler });
+                       var target =
+                               new TestTargetBlock<int> { Postpone = true };
+                       block.LinkTo (target);
+
+                       Assert.IsFalse (target.HasPostponed);
+
+                       Assert.IsTrue (block.Post (1));
+
+                       scheduler.ExecuteAll ();
+
+                       Assert.IsTrue (target.HasPostponed);
+
+                       target.Postpone = false;
+
+                       int value;
+                       Assert.IsTrue (target.RetryPostponed (out value));
+                       Assert.AreEqual (1, value);
+
+                       block.LinkTo (new BufferBlock<int> ());
+
+                       scheduler.ExecuteAll ();
+
+                       Assert.AreEqual (default(int), target.DirectlyAccepted);
                }
        }
-}
+}
\ No newline at end of file