// BoundedCapacityTest.cs
//
// Copyright (c) 2012 Petr Onderka
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using NUnit.Framework;
namespace MonoTests.System.Threading.Tasks.Dataflow {
	/// <summary>
	/// Tests for dataflow blocks whose options set <c>BoundedCapacity</c>:
	/// what is accepted, declined or postponed when the block is full, and
	/// what happens to postponed messages once room becomes available.
	/// </summary>
	[TestFixture]
	public class BoundedCapacityTest {
		/// <summary>
		/// With capacity 1, a second Post is rejected until the first item
		/// has been received.
		/// </summary>
		[Test]
		public void PostTest ()
		{
			var block =
				new BufferBlock<int> (new DataflowBlockOptions { BoundedCapacity = 1 });
			Assert.IsTrue (block.Post (1));
			Assert.IsFalse (block.Post (2));
			Assert.AreEqual (1, block.Receive ());
			Assert.IsTrue (block.Post (3));
			Assert.AreEqual (3, block.Receive ());
		}

		/// <summary>
		/// Same scenario as <see cref="PostTest"/>, but driving the block
		/// through OfferMessage with no source: the offer made while the
		/// block is full is expected to be declined.
		/// </summary>
		[Test]
		public void OfferMessageTest ()
		{
			var block =
				new BufferBlock<int> (new DataflowBlockOptions { BoundedCapacity = 1 });
			ITargetBlock<int> target = block;

			Assert.AreEqual (DataflowMessageStatus.Accepted,
				target.OfferMessage (new DataflowMessageHeader (1), 42, null, false));
			Assert.AreEqual (DataflowMessageStatus.Declined,
				target.OfferMessage (new DataflowMessageHeader (2), 43, null, false));

			Assert.AreEqual (42, block.Receive ());

			Assert.AreEqual (DataflowMessageStatus.Accepted,
				target.OfferMessage (new DataflowMessageHeader (3), 44, null, false));

			Assert.AreEqual (44, block.Receive ());
		}

		/// <summary>
		/// When a source is supplied, an offer made to a full block is
		/// expected to be postponed and later consumed from that source once
		/// the buffered item has been received.
		/// </summary>
		[Test]
		public void OfferMessageWithSourceTest ()
		{
			var block =
				new BufferBlock<int> (new DataflowBlockOptions { BoundedCapacity = 1 });
			ITargetBlock<int> target = block;
			var source = new TestSourceBlock<int> ();

			Assert.AreEqual (DataflowMessageStatus.Accepted,
				target.OfferMessage (new DataflowMessageHeader (1), 42, source, false));
			var header = new DataflowMessageHeader (2);
			// Register the message with the source so it can be consumed later.
			source.AddMessage (header, 43);
			Assert.AreEqual (DataflowMessageStatus.Postponed,
				target.OfferMessage (header, 43, source, false));

			Assert.AreEqual (42, block.Receive (TimeSpan.FromMilliseconds (1000)));

			// The block must not complete; the wait also leaves time for the
			// postponed message to be picked up before the next assert.
			Assert.IsFalse (block.Completion.Wait (100));

			Assert.IsTrue (source.WasConsumed (header));

			Assert.AreEqual (43, block.Receive (TimeSpan.FromMilliseconds (1000)));

			Assert.AreEqual (DataflowMessageStatus.Accepted,
				target.OfferMessage (new DataflowMessageHeader (3), 44, source, false));

			Assert.AreEqual (44, block.Receive ());
		}

		/// <summary>
		/// A TransformManyBlock with capacity 1 is expected to keep rejecting
		/// posts until every output produced from the accepted input has
		/// been received.
		/// </summary>
		[Test]
		public void TransformManyBlockTest ()
		{
			// NOTE(review): the transform lambda was missing from the damaged
			// listing; it is restored from the values asserted below
			// (-1, 1 for input 1 and -5, 5 for input 5) - confirm upstream.
			var block = new TransformManyBlock<int, int> (
				i => new[] { -i, i },
				new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

			Assert.IsTrue (block.Post (1));
			Assert.IsFalse (block.Post (2));

			Assert.IsFalse (block.Completion.Wait (100));

			Assert.IsFalse (block.Post (3));

			Assert.AreEqual (-1, block.Receive ());

			// One output of the first input is still pending.
			Assert.IsFalse (block.Post (4));

			Assert.AreEqual (1, block.Receive ());

			Assert.IsTrue (block.Post (5));

			Assert.AreEqual (-5, block.Receive ());
			Assert.AreEqual (5, block.Receive ());
		}

		/// <summary>
		/// A TransformBlock with capacity 2 accepts two posts and runs the
		/// transform for both of them.
		/// </summary>
		[Test]
		public void TransformFullTest ()
		{
			var scheduler = new TestScheduler ();

			int n = 0;
			var transform = new TransformBlock<int, int> (
				i => Interlocked.Increment (ref n),
				new ExecutionDataflowBlockOptions
				{ BoundedCapacity = 2, TaskScheduler = scheduler });

			Assert.IsTrue (transform.Post (1));
			Assert.IsTrue (transform.Post (2));

			AssertHelper.GreaterOrEqual (scheduler.ExecuteAll (), 1);

			Assert.AreEqual (2, Volatile.Read (ref n));
		}

		/// <summary>
		/// A TransformManyBlock with capacity 2 processes both accepted
		/// inputs even though each one yields two outputs.
		/// </summary>
		[Test]
		public void TransformManyOverfullTest ()
		{
			var scheduler = new TestScheduler ();

			int n = 0;
			var transform = new TransformManyBlock<int, int> (
				i =>
				{
					Interlocked.Increment (ref n);
					return new[] { -i, i };
				},
				new ExecutionDataflowBlockOptions
				{ BoundedCapacity = 2, TaskScheduler = scheduler });

			Assert.IsTrue (transform.Post (1));
			Assert.IsTrue (transform.Post (2));

			AssertHelper.GreaterOrEqual (scheduler.ExecuteAll (), 1);

			Assert.AreEqual (2, Volatile.Read (ref n));
		}

		// Counter shared between TransformManyOverfullTest2 and ComputeResults.
		// NOTE(review): this declaration and the "n = 0" reset in the test were
		// missing from the damaged listing; they are restored because
		// ComputeResults reads "n" outside any local scope - confirm upstream.
		int n;

		/// <summary>
		/// Fills a capacity-100 TransformManyBlock with 100 inputs, each
		/// yielding 100 outputs: all 10000 outputs must be computed, and
		/// further posts are rejected both before and after the scheduled
		/// work has run.
		/// </summary>
		[Test]
		public void TransformManyOverfullTest2 ()
		{
			var scheduler = new TestScheduler ();

			n = 0;
			var transform = new TransformManyBlock<int, int> (
				i => ComputeResults (),
				new ExecutionDataflowBlockOptions
				{ BoundedCapacity = 100, TaskScheduler = scheduler });

			for (int i = 0; i < 100; i++)
				Assert.IsTrue (transform.Post (i));

			Assert.IsFalse (transform.Post (101));

			AssertHelper.GreaterOrEqual (scheduler.ExecuteAll (), 1);

			Assert.IsFalse (transform.Post (102));

			Assert.AreEqual (10000, Volatile.Read (ref n));
		}

		/// <summary>
		/// Lazily yields 100 values, incrementing the shared counter once
		/// per yielded value.
		/// </summary>
		IEnumerable<int> ComputeResults ()
		{
			for (int j = 0; j < 100; j++)
				yield return Interlocked.Increment (ref n);
		}

		/// <summary>
		/// When two messages from the same source are postponed, only the
		/// most recently offered one is expected to be consumed once room
		/// becomes available.
		/// </summary>
		[Test]
		public void MultipleOffersTest ()
		{
			var scheduler = new TestScheduler ();
			var block = new BufferBlock<int> (
				new DataflowBlockOptions { BoundedCapacity = 1, TaskScheduler = scheduler });
			var target = (ITargetBlock<int>)block;
			var source = new TestSourceBlock<int> ();

			var header1 = new DataflowMessageHeader (1);
			Assert.AreEqual (DataflowMessageStatus.Accepted,
				target.OfferMessage (header1, 41, source, false));

			var header2 = new DataflowMessageHeader (2);
			source.AddMessage (header2, 42);
			Assert.AreEqual (DataflowMessageStatus.Postponed,
				target.OfferMessage (header2, 42, source, false));

			var header3 = new DataflowMessageHeader (3);
			source.AddMessage (header3, 43);
			Assert.AreEqual (DataflowMessageStatus.Postponed,
				target.OfferMessage (header3, 43, source, false));

			Assert.AreEqual (41, block.Receive ());
			scheduler.ExecuteAll ();
			Assert.IsTrue (source.WasConsumed (header3));
			Assert.IsFalse (source.WasConsumed (header2));
		}

		/// <summary>
		/// A message postponed while the block was full must not be consumed
		/// after the block has been completed, even once room frees up.
		/// </summary>
		[Test]
		public void DontConsumePostponedAfterCompleteTest ()
		{
			var scheduler = new TestScheduler ();
			var block = new BufferBlock<int> (
				new DataflowBlockOptions { BoundedCapacity = 1, TaskScheduler = scheduler });
			var target = (ITargetBlock<int>)block;
			var source = new TestSourceBlock<int> ();

			Assert.IsTrue (block.Post (11));

			var header = new DataflowMessageHeader (1);
			source.AddMessage (header, 12);
			Assert.AreEqual (DataflowMessageStatus.Postponed,
				target.OfferMessage (header, 12, source, false));

			// NOTE(review): this call was missing from the damaged listing; it
			// is restored from the test's name and its final assertion -
			// confirm upstream.
			block.Complete ();

			Assert.AreEqual (11, block.Receive ());

			scheduler.ExecuteAll ();

			Assert.IsFalse (source.WasConsumed (header));
		}
	}

	/// <summary>
	/// Minimal <see cref="ISourceBlock{T}"/> stub for the tests above. It
	/// holds messages registered through <see cref="AddMessage"/> and records
	/// which headers were consumed or reserved. Members the tests do not need
	/// throw <see cref="NotImplementedException"/>.
	/// </summary>
	class TestSourceBlock<T> : ISourceBlock<T> {
		// Messages that are currently available for consumption.
		readonly Dictionary<DataflowMessageHeader, T> messages =
			new Dictionary<DataflowMessageHeader, T> ();

		// Headers that were successfully passed to ConsumeMessage.
		readonly HashSet<DataflowMessageHeader> consumed =
			new HashSet<DataflowMessageHeader> ();
		// Headers that were passed to ReserveMessage, whether known or not.
		readonly HashSet<DataflowMessageHeader> reserved =
			new HashSet<DataflowMessageHeader> ();

		public void Complete ()
		{
			throw new NotImplementedException ();
		}

		public void Fault (Exception exception)
		{
			throw new NotImplementedException ();
		}

		// Never assigned anywhere in this class, so it keeps its default value.
		public Task Completion { get; private set; }

		/// <summary>Makes a message available under the given header.</summary>
		public void AddMessage (DataflowMessageHeader header, T item)
		{
			messages.Add (header, item);
		}

		/// <summary>Tells whether the message with this header was consumed.</summary>
		public bool WasConsumed (DataflowMessageHeader header)
		{
			return consumed.Contains (header);
		}

		/// <summary>Tells whether a reservation was requested for this header.</summary>
		public bool WasReserved (DataflowMessageHeader header)
		{
			return reserved.Contains (header);
		}

		/// <summary>
		/// Optional callback run inside <see cref="ConsumeMessage"/>, after the
		/// message has been found and before it is removed.
		/// </summary>
		public Action ConsumeWaiter { get; set; }

		/// <summary>
		/// Hands over the message registered under
		/// <paramref name="messageHeader"/>, if there is one, and records the
		/// header as consumed.
		/// </summary>
		/// <returns>
		/// The stored item with <paramref name="messageConsumed"/> set to
		/// true, or <c>default(T)</c> with it set to false when the header is
		/// unknown.
		/// </returns>
		public T ConsumeMessage (DataflowMessageHeader messageHeader,
		                         ITargetBlock<T> target, out bool messageConsumed)
		{
			T item;
			if (messages.TryGetValue (messageHeader, out item)) {
				if (ConsumeWaiter != null)
					ConsumeWaiter ();
				messages.Remove (messageHeader);
				consumed.Add (messageHeader);
				messageConsumed = true;
				return item;
			}
			messageConsumed = false;
			return default(T);
		}

		public IDisposable LinkTo (ITargetBlock<T> target,
		                           DataflowLinkOptions linkOptions)
		{
			throw new NotImplementedException ();
		}

		/// <summary>
		/// Records the reservation request and reports whether the message
		/// is still available.
		/// </summary>
		public bool ReserveMessage (DataflowMessageHeader messageHeader,
		                            ITargetBlock<T> target)
		{
			reserved.Add (messageHeader);
			return messages.ContainsKey (messageHeader);
		}

		public void ReleaseReservation (DataflowMessageHeader messageHeader,
		                                ITargetBlock<T> target)
		{
			throw new NotImplementedException ();
		}
	}
}