e9f23a8589760d4b6f0eab90f9b12ad9bbe0e2a8
[mono.git] / mcs / class / System.Threading.Tasks.Dataflow / System.Threading.Tasks.Dataflow / DataflowBlock.cs
1 // DataflowBlock.cs
2 //
3 // Copyright (c) 2011 Jérémie "garuma" Laval
4 // Copyright (c) 2012 Petr Onderka
5 //
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
7 // of this software and associated documentation files (the "Software"), to deal
8 // in the Software without restriction, including without limitation the rights
9 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 // copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
12 //
13 // The above copyright notice and this permission notice shall be included in
14 // all copies or substantial portions of the Software.
15 //
16 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22 // THE SOFTWARE.
23
24 namespace System.Threading.Tasks.Dataflow {
25         public static class DataflowBlock {
26                 public static IObservable<TOutput> AsObservable<TOutput> (this ISourceBlock<TOutput> source)
27                 {
28                         if (source == null)
29                                 throw new ArgumentNullException ("source");
30
31                         return new ObservableDataflowBlock<TOutput> (source);
32                 }
33
34                 public static IObserver<TInput> AsObserver<TInput> (this ITargetBlock<TInput> target)
35                 {
36                         if (target == null)
37                                 throw new ArgumentNullException ("target");
38
39                         return new ObserverDataflowBlock<TInput> (target);
40                 }
41
42                 public static Task<int> Choose<T1, T2> (
43                         ISourceBlock<T1> source1, Action<T1> action1,
44                         ISourceBlock<T2> source2, Action<T2> action2)
45                 {
46                         return Choose (source1, action1, source2, action2,
47                                 DataflowBlockOptions.Default);
48                 }
49
50                 public static Task<int> Choose<T1, T2> (
51                         ISourceBlock<T1> source1, Action<T1> action1,
52                         ISourceBlock<T2> source2, Action<T2> action2,
53                         DataflowBlockOptions dataflowBlockOptions)
54                 {
55                         if (source1 == null)
56                                 throw new ArgumentNullException ("source1");
57                         if (source2 == null)
58                                 throw new ArgumentNullException ("source2");
59                         if (action1 == null)
60                                 throw new ArgumentNullException ("action1");
61                         if (action2 == null)
62                                 throw new ArgumentNullException ("action2");
63                         if (dataflowBlockOptions == null)
64                                 throw new ArgumentNullException ("dataflowBlockOptions");
65
66                         var chooser = new ChooserBlock<T1, T2, object> (action1, action2, null, dataflowBlockOptions);
67                         source1.LinkTo (chooser.Target1);
68                         source2.LinkTo (chooser.Target2);
69
70                         Task.WhenAll (source1.Completion, source2.Completion)
71                                 .ContinueWith (_ => chooser.AllSourcesCompleted ());
72
73                         return chooser.Completion;
74                 }
75
76                 public static Task<int> Choose<T1, T2, T3> (
77                         ISourceBlock<T1> source1, Action<T1> action1,
78                         ISourceBlock<T2> source2, Action<T2> action2,
79                         ISourceBlock<T3> source3, Action<T3> action3)
80                 {
81                         return Choose (source1, action1, source2, action2, source3, action3,
82                                 DataflowBlockOptions.Default);
83                 }
84
85                 public static Task<int> Choose<T1, T2, T3> (
86                         ISourceBlock<T1> source1, Action<T1> action1,
87                         ISourceBlock<T2> source2, Action<T2> action2,
88                         ISourceBlock<T3> source3, Action<T3> action3,
89                         DataflowBlockOptions dataflowBlockOptions)
90                 {
91                         if (source1 == null)
92                                 throw new ArgumentNullException ("source1");
93                         if (source2 == null)
94                                 throw new ArgumentNullException ("source2");
95                         if (source3 == null)
96                                 throw new ArgumentNullException ("source3");
97                         if (action1 == null)
98                                 throw new ArgumentNullException ("action1");
99                         if (action2 == null)
100                                 throw new ArgumentNullException ("action2");
101                         if (action3 == null)
102                                 throw new ArgumentNullException ("action3");
103                         if (dataflowBlockOptions == null)
104                                 throw new ArgumentNullException ("dataflowBlockOptions");
105
106                         var chooser = new ChooserBlock<T1, T2, T3> (action1, action2, action3, dataflowBlockOptions);
107                         source1.LinkTo (chooser.Target1);
108                         source2.LinkTo (chooser.Target2);
109                         source3.LinkTo (chooser.Target3);
110
111                         Task.WhenAll (source1.Completion, source2.Completion, source3.Completion)
112                                 .ContinueWith (_ => chooser.AllSourcesCompleted ());
113
114                         return chooser.Completion;
115                 }
116
117                 public static IPropagatorBlock<TInput, TOutput> Encapsulate<TInput, TOutput> (
118                         ITargetBlock<TInput> target, ISourceBlock<TOutput> source)
119                 {
120                         return new PropagatorWrapperBlock<TInput, TOutput> (target, source);
121                 }
122
123                 public static IDisposable LinkTo<TOutput> (this ISourceBlock<TOutput> source, ITargetBlock<TOutput> target)
124                 {
125                         if (source == null)
126                                 throw new ArgumentNullException ("source");
127
128                         return source.LinkTo (target, DataflowLinkOptions.Default);
129                 }
130
131                 public static IDisposable LinkTo<TOutput> (
132                         this ISourceBlock<TOutput> source, ITargetBlock<TOutput> target,
133                         Predicate<TOutput> predicate)
134                 {
135                         if (source == null)
136                                 throw new ArgumentNullException ("source");
137
138                         return source.LinkTo (target, DataflowLinkOptions.Default, predicate);
139                 }
140
141                 public static IDisposable LinkTo<TOutput> (
142                         this ISourceBlock<TOutput> source, ITargetBlock<TOutput> target,
143                         DataflowLinkOptions linkOptions, Predicate<TOutput> predicate)
144                 {
145                         if (source == null)
146                                 throw new ArgumentNullException ("source");
147                         if (predicate == null)
148                                 throw new ArgumentNullException ("predicate");
149                         if (target == null)
150                                 throw new ArgumentNullException ("target");
151
152                         var predicateBlock = new PredicateBlock<TOutput> (source, target, predicate);
153
154                         return source.LinkTo (predicateBlock, linkOptions);
155                 }
156
157                 public static Task<bool> OutputAvailableAsync<TOutput> (
158                         this ISourceBlock<TOutput> source)
159                 {
160                         return OutputAvailableAsync (source, CancellationToken.None);
161                 }
162
163                 public static Task<bool> OutputAvailableAsync<TOutput> (
164                         this ISourceBlock<TOutput> source, CancellationToken cancellationToken)
165                 {
166                         if (source == null)
167                                 throw new ArgumentNullException ("source");
168
169                         cancellationToken.ThrowIfCancellationRequested ();
170
171                         if (source.Completion.IsCompleted || source.Completion.IsCanceled
172                             || source.Completion.IsFaulted)
173                                 return Task.FromResult (false);
174
175                         var block = new OutputAvailableBlock<TOutput> ();
176                         var bridge = source.LinkTo (block,
177                                 new DataflowLinkOptions { PropagateCompletion = true });
178                         return block.AsyncGet (bridge, cancellationToken);
179                 }
180
181                 public static bool Post<TInput> (this ITargetBlock<TInput> target, TInput item)
182                 {
183                         if (target == null)
184                                 throw new ArgumentNullException ("target");
185
186                         return target.OfferMessage (new DataflowMessageHeader(1), item, null, false)
187                                == DataflowMessageStatus.Accepted;
188                 }
189
190                 public static TOutput Receive<TOutput> (this ISourceBlock<TOutput> source)
191                 {
192                         return Receive (source, TimeSpan.FromMilliseconds (-1), CancellationToken.None);
193                 }
194
195                 public static TOutput Receive<TOutput> (this ISourceBlock<TOutput> source, CancellationToken cancellationToken)
196                 {
197                         return Receive (source, TimeSpan.FromMilliseconds (-1), cancellationToken);
198                 }
199
200                 public static TOutput Receive<TOutput> (this ISourceBlock<TOutput> source, TimeSpan timeout)
201                 {
202                         return Receive (source, timeout, CancellationToken.None);
203                 }
204
205                 public static TOutput Receive<TOutput> (
206                         this ISourceBlock<TOutput> source, TimeSpan timeout,
207                         CancellationToken cancellationToken)
208                 {
209                         if (source == null)
210                                 throw new ArgumentNullException ("source");
211                         if (timeout.TotalMilliseconds < -1)
212                                 throw new ArgumentOutOfRangeException ("timeout");
213                         if (timeout.TotalMilliseconds > int.MaxValue)
214                                 throw new ArgumentOutOfRangeException ("timeout");
215
216                         cancellationToken.ThrowIfCancellationRequested ();
217
218                         TOutput item;
219                         var receivableSource = source as IReceivableSourceBlock<TOutput>;
220                         if (receivableSource != null && receivableSource.TryReceive (null, out item))
221                                 return item;
222
223                         if (source.Completion.IsCompleted || source.Completion.IsCanceled
224                             || source.Completion.IsFaulted)
225                                 throw new InvalidOperationException (
226                                         "No item could be received from the source.");
227
228                         int timeoutMilliseconds = (int)timeout.TotalMilliseconds;
229                         var block = new ReceiveBlock<TOutput> (cancellationToken, timeoutMilliseconds);
230                         var bridge = source.LinkTo (block,
231                                 new DataflowLinkOptions { PropagateCompletion = true });
232                         return block.WaitAndGet (bridge);
233                 }
234
235                 public static Task<TOutput> ReceiveAsync<TOutput> (this ISourceBlock<TOutput> source)
236                 {
237                         return ReceiveAsync (source, TimeSpan.FromMilliseconds (-1), CancellationToken.None);
238                 }
239
240                 public static Task<TOutput> ReceiveAsync<TOutput> (this ISourceBlock<TOutput> source, CancellationToken cancellationToken)
241                 {
242                         return ReceiveAsync (source, TimeSpan.FromMilliseconds (-1), cancellationToken);
243                 }
244
245                 public static Task<TOutput> ReceiveAsync<TOutput> (this ISourceBlock<TOutput> source, TimeSpan timeout)
246                 {
247                         return ReceiveAsync (source, timeout, CancellationToken.None);
248                 }
249
250                 public static Task<TOutput> ReceiveAsync<TOutput> (
251                         this ISourceBlock<TOutput> source, TimeSpan timeout,
252                         CancellationToken cancellationToken)
253                 {
254                         if (source == null)
255                                 throw new ArgumentNullException ("source");
256                         if (timeout.TotalMilliseconds < -1)
257                                 throw new ArgumentOutOfRangeException ("timeout");
258                         if (timeout.TotalMilliseconds > int.MaxValue)
259                                 throw new ArgumentOutOfRangeException ("timeout");
260
261                         cancellationToken.ThrowIfCancellationRequested ();
262
263                         int timeoutMilliseconds = (int)timeout.TotalMilliseconds;
264                         var block = new ReceiveBlock<TOutput> (cancellationToken, timeoutMilliseconds);
265                         var bridge = source.LinkTo (block);
266                         return block.AsyncGet (bridge);
267                 }
268
269                 public static bool TryReceive<TOutput> (this IReceivableSourceBlock<TOutput> source, out TOutput item)
270                 {
271                         item = default (TOutput);
272                         if (source == null)
273                                 throw new ArgumentNullException ("source");
274
275                         return source.TryReceive (null, out item);
276                 }
277
278                 public static Task<bool> SendAsync<TInput> (
279                         this ITargetBlock<TInput> target, TInput item)
280                 {
281                         return SendAsync (target, item, CancellationToken.None);
282                 }
283
284                 public static Task<bool> SendAsync<TInput> (
285                         this ITargetBlock<TInput> target, TInput item,
286                         CancellationToken cancellationToken)
287                 {
288                         if (target == null)
289                                 throw new ArgumentNullException ("target");
290
291                         cancellationToken.ThrowIfCancellationRequested ();
292
293                         var status = target.OfferMessage (
294                                 new DataflowMessageHeader (1), item, null, false);
295
296                         if (status == DataflowMessageStatus.Accepted)
297                                 return Task.FromResult (true);
298                         if (status != DataflowMessageStatus.Declined
299                             && status != DataflowMessageStatus.Postponed)
300                                 return Task.FromResult (false);
301
302                         var block = new SendBlock<TInput> (target, item, cancellationToken);
303                         return block.Send ();
304                 }
305
306                 public static ITargetBlock<TInput> NullTarget<TInput>()
307                 {
308                         return new NullTargetBlock<TInput> ();
309                 }
310         }
311 }