3 // Copyright (c) Microsoft Corporation. All rights reserved.
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
8 // GroupJoinQueryOperator.cs
10 // <OWNER>Microsoft</OWNER>
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
14 using System.Collections.Generic;
15 using System.Diagnostics.Contracts;
16 using System.Threading;
namespace System.Linq.Parallel
{
    /// <summary>
    /// A group join operator takes a left (outer) query tree and a right (inner) query tree,
    /// and yields one result per left element, computed from that element together with the
    /// sequence of right elements whose key matches. This can be used for outer joins: a left
    /// element with no matching right elements is simply paired with an empty sequence.
    /// The parallel implementation hash-repartitions both inputs by key and runs a hash join
    /// enumerator over each pair of partitions.
    /// </summary>
    /// <typeparam name="TLeftInput">Element type of the left (outer) data source.</typeparam>
    /// <typeparam name="TRightInput">Element type of the right (inner) data source.</typeparam>
    /// <typeparam name="TKey">Type of the join key produced by both key selectors.</typeparam>
    /// <typeparam name="TOutput">Element type produced by the result selector.</typeparam>
    internal sealed class GroupJoinQueryOperator<TLeftInput, TRightInput, TKey, TOutput> : BinaryQueryOperator<TLeftInput, TRightInput, TOutput>
    {
        private readonly Func<TLeftInput, TKey> m_leftKeySelector; // The key selection routine for the outer (left) data source.
        private readonly Func<TRightInput, TKey> m_rightKeySelector; // The key selection routine for the inner (right) data source.
        private readonly Func<TLeftInput, IEnumerable<TRightInput>, TOutput> m_resultSelector; // The result selection routine.
        private readonly IEqualityComparer<TKey> m_keyComparer; // An optional key comparison object.

        //---------------------------------------------------------------------------------------
        // Constructs a new group join operator.
        //
        // Arguments:
        //     left             - the outer (left) data source; must not be null
        //     right            - the inner (right) data source; must not be null
        //     leftKeySelector  - extracts the join key from a left element; must not be null
        //     rightKeySelector - extracts the join key from a right element; must not be null
        //     resultSelector   - builds an output element from a left element and the sequence
        //                        of right elements it was grouped with; must not be null
        //     keyComparer      - optional equality comparer for keys (not asserted, may be null)
        //

        internal GroupJoinQueryOperator(ParallelQuery<TLeftInput> left, ParallelQuery<TRightInput> right,
                                        Func<TLeftInput, TKey> leftKeySelector,
                                        Func<TRightInput, TKey> rightKeySelector,
                                        Func<TLeftInput, IEnumerable<TRightInput>, TOutput> resultSelector,
                                        IEqualityComparer<TKey> keyComparer)
            // NOTE(review): the children are handed to the base operator here, which is what
            // makes LeftChild/RightChild usable below -- confirm against BinaryQueryOperator's
            // constructor.
            : base(left, right)
        {
            Contract.Assert(left != null && right != null, "child data sources cannot be null");
            Contract.Assert(leftKeySelector != null, "left key selector must not be null");
            Contract.Assert(rightKeySelector != null, "right key selector must not be null");
            Contract.Assert(resultSelector != null, "need a result selector function");

            m_leftKeySelector = leftKeySelector;
            m_rightKeySelector = rightKeySelector;
            m_resultSelector = resultSelector;
            m_keyComparer = keyComparer;

            // The output is ordered exactly when the left child's output is ordered.
            m_outputOrdered = LeftChild.OutputOrdered;

            // The operator always reports its ordinal index state as Shuffled.
            SetOrdinalIndex(OrdinalIndexState.Shuffled);
        }

        //---------------------------------------------------------------------------------------
        // Just opens the current operator, including opening the child and wrapping it with
        // partitions as needed.
        //
        // Note that the incoming preferStriping value is not consulted: both children and the
        // resulting BinaryQueryOperatorResults are opened with striping preference 'false'.
        //

        internal override QueryResults<TOutput> Open(QuerySettings settings, bool preferStriping)
        {
            QueryResults<TLeftInput> leftResults = LeftChild.Open(settings, false);
            QueryResults<TRightInput> rightResults = RightChild.Open(settings, false);

            return new BinaryQueryOperatorResults(leftResults, rightResults, this, settings, false);
        }

        //---------------------------------------------------------------------------------------
        // Hash-repartitions the left stream by join key and forwards to the helper, which
        // builds the output stream. Exactly one of the two branches runs:
        //   - left child ordered:   order-preserving repartition, left order key stays TLeftKey
        //   - left child unordered: plain repartition, left order key becomes int
        //

        public override void WrapPartitionedStream<TLeftKey, TRightKey>(
            PartitionedStream<TLeftInput, TLeftKey> leftStream, PartitionedStream<TRightInput, TRightKey> rightStream,
            IPartitionedStreamRecipient<TOutput> outputRecipient, bool preferStriping, QuerySettings settings)
        {
            Contract.Assert(rightStream.PartitionCount == leftStream.PartitionCount);
            int partitionCount = leftStream.PartitionCount;
            CancellationToken cancellationToken = settings.CancellationState.MergedCancellationToken;

            if (LeftChild.OutputOrdered)
            {
                WrapPartitionedStreamHelper<TLeftKey, TRightKey>(
                    ExchangeUtilities.HashRepartitionOrdered(leftStream, m_leftKeySelector, m_keyComparer, null, cancellationToken),
                    rightStream, outputRecipient, partitionCount, cancellationToken);
            }
            else
            {
                WrapPartitionedStreamHelper<int, TRightKey>(
                    ExchangeUtilities.HashRepartition(leftStream, m_leftKeySelector, m_keyComparer, null, cancellationToken),
                    rightStream, outputRecipient, partitionCount, cancellationToken);
            }
        }

        //---------------------------------------------------------------------------------------
        // This is a helper method. WrapPartitionedStream decides what type TLeftKey is going
        // to be, and then calls this method with that key as a generic parameter.
        //
        // The right stream is hash-repartitioned by join key here, then one hash join
        // enumerator is created per partition, pairing left partition i with right partition i.
        // The finished output stream is handed to the recipient exactly once.
        //

        private void WrapPartitionedStreamHelper<TLeftKey, TRightKey>(
            PartitionedStream<Pair<TLeftInput, TKey>, TLeftKey> leftHashStream, PartitionedStream<TRightInput, TRightKey> rightPartitionedStream,
            IPartitionedStreamRecipient<TOutput> outputRecipient, int partitionCount, CancellationToken cancellationToken)
        {
            PartitionedStream<Pair<TRightInput, TKey>, int> rightHashStream = ExchangeUtilities.HashRepartition(
                rightPartitionedStream, m_rightKeySelector, m_keyComparer, null, cancellationToken);

            // The output reuses the left stream's order-key comparer and this operator's
            // ordinal index state.
            PartitionedStream<TOutput, TLeftKey> outputStream = new PartitionedStream<TOutput, TLeftKey>(
                partitionCount, leftHashStream.KeyComparer, OrdinalIndexState);

            for (int i = 0; i < partitionCount; i++)
            {
                // NOTE(review): the null third argument is presumably the slot for a per-pair
                // (non-grouping) result selector -- confirm against HashJoinQueryOperatorEnumerator.
                outputStream[i] = new HashJoinQueryOperatorEnumerator<TLeftInput, TLeftKey, TRightInput, TKey, TOutput>(
                    leftHashStream[i], rightHashStream[i], null, m_resultSelector, m_keyComparer, cancellationToken);
            }

            outputRecipient.Receive(outputStream);
        }

        //---------------------------------------------------------------------------------------
        // Returns an enumerable that represents the query executing sequentially.
        //
        // Both children are wrapped so that enumeration observes the cancellation token, and
        // the actual work is delegated to the sequential GroupJoin over those wrappers.
        //

        internal override IEnumerable<TOutput> AsSequentialQuery(CancellationToken token)
        {
            IEnumerable<TLeftInput> wrappedLeftChild = CancellableEnumerable.Wrap(LeftChild.AsSequentialQuery(token), token);
            IEnumerable<TRightInput> wrappedRightChild = CancellableEnumerable.Wrap(RightChild.AsSequentialQuery(token), token);

            return wrappedLeftChild
                .GroupJoin(
                    wrappedRightChild, m_leftKeySelector, m_rightKeySelector, m_resultSelector, m_keyComparer);
        }

        //---------------------------------------------------------------------------------------
        // Whether this operator performs a premature merge that would not be performed in
        // a similar sequential operation (i.e., in LINQ to Objects). Always false here.
        //

        internal override bool LimitsParallelism
        {
            get { return false; }
        }
    }
}