2 // BroadcastBlockTest.cs
5 // Jérémie "garuma" Laval <jeremie.laval@gmail.com>
6 // Petr Onderka <gsvick@gmail.com>
8 // Copyright (c) 2011 Jérémie "garuma" Laval
9 // Copyright (c) 2012 Petr Onderka
11 // Permission is hereby granted, free of charge, to any person obtaining a copy
12 // of this software and associated documentation files (the "Software"), to deal
13 // in the Software without restriction, including without limitation the rights
14 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15 // copies of the Software, and to permit persons to whom the Software is
16 // furnished to do so, subject to the following conditions:
18 // The above copyright notice and this permission notice shall be included in
19 // all copies or substantial portions of the Software.
21 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
24 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
30 using System.Collections.Generic;
31 using System.Threading;
32 using System.Threading.Tasks.Dataflow;
33 using NUnit.Framework;
35 namespace MonoTests.System.Threading.Tasks.Dataflow {
37 public class BroadcastBlockTest {
39 public void BasicUsageTest ()
41 bool act1 = false, act2 = false;
42 var evt = new CountdownEvent (2);
44 var broadcast = new BroadcastBlock<int> (null);
45 var action1 = new ActionBlock<int> (i =>
50 var action2 = new ActionBlock<int> (i =>
56 broadcast.LinkTo (action1);
57 broadcast.LinkTo (action2);
59 Assert.IsTrue (broadcast.Post (42));
61 Assert.IsTrue (evt.Wait (100));
68 public void LinkAfterPostTest ()
71 var evt = new ManualResetEventSlim ();
73 var broadcast = new BroadcastBlock<int> (null);
74 var action = new ActionBlock<int> (i =>
80 Assert.IsTrue (broadcast.Post (42));
82 broadcast.LinkTo (action);
84 Assert.IsTrue (evt.Wait (100));
90 public void PostponedTest ()
92 var broadcast = new BroadcastBlock<int> (null);
93 var target = new BufferBlock<int> (
94 new DataflowBlockOptions { BoundedCapacity = 1 });
95 broadcast.LinkTo (target);
97 Assert.IsTrue (target.Post (1));
99 Assert.IsTrue (broadcast.Post (2));
101 Assert.AreEqual (1, target.Receive (TimeSpan.FromMilliseconds (0)));
102 Assert.AreEqual (2, target.Receive (TimeSpan.FromMilliseconds (100)));
106 public void ConsumeChangedTest ()
108 var scheduler = new TestScheduler ();
109 var broadcast = new BroadcastBlock<int> (null,
110 new DataflowBlockOptions { TaskScheduler = scheduler });
111 var target = new TestTargetBlock<int> { Postpone = true };
113 broadcast.LinkTo (target);
115 Assert.IsFalse (target.HasPostponed);
117 Assert.IsTrue (broadcast.Post (1));
119 scheduler.ExecuteAll ();
121 Assert.IsTrue (target.HasPostponed);
123 Assert.IsTrue (broadcast.Post (2));
125 scheduler.ExecuteAll ();
128 Assert.IsTrue (target.RetryPostponed (out value));
129 Assert.AreEqual (2, value);
133 public void ReserveConsumeChangedTest ()
135 var scheduler = new TestScheduler ();
136 var broadcast = new BroadcastBlock<int> (null,
137 new DataflowBlockOptions { TaskScheduler = scheduler });
138 var target = new TestTargetBlock<int> { Postpone = true };
140 broadcast.LinkTo (target);
142 Assert.IsFalse (target.HasPostponed);
144 Assert.IsTrue (broadcast.Post (1));
146 scheduler.ExecuteAll ();
148 Assert.IsTrue (target.HasPostponed);
150 Assert.IsTrue (target.ReservePostponed ());
152 Assert.IsTrue (broadcast.Post (2));
154 scheduler.ExecuteAll ();
157 Assert.IsTrue (target.RetryPostponed (out value));
158 Assert.AreEqual (1, value);
162 public void ReserveChangedTest ()
164 var scheduler = new TestScheduler ();
165 var broadcast = new BroadcastBlock<int> (null,
166 new DataflowBlockOptions { TaskScheduler = scheduler });
167 var target = new TestTargetBlock<int> { Postpone = true };
169 broadcast.LinkTo (target);
171 Assert.IsFalse (target.HasPostponed);
173 Assert.IsTrue (broadcast.Post (1));
175 scheduler.ExecuteAll ();
177 Assert.IsTrue (target.HasPostponed);
179 Assert.IsTrue(broadcast.Post(2));
181 scheduler.ExecuteAll ();
183 Assert.IsTrue (target.ReservePostponed ());
186 Assert.IsTrue (target.RetryPostponed (out value));
187 Assert.AreEqual (2, value);
191 public void QueuedMessagesTest ()
193 var scheduler = new TestScheduler ();
194 var broadcast = new BroadcastBlock<int> (null,
195 new DataflowBlockOptions { TaskScheduler = scheduler });
196 var target = new BufferBlock<int> ();
197 broadcast.LinkTo (target);
199 Assert.IsTrue (broadcast.Post (1));
200 Assert.IsTrue (broadcast.Post (2));
202 AssertEx.Throws<TimeoutException> (
203 () => target.Receive (TimeSpan.FromMilliseconds (100)));
205 scheduler.ExecuteAll ();
208 Assert.IsTrue (target.TryReceive (out item));
209 Assert.AreEqual (1, item);
210 Assert.IsTrue (target.TryReceive (out item));
211 Assert.AreEqual (2, item);
215 public void BoundedQueuedTest ()
217 var scheduler = new TestScheduler ();
218 var broadcast = new BroadcastBlock<int> (
220 new DataflowBlockOptions { TaskScheduler = scheduler, BoundedCapacity = 1 });
222 Assert.IsTrue (broadcast.Post (1));
223 Assert.IsFalse (broadcast.Post (2));
227 public void BoundedPostponedTest ()
229 var scheduler = new TestScheduler ();
230 var broadcast = new BroadcastBlock<int> (
232 new DataflowBlockOptions { TaskScheduler = scheduler, BoundedCapacity = 1 });
233 ITargetBlock<int> target = broadcast;
234 var source = new TestSourceBlock<int> ();
236 Assert.IsTrue (broadcast.Post (1));
237 var header = new DataflowMessageHeader (1);
238 source.AddMessage (header, 2);
239 Assert.AreEqual (DataflowMessageStatus.Postponed,
240 target.OfferMessage (header, 2, source, false));
241 Assert.IsFalse (source.WasConsumed (header));
243 scheduler.ExecuteAll ();
245 Assert.IsTrue (source.WasConsumed (header));
249 public void CloningTest ()
251 object act1 = null, act2 = null;
252 var evt = new CountdownEvent (2);
254 object source = new object ();
255 var broadcast = new BroadcastBlock<object> (o => new object ());
256 var action1 = new ActionBlock<object> (i =>
261 var action2 = new ActionBlock<object> (i =>
267 broadcast.LinkTo (action1);
268 broadcast.LinkTo (action2);
270 Assert.IsTrue (broadcast.Post (source));
272 Assert.IsTrue (evt.Wait (100));
274 Assert.IsNotNull (act1);
275 Assert.IsNotNull (act2);
277 Assert.IsFalse (source.Equals (act1));
278 Assert.IsFalse (source.Equals (act2));
279 Assert.IsFalse (act2.Equals (act1));
283 public void TryReceiveTest()
285 var scheduler = new TestScheduler();
286 var block = new BroadcastBlock<int>(i => i * 10, new DataflowBlockOptions { TaskScheduler = scheduler });
289 Assert.IsFalse(block.TryReceive(null, out item));
291 Assert.IsTrue(block.Post(1));
292 Assert.IsTrue(block.Post(2));
294 scheduler.ExecuteAll();
296 Assert.IsTrue(block.TryReceive(null, out item));
297 Assert.AreEqual(20, item);
298 // predicate is tested on original value, but returned is cloned
299 Assert.IsTrue(block.TryReceive(i => i < 10, out item));
300 Assert.AreEqual(20, item);
304 public void TryReceiveAllTest()
306 var scheduler = new TestScheduler();
307 var block = new BroadcastBlock<int>(null, new DataflowBlockOptions { TaskScheduler = scheduler });
308 IReceivableSourceBlock<int> source = block;
310 Assert.IsTrue(block.Post(1));
311 Assert.IsTrue(block.Post(2));
313 scheduler.ExecuteAll();
316 Assert.IsTrue(source.TryReceiveAll(out items));
318 CollectionAssert.AreEqual(new[] { 2 }, items);
322 public void DontOfferTwiceTest()
324 var scheduler = new TestScheduler ();
325 var block = new BroadcastBlock<int> (null,
326 new DataflowBlockOptions { TaskScheduler = scheduler });
328 new TestTargetBlock<int> { Postpone = true };
329 block.LinkTo (target);
331 Assert.IsFalse (target.HasPostponed);
333 Assert.IsTrue (block.Post (1));
335 scheduler.ExecuteAll();
337 Assert.IsTrue (target.HasPostponed);
339 target.Postpone = false;
342 Assert.IsTrue(target.RetryPostponed(out value));
343 Assert.AreEqual(1, value);
345 block.LinkTo(new BufferBlock<int>());
347 scheduler.ExecuteAll();
349 Assert.AreEqual(default(int), target.DirectlyAccepted);