9dbc359a725c3ed0fd9217e00841ae9ba8b8564d
[mono.git] / mcs / class / referencesource / System.Core / System / Linq / Parallel / Scheduling / QueryTaskGroupState.cs
1 // ==++==
2 //
3 //   Copyright (c) Microsoft Corporation.  All rights reserved.
4 // 
5 // ==--==
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
7 //
8 // QueryTaskGroupState.cs
9 //
10 // <OWNER>Microsoft</OWNER>
11 //
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
13
14 using System.Collections.Generic;
15 using System.Threading;
16 using System.Threading.Tasks;
17 using System.Diagnostics.Contracts;
18 #if SILVERLIGHT
19 using System.Core; // for System.Core.SR
20 #endif
21
22 namespace System.Linq.Parallel
23 {
24     /// <summary>
25     /// A collection of tasks used by a single query instance. This type also offers some
26     /// convenient methods for tracing significant ETW events, waiting on tasks, propagating
27     /// exceptions, and performing cancellation activities.
28     /// </summary>
29     internal class QueryTaskGroupState
30     {
31         private Task m_rootTask; // The task under which all query tasks root.
32         private int m_alreadyEnded; // Whether the tasks have been waited on already.
33         private CancellationState m_cancellationState; // The cancellation state.
34         private int m_queryId; // Id of this query execution.
35
36
37         //-----------------------------------------------------------------------------------
38         // Creates a new shared bit of state among tasks.
39         //
40
41         internal QueryTaskGroupState(CancellationState cancellationState, int queryId)
42         {
43             m_cancellationState = cancellationState;
44             m_queryId = queryId;
45         }
46
47         //-----------------------------------------------------------------------------------
48         // Whether this query has ended or not.
49         //
50
51         internal bool IsAlreadyEnded
52         {
53             get { return m_alreadyEnded == 1; }
54         }
55
56         //-----------------------------------------------------------------------------------
57         // Cancellation state, used to tear down tasks cooperatively when necessary.
58         //
59
60         internal CancellationState CancellationState
61         {
62             get { return m_cancellationState; }
63         }
64
65         //-----------------------------------------------------------------------------------
66         // Id of this query execution.
67         //
68
69         internal int QueryId
70         {
71             get { return m_queryId; }
72         }
73
74         //-----------------------------------------------------------------------------------
75         // Marks the beginning of a query's execution.
76         //
77
78         internal void QueryBegin(Task rootTask)
79         {
80             Contract.Assert(rootTask != null, "Expected a non-null task");
81             Contract.Assert(m_rootTask == null, "Cannot begin a query more than once");
82             m_rootTask = rootTask;
83         }
84
85         //-----------------------------------------------------------------------------------
86         // Marks the end of a query's execution, waiting for all tasks to finish and
87         // propagating any relevant exceptions.  Note that the full set of tasks must have
88         // been initialized (with SetTask) before calling this.
89         //
90
91         internal void QueryEnd(bool userInitiatedDispose)
92         {
93             Contract.Assert(m_rootTask != null);
94             //Contract.Assert(Task.Current == null || (Task.Current != m_rootTask && Task.Current.Parent != m_rootTask));
95
96             if (Interlocked.Exchange(ref m_alreadyEnded, 1) == 0)
97             {
98                 // There are four cases:
99                 // Case #1: Wait produced an exception that is not OCE(ct), or an AggregateException which is not full of OCE(ct) ==>  We rethrow.
100                 // Case #2: External cancellation has been requested ==> we'll manually throw OCE(externalToken).
101                 // Case #3a: We are servicing a call to Dispose() (and possibly also external cancellation has been requested).. simply return. See 
102
103
104
105                 // See also "InlinedAggregationOperator" which duplicates some of this logic for the aggregators.
106                 // See also "QueryOpeningEnumerator" which duplicates some of this logic.
107                 // See also "ExceptionAggregator" which duplicates some of this logic.
108
109                 try
110                 {
111                     // Wait for all the tasks to complete
112                     // If any of the tasks ended in the Faulted stated, an AggregateException will be thrown.
113                     m_rootTask.Wait();
114                 }
115                 catch (AggregateException ae)
116                 {
117                     AggregateException flattenedAE = ae.Flatten();
118                     bool allOCEsOnTrackedExternalCancellationToken = true;
119                     for (int i = 0; i < flattenedAE.InnerExceptions.Count; i++)
120                     {
121                         OperationCanceledException oce = flattenedAE.InnerExceptions[i] as OperationCanceledException;
122
123                         // we only let it pass through iff:
124                         // it is not null, not default, and matches the exact token we were given as being the external token
125                         // and the external Token is actually canceled (ie not a spoof OCE(extCT) for a non-canceled extCT)
126                         if (oce == null ||
127                             !oce.CancellationToken.IsCancellationRequested ||
128                             oce.CancellationToken != m_cancellationState.ExternalCancellationToken)
129                         {
130                             allOCEsOnTrackedExternalCancellationToken = false;
131                             break;
132                         }
133                     }
134
135                     // if all the exceptions were OCE(externalToken), then we will propogate only a single OCE(externalToken) below
136                     // otherwise, we flatten the aggregate (because the WaitAll above already aggregated) and rethrow.
137                     if (!allOCEsOnTrackedExternalCancellationToken)
138                         throw flattenedAE;  // Case #1
139                 }
140                 finally
141                 {
142                     m_rootTask.Dispose();
143                 }
144
145                 if (m_cancellationState.MergedCancellationToken.IsCancellationRequested)
146                 {
147                     // cancellation has occured but no user-delegate exceptions were detected 
148
149                     // NOTE: it is important that we see other state variables correctly here, and that
150                     // read-reordering hasn't played havoc. 
151                     // This is OK because 
152                     //   1. all the state writes (eg in the Initiate* methods) are volatile writes (standard .NET MM)
153                     //   2. tokenCancellationRequested is backed by a volatile field, hence the reads below
154                     //   won't get reordered about the read of token.IsCancellationRequested.
155
156                     // If the query has already been disposed, we don't want to throw an OCE (this is a fix for 
157                     if (!m_cancellationState.TopLevelDisposedFlag.Value)
158                     {
159                         CancellationState.ThrowWithStandardMessageIfCanceled(m_cancellationState.ExternalCancellationToken); // Case #2
160                     }
161
162                     //otherwise, given that there were no user-delegate exceptions (they would have been rethrown above),
163                     //the only remaining situation is user-initiated dispose.
164                     Contract.Assert(m_cancellationState.TopLevelDisposedFlag.Value);
165
166                     // If we aren't actively disposing, that means somebody else previously disposed
167                     // of the enumerator. We must throw an ObjectDisposedException.
168                     if (!userInitiatedDispose)
169                     {
170                         throw new ObjectDisposedException("enumerator", SR.GetString(SR.PLINQ_DisposeRequested)); // Case #3
171                     }
172                 }
173
174                 // Case #4. nothing to do.
175             }
176         }
177     }
178 }