3 // Copyright (c) Microsoft Corporation. All rights reserved.
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
8 // ForAllQueryOperator.cs
10 // <OWNER>[....]</OWNER>
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
14 using System.Collections.Generic;
15 using System.Threading;
16 using System.Diagnostics.Contracts;
18 namespace System.Linq.Parallel
21 /// A forall operator just enables an action to be placed at the "top" of a query tree
22 /// instead of yielding an enumerator that some consumer can walk. We execute the
23 /// query for effect instead of yielding a data result.
25 /// <typeparam name="TInput"></typeparam>
26 internal sealed class ForAllOperator<TInput> : UnaryQueryOperator<TInput, TInput>
29 // The per-element action to be invoked.
30 private readonly Action<TInput> m_elementAction;
32 //---------------------------------------------------------------------------------------
33 // Constructs a new forall operator.
36 internal ForAllOperator(IEnumerable<TInput> child, Action<TInput> elementAction)
39 Contract.Assert(child != null, "child data source cannot be null");
40 Contract.Assert(elementAction != null, "need a function");
42 m_elementAction = elementAction;
45 //---------------------------------------------------------------------------------------
46 // This invokes the entire query tree, invoking the per-element action for each result.
49 internal void RunSynchronously()
51 Contract.Assert(m_elementAction != null);
53 // Get the enumerator w/out using pipelining. By the time this returns, the query
54 // has been executed and we are done. We expect the return to be null.
55 Shared<bool> dummyTopLevelDisposeFlag = new Shared<bool>(false);
57 CancellationTokenSource dummyInternalCancellationTokenSource = new CancellationTokenSource();
59 // stuff in appropriate defaults for unspecified options.
60 QuerySettings settingsWithDefaults = SpecifiedQuerySettings
61 .WithPerExecutionSettings(dummyInternalCancellationTokenSource, dummyTopLevelDisposeFlag)
64 QueryLifecycle.LogicalQueryExecutionBegin(settingsWithDefaults.QueryId);
66 IEnumerator<TInput> enumerator = GetOpenedEnumerator(ParallelMergeOptions.FullyBuffered, true, true,
67 settingsWithDefaults);
68 settingsWithDefaults.CleanStateAtQueryEnd();
69 Contract.Assert(enumerator == null);
71 QueryLifecycle.LogicalQueryExecutionEnd(settingsWithDefaults.QueryId);
74 //---------------------------------------------------------------------------------------
75 // Just opens the current operator, including opening the child and wrapping it with
76 // partitions as needed.
79 internal override QueryResults<TInput> Open(
80 QuerySettings settings, bool preferStriping)
82 // We just open the child operator.
83 QueryResults<TInput> childQueryResults = Child.Open(settings, preferStriping);
84 return new UnaryQueryOperatorResults(childQueryResults, this, settings, preferStriping);
87 internal override void WrapPartitionedStream<TKey>(
88 PartitionedStream<TInput,TKey> inputStream, IPartitionedStreamRecipient<TInput> recipient, bool preferStriping, QuerySettings settings)
90 int partitionCount = inputStream.PartitionCount;
91 PartitionedStream<TInput, int> outputStream = new PartitionedStream<TInput, int>(
92 partitionCount, Util.GetDefaultComparer<int>(), OrdinalIndexState.Correct);
93 for (int i = 0; i < partitionCount; i++)
95 outputStream[i] = new ForAllEnumerator<TKey>(
96 inputStream[i], m_elementAction, settings.CancellationState.MergedCancellationToken);
99 recipient.Receive(outputStream);
102 //---------------------------------------------------------------------------------------
103 // Returns an enumerable that represents the query executing sequentially.
106 internal override IEnumerable<TInput> AsSequentialQuery(CancellationToken token)
108 Contract.Assert(false, "AsSequentialQuery is not supported on ForAllOperator");
109 throw new InvalidOperationException();
112 //---------------------------------------------------------------------------------------
113 // Whether this operator performs a premature merge that would not be performed in
114 // a similar sequential operation (i.e., in LINQ to Objects).
117 internal override bool LimitsParallelism
119 get { return false; }
122 //---------------------------------------------------------------------------------------
123 // The executable form of a forall operator. When it is enumerated, the entire underlying
124 // partition is walked, invoking the per-element action for each item.
127 private class ForAllEnumerator<TKey> : QueryOperatorEnumerator<TInput, int>
129 private readonly QueryOperatorEnumerator<TInput, TKey> m_source; // The data source.
130 private readonly Action<TInput> m_elementAction; // Forall operator being executed.
131 private CancellationToken m_cancellationToken; // Token used to cancel this operator.
133 //---------------------------------------------------------------------------------------
134 // Constructs a new forall enumerator object.
137 internal ForAllEnumerator(QueryOperatorEnumerator<TInput, TKey> source, Action<TInput> elementAction, CancellationToken cancellationToken)
139 Contract.Assert(source != null);
140 Contract.Assert(elementAction != null);
143 m_elementAction = elementAction;
144 m_cancellationToken = cancellationToken;
147 //---------------------------------------------------------------------------------------
148 // Just walks the entire data source upon its first invocation, performing the per-
149 // element action for each element.
152 internal override bool MoveNext(ref TInput currentElement, ref int currentKey)
154 Contract.Assert(m_elementAction != null, "expected a compiled operator");
156 // We just scroll through the enumerator and execute the action. Because we execute
157 // "in place", we actually never even produce a single value.
159 // Cancellation testing must be performed here as full enumeration occurs within this method.
160 // We only need to throw a simple exception here.. marshalling logic handled via QueryTaskGroupState.QueryEnd (called by ForAllSpoolingTask)
161 TInput element = default(TInput);
162 TKey keyUnused = default(TKey);
164 while (m_source.MoveNext(ref element, ref keyUnused))
166 if ((i++ & CancellationState.POLL_INTERVAL) == 0)
167 CancellationState.ThrowIfCanceled(m_cancellationToken);
168 m_elementAction(element);
175 protected override void Dispose(bool disposing)
177 Contract.Assert(m_source != null);