Merge pull request #409 from Alkarex/patch-1
[mono.git] / mcs / class / System.Threading.Tasks.Dataflow / Test / System.Threading.Tasks.Dataflow / ReceivingTest.cs
1 // ReceivingTest.cs
2 //  
3 // Copyright (c) 2012 Petr Onderka
4 // 
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to deal
7 // in the Software without restriction, including without limitation the rights
8 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
11 // 
12 // The above copyright notice and this permission notice shall be included in
13 // all copies or substantial portions of the Software.
14 // 
15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21 // THE SOFTWARE.
22
23 using System;
24 using System.Collections.Generic;
25 using System.Threading;
26 using System.Threading.Tasks;
27 using System.Threading.Tasks.Dataflow;
28 using NUnit.Framework;
29
30 namespace MonoTests.System.Threading.Tasks.Dataflow {
31         [TestFixture]
32         public class ReceivingTest {
33                 [Test]
34                 public void PostponeTest ()
35                 {
36                         var scheduler = new TestScheduler ();
37                         var source =
38                                 new BufferBlock<int> (new DataflowBlockOptions { TaskScheduler = scheduler });
39                         var target = new TestTargetBlock<int> { Postpone = true };
40                         Assert.IsNotNull (source.LinkTo (target));
41                         Assert.IsFalse (target.HasPostponed);
42
43                         Assert.IsTrue (source.Post (42));
44                         scheduler.ExecuteAll ();
45                         Assert.IsTrue (target.HasPostponed);
46
47                         int i;
48                         Assert.IsTrue (target.RetryPostponed (out i));
49                         Assert.AreEqual (42, i);
50                 }
51
52                 [Test]
53                 public void PostponeTwoTargetsTest ()
54                 {
55                         var scheduler = new TestScheduler ();
56                         var source =
57                                 new BufferBlock<int> (new DataflowBlockOptions { TaskScheduler = scheduler });
58                         var target1 = new TestTargetBlock<int> { Postpone = true };
59                         var target2 = new TestTargetBlock<int> { Postpone = true };
60                         Assert.IsNotNull (source.LinkTo (target1));
61                         Assert.IsNotNull (source.LinkTo (target2));
62                         Assert.IsFalse (target1.HasPostponed);
63                         Assert.IsFalse (target2.HasPostponed);
64
65                         Assert.IsTrue (source.Post (42));
66                         scheduler.ExecuteAll ();
67                         Assert.IsTrue (target1.HasPostponed);
68                         Assert.IsTrue (target2.HasPostponed);
69
70                         int i;
71                         Assert.IsTrue (target2.RetryPostponed (out i));
72                         Assert.AreEqual (42, i);
73
74                         Assert.IsFalse (target1.RetryPostponed (out i));
75                         Assert.AreEqual (default(int), i);
76                 }
77
78                 [Test]
79                 public void DecliningTest ()
80                 {
81                         var scheduler = new TestScheduler ();
82                         var source =
83                                 new BufferBlock<int> (new DataflowBlockOptions { TaskScheduler = scheduler });
84                         var target1 = new TestTargetBlock<int> { Decline = true };
85                         var target2 = new TestTargetBlock<int> ();
86                         Assert.IsNotNull (source.LinkTo (target1));
87                         Assert.IsNotNull (source.LinkTo (target2));
88                         Assert.AreEqual (default(int), target1.DirectlyAccepted);
89                         Assert.AreEqual (default(int), target2.DirectlyAccepted);
90
91                         Assert.IsTrue (source.Post (42));
92                         scheduler.ExecuteAll ();
93                         Assert.AreEqual (default(int), target1.DirectlyAccepted);
94                         Assert.AreEqual (42, target2.DirectlyAccepted);
95                 }
96
97                 [Test]
98                 public void ConditionalDecliningTest ()
99                 {
100                         var scheduler = new TestScheduler ();
101                         var source =
102                                 new BufferBlock<int> (new DataflowBlockOptions { TaskScheduler = scheduler });
103                         var target = new TestTargetBlock<int> { Decline = true };
104                         Assert.IsNotNull (source.LinkTo (target));
105                         Assert.AreEqual (default(int), target.DirectlyAccepted);
106
107                         Assert.IsTrue (source.Post (42));
108                         scheduler.ExecuteAll ();
109                         Assert.AreEqual (default(int), target.DirectlyAccepted);
110
111                         target.Decline = false;
112                         Assert.IsTrue (source.Post (43));
113                         scheduler.ExecuteAll ();
114                         Assert.AreEqual (default(int), target.DirectlyAccepted);
115
116                         Assert.AreEqual (42, source.Receive (TimeSpan.FromMilliseconds (100)));
117                         scheduler.ExecuteAll ();
118                         Assert.AreEqual (43, target.DirectlyAccepted);
119                 }
120
121                 [Test]
122                 public void TryReceiveWithPredicateTest ()
123                 {
124                         var source = new BufferBlock<int> ();
125                         Assert.IsTrue (source.Post (42));
126                         Assert.IsTrue (source.Post (43));
127
128                         int item;
129                         Assert.IsFalse (source.TryReceive (i => i == 43, out item));
130                         Assert.AreEqual (default(int), item);
131
132                         Assert.AreEqual (42, source.Receive ());
133
134                         Assert.IsTrue (source.TryReceive (i => i == 43, out item));
135                         Assert.AreEqual (43, item);
136                 }
137
138                 [Test]
139                 public void ReserveTest ()
140                 {
141                         var scheduler = new TestScheduler ();
142                         var source =
143                                 new BufferBlock<int> (new DataflowBlockOptions { TaskScheduler = scheduler });
144                         var target = new TestTargetBlock<int> { Postpone = true };
145                         Assert.IsNotNull (source.LinkTo (target));
146                         Assert.IsFalse (target.HasPostponed);
147
148                         Assert.IsTrue (source.Post (42));
149                         Assert.IsTrue (source.Post (43));
150                         scheduler.ExecuteAll ();
151                         Assert.IsTrue (target.HasPostponed);
152
153                         Assert.IsTrue (target.ReservePostponed ());
154                         int i;
155                         Assert.IsFalse (source.TryReceive (null, out i));
156                         Assert.AreEqual (default(int), i);
157                         IList<int> items;
158                         Assert.IsFalse (source.TryReceiveAll (out items));
159                         Assert.AreEqual (default(IList<int>), items);
160
161                         Assert.IsTrue (target.RetryPostponed (out i));
162                         Assert.AreEqual (42, i);
163
164                         Assert.IsTrue (source.TryReceive (null, out i));
165                         Assert.AreEqual (43, i);
166                 }
167
168                 [Test]
169                 public void ConsumeAfterReceiveTest ()
170                 {
171                         var scheduler = new TestScheduler ();
172                         var source =
173                                 new BufferBlock<int> (new DataflowBlockOptions { TaskScheduler = scheduler });
174                         var target = new TestTargetBlock<int> { Postpone = true };
175                         Assert.IsNotNull (source.LinkTo (target));
176                         Assert.IsFalse (target.HasPostponed);
177
178                         Assert.IsTrue (source.Post (42));
179                         scheduler.ExecuteAll ();
180                         Assert.IsTrue (target.HasPostponed);
181
182                         target.Postpone = false;
183
184                         Assert.AreEqual (42, source.Receive ());
185                         Assert.IsTrue (source.Post (43));
186
187                         Assert.AreEqual (default(int), target.DirectlyAccepted);
188
189                         int i;
190                         Assert.IsFalse (target.RetryPostponed (out i));
191                         Assert.AreEqual (default(int), i);
192
193                         scheduler.ExecuteAll ();
194
195                         Assert.AreEqual (43, target.DirectlyAccepted);
196                 }
197
198                 [Test]
199                 public void FaultConsume ()
200                 {
201                         var scheduler = new TestScheduler ();
202                         var source =
203                                 new BufferBlock<int> (new DataflowBlockOptions { TaskScheduler = scheduler });
204                         var target = new TestTargetBlock<int> { Postpone = true };
205                         Assert.IsNotNull (source.LinkTo (target));
206
207                         Assert.IsTrue (source.Post (42));
208                         scheduler.ExecuteAll ();
209                         Assert.IsTrue (target.HasPostponed);
210
211                         ((IDataflowBlock)source).Fault (new Exception ());
212
213                         scheduler.ExecuteAll ();
214                         Thread.Sleep (100);
215
216                         int value;
217                         Assert.IsFalse (target.RetryPostponed (out value));
218                 }
219
220                 [Test]
221                 public void FaultConsumeBroadcast ()
222                 {
223                         var scheduler = new TestScheduler ();
224                         var source = new BroadcastBlock<int> (null,
225                                 new DataflowBlockOptions { TaskScheduler = scheduler });
226                         var target = new TestTargetBlock<int> { Postpone = true };
227                         Assert.IsNotNull (source.LinkTo (target));
228
229                         Assert.IsTrue (source.Post (42));
230                         scheduler.ExecuteAll ();
231                         Assert.IsTrue (target.HasPostponed);
232
233                         ((IDataflowBlock)source).Fault (new Exception ());
234
235                         scheduler.ExecuteAll ();
236                         Thread.Sleep (100);
237
238                         Assert.IsTrue (source.Completion.IsFaulted);
239                         int value;
240                         Assert.IsTrue (target.RetryPostponed (out value));
241                         Assert.AreEqual (42, value);
242                 }
243
244                 [Test]
245                 public void FaultExecutingConsume ()
246                 {
247                         var evt = new ManualResetEventSlim ();
248                         var source = new TransformBlock<int, int> (i =>
249                         {
250                                 if (i == 2)
251                                         evt.Wait ();
252                                 return i;
253                         });
254                         var target = new TestTargetBlock<int> { Postpone = true };
255                         Assert.IsNotNull (source.LinkTo (target));
256
257                         Assert.IsTrue (source.Post (1));
258                         Assert.IsTrue (source.Post (2));
259                         Assert.IsTrue (source.Post (3));
260                         Thread.Sleep (500);
261                         Assert.IsTrue (target.HasPostponed);
262
263                         ((IDataflowBlock)source).Fault (new Exception ());
264
265                         Thread.Sleep (100);
266
267                         Assert.IsFalse (source.Completion.IsFaulted);
268                         int value;
269                         Assert.IsTrue (target.RetryPostponed (out value));
270                         Assert.AreEqual (1, value);
271
272                         evt.Set ();
273
274                         Thread.Sleep (100);
275
276                         Assert.IsTrue (source.Completion.IsFaulted);
277                         Assert.IsFalse (target.RetryPostponed (out value));
278                 }
279
280                 [Test]
281                 public void ReserveFaultConsume ()
282                 {
283                         var scheduler = new TestScheduler ();
284                         var source =
285                                 new BufferBlock<int> (new DataflowBlockOptions { TaskScheduler = scheduler });
286                         var target = new TestTargetBlock<int> { Postpone = true };
287                         Assert.IsNotNull (source.LinkTo (target));
288
289                         Assert.IsTrue (source.Post (42));
290                         scheduler.ExecuteAll ();
291                         Assert.IsTrue (target.HasPostponed);
292
293                         Assert.IsTrue (target.ReservePostponed ());
294
295                         ((IDataflowBlock)source).Fault (new Exception ());
296
297                         scheduler.ExecuteAll ();
298                         Thread.Sleep (100);
299
300                         int value;
301                         Assert.IsTrue (target.RetryPostponed (out value));
302                 }
303
304                 [Test]
305                 public void ReserveFaultConsumeBroadcast ()
306                 {
307                         var scheduler = new TestScheduler ();
308                         var source = new BroadcastBlock<int> (null,
309                                 new DataflowBlockOptions { TaskScheduler = scheduler });
310                         var target = new TestTargetBlock<int> { Postpone = true };
311                         Assert.IsNotNull (source.LinkTo (target));
312
313                         Assert.IsTrue (source.Post (42));
314                         scheduler.ExecuteAll ();
315                         Assert.IsTrue (target.HasPostponed);
316
317                         Assert.IsTrue (target.ReservePostponed ());
318
319                         ((IDataflowBlock)source).Fault (new Exception ());
320
321                         scheduler.ExecuteAll ();
322                         Thread.Sleep (100);
323
324                         int value;
325                         Assert.IsTrue (target.RetryPostponed (out value));
326                 }
327
328                 [Test]
329                 public void PostAfterTimeout ()
330                 {
331                         var scheduler = new TestScheduler ();
332                         var block =
333                                 new BufferBlock<int> (new DataflowBlockOptions { TaskScheduler = scheduler });
334
335                         AssertEx.Throws<TimeoutException> (
336                                 () => block.Receive (TimeSpan.FromMilliseconds (100)));
337
338                         block.Post (1);
339
340                         scheduler.ExecuteAll ();
341
342                         int item;
343                         Assert.IsTrue (block.TryReceive (out item));
344                         Assert.AreEqual (1, item);
345                 }
346
347                 [Test]
348                 public void PostAfterCancellation ()
349                 {
350                         var scheduler = new TestScheduler ();
351                         var block =
352                                 new BufferBlock<int> (new DataflowBlockOptions { TaskScheduler = scheduler });
353
354                         var tokenSource = new CancellationTokenSource ();
355
356                         Task.Factory.StartNew (
357                                 () =>
358                                 {
359                                         Thread.Sleep (100);
360                                         tokenSource.Cancel ();
361                                 });
362
363                         AssertEx.Throws<OperationCanceledException> (
364                                 () => block.Receive (tokenSource.Token));
365
366                         block.Post (1);
367
368                         scheduler.ExecuteAll ();
369
370                         int item;
371                         Assert.IsTrue (block.TryReceive (out item));
372                         Assert.AreEqual (1, item);
373                 }
374         }
375
376         class TestTargetBlock<T> : ITargetBlock<T> {
377                 public bool Postpone { get; set; }
378
379                 public bool Decline { get; set; }
380
381                 Tuple<ISourceBlock<T>, DataflowMessageHeader> postponed;
382
383                 public T DirectlyAccepted { get; private set; }
384
385                 public DataflowMessageStatus OfferMessage (
386                         DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source,
387                         bool consumeToAccept)
388                 {
389                         if (Decline)
390                                 return DataflowMessageStatus.Declined;
391
392                         if (Postpone) {
393                                 postponed = Tuple.Create (source, messageHeader);
394                                 return DataflowMessageStatus.Postponed;
395                         }
396
397                         DirectlyAccepted = messageValue;
398                         return DataflowMessageStatus.Accepted;
399                 }
400
401                 public bool HasPostponed
402                 {
403                         get { return postponed != null; }
404                 }
405
406                 public bool RetryPostponed (out T value)
407                 {
408                         bool consumed;
409                         value = postponed.Item1.ConsumeMessage (
410                                 postponed.Item2, this, out consumed);
411                         postponed = null;
412                         return consumed;
413                 }
414
415                 public bool ReservePostponed ()
416                 {
417                         return postponed.Item1.ReserveMessage (postponed.Item2, this);
418                 }
419
420                 public void Complete ()
421                 {
422                 }
423
424                 public void Fault (Exception exception)
425                 {
426                 }
427
428                 public Task Completion { get; private set; }
429         }
430 }