Updates referencesource to .NET 4.7
[mono.git] / mcs / class / referencesource / System.Core / System / Linq / Parallel / Merging / DefaultMergeHelper.cs
1 // ==++==
2 //
3 //   Copyright (c) Microsoft Corporation.  All rights reserved.
4 // 
5 // ==--==
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
7 //
8 // DefaultMergeHelper.cs
9 //
10 // <OWNER>Microsoft</OWNER>
11 //
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
13
14 using System.Collections.Generic;
15 using System.Threading;
16 using System.Threading.Tasks;
17 using System.Diagnostics.Contracts;
18
19 namespace System.Linq.Parallel
20 {
21     /// <summary>
22     /// The default merge helper uses a set of straightforward algorithms for output
23     /// merging. Namely, for synchronous merges, the input data is yielded from the
24     /// input data streams in "depth first" left-to-right order. For asynchronous merges,
25     /// on the other hand, we use a biased choice algorithm to favor input channels in
26     /// a "fair" way. No order preservation is carried out by this helper. 
27     /// </summary>
28     /// <typeparam name="TInputOutput"></typeparam>
29     /// <typeparam name="TIgnoreKey"></typeparam>
30     internal class DefaultMergeHelper<TInputOutput, TIgnoreKey> : IMergeHelper<TInputOutput>
31     {
32         private QueryTaskGroupState m_taskGroupState; // State shared among tasks.
33         private PartitionedStream<TInputOutput, TIgnoreKey> m_partitions; // Source partitions.
34         private AsynchronousChannel<TInputOutput>[] m_asyncChannels; // Destination channels (async).
35         private SynchronousChannel<TInputOutput>[] m_syncChannels; // Destination channels (sync).
36         private IEnumerator<TInputOutput> m_channelEnumerator; // Output enumerator.
37         private TaskScheduler m_taskScheduler; // The task manager to execute the query.
38         private bool m_ignoreOutput; // Whether we're enumerating "for effect".
39
40         //-----------------------------------------------------------------------------------
41         // Instantiates a new merge helper.
42         //
43         // Arguments:
44         //     partitions   - the source partitions from which to consume data.
45         //     ignoreOutput - whether we're enumerating "for effect" or for output.
46         //     pipeline     - whether to use a pipelined merge.
47         //
48
49         internal DefaultMergeHelper(PartitionedStream<TInputOutput, TIgnoreKey> partitions, bool ignoreOutput, ParallelMergeOptions options, 
50             TaskScheduler taskScheduler, CancellationState cancellationState, int queryId)
51         {
52             Contract.Assert(partitions != null);
53
54             m_taskGroupState = new QueryTaskGroupState(cancellationState, queryId);
55             m_partitions = partitions;
56             m_taskScheduler = taskScheduler;
57             m_ignoreOutput = ignoreOutput;
58             IntValueEvent consumerEvent = new IntValueEvent();
59
60             TraceHelpers.TraceInfo("DefaultMergeHelper::.ctor(..): creating a default merge helper");
61
62             // If output won't be ignored, we need to manufacture a set of channels for the consumer.
63             // Otherwise, when the merge is executed, we'll just invoke the activities themselves.
64             if (!ignoreOutput)
65             {
66                 // Create the asynchronous or synchronous channels, based on whether we're pipelining.
67                 if (options != ParallelMergeOptions.FullyBuffered)
68                 {
69                     if (partitions.PartitionCount > 1)
70                     {
71                         m_asyncChannels =
72                             MergeExecutor<TInputOutput>.MakeAsynchronousChannels(partitions.PartitionCount, options, consumerEvent, cancellationState.MergedCancellationToken);
73                         m_channelEnumerator = new AsynchronousChannelMergeEnumerator<TInputOutput>(m_taskGroupState, m_asyncChannels, consumerEvent);
74                     }
75                     else
76                     {
77                         // If there is only one partition, we don't need to create channels. The only producer enumerator
78                         // will be used as the result enumerator.
79                         m_channelEnumerator = ExceptionAggregator.WrapQueryEnumerator(partitions[0], m_taskGroupState.CancellationState).GetEnumerator();
80                     }
81                 }
82                 else
83                 {
84                     m_syncChannels =
85                         MergeExecutor<TInputOutput>.MakeSynchronousChannels(partitions.PartitionCount);
86                     m_channelEnumerator = new SynchronousChannelMergeEnumerator<TInputOutput>(m_taskGroupState, m_syncChannels);
87                 }
88
89                 Contract.Assert(m_asyncChannels == null || m_asyncChannels.Length == partitions.PartitionCount);
90                 Contract.Assert(m_syncChannels == null || m_syncChannels.Length == partitions.PartitionCount);
91                 Contract.Assert(m_channelEnumerator != null, "enumerator can't be null if we're not ignoring output");
92             }
93         }
94
95         //-----------------------------------------------------------------------------------
96         // Schedules execution of the merge itself.
97         //
98         // Arguments:
99         //    ordinalIndexState - the state of the ordinal index of the merged partitions
100         //
101
102         void IMergeHelper<TInputOutput>.Execute()
103         {
104             if (m_asyncChannels != null)
105             {
106                 SpoolingTask.SpoolPipeline<TInputOutput, TIgnoreKey>(m_taskGroupState, m_partitions, m_asyncChannels, m_taskScheduler);
107             }
108             else if (m_syncChannels != null)
109             {
110                 SpoolingTask.SpoolStopAndGo<TInputOutput, TIgnoreKey>(m_taskGroupState, m_partitions, m_syncChannels, m_taskScheduler);
111             }
112             else if (m_ignoreOutput)
113             {
114                 SpoolingTask.SpoolForAll<TInputOutput, TIgnoreKey>(m_taskGroupState, m_partitions, m_taskScheduler);
115             }
116             else
117             {
118                 // The last case is a pipelining merge when DOP = 1. In this case, the consumer thread itself will compute the results,
119                 // so we don't need any tasks to compute the results asynchronously.
120                 Contract.Assert(m_partitions.PartitionCount == 1);
121             }
122         }
123
124         //-----------------------------------------------------------------------------------
125         // Gets the enumerator from which to enumerate output results.
126         //
127
128         IEnumerator<TInputOutput> IMergeHelper<TInputOutput>.GetEnumerator()
129         {
130             Contract.Assert(m_ignoreOutput || m_channelEnumerator != null);
131             return m_channelEnumerator;
132         }
133
134         //-----------------------------------------------------------------------------------
135         // Returns the results as an array.
136         //
137         // @
138
139
140
141
142
143         public TInputOutput[] GetResultsAsArray()
144         {
145             if (m_syncChannels != null)
146             {
147                 // Right size an array.
148                 int totalSize = 0;
149                 for (int i = 0; i < m_syncChannels.Length; i++)
150                 {
151                     totalSize += m_syncChannels[i].Count;
152                 }
153                 TInputOutput[] array = new TInputOutput[totalSize];
154
155                 // And then blit the elements in.
156                 int current = 0;
157                 for (int i = 0; i < m_syncChannels.Length; i++)
158                 {
159                     m_syncChannels[i].CopyTo(array, current);
160                     current += m_syncChannels[i].Count;
161                 }
162                 return array;
163             }
164             else
165             {
166                 List<TInputOutput> output = new List<TInputOutput>();
167                 using (IEnumerator<TInputOutput> enumerator = ((IMergeHelper<TInputOutput>)this).GetEnumerator())
168                 {
169                     while (enumerator.MoveNext())
170                     {
171                         output.Add(enumerator.Current);
172                     }
173                 }
174
175                 return output.ToArray();
176             }            
177         }
178     }
179 }