// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
-//
-//
-using System.Collections.Concurrent;
using System.Collections.Generic;
namespace System.Threading.Tasks.Dataflow {
public sealed class BatchedJoinBlock<T1, T2> :
IReceivableSourceBlock<Tuple<IList<T1>, IList<T2>>> {
- GroupingDataflowBlockOptions options;
+ readonly GroupingDataflowBlockOptions options;
- CompletionHelper completionHelper = CompletionHelper.GetNew();
- readonly MessageOutgoingQueue<Tuple<IList<T1>, IList<T2>>> outgoing;
- readonly MessageVault<Tuple<IList<T1>, IList<T2>>> vault = new MessageVault<Tuple<IList<T1>, IList<T2>>>();
- readonly TargetBuffer<Tuple<IList<T1>, IList<T2>>> targets = new TargetBuffer<Tuple<IList<T1>, IList<T2>>>();
- DataflowMessageHeader headers;
+ readonly CompletionHelper completionHelper;
+ readonly OutgoingQueue<Tuple<IList<T1>, IList<T2>>> outgoing;
SpinLock batchLock;
readonly JoinTarget<T1> target1;
readonly JoinTarget<T2> target2;
int batchCount;
+ long numberOfGroups;
+ SpinLock batchCountLock;
public BatchedJoinBlock (int batchSize)
: this (batchSize, GroupingDataflowBlockOptions.Default)
"batchSize", batchSize, "The batchSize must be positive.");
if (dataflowBlockOptions == null)
throw new ArgumentNullException ("dataflowBlockOptions");
+ if (!dataflowBlockOptions.Greedy)
+ throw new ArgumentException (
+ "Greedy must be true for this dataflow block.", "dataflowBlockOptions");
+ if (dataflowBlockOptions.BoundedCapacity != DataflowBlockOptions.Unbounded)
+ throw new ArgumentException (
+ "BoundedCapacity must be Unbounded or -1 for this dataflow block.",
+ "dataflowBlockOptions");
BatchSize = batchSize;
options = dataflowBlockOptions;
-
- target1 = new JoinTarget<T1> (this, SignalTarget, completionHelper, () => outgoing.IsCompleted);
- target2 = new JoinTarget<T2> (this, SignalTarget, completionHelper, () => outgoing.IsCompleted);
-
- outgoing = new MessageOutgoingQueue<Tuple<IList<T1>, IList<T2>>> (
- completionHelper,
- () => target1.Buffer.IsCompleted || target2.Buffer.IsCompleted);
+ completionHelper = CompletionHelper.GetNew (dataflowBlockOptions);
+
+ target1 = new JoinTarget<T1> (
+ this, SignalTarget, completionHelper, () => outgoing.IsCompleted,
+ dataflowBlockOptions, true, TryAdd);
+ target2 = new JoinTarget<T2> (
+ this, SignalTarget, completionHelper, () => outgoing.IsCompleted,
+ dataflowBlockOptions, true, TryAdd);
+
+ outgoing = new OutgoingQueue<Tuple<IList<T1>, IList<T2>>> (
+ this, completionHelper,
+ () => target1.Buffer.IsCompleted || target2.Buffer.IsCompleted,
+ _ =>
+ {
+ target1.DecreaseCount ();
+ target2.DecreaseCount ();
+ }, options);
}
public int BatchSize { get; private set; }
get { return target2; }
}
- private void SignalTarget()
+ /// <summary>
+ /// Returns whether a new item can be accepted, and increments a counter if it can.
+ /// </summary>
+ bool TryAdd ()
{
- int current = Interlocked.Increment (ref batchCount);
+ bool lockTaken = false;
+ try {
+ batchCountLock.Enter (ref lockTaken);
- if (current % BatchSize != 0)
- return;
+ if (options.MaxNumberOfGroups != -1
+ && numberOfGroups + batchCount / BatchSize >= options.MaxNumberOfGroups)
+ return false;
- Interlocked.Add (ref batchCount, -current);
+ batchCount++;
+ return true;
+ } finally {
+ if (lockTaken)
+ batchCountLock.Exit();
+ }
+ }
+
+ /// <summary>
+ /// Decides whether to create a new batch or not.
+ /// </summary>
+ void SignalTarget ()
+ {
+ bool lockTaken = false;
+ try {
+ batchCountLock.Enter (ref lockTaken);
+
+ if (batchCount < BatchSize)
+ return;
+
+ batchCount -= BatchSize;
+ numberOfGroups++;
+ } finally {
+ if (lockTaken)
+ batchCountLock.Exit();
+ }
MakeBatch (BatchSize);
}
+ /// <summary>
+ /// Creates a batch of the given size and adds the resulting batch to the output queue.
+ /// </summary>
void MakeBatch (int batchSize)
{
+ if (batchSize == 0)
+ return;
+
var list1 = new List<T1> ();
var list2 = new List<T2> ();
var batch = Tuple.Create<IList<T1>, IList<T2>> (list1, list2);
- var target = targets.Current;
- if (target == null)
- outgoing.AddData(batch);
- else
- target.OfferMessage (headers.Increment (), batch, this, false);
+ outgoing.AddData (batch);
- if (!outgoing.IsEmpty && targets.Current != null)
- outgoing.ProcessForTarget (targets.Current, this, false, ref headers);
+ VerifyMaxNumberOfGroups ();
+ }
+
+ /// <summary>
+ /// Verifies whether <see cref="GroupingDataflowBlockOptions.MaxNumberOfGroups"/>
+ /// has been reached. If it did, <see cref="Complete"/>s the block.
+ /// </summary>
+ void VerifyMaxNumberOfGroups ()
+ {
+ if (options.MaxNumberOfGroups == -1)
+ return;
+
+ bool shouldComplete;
+
+ bool lockTaken = false;
+ try {
+ batchCountLock.Enter (ref lockTaken);
+
+ shouldComplete = numberOfGroups >= options.MaxNumberOfGroups;
+ } finally {
+ if (lockTaken)
+ batchCountLock.Exit ();
+ }
+
+ if (shouldComplete)
+ Complete ();
}
public Task Completion {
public void Complete ()
{
+ target1.Complete ();
+ target2.Complete ();
+ MakeBatch (batchCount);
outgoing.Complete ();
}
void IDataflowBlock.Fault (Exception exception)
{
- completionHelper.Fault (exception);
+ completionHelper.RequestFault (exception);
}
Tuple<IList<T1>, IList<T2>> ISourceBlock<Tuple<IList<T1>, IList<T2>>>.ConsumeMessage (
ITargetBlock<Tuple<IList<T1>, IList<T2>>> target,
out bool messageConsumed)
{
- return vault.ConsumeMessage (messageHeader, target, out messageConsumed);
+ return outgoing.ConsumeMessage (messageHeader, target, out messageConsumed);
}
public IDisposable LinkTo (ITargetBlock<Tuple<IList<T1>, IList<T2>>> target,
- bool unlinkAfterOne)
+ DataflowLinkOptions linkOptions)
{
- var result = targets.AddTarget(target, unlinkAfterOne);
- outgoing.ProcessForTarget(target, this, false, ref headers);
- return result;
+ return outgoing.AddTarget(target, linkOptions);
}
void ISourceBlock<Tuple<IList<T1>, IList<T2>>>.ReleaseReservation (
DataflowMessageHeader messageHeader,
ITargetBlock<Tuple<IList<T1>, IList<T2>>> target)
{
- vault.ReleaseReservation (messageHeader, target);
+ outgoing.ReleaseReservation (messageHeader, target);
}
bool ISourceBlock<Tuple<IList<T1>, IList<T2>>>.ReserveMessage (
DataflowMessageHeader messageHeader,
ITargetBlock<Tuple<IList<T1>, IList<T2>>> target)
{
- return vault.ReserveMessage (messageHeader, target);
+ return outgoing.ReserveMessage (messageHeader, target);
}
public bool TryReceive (Predicate<Tuple<IList<T1>, IList<T2>>> filter,
{
return outgoing.TryReceiveAll (out items);
}
+
+ public int OutputCount {
+ get { return outgoing.Count; }
+ }
+
+ public override string ToString ()
+ {
+ return NameHelper.GetName (this, options);
+ }
}
}
\ No newline at end of file