X-Git-Url: http://wien.tomnetworks.com/gitweb/?a=blobdiff_plain;f=mcs%2Fclass%2FSystem.Core%2FSystem.Linq.Parallel.QueryNodes%2FQueryGroupByNode.cs;h=69dbbb1bd18987f15b40820364fb4aeea4041a34;hb=e2b2d181084848f3c5dde2788370db1b79893c69;hp=547dabc2ac229ea85a1f6e0aa18d600293e33056;hpb=96a8f92359442f7fbe4c4e2aa13cb8e5024c7360;p=mono.git diff --git a/mcs/class/System.Core/System.Linq.Parallel.QueryNodes/QueryGroupByNode.cs b/mcs/class/System.Core/System.Linq.Parallel.QueryNodes/QueryGroupByNode.cs index 547dabc2ac2..69dbbb1bd18 100644 --- a/mcs/class/System.Core/System.Linq.Parallel.QueryNodes/QueryGroupByNode.cs +++ b/mcs/class/System.Core/System.Linq.Parallel.QueryNodes/QueryGroupByNode.cs @@ -24,7 +24,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -#if NET_4_0 +#if NET_4_0 || MOBILE using System; using System.Threading; using System.Collections; @@ -56,15 +56,33 @@ namespace System.Linq.Parallel.QueryNodes return src.GroupBy (keySelector, elementSelector, comparer); } - - internal override IList>>> GetOrderedEnumerables (QueryOptions options) + + internal override IList>> GetEnumerables (QueryOptions options) { - throw new System.NotImplementedException(); + return ParallelPartitioner.CreateForChunks (GetGroupedElements ()).GetPartitions (options.PartitionCount).Wrap (); } - internal override IList>> GetEnumerables (QueryOptions options) + internal override IList>>> GetOrderedEnumerables (QueryOptions options) + { + return ParallelPartitioner.CreateForChunks (GetGroupedElements ()).GetOrderablePartitions (options.PartitionCount).Wrap (); + } + + internal IEnumerable> GetGroupedElements () { - throw new System.NotImplementedException(); + return (IEnumerable>)GetStore ().Select (e => (IGrouping)new ConcurrentGrouping (e.Key, e.Value)); + } + + internal ConcurrentDictionary> GetStore () + { + var store = new ConcurrentDictionary> (); + Func> queueFactory = (_) => new ConcurrentQueue (); + + ParallelExecuter.ProcessAndBlock (Parent, (e, c) => { + ConcurrentQueue queue = store.GetOrAdd (keySelector (e), queueFactory); + queue.Enqueue (elementSelector (e)); + }); + + return store; } } }