1 // Copyright (c) Microsoft. All rights reserved.
2 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
4 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
9 // A store of registered targets with a target block.
11 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
13 using System.Collections.Generic;
14 using System.Diagnostics;
15 using System.Diagnostics.CodeAnalysis;
16 using System.Diagnostics.Contracts;
19 namespace System.Threading.Tasks.Dataflow.Internal
21 /// <summary>Stores targets registered with a source.</summary>
22 /// <typeparam name="T">Specifies the type of data accepted by the targets.</typeparam>
23 /// <remarks>This type is not thread-safe.</remarks>
24 [DebuggerDisplay("Count={Count}")]
25 [DebuggerTypeProxy(typeof(TargetRegistry<>.DebugView))]
26 internal sealed class TargetRegistry<T>
29 /// Information about a registered target. This class represents a self-sufficient node in a linked list.
31 internal sealed class LinkedTargetInfo
33 /// <summary>Initializes the LinkedTargetInfo.</summary>
34 /// <param name="target">The target block reference for this entry.</param>
35 /// <param name="linkOptions">The link options.</param>
36 internal LinkedTargetInfo(ITargetBlock<T> target, DataflowLinkOptions linkOptions)
38 Contract.Requires(target != null, "The target that is supposed to be linked must not be null.");
39 Contract.Requires(linkOptions != null, "The linkOptions must not be null.");
42 PropagateCompletion = linkOptions.PropagateCompletion;
43 RemainingMessages = linkOptions.MaxMessages;
46 /// <summary>The target block reference for this entry.</summary>
47 internal readonly ITargetBlock<T> Target;
48 /// <summary>The value of the PropagateCompletion link option.</summary>
49 internal readonly bool PropagateCompletion;
50 /// <summary>Number of remaining messages to propagate.
51 /// This counter is initialized to the MaxMessages option and
52 /// gets decremented after each successful propagation.</summary>
53 internal int RemainingMessages;
54 /// <summary>The previous node in the list.</summary>
55 internal LinkedTargetInfo Previous;
56 /// <summary>The next node in the list.</summary>
57 internal LinkedTargetInfo Next;
60 /// <summary>A reference to the owning source block.</summary>
61 private readonly ISourceBlock<T> _owningSource;
62 /// <summary>A mapping of targets to information about them.</summary>
63 private readonly Dictionary<ITargetBlock<T>, LinkedTargetInfo> _targetInformation;
64 /// <summary>The first node of an ordered list of targets. Messages should be offered to targets starting from First and following Next.</summary>
65 private LinkedTargetInfo _firstTarget;
66 /// <summary>The last node of the ordered list of targets. This field is used purely as a perf optimization to avoid traversing the list for each Add.</summary>
67 private LinkedTargetInfo _lastTarget;
68 /// <summary>Number of links with positive RemainingMessages counters.
69 /// This is an optimization that allows us to skip dictionary lookup when this counter is 0.</summary>
70 private int _linksWithRemainingMessages;
72 /// <summary>Initializes the registry.</summary>
73 internal TargetRegistry(ISourceBlock<T> owningSource)
75 Contract.Requires(owningSource != null, "The TargetRegistry instance must be owned by a source block.");
77 _owningSource = owningSource;
78 _targetInformation = new Dictionary<ITargetBlock<T>, LinkedTargetInfo>();
81 /// <summary>Adds a target to the registry.</summary>
82 /// <param name="target">The target to add.</param>
83 /// <param name="linkOptions">The link options.</param>
84 internal void Add(ref ITargetBlock<T> target, DataflowLinkOptions linkOptions)
86 Contract.Requires(target != null, "The target that is supposed to be linked must not be null.");
87 Contract.Requires(linkOptions != null, "The link options must not be null.");
89 LinkedTargetInfo targetInfo;
91 // If the target already exists in the registry, replace it with a new NopLinkPropagator to maintain uniqueness
92 if (_targetInformation.TryGetValue(target, out targetInfo)) target = new NopLinkPropagator(_owningSource, target);
94 // Add the target to both stores, the list and the dictionary, which are used for different purposes
95 var node = new LinkedTargetInfo(target, linkOptions);
96 AddToList(node, linkOptions.Append);
97 _targetInformation.Add(target, node);
99 // Increment the optimization counter if needed
100 Debug.Assert(_linksWithRemainingMessages >= 0, "_linksWithRemainingMessages must be non-negative at any time.");
101 if (node.RemainingMessages > 0) _linksWithRemainingMessages++;
103 DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
104 if (etwLog.IsEnabled())
106 etwLog.DataflowBlockLinking(_owningSource, target);
111 /// <summary>Gets whether the registry contains a particular target.</summary>
112 /// <param name="target">The target.</param>
113 /// <returns>true if the registry contains the target; otherwise, false.</returns>
114 internal bool Contains(ITargetBlock<T> target)
116 return _targetInformation.ContainsKey(target);
119 /// <summary>Removes the target from the registry.</summary>
120 /// <param name="target">The target to remove.</param>
121 /// <param name="onlyIfReachedMaxMessages">
122 /// Only remove the target if it's configured to be unlinked after one propagation.
124 internal void Remove(ITargetBlock<T> target, bool onlyIfReachedMaxMessages = false)
126 Contract.Requires(target != null, "Target to remove is required.");
128 // If we are implicitly unlinking and there is nothing to be unlinked implicitly, bail
129 Debug.Assert(_linksWithRemainingMessages >= 0, "_linksWithRemainingMessages must be non-negative at any time.");
130 if (onlyIfReachedMaxMessages && _linksWithRemainingMessages == 0) return;
132 // Otherwise take the slow path
133 Remove_Slow(target, onlyIfReachedMaxMessages);
136 /// <summary>Actually removes the target from the registry.</summary>
137 /// <param name="target">The target to remove.</param>
138 /// <param name="onlyIfReachedMaxMessages">
139 /// Only remove the target if it's configured to be unlinked after one propagation.
141 private void Remove_Slow(ITargetBlock<T> target, bool onlyIfReachedMaxMessages)
143 Contract.Requires(target != null, "Target to remove is required.");
145 // Make sure we've intended to go the slow route
146 Debug.Assert(_linksWithRemainingMessages >= 0, "_linksWithRemainingMessages must be non-negative at any time.");
147 Debug.Assert(!onlyIfReachedMaxMessages || _linksWithRemainingMessages > 0, "We shouldn't have ended on the slow path.");
149 // If the target is registered...
150 LinkedTargetInfo node;
151 if (_targetInformation.TryGetValue(target, out node))
153 Debug.Assert(node != null, "The LinkedTargetInfo node referenced in the Dictionary must be non-null.");
155 // Remove the target, if either there's no constraint on the removal
156 // or if this was the last remaining message.
157 if (!onlyIfReachedMaxMessages || node.RemainingMessages == 1)
159 RemoveFromList(node);
160 _targetInformation.Remove(target);
162 // Decrement the optimization counter if needed
163 if (node.RemainingMessages == 0) _linksWithRemainingMessages--;
164 Debug.Assert(_linksWithRemainingMessages >= 0, "_linksWithRemainingMessages must be non-negative at any time.");
166 DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
167 if (etwLog.IsEnabled())
169 etwLog.DataflowBlockUnlinking(_owningSource, target);
173 // If the target is to stay and we are counting the remaining messages for this link, decrement the counter
174 else if (node.RemainingMessages > 0)
176 Debug.Assert(node.RemainingMessages > 1, "The target should have been removed, because there are no remaining messages.");
177 node.RemainingMessages--;
182 /// <summary>Clears the target registry entry points while allowing subsequent traversals of the linked list.</summary>
183 internal LinkedTargetInfo ClearEntryPoints()
185 // Save _firstTarget so we can return it
186 LinkedTargetInfo firstTarget = _firstTarget;
188 // Clear out the entry points
189 _firstTarget = _lastTarget = null;
190 _targetInformation.Clear();
191 Debug.Assert(_linksWithRemainingMessages >= 0, "_linksWithRemainingMessages must be non-negative at any time.");
192 _linksWithRemainingMessages = 0;
197 /// <summary>Propagated completion to the targets of the given linked list.</summary>
198 /// <param name="firstTarget">The head of a saved linked list.</param>
199 internal void PropagateCompletion(LinkedTargetInfo firstTarget)
201 Debug.Assert(_owningSource.Completion.IsCompleted, "The owning source must have completed before propagating completion.");
203 // Cache the owning source's completion task to avoid calling the getter many times
204 Task owningSourceCompletion = _owningSource.Completion;
206 // Propagate completion to those targets that have requested it
207 for (LinkedTargetInfo node = firstTarget; node != null; node = node.Next)
209 if (node.PropagateCompletion) Common.PropagateCompletion(owningSourceCompletion, node.Target, Common.AsyncExceptionHandler);
213 /// <summary>Gets the first node of the ordered target list.</summary>
214 internal LinkedTargetInfo FirstTargetNode { get { return _firstTarget; } }
216 /// <summary>Adds a LinkedTargetInfo node to the doubly-linked list.</summary>
217 /// <param name="node">The node to be added.</param>
218 /// <param name="append">Whether to append or to prepend the node.</param>
219 internal void AddToList(LinkedTargetInfo node, bool append)
221 Contract.Requires(node != null, "Requires a node to be added.");
223 // If the list is empty, assign the ends to point to the new node and we are done
224 if (_firstTarget == null && _lastTarget == null)
226 _firstTarget = _lastTarget = node;
230 Debug.Assert(_firstTarget != null && _lastTarget != null, "Both first and last node must either be null or non-null.");
231 Debug.Assert(_lastTarget.Next == null, "The last node must not have a successor.");
232 Debug.Assert(_firstTarget.Previous == null, "The first node must not have a predecessor.");
236 // Link the new node to the end of the existing list
237 node.Previous = _lastTarget;
238 _lastTarget.Next = node;
243 // Link the new node to the front of the existing list
244 node.Next = _firstTarget;
245 _firstTarget.Previous = node;
250 Debug.Assert(_firstTarget != null && _lastTarget != null, "Both first and last node must be non-null after AddToList.");
253 /// <summary>Removes the LinkedTargetInfo node from the doubly-linked list.</summary>
254 /// <param name="node">The node to be removed.</param>
255 internal void RemoveFromList(LinkedTargetInfo node)
257 Contract.Requires(node != null, "Node to remove is required.");
258 Debug.Assert(_firstTarget != null && _lastTarget != null, "Both first and last node must be non-null before RemoveFromList.");
260 LinkedTargetInfo previous = node.Previous;
261 LinkedTargetInfo next = node.Next;
263 // Remove the node by linking the adjacent nodes
264 if (node.Previous != null)
266 node.Previous.Next = next;
267 node.Previous = null;
270 if (node.Next != null)
272 node.Next.Previous = previous;
276 // Adjust the list ends
277 if (_firstTarget == node) _firstTarget = next;
278 if (_lastTarget == node) _lastTarget = previous;
280 Debug.Assert((_firstTarget != null) == (_lastTarget != null), "Both first and last node must either be null or non-null after RemoveFromList.");
283 /// <summary>Gets the number of items in the registry.</summary>
284 private int Count { get { return _targetInformation.Count; } }
286 /// <summary>Converts the linked list of targets to an array for rendering in a debugger.</summary>
287 private ITargetBlock<T>[] TargetsForDebugger
291 var targets = new ITargetBlock<T>[Count];
293 for (LinkedTargetInfo node = _firstTarget; node != null; node = node.Next)
295 targets[i++] = node.Target;
304 /// <summary>Provides a nop passthrough for use with TargetRegistry.</summary>
305 [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
306 [DebuggerTypeProxy(typeof(TargetRegistry<>.NopLinkPropagator.DebugView))]
307 private sealed class NopLinkPropagator : IPropagatorBlock<T, T>, ISourceBlock<T>, IDebuggerDisplay
309 /// <summary>The source that encapsulates this block.</summary>
310 private readonly ISourceBlock<T> _owningSource;
311 /// <summary>The target with which this block is associated.</summary>
312 private readonly ITargetBlock<T> _target;
314 /// <summary>Initializes the passthrough.</summary>
315 /// <param name="owningSource">The source that encapsulates this block.</param>
316 /// <param name="target">The target to which messages should be forwarded.</param>
317 internal NopLinkPropagator(ISourceBlock<T> owningSource, ITargetBlock<T> target)
319 Contract.Requires(owningSource != null, "Propagator must be associated with a source.");
320 Contract.Requires(target != null, "Target to propagate to is required.");
322 // Store the arguments
323 _owningSource = owningSource;
327 /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Targets/Member[@name="OfferMessage"]/*' />
328 DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, Boolean consumeToAccept)
330 Debug.Assert(source == _owningSource, "Only valid to be used with the source for which it was created.");
331 return _target.OfferMessage(messageHeader, messageValue, this, consumeToAccept);
334 /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ConsumeMessage"]/*' />
335 T ISourceBlock<T>.ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target, out Boolean messageConsumed)
337 return _owningSource.ConsumeMessage(messageHeader, this, out messageConsumed);
340 /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReserveMessage"]/*' />
341 bool ISourceBlock<T>.ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
343 return _owningSource.ReserveMessage(messageHeader, this);
346 /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReleaseReservation"]/*' />
347 void ISourceBlock<T>.ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
349 _owningSource.ReleaseReservation(messageHeader, this);
352 /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
353 Task IDataflowBlock.Completion { get { return _owningSource.Completion; } }
354 /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
355 void IDataflowBlock.Complete() { _target.Complete(); }
356 /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
357 void IDataflowBlock.Fault(Exception exception) { _target.Fault(exception); }
359 /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="LinkTo"]/*' />
360 IDisposable ISourceBlock<T>.LinkTo(ITargetBlock<T> target, DataflowLinkOptions linkOptions) { throw new NotSupportedException(SR.NotSupported_MemberNotNeeded); }
362 /// <summary>The data to display in the debugger display attribute.</summary>
363 [SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
364 private object DebuggerDisplayContent
368 var displaySource = _owningSource as IDebuggerDisplay;
369 var displayTarget = _target as IDebuggerDisplay;
370 return string.Format("{0} Source=\"{1}\", Target=\"{2}\"",
371 Common.GetNameForDebugger(this),
372 displaySource != null ? displaySource.Content : _owningSource,
373 displayTarget != null ? displayTarget.Content : _target);
376 /// <summary>Gets the data to display in the debugger display attribute for this instance.</summary>
377 object IDebuggerDisplay.Content { get { return DebuggerDisplayContent; } }
379 /// <summary>Provides a debugger type proxy for a passthrough.</summary>
380 private sealed class DebugView
382 /// <summary>The passthrough.</summary>
383 private readonly NopLinkPropagator _passthrough;
385 /// <summary>Initializes the debug view.</summary>
386 /// <param name="passthrough">The passthrough to view.</param>
387 public DebugView(NopLinkPropagator passthrough)
389 Contract.Requires(passthrough != null, "Need a propagator with which to construct the debug view.");
390 _passthrough = passthrough;
393 /// <summary>The linked target for this block.</summary>
394 public ITargetBlock<T> LinkedTarget { get { return _passthrough._target; } }
399 /// <summary>Provides a debugger type proxy for the target registry.</summary>
400 private sealed class DebugView
402 /// <summary>The registry being debugged.</summary>
403 private readonly TargetRegistry<T> _registry;
405 /// <summary>Initializes the type proxy.</summary>
406 /// <param name="registry">The target registry.</param>
407 public DebugView(TargetRegistry<T> registry)
409 Contract.Requires(registry != null, "Need a registry with which to construct the debug view.");
410 _registry = registry;
413 /// <summary>Gets a list of all targets to show in the debugger.</summary>
414 [DebuggerBrowsable(DebuggerBrowsableState.RootHidden)]
415 public ITargetBlock<T>[] Targets { get { return _registry.TargetsForDebugger; } }