Updates referencesource to .NET 4.7
[mono.git] / mcs / class / referencesource / System.Core / System / Linq / Parallel / QueryOperators / UnaryQueryOperator.cs
1 // ==++==
2 //
3 //   Copyright (c) Microsoft Corporation.  All rights reserved.
4 // 
5 // ==--==
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
7 //
8 // UnaryQueryOperator.cs
9 //
10 // <OWNER>Microsoft</OWNER>
11 //
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
13
14 using System.Collections.Generic;
15 using System.Diagnostics.Contracts;
16
17 namespace System.Linq.Parallel
18 {
19     /// <summary>
20     /// The base class from which all binary query operators derive, that is, those that
21     /// have two child operators. This introduces some convenience methods for those
22     /// classes, as well as any state common to all subclasses.
23     /// </summary>
24     /// <typeparam name="TInput"></typeparam>
25     /// <typeparam name="TOutput"></typeparam>
26     internal abstract class UnaryQueryOperator<TInput, TOutput> : QueryOperator<TOutput>
27     {
28
29         // The single child operator for the current node.
30         private readonly QueryOperator<TInput> m_child;
31
32         // The state of the order index of the output of this operator.
33         private OrdinalIndexState m_indexState = OrdinalIndexState.Shuffled;
34
35         //---------------------------------------------------------------------------------------
36         // Constructors
37         //
38
39         internal UnaryQueryOperator(IEnumerable<TInput> child)
40             : this(QueryOperator<TInput>.AsQueryOperator(child))
41         {
42         }
43
44         internal UnaryQueryOperator(IEnumerable<TInput> child, bool outputOrdered)
45             : this(QueryOperator<TInput>.AsQueryOperator(child), outputOrdered)
46         {
47         }
48
49         private UnaryQueryOperator(QueryOperator<TInput> child)
50             : this(child, child.OutputOrdered, child.SpecifiedQuerySettings)
51         {
52         }
53
54         internal UnaryQueryOperator(QueryOperator<TInput> child, bool outputOrdered)
55             : this(child, outputOrdered, child.SpecifiedQuerySettings)
56         {
57         }
58
59         private UnaryQueryOperator(QueryOperator<TInput> child, bool outputOrdered, QuerySettings settings)
60             : base(outputOrdered, settings)
61         {
62             m_child = child;
63         }
64
65         internal QueryOperator<TInput> Child
66         {
67             get { return m_child; }
68         }
69
70         internal override sealed OrdinalIndexState OrdinalIndexState
71         {
72             get { return m_indexState; }
73         }
74
75         protected void SetOrdinalIndexState(OrdinalIndexState indexState)
76         {
77             m_indexState = indexState;
78         }
79
80         //---------------------------------------------------------------------------------------
81         // This method wraps each enumerator in inputStream with an enumerator performing this
82         // operator's transformation. However, instead of returning the transformed partitioned
83         // stream, we pass it to a recipient object by calling recipient.Give<TNewKey>(..). That
84         // way, we can "return" a partitioned stream that potentially uses a different order key
85         // from the order key of the input stream.
86         //
87
88         internal abstract void WrapPartitionedStream<TKey>(
89             PartitionedStream<TInput, TKey> inputStream, IPartitionedStreamRecipient<TOutput> recipient,
90             bool preferStriping, QuerySettings settings);
91
92
93         //---------------------------------------------------------------------------------------
94         // Implementation of QueryResults for an unary operator. The results will not be indexible
95         // unless a derived class provides that functionality.
96         //
97
98         internal class UnaryQueryOperatorResults : QueryResults<TOutput>
99         {
100             protected QueryResults<TInput> m_childQueryResults; // Results of the child query
101             private UnaryQueryOperator<TInput, TOutput> m_op; // Operator that generated these results
102             private QuerySettings m_settings; // Settings collected from the query
103             private bool m_preferStriping; // If the results are indexible, should we use striping when partitioning them
104
105             internal UnaryQueryOperatorResults(QueryResults<TInput> childQueryResults, UnaryQueryOperator<TInput, TOutput> op, QuerySettings settings, bool preferStriping)
106             {
107                 m_childQueryResults = childQueryResults;
108                 m_op = op;
109                 m_settings = settings;
110                 m_preferStriping = preferStriping;
111             }
112
113             internal override void GivePartitionedStream(IPartitionedStreamRecipient<TOutput> recipient)
114             {
115                 Contract.Assert(IsIndexible == (m_op.OrdinalIndexState == OrdinalIndexState.Indexible));
116
117                 if (m_settings.ExecutionMode.Value == ParallelExecutionMode.Default && m_op.LimitsParallelism)
118                 {
119                     // We need to run the query sequentially, up to and including this operator
120                     IEnumerable<TOutput> opSequential = m_op.AsSequentialQuery(m_settings.CancellationState.ExternalCancellationToken);
121                     PartitionedStream<TOutput, int> result = ExchangeUtilities.PartitionDataSource(
122                         opSequential, m_settings.DegreeOfParallelism.Value, m_preferStriping);
123                     recipient.Receive<int>(result);
124                 }
125                 else if (IsIndexible)
126                 {
127                     // The output of this operator is indexible. Pass the partitioned output into the IPartitionedStreamRecipient.
128                     PartitionedStream<TOutput, int> result = ExchangeUtilities.PartitionDataSource(this, m_settings.DegreeOfParallelism.Value, m_preferStriping);
129                     recipient.Receive<int>(result);
130                 }
131                 else
132                 {
133                     // The common case: get partitions from the child and wrap each partition.
134                     m_childQueryResults.GivePartitionedStream(new ChildResultsRecipient(recipient, m_op, m_preferStriping, m_settings));
135                 }
136             }
137
138             //---------------------------------------------------------------------------------------
139             // ChildResultsRecipient is a recipient of a partitioned stream. It receives a partitioned
140             // stream from the child operator, wraps the enumerators with the transformation for this
141             // operator, and passes the partitioned stream along to the next recipient (the parent
142             // operator).
143             //
144
145             private class ChildResultsRecipient : IPartitionedStreamRecipient<TInput>
146             {
147                 IPartitionedStreamRecipient<TOutput> m_outputRecipient;
148                 UnaryQueryOperator<TInput, TOutput> m_op;
149                 bool m_preferStriping;
150                 QuerySettings m_settings;
151
152                 internal ChildResultsRecipient(
153                     IPartitionedStreamRecipient<TOutput> outputRecipient, UnaryQueryOperator<TInput, TOutput> op, bool preferStriping, QuerySettings settings)
154                 {
155                     m_outputRecipient = outputRecipient;
156                     m_op = op;
157                     m_preferStriping = preferStriping;
158                     m_settings = settings;
159                 }
160
161                 public void Receive<TKey>(PartitionedStream<TInput, TKey> inputStream)
162                 {
163                     // Call WrapPartitionedStream on our operator, which will wrap the input
164                     // partitioned stream, and pass the result along to m_outputRecipient.
165                     m_op.WrapPartitionedStream(inputStream, m_outputRecipient, m_preferStriping, m_settings);
166                 }
167             }
168         }
169
170     }
171 }