Added support for cancallation to all blocks
[mono.git] / mcs / class / System.Threading.Tasks.Dataflow / System.Threading.Tasks.Dataflow / BroadcastBlock.cs
1 // BroadcastBlock.cs
2 //
3 // Copyright (c) 2011 Jérémie "garuma" Laval
4 //
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to deal
7 // in the Software without restriction, including without limitation the rights
8 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
11 //
12 // The above copyright notice and this permission notice shall be included in
13 // all copies or substantial portions of the Software.
14 //
15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21 // THE SOFTWARE.
22 //
23 //
24
25 using System.Collections.Generic;
26 using System.Collections.Concurrent;
27
28 namespace System.Threading.Tasks.Dataflow
29 {
30         public sealed class BroadcastBlock<T> : IPropagatorBlock<T, T>, ITargetBlock<T>, IDataflowBlock, ISourceBlock<T>, IReceivableSourceBlock<T>
31         {
32                 static readonly DataflowBlockOptions defaultOptions = new DataflowBlockOptions ();
33
34                 CompletionHelper compHelper;
35                 BlockingCollection<T> messageQueue = new BlockingCollection<T> ();
36                 MessageBox<T> messageBox;
37                 MessageVault<T> vault;
38                 DataflowBlockOptions dataflowBlockOptions;
39                 readonly Func<T, T> cloner;
40                 MessageOutgoingQueue<T> outgoing;
41                 TargetBuffer<T> targets = new TargetBuffer<T> ();
42                 DataflowMessageHeader headers = DataflowMessageHeader.NewValid ();
43
44                 public BroadcastBlock (Func<T, T> cloner) : this (cloner, defaultOptions)
45                 {
46
47                 }
48
49                 public BroadcastBlock (Func<T, T> cloner, DataflowBlockOptions dataflowBlockOptions)
50                 {
51                         if (dataflowBlockOptions == null)
52                                 throw new ArgumentNullException ("dataflowBlockOptions");
53
54                         this.cloner = cloner;
55                         this.dataflowBlockOptions = dataflowBlockOptions;
56                         this.compHelper = CompletionHelper.GetNew (dataflowBlockOptions);
57                         this.messageBox = new PassingMessageBox<T> (messageQueue, compHelper, () => outgoing.IsCompleted, BroadcastProcess, dataflowBlockOptions);
58                         this.outgoing = new MessageOutgoingQueue<T> (compHelper, () => messageQueue.IsCompleted);
59                         this.vault = new MessageVault<T> ();
60                 }
61
62                 public DataflowMessageStatus OfferMessage (DataflowMessageHeader messageHeader,
63                                                            T messageValue,
64                                                            ISourceBlock<T> source,
65                                                            bool consumeToAccept)
66                 {
67                         return messageBox.OfferMessage (this, messageHeader, messageValue, source, consumeToAccept);
68                 }
69
70                 public IDisposable LinkTo (ITargetBlock<T> target, bool unlinkAfterOne)
71                 {
72                         return targets.AddTarget (target, unlinkAfterOne);
73                 }
74
75                 public T ConsumeMessage (DataflowMessageHeader messageHeader, ITargetBlock<T> target, out bool messageConsumed)
76                 {
77                         return cloner(vault.ConsumeMessage (messageHeader, target, out messageConsumed));
78                 }
79
80                 public void ReleaseReservation (DataflowMessageHeader messageHeader, ITargetBlock<T> target)
81                 {
82                         vault.ReleaseReservation (messageHeader, target);
83                 }
84
85                 public bool ReserveMessage (DataflowMessageHeader messageHeader, ITargetBlock<T> target)
86                 {
87                         return vault.ReserveMessage (messageHeader, target);
88                 }
89
90                 public bool TryReceive (Predicate<T> filter, out T item)
91                 {
92                         return outgoing.TryReceive (filter, out item);
93                 }
94
95                 public bool TryReceiveAll (out IList<T> items)
96                 {
97                         return outgoing.TryReceiveAll (out items);
98                 }
99
100                 void BroadcastProcess ()
101                 {
102                         T input;
103
104                         if (!messageQueue.TryTake (out input) || targets.Current == null)
105                                 return;
106
107                         foreach (var target in targets) {
108                                 DataflowMessageHeader header = headers.Increment ();
109                                 if (cloner != null)
110                                         vault.StoreMessage (header, input);
111                                 target.OfferMessage (header, input, this, cloner != null);
112                                 // TODO: verify if it's the correct semantic
113                                 T save = input;
114                                 if (!messageQueue.TryTake (out input))
115                                         input = save;
116                         }
117                 }
118
119                 public void Complete ()
120                 {
121                         messageBox.Complete ();
122                 }
123
124                 public void Fault (Exception ex)
125                 {
126                         compHelper.Fault (ex);
127                 }
128
129                 public Task Completion {
130                         get {
131                                 return compHelper.Completion;
132                         }
133                 }
134
135                 public override string ToString ()
136                 {
137                         return NameHelper.GetName (this, dataflowBlockOptions);
138                 }
139         }
140 }