Merge pull request #439 from mono-soc-2012/garyb/iconfix
[mono.git] / mcs / class / System.Threading.Tasks.Dataflow / System.Threading.Tasks.Dataflow / ObservableDataflowBlock.cs
index b7de249dcaac0285bcf9431ed422df25778bf52d..e82d1306b4af65d862f4b287c8356e5bffe245d2 100644 (file)
@@ -1,6 +1,7 @@
 // ObservableDataflowBlock.cs
 //
 // Copyright (c) 2011 Jérémie "garuma" Laval
+// Copyright (c) 2012 Petr Onderka
 //
 // Permission is hereby granted, free of charge, to any person obtaining a copy
 // of this software and associated documentation files (the "Software"), to deal
 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 // THE SOFTWARE.
-//
-//
 
-
-using System;
-using System.Threading.Tasks;
-using System.Collections.Generic;
-using System.Collections.Concurrent;
-
-namespace System.Threading.Tasks.Dataflow
-{
-       internal class ObservableDataflowBlock<TSource> : IObservable<TSource>
-       {
-               class ObserverWrapper : ITargetBlock<TSource>
-               {
-                       IObserver<TSource> observer;
+namespace System.Threading.Tasks.Dataflow {
+       /// <summary>
+       /// Rx Observable that represents a source block.
+       /// </summary>
+       class ObservableDataflowBlock<TSource> : IObservable<TSource> {
+               class ObserverWrapper : ITargetBlock<TSource> {
+                       readonly IObserver<TSource> observer;
 
                        public ObserverWrapper (IObserver<TSource> observer)
                        {
@@ -46,21 +39,18 @@ namespace System.Threading.Tasks.Dataflow
                                observer.OnCompleted ();
                        }
 
-                       public void Fault (Exception ex)
+                       public void Fault (Exception exception)
                        {
-                               observer.OnError (ex);
+                               observer.OnError (exception);
                        }
 
                        public Task Completion {
-                               get {
-                                       return null;
-                               }
+                               get { return null; }
                        }
 
-                       public DataflowMessageStatus OfferMessage (DataflowMessageHeader messageHeader,
-                                                                  TSource messageValue,
-                                                                  ISourceBlock<TSource> source,
-                                                                  bool consumeToAccept)
+                       public DataflowMessageStatus OfferMessage (
+                               DataflowMessageHeader messageHeader, TSource messageValue,
+                               ISourceBlock<TSource> source, bool consumeToAccept)
                        {
                                if (consumeToAccept) {
                                        if (!source.ReserveMessage (messageHeader, this))
@@ -77,7 +67,7 @@ namespace System.Threading.Tasks.Dataflow
                        }
                }
 
-               ISourceBlock<TSource> source;
+               readonly ISourceBlock<TSource> source;
 
                public ObservableDataflowBlock (ISourceBlock<TSource> source)
                {
@@ -86,9 +76,8 @@ namespace System.Threading.Tasks.Dataflow
 
                public IDisposable Subscribe (IObserver<TSource> observer)
                {
-                       ObserverWrapper wrapper = new ObserverWrapper (observer);
+                       var wrapper = new ObserverWrapper (observer);
                        return source.LinkTo (wrapper);
                }
        }
-}
-
+}
\ No newline at end of file