1 // TransformManyBlock.cs
3 // Copyright (c) 2011 Jérémie "garuma" Laval
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to deal
7 // in the Software without restriction, including without limitation the rights
8 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
12 // The above copyright notice and this permission notice shall be included in
13 // all copies or substantial portions of the Software.
15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
27 using System.Threading.Tasks;
28 using System.Collections.Generic;
29 using System.Collections.Concurrent;
31 namespace System.Threading.Tasks.Dataflow
// Dataflow block that is both a target for TInput messages and a source of
// TOutput messages. Each accepted input is handed to a
// Func<TInput, IEnumerable<TOutput>> and every element of the returned
// sequence is emitted as a separate output item (see TransformProcess).
33 public sealed class TransformManyBlock<TInput, TOutput> :
34 IPropagatorBlock<TInput, TOutput>, ITargetBlock<TInput>, IDataflowBlock, ISourceBlock<TOutput>, IReceivableSourceBlock<TOutput>
// Options instance shared by every block created through the
// single-argument constructor.
36 static readonly ExecutionDataflowBlockOptions defaultOptions = new ExecutionDataflowBlockOptions ();
// Backs the Completion property and Fault (); also handed to the outgoing queue.
38 CompletionHelper compHelper = CompletionHelper.GetNew ();
// Pending inputs. Drained by TransformProcess; its Count is reported as InputCount.
39 BlockingCollection<TInput> messageQueue = new BlockingCollection<TInput> ();
// Input side of the block: OfferMessage () and Complete () are forwarded to it.
40 MessageBox<TInput> messageBox;
// Backs ConsumeMessage / ReserveMessage / ReleaseReservation.
41 MessageVault<TOutput> vault;
// Options supplied at construction time; stored and passed on to the message box.
42 ExecutionDataflowBlockOptions dataflowBlockOptions;
// User delegate applied to each input taken from messageQueue.
43 readonly Func<TInput, IEnumerable<TOutput>> transformer;
// Output side of the block: TryReceive, TryReceiveAll and OutputCount read from it,
// and TransformProcess adds produced items to it.
44 MessageOutgoingQueue<TOutput> outgoing;
// Linked targets; targets.Current is the target that produced items are offered to.
45 TargetBuffer<TOutput> targets = new TargetBuffer<TOutput> ();
// Header state for outgoing messages: incremented for each directly offered item
// and passed by reference to outgoing.ProcessForTarget.
46 DataflowMessageHeader headers = DataflowMessageHeader.NewValid ();
/// <summary>
/// Creates a block that uses the shared <c>defaultOptions</c> instance by
/// delegating to the two-argument constructor.
/// </summary>
/// <param name="transformer">Delegate that maps one input to a sequence of outputs.</param>
48 public TransformManyBlock (Func<TInput, IEnumerable<TOutput>> transformer) : this (transformer, defaultOptions)
/// <summary>
/// Creates a block with explicit execution options and wires up the input
/// message box, the outgoing queue and the message vault.
/// </summary>
/// <param name="transformer">Delegate that maps one input to a sequence of outputs.</param>
/// <param name="dataflowBlockOptions">Execution options; must not be null.</param>
/// <exception cref="ArgumentNullException"><paramref name="dataflowBlockOptions"/> is null.</exception>
// NOTE(review): no null check for `transformer` is visible here; a null delegate
// would only surface when TransformProcess invokes it. Confirm whether that is intended.
53 public TransformManyBlock (Func<TInput, IEnumerable<TOutput>> transformer, ExecutionDataflowBlockOptions dataflowBlockOptions)
55 if (dataflowBlockOptions == null)
56 throw new ArgumentNullException ("dataflowBlockOptions");
58 this.transformer = transformer;
59 this.dataflowBlockOptions = dataflowBlockOptions;
// The lambda below reads the `outgoing` field, which is only assigned by the
// following statement; this is fine as long as the lambda is not invoked before
// the constructor has finished assigning it.
// NOTE(review): the argument list as shown looks incomplete (TransformProcess and
// compHelper are not passed anywhere visible) - verify against the full file.
60 this.messageBox = new ExecutingMessageBox<TInput> (messageQueue,
62 () => outgoing.IsCompleted,
64 dataflowBlockOptions);
// The outgoing queue is given the completion helper and a callback reporting
// whether the input queue has completed.
65 this.outgoing = new MessageOutgoingQueue<TOutput> (compHelper, () => messageQueue.IsCompleted);
66 this.vault = new MessageVault<TOutput> ();
/// <summary>
/// Offers a message to this block by forwarding the call to the message box,
/// passing this block as the first argument.
/// </summary>
/// <returns>The status returned by <c>messageBox.OfferMessage</c>.</returns>
// NOTE(review): the body uses `messageValue` and `consumeToAccept`, which do not
// appear in the parameter lines visible here - confirm the full signature.
69 public DataflowMessageStatus OfferMessage (DataflowMessageHeader messageHeader,
71 ISourceBlock<TInput> source,
74 return messageBox.OfferMessage (this, messageHeader, messageValue, source, consumeToAccept);
/// <summary>
/// Registers <paramref name="target"/> in the target buffer and then asks the
/// outgoing queue to process for that target.
/// </summary>
/// <param name="target">Target block to link to.</param>
/// <param name="unlinkAfterOne">Passed through unchanged to <c>targets.AddTarget</c>.</param>
// NOTE(review): `result` is captured from AddTarget but the return statement is
// not visible here; presumably it is what gets returned - confirm.
77 public IDisposable LinkTo (ITargetBlock<TOutput> target, bool unlinkAfterOne)
79 var result = targets.AddTarget (target, unlinkAfterOne);
// Presumably hands items already sitting in the outgoing queue to the new target - TODO confirm.
80 outgoing.ProcessForTarget (target, this, false, ref headers);
/// <summary>
/// Forwards the consume request to the message vault.
/// </summary>
/// <param name="messageHeader">Header identifying the message.</param>
/// <param name="target">Target requesting the message.</param>
/// <param name="messageConsumed">Set by <c>vault.ConsumeMessage</c>.</param>
/// <returns>The value returned by <c>vault.ConsumeMessage</c>.</returns>
84 public TOutput ConsumeMessage (DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target, out bool messageConsumed)
86 return vault.ConsumeMessage (messageHeader, target, out messageConsumed);
/// <summary>
/// Forwards the release of a reservation to the message vault.
/// </summary>
/// <param name="messageHeader">Header identifying the reserved message.</param>
/// <param name="target">Target releasing the reservation.</param>
89 public void ReleaseReservation (DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target)
91 vault.ReleaseReservation (messageHeader, target);
/// <summary>
/// Forwards the reservation request to the message vault.
/// </summary>
/// <param name="messageHeader">Header identifying the message to reserve.</param>
/// <param name="target">Target requesting the reservation.</param>
/// <returns>The value returned by <c>vault.ReserveMessage</c>.</returns>
94 public bool ReserveMessage (DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target)
96 return vault.ReserveMessage (messageHeader, target);
/// <summary>
/// Tries to take a single item from the outgoing queue.
/// </summary>
/// <param name="filter">Predicate passed through unchanged to <c>outgoing.TryReceive</c>.</param>
/// <param name="item">Set by <c>outgoing.TryReceive</c>.</param>
/// <returns>The value returned by <c>outgoing.TryReceive</c>.</returns>
99 public bool TryReceive (Predicate<TOutput> filter, out TOutput item)
101 return outgoing.TryReceive (filter, out item);
/// <summary>
/// Tries to take all items from the outgoing queue.
/// </summary>
/// <param name="items">Set by <c>outgoing.TryReceiveAll</c>.</param>
/// <returns>The value returned by <c>outgoing.TryReceiveAll</c>.</returns>
104 public bool TryReceiveAll (out IList<TOutput> items)
106 return outgoing.TryReceiveAll (out items);
// Drains the input queue: every input that TryTake yields is passed to the
// transformer delegate and each element of the resulting sequence is pushed
// towards the output side. Runs until TryTake reports that nothing could be taken.
// NOTE(review): the declaration of `input` is not visible in this view.
109 void TransformProcess ()
111 ITargetBlock<TOutput> target;
114 while (messageQueue.TryTake (out input)) {
115 foreach (var item in transformer (input)) {
// targets.Current is re-read for every item, so a target linked while this loop
// is running can be picked up part-way through a sequence.
116 if ((target = targets.Current) != null)
// The item is offered with a freshly incremented header. The DataflowMessageStatus
// returned by OfferMessage is not inspected here.
117 target.OfferMessage (headers.Increment (), item, this, false);
// NOTE(review): it cannot be told from this view whether the next statement is the
// `else` branch of the check above (queue only when no target is linked) or runs
// for every item - confirm against the full file.
119 outgoing.AddData (item);
// After the input queue has been drained, hand anything still in the outgoing
// queue to the current target, if one exists.
123 if (!outgoing.IsEmpty && (target = targets.Current) != null)
124 outgoing.ProcessForTarget (target, this, false, ref headers);
/// <summary>
/// Forwards the completion request to the input message box.
/// </summary>
127 public void Complete ()
129 messageBox.Complete ();
/// <summary>
/// Forwards <paramref name="ex"/> to the completion helper's <c>Fault</c> method.
/// </summary>
/// <param name="ex">Exception passed to <c>compHelper.Fault</c>.</param>
// Only the completion helper is notified in the visible body; the message box
// and the outgoing queue are not touched here.
132 public void Fault (Exception ex)
134 compHelper.Fault (ex);
/// <summary>
/// Gets the task exposed by the completion helper for this block.
/// </summary>
137 public Task Completion {
139 return compHelper.Completion;
/// <summary>
/// Gets the number of items currently reported by the outgoing queue.
/// </summary>
143 public int OutputCount {
145 return outgoing.Count;
/// <summary>
/// Gets the number of inputs currently held in the input queue.
/// </summary>
149 public int InputCount {
151 return messageQueue.Count;