// ObservableDataflowBlock.cs
//
// Copyright (c) 2011 Jérémie "garuma" Laval
+// Copyright (c) 2012 Petr Onderka
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
-//
-//
-
-using System;
-using System.Threading.Tasks;
-using System.Collections.Generic;
-using System.Collections.Concurrent;
-
-namespace System.Threading.Tasks.Dataflow
-{
- internal class ObservableDataflowBlock<TSource> : IObservable<TSource>
- {
- class ObserverWrapper : ITargetBlock<TSource>
- {
- IObserver<TSource> observer;
+namespace System.Threading.Tasks.Dataflow {
+ /// <summary>
+ /// Rx Observable that represents a source block.
+ /// </summary>
+ class ObservableDataflowBlock<TSource> : IObservable<TSource> {
+ class ObserverWrapper : ITargetBlock<TSource> {
+ readonly IObserver<TSource> observer;
public ObserverWrapper (IObserver<TSource> observer)
{
observer.OnCompleted ();
}
- public void Fault (Exception ex)
+ public void Fault (Exception exception)
{
- observer.OnError (ex);
+ observer.OnError (exception);
}
public Task Completion {
- get {
- return null;
- }
+ get { return null; }
}
- public DataflowMessageStatus OfferMessage (DataflowMessageHeader messageHeader,
- TSource messageValue,
- ISourceBlock<TSource> source,
- bool consumeToAccept)
+ public DataflowMessageStatus OfferMessage (
+ DataflowMessageHeader messageHeader, TSource messageValue,
+ ISourceBlock<TSource> source, bool consumeToAccept)
{
if (consumeToAccept) {
if (!source.ReserveMessage (messageHeader, this))
}
}
- ISourceBlock<TSource> source;
+ readonly ISourceBlock<TSource> source;
public ObservableDataflowBlock (ISourceBlock<TSource> source)
{
public IDisposable Subscribe (IObserver<TSource> observer)
{
- ObserverWrapper wrapper = new ObserverWrapper (observer);
+ var wrapper = new ObserverWrapper (observer);
return source.LinkTo (wrapper);
}
}
-}
-
+}
\ No newline at end of file