1 // OutgoingQueueBase.cs
3 // Copyright (c) 2011 Jérémie "garuma" Laval
4 // Copyright (c) 2012 Petr Onderka
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
7 // of this software and associated documentation files (the "Software"), to deal
8 // in the Software without restriction, including without limitation the rights
9 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 // copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
13 // The above copyright notice and this permission notice shall be included in
14 // all copies or substantial portions of the Software.
16 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
24 using System.Collections.Concurrent;
26 namespace System.Threading.Tasks.Dataflow {
28 /// Handles outgoing messages that get queued when there is no
29 /// block on the other end to process them. It also allows receive operations.
31 abstract class OutgoingQueueBase<T> {
// Underlying queue; the constructor creates it and hands it to Outgoing as its backing store.
32 protected ConcurrentQueue<T> Store { get; private set; }
// Blocking wrapper around Store; its IsCompleted state drives IsCompleted and VerifyCompleteness.
33 protected BlockingCollection<T> Outgoing { get; private set; }
// Used to complete the block (Complete ()) and to read the state of its Completion task.
35 readonly CompletionHelper compHelper;
// Extra condition that must return true (together with Outgoing.IsCompleted)
// before VerifyCompleteness completes the block.
36 readonly Func<bool> externalCompleteTester;
// Block options; supplies the TaskScheduler on which the Process task is started.
37 readonly DataflowBlockOptions options;
// Flag that EnsureProcessing tries to set (TrySet) before starting a new Process task.
38 protected AtomicBoolean IsProcessing { get; private set; }
// Collection of linked targets, provided by the derived class; AddTarget delegates to it.
39 protected abstract TargetCollectionBase<T> Targets { get; }
// Running total of GetModifiedCount () over queued items; only changed through Interlocked.Add.
40 int totalModifiedCount;
// Callback invoked by DecreaseCounts with the modified count of the removed item.
41 readonly Action<int> decreaseItemsCount;
// Backing field of ForceProcessing; volatile, so it is read and written without caching.
42 volatile bool forceProcessing;
/// Creates the queue: allocates the processing flag, the backing store and the
/// blocking collection over it, and stores the supplied collaborators.
/// <param name="compHelper">Helper used to complete the block and to inspect its completion state.</param>
/// <param name="externalCompleteTester">Additional condition checked before the block is completed.</param>
/// <param name="decreaseItemsCount">Callback invoked with the item count whenever an item is removed.</param>
/// <param name="options">Block options; its TaskScheduler is used to run <see cref="Process"/>.</param>
44 protected OutgoingQueueBase (
45 CompletionHelper compHelper, Func<bool> externalCompleteTester,
46 Action<int> decreaseItemsCount, DataflowBlockOptions options)
48 IsProcessing = new AtomicBoolean ();
49 Store = new ConcurrentQueue<T> ();
// Outgoing wraps Store, so both properties refer to the same underlying items.
50 Outgoing = new BlockingCollection<T> (Store);
51 this.compHelper = compHelper;
52 this.externalCompleteTester = externalCompleteTester;
53 this.options = options;
54 this.decreaseItemsCount = decreaseItemsCount;
58 /// Is the queue completed?
59 /// The queue is completed after <see cref="Complete"/> is called
60 /// and all items are retrieved from it.
/// <remarks>The value is read directly from <c>Outgoing.IsCompleted</c>.</remarks>
62 public bool IsCompleted {
63 get { return Outgoing.IsCompleted; }
67 /// Current number of items in the queue.
68 /// Items are counted the way <see cref="DataflowBlockOptions.BoundedCapacity"/>
69 /// counts them, e.g. each item in a batch counts, even if the batch is a single object.
// Returns the running total that AddData increases and DecreaseCounts decreases.
72 get { return totalModifiedCount; }
76 /// Calculates the count of items in the given object.
/// <param name="data">The queued object whose item count is requested.</param>
/// <remarks>
/// Virtual so that derived queues can count items differently. The result is
/// added to the total in <see cref="AddData"/> and subtracted again in
/// <see cref="DecreaseCounts"/>.
/// </remarks>
78 protected virtual int GetModifiedCount (T data)
84 /// Adds an object to the queue.
/// <param name="data">The object to enqueue.</param>
86 public void AddData (T data)
// Account for the new object's items in the total reported by Count.
90 Interlocked.Add (ref totalModifiedCount, GetModifiedCount (data));
// The increment returns 1 only when the counter was 0 before, i.e. when this
// object is the only one currently counted.
// NOTE(review): outgoingCount is not declared among the fields visible above — confirm where it lives.
91 if (Interlocked.Increment (ref outgoingCount) == 1)
// NOTE(review): presumably raised when the collection was already marked as
// complete for adding — confirm. In that case only the completion check is run.
93 } catch (InvalidOperationException) {
94 VerifyCompleteness ();
99 /// Makes sure sending messages to targets is running.
/// <remarks>
/// Always raises <see cref="ForceProcessing"/>; a new <see cref="Process"/> task
/// is started only when <c>IsProcessing.TrySet ()</c> returns true.
/// </remarks>
101 protected void EnsureProcessing ()
// Set before the TrySet check, so the flag is raised even when no new task is started.
103 ForceProcessing = true;
// NOTE(review): TrySet presumably succeeds only when the flag was not already set — confirm in AtomicBoolean.
104 if (IsProcessing.TrySet())
// Run Process on the scheduler from the block options, without a cancellation token.
105 Task.Factory.StartNew (Process, CancellationToken.None,
106 TaskCreationOptions.PreferFairness, options.TaskScheduler);
110 /// Indicates whether sending messages should be forced to start.
/// <remarks>
/// Backed by the volatile field <c>forceProcessing</c>; set to true by
/// <see cref="EnsureProcessing"/>.
/// </remarks>
112 protected bool ForceProcessing {
113 get { return forceProcessing; }
114 set { forceProcessing = value; }
118 /// Sends messages to targets.
/// <remarks>
/// Implemented by derived classes. <see cref="EnsureProcessing"/> starts it as a
/// task on the scheduler taken from the block options.
/// </remarks>
120 protected abstract void Process ();
123 /// Adds a target block to send messages to.
126 /// An object that can be used to destroy the link to the added target.
/// <param name="targetBlock">The block to link to; must not be null.</param>
/// <param name="linkOptions">Options describing the link; must not be null.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="targetBlock"/> or <paramref name="linkOptions"/> is null.
/// </exception>
128 public IDisposable AddTarget (ITargetBlock<T> targetBlock, DataflowLinkOptions linkOptions)
// Validate both arguments before touching the target collection.
130 if (targetBlock == null)
131 throw new ArgumentNullException ("targetBlock");
132 if (linkOptions == null)
133 throw new ArgumentNullException ("linkOptions");
// The actual registration is delegated to the derived class's target collection.
135 var result = Targets.AddTarget (targetBlock, linkOptions);
141 /// Makes sure the block is completed if it should be.
/// <remarks>
/// The block is completed through the completion helper only when the outgoing
/// collection reports IsCompleted and the external tester also returns true.
/// </remarks>
143 protected void VerifyCompleteness ()
// The external tester is evaluated only if the queue itself is already completed (short-circuit &&).
145 if (Outgoing.IsCompleted && externalCompleteTester ())
146 compHelper.Complete ();
150 /// Is the block faulted or cancelled?
/// <remarks>
/// True when the completion task exposed by the completion helper reports
/// IsFaulted or IsCanceled.
/// </remarks>
152 protected bool IsFaultedOrCancelled {
153 get { return compHelper.Completion.IsFaulted || compHelper.Completion.IsCanceled; }
157 /// Used to notify that an object was removed from the queue
158 /// and to update counts.
/// <param name="data">The object that was removed from the queue.</param>
160 protected void DecreaseCounts (T data)
// Computed once so the same value is used for the total and for the callback.
162 var modifiedCount = GetModifiedCount (data);
// Undo the contribution this object made to the total reported by Count.
163 Interlocked.Add (ref totalModifiedCount, -modifiedCount);
// One fewer object is counted as outgoing.
164 Interlocked.Decrement (ref outgoingCount);
// Report the removed item count through the callback supplied to the constructor.
165 decreaseItemsCount (modifiedCount);
169 /// Marks the queue for completion.
171 public void Complete ()
173 Outgoing.CompleteAdding ();
174 VerifyCompleteness ();