3 // Copyright (c) 2011 Jérémie "garuma" Laval
4 // Copyright (c) 2012 Petr Onderka
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
7 // of this software and associated documentation files (the "Software"), to deal
8 // in the Software without restriction, including without limitation the rights
9 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 // copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
13 // The above copyright notice and this permission notice shall be included in
14 // all copies or substantial portions of the Software.
16 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
24 namespace System.Threading.Tasks.Dataflow {
26 /// Block used in all versions of <see cref="DataflowBlock.Choose"/>.
28 class ChooserBlock<T1, T2, T3> {
30 /// Target for one of the sources to choose from.
32 class ChooseTarget<TMessage> : ITargetBlock<TMessage> {
// Chooser that owns this target; OfferMessage reads its canAccept flag and takes its messageLock.
33 readonly ChooserBlock<T1, T2, T3> chooserBlock;
// Passed, together with the accepted value, to the chooser's MessageArrived.
35 readonly Action<TMessage> action;
/// <summary>
/// Creates a target that reports to the given <paramref name="chooserBlock"/>.
/// </summary>
/// <param name="chooserBlock">Chooser block this target belongs to.</param>
/// <param name="index">Number identifying this target; presumably the value later
/// handed to MessageArrived (its assignment is not visible in this excerpt - confirm).</param>
/// <param name="action">Action associated with this target; presumably stored in the
/// <c>action</c> field (assignment not visible in this excerpt - confirm).</param>
37 public ChooseTarget (ChooserBlock<T1, T2, T3> chooserBlock,
38 int index, Action<TMessage> action)
40 this.chooserBlock = chooserBlock;
/// <summary>
/// Offers a message to this target. If the owning chooser still accepts messages,
/// the chooser is marked as no longer accepting, the value is forwarded to
/// MessageArrived and the offer is accepted.
/// </summary>
/// <param name="messageHeader">Header of the offered message; passed to
/// <c>ConsumeMessage</c> when <paramref name="consumeToAccept"/> is set.</param>
/// <param name="messageValue">Offered value; replaced by the value returned from
/// <c>ConsumeMessage</c> when <paramref name="consumeToAccept"/> is set.</param>
/// <param name="source">Block offering the message; only used when
/// <paramref name="consumeToAccept"/> is set.</param>
/// <param name="consumeToAccept">Whether the message has to be consumed from
/// <paramref name="source"/> before it can be accepted.</param>
/// <returns>
/// <c>DecliningPermanently</c> when the chooser no longer accepts messages,
/// <c>NotAvailable</c> on the consume path (presumably when the source reports the
/// message was not consumed - the guarding condition is not visible in this excerpt),
/// otherwise <c>Accepted</c>.
/// </returns>
45 public DataflowMessageStatus OfferMessage (
46 DataflowMessageHeader messageHeader, TMessage messageValue,
47 ISourceBlock<TMessage> source, bool consumeToAccept)
// First check is made without holding the lock; it is repeated below once the lock is held.
49 if (!chooserBlock.canAccept)
50 return DataflowMessageStatus.DecliningPermanently;
52 bool lockTaken = false;
// NOTE(review): the try/finally that presumably pairs this Enter with the Exit below
// is not visible in this excerpt - confirm.
54 chooserBlock.messageLock.Enter (ref lockTaken);
// Re-check under the lock: the flag may have been cleared between the first check and Enter.
55 if (!chooserBlock.canAccept)
56 return DataflowMessageStatus.DecliningPermanently;
58 if (consumeToAccept) {
// The offered value is replaced by whatever the source hands over on consumption.
60 messageValue = source.ConsumeMessage (messageHeader, this, out consummed);
62 return DataflowMessageStatus.NotAvailable;
// Once this is cleared, both canAccept checks above decline every later offer.
65 chooserBlock.canAccept = false;
68 chooserBlock.messageLock.Exit ();
// Called after the Exit call above, i.e. not while holding messageLock.
71 chooserBlock.MessageArrived (index, action, messageValue);
72 return DataflowMessageStatus.Accepted;
75 public Task Completion {
79 public void Complete ()
83 public void Fault (Exception exception)
// Backs the Completion property; completed with a target index (SetResult),
// an exception (SetException) or cancellation (SetCanceled) elsewhere in this class.
88 readonly TaskCompletionSource<int> completion = new TaskCompletionSource<int> ();
// True until a message is accepted: ChooseTarget.OfferMessage checks it before and
// after taking messageLock and sets it to false when it accepts a message.
91 bool canAccept = true;
// Constructor parameters: one action per target, plus the block options whose
// CancellationToken is observed below.
94 Action<T1> action1, Action<T2> action2, Action<T3> action3,
95 DataflowBlockOptions dataflowBlockOptions)
// Targets are numbered 0, 1 and 2 in declaration order.
97 Target1 = new ChooseTarget<T1> (this, 0, action1);
98 Target2 = new ChooseTarget<T2> (this, 1, action2);
// NOTE(review): the documentation of Target3 says it is null when there are only two
// actions; any guard for that case is not visible in this excerpt - confirm.
100 Target3 = new ChooseTarget<T3> (this, 2, action3);
// Register the Cancelled callback only when a token other than CancellationToken.None was supplied.
// NOTE(review): the value returned by Register is discarded here, so the registration
// is never disposed by the visible code.
102 if (dataflowBlockOptions.CancellationToken != CancellationToken.None)
103 dataflowBlockOptions.CancellationToken.Register (Cancelled);
107 /// Causes cancellation of <see cref="Completion"/>.
108 /// If a message is already being consumed (and the consuming succeeds)
109 /// or if its action is being invoked, the Task is not cancelled.
// Takes messageLock before cancelling the completion task.
// NOTE(review): the release of the lock and any guard that skips the cancellation once a
// message has been accepted are not visible in this excerpt - confirm against the full method.
116 bool lockTaken = false;
118 messageLock.Enter (ref lockTaken);
// Moves the task exposed by Completion to the cancelled state.
122 completion.SetCanceled ();
132 /// Called when all sources have completed,
133 /// causes cancellation of <see cref="Completion"/>.
135 public void AllSourcesCompleted ()
141 /// Called when a message has arrived (and was consumed, if necessary).
142 /// This method can be called only once in the lifetime of this object.
/// <param name="index">Number of the target that accepted the message; becomes the
/// result of the completion task.</param>
/// <param name="action">Action belonging to the accepting target; presumably invoked
/// with <paramref name="value"/> before the task is completed (the call is not
/// visible in this excerpt - confirm).</param>
/// <param name="value">The accepted message value.</param>
144 void MessageArrived<TMessage> (
145 int index, Action<TMessage> action, TMessage value)
// Success: the completion task finishes with the index of the accepting target.
149 completion.SetResult (index);
// Any exception caught here is stored in the completion task rather than rethrown.
150 } catch (Exception e) {
151 completion.SetException (e);
156 /// Target block for the first source block.
/// <remarks>Assigned in the constructor as the target with index 0; the setter is private.</remarks>
158 public ITargetBlock<T1> Target1 { get; private set; }
161 /// Target block for the second source block.
/// <remarks>Assigned in the constructor as the target with index 1; the setter is private.</remarks>
163 public ITargetBlock<T2> Target2 { get; private set; }
166 /// Target block for the third source block.
167 /// Is <c>null</c> if there are only two actions.
/// <remarks>When assigned, it is created in the constructor as the target with index 2; the setter is private.</remarks>
169 public ITargetBlock<T3> Target3 { get; private set; }
172 /// Task that signifies that an item was accepted and
173 /// its action has been called.
/// <remarks>
/// Returns the task of the internal <c>completion</c> source. Its result is the index
/// passed to MessageArrived; the task can instead end faulted (an exception caught in
/// MessageArrived) or cancelled.
/// </remarks>
175 public Task<int> Completion {
176 get { return completion.Task; }