Update mcs/class/Commons.Xml.Relaxng/Commons.Xml.Relaxng/RelaxngPattern.cs
[mono.git] / mcs / class / System.Threading.Tasks.Dataflow / System.Threading.Tasks.Dataflow / JoinBlock.cs
1 // JoinBlock.cs
2 //
3 // Copyright (c) 2011 Jérémie "garuma" Laval
4 //
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to deal
7 // in the Software without restriction, including without limitation the rights
8 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
11 //
12 // The above copyright notice and this permission notice shall be included in
13 // all copies or substantial portions of the Software.
14 //
15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21 // THE SOFTWARE.
22 //
23 //
24
25
26 using System;
27 using System.Threading.Tasks;
28 using System.Collections.Generic;
29 using System.Collections.Concurrent;
30
31 namespace System.Threading.Tasks.Dataflow
32 {
33         public sealed class JoinBlock<T1, T2> : IReceivableSourceBlock<Tuple<T1, T2>>, ISourceBlock<Tuple<T1, T2>>, IDataflowBlock
34         {
35                 static readonly GroupingDataflowBlockOptions defaultOptions = new GroupingDataflowBlockOptions ();
36
37                 CompletionHelper compHelper = CompletionHelper.GetNew ();
38                 GroupingDataflowBlockOptions dataflowBlockOptions;
39                 TargetBuffer<Tuple<T1, T2>> targets = new TargetBuffer<Tuple<T1, T2>> ();
40                 MessageVault<Tuple<T1, T2>> vault = new MessageVault<Tuple<T1, T2>> ();
41                 MessageOutgoingQueue<Tuple<T1, T2>> outgoing;
42
43                 JoinTarget<T1> target1;
44                 JoinTarget<T2> target2;
45
46                 DataflowMessageHeader headers;
47
48                 public JoinBlock () : this (defaultOptions)
49                 {
50
51                 }
52
53                 public JoinBlock (GroupingDataflowBlockOptions dataflowBlockOptions)
54                 {
55                         if (dataflowBlockOptions == null)
56                                 throw new ArgumentNullException ("dataflowBlockOptions");
57
58                         this.dataflowBlockOptions = dataflowBlockOptions;
59                         this.target1 = new JoinTarget<T1> (this, SignalArrivalTarget1, new BlockingCollection<T1> (), compHelper);
60                         this.target2 = new JoinTarget<T2> (this, SignalArrivalTarget2, new BlockingCollection<T2> (), compHelper);
61                         this.outgoing = new MessageOutgoingQueue<Tuple<T1, T2>> (compHelper, () => target1.Buffer.IsCompleted || target2.Buffer.IsCompleted);
62                 }
63
64                 public IDisposable LinkTo (ITargetBlock<Tuple<T1, T2>> target, bool unlinkAfterOne)
65                 {
66                         var result = targets.AddTarget (target, unlinkAfterOne);
67                         outgoing.ProcessForTarget (target, this, false, ref headers);
68                         return result;
69                 }
70
71                 public bool TryReceive (Predicate<Tuple<T1, T2>> filter, out Tuple<T1, T2> item)
72                 {
73                         return outgoing.TryReceive (filter, out item);
74                 }
75
76                 public bool TryReceiveAll (out IList<Tuple<T1, T2>> items)
77                 {
78                         return outgoing.TryReceiveAll (out items);
79                 }
80
81                 public Tuple<T1, T2> ConsumeMessage (DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target, out bool messageConsumed)
82                 {
83                         return vault.ConsumeMessage (messageHeader, target, out messageConsumed);
84                 }
85
86                 public void ReleaseReservation (DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target)
87                 {
88                         vault.ReleaseReservation (messageHeader, target);
89                 }
90
91                 public bool ReserveMessage (DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target)
92                 {
93                         return vault.ReserveMessage (messageHeader, target);
94                 }
95
96                 public void Complete ()
97                 {
98                         outgoing.Complete ();
99                 }
100
101                 public void Fault (Exception ex)
102                 {
103                         compHelper.Fault (ex);
104                 }
105
106                 public Task Completion {
107                         get {
108                                 return compHelper.Completion;
109                         }
110                 }
111
112                 void SignalArrivalTarget1 ()
113                 {
114                         T2 val2;
115                         if (target2.Buffer.TryTake (out val2)) {
116                                 T1 val1 = target1.Buffer.Take ();
117                                 TriggerMessage (val1, val2);
118                         }
119                 }
120
121                 void SignalArrivalTarget2 ()
122                 {
123                         T1 val1;
124                         if (target1.Buffer.TryTake (out val1)) {
125                                 T2 val2 = target2.Buffer.Take ();
126                                 TriggerMessage (val1, val2);
127                         }
128                 }
129
130                 void TriggerMessage (T1 val1, T2 val2)
131                 {
132                         Tuple<T1, T2> tuple = Tuple.Create (val1, val2);
133                         ITargetBlock<Tuple<T1, T2>> target = targets.Current;
134
135                         if (target == null) {
136                                 outgoing.AddData (tuple);
137                         } else {
138                                 target.OfferMessage (headers.Increment (),
139                                                      tuple,
140                                                      this,
141                                                      false);
142                         }
143
144                         if (!outgoing.IsEmpty && (target = targets.Current) != null)
145                                 outgoing.ProcessForTarget (target, this, false, ref headers);
146                 }
147
148                 class JoinTarget<TTarget> : MessageBox<TTarget>, ITargetBlock<TTarget>
149                 {
150                         JoinBlock<T1, T2> joinBlock;
151                         BlockingCollection<TTarget> buffer;
152                         Action signal;
153
154                         public JoinTarget (JoinBlock<T1, T2> joinBlock, Action signal, BlockingCollection<TTarget> buffer, CompletionHelper helper)
155                         : base (buffer, helper, () => joinBlock.outgoing.IsCompleted)
156                         {
157                                 this.joinBlock = joinBlock;
158                                 this.buffer = buffer;
159                                 this.signal = signal;
160                         }
161
162                         protected override void EnsureProcessing ()
163                         {
164                                 signal ();
165                         }
166
167                         public BlockingCollection<TTarget> Buffer {
168                                 get {
169                                         return buffer;
170                                 }
171                         }
172
173                         DataflowMessageStatus ITargetBlock<TTarget>.OfferMessage (DataflowMessageHeader messageHeader,
174                                                                                   TTarget messageValue,
175                                                                                   ISourceBlock<TTarget> source,
176                                                                                   bool consumeToAccept)
177                         {
178                                 return OfferMessage (this, messageHeader, messageValue, source, consumeToAccept);
179                         }
180
181                         void IDataflowBlock.Complete ()
182                         {
183                                 Complete ();
184                         }
185
186                         Task IDataflowBlock.Completion {
187                                 get {
188                                         return joinBlock.Completion;
189                                 }
190                         }
191
192                         void IDataflowBlock.Fault (Exception e)
193                         {
194                                 joinBlock.Fault (e);
195                         }
196                 }
197
198                 public ITargetBlock<T1> Target1 {
199                         get {
200                                 return target1;
201                         }
202                 }
203
204                 public ITargetBlock<T2> Target2 {
205                         get {
206                                 return target2;
207                         }
208                 }
209         }
210 }
211