1 // BroadcastOutgoingQueue.cs
3 // Copyright (c) 2012 Petr Onderka
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to deal
7 // in the Software without restriction, including without limitation the rights
8 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
12 // The above copyright notice and this permission notice shall be included in
13 // all copies or substantial portions of the Software.
15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23 using System.Collections.Concurrent;
25 namespace System.Threading.Tasks.Dataflow {
27 /// Version of <see cref="OutgoingQueueBase{T}"/> for broadcast blocks.
29 class BroadcastOutgoingQueue<T> : OutgoingQueueBase<T> {
// Assigned true right before a new current item is stored; TryReceive tests it
// before it reads CurrentItem.
30 volatile bool hasCurrentItem;
31 // don't use directly, only through CurrentItem (and carefully)
// NOTE(review): the field the comment above describes (original line 32) is missing from this listing.
// Entered and exited by the code documented as CurrentItem.
// SpinLock is a struct, so this field must stay non-readonly; a readonly field
// would make Enter/Exit operate on a copy.
33 SpinLock currentItemLock = new SpinLock();
// Broadcast-specific target collection; created in the constructor.
35 readonly BroadcastTargetCollection<T> targets;
// Hands the broadcast target collection out under the base-class collection type.
37 protected override TargetCollectionBase<T> Targets {
38 get { return targets; }
// Item values keyed by (message header, target): ReserveMessage stores an entry,
// ConsumeMessage and ReleaseReservation remove it.
// NOTE(review): the line naming this field (original line 42) is missing from this listing;
// the methods refer to it as reservedMessages.
41 readonly ConcurrentDictionary<Tuple<DataflowMessageHeader, ITargetBlock<T>>, T>
43 new ConcurrentDictionary<Tuple<DataflowMessageHeader, ITargetBlock<T>>, T>();
/// <summary>
/// Initializes the queue. <paramref name="compHelper"/>, <paramref name="externalCompleteTester"/>,
/// <paramref name="decreaseItemsCount"/> and <paramref name="options"/> are passed unchanged to the
/// base constructor; <paramref name="block"/> and <paramref name="hasCloner"/> are used to build
/// the <see cref="BroadcastTargetCollection{T}"/> stored in <c>targets</c>.
/// </summary>
45 public BroadcastOutgoingQueue (
46 ISourceBlock<T> block, CompletionHelper compHelper,
47 Func<bool> externalCompleteTester, Action<int> decreaseItemsCount,
48 DataflowBlockOptions options, bool hasCloner)
49 : base (compHelper, externalCompleteTester, decreaseItemsCount, options)
51 targets = new BroadcastTargetCollection<T> (block, hasCloner);
55 /// The current item that is to be sent to target blocks.
// NOTE(review): the property header, the accessor keywords, the try/finally lines and the
// actual reads/writes of the backing field are missing from this listing; only the
// locking statements below are visible.
// First sequence: take currentItemLock, then release it.
60 bool lockTaken = false;
62 currentItemLock.Enter (ref lockTaken);
66 currentItemLock.Exit ();
// Second sequence: record that a current item exists before taking the lock,
// then take and release currentItemLock.
71 hasCurrentItem = true;
73 bool lockTaken = false;
75 currentItemLock.Enter (ref lockTaken);
79 currentItemLock.Exit ();
85 /// Takes an item from the queue and sets it as <see cref="CurrentItem"/>.
87 public void DequeueItem ()
// The statements inside the if run only when Outgoing.TryTake yields an item.
90 if (Outgoing.TryTake (out item)) {
// Pass the taken item to DecreaseCounts, then to the target collection.
91 DecreaseCounts (item);
92 targets.SetCurrentItem (item);
// NOTE(review): the assignment to CurrentItem that the summary describes is not
// visible in this listing (original lines 93-97 are missing).
99 /// Manages sending items to the target blocks.
101 protected override void Process ()
// NOTE(review): the line opening the do/while loop and some statements inside it
// are missing from this listing.
// Each pass resets ForceProcessing and calls targets.OfferItemToTargets.
104 ForceProcessing = false;
108 targets.OfferItemToTargets ();
// Repeat while Store is not empty or the target collection reports NeedsProcessing.
109 } while (!Store.IsEmpty || targets.NeedsProcessing);
// The loop is done: clear the IsProcessing flag.
111 IsProcessing.Value = false;
113 // to guard against race condition
// NOTE(review): the statements the comment above introduces (original lines 114-116)
// are missing from this listing.
117 VerifyCompleteness ();
/// <summary>
/// Lets <paramref name="target"/> consume the message identified by
/// <paramref name="messageHeader"/>. Visible paths:
/// a reservation for (header, target) is found and removed, and
/// <paramref name="messageConsumed"/> is set to true;
/// the header fails <c>targets.VerifyHeader</c>, the target is un-postponed as
/// not consumed, and <paramref name="messageConsumed"/> is set to false;
/// the target is un-postponed as consumed and <paramref name="messageConsumed"/>
/// is set to true.
/// </summary>
/// <exception cref="ArgumentException">The header's IsValid is false.</exception>
/// <exception cref="ArgumentNullException">Thrown for <paramref name="target"/>;
/// the guarding condition is not visible here, presumably a null check.</exception>
// NOTE(review): the return statements, the local declarations and the read of
// CurrentItem are missing from this listing, so the returned value cannot be
// confirmed from here.
120 public T ConsumeMessage (DataflowMessageHeader messageHeader,
121 ITargetBlock<T> target, out bool messageConsumed)
123 if (!messageHeader.IsValid)
124 throw new ArgumentException ("The messageHeader is not valid.",
127 throw new ArgumentNullException("target");
130 if (reservedMessages.TryRemove (Tuple.Create (messageHeader, target), out item)) {
131 messageConsumed = true;
135 // if we first retrieve CurrentItem and then check the header,
136 // there will be no race condition
140 if (!targets.VerifyHeader (messageHeader)) {
141 targets.UnpostponeTargetNotConsumed (target);
143 messageConsumed = false;
147 targets.UnpostponeTargetConsumed (target, messageHeader);
150 messageConsumed = true;
/// <summary>
/// Reserves the message identified by <paramref name="messageHeader"/> for
/// <paramref name="target"/>. The current item is read first and the header is
/// verified afterwards. If verification fails the target is un-postponed as not
/// consumed; in the remaining visible code the target is marked reserved and the
/// item read earlier is stored in <c>reservedMessages</c> under (header, target).
/// </summary>
/// <exception cref="ArgumentException">The header's IsValid is false.</exception>
/// <exception cref="ArgumentNullException">Thrown for <paramref name="target"/>;
/// the guarding condition is not visible here, presumably a null check.</exception>
// NOTE(review): the return statements are missing from this listing, so the
// returned values cannot be confirmed from here.
154 public bool ReserveMessage (DataflowMessageHeader messageHeader,
155 ITargetBlock<T> target)
157 if (!messageHeader.IsValid)
158 throw new ArgumentException ("The messageHeader is not valid.",
161 throw new ArgumentNullException("target");
163 T item = CurrentItem;
165 if (!targets.VerifyHeader (messageHeader)) {
166 targets.UnpostponeTargetNotConsumed (target);
171 targets.ReserveTarget (target);
172 reservedMessages [Tuple.Create (messageHeader, target)] = item;
/// <summary>
/// Releases the reservation held by <paramref name="target"/> for
/// <paramref name="messageHeader"/>: the (header, target) entry is removed from
/// <c>reservedMessages</c> and the target is then un-postponed as not consumed.
/// </summary>
/// <exception cref="ArgumentException">The header's IsValid is false.</exception>
/// <exception cref="ArgumentNullException">Thrown for <paramref name="target"/>;
/// the guarding condition is not visible here, presumably a null check.</exception>
/// <exception cref="InvalidOperationException">No reservation exists for the
/// (header, target) pair.</exception>
176 public void ReleaseReservation (DataflowMessageHeader messageHeader,
177 ITargetBlock<T> target)
179 if (!messageHeader.IsValid)
180 throw new ArgumentException ("The messageHeader is not valid.",
183 throw new ArgumentNullException("target");
186 if (!reservedMessages.TryRemove (Tuple.Create (messageHeader, target), out item))
187 throw new InvalidOperationException (
188 "The target did not have the message reserved.");
190 targets.UnpostponeTargetNotConsumed (target);
194 public bool TryReceive (Predicate<T> filter, out T retrievedItem)
196 retrievedItem = default(T);
198 if (!hasCurrentItem) {
202 T item = CurrentItem;
204 if (filter == null || filter(item)) {
205 retrievedItem = item;