// MessageBox.cs
//
// Copyright (c) 2011 Jérémie "garuma" Laval
// Copyright (c) 2012 Petr Onderka
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
using System.Collections.Concurrent;
using System.Linq;
namespace System.Threading.Tasks.Dataflow {
///
/// In MessageBox we store message that have been offered to us so that they can be
/// later processed
///
internal abstract class MessageBox {
protected ITargetBlock Target { get; set; }
protected CompletionHelper CompHelper { get; private set; }
readonly Func externalCompleteTester;
readonly DataflowBlockOptions options;
readonly bool greedy;
readonly Func canAccept;
readonly ConcurrentDictionary, DataflowMessageHeader>
postponedMessages =
new ConcurrentDictionary, DataflowMessageHeader> ();
int itemCount;
readonly AtomicBoolean postponedProcessing = new AtomicBoolean ();
// these two fields are used only in one special case
SpinLock consumingLock;
// this is necessary, because canAccept is not pure
bool canAcceptFromBefore;
protected BlockingCollection MessageQueue { get; private set; }
protected MessageBox (
ITargetBlock target, BlockingCollection messageQueue,
CompletionHelper compHelper, Func externalCompleteTester,
DataflowBlockOptions options, bool greedy = true, Func canAccept = null)
{
this.Target = target;
this.CompHelper = compHelper;
this.MessageQueue = messageQueue;
this.externalCompleteTester = externalCompleteTester;
this.options = options;
this.greedy = greedy;
this.canAccept = canAccept;
}
public DataflowMessageStatus OfferMessage (
DataflowMessageHeader messageHeader, TInput messageValue,
ISourceBlock source, bool consumeToAccept)
{
if (!messageHeader.IsValid)
throw new ArgumentException ("The messageHeader is not valid.",
"messageHeader");
if (consumeToAccept && source == null)
throw new ArgumentException (
"consumeToAccept may only be true if provided with a non-null source.",
"consumeToAccept");
if (MessageQueue.IsAddingCompleted || !CompHelper.CanRun)
return DataflowMessageStatus.DecliningPermanently;
var full = options.BoundedCapacity != -1
&& Volatile.Read (ref itemCount) >= options.BoundedCapacity;
if (!greedy || full) {
if (source == null)
return DataflowMessageStatus.Declined;
postponedMessages [source] = messageHeader;
// necessary to avoid race condition
DecreaseCount (0);
if (!greedy && !full)
EnsureProcessing (true);
return DataflowMessageStatus.Postponed;
}
// in this case, we need to use locking to make sure
// we don't consume when we can't accept
if (consumeToAccept && canAccept != null) {
bool lockTaken = false;
try {
consumingLock.Enter (ref lockTaken);
if (!canAcceptFromBefore && !canAccept ())
return DataflowMessageStatus.DecliningPermanently;
bool consummed;
messageValue = source.ConsumeMessage (messageHeader, Target, out consummed);
if (!consummed) {
canAcceptFromBefore = true;
return DataflowMessageStatus.NotAvailable;
}
canAcceptFromBefore = false;
} finally {
if (lockTaken)
consumingLock.Exit ();
}
} else {
if (consumeToAccept) {
bool consummed;
messageValue = source.ConsumeMessage (messageHeader, Target, out consummed);
if (!consummed)
return DataflowMessageStatus.NotAvailable;
}
if (canAccept != null && !canAccept ())
return DataflowMessageStatus.DecliningPermanently;
}
try {
MessageQueue.Add (messageValue);
} catch (InvalidOperationException) {
// This is triggered either if the underlying collection didn't accept the item
// or if the messageQueue has been marked complete, either way it corresponds to a false
return DataflowMessageStatus.DecliningPermanently;
}
IncreaseCount ();
EnsureProcessing (true);
VerifyCompleteness ();
return DataflowMessageStatus.Accepted;
}
///
/// Increses the count of items in the block by 1.
///
public void IncreaseCount ()
{
Interlocked.Increment (ref itemCount);
}
///
/// Decreses the number of items in the block by the given count.
///
///
/// The parameter is used when one object
/// can represent many items, like a batch in .
///
public void DecreaseCount (int count = 1)
{
int decreased = Interlocked.Add (ref itemCount, -count);
// if BoundedCapacity is -1, there is no need to do this
if (decreased < options.BoundedCapacity && !postponedMessages.IsEmpty) {
if (greedy)
EnsurePostponedProcessing ();
else
EnsureProcessing (false);
}
}
///
/// The number of messages that were postponed
/// and can be attempted to be consumed.
///
public int PostponedMessagesCount {
get { return postponedMessages.Count; }
}
///
/// Reserves a message from those that were postponed.
/// Does not guarantee any order of the messages being reserved.
///
///
/// An object representing the reservation on success,
/// null on failure.
///
public Tuple, DataflowMessageHeader> ReserveMessage()
{
while (!postponedMessages.IsEmpty) {
// KeyValuePair is a struct, so default value is not null
var block = postponedMessages.FirstOrDefault () .Key;
// collection is empty
if (block == null)
break;
DataflowMessageHeader header;
bool removed = postponedMessages.TryRemove (block, out header);
// another thread was faster, try again
if (!removed)
continue;
bool reserved = block.ReserveMessage (header, Target);
if (reserved)
return Tuple.Create (block, header);
}
return null;
}
///
/// Releases the given reservation.
///
public void RelaseReservation(Tuple, DataflowMessageHeader> reservation)
{
reservation.Item1.ReleaseReservation (reservation.Item2, Target);
}
///
/// Consumes previously reserved item.
///
public TInput ConsumeReserved(Tuple, DataflowMessageHeader> reservation)
{
bool consumed;
return reservation.Item1.ConsumeMessage (
reservation.Item2, Target, out consumed);
}
///
/// Makes sure retrieving items that were postponed,
/// because they would exceed ,
/// is currently running.
///
void EnsurePostponedProcessing ()
{
if (postponedProcessing.TrySet())
Task.Factory.StartNew (RetrievePostponed, options.CancellationToken,
TaskCreationOptions.PreferFairness, options.TaskScheduler);
}
///
/// Retrieves items that were postponed,
/// because they would exceed .
///
void RetrievePostponed ()
{
// BoundedCapacity can't be -1 here, because in that case there would be no postponing
while (Volatile.Read (ref itemCount) < options.BoundedCapacity
&& !postponedMessages.IsEmpty && !MessageQueue.IsAddingCompleted) {
var block = postponedMessages.First ().Key;
DataflowMessageHeader header;
postponedMessages.TryRemove (block, out header);
bool consumed;
var item = block.ConsumeMessage (header, Target, out consumed);
if (consumed) {
try {
MessageQueue.Add (item);
IncreaseCount ();
EnsureProcessing (false);
} catch (InvalidOperationException) {
break;
}
}
}
// release all postponed messages
if (MessageQueue.IsAddingCompleted) {
while (!postponedMessages.IsEmpty) {
var block = postponedMessages.First ().Key;
DataflowMessageHeader header;
postponedMessages.TryRemove (block, out header);
if (block.ReserveMessage (header, Target))
block.ReleaseReservation (header, Target);
}
}
postponedProcessing.Value = false;
// because of race
if ((Volatile.Read (ref itemCount) < options.BoundedCapacity
|| MessageQueue.IsAddingCompleted)
&& !postponedMessages.IsEmpty)
EnsurePostponedProcessing ();
}
///
/// Makes sure the input queue is processed the way it needs to.
///
/// Was new item just added?
protected abstract void EnsureProcessing (bool newItem);
///
/// Completes the box, no new messages will be accepted.
/// Also starts the process of completing the output queue.
///
public void Complete ()
{
// Make message queue complete
MessageQueue.CompleteAdding ();
OutgoingQueueComplete ();
VerifyCompleteness ();
if (!postponedMessages.IsEmpty)
EnsurePostponedProcessing ();
}
///
/// Notifies that outgoing queue should be completed, if possible.
///
protected virtual void OutgoingQueueComplete ()
{
}
///
/// Makes sure the block is completed if it should be.
///
protected virtual void VerifyCompleteness ()
{
if (MessageQueue.IsCompleted && externalCompleteTester ())
CompHelper.Complete ();
}
}
}