3 // Copyright (c) 2011 Jérémie "garuma" Laval
4 // Copyright (c) 2012 Petr Onderka
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
7 // of this software and associated documentation files (the "Software"), to deal
8 // in the Software without restriction, including without limitation the rights
9 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 // copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
13 // The above copyright notice and this permission notice shall be included in
14 // all copies or substantial portions of the Software.
16 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
24 using System.Collections.Concurrent;
27 namespace System.Threading.Tasks.Dataflow {
29 /// In MessageBox we store message that have been offered to us so that they can be
32 internal abstract class MessageBox<TInput> {
33 protected ITargetBlock<TInput> Target { get; set; }
34 protected CompletionHelper CompHelper { get; private set; }
35 readonly Func<bool> externalCompleteTester;
36 readonly DataflowBlockOptions options;
38 readonly Func<bool> canAccept;
40 readonly ConcurrentDictionary<ISourceBlock<TInput>, DataflowMessageHeader>
42 new ConcurrentDictionary<ISourceBlock<TInput>, DataflowMessageHeader> ();
44 readonly AtomicBoolean postponedProcessing = new AtomicBoolean ();
46 // these two fields are used only in one special case
47 SpinLock consumingLock;
48 // this is necessary, because canAccept is not pure
49 bool canAcceptFromBefore;
51 protected BlockingCollection<TInput> MessageQueue { get; private set; }
53 protected MessageBox (
54 ITargetBlock<TInput> target, BlockingCollection<TInput> messageQueue,
55 CompletionHelper compHelper, Func<bool> externalCompleteTester,
56 DataflowBlockOptions options, bool greedy = true, Func<bool> canAccept = null)
59 this.CompHelper = compHelper;
60 this.MessageQueue = messageQueue;
61 this.externalCompleteTester = externalCompleteTester;
62 this.options = options;
64 this.canAccept = canAccept;
67 public DataflowMessageStatus OfferMessage (
68 DataflowMessageHeader messageHeader, TInput messageValue,
69 ISourceBlock<TInput> source, bool consumeToAccept)
71 if (!messageHeader.IsValid)
72 throw new ArgumentException ("The messageHeader is not valid.",
74 if (consumeToAccept && source == null)
75 throw new ArgumentException (
76 "consumeToAccept may only be true if provided with a non-null source.",
79 if (MessageQueue.IsAddingCompleted || !CompHelper.CanRun)
80 return DataflowMessageStatus.DecliningPermanently;
82 var full = options.BoundedCapacity != -1
83 && Volatile.Read (ref itemCount) >= options.BoundedCapacity;
84 if (!greedy || full) {
86 return DataflowMessageStatus.Declined;
88 postponedMessages [source] = messageHeader;
90 // necessary to avoid race condition
94 EnsureProcessing (true);
96 return DataflowMessageStatus.Postponed;
99 // in this case, we need to use locking to make sure
100 // we don't consume when we can't accept
101 if (consumeToAccept && canAccept != null) {
102 bool lockTaken = false;
104 consumingLock.Enter (ref lockTaken);
105 if (!canAcceptFromBefore && !canAccept ())
106 return DataflowMessageStatus.DecliningPermanently;
109 messageValue = source.ConsumeMessage (messageHeader, Target, out consummed);
111 canAcceptFromBefore = true;
112 return DataflowMessageStatus.NotAvailable;
115 canAcceptFromBefore = false;
118 consumingLock.Exit ();
121 if (consumeToAccept) {
123 messageValue = source.ConsumeMessage (messageHeader, Target, out consummed);
125 return DataflowMessageStatus.NotAvailable;
128 if (canAccept != null && !canAccept ())
129 return DataflowMessageStatus.DecliningPermanently;
133 MessageQueue.Add (messageValue);
134 } catch (InvalidOperationException) {
135 // This is triggered either if the underlying collection didn't accept the item
136 // or if the messageQueue has been marked complete, either way it corresponds to a false
137 return DataflowMessageStatus.DecliningPermanently;
142 EnsureProcessing (true);
144 VerifyCompleteness ();
146 return DataflowMessageStatus.Accepted;
150 /// Increses the count of items in the block by 1.
152 public void IncreaseCount ()
154 Interlocked.Increment (ref itemCount);
158 /// Decreses the number of items in the block by the given count.
161 /// The <paramref name="count"/> parameter is used when one object
162 /// can represent many items, like a batch in <see cref="BatchBlock{T}"/>.
164 public void DecreaseCount (int count = 1)
166 int decreased = Interlocked.Add (ref itemCount, -count);
168 // if BoundedCapacity is -1, there is no need to do this
169 if (decreased < options.BoundedCapacity && !postponedMessages.IsEmpty) {
171 EnsurePostponedProcessing ();
173 EnsureProcessing (false);
178 /// The number of messages that were postponed
179 /// and can be attempted to be consumed.
181 public int PostponedMessagesCount {
182 get { return postponedMessages.Count; }
186 /// Reserves a message from those that were postponed.
187 /// Does not guarantee any order of the messages being reserved.
190 /// An object representing the reservation on success,
191 /// <c>null</c> on failure.
193 public Tuple<ISourceBlock<TInput>, DataflowMessageHeader> ReserveMessage()
195 while (!postponedMessages.IsEmpty) {
196 // KeyValuePair is a struct, so default value is not null
197 var block = postponedMessages.FirstOrDefault () .Key;
199 // collection is empty
203 DataflowMessageHeader header;
204 bool removed = postponedMessages.TryRemove (block, out header);
206 // another thread was faster, try again
210 bool reserved = block.ReserveMessage (header, Target);
212 return Tuple.Create (block, header);
219 /// Releases the given reservation.
221 public void RelaseReservation(Tuple<ISourceBlock<TInput>, DataflowMessageHeader> reservation)
223 reservation.Item1.ReleaseReservation (reservation.Item2, Target);
227 /// Consumes previously reserved item.
229 public TInput ConsumeReserved(Tuple<ISourceBlock<TInput>, DataflowMessageHeader> reservation)
232 return reservation.Item1.ConsumeMessage (
233 reservation.Item2, Target, out consumed);
237 /// Makes sure retrieving items that were postponed,
238 /// because they would exceed <see cref="DataflowBlockOptions.BoundedCapacity"/>,
239 /// is currently running.
241 void EnsurePostponedProcessing ()
243 if (postponedProcessing.TrySet())
244 Task.Factory.StartNew (RetrievePostponed, options.CancellationToken,
245 TaskCreationOptions.PreferFairness, options.TaskScheduler);
249 /// Retrieves items that were postponed,
250 /// because they would exceed <see cref="DataflowBlockOptions.BoundedCapacity"/>.
252 void RetrievePostponed ()
254 // BoundedCapacity can't be -1 here, because in that case there would be no postponing
255 while (Volatile.Read (ref itemCount) < options.BoundedCapacity
256 && !postponedMessages.IsEmpty && !MessageQueue.IsAddingCompleted) {
257 var block = postponedMessages.First ().Key;
258 DataflowMessageHeader header;
259 postponedMessages.TryRemove (block, out header);
262 var item = block.ConsumeMessage (header, Target, out consumed);
265 MessageQueue.Add (item);
267 EnsureProcessing (false);
268 } catch (InvalidOperationException) {
274 // release all postponed messages
275 if (MessageQueue.IsAddingCompleted) {
276 while (!postponedMessages.IsEmpty) {
277 var block = postponedMessages.First ().Key;
278 DataflowMessageHeader header;
279 postponedMessages.TryRemove (block, out header);
281 if (block.ReserveMessage (header, Target))
282 block.ReleaseReservation (header, Target);
286 postponedProcessing.Value = false;
289 if ((Volatile.Read (ref itemCount) < options.BoundedCapacity
290 || MessageQueue.IsAddingCompleted)
291 && !postponedMessages.IsEmpty)
292 EnsurePostponedProcessing ();
296 /// Makes sure the input queue is processed the way it needs to.
298 /// <param name="newItem">Was new item just added?</param>
299 protected abstract void EnsureProcessing (bool newItem);
302 /// Completes the box, no new messages will be accepted.
303 /// Also starts the process of completing the output queue.
305 public void Complete ()
307 // Make message queue complete
308 MessageQueue.CompleteAdding ();
309 OutgoingQueueComplete ();
310 VerifyCompleteness ();
312 if (!postponedMessages.IsEmpty)
313 EnsurePostponedProcessing ();
317 /// Notifies that outgoing queue should be completed, if possible.
319 protected virtual void OutgoingQueueComplete ()
324 /// Makes sure the block is completed if it should be.
326 protected virtual void VerifyCompleteness ()
328 if (MessageQueue.IsCompleted && externalCompleteTester ())
329 CompHelper.Complete ();