// BatchedJoinBlock.cs
//
// Copyright (c) 2012 Petr Onderka
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
//
//

using System.Collections.Concurrent;
using System.Collections.Generic;

namespace System.Threading.Tasks.Dataflow {
	/// <summary>
	/// Source block that exposes two targets and emits batches as a tuple of two lists,
	/// one list per target. A batch holds <see cref="BatchSize"/> items in total,
	/// taken first from <see cref="Target1"/>'s buffer and then from <see cref="Target2"/>'s.
	/// </summary>
	/// <typeparam name="T1">Type of the items taken from the first target's buffer.</typeparam>
	/// <typeparam name="T2">Type of the items taken from the second target's buffer.</typeparam>
	public sealed class BatchedJoinBlock<T1, T2> : IReceivableSourceBlock<Tuple<IList<T1>, IList<T2>>> {
		// Stored by the constructor; not read anywhere else inside this class.
		GroupingDataflowBlockOptions options;

		// NOTE(review): left non-readonly on purpose; the helper may be a mutable
		// value type, in which case readonly would change behavior. Confirm.
		CompletionHelper completionHelper = CompletionHelper.GetNew ();
		readonly MessageOutgoingQueue<Tuple<IList<T1>, IList<T2>>> outgoing;

		readonly MessageVault<Tuple<IList<T1>, IList<T2>>> vault =
			new MessageVault<Tuple<IList<T1>, IList<T2>>> ();

		readonly TargetBuffer<Tuple<IList<T1>, IList<T2>>> targets =
			new TargetBuffer<Tuple<IList<T1>, IList<T2>>> ();

		// Passed by ref / incremented to number outgoing messages.
		DataflowMessageHeader headers;

		// SpinLock is a mutable struct, so this field must not be readonly.
		SpinLock batchLock;

		readonly JoinTarget<T1> target1;
		readonly JoinTarget<T2> target2;

		// Running count of signals received from both targets (see SignalTarget).
		int batchCount;

		/// <summary>
		/// Creates a block with the given batch size and
		/// <see cref="GroupingDataflowBlockOptions.Default"/> options.
		/// </summary>
		/// <param name="batchSize">Total number of items per batch; must be positive.</param>
		/// <exception cref="ArgumentOutOfRangeException"><paramref name="batchSize"/> is zero or negative.</exception>
		public BatchedJoinBlock (int batchSize)
			: this (batchSize, GroupingDataflowBlockOptions.Default)
		{
		}

		/// <summary>
		/// Creates a block with the given batch size and options.
		/// </summary>
		/// <param name="batchSize">Total number of items per batch; must be positive.</param>
		/// <param name="dataflowBlockOptions">Options for the block; must not be null.</param>
		/// <exception cref="ArgumentOutOfRangeException"><paramref name="batchSize"/> is zero or negative.</exception>
		/// <exception cref="ArgumentNullException"><paramref name="dataflowBlockOptions"/> is null.</exception>
		public BatchedJoinBlock (int batchSize, GroupingDataflowBlockOptions dataflowBlockOptions)
		{
			if (batchSize <= 0)
				throw new ArgumentOutOfRangeException (
					"batchSize", batchSize, "The batchSize must be positive.");
			if (dataflowBlockOptions == null)
				throw new ArgumentNullException ("dataflowBlockOptions");

			BatchSize = batchSize;
			options = dataflowBlockOptions;

			// The lambdas below read 'outgoing', 'target1' and 'target2' lazily, so it is
			// fine that 'outgoing' is only assigned after the targets are constructed.
			target1 = new JoinTarget<T1> (
				this, SignalTarget, completionHelper, () => outgoing.IsCompleted);
			target2 = new JoinTarget<T2> (
				this, SignalTarget, completionHelper, () => outgoing.IsCompleted);
			outgoing = new MessageOutgoingQueue<Tuple<IList<T1>, IList<T2>>> (
				completionHelper,
				() => target1.Buffer.IsCompleted || target2.Buffer.IsCompleted);
		}

		/// <summary>Total number of items (from both targets combined) in one batch.</summary>
		public int BatchSize { get; private set; }

		/// <summary>Target whose items end up in the first list of each batch.</summary>
		public ITargetBlock<T1> Target1 {
			get { return target1; }
		}

		/// <summary>Target whose items end up in the second list of each batch.</summary>
		public ITargetBlock<T2> Target2 {
			get { return target2; }
		}

		/// <summary>
		/// Callback handed to both join targets. Counts signals and builds a batch
		/// whenever the running count reaches a multiple of <see cref="BatchSize"/>.
		/// NOTE(review): presumably invoked once per item accepted by a target; confirm
		/// against JoinTarget.
		/// </summary>
		private void SignalTarget ()
		{
			int current = Interlocked.Increment (ref batchCount);

			if (current % BatchSize != 0)
				return;

			// Remove the observed (multiple-of-BatchSize) amount from the counter so it
			// does not grow without bound; the modulo test above is unaffected by this.
			Interlocked.Add (ref batchCount, -current);

			MakeBatch (BatchSize);
		}

		/// <summary>
		/// Takes <paramref name="batchSize"/> items from the target buffers (first target
		/// first, then the second) and either offers the resulting batch to the current
		/// linked target or queues it in the outgoing queue.
		/// </summary>
		/// <param name="batchSize">Number of items to take in total.</param>
		/// <exception cref="InvalidOperationException">
		/// Fewer than <paramref name="batchSize"/> items could be taken from the two buffers.
		/// </exception>
		void MakeBatch (int batchSize)
		{
			var list1 = new List<T1> ();
			var list2 = new List<T2> ();

			// lock is necessary here to make sure items are in the correct order
			bool taken = false;
			try {
				batchLock.Enter (ref taken);

				int i = 0;

				T1 item1;
				while (i < batchSize && target1.Buffer.TryTake (out item1)) {
					list1.Add (item1);
					i++;
				}

				T2 item2;
				while (i < batchSize && target2.Buffer.TryTake (out item2)) {
					list2.Add (item2);
					i++;
				}

				if (i < batchSize)
					throw new InvalidOperationException("Unexpected count of items.");
			} finally {
				if (taken)
					batchLock.Exit ();
			}

			var batch = Tuple.Create<IList<T1>, IList<T2>> (list1, list2);

			var target = targets.Current;
			if (target == null)
				outgoing.AddData(batch);
			else
				target.OfferMessage (headers.Increment (), batch, this, false);

			if (!outgoing.IsEmpty) {
				// Read Current exactly once: reading it again after the null check could
				// yield null if the target was unlinked in between.
				var pendingTarget = targets.Current;
				if (pendingTarget != null)
					outgoing.ProcessForTarget (pendingTarget, this, false, ref headers);
			}
		}

		/// <summary>Task exposed by the block's completion helper.</summary>
		public Task Completion {
			get { return completionHelper.Completion; }
		}

		/// <summary>Completes the outgoing queue.</summary>
		public void Complete ()
		{
			outgoing.Complete ();
		}

		/// <summary>Forwards the exception to the completion helper.</summary>
		/// <param name="exception">Exception to fault the block with.</param>
		void IDataflowBlock.Fault (Exception exception)
		{
			completionHelper.Fault (exception);
		}

		/// <summary>Delegates message consumption to the message vault.</summary>
		Tuple<IList<T1>, IList<T2>> ISourceBlock<Tuple<IList<T1>, IList<T2>>>.ConsumeMessage (
			DataflowMessageHeader messageHeader,
			ITargetBlock<Tuple<IList<T1>, IList<T2>>> target,
			out bool messageConsumed)
		{
			return vault.ConsumeMessage (messageHeader, target, out messageConsumed);
		}

		/// <summary>
		/// Registers <paramref name="target"/> with the target buffer and then lets the
		/// outgoing queue process its pending data for that target.
		/// </summary>
		/// <param name="target">Target to link to.</param>
		/// <param name="unlinkAfterOne">Passed through to the target buffer.</param>
		/// <returns>The object returned by the target buffer for this link.</returns>
		public IDisposable LinkTo (ITargetBlock<Tuple<IList<T1>, IList<T2>>> target, bool unlinkAfterOne)
		{
			var result = targets.AddTarget(target, unlinkAfterOne);
			outgoing.ProcessForTarget(target, this, false, ref headers);
			return result;
		}

		/// <summary>Delegates releasing a reservation to the message vault.</summary>
		void ISourceBlock<Tuple<IList<T1>, IList<T2>>>.ReleaseReservation (
			DataflowMessageHeader messageHeader,
			ITargetBlock<Tuple<IList<T1>, IList<T2>>> target)
		{
			vault.ReleaseReservation (messageHeader, target);
		}

		/// <summary>Delegates reserving a message to the message vault.</summary>
		bool ISourceBlock<Tuple<IList<T1>, IList<T2>>>.ReserveMessage (
			DataflowMessageHeader messageHeader,
			ITargetBlock<Tuple<IList<T1>, IList<T2>>> target)
		{
			return vault.ReserveMessage (messageHeader, target);
		}

		/// <summary>Tries to receive one batch from the outgoing queue.</summary>
		/// <param name="filter">Predicate passed through to the outgoing queue.</param>
		/// <param name="item">Receives the batch on success.</param>
		/// <returns>The result reported by the outgoing queue.</returns>
		public bool TryReceive (Predicate<Tuple<IList<T1>, IList<T2>>> filter, out Tuple<IList<T1>, IList<T2>> item)
		{
			return outgoing.TryReceive (filter, out item);
		}

		/// <summary>Tries to receive all queued batches from the outgoing queue.</summary>
		/// <param name="items">Receives the batches on success.</param>
		/// <returns>The result reported by the outgoing queue.</returns>
		public bool TryReceiveAll (out IList<Tuple<IList<T1>, IList<T2>>> items)
		{
			return outgoing.TryReceiveAll (out items);
		}
	}
}