Changed link from GUID to URL
[mono.git] / mcs / class / System.Threading.Tasks.Dataflow / System.Threading.Tasks.Dataflow / OutgoingQueueBase.cs
1 // OutgoingQueueBase.cs
2 //
3 // Copyright (c) 2011 Jérémie "garuma" Laval
4 // Copyright (c) 2012 Petr Onderka
5 //
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
7 // of this software and associated documentation files (the "Software"), to deal
8 // in the Software without restriction, including without limitation the rights
9 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 // copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
12 //
13 // The above copyright notice and this permission notice shall be included in
14 // all copies or substantial portions of the Software.
15 //
16 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22 // THE SOFTWARE.
23
24 using System.Collections.Concurrent;
25
namespace System.Threading.Tasks.Dataflow {
	/// <summary>
	/// Handles outgoing messages that get queued when there is no
	/// block on the other end to process them. It also allows receive operations.
	/// </summary>
	abstract class OutgoingQueueBase<T> {
		// Collaborators handed in through the constructor.
		readonly CompletionHelper compHelper;
		readonly Func<bool> externalCompleteTester;
		readonly Action<int> decreaseItemsCount;
		readonly DataflowBlockOptions options;

		// Number of objects added through AddData minus those reported
		// as removed through DecreaseCounts.
		int outgoingCount;
		// Sum of GetModifiedCount over the same objects; backs Count.
		int totalModifiedCount;
		// Backing field of ForceProcessing; set on every EnsureProcessing call.
		volatile bool forceProcessing;

		/// <summary>
		/// The concurrent queue that <see cref="Outgoing"/> is built on top of.
		/// </summary>
		protected ConcurrentQueue<T> Store { get; private set; }

		/// <summary>
		/// Blocking collection wrapping <see cref="Store"/>;
		/// this is what <see cref="AddData"/> adds to.
		/// </summary>
		protected BlockingCollection<T> Outgoing { get; private set; }

		/// <summary>
		/// Flag that <see cref="EnsureProcessing"/> tries to set before
		/// scheduling <see cref="Process"/>.
		/// </summary>
		protected AtomicBoolean IsProcessing { get; private set; }

		/// <summary>
		/// The collection of targets that <see cref="AddTarget"/> registers into.
		/// </summary>
		protected abstract TargetCollectionBase<T> Targets { get; }

		/// <summary>
		/// Creates the queue and its backing collections.
		/// </summary>
		/// <param name="compHelper">Used to complete the block and to inspect its completion task.</param>
		/// <param name="externalCompleteTester">
		/// Extra condition that has to hold, in addition to the queue being completed,
		/// before the block is completed.
		/// </param>
		/// <param name="decreaseItemsCount">
		/// Callback invoked from <see cref="DecreaseCounts"/> with the modified count
		/// of the removed object.
		/// </param>
		/// <param name="options">Block options; supplies the scheduler used to run <see cref="Process"/>.</param>
		protected OutgoingQueueBase (
			CompletionHelper compHelper, Func<bool> externalCompleteTester,
			Action<int> decreaseItemsCount, DataflowBlockOptions options)
		{
			this.compHelper = compHelper;
			this.externalCompleteTester = externalCompleteTester;
			this.decreaseItemsCount = decreaseItemsCount;
			this.options = options;

			IsProcessing = new AtomicBoolean ();
			Store = new ConcurrentQueue<T> ();
			// Outgoing shares Store as its underlying collection.
			Outgoing = new BlockingCollection<T> (Store);
		}

		/// <summary>
		/// Is the queue completed?
		/// The queue is completed after <see cref="Complete"/> is called
		/// and all items have been retrieved from it.
		/// </summary>
		public bool IsCompleted {
			get { return Outgoing.IsCompleted; }
		}

		/// <summary>
		/// Current number of items in the queue.
		/// Items are counted the way <see cref="DataflowBlockOptions.BoundedCapacity"/>
		/// counts them, e.g. each item in a batch counts, even if the batch is a single object.
		/// </summary>
		public int Count {
			get { return totalModifiedCount; }
		}

		/// <summary>
		/// Indicates whether sending messages should be forced to start.
		/// </summary>
		protected bool ForceProcessing {
			get { return forceProcessing; }
			set { forceProcessing = value; }
		}

		/// <summary>
		/// Is the block faulted or cancelled?
		/// </summary>
		protected bool IsFaultedOrCancelled {
			get {
				if (compHelper.Completion.IsFaulted)
					return true;
				return compHelper.Completion.IsCanceled;
			}
		}

		/// <summary>
		/// Calculates how many items the given object counts as.
		/// This base implementation counts every object as one item.
		/// </summary>
		protected virtual int GetModifiedCount (T data)
		{
			return 1;
		}

		/// <summary>
		/// Sends messages to targets.
		/// </summary>
		protected abstract void Process ();

		/// <summary>
		/// Adds an object to the queue and updates the counts.
		/// Processing is requested when this object is the only one outstanding.
		/// </summary>
		/// <param name="data">The object to enqueue.</param>
		public void AddData (T data)
		{
			try {
				// The object is enqueued first; the counters are only
				// touched once the add has succeeded.
				Outgoing.Add (data);

				var weight = GetModifiedCount (data);
				Interlocked.Add (ref totalModifiedCount, weight);

				var outstanding = Interlocked.Increment (ref outgoingCount);
				if (outstanding == 1)
					EnsureProcessing ();
			} catch (InvalidOperationException) {
				// BlockingCollection.Add throws this once adding has been
				// completed; the object is dropped and completion re-checked.
				VerifyCompleteness ();
			}
		}

		/// <summary>
		/// Makes sure sending messages to targets is running.
		/// </summary>
		protected void EnsureProcessing ()
		{
			forceProcessing = true;

			// Only schedule Process when this call managed to set the flag.
			if (!IsProcessing.TrySet ())
				return;

			Task.Factory.StartNew (Process, CancellationToken.None,
				TaskCreationOptions.PreferFairness, options.TaskScheduler);
		}

		/// <summary>
		/// Adds a target block to send messages to.
		/// </summary>
		/// <param name="targetBlock">The block to link to; must not be null.</param>
		/// <param name="linkOptions">Options of the link; must not be null.</param>
		/// <returns>
		/// An object that can be used to destroy the link to the added target.
		/// </returns>
		/// <exception cref="ArgumentNullException">
		/// <paramref name="targetBlock"/> or <paramref name="linkOptions"/> is null.
		/// </exception>
		public IDisposable AddTarget (ITargetBlock<T> targetBlock, DataflowLinkOptions linkOptions)
		{
			if (targetBlock == null)
				throw new ArgumentNullException ("targetBlock");
			if (linkOptions == null)
				throw new ArgumentNullException ("linkOptions");

			var link = Targets.AddTarget (targetBlock, linkOptions);
			// Request processing now that a target has been registered.
			EnsureProcessing ();
			return link;
		}

		/// <summary>
		/// Makes sure the block is completed if it should be, i.e. when the
		/// queue is completed and the external tester agrees.
		/// </summary>
		protected void VerifyCompleteness ()
		{
			// The external tester is consulted only for a completed queue.
			if (!Outgoing.IsCompleted)
				return;

			if (externalCompleteTester ())
				compHelper.Complete ();
		}

		/// <summary>
		/// Used to notify that an object was removed from the queue
		/// and to update the counts accordingly.
		/// </summary>
		/// <param name="data">The object that was removed.</param>
		protected void DecreaseCounts (T data)
		{
			var weight = GetModifiedCount (data);

			Interlocked.Add (ref totalModifiedCount, -weight);
			Interlocked.Decrement (ref outgoingCount);
			decreaseItemsCount (weight);
		}

		/// <summary>
		/// Marks the queue for completion: no more objects are accepted,
		/// and the block is completed right away if it already should be.
		/// </summary>
		public void Complete ()
		{
			Outgoing.CompleteAdding ();
			VerifyCompleteness ();
		}
	}
}