5 // Jérémie "garuma" Laval <jeremie.laval@gmail.com>
6 // Petr Onderka <gsvick@gmail.com>
8 // Copyright (c) 2011 Jérémie "garuma" Laval
9 // Copyright (c) 2012 Petr Onderka
11 // Permission is hereby granted, free of charge, to any person obtaining a copy
12 // of this software and associated documentation files (the "Software"), to deal
13 // in the Software without restriction, including without limitation the rights
14 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15 // copies of the Software, and to permit persons to whom the Software is
16 // furnished to do so, subject to the following conditions:
18 // The above copyright notice and this permission notice shall be included in
19 // all copies or substantial portions of the Software.
21 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
24 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
30 using System.Threading;
31 using System.Threading.Tasks.Dataflow;
32 using NUnit.Framework;
34 namespace MonoTests.System.Threading.Tasks.Dataflow {
36 public class BatchBlockTest {
// Checks that a BatchBlock<int> constructed with the argument 10 does not signal
// the linked ActionBlock after 9 posts, and does signal it once a 10th item is posted.
// NOTE(review): the declaration of `array` and the body of the ActionBlock lambda
// are not visible in this excerpt; presumably the lambda stores the received batch
// in `array` and sets `evt` - confirm against the full file.
38 public void BasicUsageTest ()
41 var evt = new ManualResetEventSlim (false);
43 var buffer = new BatchBlock<int> (10);
44 var block = new ActionBlock<int[]> (i =>
49 buffer.LinkTo<int[]> (block);
// Post nine items (0..8): one fewer than the value passed to the BatchBlock constructor.
51 for (int i = 0; i < 9; i++)
52 Assert.IsTrue (buffer.Post (i));
// The event must still be unset after waiting 100 ms, and `array` must still be null.
54 Assert.IsFalse (evt.Wait (100));
56 Assert.IsNull (array);
// Post the tenth item; the event is then expected to be set within 1000 ms.
58 Assert.IsTrue (buffer.Post (42));
59 Assert.IsTrue (evt.Wait (1000));
61 Assert.IsNotNull (array);
// The expected array lists the ten posted values in the order they were posted.
62 CollectionAssert.AreEqual (new int[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 42 }, array);
// Checks that calling TriggerBatch() after only 9 posts (the BatchBlock was
// constructed with 10) results in `array` being non-null, and that a later Post
// is still accepted.
// NOTE(review): the declaration of `array`, the ActionBlock lambda body, any wait
// on `evt`, and the second argument of the final assertion are not visible in this
// excerpt - confirm against the full file.
66 public void TriggerBatchTest ()
69 var evt = new ManualResetEventSlim (false);
71 var buffer = new BatchBlock<int> (10);
72 var block = new ActionBlock<int[]> (i =>
77 buffer.LinkTo (block);
// Post nine items (0..8), then trigger explicitly.
79 for (int i = 0; i < 9; i++)
80 Assert.IsTrue (buffer.Post (i));
82 buffer.TriggerBatch ();
85 Assert.IsNotNull (array);
// A further post after the trigger must still be accepted.
86 Assert.IsTrue (buffer.Post (42));
// Expected content is the nine values posted before TriggerBatch(); 42 is not listed.
89 CollectionAssert.AreEquivalent (new int[] { 0, 1, 2, 3, 4, 5, 6, 7, 8 },
// Same scenario as an explicit TriggerBatch() after 9 posts, except that the
// ActionBlock is linked only after TriggerBatch() has been called.
// NOTE(review): the declaration of `array`, the ActionBlock lambda body, any wait
// on `evt`, and the second argument of the final assertion are not visible in this
// excerpt - confirm against the full file.
94 public void TriggerBatchLateBinding ()
97 var evt = new ManualResetEventSlim (false);
99 var buffer = new BatchBlock<int> (10);
100 var block = new ActionBlock<int[]> (i =>
// Post nine items (0..8) while nothing is linked to the BatchBlock yet.
106 for (int i = 0; i < 9; i++)
107 Assert.IsTrue (buffer.Post (i));
// Trigger first, link afterwards.
109 buffer.TriggerBatch ();
110 buffer.LinkTo (block);
113 Assert.IsNotNull (array);
// Expected content is the nine posted values.
115 CollectionAssert.AreEquivalent (new int[] { 0, 1, 2, 3, 4, 5, 6, 7, 8 },
// Posts 9 items to a BatchBlock<int> constructed with 15, calls TriggerBatch(),
// posts one more item (42), and only then links the ActionBlock. The expected
// content in the final assertion is the nine items posted before the trigger.
// NOTE(review): the declaration of `array`, the ActionBlock lambda body, any wait
// on `evt`, and the second argument of the final assertion are not visible in this
// excerpt - confirm against the full file.
120 public void LateTriggerBatchKeepCountTest ()
123 var evt = new ManualResetEventSlim (false);
125 var buffer = new BatchBlock<int> (15);
126 var block = new ActionBlock<int[]> (i =>
132 for (int i = 0; i < 9; i++)
133 Assert.IsTrue (buffer.Post (i));
134 buffer.TriggerBatch ();
// This item is posted after the trigger and is absent from the expected values below.
135 Assert.IsTrue (buffer.Post (42));
136 buffer.LinkTo (block);
140 Assert.IsNotNull (array);
141 CollectionAssert.AreEquivalent (new int[] { 0, 1, 2, 3, 4, 5, 6, 7, 8 },
// Checks that TriggerBatch() on a BatchBlock<int> to which nothing was posted
// produces no batch: TryReceive must return false and leave `batch` null.
// NOTE(review): TestScheduler is defined outside this excerpt and the declaration
// of `batch` is not visible here - confirm against the full file.
146 public void TriggerBatchWhenEmpty ()
148 var scheduler = new TestScheduler ();
149 var block = new BatchBlock<int> (5,
150 new GroupingDataflowBlockOptions { TaskScheduler = scheduler });
// No item has been posted at this point.
151 block.TriggerBatch ();
// presumably runs the work queued on the test scheduler - TODO confirm in TestScheduler
153 scheduler.ExecuteAll ();
156 Assert.IsFalse (block.TryReceive (out batch));
157 Assert.IsNull (batch);
// Exercises a BatchBlock<int> constructed with 2 and the options Greedy = false,
// BoundedCapacity = 2, fed from two BufferBlock<int> sources linked to it.
// The assertions show that after two rounds of posts only {11, 21} is available
// from the block while 12 can still be received from source1; after the first
// batch is received and the scheduler runs again, {13, 22} becomes available.
// NOTE(review): the `var source1 =` / `var source2 =` declaration lines and the
// declarations of `i` and `batch` are not visible in this excerpt, and
// TestScheduler is defined elsewhere - confirm against the full file.
161 public void NonGreedyBatchWithBoundedCapacityTest ()
163 var scheduler = new TestScheduler ();
164 var block = new BatchBlock<int> (2,
165 new GroupingDataflowBlockOptions
166 { Greedy = false, BoundedCapacity = 2, TaskScheduler = scheduler });
168 new BufferBlock<int> (new DataflowBlockOptions { TaskScheduler = scheduler });
170 new BufferBlock<int> (new DataflowBlockOptions { TaskScheduler = scheduler });
171 Assert.IsNotNull (source1.LinkTo (block));
172 Assert.IsNotNull (source2.LinkTo (block));
// First round: one item per source.
174 Assert.IsTrue (source1.Post (11));
175 Assert.IsTrue (source2.Post (21));
177 scheduler.ExecuteAll ();
// Second round: one more item per source.
179 Assert.IsTrue (source1.Post (12));
180 Assert.IsTrue (source2.Post (22));
182 scheduler.ExecuteAll ();
// 12 is expected to still be receivable directly from source1.
185 Assert.IsTrue (source1.TryReceive (out i));
186 Assert.AreEqual (12, i);
// 13 takes the place of 12 in source1; it shows up in the second batch below.
188 Assert.IsTrue (source1.Post (13));
// First batch: the two items from the first round.
191 Assert.IsTrue (block.TryReceive (out batch));
192 CollectionAssert.AreEquivalent (new[] { 11, 21 }, batch);
194 scheduler.ExecuteAll ();
// Second batch: 13 from source1 and 22 from source2.
196 Assert.IsTrue (block.TryReceive (out batch));
197 CollectionAssert.AreEquivalent (new[] { 13, 22 }, batch);
// Exercises a BatchBlock<int> constructed with 3 and the options Greedy = true,
// BoundedCapacity = 3. The assertions show: two posts and a TriggerBatch(), then
// a third post is accepted and a fourth is rejected; after the {1, 2} batch is
// received, two further posts are accepted again.
// NOTE(review): the declaration of `batch` is not visible in this excerpt and
// TestScheduler is defined elsewhere - confirm against the full file.
201 public void GreedyBatchWithBoundedCapacityTest ()
203 var scheduler = new TestScheduler ();
204 var block = new BatchBlock<int> (3,
205 new GroupingDataflowBlockOptions
206 { Greedy = true, BoundedCapacity = 3, TaskScheduler = scheduler });
208 Assert.IsTrue (block.Post (1));
209 Assert.IsTrue (block.Post (2));
// Trigger with only two items posted.
211 block.TriggerBatch ();
213 scheduler.ExecuteAll ();
// The third post is accepted; the fourth is expected to be rejected.
215 Assert.IsTrue (block.Post (3));
216 Assert.IsFalse (block.Post (4));
// Receiving the triggered batch {1, 2}; the test expects posts to succeed again afterwards.
219 Assert.IsTrue (block.TryReceive (out batch));
220 CollectionAssert.AreEqual (new[] { 1, 2 }, batch);
222 Assert.IsTrue (block.Post (5));
223 Assert.IsTrue (block.Post (6));
// Exercises TriggerBatch() on a BatchBlock<int> constructed with 3 and the options
// Greedy = false, BoundedCapacity = 3, fed from two linked BufferBlock<int> sources.
// Two items are triggered into a first batch and one item into a second batch;
// both batches are then expected to be receivable and source1 is expected to be empty.
// NOTE(review): the `var source1 =` / `var source2 =` declaration lines and the
// declarations of `i` and `batch` are not visible in this excerpt, and
// TestScheduler is defined elsewhere - confirm against the full file.
227 public void NonGreedyBatchWithBoundedCapacityTriggerTest ()
229 var scheduler = new TestScheduler ();
230 var block = new BatchBlock<int> (3,
231 new GroupingDataflowBlockOptions
232 { Greedy = false, BoundedCapacity = 3, TaskScheduler = scheduler });
234 new BufferBlock<int> (new DataflowBlockOptions { TaskScheduler = scheduler });
236 new BufferBlock<int> (new DataflowBlockOptions { TaskScheduler = scheduler });
237 Assert.IsNotNull (source1.LinkTo (block));
238 Assert.IsNotNull (source2.LinkTo (block));
240 // trigger 2 and then trigger 1 with capacity of 3
// First trigger: one item from each source.
242 Assert.IsTrue (source1.Post (11));
243 Assert.IsTrue (source2.Post (21));
244 block.TriggerBatch ();
246 scheduler.ExecuteAll ();
// Second trigger: a single item from source1.
248 Assert.IsTrue (source1.Post (12));
249 block.TriggerBatch ();
251 scheduler.ExecuteAll ();
// Nothing is expected to remain in source1.
254 Assert.IsFalse (source1.TryReceive (out i));
257 Assert.IsTrue (block.TryReceive (out batch));
258 CollectionAssert.AreEquivalent (new[] { 11, 21 }, batch);
260 Assert.IsTrue (block.TryReceive (out batch));
261 CollectionAssert.AreEquivalent (new[] { 12 }, batch);
// Exercises three successive TriggerBatch() calls, each preceded by one post per
// source, on a BatchBlock<int> constructed with 3 and the options Greedy = false,
// BoundedCapacity = 3. The assertions show that the third pair (13, 23) is still
// receivable from the sources, and that the block yields exactly two batches:
// {11, 21} and {12, 22}.
// NOTE(review): the `var source1 =` / `var source2 =` declaration lines and the
// declarations of `i` and `batch` are not visible in this excerpt, and
// TestScheduler is defined elsewhere - confirm against the full file.
265 public void NonGreedyBatchWithBoundedCapacityTriggerTest2 ()
267 var scheduler = new TestScheduler ();
268 var block = new BatchBlock<int> (3,
269 new GroupingDataflowBlockOptions
270 { Greedy = false, BoundedCapacity = 3, TaskScheduler = scheduler });
272 new BufferBlock<int> (new DataflowBlockOptions { TaskScheduler = scheduler });
274 new BufferBlock<int> (new DataflowBlockOptions { TaskScheduler = scheduler });
275 Assert.IsNotNull (source1.LinkTo (block));
276 Assert.IsNotNull (source2.LinkTo (block));
278 // trigger 2, then trigger another 2 and then trigger 2 once more
279 // while having capacity of 3
// First pair and trigger.
281 Assert.IsTrue (source1.Post (11));
282 Assert.IsTrue (source2.Post (21));
283 block.TriggerBatch ();
285 scheduler.ExecuteAll ();
// Second pair and trigger.
287 Assert.IsTrue (source1.Post (12));
288 Assert.IsTrue (source2.Post (22));
289 block.TriggerBatch ();
291 scheduler.ExecuteAll ();
// Third pair and trigger.
293 Assert.IsTrue (source1.Post (13));
294 Assert.IsTrue (source2.Post (23));
295 block.TriggerBatch ();
297 scheduler.ExecuteAll ();
// The third pair is expected to still be receivable directly from the sources.
300 Assert.IsTrue (source1.TryReceive (out i));
301 Assert.AreEqual (13, i);
302 Assert.IsTrue (source2.TryReceive (out i));
303 Assert.AreEqual (23, i);
// Only the first two pairs are expected as batches.
306 Assert.IsTrue (block.TryReceive (out batch));
307 CollectionAssert.AreEquivalent (new[] { 11, 21 }, batch);
309 Assert.IsTrue (block.TryReceive (out batch));
310 CollectionAssert.AreEquivalent (new[] { 12, 22 }, batch);
// No third batch is expected.
312 Assert.IsFalse (block.TryReceive (out batch));
// Exercises a BatchBlock<int> constructed with 2 and the option
// MaxNumberOfGroups = 2. The assertions show that four posts are accepted, the
// fifth is rejected, the batches {1, 2} and {3, 4} are receivable, and the block's
// Completion task finishes within 1000 ms without Complete() appearing in the
// visible lines.
// NOTE(review): the declaration of `batch` is not visible in this excerpt and
// TestScheduler is defined elsewhere - confirm against the full file.
316 public void MaxNumberOfGroupsTest ()
318 var scheduler = new TestScheduler ();
320 var block = new BatchBlock<int> (2,
321 new GroupingDataflowBlockOptions
322 { MaxNumberOfGroups = 2, TaskScheduler = scheduler });
// Items for the first batch.
324 Assert.IsTrue (block.Post (1));
325 Assert.IsTrue (block.Post (2));
// Items for the second batch.
327 Assert.IsTrue (block.Post (3));
328 Assert.IsTrue (block.Post (4));
// A fifth item is expected to be rejected.
330 Assert.IsFalse (block.Post (5));
332 scheduler.ExecuteAll ();
335 Assert.IsTrue (block.TryReceive (out batch));
336 CollectionAssert.AreEqual (new[] { 1, 2 }, batch);
338 Assert.IsTrue (block.TryReceive (out batch));
339 CollectionAssert.AreEqual (new[] { 3, 4 }, batch);
341 scheduler.ExecuteAll ();
343 Assert.IsTrue (block.Completion.Wait (1000));
// Posts one item to a BatchBlock<int> constructed with 2, calls TriggerBatch(),
// and expects to receive the batch {1} within 2000 ms and the Completion task to
// finish within 1000 ms.
// NOTE(review): no call to Complete() is visible in this excerpt; presumably one
// of the missing lines calls it before the Receive - confirm against the full file.
347 public void CompletionWithTriggerTest ()
349 var block = new BatchBlock<int> (2);
351 Assert.IsTrue (block.Post (1));
353 block.TriggerBatch ();
357 CollectionAssert.AreEqual (new[] { 1 },
358 block.Receive (TimeSpan.FromMilliseconds (2000)));
360 Assert.IsTrue (block.Completion.Wait (1000));
// Posts two items to a BatchBlock<int> constructed with 2 (no TriggerBatch() call
// is visible) and expects to receive the batch {1, 2} within 2000 ms and the
// Completion task to finish within 1000 ms.
// NOTE(review): no call to Complete() is visible in this excerpt; presumably one
// of the missing lines calls it before the Receive - confirm against the full file.
364 public void CompletionWithoutTriggerTest ()
366 var block = new BatchBlock<int> (2);
368 Assert.IsTrue (block.Post (1));
369 Assert.IsTrue (block.Post (2));
373 CollectionAssert.AreEqual (new[] { 1, 2 },
374 block.Receive (TimeSpan.FromMilliseconds (2000)));
376 Assert.IsTrue (block.Completion.Wait (1000));
// Posts a single item to a BatchBlock<int> constructed with 2 and expects to
// receive the batch {1} within 2000 ms, then the Completion task to finish within
// 1000 ms. No TriggerBatch() call appears in the visible lines.
// NOTE(review): judging by the test name, a missing line presumably calls
// Complete() before the Receive - confirm against the full file.
380 public void CompleteTriggersBatchTest ()
382 var block = new BatchBlock<int> (2);
384 Assert.IsTrue (block.Post (1));
388 CollectionAssert.AreEqual (new[] { 1 },
389 block.Receive (TimeSpan.FromMilliseconds (2000)));
391 Assert.IsTrue (block.Completion.Wait (1000));
// Links one BufferBlock<int> source to a BatchBlock<int> constructed with 2 and
// the option Greedy = false, posts a single item to the source, and expects that
// no batch can be received from the block while its Completion task still
// finishes within 1000 ms.
// NOTE(review): the `var source =` declaration line, the declaration of `batch`,
// and any Complete() / scheduler.ExecuteAll() calls are not visible in this
// excerpt; TestScheduler is defined elsewhere - confirm against the full file.
395 public void NonGreedyCompleteDoesnTriggerBatchTest ()
397 var scheduler = new TestScheduler ();
398 var block = new BatchBlock<int> (2,
399 new GroupingDataflowBlockOptions
400 { Greedy = false, TaskScheduler = scheduler });
402 new BufferBlock<int> (new DataflowBlockOptions { TaskScheduler = scheduler });
404 Assert.IsNotNull (source.LinkTo (block));
// Only one item is offered, fewer than the value 2 passed to the BatchBlock constructor.
406 Assert.IsTrue (source.Post (1));
411 Assert.IsFalse (block.TryReceive (out batch));
413 Assert.IsTrue (block.Completion.Wait (1000));
// Drives a BatchBlock<int> constructed with 2 and the options
// MaxNumberOfGroups = 1, Greedy = false directly through ITargetBlock<int>.OfferMessage,
// using two TestSourceBlock<int> instances as the message sources.
// The assertions show: both initial offers are answered with Postponed; after the
// scheduler runs, both sources report the message as consumed; a further offer is
// answered with DecliningPermanently; the batch {11, 21} is receivable; and the
// Completion task finishes within 1000 ms.
// NOTE(review): TestScheduler and TestSourceBlock<T> are defined outside this
// excerpt and the declaration of `batch` is not visible here - confirm against
// the full file.
417 public void NonGreedyMaxNumberOfGroupsTest ()
419 var scheduler = new TestScheduler ();
420 var block = new BatchBlock<int> (2,
421 new GroupingDataflowBlockOptions
422 { MaxNumberOfGroups = 1, Greedy = false, TaskScheduler = scheduler });
// Viewed through the target interface so OfferMessage can be called on it.
423 ITargetBlock<int> target = block;
424 var source1 = new TestSourceBlock<int> ();
425 var source2 = new TestSourceBlock<int> ();
// The same header value (1) is registered with both sources, with different payloads.
427 var header1 = new DataflowMessageHeader (1);
428 source1.AddMessage (header1, 11);
429 source2.AddMessage (header1, 21);
// Both offers are expected to be postponed rather than accepted immediately.
431 Assert.AreEqual (DataflowMessageStatus.Postponed,
432 target.OfferMessage (header1, 11, source1, false));
433 Assert.AreEqual (DataflowMessageStatus.Postponed,
434 target.OfferMessage (header1, 21, source2, false));
436 scheduler.ExecuteAll ();
// After the scheduler runs, both postponed messages are expected to have been consumed.
438 Assert.IsTrue (source1.WasConsumed (header1));
439 Assert.IsTrue (source2.WasConsumed (header1));
// A new offer is expected to be declined permanently.
441 var header2 = new DataflowMessageHeader (2);
442 Assert.AreEqual (DataflowMessageStatus.DecliningPermanently,
443 target.OfferMessage (header2, 21, source1, false));
446 Assert.IsTrue (block.TryReceive (out batch));
447 CollectionAssert.AreEquivalent (new[] { 11, 21 }, batch);
449 Assert.IsTrue (block.Completion.Wait (1000));