Update Reference Sources to .NET Framework 4.6.1
[mono.git] / mcs / class / referencesource / System.Core / System / Linq / Parallel / QueryOperators / Unary / ForAllOperator.cs
1 // ==++==
2 //
3 //   Copyright (c) Microsoft Corporation.  All rights reserved.
4 // 
5 // ==--==
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
7 //
8 // ForAllOperator.cs
9 //
10 // <OWNER>[....]</OWNER>
11 //
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
13
14 using System.Collections.Generic;
15 using System.Threading;
16 using System.Diagnostics.Contracts;
17
18 namespace System.Linq.Parallel
19 {
20     /// <summary>
21     /// A forall operator just enables an action to be placed at the "top" of a query tree
22     /// instead of yielding an enumerator that some consumer can walk. We execute the
23     /// query for effect instead of yielding a data result. 
24     /// </summary>
25     /// <typeparam name="TInput">The type of the elements the per-element action is invoked on.</typeparam>
26     internal sealed class ForAllOperator<TInput> : UnaryQueryOperator<TInput, TInput>
27     {
28
29         // The per-element action to be invoked.
30         private readonly Action<TInput> m_elementAction;
31
32         //---------------------------------------------------------------------------------------
33         // Constructs a new forall operator.
34         //
35
36         internal ForAllOperator(IEnumerable<TInput> child, Action<TInput> elementAction)
37             :base(child)
38         {
39             Contract.Assert(child != null, "child data source cannot be null");
40             Contract.Assert(elementAction != null, "need a function");
41
42             m_elementAction = elementAction;
43         }
44
45         //---------------------------------------------------------------------------------------
46         // This invokes the entire query tree, invoking the per-element action for each result.
47         //
48
49         internal void RunSynchronously()
50         {
51             Contract.Assert(m_elementAction != null);
52             
53             // Get the enumerator w/out using pipelining. By the time this returns, the query
54             // has been executed and we are done. We expect the return to be null.
55             Shared<bool> dummyTopLevelDisposeFlag = new Shared<bool>(false);
56
57             CancellationTokenSource dummyInternalCancellationTokenSource = new CancellationTokenSource();
58
59             // stuff in appropriate defaults for unspecified options.
60             QuerySettings settingsWithDefaults = SpecifiedQuerySettings
61                 .WithPerExecutionSettings(dummyInternalCancellationTokenSource, dummyTopLevelDisposeFlag)
62                 .WithDefaults();
63
64             QueryLifecycle.LogicalQueryExecutionBegin(settingsWithDefaults.QueryId);
65
66             IEnumerator<TInput> enumerator = GetOpenedEnumerator(ParallelMergeOptions.FullyBuffered, true, true,
67                 settingsWithDefaults);
68             settingsWithDefaults.CleanStateAtQueryEnd();
69             Contract.Assert(enumerator == null);
70
71             QueryLifecycle.LogicalQueryExecutionEnd(settingsWithDefaults.QueryId);
72         }
73
74         //---------------------------------------------------------------------------------------
75         // Just opens the current operator, including opening the child and wrapping it with
76         // partitions as needed.
77         //
78
79         internal override QueryResults<TInput> Open(
80             QuerySettings settings, bool preferStriping)
81         {
82             // We just open the child operator.
83             QueryResults<TInput> childQueryResults = Child.Open(settings, preferStriping);
84             return new UnaryQueryOperatorResults(childQueryResults, this, settings, preferStriping);
85         }
86
87         internal override void  WrapPartitionedStream<TKey>(
88             PartitionedStream<TInput,TKey> inputStream, IPartitionedStreamRecipient<TInput> recipient, bool preferStriping, QuerySettings settings)
89         {
90             int partitionCount = inputStream.PartitionCount;
91             PartitionedStream<TInput, int> outputStream = new PartitionedStream<TInput, int>(
92                 partitionCount, Util.GetDefaultComparer<int>(), OrdinalIndexState.Correct);
93             for (int i = 0; i < partitionCount; i++)
94             {
95                 outputStream[i] = new ForAllEnumerator<TKey>(
96                     inputStream[i], m_elementAction, settings.CancellationState.MergedCancellationToken);
97             }
98
99             recipient.Receive(outputStream);
100         }
101
102         //---------------------------------------------------------------------------------------
103         // Returns an enumerable that represents the query executing sequentially.
104         //
105
106         internal override IEnumerable<TInput> AsSequentialQuery(CancellationToken token)
107         {
108             Contract.Assert(false, "AsSequentialQuery is not supported on ForAllOperator");
109             throw new InvalidOperationException();
110         }
111
112         //---------------------------------------------------------------------------------------
113         // Whether this operator performs a premature merge that would not be performed in
114         // a similar sequential operation (i.e., in LINQ to Objects).
115         //
116
117         internal override bool LimitsParallelism
118         {
119             get { return false; }
120         }
121
122         //---------------------------------------------------------------------------------------
123         // The executable form of a forall operator. When it is enumerated, the entire underlying
124         // partition is walked, invoking the per-element action for each item.
125         //
126
127         private class ForAllEnumerator<TKey> : QueryOperatorEnumerator<TInput, int>
128         {
129             private readonly QueryOperatorEnumerator<TInput, TKey> m_source; // The data source.
130             private readonly Action<TInput> m_elementAction; // Forall operator being executed.
131             private CancellationToken m_cancellationToken; // Token used to cancel this operator.
132
133             //---------------------------------------------------------------------------------------
134             // Constructs a new forall enumerator object.
135             //
136
137             internal ForAllEnumerator(QueryOperatorEnumerator<TInput, TKey> source, Action<TInput> elementAction, CancellationToken cancellationToken)
138             {
139                 Contract.Assert(source != null);
140                 Contract.Assert(elementAction != null);
141
142                 m_source = source;
143                 m_elementAction = elementAction;
144                 m_cancellationToken = cancellationToken;
145             }
146
147             //---------------------------------------------------------------------------------------
148             // Just walks the entire data source upon its first invocation, performing the per-
149             // element action for each element.
150             //
151
152             internal override bool MoveNext(ref TInput currentElement, ref int currentKey)
153             {
154                 Contract.Assert(m_elementAction != null, "expected a compiled operator");
155
156                 // We just scroll through the enumerator and execute the action. Because we execute
157                 // "in place", we actually never even produce a single value.
158                 
159                 // Cancellation testing must be performed here as full enumeration occurs within this method.
160                 // We only need to throw a simple exception here.. marshalling logic handled via QueryTaskGroupState.QueryEnd (called by ForAllSpoolingTask)
161                 TInput element = default(TInput);
162                 TKey keyUnused = default(TKey);
163                 int i = 0;
164                 while (m_source.MoveNext(ref element, ref keyUnused))
165                 {
166                     if ((i++ & CancellationState.POLL_INTERVAL) == 0)
167                         CancellationState.ThrowIfCanceled(m_cancellationToken);
168                     m_elementAction(element);
169                 }
170
171                 
172                 return false;
173             }
174
175             protected override void Dispose(bool disposing)
176             {
177                 Contract.Assert(m_source != null);
178                 m_source.Dispose();
179             }
180         }
181     }
182 }