Fixed completion of JoinTargets
[mono.git] / mcs / class / System.Threading.Tasks.Dataflow / System.Threading.Tasks.Dataflow / JoinTarget.cs
1 // JoinTarget.cs
2 //
3 // Copyright (c) 2011 Jérémie "garuma" Laval
4 //
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to deal
7 // in the Software without restriction, including without limitation the rights
8 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
11 //
12 // The above copyright notice and this permission notice shall be included in
13 // all copies or substantial portions of the Software.
14 //
15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21 // THE SOFTWARE.
22
23 using System.Collections.Concurrent;
24
25 namespace System.Threading.Tasks.Dataflow {
26         internal class JoinTarget<TTarget> : MessageBox<TTarget>, ITargetBlock<TTarget>
27         {
28                 readonly IDataflowBlock joinBlock;
29                 readonly Action signal;
30
31                 public JoinTarget (
32                         IDataflowBlock joinBlock, Action signal, CompletionHelper helper,
33                         Func<bool> externalCompleteTester, DataflowBlockOptions options,
34                         bool greedy)
35                         : base (null, new BlockingCollection<TTarget> (), helper, externalCompleteTester,
36                                 options, greedy)
37                 {
38                         this.joinBlock = joinBlock;
39                         this.signal = signal;
40                         Target = this;
41                 }
42
43                 protected override void EnsureProcessing (bool newItem)
44                 {
45                         signal ();
46                 }
47
48                 public BlockingCollection<TTarget> Buffer {
49                         get {
50                                 return MessageQueue;
51                         }
52                 }
53
54                 DataflowMessageStatus ITargetBlock<TTarget>.OfferMessage (DataflowMessageHeader messageHeader,
55                                                                           TTarget messageValue,
56                                                                           ISourceBlock<TTarget> source,
57                                                                           bool consumeToAccept)
58                 {
59                         return OfferMessage (messageHeader, messageValue, source, consumeToAccept);
60                 }
61
62                 void IDataflowBlock.Complete ()
63                 {
64                         Complete ();
65                 }
66
67                 Task IDataflowBlock.Completion {
68                         get {
69                                 throw new NotSupportedException();
70                         }
71                 }
72
73                 void IDataflowBlock.Fault (Exception e)
74                 {
75                         joinBlock.Fault (e);
76                 }
77         }
78 }