Updates referencesource to .NET 4.7
[mono.git] / mcs / class / referencesource / System.Core / System / Linq / Parallel / QueryOperators / Binary / GroupJoinQueryOperator.cs
1 // ==++==
2 //
3 //   Copyright (c) Microsoft Corporation.  All rights reserved.
4 // 
5 // ==--==
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
7 //
8 // GroupJoinQueryOperator.cs
9 //
10 // <OWNER>Microsoft</OWNER>
11 //
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
13
14 using System.Collections.Generic;
15 using System.Diagnostics.Contracts;
16 using System.Threading;
17
18 namespace System.Linq.Parallel
19 {
20     /// <summary>
21     /// A group join operator takes a left query tree and a right query tree, and then yields
22     /// the matching elements between the two. This can be used for outer joins, i.e. those
23     /// where an outer element has no matching inner elements -- the result is just an empty
24     /// list. As with the join algorithm above, we currently use a hash join algorithm.
25     /// </summary>
26     /// <typeparam name="TLeftInput"></typeparam>
27     /// <typeparam name="TRightInput"></typeparam>
28     /// <typeparam name="TKey"></typeparam>
29     /// <typeparam name="TOutput"></typeparam>
30     internal sealed class GroupJoinQueryOperator<TLeftInput, TRightInput, TKey, TOutput> :  BinaryQueryOperator<TLeftInput, TRightInput, TOutput>
31     {
32
33         private readonly Func<TLeftInput, TKey> m_leftKeySelector; // The key selection routine for the outer (left) data source.
34         private readonly Func<TRightInput, TKey> m_rightKeySelector; // The key selection routine for the inner (right) data source.
35         private readonly Func<TLeftInput, IEnumerable<TRightInput>, TOutput> m_resultSelector; // The result selection routine.
36         private readonly IEqualityComparer<TKey> m_keyComparer; // An optional key comparison object.
37
38         //---------------------------------------------------------------------------------------
39         // Constructs a new join operator.
40         //
41
42         internal GroupJoinQueryOperator(ParallelQuery<TLeftInput> left, ParallelQuery<TRightInput> right,
43                                         Func<TLeftInput, TKey> leftKeySelector,
44                                         Func<TRightInput, TKey> rightKeySelector,
45                                         Func<TLeftInput, IEnumerable<TRightInput>, TOutput> resultSelector,
46                                         IEqualityComparer<TKey> keyComparer)
47             :base(left, right)
48         {
49             Contract.Assert(left != null && right != null, "child data sources cannot be null");
50             Contract.Assert(leftKeySelector != null, "left key selector must not be null");
51             Contract.Assert(rightKeySelector != null, "right key selector must not be null");
52             Contract.Assert(resultSelector != null, "need a result selector function");
53
54             m_leftKeySelector = leftKeySelector;
55             m_rightKeySelector = rightKeySelector;
56             m_resultSelector = resultSelector;
57             m_keyComparer = keyComparer;
58             m_outputOrdered = LeftChild.OutputOrdered;
59
60             SetOrdinalIndex(OrdinalIndexState.Shuffled);
61         }
62
63         //---------------------------------------------------------------------------------------
64         // Just opens the current operator, including opening the child and wrapping it with
65         // partitions as needed.
66         //
67
68         internal override QueryResults<TOutput> Open(QuerySettings settings, bool preferStriping)
69         {
70             QueryResults<TLeftInput> leftResults = LeftChild.Open(settings, false);
71             QueryResults<TRightInput> rightResults = RightChild.Open(settings, false);
72
73             return new BinaryQueryOperatorResults(leftResults, rightResults, this, settings, false);
74         }
75
76         public override void WrapPartitionedStream<TLeftKey, TRightKey>(
77             PartitionedStream<TLeftInput, TLeftKey> leftStream, PartitionedStream<TRightInput, TRightKey> rightStream,
78             IPartitionedStreamRecipient<TOutput> outputRecipient, bool preferStriping, QuerySettings settings)
79         {
80             Contract.Assert(rightStream.PartitionCount == leftStream.PartitionCount);
81             int partitionCount = leftStream.PartitionCount;
82
83             if (LeftChild.OutputOrdered)
84             {
85                 WrapPartitionedStreamHelper<TLeftKey, TRightKey>(
86                     ExchangeUtilities.HashRepartitionOrdered(leftStream, m_leftKeySelector, m_keyComparer, null, settings.CancellationState.MergedCancellationToken),
87                     rightStream, outputRecipient, partitionCount, settings.CancellationState.MergedCancellationToken);
88             }
89             else
90             {
91                 WrapPartitionedStreamHelper<int, TRightKey>(
92                     ExchangeUtilities.HashRepartition(leftStream, m_leftKeySelector, m_keyComparer, null, settings.CancellationState.MergedCancellationToken),
93                     rightStream, outputRecipient, partitionCount, settings.CancellationState.MergedCancellationToken);
94             }
95         }
96
97         //---------------------------------------------------------------------------------------
98         // This is a helper method. WrapPartitionedStream decides what type TLeftKey is going
99         // to be, and then call this method with that key as a generic parameter.
100         //
101
102         private void WrapPartitionedStreamHelper<TLeftKey, TRightKey>(
103             PartitionedStream<Pair<TLeftInput, TKey>, TLeftKey> leftHashStream, PartitionedStream<TRightInput, TRightKey> rightPartitionedStream, 
104             IPartitionedStreamRecipient<TOutput> outputRecipient, int partitionCount, CancellationToken cancellationToken)
105         {
106             PartitionedStream<Pair<TRightInput, TKey>, int> rightHashStream = ExchangeUtilities.HashRepartition(
107                 rightPartitionedStream, m_rightKeySelector, m_keyComparer, null, cancellationToken);
108
109             PartitionedStream<TOutput, TLeftKey> outputStream = new PartitionedStream<TOutput, TLeftKey>(
110                 partitionCount, leftHashStream.KeyComparer, OrdinalIndexState);
111
112             for (int i = 0; i < partitionCount; i++)
113             {
114                 outputStream[i] = new HashJoinQueryOperatorEnumerator<TLeftInput, TLeftKey, TRightInput, TKey, TOutput>(
115                     leftHashStream[i], rightHashStream[i], null, m_resultSelector, m_keyComparer, cancellationToken);
116             }
117
118             outputRecipient.Receive(outputStream);
119         }
120
121         //---------------------------------------------------------------------------------------
122         // Returns an enumerable that represents the query executing sequentially.
123         //
124
125         internal override IEnumerable<TOutput> AsSequentialQuery(CancellationToken token)
126         {
127             IEnumerable<TLeftInput> wrappedLeftChild = CancellableEnumerable.Wrap(LeftChild.AsSequentialQuery(token), token);
128             IEnumerable<TRightInput> wrappedRightChild = CancellableEnumerable.Wrap(RightChild.AsSequentialQuery(token), token);
129
130             return wrappedLeftChild
131                 .GroupJoin(
132                 wrappedRightChild, m_leftKeySelector, m_rightKeySelector, m_resultSelector, m_keyComparer);
133         }
134
135         //---------------------------------------------------------------------------------------
136         // Whether this operator performs a premature merge that would not be performed in
137         // a similar sequential operation (i.e., in LINQ to Objects).
138         //
139
140         internal override bool LimitsParallelism
141         {
142             get { return false; }
143         }
144     }
145 }