3 // Copyright (c) Microsoft Corporation. All rights reserved.
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
10 // <OWNER>Microsoft</OWNER>
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
14 using System.Threading;
15 using System.Threading.Tasks;
16 using System.Diagnostics.Contracts;
18 using System.Core; // for System.Core.SR
21 namespace System.Linq.Parallel
24 /// This type contains query execution options specified by the user.
25 /// QuerySettings are used as follows:
26 /// - in the query construction phase, some settings may be uninitialized.
27 /// - at the start of the query opening phase, the WithDefaults method
28 /// is used to initialize all uninitialized settings.
29 /// - in the rest of the query opening phase, we assume that all settings
30 /// have been initialized.
32 internal struct QuerySettings
34 private TaskScheduler m_taskScheduler;
35 private int? m_degreeOfParallelism;
36 private CancellationState m_cancellationState;
37 private ParallelExecutionMode? m_executionMode;
38 private ParallelMergeOptions? m_mergeOptions;
39 private int m_queryId;
41 internal CancellationState CancellationState
43 get { return m_cancellationState; }
46 m_cancellationState = value;
47 Contract.Assert(m_cancellationState != null);
51 // The task manager on which to execute the query.
52 internal TaskScheduler TaskScheduler
54 get { return m_taskScheduler; }
55 set { m_taskScheduler = value; }
58 // The number of parallel tasks to utilize.
59 internal int? DegreeOfParallelism
61 get { return m_degreeOfParallelism; }
62 set { m_degreeOfParallelism = value; }
65 // The mode in which to execute this query.
66 internal ParallelExecutionMode? ExecutionMode
68 get { return m_executionMode; }
69 set { m_executionMode = value; }
72 internal ParallelMergeOptions? MergeOptions
74 get { return m_mergeOptions; }
75 set { m_mergeOptions = value; }
86 //-----------------------------------------------------------------------------------
87 // Constructs a new settings structure.
89 internal QuerySettings(TaskScheduler taskScheduler, int? degreeOfParallelism,
90 CancellationToken externalCancellationToken, ParallelExecutionMode? executionMode,
91 ParallelMergeOptions? mergeOptions)
93 m_taskScheduler = taskScheduler;
94 m_degreeOfParallelism = degreeOfParallelism;
95 m_cancellationState = new CancellationState(externalCancellationToken);
96 m_executionMode = executionMode;
97 m_mergeOptions = mergeOptions;
100 Contract.Assert(m_cancellationState != null);
103 //-----------------------------------------------------------------------------------
104 // Combines two sets of options.
106 internal QuerySettings Merge(QuerySettings settings2)
108 if (this.TaskScheduler != null && settings2.TaskScheduler != null)
110 throw new InvalidOperationException(SR.GetString(SR.ParallelQuery_DuplicateTaskScheduler));
113 if (this.DegreeOfParallelism != null && settings2.DegreeOfParallelism != null)
115 throw new InvalidOperationException(SR.GetString(SR.ParallelQuery_DuplicateDOP));
118 if (this.CancellationState.ExternalCancellationToken.CanBeCanceled && settings2.CancellationState.ExternalCancellationToken.CanBeCanceled)
120 throw new InvalidOperationException(SR.GetString(SR.ParallelQuery_DuplicateWithCancellation));
123 if (this.ExecutionMode != null && settings2.ExecutionMode != null)
125 throw new InvalidOperationException(SR.GetString(SR.ParallelQuery_DuplicateExecutionMode));
128 if (this.MergeOptions != null && settings2.MergeOptions != null)
130 throw new InvalidOperationException(SR.GetString(SR.ParallelQuery_DuplicateMergeOptions));
133 TaskScheduler tm = (this.TaskScheduler == null) ? settings2.TaskScheduler : this.TaskScheduler;
134 int? dop = this.DegreeOfParallelism.HasValue ? this.DegreeOfParallelism : settings2.DegreeOfParallelism;
135 CancellationToken externalCancellationToken = (this.CancellationState.ExternalCancellationToken.CanBeCanceled) ? this.CancellationState.ExternalCancellationToken : settings2.CancellationState.ExternalCancellationToken;
136 ParallelExecutionMode? executionMode = this.ExecutionMode.HasValue ? this.ExecutionMode : settings2.ExecutionMode;
137 ParallelMergeOptions? mergeOptions = this.MergeOptions.HasValue ? this.MergeOptions : settings2.MergeOptions;
139 return new QuerySettings(tm, dop, externalCancellationToken, executionMode, mergeOptions);
142 internal QuerySettings WithPerExecutionSettings()
144 return WithPerExecutionSettings(new CancellationTokenSource(), new Shared<bool>(false));
147 internal QuerySettings WithPerExecutionSettings(CancellationTokenSource topLevelCancellationTokenSource, Shared<bool> topLevelDisposedFlag)
149 //Initialize a new QuerySettings structure and copy in the current settings.
150 //Note: this has the very important effect of newing a fresh CancellationSettings,
151 // and _not_ copying in the current internalCancellationSource or topLevelDisposedFlag which should not be
152 // propogated to internal query executions. (This affects SelectMany execution and specifically fixes bug:535510)
153 // The fresh toplevel parameters are used instead.
154 QuerySettings settings = new QuerySettings(TaskScheduler, DegreeOfParallelism, CancellationState.ExternalCancellationToken, ExecutionMode, MergeOptions);
156 Contract.Assert(topLevelCancellationTokenSource != null, "There should always be a top-level cancellation signal specified.");
157 settings.CancellationState.InternalCancellationTokenSource = topLevelCancellationTokenSource;
159 //Merge internal and external tokens to form the combined token
160 settings.CancellationState.MergedCancellationTokenSource =
161 CancellationTokenSource.CreateLinkedTokenSource(settings.CancellationState.InternalCancellationTokenSource.Token, settings.CancellationState.ExternalCancellationToken);
163 // and copy in the topLevelDisposedFlag
164 settings.CancellationState.TopLevelDisposedFlag = topLevelDisposedFlag;
166 Contract.Assert(settings.CancellationState.InternalCancellationTokenSource != null);
167 Contract.Assert(settings.CancellationState.MergedCancellationToken.CanBeCanceled);
168 Contract.Assert(settings.CancellationState.TopLevelDisposedFlag != null);
170 // Finally, assign a query Id to the settings
171 #if !PFX_LEGACY_3_5 && !SILVERLIGHT
172 settings.m_queryId = PlinqEtwProvider.NextQueryId();
178 //-----------------------------------------------------------------------------------
179 // Copies the settings, replacing unspecified settings with defaults.
181 internal QuerySettings WithDefaults()
183 QuerySettings settings = this;
184 if (settings.TaskScheduler == null)
186 settings.TaskScheduler = TaskScheduler.Default;
189 if (settings.DegreeOfParallelism == null)
191 settings.DegreeOfParallelism = Scheduling.GetDefaultDegreeOfParallelism();
194 if (settings.ExecutionMode == null)
196 settings.ExecutionMode = ParallelExecutionMode.Default;
199 if (settings.MergeOptions == null)
201 settings.MergeOptions = ParallelMergeOptions.Default;
204 if (settings.MergeOptions == ParallelMergeOptions.Default)
206 settings.MergeOptions = ParallelMergeOptions.AutoBuffered;
209 Contract.Assert(settings.TaskScheduler != null);
210 Contract.Assert(settings.DegreeOfParallelism.HasValue);
211 Contract.Assert(settings.DegreeOfParallelism.Value >= 1 && settings.DegreeOfParallelism <= Scheduling.MAX_SUPPORTED_DOP);
212 Contract.Assert(settings.ExecutionMode != null);
213 Contract.Assert(settings.MergeOptions != null);
215 Contract.Assert(settings.MergeOptions != ParallelMergeOptions.Default);
220 // Returns the default settings
221 internal static QuerySettings Empty {
222 get { return new QuerySettings(null, null, new CancellationToken(), null, null); }
225 // Cleanup internal state once the entire query is complete.
226 // (this should not be performed after a 'premature-query' completes as the state should live
227 // uninterrupted for the duration of the full query.)
228 public void CleanStateAtQueryEnd()
230 m_cancellationState.MergedCancellationTokenSource.Dispose();