Merge pull request #409 from Alkarex/patch-1
[mono.git] / mcs / class / System.Threading.Tasks.Dataflow / System.Threading.Tasks.Dataflow / BroadcastOutgoingQueue.cs
1 // BroadcastOutgoingQueue.cs
2 //
3 // Copyright (c) 2012 Petr Onderka
4 //
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to deal
7 // in the Software without restriction, including without limitation the rights
8 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
11 //
12 // The above copyright notice and this permission notice shall be included in
13 // all copies or substantial portions of the Software.
14 //
15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21 // THE SOFTWARE.
22
23 using System.Collections.Concurrent;
24
25 namespace System.Threading.Tasks.Dataflow {
26         /// <summary>
27         /// Version of <see cref="OutgoingQueueBase{T}"/> for broadcast blocks.
28         /// </summary>
29         class BroadcastOutgoingQueue<T> : OutgoingQueueBase<T> {
30                 volatile bool hasCurrentItem;
31                 // don't use directly, only through CurrentItem (and carefully)
32                 T currentItem;
33                 SpinLock currentItemLock = new SpinLock();
34
35                 readonly BroadcastTargetCollection<T> targets;
36
37                 protected override TargetCollectionBase<T> Targets {
38                         get { return targets; }
39                 }
40
41                 readonly ConcurrentDictionary<Tuple<DataflowMessageHeader, ITargetBlock<T>>, T>
42                         reservedMessages =
43                                 new ConcurrentDictionary<Tuple<DataflowMessageHeader, ITargetBlock<T>>, T>();
44
45                 public BroadcastOutgoingQueue (
46                         ISourceBlock<T> block, CompletionHelper compHelper,
47                         Func<bool> externalCompleteTester, Action<int> decreaseItemsCount,
48                         DataflowBlockOptions options, bool hasCloner)
49                         : base (compHelper, externalCompleteTester, decreaseItemsCount, options)
50                 {
51                         targets = new BroadcastTargetCollection<T> (block, hasCloner);
52                 }
53
54                 /// <summary>
55                 /// The current item that is to be sent to taget blocks.
56                 /// </summary>
57                 T CurrentItem {
58                         get {
59                                 T item;
60                                 bool lockTaken = false;
61                                 try {
62                                         currentItemLock.Enter (ref lockTaken);
63                                         item = currentItem;
64                                 } finally {
65                                         if (lockTaken)
66                                                 currentItemLock.Exit ();
67                                 }
68                                 return item;
69                         }
70                         set {
71                                 hasCurrentItem = true;
72
73                                 bool lockTaken = false;
74                                 try {
75                                         currentItemLock.Enter (ref lockTaken);
76                                         currentItem = value;
77                                 } finally {
78                                         if (lockTaken)
79                                                 currentItemLock.Exit ();
80                                 }
81                         }
82                 }
83
84                 /// <summary>
85                 /// Takes an item from the queue and sets it as <see cref="CurrentItem"/>.
86                 /// </summary>
87                 public void DequeueItem ()
88                 {
89                         T item;
90                         if (Outgoing.TryTake (out item)) {
91                                 DecreaseCounts (item);
92                                 targets.SetCurrentItem (item);
93
94                                 CurrentItem = item;
95                         }
96                 }
97
98                 /// <summary>
99                 /// Manages sending items to the target blocks.
100                 /// </summary>
101                 protected override void Process ()
102                 {
103                         do {
104                                 ForceProcessing = false;
105
106                                 DequeueItem ();
107
108                                 targets.OfferItemToTargets ();
109                         } while (!Store.IsEmpty || targets.NeedsProcessing);
110
111                         IsProcessing.Value = false;
112
113                         // to guard against race condition
114                         if (ForceProcessing)
115                                 EnsureProcessing ();
116
117                         VerifyCompleteness ();
118                 }
119
120                 public T ConsumeMessage (DataflowMessageHeader messageHeader,
121                                          ITargetBlock<T> target, out bool messageConsumed)
122                 {
123                         if (!messageHeader.IsValid)
124                                 throw new ArgumentException ("The messageHeader is not valid.",
125                                         "messageHeader");
126                         if (target == null)
127                                 throw new ArgumentNullException("target");
128
129                         T item;
130                         if (reservedMessages.TryRemove (Tuple.Create (messageHeader, target), out item)) {
131                                 messageConsumed = true;
132                                 return item;
133                         }
134
135                         // if we first retrieve CurrentItem and then check the header,
136                         // there will be no race condition
137
138                         item = CurrentItem;
139
140                         if (!targets.VerifyHeader (messageHeader)) {
141                                 targets.UnpostponeTargetNotConsumed (target);
142
143                                 messageConsumed = false;
144                                 return default(T);
145                         }
146
147                         targets.UnpostponeTargetConsumed (target, messageHeader);
148                         EnsureProcessing ();
149
150                         messageConsumed = true;
151                         return item;
152                 }
153
154                 public bool ReserveMessage (DataflowMessageHeader messageHeader,
155                                             ITargetBlock<T> target)
156                 {
157                         if (!messageHeader.IsValid)
158                                 throw new ArgumentException ("The messageHeader is not valid.",
159                                         "messageHeader");
160                         if (target == null)
161                                 throw new ArgumentNullException("target");
162
163                         T item = CurrentItem;
164
165                         if (!targets.VerifyHeader (messageHeader)) {
166                                 targets.UnpostponeTargetNotConsumed (target);
167                                 EnsureProcessing ();
168                                 return false;
169                         }
170
171                         targets.ReserveTarget (target);
172                         reservedMessages [Tuple.Create (messageHeader, target)] = item;
173                         return true;
174                 }
175
176                 public void ReleaseReservation (DataflowMessageHeader messageHeader,
177                                                 ITargetBlock<T> target)
178                 {
179                         if (!messageHeader.IsValid)
180                                 throw new ArgumentException ("The messageHeader is not valid.",
181                                         "messageHeader");
182                         if (target == null)
183                                 throw new ArgumentNullException("target");
184
185                         T item;
186                         if (!reservedMessages.TryRemove (Tuple.Create (messageHeader, target), out item))
187                                 throw new InvalidOperationException (
188                                         "The target did not have the message reserved.");
189
190                         targets.UnpostponeTargetNotConsumed (target);
191                         EnsureProcessing ();
192                 }
193
194                 public bool TryReceive (Predicate<T> filter, out T retrievedItem)
195                 {
196                         retrievedItem = default(T);
197
198                         if (!hasCurrentItem) {
199                                 return false;
200                         }
201
202                         T item = CurrentItem;
203
204                         if (filter == null || filter(item)) {
205                                 retrievedItem = item;
206                                 return true;
207                         }
208
209                         return false;
210                 }
211         }
212 }