aa273ab2f16346897693cb1203a40b50b78b9ca5
[mono.git] / mcs / class / System.Threading.Tasks.Dataflow / System.Threading.Tasks.Dataflow / OutgoingQueue.cs
1 // OutgoingQueue.cs
2 //
3 // Copyright (c) 2011 Jérémie "garuma" Laval
4 // Copyright (c) 2012 Petr Onderka
5 //
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
7 // of this software and associated documentation files (the "Software"), to deal
8 // in the Software without restriction, including without limitation the rights
9 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 // copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
12 //
13 // The above copyright notice and this permission notice shall be included in
14 // all copies or substantial portions of the Software.
15 //
16 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22 // THE SOFTWARE.
23
24 using System.Collections.Generic;
25
namespace System.Threading.Tasks.Dataflow {
        /// <summary>
        /// Version of <see cref="OutgoingQueueBase{T}"/> for
        /// non-broadcast blocks.
        /// </summary>
        /// <remarks>
        /// Every removal of an item from the queue in this class happens while
        /// holding <c>firstItemLock</c>. While <c>reservedForTargetBlock</c> is
        /// non-null, <see cref="Process"/> does not offer the first item to targets
        /// and <see cref="TryReceive"/> / <see cref="TryReceiveAll"/> return false.
        /// </remarks>
        class OutgoingQueue<T> : OutgoingQueueBase<T> {
                // Optional function returning the count for an item;
                // when null, every item counts as 1 (see GetModifiedCount).
                readonly Func<T, int> countSelector;
                // SpinLock is a mutable struct, so this field must not be readonly.
                SpinLock firstItemLock = new SpinLock ();
                // Target that currently holds a reservation, or null if there is none.
                volatile ITargetBlock<T> reservedForTargetBlock;
                readonly TargetCollection<T> targets;

                /// <summary>
                /// The collection of targets that items from this queue are offered to.
                /// </summary>
                protected override TargetCollectionBase<T> Targets {
                        get { return targets; }
                }

                /// <summary>
                /// Creates the queue and a <see cref="TargetCollection{T}"/> for <paramref name="block"/>.
                /// </summary>
                /// <param name="block">Block passed to the target collection.</param>
                /// <param name="compHelper">Forwarded to the base class.</param>
                /// <param name="externalCompleteTester">Forwarded to the base class.</param>
                /// <param name="decreaseItemsCount">Forwarded to the base class.</param>
                /// <param name="options">Forwarded to the base class.</param>
                /// <param name="countSelector">
                /// Optional function used by <see cref="GetModifiedCount"/>;
                /// when null, every item counts as 1.
                /// </param>
                public OutgoingQueue (
                        ISourceBlock<T> block, CompletionHelper compHelper,
                        Func<bool> externalCompleteTester, Action<int> decreaseItemsCount,
                        DataflowBlockOptions options, Func<T, int> countSelector = null)
                        : base (compHelper, externalCompleteTester,
                                decreaseItemsCount, options)
                {
                        targets = new TargetCollection<T> (block);
                        this.countSelector = countSelector;
                }

                /// <summary>
                /// Calculates the count of items in the given object.
                /// </summary>
                /// <returns>
                /// 1 when no count selector was supplied,
                /// otherwise the value returned by the selector.
                /// </returns>
                protected override int GetModifiedCount (T data)
                {
                        if (countSelector == null)
                                return 1;

                        return countSelector (data);
                }

                /// <summary>
                /// Sends messages to targets.
                /// </summary>
                /// <remarks>
                /// Repeats as long as the first item is taken by a target. Stops when
                /// the queue is empty, when the first item is reserved, or when
                /// no target took the item.
                /// </remarks>
                protected override void Process ()
                {
                        bool processed;
                        do {
                                ForceProcessing = false;

                                bool lockTaken = false;
                                try {
                                        firstItemLock.Enter (ref lockTaken);

                                        T item;
                                        if (!Store.TryPeek (out item))
                                                break;

                                        if (!targets.HasCurrentItem)
                                                targets.SetCurrentItem (item);

                                        // a reserved item must not be offered to anyone
                                        if (reservedForTargetBlock != null)
                                                break;

                                        processed = targets.OfferItemToTargets ();
                                        if (processed) {
                                                Outgoing.TryTake (out item);
                                                DecreaseCounts (item);
                                                FirstItemChanged ();
                                        }
                                } finally {
                                        if (lockTaken)
                                                firstItemLock.Exit ();
                                }
                        } while (processed);

                        IsProcessing.Value = false;

                        // to guard against race condition: ForceProcessing may have been
                        // set again after it was cleared at the top of the last iteration
                        if (ForceProcessing && reservedForTargetBlock == null)
                                EnsureProcessing ();

                        VerifyCompleteness ();
                }

                /// <summary>
                /// Removes the first item from the queue on behalf of
                /// <paramref name="targetBlock"/>, if the header verifies and the item
                /// is not reserved for a different target.
                /// </summary>
                /// <param name="messageHeader">Header of the message to consume.</param>
                /// <param name="targetBlock">Target that wants to consume the message.</param>
                /// <param name="messageConsumed">Set to true when the item was removed.</param>
                /// <returns>The consumed item, or <c>default(T)</c> when nothing was consumed.</returns>
                /// <exception cref="ArgumentException"><paramref name="messageHeader"/> is not valid.</exception>
                /// <exception cref="ArgumentNullException"><paramref name="targetBlock"/> is null.</exception>
                public T ConsumeMessage (DataflowMessageHeader messageHeader,
                                         ITargetBlock<T> targetBlock, out bool messageConsumed)
                {
                        if (!messageHeader.IsValid)
                                throw new ArgumentException ("The messageHeader is not valid.",
                                        "messageHeader");
                        // NOTE(review): the reported name is "target" although the parameter
                        // here is targetBlock; presumably this mirrors the 'target' parameter
                        // of ISourceBlock<T>.ConsumeMessage. Confirm before changing it.
                        if (targetBlock == null)
                                throw new ArgumentNullException ("target");

                        T result = default (T);
                        messageConsumed = false;

                        bool lockTaken = false;
                        try {
                                firstItemLock.Enter (ref lockTaken);

                                if (targets.VerifyHeader (messageHeader, targetBlock)
                                    && (reservedForTargetBlock == null
                                        || reservedForTargetBlock == targetBlock)) {
                                        // cannot consume from faulted block, unless reserved;
                                        // note that this early return skips the calls below the lock
                                        if (reservedForTargetBlock == null && IsFaultedOrCancelled)
                                                return result;

                                        Outgoing.TryTake (out result);
                                        messageConsumed = true;
                                        DecreaseCounts (result);
                                        reservedForTargetBlock = null;
                                        FirstItemChanged ();
                                }
                        } finally {
                                if (lockTaken)
                                        firstItemLock.Exit ();
                        }

                        targets.UnpostponeTarget (targetBlock, messageConsumed);
                        EnsureProcessing ();
                        VerifyCompleteness ();

                        return result;
                }

                /// <summary>
                /// Reserves the first item for <paramref name="target"/>
                /// if the header verifies.
                /// </summary>
                /// <param name="messageHeader">Header of the message to reserve.</param>
                /// <param name="target">Target requesting the reservation.</param>
                /// <returns>
                /// True when the reservation was recorded; false otherwise, in which
                /// case the target is unpostponed and processing is requested.
                /// </returns>
                /// <exception cref="ArgumentException"><paramref name="messageHeader"/> is not valid.</exception>
                /// <exception cref="ArgumentNullException"><paramref name="target"/> is null.</exception>
                public bool ReserveMessage (DataflowMessageHeader messageHeader, ITargetBlock<T> target)
                {
                        if (!messageHeader.IsValid)
                                throw new ArgumentException ("The messageHeader is not valid.",
                                        "messageHeader");
                        if (target == null)
                                throw new ArgumentNullException ("target");

                        bool lockTaken = false;
                        try {
                                firstItemLock.Enter (ref lockTaken);

                                if (targets.VerifyHeader (messageHeader, target)) {
                                        reservedForTargetBlock = target;
                                        return true;
                                }
                        } finally {
                                if (lockTaken)
                                        firstItemLock.Exit ();
                        }

                        // Done after releasing the spin lock, like in ConsumeMessage and
                        // ReleaseReservation, so the lock is not held across calls into
                        // other components.
                        targets.UnpostponeTarget (target, false);
                        EnsureProcessing ();

                        return false;
                }

                /// <summary>
                /// Releases a reservation previously made by <paramref name="target"/>.
                /// </summary>
                /// <param name="messageHeader">Header of the reserved message.</param>
                /// <param name="target">Target that holds the reservation.</param>
                /// <exception cref="ArgumentException"><paramref name="messageHeader"/> is not valid.</exception>
                /// <exception cref="ArgumentNullException"><paramref name="target"/> is null.</exception>
                /// <exception cref="InvalidOperationException">
                /// The header does not verify or the item is not reserved for <paramref name="target"/>.
                /// </exception>
                public void ReleaseReservation (DataflowMessageHeader messageHeader, ITargetBlock<T> target)
                {
                        if (!messageHeader.IsValid)
                                throw new ArgumentException ("The messageHeader is not valid.",
                                        "messageHeader");
                        if (target == null)
                                throw new ArgumentNullException ("target");

                        bool lockTaken = false;
                        try {
                                firstItemLock.Enter (ref lockTaken);

                                if (!targets.VerifyHeader (messageHeader, target)
                                    || reservedForTargetBlock != target)
                                        throw new InvalidOperationException (
                                                "The target did not have the message reserved.");

                                reservedForTargetBlock = null;
                        } finally {
                                if (lockTaken)
                                        firstItemLock.Exit ();
                        }

                        targets.UnpostponeTarget (target, false);
                        EnsureProcessing ();
                }

                /// <summary>
                /// Notifies that the first item in the queue changed.
                /// </summary>
                /// <remarks>
                /// Sets the new first item as the current item of the targets,
                /// or resets the current item when the queue is empty.
                /// All callers in this class invoke it while holding the lock.
                /// </remarks>
                void FirstItemChanged ()
                {
                        T firstItem;
                        if (Store.TryPeek (out firstItem))
                                targets.SetCurrentItem (firstItem);
                        else
                                targets.ResetCurrentItem ();
                }

                /// <summary>
                /// Tries to remove the first item from the queue directly.
                /// </summary>
                /// <param name="filter">
                /// Optional predicate the first item has to satisfy; null accepts any item.
                /// </param>
                /// <param name="item">The removed item, or <c>default(T)</c> on failure.</param>
                /// <returns>
                /// True when an item was removed. False when the first item is reserved
                /// (in that case no processing is requested), when the queue is empty,
                /// or when the filter rejected the first item.
                /// </returns>
                public bool TryReceive (Predicate<T> filter, out T item)
                {
                        bool success = false;
                        item = default (T);

                        bool lockTaken = false;
                        try {
                                firstItemLock.Enter (ref lockTaken);

                                if (reservedForTargetBlock != null)
                                        return false;

                                T result;
                                if (Store.TryPeek (out result) && (filter == null || filter (result))) {
                                        Outgoing.TryTake (out item);
                                        success = true;
                                        DecreaseCounts (item);
                                        FirstItemChanged ();
                                }
                        } finally {
                                if (lockTaken)
                                        firstItemLock.Exit ();
                        }

                        EnsureProcessing ();
                        VerifyCompleteness ();

                        return success;
                }

                /// <summary>
                /// Tries to remove all items from the queue at once.
                /// </summary>
                /// <param name="items">
                /// The removed items. Null when the queue was empty on entry
                /// or the first item is reserved.
                /// </param>
                /// <returns>True when at least one item was removed.</returns>
                public bool TryReceiveAll (out IList<T> items)
                {
                        items = null;

                        // cheap check before taking the lock
                        if (Store.IsEmpty)
                                return false;

                        bool lockTaken = false;
                        try {
                                firstItemLock.Enter (ref lockTaken);

                                if (reservedForTargetBlock != null)
                                        return false;

                                var list = new List<T> (Outgoing.Count);

                                T item;
                                while (Outgoing.TryTake (out item)) {
                                        DecreaseCounts (item);
                                        list.Add (item);
                                }

                                items = list;

                                FirstItemChanged ();
                        } finally {
                                if (lockTaken)
                                        firstItemLock.Exit ();
                        }

                        EnsureProcessing ();
                        VerifyCompleteness ();

                        // the queue may have been drained by someone else between the
                        // IsEmpty check and taking the lock, leaving an empty list
                        return items.Count > 0;
                }
        }
}