Merge pull request #409 from Alkarex/patch-1
[mono.git] / mcs / class / System.Core / System.Linq.Parallel.QueryNodes / QueryGroupByNode.cs
index 547dabc2ac229ea85a1f6e0aa18d600293e33056..69dbbb1bd18987f15b40820364fb4aeea4041a34 100644 (file)
@@ -24,7 +24,7 @@
 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 // THE SOFTWARE.
 
-#if NET_4_0
+#if NET_4_0 || MOBILE
 using System;
 using System.Threading;
 using System.Collections;
@@ -56,15 +56,33 @@ namespace System.Linq.Parallel.QueryNodes
                        
                        return src.GroupBy (keySelector, elementSelector, comparer);
                }
-               
-               internal override IList<IEnumerable<KeyValuePair<long, IGrouping<TKey, TElement>>>> GetOrderedEnumerables (QueryOptions options)
+
+               internal override IList<IEnumerable<IGrouping<TKey, TElement>>> GetEnumerables (QueryOptions options)
                {                       
-                       throw new System.NotImplementedException();
+                       return ParallelPartitioner.CreateForChunks (GetGroupedElements ()).GetPartitions (options.PartitionCount).Wrap ();
                }
                
-               internal override IList<IEnumerable<IGrouping<TKey, TElement>>> GetEnumerables (QueryOptions options)
+               internal override IList<IEnumerable<KeyValuePair<long, IGrouping<TKey, TElement>>>> GetOrderedEnumerables (QueryOptions options)
+               {
+                       return ParallelPartitioner.CreateForChunks (GetGroupedElements ()).GetOrderablePartitions (options.PartitionCount).Wrap ();
+               }
+
+               internal IEnumerable<IGrouping<TKey, TElement>> GetGroupedElements ()
                {
-                       throw new System.NotImplementedException();
+                       return (IEnumerable<System.Linq.IGrouping<TKey,TElement>>)GetStore ().Select (e => (IGrouping<TKey,TElement>)new ConcurrentGrouping<TKey, TElement> (e.Key, e.Value));
+               }
+
+               internal ConcurrentDictionary<TKey, ConcurrentQueue<TElement>> GetStore ()
+               {
+                       var store = new ConcurrentDictionary<TKey, ConcurrentQueue<TElement>> ();
+                       Func<TKey, ConcurrentQueue<TElement>> queueFactory = (_) => new ConcurrentQueue<TElement> ();
+
+                       ParallelExecuter.ProcessAndBlock (Parent, (e, c) => {
+                               ConcurrentQueue<TElement> queue = store.GetOrAdd (keySelector (e), queueFactory);
+                               queue.Enqueue (elementSelector (e));
+                       });
+
+                       return store;
                }
        }
 }