Added support for NameFormat to all blocks
[mono.git] / mcs / class / System.Threading.Tasks.Dataflow / System.Threading.Tasks.Dataflow / JoinBlock.cs
1 // JoinBlock.cs
2 //
3 // Copyright (c) 2011 Jérémie "garuma" Laval
4 //
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to deal
7 // in the Software without restriction, including without limitation the rights
8 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
11 //
12 // The above copyright notice and this permission notice shall be included in
13 // all copies or substantial portions of the Software.
14 //
15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21 // THE SOFTWARE.
22 //
23 //
24
25
26 using System;
27 using System.Threading.Tasks;
28 using System.Collections.Generic;
29
30 namespace System.Threading.Tasks.Dataflow
31 {
32         public sealed class JoinBlock<T1, T2> : IReceivableSourceBlock<Tuple<T1, T2>>, ISourceBlock<Tuple<T1, T2>>, IDataflowBlock
33         {
34                 static readonly GroupingDataflowBlockOptions defaultOptions = new GroupingDataflowBlockOptions ();
35
36                 CompletionHelper compHelper = CompletionHelper.GetNew ();
37                 GroupingDataflowBlockOptions dataflowBlockOptions;
38                 TargetBuffer<Tuple<T1, T2>> targets = new TargetBuffer<Tuple<T1, T2>> ();
39                 MessageVault<Tuple<T1, T2>> vault = new MessageVault<Tuple<T1, T2>> ();
40                 MessageOutgoingQueue<Tuple<T1, T2>> outgoing;
41
42                 readonly JoinTarget<T1> target1;
43                 readonly JoinTarget<T2> target2;
44
45                 SpinLock targetLock = new SpinLock(false);
46
47                 DataflowMessageHeader headers;
48
49                 public JoinBlock () : this (defaultOptions)
50                 {
51
52                 }
53
54                 public JoinBlock (GroupingDataflowBlockOptions dataflowBlockOptions)
55                 {
56                         if (dataflowBlockOptions == null)
57                                 throw new ArgumentNullException ("dataflowBlockOptions");
58
59                         this.dataflowBlockOptions = dataflowBlockOptions;
60                         target1 = new JoinTarget<T1> (this, SignalArrivalTargetImpl, compHelper, () => outgoing.IsCompleted);
61                         target2 = new JoinTarget<T2> (this, SignalArrivalTargetImpl, compHelper, () => outgoing.IsCompleted);
62                         outgoing = new MessageOutgoingQueue<Tuple<T1, T2>> (compHelper, () => target1.Buffer.IsCompleted || target2.Buffer.IsCompleted);
63                 }
64
65                 public IDisposable LinkTo (ITargetBlock<Tuple<T1, T2>> target, bool unlinkAfterOne)
66                 {
67                         var result = targets.AddTarget (target, unlinkAfterOne);
68                         outgoing.ProcessForTarget (target, this, false, ref headers);
69                         return result;
70                 }
71
72                 public bool TryReceive (Predicate<Tuple<T1, T2>> filter, out Tuple<T1, T2> item)
73                 {
74                         return outgoing.TryReceive (filter, out item);
75                 }
76
77                 public bool TryReceiveAll (out IList<Tuple<T1, T2>> items)
78                 {
79                         return outgoing.TryReceiveAll (out items);
80                 }
81
82                 public Tuple<T1, T2> ConsumeMessage (DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target, out bool messageConsumed)
83                 {
84                         return vault.ConsumeMessage (messageHeader, target, out messageConsumed);
85                 }
86
87                 public void ReleaseReservation (DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target)
88                 {
89                         vault.ReleaseReservation (messageHeader, target);
90                 }
91
92                 public bool ReserveMessage (DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target)
93                 {
94                         return vault.ReserveMessage (messageHeader, target);
95                 }
96
97                 public void Complete ()
98                 {
99                         outgoing.Complete ();
100                 }
101
102                 public void Fault (Exception ex)
103                 {
104                         compHelper.Fault (ex);
105                 }
106
107                 public Task Completion {
108                         get {
109                                 return compHelper.Completion;
110                         }
111                 }
112
113                 // TODO: see if we can find a lockless implementation
114                 void SignalArrivalTargetImpl()
115                 {
116                         bool taken = false;
117                         T1 value1;
118                         T2 value2;
119
120                         try {
121                                 targetLock.Enter (ref taken);
122
123                                 if (target1.Buffer.Count == 0 || target2.Buffer.Count == 0)
124                                         return;
125
126                                 value1 = target1.Buffer.Take ();
127                                 value2 = target2.Buffer.Take ();
128                         } finally {
129                                 if (taken)
130                                         targetLock.Exit ();
131                         }
132
133                         TriggerMessage (value1, value2);
134                 }
135
136
137                 void TriggerMessage (T1 val1, T2 val2)
138                 {
139                         Tuple<T1, T2> tuple = Tuple.Create (val1, val2);
140                         ITargetBlock<Tuple<T1, T2>> target = targets.Current;
141
142                         if (target == null) {
143                                 outgoing.AddData (tuple);
144                         } else {
145                                 target.OfferMessage (headers.Increment (),
146                                                      tuple,
147                                                      this,
148                                                      false);
149                         }
150
151                         if (!outgoing.IsEmpty && (target = targets.Current) != null)
152                                 outgoing.ProcessForTarget (target, this, false, ref headers);
153                 }
154
155                 public ITargetBlock<T1> Target1 {
156                         get {
157                                 return target1;
158                         }
159                 }
160
161                 public ITargetBlock<T2> Target2 {
162                         get {
163                                 return target2;
164                         }
165                 }
166
167                 public override string ToString ()
168                 {
169                         return NameHelper.GetName (this, dataflowBlockOptions);
170                 }
171         }
172 }