Merge pull request #268 from pcc/menudeactivate
[mono.git] / mcs / class / System.Core / System.Linq.Parallel.QueryNodes / QueryGroupByNode.cs
index f309d656913c85ebb1763bd763a737a98b1e480e..55bd421f0fab7ff1ae8563ee8b4832cf0f96bd9d 100644 (file)
@@ -1,4 +1,3 @@
-#if NET_4_0
 //
 // QueryOrderByNode.cs
 //
 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 // THE SOFTWARE.
 
+#if NET_4_0
 using System;
 using System.Threading;
 using System.Collections;
 using System.Collections.Generic;
 using System.Collections.Concurrent;
 
-namespace System.Linq
+namespace System.Linq.Parallel.QueryNodes
 {
        internal class QueryGroupByNode<TSource, TKey, TElement> : QueryStreamNode<IGrouping<TKey, TElement>, TSource>
        {
@@ -56,15 +56,33 @@ namespace System.Linq
                        
                        return src.GroupBy (keySelector, elementSelector, comparer);
                }
-               
-               internal override IList<IEnumerable<KeyValuePair<long, IGrouping<TKey, TElement>>>> GetOrderedEnumerables (QueryOptions options)
+
+               internal override IList<IEnumerable<IGrouping<TKey, TElement>>> GetEnumerables (QueryOptions options)
                {                       
-                       throw new System.NotImplementedException();
+                       return ParallelPartitioner.CreateForChunks (GetGroupedElements ()).GetPartitions (options.PartitionCount).Wrap ();
                }
                
-               internal override IList<IEnumerable<IGrouping<TKey, TElement>>> GetEnumerables (QueryOptions options)
+               internal override IList<IEnumerable<KeyValuePair<long, IGrouping<TKey, TElement>>>> GetOrderedEnumerables (QueryOptions options)
                {
-                       throw new System.NotImplementedException();
+                       return ParallelPartitioner.CreateForChunks (GetGroupedElements ()).GetOrderablePartitions (options.PartitionCount).Wrap ();
+               }
+
+               internal IEnumerable<IGrouping<TKey, TElement>> GetGroupedElements ()
+               {
+                       return (IEnumerable<System.Linq.IGrouping<TKey,TElement>>)GetStore ().Select (e => (IGrouping<TKey,TElement>)new ConcurrentGrouping<TKey, TElement> (e.Key, e.Value));
+               }
+
+               internal ConcurrentDictionary<TKey, ConcurrentQueue<TElement>> GetStore ()
+               {
+                       var store = new ConcurrentDictionary<TKey, ConcurrentQueue<TElement>> ();
+                       Func<TKey, ConcurrentQueue<TElement>> queueFactory = (_) => new ConcurrentQueue<TElement> ();
+
+                       ParallelExecuter.ProcessAndBlock (Parent, (e, c) => {
+                               ConcurrentQueue<TElement> queue = store.GetOrAdd (keySelector (e), queueFactory);
+                               queue.Enqueue (elementSelector (e));
+                       });
+
+                       return store;
                }
        }
 }