3 // Copyright (c) 2012 Petr Onderka
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to deal
7 // in the Software without restriction, including without limitation the rights
8 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
12 // The above copyright notice and this permission notice shall be included in
13 // all copies or substantial portions of the Software.
15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
24 using System.Collections.Concurrent;
25 using System.Collections.Generic;
27 using System.Threading;
28 using System.Threading.Tasks;
29 using System.Threading.Tasks.Dataflow;
30 using NUnit.Framework;
32 namespace MonoTests.System.Threading.Tasks.Dataflow {
// Tests for the options accepted by dataflow blocks and links. The methods
// below set NameFormat (via a helper), a CancellationToken,
// MaxDegreeOfParallelism, MaxMessagesPerTask, TaskScheduler, BoundedCapacity,
// DataflowLinkOptions.MaxMessages and MaxNumberOfGroups.
34 public class OptionsTest {
// Checks how the name format handed to Blocks.CreateBlocksWithNameFormat shows
// up in the ToString () result of every block that helper returns.
36 public void NameFormatTest ()
// A format without placeholders is expected back unchanged.
38 var constant = "constant";
39 foreach (var block in Blocks.CreateBlocksWithNameFormat (constant))
40 Assert.AreEqual (constant, block.ToString ());
// "{0}" is expected to expand to the name of the block's runtime type.
42 foreach (var block in Blocks.CreateBlocksWithNameFormat ("{0}"))
43 Assert.AreEqual (block.GetType ().Name, block.ToString ());
// "{1}" is expected to expand to the Id of the block's Completion task.
45 foreach (var block in Blocks.CreateBlocksWithNameFormat ("{1}"))
46 Assert.AreEqual (block.Completion.Id.ToString (), block.ToString ());
// Checks that blocks created with a CancellationToken report cancellation
// through their Completion task.
50 public void CancellationTest ()
52 var source = new CancellationTokenSource ();
// ToArray () materializes the blocks once so both loops see the same instances.
53 var blocks = Blocks.CreateBlocksWithCancellationToken (source.Token).ToArray ();
// At this point no block's Completion may finish within 100 ms.
55 foreach (var block in blocks)
56 Assert.IsFalse (block.Completion.Wait (100));
// NOTE(review): no statement cancelling `source` is visible between the two
// loops — presumably the token is cancelled here; confirm against the full file.
60 foreach (var block in blocks) {
// NOTE(review): `ae` is used below but its declaration is not visible here —
// presumably it receives the exception returned by AssertEx.Throws; confirm.
62 AssertEx.Throws<AggregateException> (() => block.Completion.Wait (100));
// Exactly one inner exception of type TaskCanceledException is expected, and
// the Completion task itself must be in the canceled state.
63 Assert.AreEqual (1, ae.InnerExceptions.Count);
64 Assert.IsInstanceOfType (typeof(TaskCanceledException), ae.InnerExceptions [0]);
65 Assert.IsTrue (block.Completion.IsCanceled);
// For an ActionBlock, a TransformBlock and a TransformManyBlock in turn,
// records which task processed each item and yields, per block, the array of
// recorded task ids in queue order. The ActionBlock is built with `options`;
// NOTE(review): the argument lists of the other two blocks are not fully
// visible here — presumably they receive `options` as well; confirm.
69 static IEnumerable<int[]> GetTaskIdsForExecutionsOptions (
70 ExecutionDataflowBlockOptions options)
// Each factory takes the queue to record into and returns the block as a target.
// NOTE(review): the assignment of this array to `blockFactories` (used by the
// loop below) is not visible in this excerpt.
73 new Func<ConcurrentQueue<Tuple<int, int>>, ITargetBlock<int>>[]
// Every block delegate enqueues (item, id of the task running the delegate).
75 q => new ActionBlock<int> (
76 i => q.Enqueue (Tuple.Create (i, Task.CurrentId.Value)), options),
77 q => new TransformBlock<int, int> (i =>
79 q.Enqueue (Tuple.Create (i, Task.CurrentId.Value));
82 q => new TransformManyBlock<int, int> (i =>
84 q.Enqueue (Tuple.Create (i, Task.CurrentId.Value));
89 foreach (var factory in blockFactories) {
90 var queue = new ConcurrentQueue<Tuple<int, int>> ();
91 var block = factory (queue);
// Nothing may have been recorded before any item is sent.
93 Assert.IsEmpty (queue);
// NOTE(review): the body of this loop is not visible — presumably it posts
// i to the block, and the block is completed afterwards; confirm.
95 for (int i = 0; i < 100; i++)
// Blocks that are also sources get linked to a fresh BufferBlock; before the
// link is made their Completion is asserted not to finish within 100 ms.
100 var source = block as ISourceBlock<int>;
101 if (source != null) {
102 Assert.IsFalse (block.Completion.Wait (100));
104 source.LinkTo (new BufferBlock<int> ());
106 Assert.IsTrue (block.Completion.Wait (500));
// The recorded items must be exactly 0..99, in any order.
108 CollectionAssert.AreEquivalent (
109 Enumerable.Range (0, 100), queue.Select (t => t.Item1));
// One array of task ids per block, one entry per recorded item.
111 yield return queue.Select (t => t.Item2).ToArray ();
// Derives an int from a sequence of task ids; judging by the name and its
// callers, the number of tasks that were active at the same time.
// NOTE(review): large parts of this method (the index `i`, how `lasts` is
// filled, `maxTime`, the `times` declaration, the counting and the return)
// are not visible in this excerpt — the comments below cover only what is shown.
115 static int CalculateDegreeOfParallelism(IEnumerable<int> taskIds)
// firsts: task id -> position recorded the first time that id is seen.
117 var firsts = new Dictionary<int, int> ();
// lasts: presumably task id -> position of its last occurrence; TODO confirm.
118 var lasts = new Dictionary<int, int> ();
121 foreach (var taskId in taskIds) {
// Only the first sighting of a task id is stored in firsts.
122 if (!firsts.ContainsKey (taskId))
123 firsts.Add (taskId, i);
// Timeline of maxTime slots, each starting as (null, null). Item1 is later set
// to the id of a task whose first position is that slot, Item2 to the id of
// a task whose last position is that slot.
133 Enumerable.Repeat (Tuple.Create<int?, int?> (null, null), maxTime).ToArray ();
// Tuples are immutable, so each update rebuilds the slot keeping the other item.
135 foreach (var first in firsts)
136 times [first.Value] = Tuple.Create<int?, int?> (
137 first.Key, times [first.Value].Item2);
139 foreach (var last in lasts)
140 times [last.Value] = Tuple.Create<int?, int?> (
141 times [last.Value].Item1, last.Key);
// Walks the timeline reacting to slots where a task starts (Item1) or ends
// (Item2). NOTE(review): the bodies of both branches are not visible —
// presumably they track the current and maximum number of live tasks; confirm.
146 foreach (var time in times) {
147 if (time.Item1 != null)
153 if (time.Item2 != null)
// Checks the degree of parallelism computed from the task ids returned by
// GetTaskIdsForExecutionsOptions for several MaxDegreeOfParallelism settings.
161 public void MaxDegreeOfParallelismTest()
163 // loop to better test for race conditions
164 // some that showed in this test were quite rare
165 for (int i = 0; i < 10; i++)
// Default options: exactly one task at a time is expected.
167 var options = new ExecutionDataflowBlockOptions ();
168 foreach (var taskIds in GetTaskIdsForExecutionsOptions(options))
169 Assert.AreEqual (1, CalculateDegreeOfParallelism (taskIds));
// Explicit limits of 2 and 4 are upper bounds, not exact expectations.
171 options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 };
172 foreach (var taskIds in GetTaskIdsForExecutionsOptions (options))
173 Assert.LessOrEqual (CalculateDegreeOfParallelism (taskIds), 2);
175 options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 };
176 foreach (var taskIds in GetTaskIdsForExecutionsOptions (options))
177 Assert.LessOrEqual (CalculateDegreeOfParallelism (taskIds), 4);
// With -1 (presumably "unbounded" — confirm against the options API) the only
// bound asserted is the number of recorded task ids.
179 options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = -1 };
180 foreach (var taskIds in GetTaskIdsForExecutionsOptions (options))
181 Assert.LessOrEqual (CalculateDegreeOfParallelism (taskIds), taskIds.Length);
// Checks how many distinct tasks appear in the ids returned by
// GetTaskIdsForExecutionsOptions for several MaxMessagesPerTask settings.
186 public void MaxMessagesPerTaskTest()
// Default options: at least one task must have been used.
188 var options = new ExecutionDataflowBlockOptions ();
189 foreach (var taskIds in GetTaskIdsForExecutionsOptions (options))
190 Assert.GreaterOrEqual (taskIds.Distinct ().Count (), 1);
// One message per task: exactly 100 distinct task ids are expected.
192 options = new ExecutionDataflowBlockOptions { MaxMessagesPerTask = 1 };
193 foreach (var taskIds in GetTaskIdsForExecutionsOptions (options))
194 Assert.AreEqual (100, taskIds.Distinct ().Count ());
// At most k messages per task: at least Length / k distinct tasks (integer
// division) are expected, for k = 2 and k = 4.
196 options = new ExecutionDataflowBlockOptions { MaxMessagesPerTask = 2 };
197 foreach (var taskIds in GetTaskIdsForExecutionsOptions (options))
198 Assert.GreaterOrEqual (taskIds.Distinct ().Count (), taskIds.Length / 2);
200 options = new ExecutionDataflowBlockOptions { MaxMessagesPerTask = 4 };
201 foreach (var taskIds in GetTaskIdsForExecutionsOptions (options))
202 Assert.GreaterOrEqual (taskIds.Distinct ().Count (), taskIds.Length / 4);
// Checks that an ActionBlock configured with a custom TaskScheduler runs its
// action only when that scheduler is told to execute.
206 public void TaskSchedulerTest ()
208 var scheduler = new TestScheduler ();
// NOTE(review): the declaration of the counter `n` is not visible here —
// presumably an int initialized to 0; confirm.
212 var action = new ActionBlock<int> (
213 i => Interlocked.Increment (ref n),
214 new ExecutionDataflowBlockOptions { TaskScheduler = scheduler });
216 Assert.IsTrue (action.Post (1));
// The item was accepted, but the counter must still be 0 at this point.
218 Assert.AreEqual (0, Thread.VolatileRead (ref n));
// ExecuteAll () is asserted to return 1, after which the counter must be 1.
220 Assert.AreEqual (1, scheduler.ExecuteAll ());
221 Assert.AreEqual (1, Thread.VolatileRead (ref n));
// Checks that a block created without options inside a task running on a
// custom scheduler does not run its action on that custom scheduler.
225 public void DefaultSchedulerIsDefaultTest ()
227 var scheduler = new TestScheduler ();
228 var factory = new TaskFactory (scheduler);
// Assigned inside the task below so it can be inspected afterwards.
230 ActionBlock<int> action = null;
232 var task = factory.StartNew (() =>
// The creating task itself must see the custom scheduler as current.
234 Assert.AreEqual (scheduler, TaskScheduler.Current);
// The block's action asserts that it does NOT see the custom scheduler.
236 action = new ActionBlock<int> (
237 i => Assert.AreNotEqual (scheduler, TaskScheduler.Current));
238 Assert.IsTrue (action.Post (1));
// NOTE(review): no call completing the block is visible, yet Completion is
// awaited below — presumably action.Complete () is called in the task; confirm.
// ExecuteAll () is asserted to return 1 here.
242 Assert.AreEqual (1, scheduler.ExecuteAll ());
244 Assert.IsNotNull (action);
// Waiting on Completion is expected to succeed within 100 ms; a failed
// assertion inside the action would presumably surface as an exception here.
246 Assert.IsTrue (action.Completion.Wait (100));
247 Assert.IsTrue (task.Wait (0));
// Checks a link with MaxMessages = 1 between two BufferBlocks: the first
// posted item must reach the target, the second must stay in the source.
251 public void MaxMessagesDirectTest ()
253 var scheduler = new TestScheduler ();
// NOTE(review): the `var source =` / `var target =` parts of the next two
// statements and the declaration of `item` are not visible in this excerpt.
255 new BufferBlock<int> (new DataflowBlockOptions { TaskScheduler = scheduler });
257 new BufferBlock<int> (new DataflowBlockOptions { TaskScheduler = scheduler });
// NOTE(review): the extra closing parenthesis suggests this call is wrapped in
// an assertion that starts on a line not shown here; confirm.
259 source.LinkTo (target, new DataflowLinkOptions { MaxMessages = 1 }));
261 Assert.IsTrue (source.Post (42));
262 scheduler.ExecuteAll ();
// The first item must be available from the target.
265 Assert.IsTrue (target.TryReceive (null, out item));
266 Assert.AreEqual (42, item);
// The second item must not reach the target and must still be in the source.
268 Assert.IsTrue (source.Post (43));
269 scheduler.ExecuteAll ();
270 Assert.IsFalse (target.TryReceive (null, out item));
271 Assert.IsTrue (source.TryReceive (null, out item));
272 Assert.AreEqual (43, item);
// Checks a link with MaxMessages = 2 into a target with BoundedCapacity = 1:
// of three posted items the first two must reach the target one at a time and
// the third must stay in the source.
276 public void MaxMessagesPostponedTest ()
278 var scheduler = new TestScheduler ();
// NOTE(review): the `var source =` part of the next statement and the
// declaration of `item` are not visible in this excerpt.
280 new BufferBlock<int> (new DataflowBlockOptions { TaskScheduler = scheduler });
281 var target = new BufferBlock<int> (
282 new DataflowBlockOptions { TaskScheduler = scheduler, BoundedCapacity = 1 });
// NOTE(review): the extra closing parenthesis suggests this call is wrapped in
// an assertion that starts on a line not shown here; confirm.
284 source.LinkTo (target, new DataflowLinkOptions { MaxMessages = 2 }));
286 Assert.IsTrue (source.Post (42));
287 Assert.IsTrue (source.Post (43));
288 Assert.IsTrue (source.Post (44));
289 scheduler.ExecuteAll ();
// Only one item (42) may be in the target at first.
292 Assert.IsTrue (target.TryReceive (null, out item));
293 Assert.AreEqual (42, item);
294 Assert.IsFalse (target.TryReceive (null, out item));
// After the scheduler runs again, 43 must have arrived in the target.
296 scheduler.ExecuteAll ();
298 Assert.IsTrue (target.TryReceive (null, out item));
299 Assert.AreEqual (43, item);
// Even after another scheduler run, 44 must still be in the source, not the
// target.
301 scheduler.ExecuteAll ();
303 Assert.IsFalse (target.TryReceive (null, out item));
304 Assert.IsTrue (source.TryReceive (null, out item));
305 Assert.AreEqual (44, item);
// Same setup as a MaxMessages = 2 link into a target with BoundedCapacity = 1,
// but one item is taken directly from the source before the target can have it.
// The assertions expect the target to still receive two items (42 and 44).
// NOTE(review): presumably this shows that an item the target never consumed
// does not count against MaxMessages — confirm the intended semantics.
309 public void MaxMessagesPostponedUnconsumedTest ()
311 var scheduler = new TestScheduler ();
// NOTE(review): the `var source =` / `var target =` parts of the next two
// statements and the declaration of `item` are not visible in this excerpt.
313 new BufferBlock<int> (new DataflowBlockOptions { TaskScheduler = scheduler });
315 new BufferBlock<int> (
316 new DataflowBlockOptions { TaskScheduler = scheduler, BoundedCapacity = 1 });
// NOTE(review): the extra closing parenthesis suggests this call is wrapped in
// an assertion that starts on a line not shown here; confirm.
318 source.LinkTo (target, new DataflowLinkOptions { MaxMessages = 2 }));
320 Assert.IsTrue (source.Post (42));
321 Assert.IsTrue (source.Post (43));
322 Assert.IsTrue (source.Post (44));
323 Assert.IsTrue (source.Post (45));
324 scheduler.ExecuteAll ();
// 43 is removed from the source directly, bypassing the link.
327 Assert.IsTrue (source.TryReceive (null, out item));
328 Assert.AreEqual (43, item);
// The target holds only 42 at this point.
330 Assert.IsTrue (target.TryReceive (null, out item));
331 Assert.AreEqual (42, item);
332 Assert.IsFalse (target.TryReceive (null, out item));
// After the scheduler runs again, 44 must have arrived in the target.
334 scheduler.ExecuteAll ();
336 Assert.IsTrue (target.TryReceive (null, out item));
337 Assert.AreEqual (44, item);
// Even after another scheduler run, 45 must still be in the source, not the
// target.
339 scheduler.ExecuteAll ();
341 Assert.IsFalse (target.TryReceive (null, out item));
342 Assert.IsTrue (source.TryReceive (null, out item));
343 Assert.AreEqual (45, item);
// Checks a MaxMessages = 2 link from a BroadcastBlock into a BufferBlock with
// BoundedCapacity = 1: of the three posted values, the target is asserted to
// yield 42 and then 44.
347 public void MaxMessagesBroadcastTest ()
349 var scheduler = new TestScheduler ();
// The BroadcastBlock is created with a null first argument.
350 var source = new BroadcastBlock<int> (
351 null, new DataflowBlockOptions { TaskScheduler = scheduler });
352 var target = new BufferBlock<int>(
353 new DataflowBlockOptions { TaskScheduler = scheduler, BoundedCapacity = 1 });
// NOTE(review): the extra closing parenthesis suggests this call is wrapped in
// an assertion that starts on a line not shown here; confirm.
355 source.LinkTo (target, new DataflowLinkOptions { MaxMessages = 2 }));
357 // should be accepted
358 Assert.IsTrue (source.Post (42));
359 scheduler.ExecuteAll ();
361 // should be postponed, but counted into the limit
362 Assert.IsTrue (source.Post (43));
363 scheduler.ExecuteAll ();
365 // shouldn't be even offered for now
366 Assert.IsTrue (source.Post (44));
367 scheduler.ExecuteAll ();
// NOTE(review): the declaration of `item` is not visible in this excerpt.
370 Assert.IsTrue (target.TryReceive (out item));
371 Assert.AreEqual (42, item);
// After the scheduler runs again, the next value the target yields must be 44.
373 scheduler.ExecuteAll ();
374 Assert.IsTrue (target.TryReceive (out item));
375 Assert.AreEqual (44, item);
// Offers two messages concurrently, both with consumeToAccept = true, to a
// BatchBlock with batch size 1 and MaxNumberOfGroups = 1, and expects exactly
// one to be accepted and consumed and the other to be declined permanently.
379 public void MaxNumberOfGroupsWithConsumeToAcceptTest ()
381 ITargetBlock<int> block = new BatchBlock<int> (1,
382 new GroupingDataflowBlockOptions { MaxNumberOfGroups = 1 });
// Passed to each TestSourceBlock as its ConsumeWaiter (evt.Wait) below.
384 var evt = new ManualResetEventSlim ();
// Starts a task that offers one message from its own TestSourceBlock and
// returns the offer status together with whether the message was consumed.
386 Func<Task<Tuple<DataflowMessageStatus, bool>>> startTask =
387 () => Task.Factory.StartNew (
390 var sourceBlock = new TestSourceBlock<int> { ConsumeWaiter = evt.Wait };
391 var header = new DataflowMessageHeader (1);
392 sourceBlock.AddMessage (header, 1);
// The final `true` is the consumeToAccept argument.
393 var status = block.OfferMessage (header, 1, sourceBlock, true);
395 return Tuple.Create (status, sourceBlock.WasConsumed (header));
398 var task1 = startTask ();
399 var task2 = startTask ();
// Neither offer may have finished yet.
// NOTE(review): no wait before these checks and no evt.Set () before the
// WaitAll below are visible — presumably both exist in the full file; confirm.
403 Assert.IsFalse (task1.IsCompleted);
404 Assert.IsFalse (task2.IsCompleted);
408 Assert.IsTrue (Task.WaitAll (new Task[] { task1, task2 }, 100));
// Order-independent comparison: either task may be the one that is accepted.
410 CollectionAssert.AreEquivalent (
413 Tuple.Create (DataflowMessageStatus.Accepted, true),
414 Tuple.Create (DataflowMessageStatus.DecliningPermanently, false)
416 new[] { task1.Result, task2.Result });