column name and ordinal fix...tested on 10.1
[mono.git] / mcs / class / System.Threading.Tasks.Dataflow / Test / System.Threading.Tasks.Dataflow / DataflowBlockTest.cs
1 // DataflowBlockTest.cs
2 //  
3 // Author:
4 //       Jérémie "garuma" Laval <jeremie.laval@gmail.com>
5 //       Petr Onderka <gsvick@gmail.com>
6 // 
7 // Copyright (c) 2011 Jérémie "garuma" Laval
8 // Copyright (c) 2012 Petr Onderka
9 // 
10 // Permission is hereby granted, free of charge, to any person obtaining a copy
11 // of this software and associated documentation files (the "Software"), to deal
12 // in the Software without restriction, including without limitation the rights
13 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 // copies of the Software, and to permit persons to whom the Software is
15 // furnished to do so, subject to the following conditions:
16 // 
17 // The above copyright notice and this permission notice shall be included in
18 // all copies or substantial portions of the Software.
19 // 
20 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26 // THE SOFTWARE.
27
28 using System;
29 using System.Threading;
30 using System.Threading.Tasks;
31 using System.Threading.Tasks.Dataflow;
32 using NUnit.Framework;
33
namespace MonoTests.System.Threading.Tasks.Dataflow {
        /// <summary>
        /// Tests for the <see cref="DataflowBlock"/> extension methods
        /// (TryReceive, Receive, ReceiveAsync, SendAsync, LinkTo with a predicate)
        /// and for <see cref="DataflowBlock.NullTarget{TInput}"/>.
        /// </summary>
        /// <remarks>
        /// AssertEx, TestScheduler and TestSourceBlock are helpers declared outside
        /// this file. The comments below assume that TestScheduler.ExecuteAll runs
        /// the work queued on that scheduler (TODO confirm against the helper).
        /// </remarks>
        [TestFixture]
        public class DataflowBlockTest {
                /// <summary>
                /// Posts 42 to a buffer block and expects TryReceive to succeed
                /// and hand back 42.
                /// </summary>
                [Test]
                public void TryReceiveTest ()
                {
                        var block = new BufferBlock<int> ();
                        int value = -1;

                        block.Post (42);
                        Thread.Sleep (500);
                        Assert.IsTrue (block.TryReceive (out value));
                        Assert.AreEqual (42, value);
                }

                /// <summary>
                /// Calls the blocking Receive while another task posts 42 after
                /// a 300 ms delay, and expects Receive to return 42.
                /// </summary>
                [Test]
                public void ReceiveTest ()
                {
                        var block = new BufferBlock<int> ();
                        Task.Factory.StartNew (() => { Thread.Sleep (300); block.Post (42); });
                        Assert.AreEqual (42, block.Receive ());
                }

                /// <summary>
                /// Expects Receive on an already completed, empty block to throw
                /// <see cref="InvalidOperationException"/>.
                /// </summary>
                [Test]
                public void ReceiveCompletedTest ()
                {
                        var block = new BufferBlock<int> ();
                        block.Complete ();
                        AssertEx.Throws<InvalidOperationException> (
                                () => block.Receive (TimeSpan.FromMilliseconds (1000)));
                }

                /// <summary>
                /// Expects Receive with a one second timeout on a block that never
                /// gets any data to throw <see cref="TimeoutException"/>.
                /// </summary>
                [Test]
                public void ReceiveTimeoutTest ()
                {
                        var block = new BufferBlock<int> ();
                        AssertEx.Throws<TimeoutException> (
                                () => block.Receive (TimeSpan.FromMilliseconds (1000)));
                }

                /// <summary>
                /// Expects Receive to throw <see cref="OperationCanceledException"/>
                /// when its token is cancelled (here automatically after 200 ms).
                /// </summary>
                [Test]
                public void ReceiveCancelledTest ()
                {
                        var block = new BufferBlock<int> ();

                        // The token source is disposable; release it once the
                        // assertion has finished.
                        using (var tokenSource = new CancellationTokenSource (200)) {
                                AssertEx.Throws<OperationCanceledException> (
                                        () => block.Receive (tokenSource.Token));
                        }
                }

                /// <summary>
                /// Starts ReceiveAsync on a write-once block, posts 42 from another
                /// task after 100 ms and expects the continuation to observe 42
                /// within one second.
                /// </summary>
                [Test]
                public void AsyncReceiveTest ()
                {
                        int result = -1;
                        // Signalled by the continuation once the result was stored.
                        var mre = new ManualResetEventSlim (false);

                        var block = new WriteOnceBlock<int> (null);
                        block.ReceiveAsync ().ContinueWith (i =>
                        {
                                result = i.Result;
                                mre.Set ();
                        });
                        Task.Factory.StartNew (() =>
                        {
                                Thread.Sleep (100);
                                block.Post (42);
                        });
                        Assert.IsTrue (mre.Wait (1000));

                        Assert.AreEqual (42, result);
                }

                /// <summary>
                /// Cancels a pending ReceiveAsync before any data is posted and
                /// expects waiting on the task to throw an
                /// <see cref="AggregateException"/> wrapping an
                /// <see cref="OperationCanceledException"/>, with the task ending
                /// in the Canceled state.
                /// </summary>
                [Test]
                public void AsyncReceiveTestCanceled ()
                {
                        // The token source is disposable; release it when the test ends.
                        using (var src = new CancellationTokenSource ()) {
                                var block = new WriteOnceBlock<int> (null);
                                var task = block.ReceiveAsync (src.Token);
                                // The post is scheduled well after the cancellation below.
                                Task.Factory.StartNew (() =>
                                {
                                        Thread.Sleep (800);
                                        block.Post (42);
                                });
                                Thread.Sleep (50);
                                src.Cancel ();

                                AggregateException ex = null;

                                try {
                                        task.Wait ();
                                } catch (AggregateException e) {
                                        ex = e;
                                }

                                Assert.IsNotNull (ex);
                                Assert.IsNotNull (ex.InnerException);
                                Assert.IsInstanceOfType (typeof(OperationCanceledException),
                                        ex.InnerException);
                                Assert.IsTrue (task.IsCompleted);
                                Assert.AreEqual (TaskStatus.Canceled, task.Status);
                        }
                }

                /// <summary>
                /// Expects SendAsync to an unbounded buffer block to be finished
                /// immediately with a result of true.
                /// </summary>
                [Test]
                public void SendAsyncAcceptedTest ()
                {
                        var target = new BufferBlock<int> ();
                        var task = target.SendAsync (1);

                        Assert.IsTrue (task.Wait (0));
                        Assert.IsTrue (task.Result);
                }

                /// <summary>
                /// Expects SendAsync to an already completed block to be finished
                /// immediately with a result of false.
                /// </summary>
                [Test]
                public void SendAsyncDeclinedTest ()
                {
                        var target = new BufferBlock<int> ();
                        target.Complete ();
                        var task = target.SendAsync (1);

                        Assert.IsTrue (task.Wait (0));
                        Assert.IsFalse (task.Result);
                }

                /// <summary>
                /// Fills a block of capacity one, then calls SendAsync: the task is
                /// expected to stay pending until the stored item is received and
                /// then to finish with a result of true.
                /// </summary>
                [Test]
                public void SendAsyncPostponedAcceptedTest ()
                {
                        var target =
                                new BufferBlock<int> (new DataflowBlockOptions { BoundedCapacity = 1 });

                        Assert.IsTrue (target.Post (1));

                        var task = target.SendAsync (1);

                        // The block is full, so the send must not have finished yet.
                        Assert.IsFalse (task.Wait (100));

                        Assert.AreEqual (1, target.Receive ());

                        Assert.IsTrue (task.Wait (1000));
                        Assert.IsTrue (task.Result);
                }

                /// <summary>
                /// Fills a block of capacity one, then calls SendAsync: the task is
                /// expected to stay pending until the block is completed and then
                /// to finish with a result of false.
                /// </summary>
                [Test]
                public void SendAsyncPostponedDeclinedTest ()
                {
                        var target =
                                new BufferBlock<int> (new DataflowBlockOptions { BoundedCapacity = 1 });

                        Assert.IsTrue (target.Post (1));

                        var task = target.SendAsync (1);

                        // The block is full, so the send must not have finished yet.
                        Assert.IsFalse (task.Wait (100));

                        target.Complete ();

                        Assert.IsTrue (task.Wait (1000));
                        Assert.IsFalse (task.Result);
                }

                /// <summary>
                /// Links a source to a target with a predicate that only matches odd
                /// numbers and posts 1, 2, 3. Expects 1 to reach the target, 2 to
                /// remain in the source until it is received from there directly,
                /// and 3 to reach the target afterwards.
                /// </summary>
                [Test]
                public void LinkToPredicateTest ()
                {
                        var scheduler = new TestScheduler ();
                        var source = new BufferBlock<int> (
                                new DataflowBlockOptions { TaskScheduler = scheduler });
                        var target = new BufferBlock<int> ();
                        source.LinkTo (target, i => i % 2 == 1);

                        Assert.IsTrue (source.Post (1));
                        Assert.IsTrue (source.Post (2));
                        Assert.IsTrue (source.Post (3));

                        scheduler.ExecuteAll ();

                        int item;
                        Assert.IsTrue (target.TryReceive (out item));
                        Assert.AreEqual (1, item);
                        Assert.IsFalse (target.TryReceive (out item));

                        // Take the even item out of the source by hand.
                        Assert.IsTrue (source.TryReceive (out item));
                        Assert.AreEqual (2, item);

                        scheduler.ExecuteAll ();

                        Assert.IsTrue (target.TryReceive (out item));
                        Assert.AreEqual (3, item);
                }

                /// <summary>
                /// Same odd-number predicate as <see cref="LinkToPredicateTest"/>,
                /// but the link has MaxMessages = 1 and the items are posted as
                /// 2, 1, 3. Expects nothing in the target while 2 is still in the
                /// source, then 1 in the target, and no further item after that.
                /// </summary>
                [Test]
                public void LinkToPredicateMaxMessagesTest ()
                {
                        var scheduler = new TestScheduler ();
                        var source = new BufferBlock<int> (
                                new DataflowBlockOptions { TaskScheduler = scheduler });
                        var target = new BufferBlock<int> ();
                        source.LinkTo (target, new DataflowLinkOptions { MaxMessages = 1 },
                                i => i % 2 == 1);

                        Assert.IsTrue (source.Post (2));
                        Assert.IsTrue (source.Post (1));
                        Assert.IsTrue (source.Post (3));

                        scheduler.ExecuteAll ();

                        int item;
                        Assert.IsFalse (target.TryReceive (out item));
                        // Take the even item out of the source by hand.
                        Assert.IsTrue (source.TryReceive (out item));
                        Assert.AreEqual (2, item);

                        scheduler.ExecuteAll ();

                        Assert.IsTrue (target.TryReceive (out item));
                        Assert.AreEqual (1, item);

                        scheduler.ExecuteAll ();

                        // 3 is odd as well, but it is expected not to arrive.
                        Assert.IsFalse (target.TryReceive (out item));
                }

                /// <summary>
                /// Links a source through an always-true predicate to a target of
                /// capacity one that already holds an item. Expects the target to
                /// yield its own item first and the source's item after that.
                /// </summary>
                [Test]
                public void LinkToPredicatePostponed ()
                {
                        var scheduler = new TestScheduler ();
                        var source = new BufferBlock<int> (
                                new DataflowBlockOptions { TaskScheduler = scheduler });
                        var target = new BufferBlock<int> (
                                new DataflowBlockOptions { BoundedCapacity = 1, TaskScheduler = scheduler });
                        source.LinkTo (target, i => true);

                        Assert.IsTrue (target.Post (1));
                        Assert.IsTrue (source.Post (2));

                        scheduler.ExecuteAll ();

                        int item;
                        Assert.IsTrue (target.TryReceive (out item));
                        Assert.AreEqual (1, item);

                        scheduler.ExecuteAll ();

                        Assert.IsTrue (target.TryReceive (out item));
                        Assert.AreEqual (2, item);
                }

                /// <summary>
                /// Links a broadcast block whose cloning function multiplies by ten
                /// to a target with the predicate "less than 10" and posts 1.
                /// Expects the target to receive 10, a value that would not satisfy
                /// the predicate itself.
                /// </summary>
                [Test]
                public void LinkToPredicateClonerTest ()
                {
                        var scheduler = new TestScheduler ();
                        var source = new BroadcastBlock<int> (i => i * 10,
                                new DataflowBlockOptions { TaskScheduler = scheduler });
                        var target = new BufferBlock<int> ();
                        source.LinkTo (target, i => i < 10);

                        Assert.IsTrue (source.Post (1));

                        scheduler.ExecuteAll ();

                        int item;
                        Assert.IsTrue (target.TryReceive (out item));
                        Assert.AreEqual (10, item);
                }

                /// <summary>
                /// Checks the block returned by DataflowBlock.NullTarget: Post is
                /// expected to return true, an offered message is expected to be
                /// accepted and consumed from its source, and Completion is expected
                /// not to finish within 100 ms, neither initially nor after
                /// Complete or Fault.
                /// </summary>
                [Test]
                public void NullTargetTest ()
                {
                        var target = DataflowBlock.NullTarget<int> ();
                        Assert.IsTrue (target.Post (1));

                        var source = new TestSourceBlock<int> ();
                        var header = new DataflowMessageHeader (1);
                        source.AddMessage (header, 2);

                        Assert.IsFalse (source.WasConsumed (header));

                        Assert.AreEqual (DataflowMessageStatus.Accepted,
                                target.OfferMessage (header, 2, source, true));
                        Assert.IsTrue (source.WasConsumed (header));

                        Assert.IsFalse (target.Completion.Wait (100));

                        target.Complete ();

                        Assert.IsFalse (target.Completion.Wait (100));

                        target.Fault (new Exception ());

                        Assert.IsFalse (target.Completion.Wait (100));
                }
        }
}