3 // Copyright (c) 2012 Petr Onderka
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to deal
7 // in the Software without restriction, including without limitation the rights
8 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
12 // The above copyright notice and this permission notice shall be included in
13 // all copies or substantial portions of the Software.
15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23 namespace System.Threading.Tasks.Dataflow {
25 /// This block is used in <see cref="DataflowBlock.SendAsync"/>
26 /// to asynchronously wait until a single item is sent to a given target.
28 class SendBlock<T> : ISourceBlock<T> {
// NOTE(review): this listing omits some original lines; the comments below
// describe only the declarations that are visible here.
// Target block that the item is offered to via OfferMessage.
29 readonly ITargetBlock<T> sendTarget;
// Token on which Send () registers its cancellation callback.
31 CancellationToken cancellationToken;
// Backs the Task<bool> returned by Send (); it is completed through
// SetResult () or SetCanceled () elsewhere in this class.
32 readonly TaskCompletionSource<bool> taskCompletionSource =
33 new TaskCompletionSource<bool> ();
// Header passed to OfferMessage and compared against the headers supplied
// by callers of the ISourceBlock<T> methods; constructed with the value 1.
34 readonly DataflowMessageHeader sendHeader = new DataflowMessageHeader (1);
// Registration returned by cancellationToken.Register (); disposed in SetResult ().
35 CancellationTokenRegistration cancellationTokenRegistration;
// Set to true by DisableCancel () while cancelling is temporarily disabled.
39 volatile bool cancelDisabled;
/// <summary>
/// Creates a block that sends a single item to the given target.
/// </summary>
/// <param name="sendTarget">Target block; stored and later offered the message.</param>
/// <param name="item">Item to send. NOTE(review): the line that stores it is not visible in this listing.</param>
/// <param name="cancellationToken">Token stored so that Send () can register a cancellation callback on it.</param>
41 public SendBlock (ITargetBlock<T> sendTarget, T item,
42 CancellationToken cancellationToken)
44 this.sendTarget = sendTarget;
46 this.cancellationToken = cancellationToken;
50 /// Sends the item given in the constructor to the target block.
52 /// <returns>Task that completes when the sending is done, or can't be performed.</returns>
53 public Task<bool> Send ()
// Keep the registration so that SetResult () can dispose it later.
// NOTE(review): only the SetCanceled () call of the callback is visible in
// this listing; any condition guarding it is not shown.
55 cancellationTokenRegistration = cancellationToken.Register (
59 taskCompletionSource.SetCanceled ();
// The caller observes the outcome through the completion source's task.
64 return taskCompletionSource.Task;
68 /// Offers the item to the target and handles its response.
74 if (taskCompletionSource.Task.IsCanceled)
77 var status = sendTarget.OfferMessage (sendHeader, item, this, false);
79 if (status == DataflowMessageStatus.Accepted)
81 else if (status != DataflowMessageStatus.Postponed)
/// <summary>
/// Not supported by this block; the getter always throws.
/// </summary>
/// <exception cref="NotSupportedException">Thrown on every access.</exception>
87 public Task Completion {
88 get { throw new NotSupportedException (); }
/// <summary>
/// Not supported by this block.
/// </summary>
/// <exception cref="NotSupportedException">Thrown on every call.</exception>
91 public void Complete ()
93 throw new NotSupportedException ();
/// <summary>
/// Not supported by this block.
/// </summary>
/// <param name="exception">Unused by the visible code; the call throws.</param>
/// <exception cref="NotSupportedException">Thrown on every call.</exception>
96 public void Fault (Exception exception)
98 throw new NotSupportedException ();
/// <summary>
/// Handles a request from <paramref name="target"/> to consume the message
/// identified by <paramref name="messageHeader"/>.
/// NOTE(review): several lines of this method (returns and branch bodies)
/// are not visible in this listing.
/// </summary>
/// <param name="messageHeader">Header of the requested message; must be valid.</param>
/// <param name="target">Requesting block; compared against the stored send target.</param>
/// <param name="messageConsumed">Initialised to false and set to true later in the method.</param>
/// <exception cref="ArgumentException">The header is not valid.</exception>
/// <exception cref="ArgumentNullException">Raised for "target"; the guarding condition is not visible here (presumably a null check).</exception>
101 public T ConsumeMessage (DataflowMessageHeader messageHeader,
102 ITargetBlock<T> target, out bool messageConsumed)
104 if (!messageHeader.IsValid)
105 throw new ArgumentException ("The messageHeader is not valid.",
108 throw new ArgumentNullException("target");
// Pessimistic default for the out parameter.
112 messageConsumed = false;
// An already-canceled send is checked before the header/target comparison.
114 if (taskCompletionSource.Task.IsCanceled)
// Only the header and target used by this block are recognised.
117 if (messageHeader != sendHeader || target != sendTarget) {
124 messageConsumed = true;
/// <summary>
/// Not supported by this block.
/// </summary>
/// <exception cref="NotSupportedException">Thrown on every call.</exception>
128 public IDisposable LinkTo (ITargetBlock<T> target, DataflowLinkOptions linkOptions)
130 throw new NotSupportedException ();
/// <summary>
/// Handles a request from <paramref name="target"/> to release its reservation.
/// NOTE(review): the lines that perform the release are not visible in this listing.
/// </summary>
/// <exception cref="InvalidOperationException">
/// The header or target does not match this block's, or no reservation is in place.
/// </exception>
133 public void ReleaseReservation (DataflowMessageHeader messageHeader, ITargetBlock<T> target)
// Reject the call unless it names this block's header and target and the
// message is currently reserved.
135 if (messageHeader != sendHeader || target != sendTarget || !isReserved)
136 throw new InvalidOperationException (
137 "The target did not have the message reserved.");
/// <summary>
/// Handles a request from <paramref name="target"/> to reserve the message.
/// NOTE(review): only the header/target comparison is visible in this listing;
/// the branch bodies and return values are not shown.
/// </summary>
144 public bool ReserveMessage (DataflowMessageHeader messageHeader, ITargetBlock<T> target)
// Only the header and target used by this block can match.
148 if (messageHeader == sendHeader && target == sendTarget) {
159 /// Temporarily disables cancelling.
161 void DisableCancel ()
// Raise the flag; the field is declared volatile.
163 cancelDisabled = true;
167 /// Enables cancelling after it was disabled.
168 /// If cancellation was attempted in the meantime,
169 /// actually performs the cancelling.
173 cancelDisabled = false;
175 if (cancellationToken.IsCancellationRequested)
176 taskCompletionSource.SetCanceled ();
180 /// Sets the result of the operation.
/// <param name="result">Value the completion source's task is completed with.</param>
182 void SetResult (bool result)
// Unregister the cancellation callback before completing the task.
184 cancellationTokenRegistration.Dispose ();
185 taskCompletionSource.SetResult (result);