Add NoData as valid return state for Statements with params
[mono.git] / mcs / class / System.Threading.Tasks.Dataflow / Test / System.Threading.Tasks.Dataflow / ReceivingTest.cs
1 // ReceivingTest.cs
2 //  
3 // Copyright (c) 2012 Petr Onderka
4 // 
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to deal
7 // in the Software without restriction, including without limitation the rights
8 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
11 // 
12 // The above copyright notice and this permission notice shall be included in
13 // all copies or substantial portions of the Software.
14 // 
15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21 // THE SOFTWARE.
22
23 using System;
24 using System.Collections.Generic;
25 using System.Threading;
26 using System.Threading.Tasks;
27 using System.Threading.Tasks.Dataflow;
28 using NUnit.Framework;
29
30 namespace MonoTests.System.Threading.Tasks.Dataflow {
31         [TestFixture]
32         public class ReceivingTest {
33                 [Test]
34                 public void PostponeTest ()
35                 {
36                         var scheduler = new TestScheduler ();
37                         var source =
38                                 new BufferBlock<int> (new DataflowBlockOptions { TaskScheduler = scheduler });
39                         var target = new TestTargetBlock<int> { Postpone = true };
40                         Assert.IsNotNull (source.LinkTo (target));
41                         Assert.IsFalse (target.HasPostponed);
42
43                         Assert.IsTrue (source.Post (42));
44                         scheduler.ExecuteAll ();
45                         Assert.IsTrue (target.HasPostponed);
46
47                         int i;
48                         Assert.IsTrue (target.RetryPostponed (out i));
49                         Assert.AreEqual (42, i);
50                 }
51
52                 [Test]
53                 public void PostponeTwoTargetsTest ()
54                 {
55                         var scheduler = new TestScheduler ();
56                         var source =
57                                 new BufferBlock<int> (new DataflowBlockOptions { TaskScheduler = scheduler });
58                         var target1 = new TestTargetBlock<int> { Postpone = true };
59                         var target2 = new TestTargetBlock<int> { Postpone = true };
60                         Assert.IsNotNull (source.LinkTo (target1));
61                         Assert.IsNotNull (source.LinkTo (target2));
62                         Assert.IsFalse (target1.HasPostponed);
63                         Assert.IsFalse (target2.HasPostponed);
64
65                         Assert.IsTrue (source.Post (42));
66                         scheduler.ExecuteAll ();
67                         Assert.IsTrue (target1.HasPostponed);
68                         Assert.IsTrue (target2.HasPostponed);
69
70                         int i;
71                         Assert.IsTrue (target2.RetryPostponed (out i));
72                         Assert.AreEqual (42, i);
73
74                         Assert.IsFalse (target1.RetryPostponed (out i));
75                         Assert.AreEqual (default(int), i);
76                 }
77
78                 [Test]
79                 public void DecliningTest ()
80                 {
81                         var scheduler = new TestScheduler ();
82                         var source =
83                                 new BufferBlock<int> (new DataflowBlockOptions { TaskScheduler = scheduler });
84                         var target1 = new TestTargetBlock<int> { Decline = true };
85                         var target2 = new TestTargetBlock<int> ();
86                         Assert.IsNotNull (source.LinkTo (target1));
87                         Assert.IsNotNull (source.LinkTo (target2));
88                         Assert.AreEqual (default(int), target1.DirectlyAccepted);
89                         Assert.AreEqual (default(int), target2.DirectlyAccepted);
90
91                         Assert.IsTrue (source.Post (42));
92                         scheduler.ExecuteAll ();
93                         Assert.AreEqual (default(int), target1.DirectlyAccepted);
94                         Assert.AreEqual (42, target2.DirectlyAccepted);
95                 }
96
97                 [Test]
98                 public void ConditionalDecliningTest ()
99                 {
100                         var scheduler = new TestScheduler ();
101                         var source =
102                                 new BufferBlock<int> (new DataflowBlockOptions { TaskScheduler = scheduler });
103                         var target = new TestTargetBlock<int> { Decline = true };
104                         Assert.IsNotNull (source.LinkTo (target));
105                         Assert.AreEqual (default(int), target.DirectlyAccepted);
106
107                         Assert.IsTrue (source.Post (42));
108                         scheduler.ExecuteAll ();
109                         Assert.AreEqual (default(int), target.DirectlyAccepted);
110
111                         target.Decline = false;
112                         Assert.IsTrue (source.Post (43));
113                         scheduler.ExecuteAll ();
114                         Assert.AreEqual (default(int), target.DirectlyAccepted);
115
116                         Assert.AreEqual (42, source.Receive (TimeSpan.FromMilliseconds (100)));
117                         scheduler.ExecuteAll ();
118                         Assert.AreEqual (43, target.DirectlyAccepted);
119                 }
120
121                 [Test]
122                 public void TryReceiveWithPredicateTest ()
123                 {
124                         var source = new BufferBlock<int> ();
125                         Assert.IsTrue (source.Post (42));
126                         Assert.IsTrue (source.Post (43));
127
128                         int item;
129                         Assert.IsFalse (source.TryReceive (i => i == 43, out item));
130                         Assert.AreEqual (default(int), item);
131
132                         Assert.AreEqual (42, source.Receive ());
133
134                         Assert.IsTrue (source.TryReceive (i => i == 43, out item));
135                         Assert.AreEqual (43, item);
136                 }
137
138                 [Test]
139                 public void ReserveTest ()
140                 {
141                         var scheduler = new TestScheduler ();
142                         var source =
143                                 new BufferBlock<int> (new DataflowBlockOptions { TaskScheduler = scheduler });
144                         var target = new TestTargetBlock<int> { Postpone = true };
145                         Assert.IsNotNull (source.LinkTo (target));
146                         Assert.IsFalse (target.HasPostponed);
147
148                         Assert.IsTrue (source.Post (42));
149                         Assert.IsTrue (source.Post (43));
150                         scheduler.ExecuteAll ();
151                         Assert.IsTrue (target.HasPostponed);
152
153                         Assert.IsTrue (target.ReservePostponed ());
154                         int i;
155                         Assert.IsFalse (source.TryReceive (null, out i));
156                         Assert.AreEqual (default(int), i);
157                         IList<int> items;
158                         Assert.IsFalse (source.TryReceiveAll (out items));
159                         Assert.AreEqual (default(IList<int>), items);
160
161                         Assert.IsTrue (target.RetryPostponed (out i));
162                         Assert.AreEqual (42, i);
163
164                         Assert.IsTrue (source.TryReceive (null, out i));
165                         Assert.AreEqual (43, i);
166                 }
167
168                 [Test]
169                 public void ConsumeAfterReceiveTest ()
170                 {
171                         var scheduler = new TestScheduler ();
172                         var source =
173                                 new BufferBlock<int> (new DataflowBlockOptions { TaskScheduler = scheduler });
174                         var target = new TestTargetBlock<int> { Postpone = true };
175                         Assert.IsNotNull (source.LinkTo (target));
176                         Assert.IsFalse (target.HasPostponed);
177
178                         Assert.IsTrue (source.Post (42));
179                         scheduler.ExecuteAll ();
180                         Assert.IsTrue (target.HasPostponed);
181
182                         target.Postpone = false;
183
184                         Assert.AreEqual (42, source.Receive ());
185                         Assert.IsTrue (source.Post (43));
186
187                         Assert.AreEqual (default(int), target.DirectlyAccepted);
188
189                         int i;
190                         Assert.IsFalse (target.RetryPostponed (out i));
191                         Assert.AreEqual (default(int), i);
192
193                         scheduler.ExecuteAll ();
194
195                         Assert.AreEqual (43, target.DirectlyAccepted);
196                 }
197
198                 [Test]
199                 public void FaultConsume ()
200                 {
201                         var scheduler = new TestScheduler ();
202                         var source =
203                                 new BufferBlock<int> (new DataflowBlockOptions { TaskScheduler = scheduler });
204                         var target = new TestTargetBlock<int> { Postpone = true };
205                         Assert.IsNotNull (source.LinkTo (target));
206
207                         Assert.IsTrue (source.Post (42));
208                         scheduler.ExecuteAll ();
209                         Assert.IsTrue (target.HasPostponed);
210
211                         ((IDataflowBlock)source).Fault (new Exception ());
212
213                         scheduler.ExecuteAll ();
214                         Thread.Sleep (100);
215
216                         int value;
217                         Assert.IsFalse (target.RetryPostponed (out value));
218                 }
219
220                 [Test]
221                 public void FaultConsumeBroadcast ()
222                 {
223                         var scheduler = new TestScheduler ();
224                         var source = new BroadcastBlock<int> (null,
225                                 new DataflowBlockOptions { TaskScheduler = scheduler });
226                         var target = new TestTargetBlock<int> { Postpone = true };
227                         Assert.IsNotNull (source.LinkTo (target));
228
229                         Assert.IsTrue (source.Post (42));
230                         scheduler.ExecuteAll ();
231                         Assert.IsTrue (target.HasPostponed);
232
233                         var exception = new Exception ();
234                         ((IDataflowBlock)source).Fault (exception);
235
236                         scheduler.ExecuteAll ();
237
238                         try {
239                                 source.Completion.Wait (1000);
240                                 Assert.Fail ("Task must be faulted");
241                         } catch (AggregateException ex) {
242                                 Assert.AreEqual (exception, ex.InnerException, "#9");
243                         }
244
245                         Assert.IsTrue (source.Completion.IsFaulted);
246                         int value;
247                         Assert.IsTrue (target.RetryPostponed (out value));
248                         Assert.AreEqual (42, value);
249                 }
250
251                 [Test]
252                 public void FaultExecutingConsume ()
253                 {
254                         var evt = new ManualResetEventSlim ();
255                         var source = new TransformBlock<int, int> (i =>
256                         {
257                                 if (i == 2)
258                                         evt.Wait ();
259                                 return i;
260                         });
261                         var target = new TestTargetBlock<int> { Postpone = true };
262                         Assert.IsNotNull (source.LinkTo (target));
263
264                         Assert.IsTrue (source.Post (1), "#1");
265                         Assert.IsTrue (source.Post (2), "#2");
266                         Assert.IsTrue (source.Post (3), "#3");
267                         target.PostponedEvent.Wait (1000);
268                         Assert.IsTrue (target.HasPostponed, "#4");
269
270                         var exception = new Exception ();
271                         ((IDataflowBlock)source).Fault (exception);
272
273                         source.Completion.Wait (1000);
274
275                         Assert.IsFalse (source.Completion.IsFaulted, "#5");
276                         int value;
277                         Assert.IsTrue (target.RetryPostponed (out value), "#6");
278                         Assert.AreEqual (1, value, "#7");
279
280                         evt.Set ();
281
282                         try {
283                                 source.Completion.Wait (1000);
284                                 Assert.Fail ("Task must be faulted");
285                         } catch (AggregateException ex) {
286                                 Assert.AreEqual (exception, ex.InnerException, "#9");
287                         }
288
289                         Assert.IsTrue (source.Completion.IsFaulted, "#10");
290                         Assert.IsFalse (target.RetryPostponed (out value), "#11");
291                 }
292
293                 [Test]
294                 public void ReserveFaultConsume ()
295                 {
296                         var scheduler = new TestScheduler ();
297                         var source =
298                                 new BufferBlock<int> (new DataflowBlockOptions { TaskScheduler = scheduler });
299                         var target = new TestTargetBlock<int> { Postpone = true };
300                         Assert.IsNotNull (source.LinkTo (target));
301
302                         Assert.IsTrue (source.Post (42));
303                         scheduler.ExecuteAll ();
304                         Assert.IsTrue (target.HasPostponed);
305
306                         Assert.IsTrue (target.ReservePostponed ());
307
308                         ((IDataflowBlock)source).Fault (new Exception ());
309
310                         scheduler.ExecuteAll ();
311                         Thread.Sleep (100);
312
313                         int value;
314                         Assert.IsTrue (target.RetryPostponed (out value));
315                 }
316
317                 [Test]
318                 public void ReserveFaultConsumeBroadcast ()
319                 {
320                         var scheduler = new TestScheduler ();
321                         var source = new BroadcastBlock<int> (null,
322                                 new DataflowBlockOptions { TaskScheduler = scheduler });
323                         var target = new TestTargetBlock<int> { Postpone = true };
324                         Assert.IsNotNull (source.LinkTo (target));
325
326                         Assert.IsTrue (source.Post (42));
327                         scheduler.ExecuteAll ();
328                         Assert.IsTrue (target.HasPostponed);
329
330                         Assert.IsTrue (target.ReservePostponed ());
331
332                         ((IDataflowBlock)source).Fault (new Exception ());
333
334                         scheduler.ExecuteAll ();
335                         Thread.Sleep (100);
336
337                         int value;
338                         Assert.IsTrue (target.RetryPostponed (out value));
339                 }
340
341                 [Test]
342                 public void PostAfterTimeout ()
343                 {
344                         var scheduler = new TestScheduler ();
345                         var block =
346                                 new BufferBlock<int> (new DataflowBlockOptions { TaskScheduler = scheduler });
347
348                         AssertEx.Throws<TimeoutException> (
349                                 () => block.Receive (TimeSpan.FromMilliseconds (100)));
350
351                         block.Post (1);
352
353                         scheduler.ExecuteAll ();
354
355                         int item;
356                         Assert.IsTrue (block.TryReceive (out item));
357                         Assert.AreEqual (1, item);
358                 }
359
360                 [Test]
361                 public void PostAfterCancellation ()
362                 {
363                         var scheduler = new TestScheduler ();
364                         var block =
365                                 new BufferBlock<int> (new DataflowBlockOptions { TaskScheduler = scheduler });
366
367                         var tokenSource = new CancellationTokenSource ();
368
369                         Task.Factory.StartNew (
370                                 () =>
371                                 {
372                                         Thread.Sleep (100);
373                                         tokenSource.Cancel ();
374                                 });
375
376                         AssertEx.Throws<OperationCanceledException> (
377                                 () => block.Receive (tokenSource.Token));
378
379                         block.Post (1);
380
381                         scheduler.ExecuteAll ();
382
383                         int item;
384                         Assert.IsTrue (block.TryReceive (out item));
385                         Assert.AreEqual (1, item);
386                 }
387         }
388
389         class TestTargetBlock<T> : ITargetBlock<T> {
390                 public bool Postpone { get; set; }
391
392                 public bool Decline { get; set; }
393
394                 Tuple<ISourceBlock<T>, DataflowMessageHeader> postponed;
395
396                 public T DirectlyAccepted { get; private set; }
397
398                 public ManualResetEventSlim PostponedEvent = new ManualResetEventSlim ();
399
400                 public DataflowMessageStatus OfferMessage (
401                         DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source,
402                         bool consumeToAccept)
403                 {
404                         if (Decline)
405                                 return DataflowMessageStatus.Declined;
406
407                         if (Postpone) {
408                                 postponed = Tuple.Create (source, messageHeader);
409                                 PostponedEvent.Set ();
410                                 return DataflowMessageStatus.Postponed;
411                         }
412
413                         DirectlyAccepted = messageValue;
414                         return DataflowMessageStatus.Accepted;
415                 }
416
417                 public bool HasPostponed
418                 {
419                         get { return postponed != null; }
420                 }
421
422                 public bool RetryPostponed (out T value)
423                 {
424                         bool consumed;
425                         value = postponed.Item1.ConsumeMessage (
426                                 postponed.Item2, this, out consumed);
427                         postponed = null;
428                         return consumed;
429                 }
430
431                 public bool ReservePostponed ()
432                 {
433                         return postponed.Item1.ReserveMessage (postponed.Item2, this);
434                 }
435
436                 public void Complete ()
437                 {
438                 }
439
440                 public void Fault (Exception exception)
441                 {
442                 }
443
444                 public Task Completion { get; private set; }
445         }
446 }