[System] Fixes UdpClient.Receive with IPv6 endpoint
[mono.git] / mcs / class / System.Threading.Tasks.Dataflow / System.Threading.Tasks.Dataflow / MessageBox.cs
1 // MessageBox.cs
2 //
3 // Copyright (c) 2011 Jérémie "garuma" Laval
4 // Copyright (c) 2012 Petr Onderka
5 //
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
7 // of this software and associated documentation files (the "Software"), to deal
8 // in the Software without restriction, including without limitation the rights
9 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 // copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
12 //
13 // The above copyright notice and this permission notice shall be included in
14 // all copies or substantial portions of the Software.
15 //
16 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22 // THE SOFTWARE.
23
24 using System.Collections.Concurrent;
25 using System.Linq;
26
27 namespace System.Threading.Tasks.Dataflow {
28         /// <summary>
29         /// MessageBox stores the messages that have been offered to a block so that they can be
30         /// processed later.
31         /// </summary>
32         internal abstract class MessageBox<TInput> {
33                 protected ITargetBlock<TInput> Target { get; set; } // passed as the target argument to the sources' ConsumeMessage/ReserveMessage/ReleaseReservation
34                 protected CompletionHelper CompHelper { get; private set; } // CanRun is checked before accepting; Complete () is called from VerifyCompleteness
35                 readonly Func<bool> externalCompleteTester; // consulted by VerifyCompleteness once MessageQueue is completed
36                 readonly DataflowBlockOptions options; // BoundedCapacity, CancellationToken and TaskScheduler are read from here
37                 readonly bool greedy; // when false, OfferMessage postpones (or declines) instead of accepting directly
38                 readonly Func<bool> canAccept; // optional (may be null); a false result makes OfferMessage return DecliningPermanently
39
40                 readonly ConcurrentDictionary<ISourceBlock<TInput>, DataflowMessageHeader> // postponed header per source; a newer offer from the same source overwrites the older one
41                         postponedMessages =
42                                 new ConcurrentDictionary<ISourceBlock<TInput>, DataflowMessageHeader> ();
43                 int itemCount; // changed only through Interlocked; compared against options.BoundedCapacity
44                 readonly AtomicBoolean postponedProcessing = new AtomicBoolean (); // set before RetrievePostponed is started, cleared when it finishes
45
46                 // the following two fields are used only in OfferMessage, when consumeToAccept is true and canAccept is not null
47                 SpinLock consumingLock;
48                 // remembers that canAccept already returned true but the consume failed, so that canAccept (which is not pure) is not called again
49                 bool canAcceptFromBefore;
50
51                 protected BlockingCollection<TInput> MessageQueue { get; private set; } // accepted items are added here
52
53                 protected MessageBox (
54                         ITargetBlock<TInput> target, BlockingCollection<TInput> messageQueue,
55                         CompletionHelper compHelper, Func<bool> externalCompleteTester,
56                         DataflowBlockOptions options, bool greedy = true, Func<bool> canAccept = null)
57                 {
58                         this.Target = target;
59                         this.CompHelper = compHelper;
60                         this.MessageQueue = messageQueue;
61                         this.externalCompleteTester = externalCompleteTester;
62                         this.options = options;
63                         this.greedy = greedy;
64                         this.canAccept = canAccept;
65                 }
66
67                 public DataflowMessageStatus OfferMessage (
68                         DataflowMessageHeader messageHeader, TInput messageValue,
69                         ISourceBlock<TInput> source, bool consumeToAccept)
70                 {
71                         if (!messageHeader.IsValid)
72                                 throw new ArgumentException ("The messageHeader is not valid.",
73                                         "messageHeader");
74                         if (consumeToAccept && source == null)
75                                 throw new ArgumentException (
76                                         "consumeToAccept may only be true if provided with a non-null source.",
77                                         "consumeToAccept");
78
79                         if (MessageQueue.IsAddingCompleted || !CompHelper.CanRun)
80                                 return DataflowMessageStatus.DecliningPermanently;
81
82                         var full = options.BoundedCapacity != -1
83                                    && Volatile.Read (ref itemCount) >= options.BoundedCapacity;
84                         if (!greedy || full) {
85                                 if (source == null)
86                                         return DataflowMessageStatus.Declined;
87
88                                 postponedMessages [source] = messageHeader;
89
90                                 // necessary to avoid race condition
91                                 DecreaseCount (0);
92
93                                 if (!greedy && !full)
94                                         EnsureProcessing (true);
95                                 
96                                 return DataflowMessageStatus.Postponed;
97                         }
98
99                         // in this case, we need to use locking to make sure
100                         // we don't consume when we can't accept
101                         if (consumeToAccept && canAccept != null) {
102                                 bool lockTaken = false;
103                                 try {
104                                         consumingLock.Enter (ref lockTaken);
105                                         if (!canAcceptFromBefore && !canAccept ())
106                                                 return DataflowMessageStatus.DecliningPermanently;
107
108                                         bool consummed;
109                                         messageValue = source.ConsumeMessage (messageHeader, Target, out consummed);
110                                         if (!consummed) {
111                                                 canAcceptFromBefore = true;
112                                                 return DataflowMessageStatus.NotAvailable;
113                                         }
114
115                                         canAcceptFromBefore = false;
116                                 } finally {
117                                         if (lockTaken)
118                                                 consumingLock.Exit ();
119                                 }
120                         } else {
121                                 if (consumeToAccept) {
122                                         bool consummed;
123                                         messageValue = source.ConsumeMessage (messageHeader, Target, out consummed);
124                                         if (!consummed)
125                                                 return DataflowMessageStatus.NotAvailable;
126                                 }
127
128                                 if (canAccept != null && !canAccept ())
129                                         return DataflowMessageStatus.DecliningPermanently;
130                         }
131
132                         try {
133                                 MessageQueue.Add (messageValue);
134                         } catch (InvalidOperationException) {
135                                 // This is triggered either if the underlying collection didn't accept the item
136                                 // or if the messageQueue has been marked complete, either way it corresponds to a false
137                                 return DataflowMessageStatus.DecliningPermanently;
138                         }
139
140                         IncreaseCount ();
141
142                         EnsureProcessing (true);
143
144                         VerifyCompleteness ();
145
146                         return DataflowMessageStatus.Accepted;
147                 }
148
149                 /// <summary>
150                 /// Increses the count of items in the block by 1.
151                 /// </summary>
152                 public void IncreaseCount ()
153                 {
154                         Interlocked.Increment (ref itemCount);
155                 }
156
157                 /// <summary>
158                 /// Decreses the number of items in the block by the given count.
159                 /// </summary>
160                 /// <remarks>
161                 /// The <paramref name="count"/> parameter is used when one object
162                 /// can represent many items, like a batch in <see cref="BatchBlock{T}"/>.
163                 /// </remarks>
164                 public void DecreaseCount (int count = 1)
165                 {
166                         int decreased = Interlocked.Add (ref itemCount, -count);
167
168                         // if BoundedCapacity is -1, there is no need to do this
169                         if (decreased < options.BoundedCapacity && !postponedMessages.IsEmpty) {
170                                 if (greedy)
171                                         EnsurePostponedProcessing ();
172                                 else
173                                         EnsureProcessing (false);
174                         }
175                 }
176
177                 /// <summary>
178                 /// The number of messages that were postponed
179                 /// and can be attempted to be consumed.
180                 /// </summary>
181                 public int PostponedMessagesCount {
182                         get { return postponedMessages.Count; }
183                 }
184
185                 /// <summary>
186                 /// Reserves a message from those that were postponed.
187                 /// Does not guarantee any order of the messages being reserved.
188                 /// </summary>
189                 /// <returns>
190                 /// An object representing the reservation on success,
191                 /// <c>null</c> on failure.
192                 /// </returns>
193                 public Tuple<ISourceBlock<TInput>, DataflowMessageHeader> ReserveMessage()
194                 {
195                         while (!postponedMessages.IsEmpty) {
196                                 // KeyValuePair is a struct, so default value is not null
197                                 var block = postponedMessages.FirstOrDefault () .Key;
198
199                                 // collection is empty
200                                 if (block == null)
201                                         break;
202
203                                 DataflowMessageHeader header;
204                                 bool removed = postponedMessages.TryRemove (block, out header);
205
206                                 // another thread was faster, try again
207                                 if (!removed)
208                                         continue;
209
210                                 bool reserved = block.ReserveMessage (header, Target);
211                                 if (reserved)
212                                         return Tuple.Create (block, header);
213                         }
214
215                         return null;
216                 }
217
218                 /// <summary>
219                 /// Releases the given reservation.
220                 /// </summary>
221                 public void RelaseReservation(Tuple<ISourceBlock<TInput>, DataflowMessageHeader> reservation)
222                 {
223                         reservation.Item1.ReleaseReservation (reservation.Item2, Target);
224                 }
225
226                 /// <summary>
227                 /// Consumes previously reserved item.
228                 /// </summary>
229                 public TInput ConsumeReserved(Tuple<ISourceBlock<TInput>, DataflowMessageHeader> reservation)
230                 {
231                         bool consumed;
232                         return reservation.Item1.ConsumeMessage (
233                                 reservation.Item2, Target, out consumed);
234                 }
235
236                 /// <summary>
237                 /// Makes sure retrieving items that were postponed,
238                 /// because they would exceed <see cref="DataflowBlockOptions.BoundedCapacity"/>,
239                 /// is currently running.
240                 /// </summary>
241                 void EnsurePostponedProcessing ()
242                 {
243                         if (postponedProcessing.TrySet())
244                                 Task.Factory.StartNew (RetrievePostponed, options.CancellationToken,
245                                         TaskCreationOptions.PreferFairness, options.TaskScheduler);
246                 }
247
248                 /// <summary>
249                 /// Retrieves items that were postponed,
250                 /// because they would exceed <see cref="DataflowBlockOptions.BoundedCapacity"/>.
251                 /// </summary>
252                 void RetrievePostponed ()
253                 {
254                         // BoundedCapacity can't be -1 here, because in that case there would be no postponing
255                         while (Volatile.Read (ref itemCount) < options.BoundedCapacity
256                                && !postponedMessages.IsEmpty && !MessageQueue.IsAddingCompleted) {
257                                 var block = postponedMessages.First ().Key;
258                                 DataflowMessageHeader header;
259                                 postponedMessages.TryRemove (block, out header);
260
261                                 bool consumed;
262                                 var item = block.ConsumeMessage (header, Target, out consumed);
263                                 if (consumed) {
264                                         try {
265                                                 MessageQueue.Add (item);
266                                                 IncreaseCount ();
267                                                 EnsureProcessing (false);
268                                         } catch (InvalidOperationException) {
269                                                 break;
270                                         }
271                                 }
272                         }
273
274                         // release all postponed messages
275                         if (MessageQueue.IsAddingCompleted) {
276                                 while (!postponedMessages.IsEmpty) {
277                                         var block = postponedMessages.First ().Key;
278                                         DataflowMessageHeader header;
279                                         postponedMessages.TryRemove (block, out header);
280
281                                         if (block.ReserveMessage (header, Target))
282                                                 block.ReleaseReservation (header, Target);
283                                 }
284                         }
285
286                         postponedProcessing.Value = false;
287
288                         // because of race
289                         if ((Volatile.Read (ref itemCount) < options.BoundedCapacity
290                              || MessageQueue.IsAddingCompleted)
291                             && !postponedMessages.IsEmpty)
292                                 EnsurePostponedProcessing ();
293                 }
294
295                 /// <summary>
296                 /// Makes sure the input queue is being processed; how that is done is left to the derived class.
297                 /// </summary>
298                 /// <param name="newItem"><c>true</c> when called from <see cref="OfferMessage"/> for a message that was just accepted or postponed; <c>false</c> when called from DecreaseCount or RetrievePostponed.</param>
299                 protected abstract void EnsureProcessing (bool newItem);
300
301                 /// <summary>
302                 /// Completes the box, no new messages will be accepted.
303                 /// Also starts the process of completing the output queue.
304                 /// </summary>
305                 public void Complete ()
306                 {
307                         // Make message queue complete
308                         MessageQueue.CompleteAdding ();
309                         OutgoingQueueComplete ();
310                         VerifyCompleteness ();
311
312                         if (!postponedMessages.IsEmpty)
313                                 EnsurePostponedProcessing ();
314                 }
315
316                 /// <summary>
317                 /// Notifies that outgoing queue should be completed, if possible. Called by Complete () right after the message queue is marked complete; this base implementation does nothing.
318                 /// </summary>
319                 protected virtual void OutgoingQueueComplete ()
320                 {
321                 }
322
323                 /// <summary>
324                 /// Makes sure the block is completed if it should be.
325                 /// </summary>
326                 protected virtual void VerifyCompleteness ()
327                 {
328                         if (MessageQueue.IsCompleted && externalCompleteTester ())
329                                 CompHelper.Complete ();
330                 }
331         }
332 }