[System.Threading.Tasks.Dataflow] Replace implementation with CoreFx version
[mono.git] / mcs / class / System.Threading.Tasks.Dataflow / CoreFxSources / Internal / TargetRegistry.cs
1 // Copyright (c) Microsoft. All rights reserved.
2 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
3
4 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
5 //
6 // TargetRegistry.cs
7 //
8 //
9 // A store of registered targets with a target block.
10 //
11 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
12
13 using System.Collections.Generic;
14 using System.Diagnostics;
15 using System.Diagnostics.CodeAnalysis;
16 using System.Diagnostics.Contracts;
17 using System.Linq;
18
19 namespace System.Threading.Tasks.Dataflow.Internal
20 {
21     /// <summary>Stores targets registered with a source.</summary>
22     /// <typeparam name="T">Specifies the type of data accepted by the targets.</typeparam>
23     /// <remarks>This type is not thread-safe.</remarks>
24     [DebuggerDisplay("Count={Count}")]
25     [DebuggerTypeProxy(typeof(TargetRegistry<>.DebugView))]
26     internal sealed class TargetRegistry<T>
27     {
28         /// <summary>
29         /// Information about a registered target. This class represents a self-sufficient node in a linked list.
30         /// </summary>
31         internal sealed class LinkedTargetInfo
32         {
33             /// <summary>Initializes the LinkedTargetInfo.</summary>
34             /// <param name="target">The target block reference for this entry.</param>
35             /// <param name="linkOptions">The link options.</param>
36             internal LinkedTargetInfo(ITargetBlock<T> target, DataflowLinkOptions linkOptions)
37             {
38                 Contract.Requires(target != null, "The target that is supposed to be linked must not be null.");
39                 Contract.Requires(linkOptions != null, "The linkOptions must not be null.");
40
41                 Target = target;
42                 PropagateCompletion = linkOptions.PropagateCompletion;
43                 RemainingMessages = linkOptions.MaxMessages;
44             }
45
46             /// <summary>The target block reference for this entry.</summary>
47             internal readonly ITargetBlock<T> Target;
48             /// <summary>The value of the PropagateCompletion link option.</summary>
49             internal readonly bool PropagateCompletion;
50             /// <summary>Number of remaining messages to propagate. 
51             /// This counter is initialized to the MaxMessages option and 
52             /// gets decremented after each successful propagation.</summary>
53             internal int RemainingMessages;
54             /// <summary>The previous node in the list.</summary>
55             internal LinkedTargetInfo Previous;
56             /// <summary>The next node in the list.</summary>
57             internal LinkedTargetInfo Next;
58         }
59
60         /// <summary>A reference to the owning source block.</summary>
61         private readonly ISourceBlock<T> _owningSource;
62         /// <summary>A mapping of targets to information about them.</summary>
63         private readonly Dictionary<ITargetBlock<T>, LinkedTargetInfo> _targetInformation;
64         /// <summary>The first node of an ordered list of targets. Messages should be offered to targets starting from First and following Next.</summary>
65         private LinkedTargetInfo _firstTarget;
66         /// <summary>The last node of the ordered list of targets. This field is used purely as a perf optimization to avoid traversing the list for each Add.</summary>
67         private LinkedTargetInfo _lastTarget;
68         /// <summary>Number of links with positive RemainingMessages counters.
69         /// This is an optimization that allows us to skip dictionary lookup when this counter is 0.</summary>
70         private int _linksWithRemainingMessages;
71
72         /// <summary>Initializes the registry.</summary>
73         internal TargetRegistry(ISourceBlock<T> owningSource)
74         {
75             Contract.Requires(owningSource != null, "The TargetRegistry instance must be owned by a source block.");
76
77             _owningSource = owningSource;
78             _targetInformation = new Dictionary<ITargetBlock<T>, LinkedTargetInfo>();
79         }
80
81         /// <summary>Adds a target to the registry.</summary>
82         /// <param name="target">The target to add.</param>
83         /// <param name="linkOptions">The link options.</param>
84         internal void Add(ref ITargetBlock<T> target, DataflowLinkOptions linkOptions)
85         {
86             Contract.Requires(target != null, "The target that is supposed to be linked must not be null.");
87             Contract.Requires(linkOptions != null, "The link options must not be null.");
88
89             LinkedTargetInfo targetInfo;
90
91             // If the target already exists in the registry, replace it with a new NopLinkPropagator to maintain uniqueness
92             if (_targetInformation.TryGetValue(target, out targetInfo)) target = new NopLinkPropagator(_owningSource, target);
93
94             // Add the target to both stores, the list and the dictionary, which are used for different purposes
95             var node = new LinkedTargetInfo(target, linkOptions);
96             AddToList(node, linkOptions.Append);
97             _targetInformation.Add(target, node);
98
99             // Increment the optimization counter if needed
100             Debug.Assert(_linksWithRemainingMessages >= 0, "_linksWithRemainingMessages must be non-negative at any time.");
101             if (node.RemainingMessages > 0) _linksWithRemainingMessages++;
102 #if FEATURE_TRACING
103             DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
104             if (etwLog.IsEnabled())
105             {
106                 etwLog.DataflowBlockLinking(_owningSource, target);
107             }
108 #endif
109         }
110
111         /// <summary>Gets whether the registry contains a particular target.</summary>
112         /// <param name="target">The target.</param>
113         /// <returns>true if the registry contains the target; otherwise, false.</returns>
114         internal bool Contains(ITargetBlock<T> target)
115         {
116             return _targetInformation.ContainsKey(target);
117         }
118
119         /// <summary>Removes the target from the registry.</summary>
120         /// <param name="target">The target to remove.</param>
121         /// <param name="onlyIfReachedMaxMessages">
122         /// Only remove the target if it's configured to be unlinked after one propagation.
123         /// </param>
124         internal void Remove(ITargetBlock<T> target, bool onlyIfReachedMaxMessages = false)
125         {
126             Contract.Requires(target != null, "Target to remove is required.");
127
128             // If we are implicitly unlinking and there is nothing to be unlinked implicitly, bail
129             Debug.Assert(_linksWithRemainingMessages >= 0, "_linksWithRemainingMessages must be non-negative at any time.");
130             if (onlyIfReachedMaxMessages && _linksWithRemainingMessages == 0) return;
131
132             // Otherwise take the slow path
133             Remove_Slow(target, onlyIfReachedMaxMessages);
134         }
135
136         /// <summary>Actually removes the target from the registry.</summary>
137         /// <param name="target">The target to remove.</param>
138         /// <param name="onlyIfReachedMaxMessages">
139         /// Only remove the target if it's configured to be unlinked after one propagation.
140         /// </param>
141         private void Remove_Slow(ITargetBlock<T> target, bool onlyIfReachedMaxMessages)
142         {
143             Contract.Requires(target != null, "Target to remove is required.");
144
145             // Make sure we've intended to go the slow route
146             Debug.Assert(_linksWithRemainingMessages >= 0, "_linksWithRemainingMessages must be non-negative at any time.");
147             Debug.Assert(!onlyIfReachedMaxMessages || _linksWithRemainingMessages > 0, "We shouldn't have ended on the slow path.");
148
149             // If the target is registered...
150             LinkedTargetInfo node;
151             if (_targetInformation.TryGetValue(target, out node))
152             {
153                 Debug.Assert(node != null, "The LinkedTargetInfo node referenced in the Dictionary must be non-null.");
154
155                 // Remove the target, if either there's no constraint on the removal
156                 // or if this was the last remaining message.
157                 if (!onlyIfReachedMaxMessages || node.RemainingMessages == 1)
158                 {
159                     RemoveFromList(node);
160                     _targetInformation.Remove(target);
161
162                     // Decrement the optimization counter if needed
163                     if (node.RemainingMessages == 0) _linksWithRemainingMessages--;
164                     Debug.Assert(_linksWithRemainingMessages >= 0, "_linksWithRemainingMessages must be non-negative at any time.");
165 #if FEATURE_TRACING
166                     DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
167                     if (etwLog.IsEnabled())
168                     {
169                         etwLog.DataflowBlockUnlinking(_owningSource, target);
170                     }
171 #endif
172                 }
173                 // If the target is to stay and we are counting the remaining messages for this link, decrement the counter
174                 else if (node.RemainingMessages > 0)
175                 {
176                     Debug.Assert(node.RemainingMessages > 1, "The target should have been removed, because there are no remaining messages.");
177                     node.RemainingMessages--;
178                 }
179             }
180         }
181
182         /// <summary>Clears the target registry entry points while allowing subsequent traversals of the linked list.</summary>
183         internal LinkedTargetInfo ClearEntryPoints()
184         {
185             // Save _firstTarget so we can return it
186             LinkedTargetInfo firstTarget = _firstTarget;
187
188             // Clear out the entry points
189             _firstTarget = _lastTarget = null;
190             _targetInformation.Clear();
191             Debug.Assert(_linksWithRemainingMessages >= 0, "_linksWithRemainingMessages must be non-negative at any time.");
192             _linksWithRemainingMessages = 0;
193
194             return firstTarget;
195         }
196
197         /// <summary>Propagated completion to the targets of the given linked list.</summary>
198         /// <param name="firstTarget">The head of a saved linked list.</param>
199         internal void PropagateCompletion(LinkedTargetInfo firstTarget)
200         {
201             Debug.Assert(_owningSource.Completion.IsCompleted, "The owning source must have completed before propagating completion.");
202
203             // Cache the owning source's completion task to avoid calling the getter many times
204             Task owningSourceCompletion = _owningSource.Completion;
205
206             // Propagate completion to those targets that have requested it
207             for (LinkedTargetInfo node = firstTarget; node != null; node = node.Next)
208             {
209                 if (node.PropagateCompletion) Common.PropagateCompletion(owningSourceCompletion, node.Target, Common.AsyncExceptionHandler);
210             }
211         }
212
213         /// <summary>Gets the first node of the ordered target list.</summary>
214         internal LinkedTargetInfo FirstTargetNode { get { return _firstTarget; } }
215
216         /// <summary>Adds a LinkedTargetInfo node to the doubly-linked list.</summary>
217         /// <param name="node">The node to be added.</param>
218         /// <param name="append">Whether to append or to prepend the node.</param>
219         internal void AddToList(LinkedTargetInfo node, bool append)
220         {
221             Contract.Requires(node != null, "Requires a node to be added.");
222
223             // If the list is empty, assign the ends to point to the new node and we are done
224             if (_firstTarget == null && _lastTarget == null)
225             {
226                 _firstTarget = _lastTarget = node;
227             }
228             else
229             {
230                 Debug.Assert(_firstTarget != null && _lastTarget != null, "Both first and last node must either be null or non-null.");
231                 Debug.Assert(_lastTarget.Next == null, "The last node must not have a successor.");
232                 Debug.Assert(_firstTarget.Previous == null, "The first node must not have a predecessor.");
233
234                 if (append)
235                 {
236                     // Link the new node to the end of the existing list
237                     node.Previous = _lastTarget;
238                     _lastTarget.Next = node;
239                     _lastTarget = node;
240                 }
241                 else
242                 {
243                     // Link the new node to the front of the existing list
244                     node.Next = _firstTarget;
245                     _firstTarget.Previous = node;
246                     _firstTarget = node;
247                 }
248             }
249
250             Debug.Assert(_firstTarget != null && _lastTarget != null, "Both first and last node must be non-null after AddToList.");
251         }
252
253         /// <summary>Removes the LinkedTargetInfo node from the doubly-linked list.</summary>
254         /// <param name="node">The node to be removed.</param>
255         internal void RemoveFromList(LinkedTargetInfo node)
256         {
257             Contract.Requires(node != null, "Node to remove is required.");
258             Debug.Assert(_firstTarget != null && _lastTarget != null, "Both first and last node must be non-null before RemoveFromList.");
259
260             LinkedTargetInfo previous = node.Previous;
261             LinkedTargetInfo next = node.Next;
262
263             // Remove the node by linking the adjacent nodes
264             if (node.Previous != null)
265             {
266                 node.Previous.Next = next;
267                 node.Previous = null;
268             }
269
270             if (node.Next != null)
271             {
272                 node.Next.Previous = previous;
273                 node.Next = null;
274             }
275
276             // Adjust the list ends
277             if (_firstTarget == node) _firstTarget = next;
278             if (_lastTarget == node) _lastTarget = previous;
279
280             Debug.Assert((_firstTarget != null) == (_lastTarget != null), "Both first and last node must either be null or non-null after RemoveFromList.");
281         }
282
283         /// <summary>Gets the number of items in the registry.</summary>
284         private int Count { get { return _targetInformation.Count; } }
285
286         /// <summary>Converts the linked list of targets to an array for rendering in a debugger.</summary>
287         private ITargetBlock<T>[] TargetsForDebugger
288         {
289             get
290             {
291                 var targets = new ITargetBlock<T>[Count];
292                 int i = 0;
293                 for (LinkedTargetInfo node = _firstTarget; node != null; node = node.Next)
294                 {
295                     targets[i++] = node.Target;
296                 }
297
298                 return targets;
299             }
300         }
301
302
303
304         /// <summary>Provides a nop passthrough for use with TargetRegistry.</summary>
305         [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
306         [DebuggerTypeProxy(typeof(TargetRegistry<>.NopLinkPropagator.DebugView))]
307         private sealed class NopLinkPropagator : IPropagatorBlock<T, T>, ISourceBlock<T>, IDebuggerDisplay
308         {
309             /// <summary>The source that encapsulates this block.</summary>
310             private readonly ISourceBlock<T> _owningSource;
311             /// <summary>The target with which this block is associated.</summary>
312             private readonly ITargetBlock<T> _target;
313
314             /// <summary>Initializes the passthrough.</summary>
315             /// <param name="owningSource">The source that encapsulates this block.</param>
316             /// <param name="target">The target to which messages should be forwarded.</param>
317             internal NopLinkPropagator(ISourceBlock<T> owningSource, ITargetBlock<T> target)
318             {
319                 Contract.Requires(owningSource != null, "Propagator must be associated with a source.");
320                 Contract.Requires(target != null, "Target to propagate to is required.");
321
322                 // Store the arguments
323                 _owningSource = owningSource;
324                 _target = target;
325             }
326
327             /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Targets/Member[@name="OfferMessage"]/*' />
328             DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, Boolean consumeToAccept)
329             {
330                 Debug.Assert(source == _owningSource, "Only valid to be used with the source for which it was created.");
331                 return _target.OfferMessage(messageHeader, messageValue, this, consumeToAccept);
332             }
333
334             /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ConsumeMessage"]/*' />
335             T ISourceBlock<T>.ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target, out Boolean messageConsumed)
336             {
337                 return _owningSource.ConsumeMessage(messageHeader, this, out messageConsumed);
338             }
339
340             /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReserveMessage"]/*' />
341             bool ISourceBlock<T>.ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
342             {
343                 return _owningSource.ReserveMessage(messageHeader, this);
344             }
345
346             /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReleaseReservation"]/*' />
347             void ISourceBlock<T>.ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
348             {
349                 _owningSource.ReleaseReservation(messageHeader, this);
350             }
351
352             /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
353             Task IDataflowBlock.Completion { get { return _owningSource.Completion; } }
354             /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
355             void IDataflowBlock.Complete() { _target.Complete(); }
356             /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
357             void IDataflowBlock.Fault(Exception exception) { _target.Fault(exception); }
358
359             /// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="LinkTo"]/*' />
360             IDisposable ISourceBlock<T>.LinkTo(ITargetBlock<T> target, DataflowLinkOptions linkOptions) { throw new NotSupportedException(SR.NotSupported_MemberNotNeeded); }
361
362             /// <summary>The data to display in the debugger display attribute.</summary>
363             [SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
364             private object DebuggerDisplayContent
365             {
366                 get
367                 {
368                     var displaySource = _owningSource as IDebuggerDisplay;
369                     var displayTarget = _target as IDebuggerDisplay;
370                     return string.Format("{0} Source=\"{1}\", Target=\"{2}\"",
371                         Common.GetNameForDebugger(this),
372                         displaySource != null ? displaySource.Content : _owningSource,
373                         displayTarget != null ? displayTarget.Content : _target);
374                 }
375             }
376             /// <summary>Gets the data to display in the debugger display attribute for this instance.</summary>
377             object IDebuggerDisplay.Content { get { return DebuggerDisplayContent; } }
378
379             /// <summary>Provides a debugger type proxy for a passthrough.</summary>
380             private sealed class DebugView
381             {
382                 /// <summary>The passthrough.</summary>
383                 private readonly NopLinkPropagator _passthrough;
384
385                 /// <summary>Initializes the debug view.</summary>
386                 /// <param name="passthrough">The passthrough to view.</param>
387                 public DebugView(NopLinkPropagator passthrough)
388                 {
389                     Contract.Requires(passthrough != null, "Need a propagator with which to construct the debug view.");
390                     _passthrough = passthrough;
391                 }
392
393                 /// <summary>The linked target for this block.</summary>
394                 public ITargetBlock<T> LinkedTarget { get { return _passthrough._target; } }
395             }
396         }
397
398
399         /// <summary>Provides a debugger type proxy for the target registry.</summary>
400         private sealed class DebugView
401         {
402             /// <summary>The registry being debugged.</summary>
403             private readonly TargetRegistry<T> _registry;
404
405             /// <summary>Initializes the type proxy.</summary>
406             /// <param name="registry">The target registry.</param>
407             public DebugView(TargetRegistry<T> registry)
408             {
409                 Contract.Requires(registry != null, "Need a registry with which to construct the debug view.");
410                 _registry = registry;
411             }
412
413             /// <summary>Gets a list of all targets to show in the debugger.</summary>
414             [DebuggerBrowsable(DebuggerBrowsableState.RootHidden)]
415             public ITargetBlock<T>[] Targets { get { return _registry.TargetsForDebugger; } }
416         }
417     }
418 }