Merge pull request #409 from Alkarex/patch-1
[mono.git] / mcs / class / System.Threading.Tasks.Dataflow / System.Threading.Tasks.Dataflow / TargetCollection.cs
1 // TargetCollection.cs
2 //
3 // Copyright (c) 2012 Petr Onderka
4 //
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to deal
7 // in the Software without restriction, including without limitation the rights
8 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
11 //
12 // The above copyright notice and this permission notice shall be included in
13 // all copies or substantial portions of the Software.
14 //
15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21 // THE SOFTWARE.
22
23 using System.Collections.Concurrent;
24 using System.Collections.Generic;
25
26 namespace System.Threading.Tasks.Dataflow {
27         /// <summary>
28         /// Base class for collection of target blocks for a source block.
29         /// Also handles sending messages to the target blocks.
30         /// </summary>
31         abstract class TargetCollectionBase<T> {
32                 /// <summary>
33                 /// Represents a target block with its options.
34                 /// </summary>
35                 protected class Target : IDisposable {
36                         readonly TargetCollectionBase<T> targetCollection;
37                         volatile int remainingMessages;
38                         readonly CancellationTokenSource cancellationTokenSource;
39
40                         public ITargetBlock<T> TargetBlock { get; private set; }
41
42                         public Target (TargetCollectionBase<T> targetCollection,
43                                        ITargetBlock<T> targetBlock, int maxMessages,
44                                        CancellationTokenSource cancellationTokenSource)
45                         {
46                                 TargetBlock = targetBlock;
47                                 this.targetCollection = targetCollection;
48                                 remainingMessages = maxMessages;
49                                 this.cancellationTokenSource = cancellationTokenSource;
50
51                                 Postponed = new AtomicBoolean ();
52                                 Reserved = new AtomicBoolean ();
53                         }
54
55                         /// <summary>
56                         /// Is called after a message was sent,  makes sure the linked is destroyed after
57                         /// <see cref="DataflowLinkOptions.MaxMessages"/> were sent.
58                         /// </summary>
59                         public void MessageSent()
60                         {
61                                 if (remainingMessages != -1)
62                                         remainingMessages--;
63                                 if (remainingMessages == 0)
64                                         Dispose ();
65                         }
66
67                         readonly AtomicBoolean disabled = new AtomicBoolean ();
68                         /// <summary>
69                         /// Is the link destroyed?
70                         /// </summary>
71                         public bool Disabled
72                         {
73                                 get { return disabled.Value; }
74                         }
75
76                         /// <summary>
77                         /// Destroys the link to this target.
78                         /// </summary>
79                         public void Dispose ()
80                         {
81                                 disabled.Value = true;
82
83                                 if (cancellationTokenSource != null)
84                                         cancellationTokenSource.Cancel ();
85
86                                 Target ignored;
87                                 targetCollection.TargetDictionary.TryRemove (TargetBlock, out ignored);
88
89                                 // to avoid memory leak; it could take a long time
90                                 // before this object is actually removed from the collection
91                                 TargetBlock = null;
92                         }
93
94                         /// <summary>
95                         /// Does this target have a postponed message?
96                         /// </summary>
97                         public AtomicBoolean Postponed { get; private set; }
98                         
99                         /// <summary>
100                         /// Does this target have a reserved message?
101                         /// </summary>
102                         /// <remarks>Used only by broadcast blocks.</remarks>
103                         public AtomicBoolean Reserved { get; private set; }
104                 }
105
106                 readonly ISourceBlock<T> block;
107                 readonly bool broadcast;
108                 readonly bool consumeToAccept;
109
110                 readonly ConcurrentQueue<Target> prependQueue = new ConcurrentQueue<Target> ();
111                 readonly ConcurrentQueue<Target> appendQueue = new ConcurrentQueue<Target> ();
112                 readonly LinkedList<Target> targets = new LinkedList<Target> ();
113
114                 protected readonly ConcurrentDictionary<ITargetBlock<T>, Target> TargetDictionary =
115                         new ConcurrentDictionary<ITargetBlock<T>, Target> ();
116
117                 // lastMessageHeaderId will be always accessed only from one thread
118                 long lastMessageHeaderId;
119                 // currentMessageHeaderId can be read from multiple threads at the same time
120                 long currentMessageHeaderId;
121
122                 bool firstOffering;
123                 T currentItem;
124
125                 protected TargetCollectionBase (ISourceBlock<T> block, bool broadcast, bool consumeToAccept)
126                 {
127                         this.block = block;
128                         this.broadcast = broadcast;
129                         this.consumeToAccept = consumeToAccept;
130                 }
131
132                 /// <summary>
133                 /// Adds a target block to send messages to.
134                 /// </summary>
135                 /// <returns>
136                 /// An object that can be used to destroy the link to the added target.
137                 /// </returns>
138                 public IDisposable AddTarget (ITargetBlock<T> targetBlock, DataflowLinkOptions options)
139                 {
140                         CancellationTokenSource cancellationTokenSource = null;
141                         if (options.PropagateCompletion) {
142                                 cancellationTokenSource = new CancellationTokenSource();
143                                 block.Completion.ContinueWith (t =>
144                                 {
145                                         if (t.IsFaulted)
146                                                 targetBlock.Fault (t.Exception);
147                                         else
148                                                 targetBlock.Complete ();
149                                 }, cancellationTokenSource.Token);
150                         }
151
152                         var target = new Target (
153                                 this, targetBlock, options.MaxMessages, cancellationTokenSource);
154                         TargetDictionary [targetBlock] = target;
155                         if (options.Append)
156                                 appendQueue.Enqueue (target);
157                         else
158                                 prependQueue.Enqueue (target);
159
160                         return target;
161                 }
162
163                 /// <summary>
164                 /// Sets the current item to be offered to targets
165                 /// </summary>
166                 public void SetCurrentItem (T item)
167                 {
168                         firstOffering = true;
169                         currentItem = item;
170                         Thread.VolatileWrite (ref currentMessageHeaderId, ++lastMessageHeaderId);
171
172                         ClearUnpostponed ();
173                 }
174
175                 /// <summary>
176                 /// Clears the collection of "unpostponed" targets.
177                 /// </summary>
178                 protected abstract void ClearUnpostponed ();
179
180                 /// <summary>
181                 /// Resets the current item to be offered to targets.
182                 /// This means there is currently nothing to offer.
183                 /// </summary>
184                 public void ResetCurrentItem ()
185                 {
186                         currentItem = default(T);
187                         Thread.VolatileWrite (ref currentMessageHeaderId, 0);
188                 }
189
190                 /// <summary>
191                 /// Is there an item to send right now?
192                 /// </summary>
193                 public bool HasCurrentItem {
194                         get { return Thread.VolatileRead (ref currentMessageHeaderId) != 0; }
195                 }
196
197                 /// <summary>
198                 /// Offers the current item to all eligible targets.
199                 /// </summary>
200                 /// <returns>Was the item accepted? (Always <c>false</c> for broadcast blocks.)</returns>
201                 public bool OfferItemToTargets ()
202                 {
203                         // is there an item to offer?
204                         if (!HasCurrentItem)
205                                 return false;
206
207                         var old = Tuple.Create (targets.First, targets.Last);
208
209                         do {
210                                 // order is important here, we want to make sure that prepended target
211                                 // added before appended target is processed first
212                                 var appended = PrependOrAppend (false);
213                                 var prepended = PrependOrAppend (true);
214
215                                 if (OfferItemToTargets (prepended))
216                                         return true;
217
218                                 if (firstOffering) {
219                                         if (OfferItemToTargets (old))
220                                                 return true;
221                                         firstOffering = false;
222                                 } else {
223                                         if (OfferItemToUnpostponed ())
224                                                 return true;
225                                 }
226
227                                 if (OfferItemToTargets (appended))
228                                         return true;
229                         } while (NeedsProcessing);
230
231                         return false;
232                 }
233
234                 /// <summary>
235                 /// Are there any targets that currently require a message to be sent to them?
236                 /// </summary>
237                 public bool NeedsProcessing {
238                         get {
239                                 return !appendQueue.IsEmpty || !prependQueue.IsEmpty
240                                        || !UnpostponedIsEmpty;
241                         }
242                 }
243
244                 /// <summary>
245                 /// Is the collection of unpostponed targets empty?
246                 /// </summary>
247                 protected abstract bool UnpostponedIsEmpty { get; }
248
249                 /// <summary>
250                 /// Prepends (appends) targets that should be prepended (appended) to the collection of targets.
251                 /// </summary>
252                 /// <param name="prepend"><c>true</c> to prepend, <c>false</c> to append.</param>
253                 /// <returns>
254                 /// Nodes that contain first and last target added to the list,
255                 /// or <c>null</c> if no nodes were added.
256                 /// </returns>
257                 Tuple<LinkedListNode<Target>, LinkedListNode<Target>> PrependOrAppend (
258                         bool prepend)
259                 {
260                         var queue = prepend ? prependQueue : appendQueue;
261
262                         if (queue.IsEmpty)
263                                 return null;
264
265                         LinkedListNode<Target> first = null;
266                         LinkedListNode<Target> last = null;
267
268                         Target target;
269                         while (queue.TryDequeue (out target)) {
270                                 var node = prepend
271                                                    ? targets.AddFirst (target)
272                                                    : targets.AddLast (target);
273                                 if (first == null)
274                                         first = node;
275                                 last = node;
276                         }
277
278                         return prepend
279                                        ? Tuple.Create (last, first)
280                                        : Tuple.Create (first, last);
281                 }
282
283                 /// <summary>
284                 /// Offers the current item to the targets between the given nodes (inclusive).
285                 /// </summary>
286                 /// <returns>Was the item accepted? (Always <c>false</c> for broadcast blocks.)</returns>
287                 bool OfferItemToTargets (
288                         Tuple<LinkedListNode<Target>, LinkedListNode<Target>> targetPair)
289                 {
290                         if (targetPair == null
291                             || targetPair.Item1 == null || targetPair.Item2 == null)
292                                 return false;
293
294                         var node = targetPair.Item1;
295                         while (node != targetPair.Item2.Next) {
296                                 if (node.Value.Disabled) {
297                                         var nodeToRemove = node;
298                                         node = node.Next;
299                                         targets.Remove (nodeToRemove);
300                                         continue;
301                                 }
302
303                                 if (OfferItem (node.Value) && !broadcast)
304                                         return true;
305
306                                 node = node.Next;
307                         }
308
309                         return false;
310                 }
311
312                 /// <summary>
313                 /// Offers the current item to unpostponed targets.
314                 /// </summary>
315                 /// <returns>Was the item accepted? (Always <c>false</c> for broadcast blocks.)</returns>
316                 protected abstract bool OfferItemToUnpostponed ();
317
318                 /// <summary>
319                 /// Offers the current item to the given target.
320                 /// </summary>
321                 /// <returns>Was the item accepted?</returns>
322                 protected bool OfferItem (Target target)
323                 {
324                         if (target.Reserved.Value)
325                                 return false;
326                         if (!broadcast && target.Postponed.Value)
327                                 return false;
328
329                         var result = target.TargetBlock.OfferMessage (
330                                 // volatile read is not necessary here,
331                                 // because currentMessageHeaderId is always written from this thread
332                                 new DataflowMessageHeader (currentMessageHeaderId), currentItem, block,
333                                 consumeToAccept);
334
335                         switch (result) {
336                         case DataflowMessageStatus.Accepted:
337                                 target.MessageSent ();
338                                 return true;
339                         case DataflowMessageStatus.Postponed:
340                                 target.Postponed.Value = true;
341                                 return false;
342                         case DataflowMessageStatus.DecliningPermanently:
343                                 target.Dispose ();
344                                 return false;
345                         default:
346                                 return false;
347                         }
348                 }
349
350                 /// <summary>
351                 /// Returns whether the given header corresponds to the current item.
352                 /// </summary>
353                 public bool VerifyHeader (DataflowMessageHeader header)
354                 {
355                         return header.Id == Thread.VolatileRead (ref currentMessageHeaderId);
356                 }
357         }
358
359         /// <summary>
360         /// Target collection for non-broadcast blocks.
361         /// </summary>
362         class TargetCollection<T> : TargetCollectionBase<T> {
363                 readonly ConcurrentQueue<Target> unpostponedTargets =
364                         new ConcurrentQueue<Target> ();
365
366                 public TargetCollection (ISourceBlock<T> block)
367                         : base (block, false, false)
368                 {
369                 }
370
371                 /// <summary>
372                 /// Is the collection of unpostponed targets empty?
373                 /// </summary>
374                 protected override bool UnpostponedIsEmpty {
375                         get { return unpostponedTargets.IsEmpty; }
376                 }
377
378                 /// <summary>
379                 /// Returns whether the given header corresponds to the current item
380                 /// and that the given target block postponed this item.
381                 /// </summary>
382                 public bool VerifyHeader (DataflowMessageHeader header, ITargetBlock<T> targetBlock)
383                 {
384                         return VerifyHeader (header)
385                                && TargetDictionary[targetBlock].Postponed.Value;
386                 }
387
388                 /// <summary>
389                 /// Unpostpones the given target.
390                 /// </summary>
391                 /// <param name="targetBlock">Target to unpostpone.</param>
392                 /// <param name="messageConsumed">Did the target consume an item?</param>
393                 public void UnpostponeTarget (ITargetBlock<T> targetBlock, bool messageConsumed)
394                 {
395                         Target target;
396                         if (!TargetDictionary.TryGetValue (targetBlock, out target))
397                                 return;
398
399                         if (messageConsumed)
400                                 target.MessageSent ();
401                         unpostponedTargets.Enqueue (target);
402
403                         target.Postponed.Value = false;
404                 }
405
406                 /// <summary>
407                 /// Clears the collection of "unpostponed" targets.
408                 /// </summary>
409                 protected override void ClearUnpostponed ()
410                 {
411                         Target ignored;
412                         while (unpostponedTargets.TryDequeue (out ignored)) {
413                         }
414                 }
415
416                 /// <summary>
417                 /// Offers the current item to unpostponed targets.
418                 /// </summary>
419                 /// <returns>Was the item accepted?</returns>
420                 protected override bool OfferItemToUnpostponed ()
421                 {
422                         Target target;
423                         while (unpostponedTargets.TryDequeue (out target)) {
424                                 if (!target.Disabled && OfferItem (target))
425                                         return true;
426                         }
427
428                         return false;
429                 }
430         }
431
432         /// <summary>
433         /// Target collection for broadcast blocks.
434         /// </summary>
435         class BroadcastTargetCollection<T> : TargetCollectionBase<T> {
436                 // it's necessary to store the headers because of a race between
437                 // UnpostponeTargetConsumed and SetCurrentItem
438                 readonly ConcurrentQueue<Tuple<Target, DataflowMessageHeader>>
439                         unpostponedTargets =
440                                 new ConcurrentQueue<Tuple<Target, DataflowMessageHeader>> ();
441
442                 public BroadcastTargetCollection (ISourceBlock<T> block, bool consumeToAccept)
443                         : base (block, true, consumeToAccept)
444                 {
445                 }
446
447                 /// <summary>
448                 /// Is the collection of unpostponed targets empty?
449                 /// </summary>
450                 protected override bool UnpostponedIsEmpty {
451                         get { return unpostponedTargets.IsEmpty; }
452                 }
453
454                 /// <summary>
455                 /// Marks the target as having a reserved message.
456                 /// </summary>
457                 public void ReserveTarget (ITargetBlock<T> targetBlock)
458                 {
459                         TargetDictionary [targetBlock].Reserved.Value = true;
460                 }
461
462                 /// <summary>
463                 /// Unpostpone target after it consumed a message.
464                 /// </summary>
465                 /// <param name="targetBlock">The target to unpostpone.</param>
466                 /// <param name="header">Header of the message the target consumed.</param>
467                 public void UnpostponeTargetConsumed (ITargetBlock<T> targetBlock,
468                                                       DataflowMessageHeader header)
469                 {
470                         Target target = TargetDictionary [targetBlock];
471
472                         target.MessageSent ();
473                         unpostponedTargets.Enqueue (Tuple.Create (target, header));
474
475                         target.Postponed.Value = false;
476                         target.Reserved.Value = false;
477                 }
478
479                 /// <summary>
480                 /// Unpostpone target in the case when it didn't successfuly consume a message.
481                 /// </summary>
482                 public void UnpostponeTargetNotConsumed (ITargetBlock<T> targetBlock)
483                 {
484                         Target target;
485                         if (!TargetDictionary.TryGetValue (targetBlock, out target))
486                                 return;
487
488                         unpostponedTargets.Enqueue (Tuple.Create (target,
489                                 new DataflowMessageHeader ()));
490
491                         target.Postponed.Value = false;
492                         target.Reserved.Value = false;
493                 }
494
495                 /// <summary>
496                 /// Clears the collection of "unpostponed" targets.
497                 /// </summary>
498                 protected override void ClearUnpostponed ()
499                 {
500                         Tuple<Target, DataflowMessageHeader> ignored;
501                         while (unpostponedTargets.TryDequeue (out ignored)) {
502                         }
503                 }
504
505                 /// <summary>
506                 /// Offers the current item to unpostponed targets.
507                 /// </summary>
508                 /// <returns>Always <c>false</c>.</returns>
509                 protected override bool OfferItemToUnpostponed ()
510                 {
511                         Tuple<Target, DataflowMessageHeader> tuple;
512                         while (unpostponedTargets.TryDequeue (out tuple)) {
513                                 // offer to unconditionaly unpostponed
514                                 // and those that consumed some old value
515                                 if (!tuple.Item1.Disabled
516                                     && (!tuple.Item2.IsValid || !VerifyHeader (tuple.Item2)))
517                                         OfferItem (tuple.Item1);
518                         }
519
520                         return false;
521                 }
522         }
523 }