Update mcs/class/Commons.Xml.Relaxng/Commons.Xml.Relaxng/RelaxngPattern.cs
[mono.git] / mcs / class / System.Threading.Tasks.Dataflow / System.Threading.Tasks.Dataflow / TransformManyBlock.cs
1 // TransformManyBlock.cs
2 //
3 // Copyright (c) 2011 Jérémie "garuma" Laval
4 //
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to deal
7 // in the Software without restriction, including without limitation the rights
8 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
11 //
12 // The above copyright notice and this permission notice shall be included in
13 // all copies or substantial portions of the Software.
14 //
15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21 // THE SOFTWARE.
22 //
23 //
24
25
26 using System;
27 using System.Threading.Tasks;
28 using System.Collections.Generic;
29 using System.Collections.Concurrent;
30
31 namespace System.Threading.Tasks.Dataflow
32 {
33         public sealed class TransformManyBlock<TInput, TOutput> :
34                 IPropagatorBlock<TInput, TOutput>, ITargetBlock<TInput>, IDataflowBlock, ISourceBlock<TOutput>, IReceivableSourceBlock<TOutput>
35         {
36                 static readonly ExecutionDataflowBlockOptions defaultOptions = new ExecutionDataflowBlockOptions ();
37
38                 CompletionHelper compHelper = CompletionHelper.GetNew ();
39                 BlockingCollection<TInput> messageQueue = new BlockingCollection<TInput> ();
40                 MessageBox<TInput> messageBox;
41                 MessageVault<TOutput> vault;
42                 ExecutionDataflowBlockOptions dataflowBlockOptions;
43                 readonly Func<TInput, IEnumerable<TOutput>> transformer;
44                 MessageOutgoingQueue<TOutput> outgoing;
45                 TargetBuffer<TOutput> targets = new TargetBuffer<TOutput> ();
46                 DataflowMessageHeader headers = DataflowMessageHeader.NewValid ();
47
48                 public TransformManyBlock (Func<TInput, IEnumerable<TOutput>> transformer) : this (transformer, defaultOptions)
49                 {
50
51                 }
52
53                 public TransformManyBlock (Func<TInput, IEnumerable<TOutput>> transformer, ExecutionDataflowBlockOptions dataflowBlockOptions)
54                 {
55                         if (dataflowBlockOptions == null)
56                                 throw new ArgumentNullException ("dataflowBlockOptions");
57
58                         this.transformer = transformer;
59                         this.dataflowBlockOptions = dataflowBlockOptions;
60                         this.messageBox = new ExecutingMessageBox<TInput> (messageQueue,
61                                                                            compHelper,
62                                                                            () => outgoing.IsCompleted,
63                                                                            TransformProcess,
64                                                                            dataflowBlockOptions);
65                         this.outgoing = new MessageOutgoingQueue<TOutput> (compHelper, () => messageQueue.IsCompleted);
66                         this.vault = new MessageVault<TOutput> ();
67                 }
68
69                 public DataflowMessageStatus OfferMessage (DataflowMessageHeader messageHeader,
70                                                            TInput messageValue,
71                                                            ISourceBlock<TInput> source,
72                                                            bool consumeToAccept)
73                 {
74                         return messageBox.OfferMessage (this, messageHeader, messageValue, source, consumeToAccept);
75                 }
76
77                 public IDisposable LinkTo (ITargetBlock<TOutput> target, bool unlinkAfterOne)
78                 {
79                         var result = targets.AddTarget (target, unlinkAfterOne);
80                         outgoing.ProcessForTarget (target, this, false, ref headers);
81                         return result;
82                 }
83
84                 public TOutput ConsumeMessage (DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target, out bool messageConsumed)
85                 {
86                         return vault.ConsumeMessage (messageHeader, target, out messageConsumed);
87                 }
88
89                 public void ReleaseReservation (DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target)
90                 {
91                         vault.ReleaseReservation (messageHeader, target);
92                 }
93
94                 public bool ReserveMessage (DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target)
95                 {
96                         return vault.ReserveMessage (messageHeader, target);
97                 }
98
99                 public bool TryReceive (Predicate<TOutput> filter, out TOutput item)
100                 {
101                         return outgoing.TryReceive (filter, out item);
102                 }
103
104                 public bool TryReceiveAll (out IList<TOutput> items)
105                 {
106                         return outgoing.TryReceiveAll (out items);
107                 }
108
109                 void TransformProcess ()
110                 {
111                         ITargetBlock<TOutput> target;
112                         TInput input;
113
114                         while (messageQueue.TryTake (out input)) {
115                                 foreach (var item in transformer (input)) {
116                                         if ((target = targets.Current) != null)
117                                                 target.OfferMessage (headers.Increment (), item, this, false);
118                                         else
119                                                 outgoing.AddData (item);
120                                 }
121                         }
122
123                         if (!outgoing.IsEmpty && (target = targets.Current) != null)
124                                 outgoing.ProcessForTarget (target, this, false, ref headers);
125                 }
126
127                 public void Complete ()
128                 {
129                         messageBox.Complete ();
130                 }
131
132                 public void Fault (Exception ex)
133                 {
134                         compHelper.Fault (ex);
135                 }
136
137                 public Task Completion {
138                         get {
139                                 return compHelper.Completion;
140                         }
141                 }
142
143                 public int OutputCount {
144                         get {
145                                 return outgoing.Count;
146                         }
147                 }
148
149                 public int InputCount {
150                         get {
151                                 return messageQueue.Count;
152                         }
153                 }
154         }
155 }
156