3 // Copyright (c) 2011 Jérémie "garuma" Laval
4 // Copyright (c) 2012 Petr Onderka
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
7 // of this software and associated documentation files (the "Software"), to deal
8 // in the Software without restriction, including without limitation the rights
9 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 // copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
13 // The above copyright notice and this permission notice shall be included in
14 // all copies or substantial portions of the Software.
16 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
24 using System.Collections.Concurrent;
25 using System.Collections.Generic;
28 namespace System.Threading.Tasks.Dataflow {
30 /// Used to implement Dataflow completion tracking,
31 /// that is the Completion property, Complete/Fault method combo
32 /// and the CancellationToken option.
34 class CompletionHelper {
35 readonly TaskCompletionSource<object> source =
36 new TaskCompletionSource<object> ();
38 readonly AtomicBoolean canFaultOrCancelImmediatelly =
39 new AtomicBoolean { Value = true };
40 readonly AtomicBoolean requestedFaultOrCancel =
41 new AtomicBoolean { Value = false };
43 readonly ConcurrentQueue<Tuple<Exception, bool>> requestedExceptions =
44 new ConcurrentQueue<Tuple<Exception, bool>> ();
46 public CompletionHelper (DataflowBlockOptions options)
48 if (options != null && options.CancellationToken != CancellationToken.None)
49 options.CancellationToken.Register (RequestCancel);
52 [Obsolete ("Use ctor")]
53 public static CompletionHelper GetNew (DataflowBlockOptions options)
55 return new CompletionHelper (options);
58 public Task Completion {
59 get { return source.Task; }
63 /// Whether <see cref="Completion"/> can be faulted or cancelled immediatelly.
64 /// It can't for example when a block is currently executing user action.
65 /// In that case, the fault (or cancellation) is queued,
66 /// and is actually acted upon when this property is set back to <c>true</c>.
68 public bool CanFaultOrCancelImmediatelly {
69 get { return canFaultOrCancelImmediatelly.Value; }
72 if (canFaultOrCancelImmediatelly.TrySet () && requestedFaultOrCancel.Value) {
73 bool canAllBeIgnored = requestedExceptions.All (t => t.Item2);
74 if (canAllBeIgnored) {
75 Tuple<Exception, bool> tuple;
76 requestedExceptions.TryDequeue (out tuple);
77 var exception = tuple.Item1;
78 if (exception == null)
83 Tuple<Exception, bool> tuple;
85 var exceptions = new List<Exception> (requestedExceptions.Count);
86 while (requestedExceptions.TryDequeue (out tuple)) {
87 var exception = tuple.Item1;
88 bool canBeIgnored = tuple.Item2;
89 if (first || !canBeIgnored) {
90 if (exception != null)
91 exceptions.Add (exception);
99 canFaultOrCancelImmediatelly.Value = false;
104 /// Whether the block can act as if it's not completed
105 /// (accept new items, start executing user action).
108 get { return !Completion.IsCompleted && !requestedFaultOrCancel.Value; }
112 /// Sets the block as completed.
113 /// Should be called only when the block is really completed
114 /// (e.g. the output queue is empty) and not right after
115 /// the user calls <see cref="IDataflowBlock.Complete"/>.
117 public void Complete ()
119 source.TrySetResult (null);
123 /// Requests faulting of the block using a given exception.
124 /// If the block can't be faulted immediatelly (see <see cref="CanFaultOrCancelImmediatelly"/>),
125 /// the exception will be queued, and the block will fault as soon as it can.
127 /// <param name="exception">The exception that is the cause of the fault.</param>
128 /// <param name="canBeIgnored">Can this exception be ignored, if there are more exceptions?</param>
130 /// When calling <see cref="IDataflowBlock.Fault"/> repeatedly, only the first exception counts,
131 /// even in the cases where the block can't be faulted immediatelly.
132 /// But exceptions from user actions in execution blocks count always,
133 /// which is the reason for the <paramref name="canBeIgnored"/> parameter.
135 public void RequestFault (Exception exception, bool canBeIgnored = true)
137 if (exception == null)
138 throw new ArgumentNullException ("exception");
140 if (CanFaultOrCancelImmediatelly)
143 // still need to store canBeIgnored, if we don't want to add locking here
144 if (!canBeIgnored || requestedExceptions.Count == 0)
145 requestedExceptions.Enqueue (Tuple.Create (exception, canBeIgnored));
146 requestedFaultOrCancel.Value = true;
151 /// Actually faults the block with a single exception.
154 /// Should be only called when <see cref="CanFaultOrCancelImmediatelly"/> is <c>true</c>.
156 void Fault (Exception exception)
158 source.TrySetException (exception);
162 /// Actually faults the block with a multiple exceptions.
165 /// Should be only called when <see cref="CanFaultOrCancelImmediatelly"/> is <c>true</c>.
167 void Fault (IEnumerable<Exception> exceptions)
169 source.TrySetException (exceptions);
173 /// Requests cancellation of the block.
174 /// If the block can't be cancelled immediatelly (see <see cref="CanFaultOrCancelImmediatelly"/>),
175 /// the cancellation will be queued, and the block will cancel as soon as it can.
177 void RequestCancel ()
179 if (CanFaultOrCancelImmediatelly)
182 if (requestedExceptions.Count == 0)
183 requestedExceptions.Enqueue (Tuple.Create<Exception, bool> (null, true));
184 requestedFaultOrCancel.Value = true;
189 /// Actually cancels the block.
192 /// Should be only called when <see cref="CanFaultOrCancelImmediatelly"/> is <c>true</c>.
196 source.TrySetCanceled ();