3 // Copyright (c) 2011 Jérémie "garuma" Laval
4 // Copyright (c) 2012 Petr Onderka
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
7 // of this software and associated documentation files (the "Software"), to deal
8 // in the Software without restriction, including without limitation the rights
9 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 // copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
13 // The above copyright notice and this permission notice shall be included in
14 // all copies or substantial portions of the Software.
16 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
24 namespace System.Threading.Tasks.Dataflow {
25 public static class DataflowBlock {
26 public static IObservable<TOutput> AsObservable<TOutput> (this ISourceBlock<TOutput> source)
29 throw new ArgumentNullException ("source");
31 return new ObservableDataflowBlock<TOutput> (source);
34 public static IObserver<TInput> AsObserver<TInput> (this ITargetBlock<TInput> target)
37 throw new ArgumentNullException ("target");
39 return new ObserverDataflowBlock<TInput> (target);
42 public static Task<int> Choose<T1, T2> (
43 ISourceBlock<T1> source1, Action<T1> action1,
44 ISourceBlock<T2> source2, Action<T2> action2)
46 return Choose (source1, action1, source2, action2,
47 DataflowBlockOptions.Default);
50 public static Task<int> Choose<T1, T2> (
51 ISourceBlock<T1> source1, Action<T1> action1,
52 ISourceBlock<T2> source2, Action<T2> action2,
53 DataflowBlockOptions dataflowBlockOptions)
56 throw new ArgumentNullException ("source1");
58 throw new ArgumentNullException ("source2");
60 throw new ArgumentNullException ("action1");
62 throw new ArgumentNullException ("action2");
63 if (dataflowBlockOptions == null)
64 throw new ArgumentNullException ("dataflowBlockOptions");
66 var chooser = new ChooserBlock<T1, T2, object> (action1, action2, null, dataflowBlockOptions);
67 source1.LinkTo (chooser.Target1);
68 source2.LinkTo (chooser.Target2);
70 Task.WhenAll (source1.Completion, source2.Completion)
71 .ContinueWith (_ => chooser.AllSourcesCompleted ());
73 return chooser.Completion;
76 public static Task<int> Choose<T1, T2, T3> (
77 ISourceBlock<T1> source1, Action<T1> action1,
78 ISourceBlock<T2> source2, Action<T2> action2,
79 ISourceBlock<T3> source3, Action<T3> action3)
81 return Choose (source1, action1, source2, action2, source3, action3,
82 DataflowBlockOptions.Default);
85 public static Task<int> Choose<T1, T2, T3> (
86 ISourceBlock<T1> source1, Action<T1> action1,
87 ISourceBlock<T2> source2, Action<T2> action2,
88 ISourceBlock<T3> source3, Action<T3> action3,
89 DataflowBlockOptions dataflowBlockOptions)
92 throw new ArgumentNullException ("source1");
94 throw new ArgumentNullException ("source2");
96 throw new ArgumentNullException ("source3");
98 throw new ArgumentNullException ("action1");
100 throw new ArgumentNullException ("action2");
102 throw new ArgumentNullException ("action3");
103 if (dataflowBlockOptions == null)
104 throw new ArgumentNullException ("dataflowBlockOptions");
106 var chooser = new ChooserBlock<T1, T2, T3> (action1, action2, action3, dataflowBlockOptions);
107 source1.LinkTo (chooser.Target1);
108 source2.LinkTo (chooser.Target2);
109 source3.LinkTo (chooser.Target3);
111 Task.WhenAll (source1.Completion, source2.Completion, source3.Completion)
112 .ContinueWith (_ => chooser.AllSourcesCompleted ());
114 return chooser.Completion;
117 public static IPropagatorBlock<TInput, TOutput> Encapsulate<TInput, TOutput> (
118 ITargetBlock<TInput> target, ISourceBlock<TOutput> source)
120 return new PropagatorWrapperBlock<TInput, TOutput> (target, source);
123 public static IDisposable LinkTo<TOutput> (this ISourceBlock<TOutput> source, ITargetBlock<TOutput> target)
126 throw new ArgumentNullException ("source");
128 return source.LinkTo (target, DataflowLinkOptions.Default);
131 public static IDisposable LinkTo<TOutput> (
132 this ISourceBlock<TOutput> source, ITargetBlock<TOutput> target,
133 Predicate<TOutput> predicate)
136 throw new ArgumentNullException ("source");
138 return source.LinkTo (target, DataflowLinkOptions.Default, predicate);
141 public static IDisposable LinkTo<TOutput> (
142 this ISourceBlock<TOutput> source, ITargetBlock<TOutput> target,
143 DataflowLinkOptions linkOptions, Predicate<TOutput> predicate)
146 throw new ArgumentNullException ("source");
147 if (predicate == null)
148 throw new ArgumentNullException ("predicate");
150 throw new ArgumentNullException ("target");
152 var predicateBlock = new PredicateBlock<TOutput> (source, target, predicate);
154 return source.LinkTo (predicateBlock, linkOptions);
157 public static Task<bool> OutputAvailableAsync<TOutput> (
158 this ISourceBlock<TOutput> source)
160 return OutputAvailableAsync (source, CancellationToken.None);
163 public static Task<bool> OutputAvailableAsync<TOutput> (
164 this ISourceBlock<TOutput> source, CancellationToken cancellationToken)
167 throw new ArgumentNullException ("source");
169 cancellationToken.ThrowIfCancellationRequested ();
171 if (source.Completion.IsCompleted || source.Completion.IsCanceled
172 || source.Completion.IsFaulted)
173 return Task.FromResult (false);
175 var block = new OutputAvailableBlock<TOutput> ();
176 var bridge = source.LinkTo (block,
177 new DataflowLinkOptions { PropagateCompletion = true });
178 return block.AsyncGet (bridge, cancellationToken);
181 public static bool Post<TInput> (this ITargetBlock<TInput> target, TInput item)
184 throw new ArgumentNullException ("target");
186 return target.OfferMessage (new DataflowMessageHeader(1), item, null, false)
187 == DataflowMessageStatus.Accepted;
190 public static TOutput Receive<TOutput> (this ISourceBlock<TOutput> source)
192 return Receive (source, TimeSpan.FromMilliseconds (-1), CancellationToken.None);
195 public static TOutput Receive<TOutput> (this ISourceBlock<TOutput> source, CancellationToken cancellationToken)
197 return Receive (source, TimeSpan.FromMilliseconds (-1), cancellationToken);
200 public static TOutput Receive<TOutput> (this ISourceBlock<TOutput> source, TimeSpan timeout)
202 return Receive (source, timeout, CancellationToken.None);
205 public static TOutput Receive<TOutput> (
206 this ISourceBlock<TOutput> source, TimeSpan timeout,
207 CancellationToken cancellationToken)
210 throw new ArgumentNullException ("source");
211 if (timeout.TotalMilliseconds < -1)
212 throw new ArgumentOutOfRangeException ("timeout");
213 if (timeout.TotalMilliseconds > int.MaxValue)
214 throw new ArgumentOutOfRangeException ("timeout");
216 cancellationToken.ThrowIfCancellationRequested ();
219 var receivableSource = source as IReceivableSourceBlock<TOutput>;
220 if (receivableSource != null && receivableSource.TryReceive (null, out item))
223 if (source.Completion.IsCompleted || source.Completion.IsCanceled
224 || source.Completion.IsFaulted)
225 throw new InvalidOperationException (
226 "No item could be received from the source.");
228 int timeoutMilliseconds = (int)timeout.TotalMilliseconds;
229 var block = new ReceiveBlock<TOutput> (cancellationToken, timeoutMilliseconds);
230 var bridge = source.LinkTo (block,
231 new DataflowLinkOptions { PropagateCompletion = true });
232 return block.WaitAndGet (bridge);
235 public static Task<TOutput> ReceiveAsync<TOutput> (this ISourceBlock<TOutput> source)
237 return ReceiveAsync (source, TimeSpan.FromMilliseconds (-1), CancellationToken.None);
240 public static Task<TOutput> ReceiveAsync<TOutput> (this ISourceBlock<TOutput> source, CancellationToken cancellationToken)
242 return ReceiveAsync (source, TimeSpan.FromMilliseconds (-1), cancellationToken);
245 public static Task<TOutput> ReceiveAsync<TOutput> (this ISourceBlock<TOutput> source, TimeSpan timeout)
247 return ReceiveAsync (source, timeout, CancellationToken.None);
250 public static Task<TOutput> ReceiveAsync<TOutput> (
251 this ISourceBlock<TOutput> source, TimeSpan timeout,
252 CancellationToken cancellationToken)
255 throw new ArgumentNullException ("source");
256 if (timeout.TotalMilliseconds < -1)
257 throw new ArgumentOutOfRangeException ("timeout");
258 if (timeout.TotalMilliseconds > int.MaxValue)
259 throw new ArgumentOutOfRangeException ("timeout");
261 cancellationToken.ThrowIfCancellationRequested ();
263 int timeoutMilliseconds = (int)timeout.TotalMilliseconds;
264 var block = new ReceiveBlock<TOutput> (cancellationToken, timeoutMilliseconds);
265 var bridge = source.LinkTo (block);
266 return block.AsyncGet (bridge);
269 public static bool TryReceive<TOutput> (this IReceivableSourceBlock<TOutput> source, out TOutput item)
271 item = default (TOutput);
273 throw new ArgumentNullException ("source");
275 return source.TryReceive (null, out item);
278 public static Task<bool> SendAsync<TInput> (
279 this ITargetBlock<TInput> target, TInput item)
281 return SendAsync (target, item, CancellationToken.None);
284 public static Task<bool> SendAsync<TInput> (
285 this ITargetBlock<TInput> target, TInput item,
286 CancellationToken cancellationToken)
289 throw new ArgumentNullException ("target");
291 cancellationToken.ThrowIfCancellationRequested ();
293 var status = target.OfferMessage (
294 new DataflowMessageHeader (1), item, null, false);
296 if (status == DataflowMessageStatus.Accepted)
297 return Task.FromResult (true);
298 if (status != DataflowMessageStatus.Declined
299 && status != DataflowMessageStatus.Postponed)
300 return Task.FromResult (false);
302 var block = new SendBlock<TInput> (target, item, cancellationToken);
303 return block.Send ();
306 public static ITargetBlock<TInput> NullTarget<TInput>()
308 return new NullTargetBlock<TInput> ();