[System.Core] Returns false in SequenceEqual when the two sequences size are different
[mono.git] / mcs / class / System.Core / System.Linq.Parallel.QueryNodes / QueryGroupByNode.cs
index 547dabc2ac229ea85a1f6e0aa18d600293e33056..394b529af0346b0054391773a00f14ec5d0094a0 100644 (file)
@@ -56,15 +56,33 @@ namespace System.Linq.Parallel.QueryNodes
                        
                        return src.GroupBy (keySelector, elementSelector, comparer);
                }
-               
-               internal override IList<IEnumerable<KeyValuePair<long, IGrouping<TKey, TElement>>>> GetOrderedEnumerables (QueryOptions options)
+
+               internal override IList<IEnumerable<IGrouping<TKey, TElement>>> GetEnumerables (QueryOptions options)
                {                       
-                       throw new System.NotImplementedException();
+                       return ParallelPartitioner.CreateForChunks (GetGroupedElements ()).GetPartitions (options.PartitionCount).Wrap ();
                }
                
-               internal override IList<IEnumerable<IGrouping<TKey, TElement>>> GetEnumerables (QueryOptions options)
+               internal override IList<IEnumerable<KeyValuePair<long, IGrouping<TKey, TElement>>>> GetOrderedEnumerables (QueryOptions options)
+               {
+                       return ParallelPartitioner.CreateForChunks (GetGroupedElements ()).GetOrderablePartitions (options.PartitionCount).Wrap ();
+               }
+
+               internal IEnumerable<IGrouping<TKey, TElement>> GetGroupedElements ()
                {
-                       throw new System.NotImplementedException();
+                       return GetStore ().Select ((e) => new ConcurrentGrouping<TKey, TElement> (e.Key, e.Value));
+               }
+
+               internal ConcurrentDictionary<TKey, ConcurrentQueue<TElement>> GetStore ()
+               {
+                       var store = new ConcurrentDictionary<TKey, ConcurrentQueue<TElement>> ();
+                       Func<TKey, ConcurrentQueue<TElement>> queueFactory = (_) => new ConcurrentQueue<TElement> ();
+
+                       ParallelExecuter.ProcessAndBlock (Parent, (e, c) => {
+                               ConcurrentQueue<TElement> queue = store.GetOrAdd (keySelector (e), queueFactory);
+                               queue.Enqueue (elementSelector (e));
+                       });
+
+                       return store;
                }
        }
 }