3 // Copyright (c) Microsoft Corporation. All rights reserved.
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
8 // DefaultMergeHelper.cs
10 // <OWNER>Microsoft</OWNER>
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
14 using System.Collections.Generic;
15 using System.Threading;
16 using System.Threading.Tasks;
17 using System.Diagnostics.Contracts;
19 namespace System.Linq.Parallel
22 /// The default merge helper uses a set of straightforward algorithms for output
23 /// merging. Namely, for synchronous merges, the input data is yielded from the
24 /// input data streams in "depth first" left-to-right order. For asynchronous merges,
25 /// on the other hand, we use a biased choice algorithm to favor input channels in
26 /// a "fair" way. No order preservation is carried out by this helper.
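28 /// <typeparam name="TInputOutput">The element type of the source partitions and of the merged output.</typeparam>
29 /// <typeparam name="TIgnoreKey">The key type of the source partitioned stream; this helper does not use it to preserve order.</typeparam>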
30 internal class DefaultMergeHelper<TInputOutput, TIgnoreKey> : IMergeHelper<TInputOutput>
// Instance state shared by the constructor, Execute, GetEnumerator and GetResultsAsArray.
32 private QueryTaskGroupState m_taskGroupState; // State shared among tasks; passed to every spooling routine in Execute.
33 private PartitionedStream<TInputOutput, TIgnoreKey> m_partitions; // Source partitions.
34 private AsynchronousChannel<TInputOutput>[] m_asyncChannels; // Destination channels (async); may be null, Execute checks before use.
35 private SynchronousChannel<TInputOutput>[] m_syncChannels; // Destination channels (sync); may be null, Execute and GetResultsAsArray check before use.
36 private IEnumerator<TInputOutput> m_channelEnumerator; // Output enumerator; this is what GetEnumerator returns.
37 private TaskScheduler m_taskScheduler; // The task scheduler passed to the spooling routines that execute the query.
38 private bool m_ignoreOutput; // Whether we're enumerating "for effect".
40 //-----------------------------------------------------------------------------------
41 // Instantiates a new merge helper.
44 // partitions - the source partitions from which to consume data.
45 // ignoreOutput - whether we're enumerating "for effect" or for output.
46 // options - the merge options; anything other than FullyBuffered requests a pipelined merge.
// Further arguments taken by this constructor:
//     taskScheduler     - stored and later passed to the spooling routines in Execute.
//     cancellationState - wrapped, together with queryId, in the QueryTaskGroupState; its
//                         MergedCancellationToken is also given to the asynchronous channels.
//     queryId           - forwarded to the QueryTaskGroupState.
49 internal DefaultMergeHelper(PartitionedStream<TInputOutput, TIgnoreKey> partitions, bool ignoreOutput, ParallelMergeOptions options,
50 TaskScheduler taskScheduler, CancellationState cancellationState, int queryId)
52 Contract.Assert(partitions != null);
54 m_taskGroupState = new QueryTaskGroupState(cancellationState, queryId);
55 m_partitions = partitions;
56 m_taskScheduler = taskScheduler;
57 m_ignoreOutput = ignoreOutput;
// In this constructor the event is only referenced by the asynchronous-channel setup below,
// where it is handed to both the channels and the enumerator that merges them.
58 IntValueEvent consumerEvent = new IntValueEvent();
60 TraceHelpers.TraceInfo("DefaultMergeHelper::.ctor(..): creating a default merge helper");
62 // If output won't be ignored, we need to manufacture a set of channels for the consumer.
63 // Otherwise, when the merge is executed, we'll just invoke the activities themselves.
// NOTE(review): the guard, else and brace lines around the branches below are not visible in
// this view; the branch pairing described here is inferred from the adjacent comments -- confirm.
66 // Create the asynchronous or synchronous channels, based on whether we're pipelining.
67 if (options != ParallelMergeOptions.FullyBuffered)
// More than one partition: asynchronous channels are requested for PartitionCount partitions
// and read back through an AsynchronousChannelMergeEnumerator.
69 if (partitions.PartitionCount > 1)
72 MergeExecutor<TInputOutput>.MakeAsynchronousChannels(partitions.PartitionCount, options, consumerEvent, cancellationState.MergedCancellationToken);
73 m_channelEnumerator = new AsynchronousChannelMergeEnumerator<TInputOutput>(m_taskGroupState, m_asyncChannels, consumerEvent);
77 // If there is only one partition, we don't need to create channels. The only producer enumerator
78 // will be used as the result enumerator.
79 m_channelEnumerator = ExceptionAggregator.WrapQueryEnumerator(partitions[0], m_taskGroupState.CancellationState).GetEnumerator();
// Presumably the FullyBuffered branch: synchronous channels are requested for PartitionCount
// partitions and read back through a SynchronousChannelMergeEnumerator.
85 MergeExecutor<TInputOutput>.MakeSynchronousChannels(partitions.PartitionCount);
86 m_channelEnumerator = new SynchronousChannelMergeEnumerator<TInputOutput>(m_taskGroupState, m_syncChannels);
// Sanity checks: any channel array that exists has one entry per partition, and an output
// enumerator was produced.
89 Contract.Assert(m_asyncChannels == null || m_asyncChannels.Length == partitions.PartitionCount);
90 Contract.Assert(m_syncChannels == null || m_syncChannels.Length == partitions.PartitionCount);
91 Contract.Assert(m_channelEnumerator != null, "enumerator can't be null if we're not ignoring output");
95 //-----------------------------------------------------------------------------------
96 // Schedules execution of the merge itself.
99 // Takes no arguments; which spooling routine runs depends on the channels the constructor created.
// Invokes the spooling routine that matches the state held in the fields. The checks are
// ordered: asynchronous channels first, then synchronous channels, then the "for effect" case.
102 void IMergeHelper<TInputOutput>.Execute()
// Asynchronous channels exist: spool the partitions into them as a pipeline.
104 if (m_asyncChannels != null)
106 SpoolingTask.SpoolPipeline<TInputOutput, TIgnoreKey>(m_taskGroupState, m_partitions, m_asyncChannels, m_taskScheduler);
// Synchronous channels exist: spool the partitions into them with the stop-and-go routine.
108 else if (m_syncChannels != null)
110 SpoolingTask.SpoolStopAndGo<TInputOutput, TIgnoreKey>(m_taskGroupState, m_partitions, m_syncChannels, m_taskScheduler);
// No channels and output is ignored: SpoolForAll receives only the partitions, no channels.
112 else if (m_ignoreOutput)
114 SpoolingTask.SpoolForAll<TInputOutput, TIgnoreKey>(m_taskGroupState, m_partitions, m_taskScheduler);
118 // The last case is a pipelining merge when DOP = 1. In this case, the consumer thread itself will compute the results,
119 // so we don't need any tasks to compute the results asynchronously.
120 Contract.Assert(m_partitions.PartitionCount == 1);
124 //-----------------------------------------------------------------------------------
125 // Gets the enumerator from which to enumerate output results.
// Returns the stored m_channelEnumerator as-is; no new enumerator is created per call.
// The assert tolerates a null enumerator only when output is being ignored.
128 IEnumerator<TInputOutput> IMergeHelper<TInputOutput>.GetEnumerator()
130 Contract.Assert(m_ignoreOutput || m_channelEnumerator != null);
131 return m_channelEnumerator;
134 //-----------------------------------------------------------------------------------
135 // Returns the results as an array.
// Materializes the merged output into a single array.
//   - With synchronous channels: sums the Count of every channel to size the array, then
//     copies each channel in at a running offset, channel by channel.
//   - The remaining lines drain the merged enumerator into a List and return List.ToArray().
// NOTE(review): the declarations of totalSize/current, the return of the array and the line
// separating the two paths are not visible in this view -- confirm against the full file.
143 public TInputOutput[] GetResultsAsArray()
145 if (m_syncChannels != null)
147 // Right size an array.
149 for (int i = 0; i < m_syncChannels.Length; i++)
151 totalSize += m_syncChannels[i].Count;
153 TInputOutput[] array = new TInputOutput[totalSize];
155 // And then blit the elements in.
157 for (int i = 0; i < m_syncChannels.Length; i++)
// Copy channel i at the current offset, then advance the offset by that channel's Count.
159 m_syncChannels[i].CopyTo(array, current);
160 current += m_syncChannels[i].Count;
// Size is not computed up front on this path, so elements are accumulated in a growable list.
166 List<TInputOutput> output = new List<TInputOutput>();
// The using statement disposes the enumerator once it has been drained.
167 using (IEnumerator<TInputOutput> enumerator = ((IMergeHelper<TInputOutput>)this).GetEnumerator())
169 while (enumerator.MoveNext())
171 output.Add(enumerator.Current);
175 return output.ToArray();