// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
-//
-//
using System.Collections.Generic;
namespace System.Threading.Tasks.Dataflow {
public sealed class BatchedJoinBlock<T1, T2> :
IReceivableSourceBlock<Tuple<IList<T1>, IList<T2>>> {
- GroupingDataflowBlockOptions options;
+ readonly GroupingDataflowBlockOptions options;
- CompletionHelper completionHelper;
- readonly MessageOutgoingQueue<Tuple<IList<T1>, IList<T2>>> outgoing;
- DataflowMessageHeader headers;
+ readonly CompletionHelper completionHelper;
+ readonly OutgoingQueue<Tuple<IList<T1>, IList<T2>>> outgoing;
SpinLock batchLock;
readonly JoinTarget<T1> target1;
readonly JoinTarget<T2> target2;
int batchCount;
+ long numberOfGroups;
+ SpinLock batchCountLock;
public BatchedJoinBlock (int batchSize)
: this (batchSize, GroupingDataflowBlockOptions.Default)
"batchSize", batchSize, "The batchSize must be positive.");
if (dataflowBlockOptions == null)
throw new ArgumentNullException ("dataflowBlockOptions");
+ if (!dataflowBlockOptions.Greedy)
+ throw new ArgumentException (
+ "Greedy must be true for this dataflow block.", "dataflowBlockOptions");
+ if (dataflowBlockOptions.BoundedCapacity != DataflowBlockOptions.Unbounded)
+ throw new ArgumentException (
+ "BoundedCapacity must be Unbounded or -1 for this dataflow block.",
+ "dataflowBlockOptions");
BatchSize = batchSize;
options = dataflowBlockOptions;
completionHelper = CompletionHelper.GetNew (dataflowBlockOptions);
target1 = new JoinTarget<T1> (
- this, SignalTarget, completionHelper, () => outgoing.IsCompleted);
+ this, SignalTarget, completionHelper, () => outgoing.IsCompleted,
+ dataflowBlockOptions, true, TryAdd);
target2 = new JoinTarget<T2> (
- this, SignalTarget, completionHelper, () => outgoing.IsCompleted);
+ this, SignalTarget, completionHelper, () => outgoing.IsCompleted,
+ dataflowBlockOptions, true, TryAdd);
- outgoing = new MessageOutgoingQueue<Tuple<IList<T1>, IList<T2>>> (
+ outgoing = new OutgoingQueue<Tuple<IList<T1>, IList<T2>>> (
this, completionHelper,
- () => target1.Buffer.IsCompleted || target2.Buffer.IsCompleted, options);
+ () => target1.Buffer.IsCompleted || target2.Buffer.IsCompleted,
+ _ =>
+ {
+ target1.DecreaseCount ();
+ target2.DecreaseCount ();
+ }, options);
}
public int BatchSize { get; private set; }
get { return target2; }
}
- private void SignalTarget()
+ /// <summary>
+ /// Returns whether a new item can be accepted, and increments a counter if it can.
+ /// </summary>
+ bool TryAdd ()
{
- int current = Interlocked.Increment (ref batchCount);
+ bool lockTaken = false;
+ try {
+ batchCountLock.Enter (ref lockTaken);
- if (current % BatchSize != 0)
- return;
+ if (options.MaxNumberOfGroups != -1
+ && numberOfGroups + batchCount / BatchSize >= options.MaxNumberOfGroups)
+ return false;
- Interlocked.Add (ref batchCount, -current);
+ batchCount++;
+ return true;
+ } finally {
+ if (lockTaken)
+ batchCountLock.Exit();
+ }
+ }
+
+ /// <summary>
+ /// Decides whether to create a new batch or not.
+ /// </summary>
+ void SignalTarget ()
+ {
+ bool lockTaken = false;
+ try {
+ batchCountLock.Enter (ref lockTaken);
+
+ if (batchCount < BatchSize)
+ return;
+
+ batchCount -= BatchSize;
+ numberOfGroups++;
+ } finally {
+ if (lockTaken)
+ batchCountLock.Exit();
+ }
MakeBatch (BatchSize);
}
+ /// <summary>
+ /// Creates a batch of the given size and adds the resulting batch to the output queue.
+ /// </summary>
void MakeBatch (int batchSize)
{
+ if (batchSize == 0)
+ return;
+
var list1 = new List<T1> ();
var list2 = new List<T2> ();
var batch = Tuple.Create<IList<T1>, IList<T2>> (list1, list2);
outgoing.AddData (batch);
+
+ VerifyMaxNumberOfGroups ();
+ }
+
+ /// <summary>
+ /// Verifies whether <see cref="GroupingDataflowBlockOptions.MaxNumberOfGroups"/>
+ /// has been reached. If it did, <see cref="Complete"/>s the block.
+ /// </summary>
+ void VerifyMaxNumberOfGroups ()
+ {
+ if (options.MaxNumberOfGroups == -1)
+ return;
+
+ bool shouldComplete;
+
+ bool lockTaken = false;
+ try {
+ batchCountLock.Enter (ref lockTaken);
+
+ shouldComplete = numberOfGroups >= options.MaxNumberOfGroups;
+ } finally {
+ if (lockTaken)
+ batchCountLock.Exit ();
+ }
+
+ if (shouldComplete)
+ Complete ();
}
public Task Completion {
public void Complete ()
{
+ target1.Complete ();
+ target2.Complete ();
+ MakeBatch (batchCount);
outgoing.Complete ();
}
return outgoing.TryReceiveAll (out items);
}
+ public int OutputCount {
+ get { return outgoing.Count; }
+ }
+
public override string ToString ()
{
return NameHelper.GetName (this, options);