3 // Copyright (c) 2011 Jérémie "garuma" Laval
4 // Copyright (c) 2012 Petr Onderka
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
7 // of this software and associated documentation files (the "Software"), to deal
8 // in the Software without restriction, including without limitation the rights
9 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 // copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
13 // The above copyright notice and this permission notice shall be included in
14 // all copies or substantial portions of the Software.
16 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
24 using System.Collections.Generic;
26 namespace System.Threading.Tasks.Dataflow {
28 /// Version of <see cref="OutgoingQueueBase{T}"/> for
29 /// non-broadcast blocks.
// Process, ConsumeMessage, ReserveMessage, ReleaseReservation, TryReceive and
// TryReceiveAll all enter firstItemLock before they look at the first stored
// item or at reservedForTargetBlock.
31 class OutgoingQueue<T> : OutgoingQueueBase<T> {
// Optional delegate that GetModifiedCount invokes to obtain the count for an
// item; null when the constructor's countSelector argument is omitted.
32 readonly Func<T, int> countSelector;
// Lock entered by the public operations and by Process. Deliberately not
// readonly: SpinLock is a struct, and Enter/Exit called through a readonly
// field would operate on a copy.
33 SpinLock firstItemLock = new SpinLock();
// Target that holds the current reservation: assigned in ReserveMessage and
// reset to null in ConsumeMessage and ReleaseReservation. null means that
// nothing is reserved.
34 volatile ITargetBlock<T> reservedForTargetBlock;
// Target collection created in the constructor from the owning source block;
// also exposed through the Targets override.
35 readonly TargetCollection<T> targets;
/// <summary>
/// Returns the <c>targets</c> field typed as the base collection type.
/// </summary>
37 protected override TargetCollectionBase<T> Targets {
38 get { return targets; }
/// <summary>
/// Creates the queue, forwarding the shared arguments to the base class and
/// building the target collection for <paramref name="block"/>.
/// </summary>
/// <param name="block">Source block handed to the TargetCollection constructor.</param>
/// <param name="compHelper">Forwarded unchanged to the base constructor.</param>
/// <param name="externalCompleteTester">Forwarded unchanged to the base constructor.</param>
/// <param name="decreaseItemsCount">Forwarded unchanged to the base constructor.</param>
/// <param name="options">Forwarded unchanged to the base constructor.</param>
/// <param name="countSelector">
/// Optional delegate stored for use by GetModifiedCount; defaults to null.
/// </param>
41 public OutgoingQueue (
42 ISourceBlock<T> block, CompletionHelper compHelper,
43 Func<bool> externalCompleteTester, Action<int> decreaseItemsCount,
44 DataflowBlockOptions options, Func<T, int> countSelector = null)
45 : base (compHelper, externalCompleteTester,
46 decreaseItemsCount, options)
48 targets = new TargetCollection<T> (block);
49 this.countSelector = countSelector;
53 /// Calculates the count of items in the given object.
// When a countSelector was supplied, the count is whatever that delegate
// returns for `data`.
55 protected override int GetModifiedCount(T data)
// NOTE(review): the statement taken when countSelector is null is not shown
// here; presumably it returns a fixed default count -- confirm.
57 if (countSelector == null)
60 return countSelector (data);
64 /// Sends messages to targets.
// Under firstItemLock: peeks at the first stored item, makes it the targets'
// current item if none is set, offers it via targets.OfferItemToTargets, and
// hands an item taken from Outgoing to DecreaseCounts. After the lock is
// released it clears IsProcessing, re-checks ForceProcessing and finally
// calls VerifyCompleteness.
66 protected override void Process ()
// Cleared before the work starts; it is read again after IsProcessing has
// been reset (see the race-condition guard below).
70 ForceProcessing = false;
72 bool lockTaken = false;
// SpinLock.Enter reports through lockTaken whether the lock was acquired.
74 firstItemLock.Enter (ref lockTaken);
// Peek only: the item is not removed from Store by this call.
77 if (!Store.TryPeek (out item))
// Publish the peeked item as the targets' current item if none is set yet.
80 if (!targets.HasCurrentItem)
81 targets.SetCurrentItem (item);
// A non-null reservedForTargetBlock means a target currently holds a
// reservation (it is assigned in ReserveMessage).
83 if (reservedForTargetBlock != null)
86 processed = targets.OfferItemToTargets ();
// Take the item out of Outgoing and pass it to DecreaseCounts.
88 Outgoing.TryTake (out item);
89 DecreaseCounts (item);
94 firstItemLock.Exit ();
98 IsProcessing.Value = false;
100 // to guard against race condition
// ForceProcessing was cleared above, so a true value here means it was set
// again in the meantime; it is only acted on while nothing is reserved.
101 if (ForceProcessing && reservedForTargetBlock == null)
104 VerifyCompleteness ();
/// <summary>
/// Takes the first item from the queue on behalf of <paramref name="targetBlock"/>,
/// provided the header verifies and no other target holds a reservation.
/// </summary>
/// <param name="messageHeader">Header of the message to consume; must be valid.</param>
/// <param name="targetBlock">Target requesting the message; must not be null.</param>
/// <param name="messageConsumed">
/// Initialised to false and set to true once an item has been taken from Outgoing.
/// </param>
/// <exception cref="ArgumentException"><paramref name="messageHeader"/> is not valid.</exception>
/// <exception cref="ArgumentNullException"><paramref name="targetBlock"/> is null.</exception>
107 public T ConsumeMessage (DataflowMessageHeader messageHeader,
108 ITargetBlock<T> targetBlock, out bool messageConsumed)
110 if (!messageHeader.IsValid)
111 throw new ArgumentException ("The messageHeader is not valid.",
113 if (targetBlock == null)
// NOTE(review): the reported parameter name is "target" although the local
// parameter is targetBlock -- presumably chosen to match a caller-facing
// name; confirm before changing.
114 throw new ArgumentNullException("target");
// Defaults used when nothing is consumed.
116 T result = default(T);
117 messageConsumed = false;
119 bool lockTaken = false;
121 firstItemLock.Enter (ref lockTaken);
// Proceed only if the header passes targets.VerifyHeader for this target and
// the item is either unreserved or reserved by this very target.
123 if (targets.VerifyHeader (messageHeader, targetBlock)
124 && (reservedForTargetBlock == null
125 || reservedForTargetBlock == targetBlock)) {
126 // cannot consume from faulted block, unless reserved
127 if (reservedForTargetBlock == null && IsFaultedOrCancelled)
// Take the item, report success, pass it to DecreaseCounts and drop any
// reservation, which has now been honoured.
130 Outgoing.TryTake (out result);
131 messageConsumed = true;
132 DecreaseCounts (result);
133 reservedForTargetBlock = null;
138 firstItemLock.Exit ();
// Called after the lock is released; the second argument is the outcome
// recorded in messageConsumed.
141 targets.UnpostponeTarget (targetBlock, messageConsumed);
143 VerifyCompleteness ();
/// <summary>
/// Records <paramref name="target"/> as the holder of the reservation when
/// the header passes <c>targets.VerifyHeader</c> for that target.
/// </summary>
/// <param name="messageHeader">Header of the message to reserve; must be valid.</param>
/// <param name="target">Target asking for the reservation.</param>
/// <exception cref="ArgumentException"><paramref name="messageHeader"/> is not valid.</exception>
/// <exception cref="ArgumentNullException">Raised with parameter name "target".</exception>
148 public bool ReserveMessage (DataflowMessageHeader messageHeader, ITargetBlock<T> target)
150 if (!messageHeader.IsValid)
151 throw new ArgumentException ("The messageHeader is not valid.",
154 throw new ArgumentNullException("target");
156 bool lockTaken = false;
// The reservation field is only written while firstItemLock is held.
158 firstItemLock.Enter (ref lockTaken);
160 if (targets.VerifyHeader(messageHeader, target)) {
161 reservedForTargetBlock = target;
// NOTE(review): presumably reached only when the header did not verify;
// the surrounding control flow is not shown here -- confirm.
165 targets.UnpostponeTarget (target, false);
171 firstItemLock.Exit ();
/// <summary>
/// Drops the reservation held by <paramref name="target"/> by resetting
/// <c>reservedForTargetBlock</c> to null.
/// </summary>
/// <param name="messageHeader">Header of the reserved message; must be valid.</param>
/// <param name="target">Target that holds the reservation.</param>
/// <exception cref="ArgumentException"><paramref name="messageHeader"/> is not valid.</exception>
/// <exception cref="ArgumentNullException">Raised with parameter name "target".</exception>
/// <exception cref="InvalidOperationException">
/// The header fails <c>targets.VerifyHeader</c> for the target, or the
/// reservation is not held by <paramref name="target"/>.
/// </exception>
175 public void ReleaseReservation (DataflowMessageHeader messageHeader, ITargetBlock<T> target)
177 if (!messageHeader.IsValid)
178 throw new ArgumentException ("The messageHeader is not valid.",
181 throw new ArgumentNullException("target");
183 bool lockTaken = false;
186 firstItemLock.Enter(ref lockTaken);
// Only the target recorded in reservedForTargetBlock may release, and only
// with a header that still verifies.
188 if (!targets.VerifyHeader(messageHeader, target)
189 || reservedForTargetBlock != target)
190 throw new InvalidOperationException(
191 "The target did not have the message reserved.");
193 reservedForTargetBlock = null;
196 firstItemLock.Exit ();
// Runs after the lock has been released.
199 targets.UnpostponeTarget (target, false);
204 /// Notifies that the first item in the queue changed.
// Re-synchronises the targets' current item with the first item in Store.
// The method does not enter firstItemLock itself.
206 void FirstItemChanged ()
// Peek succeeded: the new first item becomes the targets' current item.
209 if (Store.TryPeek (out firstItem))
210 targets.SetCurrentItem (firstItem);
// NOTE(review): presumably the fallback for an empty Store (the branch
// keyword is not shown here) -- confirm.
212 targets.ResetCurrentItem ();
/// <summary>
/// Attempts to take the first item from the queue directly, optionally only
/// when it satisfies <paramref name="filter"/>.
/// </summary>
/// <param name="filter">
/// Predicate applied to the peeked first item; null accepts any item.
/// </param>
/// <param name="item">Receives the item taken from Outgoing.</param>
215 public bool TryReceive (Predicate<T> filter, out T item)
217 bool success = false;
220 bool lockTaken = false;
222 firstItemLock.Enter (ref lockTaken);
// A non-null reservedForTargetBlock means a target holds a reservation.
224 if (reservedForTargetBlock != null)
// Peek first so the filter can inspect the item before it is removed.
228 if (Store.TryPeek (out result) && (filter == null || filter (result))) {
229 Outgoing.TryTake (out item);
231 DecreaseCounts (item);
236 firstItemLock.Exit ();
240 VerifyCompleteness ();
/// <summary>
/// Takes every item currently in Outgoing while holding <c>firstItemLock</c>.
/// </summary>
/// <param name="items">Output list of received items.</param>
/// <returns>true when <paramref name="items"/> contains at least one element.</returns>
245 public bool TryReceiveAll (out IList<T> items)
252 bool lockTaken = false;
254 firstItemLock.Enter (ref lockTaken);
// A non-null reservedForTargetBlock means a target holds a reservation.
256 if (reservedForTargetBlock != null)
// Pre-size the list with the number of items queued right now.
259 var list = new List<T> (Outgoing.Count);
// Drain Outgoing, passing each removed item to DecreaseCounts.
262 while (Outgoing.TryTake (out item)) {
263 DecreaseCounts (item);
272 firstItemLock.Exit ();
276 VerifyCompleteness ();
278 return items.Count > 0;