-<?xml version="1.0" encoding="utf-8"?>\r
+<?xml version="1.0" encoding="utf-8"?>\r
<Project ToolsVersion="4.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">\r
<PropertyGroup>\r
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>\r
<Compile Include="System.Threading.Tasks.Dataflow\ISourceBlock.cs" />
<Compile Include="System.Threading.Tasks.Dataflow\ITargetBlock.cs" />
<Compile Include="System.Threading.Tasks.Dataflow\CompletionHelper.cs" />
+ <Compile Include="System.Threading.Tasks.Dataflow\JoinTarget.cs" />\r
<Compile Include="System.Threading.Tasks.Dataflow\MessageBox.cs" />
<Compile Include="System.Threading.Tasks.Dataflow\MessageOutgoingQueue.cs" />
<Compile Include="System.Threading.Tasks.Dataflow\MessageVault.cs" />
</ItemGroup>\r
\r
</Project>\r
-
using System;
using System.Threading.Tasks;
using System.Collections.Generic;
-using System.Collections.Concurrent;
namespace System.Threading.Tasks.Dataflow
{
MessageVault<Tuple<T1, T2>> vault = new MessageVault<Tuple<T1, T2>> ();
MessageOutgoingQueue<Tuple<T1, T2>> outgoing;
- JoinTarget<T1> target1;
- JoinTarget<T2> target2;
+ readonly JoinTarget<T1> target1;
+ readonly JoinTarget<T2> target2;
DataflowMessageHeader headers;
throw new ArgumentNullException ("dataflowBlockOptions");
this.dataflowBlockOptions = dataflowBlockOptions;
- this.target1 = new JoinTarget<T1> (this, SignalArrivalTarget1, new BlockingCollection<T1> (), compHelper);
- this.target2 = new JoinTarget<T2> (this, SignalArrivalTarget2, new BlockingCollection<T2> (), compHelper);
- this.outgoing = new MessageOutgoingQueue<Tuple<T1, T2>> (compHelper, () => target1.Buffer.IsCompleted || target2.Buffer.IsCompleted);
+ target1 = new JoinTarget<T1> (this, SignalArrivalTarget1, compHelper, () => outgoing.IsCompleted);
+ target2 = new JoinTarget<T2> (this, SignalArrivalTarget2, compHelper, () => outgoing.IsCompleted);
+ outgoing = new MessageOutgoingQueue<Tuple<T1, T2>> (compHelper, () => target1.Buffer.IsCompleted || target2.Buffer.IsCompleted);
}
public IDisposable LinkTo (ITargetBlock<Tuple<T1, T2>> target, bool unlinkAfterOne)
outgoing.ProcessForTarget (target, this, false, ref headers);
}
- class JoinTarget<TTarget> : MessageBox<TTarget>, ITargetBlock<TTarget>
- {
- JoinBlock<T1, T2> joinBlock;
- BlockingCollection<TTarget> buffer;
- Action signal;
-
- public JoinTarget (JoinBlock<T1, T2> joinBlock, Action signal, BlockingCollection<TTarget> buffer, CompletionHelper helper)
- : base (buffer, helper, () => joinBlock.outgoing.IsCompleted)
- {
- this.joinBlock = joinBlock;
- this.buffer = buffer;
- this.signal = signal;
- }
-
- protected override void EnsureProcessing ()
- {
- signal ();
- }
-
- public BlockingCollection<TTarget> Buffer {
- get {
- return buffer;
- }
- }
-
- DataflowMessageStatus ITargetBlock<TTarget>.OfferMessage (DataflowMessageHeader messageHeader,
- TTarget messageValue,
- ISourceBlock<TTarget> source,
- bool consumeToAccept)
- {
- return OfferMessage (this, messageHeader, messageValue, source, consumeToAccept);
- }
-
- void IDataflowBlock.Complete ()
- {
- Complete ();
- }
-
- Task IDataflowBlock.Completion {
- get {
- return joinBlock.Completion;
- }
- }
-
- void IDataflowBlock.Fault (Exception e)
- {
- joinBlock.Fault (e);
- }
- }
-
public ITargetBlock<T1> Target1 {
get {
return target1;
using System;
using System.Threading.Tasks;
using System.Collections.Generic;
-using System.Collections.Concurrent;
namespace System.Threading.Tasks.Dataflow
{
MessageVault<Tuple<T1, T2, T3>> vault = new MessageVault<Tuple<T1, T2, T3>> ();
MessageOutgoingQueue<Tuple<T1, T2, T3>> outgoing;
- JoinTarget<T1> target1;
- JoinTarget<T2> target2;
- JoinTarget<T3> target3;
+ readonly JoinTarget<T1> target1;
+ readonly JoinTarget<T2> target2;
+ readonly JoinTarget<T3> target3;
SpinLock targetLock = new SpinLock (false);
Func<bool> checker2 = () => target1.Buffer.Count == 0 || target3.Buffer.Count == 0;
Func<bool> checker3 = () => target1.Buffer.Count == 0 || target2.Buffer.Count == 0;
- this.target1 = new JoinTarget<T1> (this, () => SignalArrivalTargetImpl (checker1), new BlockingCollection<T1> (), compHelper);
- this.target2 = new JoinTarget<T2> (this, () => SignalArrivalTargetImpl (checker2), new BlockingCollection<T2> (), compHelper);
- this.target3 = new JoinTarget<T3> (this, () => SignalArrivalTargetImpl (checker3), new BlockingCollection<T3> (), compHelper);
- this.outgoing =
+ Func<bool> joinTargetCompleteTester = () => outgoing.IsCompleted;
+ target1 = new JoinTarget<T1> (this, () => SignalArrivalTargetImpl (checker1), compHelper, joinTargetCompleteTester);
+ target2 = new JoinTarget<T2> (this, () => SignalArrivalTargetImpl (checker2), compHelper, joinTargetCompleteTester);
+ target3 = new JoinTarget<T3> (this, () => SignalArrivalTargetImpl (checker3), compHelper, joinTargetCompleteTester);
+ outgoing =
new MessageOutgoingQueue<Tuple<T1, T2, T3>> (compHelper,
() => target1.Buffer.IsCompleted || target2.Buffer.IsCompleted || target3.Buffer.IsCompleted);
}
outgoing.ProcessForTarget (target, this, false, ref headers);
}
- class JoinTarget<TTarget> : MessageBox<TTarget>, ITargetBlock<TTarget>
- {
- JoinBlock<T1, T2, T3> joinBlock;
- BlockingCollection<TTarget> buffer;
- Action signal;
-
- public JoinTarget (JoinBlock<T1, T2, T3> joinBlock, Action signal, BlockingCollection<TTarget> buffer, CompletionHelper helper)
- : base (buffer, helper, () => joinBlock.outgoing.IsCompleted)
- {
- this.joinBlock = joinBlock;
- this.buffer = buffer;
- this.signal = signal;
- }
-
- protected override void EnsureProcessing ()
- {
- signal ();
- }
-
- public BlockingCollection<TTarget> Buffer {
- get {
- return buffer;
- }
- }
-
- DataflowMessageStatus ITargetBlock<TTarget>.OfferMessage (DataflowMessageHeader messageHeader,
- TTarget messageValue,
- ISourceBlock<TTarget> source,
- bool consumeToAccept)
- {
- return OfferMessage (this, messageHeader, messageValue, source, consumeToAccept);
- }
-
- void IDataflowBlock.Complete ()
- {
- Complete ();
- }
-
- Task IDataflowBlock.Completion {
- get {
- return joinBlock.Completion;
- }
- }
-
- void IDataflowBlock.Fault (Exception e)
- {
- joinBlock.Fault (e);
- }
- }
-
public ITargetBlock<T1> Target1 {
get {
return target1;
--- /dev/null
+// JoinBlock.cs
+//
+// Copyright (c) 2011 Jérémie "garuma" Laval
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+//
+//
+
+using System.Collections.Concurrent;
+
+namespace System.Threading.Tasks.Dataflow {
+ internal class JoinTarget<TTarget> : MessageBox<TTarget>, ITargetBlock<TTarget>
+ {
+ readonly IDataflowBlock joinBlock;
+ readonly Action signal;
+
+ public JoinTarget (IDataflowBlock joinBlock, Action signal, CompletionHelper helper,
+ Func<bool> externalCompleteTester)
+ : base (new BlockingCollection<TTarget> (), helper, externalCompleteTester)
+ {
+ this.joinBlock = joinBlock;
+ this.signal = signal;
+ }
+
+ protected override void EnsureProcessing ()
+ {
+ signal ();
+ }
+
+ public BlockingCollection<TTarget> Buffer {
+ get {
+ return MessageQueue;
+ }
+ }
+
+ DataflowMessageStatus ITargetBlock<TTarget>.OfferMessage (DataflowMessageHeader messageHeader,
+ TTarget messageValue,
+ ISourceBlock<TTarget> source,
+ bool consumeToAccept)
+ {
+ return OfferMessage (this, messageHeader, messageValue, source, consumeToAccept);
+ }
+
+ void IDataflowBlock.Complete ()
+ {
+ Complete ();
+ }
+
+ Task IDataflowBlock.Completion {
+ get {
+ return joinBlock.Completion;
+ }
+ }
+
+ void IDataflowBlock.Fault (Exception e)
+ {
+ joinBlock.Fault (e);
+ }
+ }
+}
\ No newline at end of file
//
//
-
-using System;
-using System.Threading.Tasks;
-using System.Collections.Generic;
using System.Collections.Concurrent;
namespace System.Threading.Tasks.Dataflow
internal class MessageBox<TInput>
{
readonly CompletionHelper compHelper;
- readonly BlockingCollection<TInput> messageQueue = new BlockingCollection<TInput> ();
readonly Func<bool> externalCompleteTester;
+ protected BlockingCollection<TInput> MessageQueue { get; private set; }
+
public MessageBox (BlockingCollection<TInput> messageQueue, CompletionHelper compHelper, Func<bool> externalCompleteTester)
{
this.compHelper = compHelper;
- this.messageQueue = messageQueue;
+ this.MessageQueue = messageQueue;
this.externalCompleteTester = externalCompleteTester;
}
{
if (!messageHeader.IsValid)
return DataflowMessageStatus.Declined;
- if (messageQueue.IsAddingCompleted)
+ if (MessageQueue.IsAddingCompleted)
return DataflowMessageStatus.DecliningPermanently;
if (consumeToAccept) {
}
try {
- messageQueue.Add (messageValue);
+ MessageQueue.Add (messageValue);
} catch (InvalidOperationException) {
// This is triggered either if the underlying collection didn't accept the item
// or if the messageQueue has been marked complete, either way it corresponds to a false
protected virtual void EnsureProcessing ()
{
-
}
public void Complete ()
{
// Make message queue complete
- messageQueue.CompleteAdding ();
+ MessageQueue.CompleteAdding ();
VerifyCompleteness ();
}
void VerifyCompleteness ()
{
- if (messageQueue.IsCompleted && externalCompleteTester ())
+ if (MessageQueue.IsCompleted && externalCompleteTester ())
compHelper.Complete ();
}
}