3 // Copyright (c) 2012 Petr Onderka
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to deal
7 // in the Software without restriction, including without limitation the rights
8 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
12 // The above copyright notice and this permission notice shall be included in
13 // all copies or substantial portions of the Software.
15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23 using System.Collections.Concurrent;
24 using System.Collections.Generic;
26 namespace System.Threading.Tasks.Dataflow {
28 /// Base class for collection of target blocks for a source block.
29 /// Also handles sending messages to the target blocks.
31 abstract class TargetCollectionBase<T> {
33 /// Represents a target block with its options.
35 protected class Target : IDisposable {
36 readonly TargetCollectionBase<T> targetCollection;
37 volatile int remainingMessages;
38 readonly CancellationTokenSource cancellationTokenSource;
40 public ITargetBlock<T> TargetBlock { get; private set; }
42 public Target (TargetCollectionBase<T> targetCollection,
43 ITargetBlock<T> targetBlock, int maxMessages,
44 CancellationTokenSource cancellationTokenSource)
46 TargetBlock = targetBlock;
47 this.targetCollection = targetCollection;
48 remainingMessages = maxMessages;
49 this.cancellationTokenSource = cancellationTokenSource;
51 Postponed = new AtomicBoolean ();
52 Reserved = new AtomicBoolean ();
56 /// Is called after a message was sent, makes sure the link is destroyed after
57 /// <see cref="DataflowLinkOptions.MaxMessages"/> were sent.
59 public void MessageSent()
61 if (remainingMessages != -1)
63 if (remainingMessages == 0)
67 readonly AtomicBoolean disabled = new AtomicBoolean ();
69 /// Is the link destroyed?
73 get { return disabled.Value; }
77 /// Destroys the link to this target.
79 public void Dispose ()
81 disabled.Value = true;
83 if (cancellationTokenSource != null)
84 cancellationTokenSource.Cancel ();
87 targetCollection.TargetDictionary.TryRemove (TargetBlock, out ignored);
89 // to avoid memory leak; it could take a long time
90 // before this object is actually removed from the collection
95 /// Does this target have a postponed message?
97 public AtomicBoolean Postponed { get; private set; }
100 /// Does this target have a reserved message?
102 /// <remarks>Used only by broadcast blocks.</remarks>
103 public AtomicBoolean Reserved { get; private set; }
106 readonly ISourceBlock<T> block;
107 readonly bool broadcast;
108 readonly bool consumeToAccept;
110 readonly ConcurrentQueue<Target> prependQueue = new ConcurrentQueue<Target> ();
111 readonly ConcurrentQueue<Target> appendQueue = new ConcurrentQueue<Target> ();
112 readonly LinkedList<Target> targets = new LinkedList<Target> ();
114 protected readonly ConcurrentDictionary<ITargetBlock<T>, Target> TargetDictionary =
115 new ConcurrentDictionary<ITargetBlock<T>, Target> ();
117 // lastMessageHeaderId will be always accessed only from one thread
118 long lastMessageHeaderId;
119 // currentMessageHeaderId can be read from multiple threads at the same time
120 long currentMessageHeaderId;
125 protected TargetCollectionBase (ISourceBlock<T> block, bool broadcast, bool consumeToAccept)
128 this.broadcast = broadcast;
129 this.consumeToAccept = consumeToAccept;
133 /// Adds a target block to send messages to.
136 /// An object that can be used to destroy the link to the added target.
138 public IDisposable AddTarget (ITargetBlock<T> targetBlock, DataflowLinkOptions options)
140 CancellationTokenSource cancellationTokenSource = null;
141 if (options.PropagateCompletion) {
142 cancellationTokenSource = new CancellationTokenSource();
143 block.Completion.ContinueWith (t =>
146 targetBlock.Fault (t.Exception);
148 targetBlock.Complete ();
149 }, cancellationTokenSource.Token);
152 var target = new Target (
153 this, targetBlock, options.MaxMessages, cancellationTokenSource);
154 TargetDictionary [targetBlock] = target;
156 appendQueue.Enqueue (target);
158 prependQueue.Enqueue (target);
164 /// Sets the current item to be offered to targets
166 public void SetCurrentItem (T item)
168 firstOffering = true;
170 Volatile.Write (ref currentMessageHeaderId, ++lastMessageHeaderId);
176 /// Clears the collection of "unpostponed" targets.
178 protected abstract void ClearUnpostponed ();
181 /// Resets the current item to be offered to targets.
182 /// This means there is currently nothing to offer.
184 public void ResetCurrentItem ()
186 currentItem = default(T);
187 Volatile.Write (ref currentMessageHeaderId, 0);
/// <summary>
/// Tells whether an item is currently available to be offered to targets.
/// </summary>
/// <value>
/// <c>true</c> when the current message header id is non-zero
/// (<see cref="ResetCurrentItem"/> sets it to zero); otherwise <c>false</c>.
/// </value>
public bool HasCurrentItem {
    get {
        // the id may be read from several threads, hence the volatile read
        long headerId = Volatile.Read (ref currentMessageHeaderId);
        return headerId != 0;
    }
}
198 /// Offers the current item to all eligible targets.
200 /// <returns>Was the item accepted? (Always <c>false</c> for broadcast blocks.)</returns>
201 public bool OfferItemToTargets ()
203 // is there an item to offer?
207 var old = Tuple.Create (targets.First, targets.Last);
210 // order is important here, we want to make sure that prepended target
211 // added before appended target is processed first
212 var appended = PrependOrAppend (false);
213 var prepended = PrependOrAppend (true);
215 if (OfferItemToTargets (prepended))
219 if (OfferItemToTargets (old))
221 firstOffering = false;
223 if (OfferItemToUnpostponed ())
227 if (OfferItemToTargets (appended))
229 } while (NeedsProcessing);
/// <summary>
/// Tells whether some target is still waiting to have a message offered to it.
/// </summary>
/// <value>
/// <c>true</c> when the append queue, the prepend queue or the collection of
/// unpostponed targets is not empty; otherwise <c>false</c>.
/// </value>
public bool NeedsProcessing {
    get {
        // the checks short-circuit in the same order as before:
        // append queue, prepend queue, unpostponed targets
        bool nothingPending = appendQueue.IsEmpty
                              && prependQueue.IsEmpty
                              && UnpostponedIsEmpty;
        return !nothingPending;
    }
}
245 /// Is the collection of unpostponed targets empty?
247 protected abstract bool UnpostponedIsEmpty { get; }
250 /// Prepends (appends) targets that should be prepended (appended) to the collection of targets.
252 /// <param name="prepend"><c>true</c> to prepend, <c>false</c> to append.</param>
254 /// Nodes that contain first and last target added to the list,
255 /// or <c>null</c> if no nodes were added.
257 Tuple<LinkedListNode<Target>, LinkedListNode<Target>> PrependOrAppend (
260 var queue = prepend ? prependQueue : appendQueue;
265 LinkedListNode<Target> first = null;
266 LinkedListNode<Target> last = null;
269 while (queue.TryDequeue (out target)) {
271 ? targets.AddFirst (target)
272 : targets.AddLast (target);
279 ? Tuple.Create (last, first)
280 : Tuple.Create (first, last);
284 /// Offers the current item to the targets between the given nodes (inclusive).
286 /// <returns>Was the item accepted? (Always <c>false</c> for broadcast blocks.)</returns>
287 bool OfferItemToTargets (
288 Tuple<LinkedListNode<Target>, LinkedListNode<Target>> targetPair)
290 if (targetPair == null
291 || targetPair.Item1 == null || targetPair.Item2 == null)
294 var node = targetPair.Item1;
295 while (node != targetPair.Item2.Next) {
296 if (node.Value.Disabled) {
297 var nodeToRemove = node;
299 targets.Remove (nodeToRemove);
303 if (OfferItem (node.Value) && !broadcast)
313 /// Offers the current item to unpostponed targets.
315 /// <returns>Was the item accepted? (Always <c>false</c> for broadcast blocks.)</returns>
316 protected abstract bool OfferItemToUnpostponed ();
319 /// Offers the current item to the given target.
321 /// <returns>Was the item accepted?</returns>
322 protected bool OfferItem (Target target)
324 if (target.Reserved.Value)
326 if (!broadcast && target.Postponed.Value)
329 var result = target.TargetBlock.OfferMessage (
330 // volatile read is not necessary here,
331 // because currentMessageHeaderId is always written from this thread
332 new DataflowMessageHeader (currentMessageHeaderId), currentItem, block,
336 case DataflowMessageStatus.Accepted:
337 target.MessageSent ();
339 case DataflowMessageStatus.Postponed:
340 target.Postponed.Value = true;
342 case DataflowMessageStatus.DecliningPermanently:
/// <summary>
/// Checks whether the given header identifies the item that is currently offered.
/// </summary>
/// <param name="header">Header to compare with the current message header.</param>
/// <returns>
/// <c>true</c> if the id of <paramref name="header"/> equals the current
/// message header id; otherwise <c>false</c>.
/// </returns>
public bool VerifyHeader (DataflowMessageHeader header)
{
    // the current id can be read from multiple threads, so read it volatile
    long currentId = Volatile.Read (ref currentMessageHeaderId);
    return currentId == header.Id;
}
360 /// Target collection for non-broadcast blocks.
362 class TargetCollection<T> : TargetCollectionBase<T> {
363 readonly ConcurrentQueue<Target> unpostponedTargets =
364 new ConcurrentQueue<Target> ();
366 public TargetCollection (ISourceBlock<T> block)
367 : base (block, false, false)
372 /// Is the collection of unpostponed targets empty?
374 protected override bool UnpostponedIsEmpty {
375 get { return unpostponedTargets.IsEmpty; }
/// <summary>
/// Returns whether the given header corresponds to the current item
/// and the given target block postponed this item.
/// </summary>
/// <param name="header">Header of the message the target refers to.</param>
/// <param name="targetBlock">Target block that is expected to have postponed the message.</param>
/// <returns>
/// <c>true</c> if the header matches the current item and the target is still
/// present in the target dictionary and marked as postponed; otherwise <c>false</c>.
/// </returns>
public bool VerifyHeader (DataflowMessageHeader header, ITargetBlock<T> targetBlock)
{
    if (!VerifyHeader (header))
        return false;

    // Disposing a link removes its target from TargetDictionary, so a target
    // that was already unlinked is reported as "not postponed" instead of
    // letting the dictionary indexer throw KeyNotFoundException.
    Target target;
    return TargetDictionary.TryGetValue (targetBlock, out target)
           && target.Postponed.Value;
}
389 /// Unpostpones the given target.
391 /// <param name="targetBlock">Target to unpostpone.</param>
392 /// <param name="messageConsumed">Did the target consume an item?</param>
393 public void UnpostponeTarget (ITargetBlock<T> targetBlock, bool messageConsumed)
396 if (!TargetDictionary.TryGetValue (targetBlock, out target))
400 target.MessageSent ();
401 unpostponedTargets.Enqueue (target);
403 target.Postponed.Value = false;
407 /// Clears the collection of "unpostponed" targets.
409 protected override void ClearUnpostponed ()
412 while (unpostponedTargets.TryDequeue (out ignored)) {
417 /// Offers the current item to unpostponed targets.
419 /// <returns>Was the item accepted?</returns>
420 protected override bool OfferItemToUnpostponed ()
423 while (unpostponedTargets.TryDequeue (out target)) {
424 if (!target.Disabled && OfferItem (target))
433 /// Target collection for broadcast blocks.
435 class BroadcastTargetCollection<T> : TargetCollectionBase<T> {
436 // it's necessary to store the headers because of a race between
437 // UnpostponeTargetConsumed and SetCurrentItem
438 readonly ConcurrentQueue<Tuple<Target, DataflowMessageHeader>>
440 new ConcurrentQueue<Tuple<Target, DataflowMessageHeader>> ();
442 public BroadcastTargetCollection (ISourceBlock<T> block, bool consumeToAccept)
443 : base (block, true, consumeToAccept)
448 /// Is the collection of unpostponed targets empty?
450 protected override bool UnpostponedIsEmpty {
451 get { return unpostponedTargets.IsEmpty; }
/// <summary>
/// Marks the target as having a reserved message.
/// </summary>
/// <param name="targetBlock">Target block whose link should be marked as reserved.</param>
/// <remarks>
/// Does nothing when the target is no longer present in the target dictionary
/// (its link has been disposed).
/// </remarks>
public void ReserveTarget (ITargetBlock<T> targetBlock)
{
    // Disposing a link removes its target from TargetDictionary; use TryGetValue,
    // like UnpostponeTargetNotConsumed does, so an unlinked target does not
    // cause KeyNotFoundException.
    Target target;
    if (TargetDictionary.TryGetValue (targetBlock, out target))
        target.Reserved.Value = true;
}
/// <summary>
/// Unpostpones a target after it consumed a message.
/// </summary>
/// <param name="targetBlock">The target to unpostpone.</param>
/// <param name="header">Header of the message the target consumed.</param>
/// <remarks>
/// Does nothing when the target is no longer present in the target dictionary
/// (its link has been disposed).
/// </remarks>
public void UnpostponeTargetConsumed (ITargetBlock<T> targetBlock,
                                      DataflowMessageHeader header)
{
    // Disposing a link removes its target from TargetDictionary; use TryGetValue,
    // like UnpostponeTargetNotConsumed does, so an unlinked target does not
    // cause KeyNotFoundException.
    Target target;
    if (!TargetDictionary.TryGetValue (targetBlock, out target))
        return;

    // count the consumed message against the link's message limit
    target.MessageSent ();
    // the header is stored together with the target because of the race
    // between this method and SetCurrentItem
    unpostponedTargets.Enqueue (Tuple.Create (target, header));

    target.Postponed.Value = false;
    target.Reserved.Value = false;
}
480 /// Unpostpone target in the case when it didn't successfully consume a message.
482 public void UnpostponeTargetNotConsumed (ITargetBlock<T> targetBlock)
485 if (!TargetDictionary.TryGetValue (targetBlock, out target))
488 unpostponedTargets.Enqueue (Tuple.Create (target,
489 new DataflowMessageHeader ()));
491 target.Postponed.Value = false;
492 target.Reserved.Value = false;
/// <summary>
/// Empties the queue of "unpostponed" targets.
/// </summary>
protected override void ClearUnpostponed ()
{
    Tuple<Target, DataflowMessageHeader> discarded;
    bool removed;

    // keep dequeuing (and dropping) entries until the queue reports it is empty
    do {
        removed = unpostponedTargets.TryDequeue (out discarded);
    } while (removed);
}
506 /// Offers the current item to unpostponed targets.
508 /// <returns>Always <c>false</c>.</returns>
509 protected override bool OfferItemToUnpostponed ()
511 Tuple<Target, DataflowMessageHeader> tuple;
512 while (unpostponedTargets.TryDequeue (out tuple)) {
513 // offer to unconditionally unpostponed
514 // and those that consumed some old value
515 if (!tuple.Item1.Disabled
516 && (!tuple.Item2.IsValid || !VerifyHeader (tuple.Item2)))
517 OfferItem (tuple.Item1);