Merge pull request #409 from Alkarex/patch-1
[mono.git] / mcs / class / System.Threading.Tasks.Dataflow / System.Threading.Tasks.Dataflow / SendBlock.cs
1 // SendBlock.cs
2 //
3 // Copyright (c) 2012 Petr Onderka
4 //
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to deal
7 // in the Software without restriction, including without limitation the rights
8 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
11 //
12 // The above copyright notice and this permission notice shall be included in
13 // all copies or substantial portions of the Software.
14 //
15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21 // THE SOFTWARE.
22
23 namespace System.Threading.Tasks.Dataflow {
24         /// <summary>
25         /// This block is used in <see cref="DataflowBlock.SendAsync"/>
26         /// to asynchronously wait until a single item is sent to a given target.
27         /// </summary>
28         class SendBlock<T> : ISourceBlock<T> {
29                 readonly ITargetBlock<T> sendTarget;
30                 readonly T item;
31                 CancellationToken cancellationToken;
32                 readonly TaskCompletionSource<bool> taskCompletionSource =
33                         new TaskCompletionSource<bool> ();
34                 readonly DataflowMessageHeader sendHeader = new DataflowMessageHeader (1);
35                 CancellationTokenRegistration cancellationTokenRegistration;
36
37                 bool isReserved;
38
39                 volatile bool cancelDisabled;
40
41                 public SendBlock (ITargetBlock<T> sendTarget, T item,
42                                   CancellationToken cancellationToken)
43                 {
44                         this.sendTarget = sendTarget;
45                         this.item = item;
46                         this.cancellationToken = cancellationToken;
47                 }
48
49                 /// <summary>
50                 /// Sends the item given in the constructor to the target block.
51                 /// </summary>
52                 /// <returns>Task that completes when the sending is done, or can't be performed.</returns>
53                 public Task<bool> Send ()
54                 {
55                         cancellationTokenRegistration = cancellationToken.Register (
56                                 () =>
57                                 {
58                                         if (!cancelDisabled)
59                                                 taskCompletionSource.SetCanceled ();
60                                 });
61
62                         PerformSend ();
63
64                         return taskCompletionSource.Task;
65                 }
66
67                 /// <summary>
68                 /// Offers the item to the target and hadles its response.
69                 /// </summary>
70                 void PerformSend ()
71                 {
72                         DisableCancel ();
73
74                         if (taskCompletionSource.Task.IsCanceled)
75                                 return;
76
77                         var status = sendTarget.OfferMessage (sendHeader, item, this, false);
78
79                         if (status == DataflowMessageStatus.Accepted)
80                                 SetResult (true);
81                         else if (status != DataflowMessageStatus.Postponed)
82                                 SetResult (false);
83                         else
84                                 EnableCancel ();
85                 }
86
87                 public Task Completion {
88                         get { throw new NotSupportedException (); }
89                 }
90
91                 public void Complete ()
92                 {
93                         throw new NotSupportedException ();
94                 }
95
96                 public void Fault (Exception exception)
97                 {
98                         throw new NotSupportedException ();
99                 }
100
101                 public T ConsumeMessage (DataflowMessageHeader messageHeader,
102                                          ITargetBlock<T> target, out bool messageConsumed)
103                 {
104                         if (!messageHeader.IsValid)
105                                 throw new ArgumentException ("The messageHeader is not valid.",
106                                         "messageHeader");
107                         if (target == null)
108                                 throw new ArgumentNullException("target");
109
110                         DisableCancel ();
111
112                         messageConsumed = false;
113
114                         if (taskCompletionSource.Task.IsCanceled)
115                                 return default(T);
116
117                         if (messageHeader != sendHeader || target != sendTarget) {
118                                 EnableCancel ();
119                                 return default(T);
120                         }
121
122                         SetResult (true);
123
124                         messageConsumed = true;
125                         return item;
126                 }
127
128                 public IDisposable LinkTo (ITargetBlock<T> target, DataflowLinkOptions linkOptions)
129                 {
130                         throw new NotSupportedException ();
131                 }
132
133                 public void ReleaseReservation (DataflowMessageHeader messageHeader, ITargetBlock<T> target)
134                 {
135                         if (messageHeader != sendHeader || target != sendTarget || !isReserved)
136                                 throw new InvalidOperationException (
137                                         "The target did not have the message reserved.");
138
139                         isReserved = false;
140                         EnableCancel ();
141                         PerformSend ();
142                 }
143
144                 public bool ReserveMessage (DataflowMessageHeader messageHeader, ITargetBlock<T> target)
145                 {
146                         DisableCancel ();
147
148                         if (messageHeader == sendHeader && target == sendTarget) {
149                                 isReserved = true;
150                                 return true;
151                         }
152
153                         EnableCancel ();
154
155                         return false;
156                 }
157
158                 /// <summary>
159                 /// Temporarily disables cancelling.
160                 /// </summary>
161                 void DisableCancel ()
162                 {
163                         cancelDisabled = true;
164                 }
165
166                 /// <summary>
167                 /// Enables cancelling after it was disabled.
168                 /// If cancellation was attempted in the meantime,
169                 /// actually performs the cancelling.
170                 /// </summary>
171                 void EnableCancel ()
172                 {
173                         cancelDisabled = false;
174
175                         if (cancellationToken.IsCancellationRequested)
176                                 taskCompletionSource.SetCanceled ();
177                 }
178
179                 /// <summary>
180                 /// Sets the result of the operation.
181                 /// </summary>
182                 void SetResult (bool result)
183                 {
184                         cancellationTokenRegistration.Dispose ();
185                         taskCompletionSource.SetResult (result);
186                 }
187         }
188 }