[System.Threading.Tasks.Dataflow] Replace implementation with CoreFx version
[mono.git] / mcs / class / System.Threading.Tasks.Dataflow / CoreFxSources / Internal / ReorderingBuffer.cs
1 // Copyright (c) Microsoft. All rights reserved.
2 // Licensed under the MIT license. See LICENSE file in the project root for full license information.
3
4 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
5 //
6 // ReorderingBuffer.cs
7 //
8 //
9 // An intermediate buffer that ensures messages are output in the right order.
10 // Used by blocks (e.g. TransformBlock, TransformManyBlock) when operating in 
11 // parallel modes that could result in messages being processed out of order.
12 //
13 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
14
15 using System.Collections.Generic;
16 using System.Diagnostics;
17 using System.Diagnostics.Contracts;
18 using System.Linq;
19
20 namespace System.Threading.Tasks.Dataflow.Internal
21 {
22     /// <summary>Base interface for reordering buffers.</summary>
23     internal interface IReorderingBuffer
24     {
25         /// <summary>Informs the reordering buffer not to expect the message with the specified id.</summary>
26         /// <param name="id">The id of the message to be ignored.</param>
27         void IgnoreItem(long id);
28     }
29
30     /// <summary>Provides a buffer that reorders items according to their incoming IDs.</summary>
31     /// <typeparam name="TOutput">Specifies the type of data stored in the items being reordered.</typeparam>
32     /// <remarks>
33     /// This type expects the first item to be ID==0 and for all subsequent items
34     /// to increase IDs sequentially.
35     /// </remarks>
36     [DebuggerDisplay("Count={CountForDebugging}")]
37     [DebuggerTypeProxy(typeof(ReorderingBuffer<>.DebugView))]
38     internal sealed class ReorderingBuffer<TOutput> : IReorderingBuffer
39     {
40         /// <summary>The source that owns this reordering buffer.</summary>
41         private readonly object _owningSource;
42         /// <summary>A reordering buffer used when parallelism is employed and items may be completed out-of-order.</summary>
43         /// <remarks>Also serves as the sync object to protect the contents of this class.</remarks>
44         private readonly Dictionary<long, KeyValuePair<bool, TOutput>> _reorderingBuffer = new Dictionary<long, KeyValuePair<bool, TOutput>>();
45         /// <summary>Action used to output items in order.</summary>
46         private readonly Action<object, TOutput> _outputAction;
47         /// <summary>The ID of the next item that should be released from the reordering buffer.</summary>
48         private long _nextReorderedIdToOutput = 0;
49
50         /// <summary>Gets the object used to synchronize all access to the reordering buffer's internals.</summary>
51         private object ValueLock { get { return _reorderingBuffer; } }
52
53         /// <summary>Initializes the reordering buffer.</summary>
54         /// <param name="owningSource">The source that owns this reordering buffer.</param>
55         /// <param name="outputAction">The action to invoke when the next in-order item is available to be output.</param>
56         internal ReorderingBuffer(object owningSource, Action<object, TOutput> outputAction)
57         {
58             // Validate and store internal arguments
59             Contract.Requires(owningSource != null, "Buffer must be associated with a source.");
60             Contract.Requires(outputAction != null, "Action required for when items are to be released.");
61             _owningSource = owningSource;
62             _outputAction = outputAction;
63         }
64
65         /// <summary>Stores the next item as it completes processing.</summary>
66         /// <param name="id">The ID of the item.</param>
67         /// <param name="item">The completed item.</param>
68         /// <param name="itemIsValid">Specifies whether the item is valid (true) or just a placeholder (false).</param>
69         internal void AddItem(long id, TOutput item, bool itemIsValid)
70         {
71             Contract.Requires(id != Common.INVALID_REORDERING_ID, "This ID should never have been handed out.");
72             Common.ContractAssertMonitorStatus(ValueLock, held: false);
73
74             // This may be called concurrently, so protect the buffer...
75             lock (ValueLock)
76             {
77                 // If this is the next item we need in our ordering, output it.
78                 if (_nextReorderedIdToOutput == id)
79                 {
80                     OutputNextItem(item, itemIsValid);
81                 }
82                 // Otherwise, we're using reordering and we're not ready for this item yet, so store
83                 // it until we can use it.
84                 else
85                 {
86                     Debug.Assert((ulong)id > (ulong)_nextReorderedIdToOutput, "Duplicate id.");
87                     _reorderingBuffer.Add(id, new KeyValuePair<bool, TOutput>(itemIsValid, item));
88                 }
89             }
90         }
91
92         /// <summary>
93         /// Determines whether the specified id is next to be output, and if it is
94         /// and if the item is "trusted" (meaning it may be output into the output
95         /// action as-is), adds it.
96         /// </summary>
97         /// <param name="id">The id of the item.</param>
98         /// <param name="item">The item.</param>
99         /// <param name="isTrusted">
100         /// Whether to allow the item to be output directly if it is the next item.
101         /// </param>
102         /// <returns>
103         /// null if the item was added.
104         /// true if the item was not added but is next in line.
105         /// false if the item was not added and is not next in line.
106         /// </returns>
107         internal bool? AddItemIfNextAndTrusted(long id, TOutput item, bool isTrusted)
108         {
109             Contract.Requires(id != Common.INVALID_REORDERING_ID, "This ID should never have been handed out.");
110             Common.ContractAssertMonitorStatus(ValueLock, held: false);
111
112             lock (ValueLock)
113             {
114                 // If this is in the next item, try to take the fast path.
115                 if (_nextReorderedIdToOutput == id)
116                 {
117                     // If we trust this data structure to be stored as-is,
118                     // output it immediately.  Otherwise, return that it is
119                     // next to be output.
120                     if (isTrusted)
121                     {
122                         OutputNextItem(item, itemIsValid: true);
123                         return null;
124                     }
125                     else return true;
126                 }
127                 else return false;
128             }
129         }
130
131         /// <summary>Informs the reordering buffer not to expect the message with the specified id.</summary>
132         /// <param name="id">The id of the message to be ignored.</param>
133         public void IgnoreItem(long id)
134         {
135             AddItem(id, default(TOutput), itemIsValid: false);
136         }
137
138         /// <summary>Outputs the item.  The item must have already been confirmed to have the next ID.</summary>
139         /// <param name="theNextItem">The item to output.</param>
140         /// <param name="itemIsValid">Whether the item is valid.</param>
141         private void OutputNextItem(TOutput theNextItem, bool itemIsValid)
142         {
143             Common.ContractAssertMonitorStatus(ValueLock, held: true);
144
145             // Note that we're now looking for a different item, and pass this one through.
146             // Then release any items which may be pending.
147             _nextReorderedIdToOutput++;
148             if (itemIsValid) _outputAction(_owningSource, theNextItem);
149
150             // Try to get the next available item from the buffer and output it.  Continue to do so
151             // until we run out of items in the reordering buffer or don't yet have the next ID buffered.
152             KeyValuePair<bool, TOutput> nextOutputItemWithValidity;
153             while (_reorderingBuffer.TryGetValue(_nextReorderedIdToOutput, out nextOutputItemWithValidity))
154             {
155                 _reorderingBuffer.Remove(_nextReorderedIdToOutput);
156                 _nextReorderedIdToOutput++;
157                 if (nextOutputItemWithValidity.Key) _outputAction(_owningSource, nextOutputItemWithValidity.Value);
158             }
159         }
160
161         /// <summary>Gets a item count for debugging purposes.</summary>
162         private int CountForDebugging { get { return _reorderingBuffer.Count; } }
163
164         /// <summary>Provides a debugger type proxy for the buffer.</summary>
165         private sealed class DebugView
166         {
167             /// <summary>The buffer being debugged.</summary>
168             private readonly ReorderingBuffer<TOutput> _buffer;
169
170             /// <summary>Initializes the debug view.</summary>
171             /// <param name="buffer">The buffer being debugged.</param>
172             public DebugView(ReorderingBuffer<TOutput> buffer)
173             {
174                 Contract.Requires(buffer != null, "Need a buffer with which to construct the debug view.");
175                 _buffer = buffer;
176             }
177
178             /// <summary>Gets a dictionary of buffered items and their reordering IDs.</summary>
179             public Dictionary<long, KeyValuePair<Boolean, TOutput>> ItemsBuffered { get { return _buffer._reorderingBuffer; } }
180             /// <summary>Gets the next ID required for outputting.</summary>
181             public long NextIdRequired { get { return _buffer._nextReorderedIdToOutput; } }
182         }
183     }
184 }