// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
-//
-//
-
-using System;
-using System.Threading.Tasks;
using System.Collections.Generic;
using System.Collections.Concurrent;
namespace System.Threading.Tasks.Dataflow
{
public sealed class TransformBlock<TInput, TOutput> :
- IPropagatorBlock<TInput, TOutput>, ITargetBlock<TInput>, IDataflowBlock, ISourceBlock<TOutput>, IReceivableSourceBlock<TOutput>
+ IPropagatorBlock<TInput, TOutput>, IReceivableSourceBlock<TOutput>
{
static readonly ExecutionDataflowBlockOptions defaultOptions = new ExecutionDataflowBlockOptions ();
- ExecutionDataflowBlockOptions dataflowBlockOptions;
- CompletionHelper compHelper = CompletionHelper.GetNew ();
- BlockingCollection<TInput> messageQueue = new BlockingCollection<TInput> ();
- MessageBox<TInput> messageBox;
- MessageVault<TOutput> vault;
- MessageOutgoingQueue<TOutput> outgoing;
- TargetBuffer<TOutput> targets = new TargetBuffer<TOutput> ();
- DataflowMessageHeader headers = DataflowMessageHeader.NewValid ();
+ readonly ExecutionDataflowBlockOptions dataflowBlockOptions;
+ readonly CompletionHelper compHelper;
+ readonly BlockingCollection<TInput> messageQueue = new BlockingCollection<TInput> ();
+ readonly MessageBox<TInput> messageBox;
+ readonly MessageOutgoingQueue<TOutput> outgoing;
readonly Func<TInput, TOutput> transformer;
public TransformBlock (Func<TInput, TOutput> transformer) : this (transformer, defaultOptions)
public TransformBlock (Func<TInput, TOutput> transformer, ExecutionDataflowBlockOptions dataflowBlockOptions)
{
+ if (transformer == null)
+ throw new ArgumentNullException("transformer");
if (dataflowBlockOptions == null)
throw new ArgumentNullException ("dataflowBlockOptions");
this.transformer = transformer;
this.dataflowBlockOptions = dataflowBlockOptions;
- this.messageBox = new ExecutingMessageBox<TInput> (messageQueue,
- compHelper,
- () => outgoing.IsCompleted,
- TransformProcess,
- dataflowBlockOptions);
- this.outgoing = new MessageOutgoingQueue<TOutput> (compHelper, () => messageQueue.IsCompleted);
- this.vault = new MessageVault<TOutput> ();
+ this.compHelper = CompletionHelper.GetNew (dataflowBlockOptions);
+ this.messageBox = new ExecutingMessageBox<TInput> (
+ this, messageQueue, compHelper,
+ () => outgoing.IsCompleted, TransformProcess, () => outgoing.Complete (),
+ dataflowBlockOptions);
+ this.outgoing = new MessageOutgoingQueue<TOutput> (this, compHelper,
+ () => messageQueue.IsCompleted, messageBox.DecreaseCount,
+ dataflowBlockOptions);
}
public DataflowMessageStatus OfferMessage (DataflowMessageHeader messageHeader,
ISourceBlock<TInput> source,
bool consumeToAccept)
{
- return messageBox.OfferMessage (this, messageHeader, messageValue, source, consumeToAccept);
+ return messageBox.OfferMessage (messageHeader, messageValue, source, consumeToAccept);
}
- public IDisposable LinkTo (ITargetBlock<TOutput> target, bool unlinkAfterOne)
+ public IDisposable LinkTo (ITargetBlock<TOutput> target, DataflowLinkOptions linkOptions)
{
- var result = targets.AddTarget (target, unlinkAfterOne);
- outgoing.ProcessForTarget (target, this, false, ref headers);
- return result;
+ return outgoing.AddTarget (target, linkOptions);
}
public TOutput ConsumeMessage (DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target, out bool messageConsumed)
{
- return vault.ConsumeMessage (messageHeader, target, out messageConsumed);
+ return outgoing.ConsumeMessage (messageHeader, target, out messageConsumed);
}
public void ReleaseReservation (DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target)
{
- vault.ReleaseReservation (messageHeader, target);
+ outgoing.ReleaseReservation (messageHeader, target);
}
public bool ReserveMessage (DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target)
{
- return vault.ReserveMessage (messageHeader, target);
+ return outgoing.ReserveMessage (messageHeader, target);
}
public bool TryReceive (Predicate<TOutput> filter, out TOutput item)
return outgoing.TryReceiveAll (out items);
}
- void TransformProcess ()
+ bool TransformProcess ()
{
- ITargetBlock<TOutput> target;
TInput input;
- while (messageQueue.TryTake (out input)) {
- TOutput output = transformer (input);
-
- if ((target = targets.Current) != null)
- target.OfferMessage (headers.Increment (), output, this, false);
- else
- outgoing.AddData (output);
- }
+ var dequeued = messageQueue.TryTake (out input);
+ if (dequeued)
+ outgoing.AddData (transformer (input));
- if (!outgoing.IsEmpty && (target = targets.Current) != null)
- outgoing.ProcessForTarget (target, this, false, ref headers);
+ return dequeued;
}
public void Complete ()
messageBox.Complete ();
}
- public void Fault (Exception ex)
+ public void Fault (Exception exception)
{
- compHelper.Fault (ex);
+ compHelper.RequestFault (exception);
}
public Task Completion {
return messageQueue.Count;
}
}
- }
-}
+ public override string ToString ()
+ {
+ return NameHelper.GetName (this, dataflowBlockOptions);
+ }
+ }
+}
\ No newline at end of file