3 // Copyright (c) Microsoft Corporation. All rights reserved.
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
8 // PartitionedStreamMerger.cs
10 // <OWNER>Microsoft</OWNER>
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
14 using System.Threading.Tasks;
15 using System.Diagnostics.Contracts;
17 namespace System.Linq.Parallel
20 /// Partitioned stream recipient that will merge the results.
22 internal class PartitionedStreamMerger<TOutput> : IPartitionedStreamRecipient<TOutput>
24 private bool m_forEffectMerge;
25 private ParallelMergeOptions m_mergeOptions;
26 private bool m_isOrdered;
27 private MergeExecutor<TOutput> m_mergeExecutor = null;
28 private TaskScheduler m_taskScheduler;
29 private int m_queryId; // ID of the current query execution
31 private CancellationState m_cancellationState;
34 private bool m_received = false;
36 // Returns the merge executor which merges the received partitioned stream.
37 internal MergeExecutor<TOutput> MergeExecutor
42 Contract.Assert(m_received, "Cannot return the merge executor because Receive() has not been called yet.");
44 return m_mergeExecutor;
48 internal PartitionedStreamMerger(bool forEffectMerge, ParallelMergeOptions mergeOptions, TaskScheduler taskScheduler, bool outputOrdered,
49 CancellationState cancellationState, int queryId)
51 m_forEffectMerge = forEffectMerge;
52 m_mergeOptions = mergeOptions;
53 m_isOrdered = outputOrdered;
54 m_taskScheduler = taskScheduler;
55 m_cancellationState = cancellationState;
59 public void Receive<TKey>(PartitionedStream<TOutput, TKey> partitionedStream)
64 m_mergeExecutor = MergeExecutor<TOutput>.Execute<TKey>(
65 partitionedStream, m_forEffectMerge, m_mergeOptions, m_taskScheduler, m_isOrdered, m_cancellationState, m_queryId);
67 TraceHelpers.TraceInfo("[timing]: {0}: finished opening - QueryOperator<>::GetEnumerator", DateTime.Now.Ticks);