Merge pull request #980 from StephenMcConnel/bug-18638
[mono.git] / mcs / class / System.Threading.Tasks.Dataflow / Test / System.Threading.Tasks.Dataflow / OptionsTest.cs
1 // OptionsTest.cs
2 //  
3 // Copyright (c) 2012 Petr Onderka
4 // 
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to deal
7 // in the Software without restriction, including without limitation the rights
8 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
11 // 
12 // The above copyright notice and this permission notice shall be included in
13 // all copies or substantial portions of the Software.
14 // 
15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21 // THE SOFTWARE.
22
23 using System;
24 using System.Collections.Concurrent;
25 using System.Collections.Generic;
26 using System.Linq;
27 using System.Threading;
28 using System.Threading.Tasks;
29 using System.Threading.Tasks.Dataflow;
30 using NUnit.Framework;
31
32 namespace MonoTests.System.Threading.Tasks.Dataflow {
33         [TestFixture]
34         public class OptionsTest {
35                 [Test]
36                 public void NameFormatTest ()
37                 {
38                         var constant = "constant";
39                         foreach (var block in Blocks.CreateBlocksWithNameFormat (constant))
40                                 Assert.AreEqual (constant, block.ToString ());
41
42                         foreach (var block in Blocks.CreateBlocksWithNameFormat ("{0}"))
43                                 Assert.AreEqual (block.GetType ().Name, block.ToString ());
44
45                         foreach (var block in Blocks.CreateBlocksWithNameFormat ("{1}"))
46                                 Assert.AreEqual (block.Completion.Id.ToString (), block.ToString ());
47                 }
48
49                 [Test]
50                 public void CancellationTest ()
51                 {
52                         var source = new CancellationTokenSource ();
53                         var blocks = Blocks.CreateBlocksWithCancellationToken (source.Token).ToArray ();
54
55                         foreach (var block in blocks)
56                                 Assert.IsFalse (block.Completion.Wait (100));
57
58                         source.Cancel ();
59
60                         foreach (var block in blocks) {
61                                 var ae =
62                                         AssertEx.Throws<AggregateException> (() => block.Completion.Wait (100));
63                                 Assert.AreEqual (1, ae.InnerExceptions.Count);
64                                 Assert.IsInstanceOfType (typeof(TaskCanceledException), ae.InnerExceptions [0]);
65                                 Assert.IsTrue (block.Completion.IsCanceled);
66                         }
67                 }
68
69                 static IEnumerable<int[]> GetTaskIdsForExecutionsOptions (
70                         ExecutionDataflowBlockOptions options)
71                 {
72                         var blockFactories =
73                                 new Func<ConcurrentQueue<Tuple<int, int>>, ITargetBlock<int>>[]
74                                 {
75                                         q => new ActionBlock<int> (
76                                                      i => q.Enqueue (Tuple.Create (i, Task.CurrentId.Value)), options),
77                                         q => new TransformBlock<int, int> (i =>
78                                         {
79                                                 q.Enqueue (Tuple.Create (i, Task.CurrentId.Value));
80                                                 return i;
81                                         }, options),
82                                         q => new TransformManyBlock<int, int> (i =>
83                                         {
84                                                 q.Enqueue (Tuple.Create (i, Task.CurrentId.Value));
85                                                 return new[] { i };
86                                         }, options)
87                                 };
88
89                         foreach (var factory in blockFactories) {
90                                 var queue = new ConcurrentQueue<Tuple<int, int>> ();
91                                 var block = factory (queue);
92
93                                 Assert.IsEmpty (queue);
94
95                                 for (int i = 0; i < 100; i++)
96                                         block.Post (i);
97
98                                 block.Complete ();
99
100                                 var source = block as ISourceBlock<int>;
101                                 if (source != null) {
102                                         Assert.IsFalse (block.Completion.Wait (100));
103
104                                         source.LinkTo (new BufferBlock<int> ());
105                                 }
106                                 Assert.IsTrue (block.Completion.Wait (500));
107
108                                 CollectionAssert.AreEquivalent (
109                                         Enumerable.Range (0, 100), queue.Select (t => t.Item1));
110
111                                 yield return queue.Select (t => t.Item2).ToArray ();
112                         }
113                 }
114
115                 static int CalculateDegreeOfParallelism(IEnumerable<int> taskIds)
116                 {
117                         var firsts = new Dictionary<int, int> ();
118                         var lasts = new Dictionary<int, int> ();
119
120                         int i = 0;
121                         foreach (var taskId in taskIds) {
122                                 if (!firsts.ContainsKey (taskId))
123                                         firsts.Add (taskId, i);
124
125                                 lasts [taskId] = i;
126
127                                 i++;
128                         }
129
130                         int maxTime = i;
131
132                         var times =
133                                 Enumerable.Repeat (Tuple.Create<int?, int?> (null, null), maxTime).ToArray ();
134
135                         foreach (var first in firsts)
136                                 times [first.Value] = Tuple.Create<int?, int?> (
137                                         first.Key, times [first.Value].Item2);
138
139                         foreach (var last in lasts)
140                                 times [last.Value] = Tuple.Create<int?, int?> (
141                                         times [last.Value].Item1, last.Key);
142
143                         int maxDop = 0;
144                         int dop = 0;
145
146                         foreach (var time in times) {
147                                 if (time.Item1 != null)
148                                         dop++;
149
150                                 if (dop > maxDop)
151                                         maxDop = dop;
152
153                                 if (time.Item2 != null)
154                                         dop--;
155                         }
156
157                         return maxDop;
158                 }
159
160                 [Test]
161                 public void MaxDegreeOfParallelismTest()
162                 {
163                         // loop to better test for race conditions
164                         // some that showed in this test were quite rare
165                         for (int i = 0; i < 10; i++)
166                         {
167                                 var options = new ExecutionDataflowBlockOptions ();
168                                 foreach (var taskIds in GetTaskIdsForExecutionsOptions(options))
169                                         Assert.AreEqual (1, CalculateDegreeOfParallelism (taskIds));
170
171                                 options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 };
172                                 foreach (var taskIds in GetTaskIdsForExecutionsOptions (options))
173                                         Assert.LessOrEqual (CalculateDegreeOfParallelism (taskIds), 2);
174
175                                 options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 };
176                                 foreach (var taskIds in GetTaskIdsForExecutionsOptions (options))
177                                         Assert.LessOrEqual (CalculateDegreeOfParallelism (taskIds), 4);
178
179                                 options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = -1 };
180                                 foreach (var taskIds in GetTaskIdsForExecutionsOptions (options))
181                                         Assert.LessOrEqual (CalculateDegreeOfParallelism (taskIds), taskIds.Length);
182                         }
183                 }
184
185                 [Test]
186                 public void MaxMessagesPerTaskTest()
187                 {
188                         var options = new ExecutionDataflowBlockOptions ();
189                         foreach (var taskIds in GetTaskIdsForExecutionsOptions (options))
190                                 Assert.GreaterOrEqual (taskIds.Distinct ().Count (), 1);
191
192                         options = new ExecutionDataflowBlockOptions { MaxMessagesPerTask = 1 };
193                         foreach (var taskIds in GetTaskIdsForExecutionsOptions (options))
194                                 Assert.AreEqual (100, taskIds.Distinct ().Count ());
195
196                         options = new ExecutionDataflowBlockOptions { MaxMessagesPerTask = 2 };
197                         foreach (var taskIds in GetTaskIdsForExecutionsOptions (options))
198                                 Assert.GreaterOrEqual (taskIds.Distinct ().Count (), taskIds.Length / 2);
199
200                         options = new ExecutionDataflowBlockOptions { MaxMessagesPerTask = 4 };
201                         foreach (var taskIds in GetTaskIdsForExecutionsOptions (options))
202                                 Assert.GreaterOrEqual (taskIds.Distinct ().Count (), taskIds.Length / 4);
203                 }
204
205                 [Test]
206                 public void TaskSchedulerTest ()
207                 {
208                         var scheduler = new TestScheduler ();
209
210                         int n = 0;
211
212                         var action = new ActionBlock<int> (
213                                 i => Interlocked.Increment (ref n),
214                                 new ExecutionDataflowBlockOptions { TaskScheduler = scheduler });
215
216                         Assert.IsTrue (action.Post (1));
217
218                         Assert.AreEqual (0, Volatile.Read (ref n));
219
220                         Assert.AreEqual (1, scheduler.ExecuteAll ());
221                         Assert.AreEqual (1, Volatile.Read (ref n));
222                 }
223
224                 [Test]
225                 public void DefaultSchedulerIsDefaultTest ()
226                 {
227                         var scheduler = new TestScheduler ();
228                         var factory = new TaskFactory (scheduler);
229
230                         ActionBlock<int> action = null;
231
232                         var task = factory.StartNew (() =>
233                         {
234                                 Assert.AreEqual (scheduler, TaskScheduler.Current);
235
236                                 action = new ActionBlock<int> (
237                                         i => Assert.AreNotEqual (scheduler, TaskScheduler.Current));
238                                 Assert.IsTrue (action.Post (1));
239                                 action.Complete ();
240                         });
241
242                         Assert.AreEqual (1, scheduler.ExecuteAll ());
243
244                         Assert.IsNotNull (action);
245
246                         Assert.IsTrue (action.Completion.Wait (100));
247                         Assert.IsTrue (task.Wait (0));
248                 }
249
250                 [Test]
251                 public void MaxMessagesDirectTest ()
252                 {
253                         var scheduler = new TestScheduler ();
254                         var source =
255                                 new BufferBlock<int> (new DataflowBlockOptions { TaskScheduler = scheduler });
256                         var target =
257                                 new BufferBlock<int> (new DataflowBlockOptions { TaskScheduler = scheduler });
258                         Assert.IsNotNull (
259                                 source.LinkTo (target, new DataflowLinkOptions { MaxMessages = 1 }));
260
261                         Assert.IsTrue (source.Post (42));
262                         scheduler.ExecuteAll ();
263
264                         int item;
265                         Assert.IsTrue (target.TryReceive (null, out item));
266                         Assert.AreEqual (42, item);
267
268                         Assert.IsTrue (source.Post (43));
269                         scheduler.ExecuteAll ();
270                         Assert.IsFalse (target.TryReceive (null, out item));
271                         Assert.IsTrue (source.TryReceive (null, out item));
272                         Assert.AreEqual (43, item);
273                 }
274
275                 [Test]
276                 public void MaxMessagesPostponedTest ()
277                 {
278                         var scheduler = new TestScheduler ();
279                         var source =
280                                 new BufferBlock<int> (new DataflowBlockOptions { TaskScheduler = scheduler });
281                         var target = new BufferBlock<int> (
282                                 new DataflowBlockOptions { TaskScheduler = scheduler, BoundedCapacity = 1 });
283                         Assert.IsNotNull (
284                                 source.LinkTo (target, new DataflowLinkOptions { MaxMessages = 2 }));
285
286                         Assert.IsTrue (source.Post (42));
287                         Assert.IsTrue (source.Post (43));
288                         Assert.IsTrue (source.Post (44));
289                         scheduler.ExecuteAll ();
290
291                         int item;
292                         Assert.IsTrue (target.TryReceive (null, out item));
293                         Assert.AreEqual (42, item);
294                         Assert.IsFalse (target.TryReceive (null, out item));
295
296                         scheduler.ExecuteAll ();
297
298                         Assert.IsTrue (target.TryReceive (null, out item));
299                         Assert.AreEqual (43, item);
300
301                         scheduler.ExecuteAll ();
302
303                         Assert.IsFalse (target.TryReceive (null, out item));
304                         Assert.IsTrue (source.TryReceive (null, out item));
305                         Assert.AreEqual (44, item);
306                 }
307
308                 [Test]
309                 public void MaxMessagesPostponedUnconsumedTest ()
310                 {
311                         var scheduler = new TestScheduler ();
312                         var source =
313                                 new BufferBlock<int> (new DataflowBlockOptions { TaskScheduler = scheduler });
314                         var target =
315                                 new BufferBlock<int> (
316                                         new DataflowBlockOptions { TaskScheduler = scheduler, BoundedCapacity = 1 });
317                         Assert.IsNotNull (
318                                 source.LinkTo (target, new DataflowLinkOptions { MaxMessages = 2 }));
319
320                         Assert.IsTrue (source.Post (42));
321                         Assert.IsTrue (source.Post (43));
322                         Assert.IsTrue (source.Post (44));
323                         Assert.IsTrue (source.Post (45));
324                         scheduler.ExecuteAll ();
325
326                         int item;
327                         Assert.IsTrue (source.TryReceive (null, out item));
328                         Assert.AreEqual (43, item);
329
330                         Assert.IsTrue (target.TryReceive (null, out item));
331                         Assert.AreEqual (42, item);
332                         Assert.IsFalse (target.TryReceive (null, out item));
333
334                         scheduler.ExecuteAll ();
335
336                         Assert.IsTrue (target.TryReceive (null, out item));
337                         Assert.AreEqual (44, item);
338
339                         scheduler.ExecuteAll ();
340
341                         Assert.IsFalse (target.TryReceive (null, out item));
342                         Assert.IsTrue (source.TryReceive (null, out item));
343                         Assert.AreEqual (45, item);
344                 }
345
346                 [Test]
347                 public void MaxMessagesBroadcastTest ()
348                 {
349                         var scheduler = new TestScheduler ();
350                         var source = new BroadcastBlock<int> (
351                                 null, new DataflowBlockOptions { TaskScheduler = scheduler });
352                         var target = new BufferBlock<int>(
353                                 new DataflowBlockOptions { TaskScheduler = scheduler, BoundedCapacity = 1 });
354                         Assert.IsNotNull (
355                                 source.LinkTo (target, new DataflowLinkOptions { MaxMessages = 2 }));
356
357                         // should be accepted
358                         Assert.IsTrue (source.Post (42));
359                         scheduler.ExecuteAll ();
360
361                         // should be postponed, but counted into the limit
362                         Assert.IsTrue (source.Post (43));
363                         scheduler.ExecuteAll ();
364
365                         // shouldn't be even offered for now
366                         Assert.IsTrue (source.Post (44));
367                         scheduler.ExecuteAll ();
368
369                         int item;
370                         Assert.IsTrue (target.TryReceive (out item));
371                         Assert.AreEqual (42, item);
372
373                         scheduler.ExecuteAll ();
374                         Assert.IsTrue (target.TryReceive (out item));
375                         Assert.AreEqual (44, item);
376                 }
377
378                 [Test]
379                 public void MaxNumberOfGroupsWithConsumeToAcceptTest ()
380                 {
381                         ITargetBlock<int> block = new BatchBlock<int> (1,
382                                 new GroupingDataflowBlockOptions { MaxNumberOfGroups = 1 });
383
384                         var evt = new ManualResetEventSlim ();
385
386                         Func<Task<Tuple<DataflowMessageStatus, bool>>> startTask =
387                                 () => Task.Factory.StartNew (
388                                         () =>
389                                         {
390                                                 var sourceBlock = new TestSourceBlock<int> { ConsumeWaiter = evt.Wait };
391                                                 var header = new DataflowMessageHeader (1);
392                                                 sourceBlock.AddMessage (header, 1);
393                                                 var status = block.OfferMessage (header, 1, sourceBlock, true);
394
395                                                 return Tuple.Create (status, sourceBlock.WasConsumed (header));
396                                         });
397
398                         var task1 = startTask ();
399                         var task2 = startTask ();
400
401                         Thread.Sleep (100);
402
403                         Assert.IsFalse (task1.IsCompleted);
404                         Assert.IsFalse (task2.IsCompleted);
405
406                         evt.Set ();
407
408                         Assert.IsTrue (Task.WaitAll (new Task[] { task1, task2 }, 100));
409
410                         CollectionAssert.AreEquivalent (
411                                 new[]
412                                 {
413                                         Tuple.Create (DataflowMessageStatus.Accepted, true),
414                                         Tuple.Create (DataflowMessageStatus.DecliningPermanently, false)
415                                 },
416                                 new[] { task1.Result, task2.Result });
417                 }
418         }
419 }