// SendBlock.cs
//
// Copyright (c) 2012 Petr Onderka
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
namespace System.Threading.Tasks.Dataflow {
///
/// This block is used in
/// to asynchronously wait until a single item is sent to a given target.
///
class SendBlock : ISourceBlock {
readonly ITargetBlock sendTarget;
readonly T item;
CancellationToken cancellationToken;
readonly TaskCompletionSource taskCompletionSource =
new TaskCompletionSource ();
readonly DataflowMessageHeader sendHeader = new DataflowMessageHeader (1);
CancellationTokenRegistration cancellationTokenRegistration;
bool isReserved;
volatile bool cancelDisabled;
public SendBlock (ITargetBlock sendTarget, T item,
CancellationToken cancellationToken)
{
this.sendTarget = sendTarget;
this.item = item;
this.cancellationToken = cancellationToken;
}
///
/// Sends the item given in the constructor to the target block.
///
/// Task that completes when the sending is done, or can't be performed.
public Task Send ()
{
cancellationTokenRegistration = cancellationToken.Register (
() =>
{
if (!cancelDisabled)
taskCompletionSource.SetCanceled ();
});
PerformSend ();
return taskCompletionSource.Task;
}
///
/// Offers the item to the target and hadles its response.
///
void PerformSend ()
{
DisableCancel ();
if (taskCompletionSource.Task.IsCanceled)
return;
var status = sendTarget.OfferMessage (sendHeader, item, this, false);
if (status == DataflowMessageStatus.Accepted)
SetResult (true);
else if (status != DataflowMessageStatus.Postponed)
SetResult (false);
else
EnableCancel ();
}
public Task Completion {
get { throw new NotSupportedException (); }
}
public void Complete ()
{
throw new NotSupportedException ();
}
public void Fault (Exception exception)
{
throw new NotSupportedException ();
}
public T ConsumeMessage (DataflowMessageHeader messageHeader,
ITargetBlock target, out bool messageConsumed)
{
if (!messageHeader.IsValid)
throw new ArgumentException ("The messageHeader is not valid.",
"messageHeader");
if (target == null)
throw new ArgumentNullException("target");
DisableCancel ();
messageConsumed = false;
if (taskCompletionSource.Task.IsCanceled)
return default(T);
if (messageHeader != sendHeader || target != sendTarget) {
EnableCancel ();
return default(T);
}
SetResult (true);
messageConsumed = true;
return item;
}
public IDisposable LinkTo (ITargetBlock target, DataflowLinkOptions linkOptions)
{
throw new NotSupportedException ();
}
public void ReleaseReservation (DataflowMessageHeader messageHeader, ITargetBlock target)
{
if (messageHeader != sendHeader || target != sendTarget || !isReserved)
throw new InvalidOperationException (
"The target did not have the message reserved.");
isReserved = false;
EnableCancel ();
PerformSend ();
}
public bool ReserveMessage (DataflowMessageHeader messageHeader, ITargetBlock target)
{
DisableCancel ();
if (messageHeader == sendHeader && target == sendTarget) {
isReserved = true;
return true;
}
EnableCancel ();
return false;
}
///
/// Temporarily disables cancelling.
///
void DisableCancel ()
{
cancelDisabled = true;
}
///
/// Enables cancelling after it was disabled.
/// If cancellation was attempted in the meantime,
/// actually performs the cancelling.
///
void EnableCancel ()
{
cancelDisabled = false;
if (cancellationToken.IsCancellationRequested)
taskCompletionSource.SetCanceled ();
}
///
/// Sets the result of the operation.
///
void SetResult (bool result)
{
cancellationTokenRegistration.Dispose ();
taskCompletionSource.SetResult (result);
}
}
}