Merge pull request #409 from Alkarex/patch-1
[mono.git] / mcs / class / System.Threading.Tasks.Dataflow / System.Threading.Tasks.Dataflow / ChooserBlock.cs
1 // JoinBlock.cs
2 //
3 // Copyright (c) 2011 Jérémie "garuma" Laval
4 // Copyright (c) 2012 Petr Onderka
5 //
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
7 // of this software and associated documentation files (the "Software"), to deal
8 // in the Software without restriction, including without limitation the rights
9 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 // copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
12 //
13 // The above copyright notice and this permission notice shall be included in
14 // all copies or substantial portions of the Software.
15 //
16 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22 // THE SOFTWARE.
23
24 namespace System.Threading.Tasks.Dataflow {
25         /// <summary>
26         /// Block used in all versions of <see cref="DataflowBlock.Choose"/>.
27         /// </summary>
28         class ChooserBlock<T1, T2, T3> {
29                 /// <summary>
30                 /// Target for one of the sources to choose from.
31                 /// </summary>
32                 class ChooseTarget<TMessage> : ITargetBlock<TMessage> {
33                         readonly ChooserBlock<T1, T2, T3> chooserBlock;
34                         readonly int index;
35                         readonly Action<TMessage> action;
36
37                         public ChooseTarget (ChooserBlock<T1, T2, T3> chooserBlock,
38                                              int index, Action<TMessage> action)
39                         {
40                                 this.chooserBlock = chooserBlock;
41                                 this.index = index;
42                                 this.action = action;
43                         }
44
45                         public DataflowMessageStatus OfferMessage (
46                                 DataflowMessageHeader messageHeader, TMessage messageValue,
47                                 ISourceBlock<TMessage> source, bool consumeToAccept)
48                         {
49                                 if (!chooserBlock.canAccept)
50                                         return DataflowMessageStatus.DecliningPermanently;
51
52                                 bool lockTaken = false;
53                                 try {
54                                         chooserBlock.messageLock.Enter (ref lockTaken);
55                                         if (!chooserBlock.canAccept)
56                                                 return DataflowMessageStatus.DecliningPermanently;
57
58                                         if (consumeToAccept) {
59                                                 bool consummed;
60                                                 messageValue = source.ConsumeMessage (messageHeader, this, out consummed);
61                                                 if (!consummed)
62                                                         return DataflowMessageStatus.NotAvailable;
63                                         }
64
65                                         chooserBlock.canAccept = false;
66                                 } finally {
67                                         if (lockTaken)
68                                                 chooserBlock.messageLock.Exit ();
69                                 }
70
71                                 chooserBlock.MessageArrived (index, action, messageValue);
72                                 return DataflowMessageStatus.Accepted;
73                         }
74
75                         public Task Completion {
76                                 get { return null; }
77                         }
78
79                         public void Complete ()
80                         {
81                         }
82
83                         public void Fault (Exception exception)
84                         {
85                         }
86                 }
87
88                 readonly TaskCompletionSource<int> completion = new TaskCompletionSource<int> ();
89
90                 SpinLock messageLock;
91                 bool canAccept = true;
92
93                 public ChooserBlock (
94                         Action<T1> action1, Action<T2> action2, Action<T3> action3,
95                         DataflowBlockOptions dataflowBlockOptions)
96                 {
97                         Target1 = new ChooseTarget<T1> (this, 0, action1);
98                         Target2 = new ChooseTarget<T2> (this, 1, action2);
99                         if (action3 != null)
100                                 Target3 = new ChooseTarget<T3> (this, 2, action3);
101
102                         if (dataflowBlockOptions.CancellationToken != CancellationToken.None)
103                                 dataflowBlockOptions.CancellationToken.Register (Cancelled);
104                 }
105
106                 /// <summary>
107                 /// Causes cancellation of <see cref="Completion"/>.
108                 /// If a message is already being consumed (and the consumsing succeeds)
109                 /// or if its action is being invoked, the Task is not cancelled.
110                 /// </summary>
111                 void Cancelled ()
112                 {
113                         if (!canAccept)
114                                 return;
115
116                         bool lockTaken = false;
117                         try {
118                                 messageLock.Enter (ref lockTaken);
119                                 if (!canAccept)
120                                         return;
121
122                                 completion.SetCanceled ();
123
124                                 canAccept = false;
125                         } finally {
126                                 if (lockTaken)
127                                         messageLock.Exit ();
128                         }
129                 }
130
131                 /// <summary>
132                 /// Called when all sources have completed,
133                 /// causes cancellation of <see cref="Completion"/>.
134                 /// </summary>
135                 public void AllSourcesCompleted ()
136                 {
137                         Cancelled ();
138                 }
139
140                 /// <summary>
141                 /// Called when message has arrived (and was consumed, if necessary).
142                 /// This method can be called only once in the lifetime of this object.
143                 /// </summary>
144                 void MessageArrived<TMessage> (
145                         int index, Action<TMessage> action, TMessage value)
146                 {
147                         try {
148                                 action (value);
149                                 completion.SetResult (index);
150                         } catch (Exception e) {
151                                 completion.SetException (e);
152                         }
153                 }
154
155                 /// <summary>
156                 /// Target block for the first source block.
157                 /// </summary>
158                 public ITargetBlock<T1> Target1 { get; private set; }
159
160                 /// <summary>
161                 /// Target block for the second source block.
162                 /// </summary>
163                 public ITargetBlock<T2> Target2 { get; private set; }
164
165                 /// <summary>
166                 /// Target block for the third source block.
167                 /// Is <c>null</c> if there are only two actions.
168                 /// </summary>
169                 public ITargetBlock<T3> Target3 { get; private set; }
170
171                 /// <summary>
172                 /// Task that signifies that an item was accepted and
173                 /// its action has been called.
174                 /// </summary>
175                 public Task<int> Completion {
176                         get { return completion.Task; }
177                 }
178         }
179 }