3 // Copyright (c) Microsoft Corporation. All rights reserved.
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
8 // UnaryQueryOperator.cs
10 // <OWNER>Microsoft</OWNER>
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
14 using System.Collections.Generic;
15 using System.Diagnostics.Contracts;
namespace System.Linq.Parallel
{
    /// <summary>
    /// The base class from which all unary query operators derive, that is, those that
    /// have exactly one child operator. This introduces some convenience members for those
    /// classes, as well as any state common to all subclasses.
    /// </summary>
    /// <typeparam name="TInput">The element type of the child operator.</typeparam>
    /// <typeparam name="TOutput">The element type of this operator.</typeparam>
    internal abstract class UnaryQueryOperator<TInput, TOutput> : QueryOperator<TOutput>
    {
        // The single child operator for the current node.
        private readonly QueryOperator<TInput> m_child;

        // The state of the order index of the output of this operator.
        // Starts out as Shuffled; subclasses change it through SetOrdinalIndexState.
        private OrdinalIndexState m_indexState = OrdinalIndexState.Shuffled;

        //---------------------------------------------------------------------------------------
        // Constructors
        //
        // Every overload funnels into the private (child, outputOrdered, settings) constructor,
        // which is the only place m_child is assigned.
        //

        /// <summary>
        /// Builds the operator over an arbitrary sequence, first converting it with
        /// <c>QueryOperator&lt;TInput&gt;.AsQueryOperator</c>. The output-ordered flag and the
        /// settings are taken from the resulting child operator.
        /// </summary>
        /// <param name="child">The input sequence.</param>
        internal UnaryQueryOperator(IEnumerable<TInput> child)
            : this(QueryOperator<TInput>.AsQueryOperator(child))
        {
        }

        /// <summary>
        /// Builds the operator over an arbitrary sequence, first converting it with
        /// <c>QueryOperator&lt;TInput&gt;.AsQueryOperator</c>, with an explicit output-ordered flag.
        /// </summary>
        /// <param name="child">The input sequence.</param>
        /// <param name="outputOrdered">Value forwarded to the base class as its output-ordered flag.</param>
        internal UnaryQueryOperator(IEnumerable<TInput> child, bool outputOrdered)
            : this(QueryOperator<TInput>.AsQueryOperator(child), outputOrdered)
        {
        }

        // Uses the child's own OutputOrdered flag and SpecifiedQuerySettings.
        private UnaryQueryOperator(QueryOperator<TInput> child)
            : this(child, child.OutputOrdered, child.SpecifiedQuerySettings)
        {
        }

        /// <summary>
        /// Builds the operator over an existing child operator with an explicit output-ordered
        /// flag; the settings are taken from the child's <c>SpecifiedQuerySettings</c>.
        /// </summary>
        /// <param name="child">The child operator.</param>
        /// <param name="outputOrdered">Value forwarded to the base class as its output-ordered flag.</param>
        internal UnaryQueryOperator(QueryOperator<TInput> child, bool outputOrdered)
            : this(child, outputOrdered, child.SpecifiedQuerySettings)
        {
        }

        // The constructor all other overloads chain to: forwards the flag and settings to the
        // base class and records the child.
        private UnaryQueryOperator(QueryOperator<TInput> child, bool outputOrdered, QuerySettings settings)
            : base(outputOrdered, settings)
        {
            m_child = child;
        }

        /// <summary>
        /// The child operator this operator consumes.
        /// </summary>
        internal QueryOperator<TInput> Child
        {
            get { return m_child; }
        }

        /// <summary>
        /// The state of the order index of this operator's output. Sealed so that subclasses
        /// change it only through <see cref="SetOrdinalIndexState"/>.
        /// </summary>
        internal sealed override OrdinalIndexState OrdinalIndexState
        {
            get { return m_indexState; }
        }

        /// <summary>
        /// Lets a subclass record the order-index state of its output.
        /// </summary>
        /// <param name="indexState">The new state returned by <see cref="OrdinalIndexState"/>.</param>
        protected void SetOrdinalIndexState(OrdinalIndexState indexState)
        {
            m_indexState = indexState;
        }

        //---------------------------------------------------------------------------------------
        // This method wraps each enumerator in inputStream with an enumerator performing this
        // operator's transformation. However, instead of returning the transformed partitioned
        // stream, we pass it to a recipient object by calling recipient.Receive<TNewKey>(..). That
        // way, we can "return" a partitioned stream that potentially uses a different order key
        // from the order key of the input stream.
        //

        internal abstract void WrapPartitionedStream<TKey>(
            PartitionedStream<TInput, TKey> inputStream, IPartitionedStreamRecipient<TOutput> recipient,
            bool preferStriping, QuerySettings settings);

        //---------------------------------------------------------------------------------------
        // Implementation of QueryResults for a unary operator. The results will not be indexible
        // unless a derived class provides that functionality.
        //

        internal class UnaryQueryOperatorResults : QueryResults<TOutput>
        {
            protected QueryResults<TInput> m_childQueryResults; // Results of the child query
            private readonly UnaryQueryOperator<TInput, TOutput> m_op; // Operator that generated these results

            // Settings collected from the query.
            // NOTE(review): left mutable on purpose -- if QuerySettings is a value type, a readonly
            // field would force defensive copies on member access; confirm before tightening.
            private QuerySettings m_settings;

            private readonly bool m_preferStriping; // If the results are indexible, should we use striping when partitioning them

            /// <summary>
            /// Captures everything needed to later hand out a partitioned stream.
            /// </summary>
            /// <param name="childQueryResults">Results of the child query.</param>
            /// <param name="op">The operator that generated these results.</param>
            /// <param name="settings">Settings collected from the query.</param>
            /// <param name="preferStriping">Striping preference passed on when partitioning.</param>
            internal UnaryQueryOperatorResults(QueryResults<TInput> childQueryResults, UnaryQueryOperator<TInput, TOutput> op, QuerySettings settings, bool preferStriping)
            {
                m_childQueryResults = childQueryResults;
                m_op = op;
                m_settings = settings;
                m_preferStriping = preferStriping;
            }

            /// <summary>
            /// Produces this operator's output as a partitioned stream and passes it to
            /// <paramref name="recipient"/>. One of three strategies is picked, in order:
            /// sequential execution, direct partitioning of indexible results, or wrapping
            /// the child's partitions.
            /// </summary>
            /// <param name="recipient">Receives the resulting partitioned stream.</param>
            internal override void GivePartitionedStream(IPartitionedStreamRecipient<TOutput> recipient)
            {
                // The results must be indexible exactly when the operator reports Indexible.
                Contract.Assert(IsIndexible == (m_op.OrdinalIndexState == OrdinalIndexState.Indexible));

                if (m_settings.ExecutionMode.Value == ParallelExecutionMode.Default && m_op.LimitsParallelism)
                {
                    // We need to run the query sequentially, up to and including this operator,
                    // and then partition the sequential output.
                    IEnumerable<TOutput> opSequential = m_op.AsSequentialQuery(m_settings.CancellationState.ExternalCancellationToken);
                    PartitionedStream<TOutput, int> result = ExchangeUtilities.PartitionDataSource(
                        opSequential, m_settings.DegreeOfParallelism.Value, m_preferStriping);
                    recipient.Receive<int>(result);
                }
                else if (IsIndexible)
                {
                    // The output of this operator is indexible. Partition these results directly
                    // and pass the partitioned output into the IPartitionedStreamRecipient.
                    PartitionedStream<TOutput, int> result = ExchangeUtilities.PartitionDataSource(
                        this, m_settings.DegreeOfParallelism.Value, m_preferStriping);
                    recipient.Receive<int>(result);
                }
                else
                {
                    // The common case: get partitions from the child and wrap each partition.
                    m_childQueryResults.GivePartitionedStream(
                        new ChildResultsRecipient(recipient, m_op, m_preferStriping, m_settings));
                }
            }

            //---------------------------------------------------------------------------------------
            // ChildResultsRecipient is a recipient of a partitioned stream. It receives a partitioned
            // stream from the child operator, wraps the enumerators with the transformation for this
            // operator, and passes the partitioned stream along to the next recipient (the parent
            // operator).
            //

            private class ChildResultsRecipient : IPartitionedStreamRecipient<TInput>
            {
                private readonly IPartitionedStreamRecipient<TOutput> m_outputRecipient;
                private readonly UnaryQueryOperator<TInput, TOutput> m_op;
                private readonly bool m_preferStriping;

                // NOTE(review): left mutable for the same reason as the field of the same name on
                // the enclosing class -- confirm whether QuerySettings is a value type.
                private QuerySettings m_settings;

                internal ChildResultsRecipient(
                    IPartitionedStreamRecipient<TOutput> outputRecipient, UnaryQueryOperator<TInput, TOutput> op, bool preferStriping, QuerySettings settings)
                {
                    m_outputRecipient = outputRecipient;
                    m_op = op;
                    m_preferStriping = preferStriping;
                    m_settings = settings;
                }

                public void Receive<TKey>(PartitionedStream<TInput, TKey> inputStream)
                {
                    // Call WrapPartitionedStream on our operator, which will wrap the input
                    // partitioned stream, and pass the result along to m_outputRecipient.
                    m_op.WrapPartitionedStream(inputStream, m_outputRecipient, m_preferStriping, m_settings);
                }
            }
        }
    }
}