1 // DataflowBlockTest.cs
4 // Jérémie "garuma" Laval <jeremie.laval@gmail.com>
5 // Petr Onderka <gsvick@gmail.com>
7 // Copyright (c) 2011 Jérémie "garuma" Laval
8 // Copyright (c) 2012 Petr Onderka
10 // Permission is hereby granted, free of charge, to any person obtaining a copy
11 // of this software and associated documentation files (the "Software"), to deal
12 // in the Software without restriction, including without limitation the rights
13 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 // copies of the Software, and to permit persons to whom the Software is
15 // furnished to do so, subject to the following conditions:
17 // The above copyright notice and this permission notice shall be included in
18 // all copies or substantial portions of the Software.
20 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
29 using System.Threading;
30 using System.Threading.Tasks;
31 using System.Threading.Tasks.Dataflow;
32 using NUnit.Framework;
34 namespace MonoTests.System.Threading.Tasks.Dataflow {
36 public class DataflowBlockTest {
38 public void TryReceiveTest ()
40 var block = new BufferBlock<int> ();
45 Assert.IsTrue (block.TryReceive (out value));
46 Assert.AreEqual (42, value);
50 public void ReceiveTest ()
52 var block = new BufferBlock<int> ();
53 Task.Factory.StartNew (() => { Thread.Sleep (300); block.Post (42); });
54 Assert.AreEqual (42, block.Receive ());
58 public void ReceiveCompletedTest ()
60 var block = new BufferBlock<int> ();
62 AssertEx.Throws<InvalidOperationException> (
63 () => block.Receive (TimeSpan.FromMilliseconds (1000)));
67 public void ReceiveTimeoutTest ()
69 var block = new BufferBlock<int> ();
70 AssertEx.Throws<TimeoutException> (
71 () => block.Receive (TimeSpan.FromMilliseconds (1000)));
75 public void ReceiveCancelledTest ()
77 var block = new BufferBlock<int> ();
78 var tokenSource = new CancellationTokenSource (200);
80 AssertEx.Throws<OperationCanceledException> (
81 () => block.Receive (tokenSource.Token));
85 public void AsyncReceiveTest ()
88 var mre = new ManualResetEventSlim (false);
90 var block = new WriteOnceBlock<int> (null);
91 block.ReceiveAsync ().ContinueWith (i =>
96 Task.Factory.StartNew (() =>
101 Assert.IsTrue (mre.Wait (1000));
103 Assert.AreEqual (42, result);
107 public void AsyncReceiveTestCanceled ()
109 var src = new CancellationTokenSource ();
111 var block = new WriteOnceBlock<int> (null);
112 var task = block.ReceiveAsync (src.Token);
113 Task.Factory.StartNew (() =>
121 AggregateException ex = null;
125 } catch (AggregateException e) {
129 Assert.IsNotNull (ex);
130 Assert.IsNotNull (ex.InnerException);
131 Assert.IsInstanceOfType (typeof(OperationCanceledException),
133 Assert.IsTrue (task.IsCompleted);
134 Assert.AreEqual (TaskStatus.Canceled, task.Status);
138 public void SendAsyncAcceptedTest ()
140 var target = new BufferBlock<int> ();
141 var task = target.SendAsync (1);
143 Assert.IsTrue (task.Wait (0));
144 Assert.IsTrue (task.Result);
148 public void SendAsyncDeclinedTest ()
150 var target = new BufferBlock<int> ();
152 var task = target.SendAsync (1);
154 Assert.IsTrue (task.Wait (0));
155 Assert.IsFalse (task.Result);
159 public void SendAsyncPostponedAcceptedTest ()
162 new BufferBlock<int> (new DataflowBlockOptions { BoundedCapacity = 1 });
164 Assert.IsTrue (target.Post (1));
166 var task = target.SendAsync (1);
168 Assert.IsFalse (task.Wait (100));
170 Assert.AreEqual (1, target.Receive ());
172 Assert.IsTrue (task.Wait (1000));
173 Assert.IsTrue (task.Result);
177 public void SendAsyncPostponedDeclinedTest ()
180 new BufferBlock<int> (new DataflowBlockOptions { BoundedCapacity = 1 });
182 Assert.IsTrue (target.Post (1));
184 var task = target.SendAsync (1);
186 Assert.IsFalse (task.Wait (100));
190 Assert.IsTrue (task.Wait (1000));
191 Assert.IsFalse (task.Result);
195 public void LinkToPredicateTest ()
197 var scheduler = new TestScheduler ();
198 var source = new BufferBlock<int> (
199 new DataflowBlockOptions { TaskScheduler = scheduler });
200 var target = new BufferBlock<int> ();
201 source.LinkTo (target, i => i % 2 == 1);
203 Assert.IsTrue (source.Post (1));
204 Assert.IsTrue (source.Post (2));
205 Assert.IsTrue (source.Post (3));
207 scheduler.ExecuteAll ();
210 Assert.IsTrue (target.TryReceive (out item));
211 Assert.AreEqual (1, item);
212 Assert.IsFalse (target.TryReceive (out item));
214 Assert.IsTrue (source.TryReceive (out item));
215 Assert.AreEqual (2, item);
217 scheduler.ExecuteAll ();
219 Assert.IsTrue (target.TryReceive (out item));
220 Assert.AreEqual (3, item);
224 public void LinkToPredicateMaxMessagesTest ()
226 var scheduler = new TestScheduler ();
227 var source = new BufferBlock<int> (
228 new DataflowBlockOptions { TaskScheduler = scheduler });
229 var target = new BufferBlock<int> ();
230 source.LinkTo (target, new DataflowLinkOptions { MaxMessages = 1 },
233 Assert.IsTrue (source.Post (2));
234 Assert.IsTrue (source.Post (1));
235 Assert.IsTrue (source.Post (3));
237 scheduler.ExecuteAll ();
240 Assert.IsFalse (target.TryReceive (out item));
241 Assert.IsTrue (source.TryReceive (out item));
242 Assert.AreEqual (2, item);
244 scheduler.ExecuteAll ();
246 Assert.IsTrue (target.TryReceive (out item));
247 Assert.AreEqual (1, item);
249 scheduler.ExecuteAll ();
251 Assert.IsFalse (target.TryReceive (out item));
255 public void LinkToPredicatePostponed ()
257 var scheduler = new TestScheduler ();
258 var source = new BufferBlock<int> (
259 new DataflowBlockOptions { TaskScheduler = scheduler });
260 var target = new BufferBlock<int> (
261 new DataflowBlockOptions { BoundedCapacity = 1, TaskScheduler = scheduler });
262 source.LinkTo (target, i => true);
264 Assert.IsTrue (target.Post (1));
265 Assert.IsTrue (source.Post (2));
267 scheduler.ExecuteAll ();
270 Assert.IsTrue (target.TryReceive (out item));
271 Assert.AreEqual (1, item);
273 scheduler.ExecuteAll ();
275 Assert.IsTrue (target.TryReceive (out item));
276 Assert.AreEqual (2, item);
280 public void LinkToPredicateClonerTest ()
282 var scheduler = new TestScheduler ();
283 var source = new BroadcastBlock<int> (i => i * 10,
284 new DataflowBlockOptions { TaskScheduler = scheduler });
285 var target = new BufferBlock<int> ();
286 source.LinkTo (target, i => i < 10);
288 Assert.IsTrue (source.Post (1));
290 scheduler.ExecuteAll ();
293 Assert.IsTrue (target.TryReceive (out item));
294 Assert.AreEqual (10, item);
298 public void NullTargetTest ()
300 var target = DataflowBlock.NullTarget<int> ();
301 Assert.IsTrue (target.Post (1));
303 var source = new TestSourceBlock<int> ();
304 var header = new DataflowMessageHeader (1);
305 source.AddMessage (header, 2);
307 Assert.IsFalse (source.WasConsumed (header));
309 Assert.AreEqual (DataflowMessageStatus.Accepted,
310 target.OfferMessage (header, 2, source, true));
311 Assert.IsTrue (source.WasConsumed (header));
313 Assert.IsFalse (target.Completion.Wait (100));
317 Assert.IsFalse (target.Completion.Wait (100));
319 target.Fault (new Exception ());
321 Assert.IsFalse (target.Completion.Wait (100));