1 // Copyright (c) Microsoft. All rights reserved.
2 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
4 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
9 // An intermediate buffer that ensures messages are output in the right order.
10 // Used by blocks (e.g. TransformBlock, TransformManyBlock) when operating in
11 // parallel modes that could result in messages being processed out of order.
13 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
15 using System.Collections.Generic;
16 using System.Diagnostics;
17 using System.Diagnostics.Contracts;
namespace System.Threading.Tasks.Dataflow.Internal
{
    /// <summary>Base interface for reordering buffers.</summary>
    internal interface IReorderingBuffer
    {
        /// <summary>Informs the reordering buffer not to expect the message with the specified id.</summary>
        /// <param name="id">The id of the message to be ignored.</param>
        void IgnoreItem(long id);
    }

    /// <summary>Provides a buffer that reorders items according to their incoming IDs.</summary>
    /// <typeparam name="TOutput">Specifies the type of data stored in the items being reordered.</typeparam>
    /// <remarks>
    /// This type expects the first item to be ID==0 and for all subsequent items
    /// to increase IDs sequentially.
    /// </remarks>
    [DebuggerDisplay("Count={CountForDebugging}")]
    [DebuggerTypeProxy(typeof(ReorderingBuffer<>.DebugView))]
    internal sealed class ReorderingBuffer<TOutput> : IReorderingBuffer
    {
        /// <summary>The source that owns this reordering buffer.</summary>
        private readonly object _owningSource;
        /// <summary>A reordering buffer used when parallelism is employed and items may be completed out-of-order.</summary>
        /// <remarks>
        /// Maps an item's ID to a pair of (is-valid flag, item).
        /// Also serves as the sync object to protect the contents of this class.
        /// </remarks>
        private readonly Dictionary<long, KeyValuePair<bool, TOutput>> _reorderingBuffer = new Dictionary<long, KeyValuePair<bool, TOutput>>();
        /// <summary>Action used to output items in order.</summary>
        private readonly Action<object, TOutput> _outputAction;
        /// <summary>The ID of the next item that should be released from the reordering buffer.</summary>
        private long _nextReorderedIdToOutput = 0;

        /// <summary>Gets the object used to synchronize all access to the reordering buffer's internals.</summary>
        private object ValueLock { get { return _reorderingBuffer; } }

        /// <summary>Initializes the reordering buffer.</summary>
        /// <param name="owningSource">The source that owns this reordering buffer.</param>
        /// <param name="outputAction">The action to invoke when the next in-order item is available to be output.</param>
        internal ReorderingBuffer(object owningSource, Action<object, TOutput> outputAction)
        {
            // Validate and store internal arguments
            Contract.Requires(owningSource != null, "Buffer must be associated with a source.");
            Contract.Requires(outputAction != null, "Action required for when items are to be released.");
            _owningSource = owningSource;
            _outputAction = outputAction;
        }

        /// <summary>Stores the next item as it completes processing.</summary>
        /// <param name="id">The ID of the item.</param>
        /// <param name="item">The completed item.</param>
        /// <param name="itemIsValid">Specifies whether the item is valid (true) or just a placeholder (false).</param>
        /// <remarks>
        /// If <paramref name="id"/> is the ID currently awaited, the item (and any consecutive
        /// items already buffered behind it) is released immediately; otherwise it is parked
        /// in the dictionary until its turn comes.
        /// </remarks>
        internal void AddItem(long id, TOutput item, bool itemIsValid)
        {
            Contract.Requires(id != Common.INVALID_REORDERING_ID, "This ID should never have been handed out.");
            Common.ContractAssertMonitorStatus(ValueLock, held: false);

            // This may be called concurrently, so protect the buffer...
            lock (ValueLock)
            {
                // If this is the next item we need in our ordering, output it.
                if (_nextReorderedIdToOutput == id)
                {
                    OutputNextItem(item, itemIsValid);
                }
                // Otherwise, we're using reordering and we're not ready for this item yet, so store
                // it until we can use it.
                else
                {
                    // IDs are compared as unsigned values; an ID that is not ahead of the
                    // awaited one has already been seen.
                    Debug.Assert((ulong)id > (ulong)_nextReorderedIdToOutput, "Duplicate id.");
                    _reorderingBuffer.Add(id, new KeyValuePair<bool, TOutput>(itemIsValid, item));
                }
            }
        }

        /// <summary>
        /// Determines whether the specified id is next to be output, and if it is
        /// and if the item is "trusted" (meaning it may be output into the output
        /// action as-is), adds it.
        /// </summary>
        /// <param name="id">The id of the item.</param>
        /// <param name="item">The item.</param>
        /// <param name="isTrusted">
        /// Whether to allow the item to be output directly if it is the next item.
        /// </param>
        /// <returns>
        /// null if the item was added.
        /// true if the item was not added but is next in line.
        /// false if the item was not added and is not next in line.
        /// </returns>
        internal bool? AddItemIfNextAndTrusted(long id, TOutput item, bool isTrusted)
        {
            Contract.Requires(id != Common.INVALID_REORDERING_ID, "This ID should never have been handed out.");
            Common.ContractAssertMonitorStatus(ValueLock, held: false);

            lock (ValueLock)
            {
                // If this is in the next item, try to take the fast path.
                if (_nextReorderedIdToOutput == id)
                {
                    // If we trust this data structure to be stored as-is,
                    // output it immediately. Otherwise, return that it is
                    // next to be output.
                    if (isTrusted)
                    {
                        OutputNextItem(item, itemIsValid: true);
                        return null;
                    }

                    return true;
                }

                return false;
            }
        }

        /// <summary>Informs the reordering buffer not to expect the message with the specified id.</summary>
        /// <param name="id">The id of the message to be ignored.</param>
        public void IgnoreItem(long id)
        {
            // An ignored ID is recorded as an invalid placeholder so that ordering can
            // advance past it without anything being handed to the output action.
            AddItem(id, default(TOutput), itemIsValid: false);
        }

        /// <summary>Outputs the item. The item must have already been confirmed to have the next ID.</summary>
        /// <param name="theNextItem">The item to output.</param>
        /// <param name="itemIsValid">Whether the item is valid.</param>
        /// <remarks>Must be called while <see cref="ValueLock"/> is held; the output action runs under that lock.</remarks>
        private void OutputNextItem(TOutput theNextItem, bool itemIsValid)
        {
            Common.ContractAssertMonitorStatus(ValueLock, held: true);

            // Note that we're now looking for a different item, and pass this one through.
            // Then release any items which may be pending.
            _nextReorderedIdToOutput++;
            if (itemIsValid)
            {
                _outputAction(_owningSource, theNextItem);
            }

            // Try to get the next available item from the buffer and output it. Continue to do so
            // until we run out of items in the reordering buffer or don't yet have the next ID buffered.
            KeyValuePair<bool, TOutput> nextOutputItemWithValidity;
            while (_reorderingBuffer.TryGetValue(_nextReorderedIdToOutput, out nextOutputItemWithValidity))
            {
                _reorderingBuffer.Remove(_nextReorderedIdToOutput);
                _nextReorderedIdToOutput++;

                // Placeholders (Key == false) only advance the ordering; they are never output.
                if (nextOutputItemWithValidity.Key)
                {
                    _outputAction(_owningSource, nextOutputItemWithValidity.Value);
                }
            }
        }

        /// <summary>Gets an item count for debugging purposes.</summary>
        private int CountForDebugging { get { return _reorderingBuffer.Count; } }

        /// <summary>Provides a debugger type proxy for the buffer.</summary>
        private sealed class DebugView
        {
            /// <summary>The buffer being debugged.</summary>
            private readonly ReorderingBuffer<TOutput> _buffer;

            /// <summary>Initializes the debug view.</summary>
            /// <param name="buffer">The buffer being debugged.</param>
            public DebugView(ReorderingBuffer<TOutput> buffer)
            {
                Contract.Requires(buffer != null, "Need a buffer with which to construct the debug view.");
                _buffer = buffer;
            }

            /// <summary>Gets a dictionary of buffered items and their reordering IDs.</summary>
            public Dictionary<long, KeyValuePair<bool, TOutput>> ItemsBuffered { get { return _buffer._reorderingBuffer; } }
            /// <summary>Gets the next ID required for outputting.</summary>
            public long NextIdRequired { get { return _buffer._nextReorderedIdToOutput; } }
        }
    }
}