// ==++==
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--==
/*============================================================
**
** Class:  Stream
**
** gpaperin
**
**
** Purpose: Abstract base class for all Streams.  Provides
** default implementations of asynchronous reads & writes, in
** terms of the synchronous reads & writes (and vice versa).
**
**
===========================================================*/
using System;
using System.Threading;
#if FEATURE_ASYNC_IO
using System.Threading.Tasks;
#endif

using System.Runtime;
using System.Runtime.InteropServices;
#if NEW_EXPERIMENTAL_ASYNC_IO
using System.Runtime.CompilerServices;
#endif
using System.Runtime.ExceptionServices;
using System.Security;
using System.Security.Permissions;
using System.Diagnostics.Contracts;
using System.Reflection;

namespace System.IO {
    [Serializable]
    [ComVisible(true)]
#if CONTRACTS_FULL
    [ContractClass(typeof(StreamContract))]
#endif
#if FEATURE_REMOTING
    public abstract class Stream : MarshalByRefObject, IDisposable {
#else // FEATURE_REMOTING
    public abstract class Stream : IDisposable {
#endif // FEATURE_REMOTING

        // Shared instance of the private NullStream type.
        public static readonly Stream Null = new NullStream();

        //We pick a value that is the largest multiple of 4096 that is still smaller than the large object heap threshold (85K).
        // The CopyTo/CopyToAsync buffer is short-lived and is likely to be collected at Gen0, and it offers a significant
        // improvement in Copy performance.
        private const int _DefaultCopyBufferSize = 81920;

#if NEW_EXPERIMENTAL_ASYNC_IO
        // To implement Async IO operations on streams that don't support async IO

        // The single BeginRead/BeginWrite operation currently outstanding, if any.
        // EndRead/EndWrite validate their argument against this field and then clear it.
        [NonSerialized]
        private ReadWriteTask _activeReadWriteTask;

        // Serializes BeginRead/BeginWrite requests; created on first use.
        [NonSerialized]
        private SemaphoreSlim _asyncActiveSemaphore;

        // Returns _asyncActiveSemaphore, creating it with an initial and maximum count of 1 if needed.
        internal SemaphoreSlim EnsureAsyncActiveSemaphoreInitialized()
        {
            // Lazily-initialize _asyncActiveSemaphore.  As we're never accessing the SemaphoreSlim's
            // WaitHandle, we don't need to worry about Disposing it.
            return LazyInitializer.EnsureInitialized(ref _asyncActiveSemaphore, () => new SemaphoreSlim(1, 1));
        }
#endif

        // CopyTo/CopyToAsync throw NotSupportedException on a source whose CanRead is false.
        public abstract bool CanRead {
            [Pure]
            get;
        }

        // If CanSeek is false, Position, Seek, Length, and SetLength should throw.
        public abstract bool CanSeek {
            [Pure]
            get;
        }

        // The base implementation always reports false; ReadTimeout and WriteTimeout
        // below throw unless a subclass overrides them.
        [ComVisible(false)]
        public virtual bool CanTimeout {
            [Pure]
            get {
                return false;
            }
        }

        // CopyTo/CopyToAsync throw NotSupportedException on a destination whose CanWrite is false.
        public abstract bool CanWrite {
            [Pure]
            get;
        }

        public abstract long Length {
            get;
        }

        public abstract long Position {
            get;
            set;
        }

        // Base implementation: both accessors throw InvalidOperationException
        // (resource "InvalidOperation_TimeoutsNotSupported").
        [ComVisible(false)]
        public virtual int ReadTimeout {
            get {
                Contract.Ensures(Contract.Result<int>() >= 0);
                throw new InvalidOperationException(Environment.GetResourceString("InvalidOperation_TimeoutsNotSupported"));
            }
            set {
                throw new InvalidOperationException(Environment.GetResourceString("InvalidOperation_TimeoutsNotSupported"));
            }
        }

        // Base implementation: both accessors throw InvalidOperationException
        // (resource "InvalidOperation_TimeoutsNotSupported").
        [ComVisible(false)]
        public virtual int WriteTimeout {
            get {
                Contract.Ensures(Contract.Result<int>() >= 0);
                throw new InvalidOperationException(Environment.GetResourceString("InvalidOperation_TimeoutsNotSupported"));
            }
            set {
                throw new InvalidOperationException(Environment.GetResourceString("InvalidOperation_TimeoutsNotSupported"));
            }
        }

#if FEATURE_ASYNC_IO
        // Copies to destination using the default buffer size (_DefaultCopyBufferSize).
        [HostProtection(ExternalThreading = true)]
        [ComVisible(false)]
        public Task CopyToAsync(Stream destination)
        {
            return CopyToAsync(destination, _DefaultCopyBufferSize);
        }

        // Copies to destination using the given buffer size and CancellationToken.None.
        [HostProtection(ExternalThreading = true)]
        [ComVisible(false)]
        public Task CopyToAsync(Stream destination, Int32 bufferSize)
        {
            return CopyToAsync(destination, bufferSize, CancellationToken.None);
        }

        // Validates the arguments synchronously, then starts the asynchronous copy loop.
        // Throws:
        //   ArgumentNullException       - destination is null.
        //   ArgumentOutOfRangeException - bufferSize is not positive.
        //   ObjectDisposedException     - either stream reports neither CanRead nor CanWrite.
        //   NotSupportedException       - this stream is unreadable or destination is unwritable.
        [HostProtection(ExternalThreading = true)]
        [ComVisible(false)]
        public virtual Task CopyToAsync(Stream destination, Int32 bufferSize, CancellationToken cancellationToken)
        {
            if (destination == null)
                throw new ArgumentNullException("destination");
            if (bufferSize <= 0)
                throw new ArgumentOutOfRangeException("bufferSize", Environment.GetResourceString("ArgumentOutOfRange_NeedPosNum"));
            // A stream that can neither read nor write is treated as closed.
            if (!CanRead && !CanWrite)
                throw new ObjectDisposedException(null, Environment.GetResourceString("ObjectDisposed_StreamClosed"));
            if (!destination.CanRead && !destination.CanWrite)
                throw new ObjectDisposedException("destination", Environment.GetResourceString("ObjectDisposed_StreamClosed"));
            if (!CanRead)
                throw new NotSupportedException(Environment.GetResourceString("NotSupported_UnreadableStream"));
            if (!destination.CanWrite)
                throw new NotSupportedException(Environment.GetResourceString("NotSupported_UnwritableStream"));
            Contract.EndContractBlock();

            return CopyToAsyncInternal(destination, bufferSize, cancellationToken);
        }

        // Reads into a single buffer of bufferSize bytes and writes each chunk to destination
        // until ReadAsync returns 0.  Arguments are assumed to have been validated by the caller.
        private async Task CopyToAsyncInternal(Stream destination, Int32 bufferSize, CancellationToken cancellationToken)
        {
            Contract.Requires(destination != null);
            Contract.Requires(bufferSize > 0);
            Contract.Requires(CanRead);
            Contract.Requires(destination.CanWrite);

            byte[] buffer = new byte[bufferSize];
            int bytesRead;
            while ((bytesRead = await ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false)) != 0)
            {
                await destination.WriteAsync(buffer, 0, bytesRead, cancellationToken).ConfigureAwait(false);
            }
        }
#endif // FEATURE_ASYNC_IO

        // Reads the bytes from the current stream and writes the bytes to
        // the destination stream until all bytes are read, starting at
        // the current position.
public void CopyTo(Stream destination)
{
    // Same validation sequence as the two-argument overload, minus the buffer-size check.
    if (destination == null)
    {
        throw new ArgumentNullException("destination");
    }
    if (!(CanRead || CanWrite))
    {
        throw new ObjectDisposedException(null, Environment.GetResourceString("ObjectDisposed_StreamClosed"));
    }
    if (!(destination.CanRead || destination.CanWrite))
    {
        throw new ObjectDisposedException("destination", Environment.GetResourceString("ObjectDisposed_StreamClosed"));
    }
    if (!CanRead)
    {
        throw new NotSupportedException(Environment.GetResourceString("NotSupported_UnreadableStream"));
    }
    if (!destination.CanWrite)
    {
        throw new NotSupportedException(Environment.GetResourceString("NotSupported_UnwritableStream"));
    }
    Contract.EndContractBlock();

    InternalCopyTo(destination, _DefaultCopyBufferSize);
}

// Copies the rest of this stream into destination through a temporary buffer of bufferSize bytes.
// Throws:
//   ArgumentNullException       - destination is null.
//   ArgumentOutOfRangeException - bufferSize is zero or negative.
//   ObjectDisposedException     - either stream reports neither CanRead nor CanWrite.
//   NotSupportedException       - this stream is unreadable or destination is unwritable.
public void CopyTo(Stream destination, int bufferSize)
{
    if (destination == null)
    {
        throw new ArgumentNullException("destination");
    }
    if (bufferSize <= 0)
    {
        throw new ArgumentOutOfRangeException("bufferSize", Environment.GetResourceString("ArgumentOutOfRange_NeedPosNum"));
    }
    if (!(CanRead || CanWrite))
    {
        throw new ObjectDisposedException(null, Environment.GetResourceString("ObjectDisposed_StreamClosed"));
    }
    if (!(destination.CanRead || destination.CanWrite))
    {
        throw new ObjectDisposedException("destination", Environment.GetResourceString("ObjectDisposed_StreamClosed"));
    }
    if (!CanRead)
    {
        throw new NotSupportedException(Environment.GetResourceString("NotSupported_UnreadableStream"));
    }
    if (!destination.CanWrite)
    {
        throw new NotSupportedException(Environment.GetResourceString("NotSupported_UnwritableStream"));
    }
    Contract.EndContractBlock();

    InternalCopyTo(destination, bufferSize);
}

// Copy loop shared by both CopyTo overloads; the callers have already validated the arguments.
private void InternalCopyTo(Stream destination, int bufferSize)
{
    Contract.Requires(destination != null);
    Contract.Requires(CanRead);
    Contract.Requires(destination.CanWrite);
    Contract.Requires(bufferSize > 0);

    byte[] chunk = new byte[bufferSize];
    for (;;)
    {
        int filled = Read(chunk, 0, chunk.Length);
        if (filled == 0)
        {
            // Read returning 0 ends the copy.
            break;
        }
        destination.Write(chunk, 0, filled);
    }
}

// Stream used to require that all cleanup logic went
// into Close(),
// which was thought up before we invented IDisposable.  However, we
// need to follow the IDisposable pattern so that users can write
// sensible subclasses without needing to inspect all their base
// classes, and without worrying about version brittleness, from a
// base class switching to the Dispose pattern.  We're moving
// Stream to the Dispose(bool) pattern - that's where all subclasses
// should put their cleanup starting in V2.
public virtual void Close()
{
    /* These are correct, but we'd have to fix PipeStream & NetworkStream very carefully.
    Contract.Ensures(CanRead == false);
    Contract.Ensures(CanWrite == false);
    Contract.Ensures(CanSeek == false);
    */

    Dispose(true);
    GC.SuppressFinalize(this);
}

// IDisposable entry point; simply forwards to Close().
public void Dispose()
{
    /* These are correct, but we'd have to fix PipeStream & NetworkStream very carefully.
    Contract.Ensures(CanRead == false);
    Contract.Ensures(CanWrite == false);
    Contract.Ensures(CanSeek == false);
    */

    Close();
}

// Cleanup hook for subclasses.  The base implementation is intentionally empty.
protected virtual void Dispose(bool disposing)
{
    // Note: Never change this to call other virtual methods on Stream
    // like Write, since the state on subclasses has already been
    // torn down.  This is the last code to run on cleanup for a stream.
}

public abstract void Flush();

#if FEATURE_ASYNC_IO
// Flushes asynchronously with CancellationToken.None.
[HostProtection(ExternalThreading=true)]
[ComVisible(false)]
public Task FlushAsync()
{
    return FlushAsync(CancellationToken.None);
}

// Default implementation: queues a call to the synchronous Flush() on TaskScheduler.Default.
// The stream is passed as the task state so the lambda does not capture 'this'.
[HostProtection(ExternalThreading=true)]
[ComVisible(false)]
public virtual Task FlushAsync(CancellationToken cancellationToken)
{
    return Task.Factory.StartNew(state => ((Stream)state).Flush(), this,
        cancellationToken, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
}
#endif // FEATURE_ASYNC_IO

[Obsolete("CreateWaitHandle will be removed eventually. Please use \"new ManualResetEvent(false)\" instead.")]
protected virtual WaitHandle CreateWaitHandle()
{
    Contract.Ensures(Contract.Result<WaitHandle>() != null);
    return new ManualResetEvent(false);
}

// Public APM read entry point.  Delegates to BeginReadInternal, waiting synchronously
// for any outstanding operation (serializeAsynchronously: false).
[HostProtection(ExternalThreading=true)]
public virtual IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
{
    Contract.Ensures(Contract.Result<IAsyncResult>() != null);
    return BeginReadInternal(buffer, offset, count, callback, state, serializeAsynchronously: false);
}

// Starts a read and returns its IAsyncResult.
//   serializeAsynchronously - when true, the wait for a previously started operation is done
//                             with SemaphoreSlim.WaitAsync instead of a blocking Wait.
// Without NEW_EXPERIMENTAL_ASYNC_IO (or for pre-Windows Phone 8 apps) the read is performed
// synchronously via BlockingBeginRead.
[HostProtection(ExternalThreading = true)]
internal IAsyncResult BeginReadInternal(byte[] buffer, int offset, int count, AsyncCallback callback, Object state, bool serializeAsynchronously)
{
    Contract.Ensures(Contract.Result<IAsyncResult>() != null);
    if (!CanRead) __Error.ReadNotSupported();

#if !NEW_EXPERIMENTAL_ASYNC_IO
    return BlockingBeginRead(buffer, offset, count, callback, state);
#else

    // Mango did not do Async IO.
    if(CompatibilitySwitches.IsAppEarlierThanWindowsPhone8)
    {
        return BlockingBeginRead(buffer, offset, count, callback, state);
    }

    // To avoid a race with a stream's position pointer & generating race
    // conditions with internal buffer indexes in our own streams that
    // don't natively support async IO operations when there are multiple
    // async requests outstanding, we will block the application's main
    // thread if it does a second IO request until the first one completes.
    var semaphore = EnsureAsyncActiveSemaphoreInitialized();
    Task semaphoreTask = null;
    if (serializeAsynchronously)
    {
        semaphoreTask = semaphore.WaitAsync();
    }
    else
    {
        semaphore.Wait();
    }

    // Create the task to asynchronously do a Read.  This task serves both
    // as the asynchronous work item and as the IAsyncResult returned to the user.
    var asyncResult = new ReadWriteTask(true /*isRead*/, delegate
    {
        // The ReadWriteTask stores all of the parameters to pass to Read.
        // As we're currently inside of it, we can get the current task
        // and grab the parameters from it.
        var thisTask = Task.InternalCurrent as ReadWriteTask;
        Contract.Assert(thisTask != null, "Inside ReadWriteTask, InternalCurrent should be the ReadWriteTask");

        // Do the Read and return the number of bytes read
        var bytesRead = thisTask._stream.Read(thisTask._buffer, thisTask._offset, thisTask._count);
        thisTask.ClearBeginState(); // just to help alleviate some memory pressure
        return bytesRead;
    }, state, this, buffer, offset, count, callback);

    // Schedule it
    if (semaphoreTask != null)
        RunReadWriteTaskWhenReady(semaphoreTask, asyncResult);
    else
        RunReadWriteTask(asyncResult);

    return asyncResult; // return it
#endif
}

// Completes a read started by BeginRead and returns the number of bytes read.
// Throws:
//   ArgumentNullException     - asyncResult is null.
//   ArgumentException         - no operation is active, or the active operation is a write.
//   InvalidOperationException - asyncResult is not the active operation.
// Any exception thrown by the underlying Read is propagated.
public virtual int EndRead(IAsyncResult asyncResult)
{
    if (asyncResult == null)
        throw new ArgumentNullException("asyncResult");
    Contract.Ensures(Contract.Result<int>() >= 0);
    Contract.EndContractBlock();

#if !NEW_EXPERIMENTAL_ASYNC_IO
    return BlockingEndRead(asyncResult);
#else
    // Mango did not do async IO.
    if(CompatibilitySwitches.IsAppEarlierThanWindowsPhone8)
    {
        return BlockingEndRead(asyncResult);
    }

    var readTask = _activeReadWriteTask;

    if (readTask == null)
    {
        throw new ArgumentException(Environment.GetResourceString("InvalidOperation_WrongAsyncResultOrEndReadCalledMultiple"));
    }
    else if (readTask != asyncResult)
    {
        throw new InvalidOperationException(Environment.GetResourceString("InvalidOperation_WrongAsyncResultOrEndReadCalledMultiple"));
    }
    else if (!readTask._isRead)
    {
        throw new ArgumentException(Environment.GetResourceString("InvalidOperation_WrongAsyncResultOrEndReadCalledMultiple"));
    }

    try
    {
        return readTask.GetAwaiter().GetResult(); // block until completion, then get result / propagate any exception
    }
    finally
    {
        // Whether the read succeeded or threw, clear the active operation and let the next one in.
        _activeReadWriteTask = null;
        Contract.Assert(_asyncActiveSemaphore != null, "Must have been initialized in order to get here.");
        _asyncActiveSemaphore.Release();
    }
#endif
}

#if FEATURE_ASYNC_IO
// Reads asynchronously with CancellationToken.None.
[HostProtection(ExternalThreading = true)]
[ComVisible(false)]
public Task<int> ReadAsync(Byte[] buffer, int offset, int count)
{
    return ReadAsync(buffer, offset, count, CancellationToken.None);
}

// Default implementation on top of BeginRead/EndRead.  The token is only checked up front;
// it is not passed on to the Begin/End pair.
[HostProtection(ExternalThreading = true)]
[ComVisible(false)]
public virtual Task<int> ReadAsync(Byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
    // If cancellation was requested, bail early with an already completed task.
    // Otherwise, return a task that represents the Begin/End methods.
    return cancellationToken.IsCancellationRequested
                ? Task.FromCancellation<int>(cancellationToken)
                : BeginEndReadAsync(buffer, offset, count);
}

// Wraps BeginRead/EndRead in a task.  The stream and the arguments are passed explicitly
// so that both lambdas are non-capturing.
private Task<Int32> BeginEndReadAsync(Byte[] buffer, Int32 offset, Int32 count)
{
    return TaskFactory<Int32>.FromAsyncTrim(
                this, new ReadWriteParameters { Buffer = buffer, Offset = offset, Count = count },
                (stream, args, callback, state) => stream.BeginRead(args.Buffer, args.Offset, args.Count, callback, state), // cached by compiler
                (stream, asyncResult) => stream.EndRead(asyncResult)); // cached by compiler
}

private struct ReadWriteParameters // struct for arguments to Read and Write calls
{
    internal byte[] Buffer;
    internal int Offset;
    internal int Count;
}
#endif //FEATURE_ASYNC_IO

// Public APM write entry point.  Delegates to BeginWriteInternal, waiting synchronously
// for any outstanding operation (serializeAsynchronously: false).
[HostProtection(ExternalThreading=true)]
public virtual IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
{
    Contract.Ensures(Contract.Result<IAsyncResult>() != null);
    return BeginWriteInternal(buffer, offset, count, callback, state, serializeAsynchronously: false);
}

// Starts a write and returns its IAsyncResult.
//   serializeAsynchronously - when true, the wait for a previously started operation is done
//                             with SemaphoreSlim.WaitAsync instead of a blocking Wait.
// Without NEW_EXPERIMENTAL_ASYNC_IO (or for pre-Windows Phone 8 apps) the write is performed
// synchronously via BlockingBeginWrite.
[HostProtection(ExternalThreading = true)]
internal IAsyncResult BeginWriteInternal(byte[] buffer, int offset, int count, AsyncCallback callback, Object state, bool serializeAsynchronously)
{
    Contract.Ensures(Contract.Result<IAsyncResult>() != null);
    if (!CanWrite) __Error.WriteNotSupported();
#if !NEW_EXPERIMENTAL_ASYNC_IO
    return BlockingBeginWrite(buffer, offset, count, callback, state);
#else

    // Mango did not do Async IO.
    if(CompatibilitySwitches.IsAppEarlierThanWindowsPhone8)
    {
        return BlockingBeginWrite(buffer, offset, count, callback, state);
    }

    // To avoid a race with a stream's position pointer & generating race
    // conditions with internal buffer indexes in our own streams that
    // don't natively support async IO operations when there are multiple
    // async requests outstanding, we will block the application's main
    // thread if it does a second IO request until the first one completes.
    var semaphore = EnsureAsyncActiveSemaphoreInitialized();
    Task semaphoreTask = null;
    if (serializeAsynchronously)
    {
        semaphoreTask = semaphore.WaitAsync(); // kick off the asynchronous wait, but don't block
    }
    else
    {
        semaphore.Wait(); // synchronously wait here
    }

    // Create the task to asynchronously do a Write.  This task serves both
    // as the asynchronous work item and as the IAsyncResult returned to the user.
    var asyncResult = new ReadWriteTask(false /*isRead*/, delegate
    {
        // The ReadWriteTask stores all of the parameters to pass to Write.
        // As we're currently inside of it, we can get the current task
        // and grab the parameters from it.
        var thisTask = Task.InternalCurrent as ReadWriteTask;
        Contract.Assert(thisTask != null, "Inside ReadWriteTask, InternalCurrent should be the ReadWriteTask");

        // Do the Write
        thisTask._stream.Write(thisTask._buffer, thisTask._offset, thisTask._count);
        thisTask.ClearBeginState(); // just to help alleviate some memory pressure
        return 0; // not used, but signature requires a value be returned
    }, state, this, buffer, offset, count, callback);

    // Schedule it
    if (semaphoreTask != null)
        RunReadWriteTaskWhenReady(semaphoreTask, asyncResult);
    else
        RunReadWriteTask(asyncResult);

    return asyncResult; // return it
#endif
}

#if NEW_EXPERIMENTAL_ASYNC_IO
// Runs readWriteTask once asyncWaiter (the semaphore wait) has completed: immediately if it
// already has, otherwise from a continuation that executes synchronously.
private void RunReadWriteTaskWhenReady(Task asyncWaiter, ReadWriteTask readWriteTask)
{
    Contract.Assert(readWriteTask != null);  // Should be Contract.Requires, but CCRewrite is doing a poor job with
                                             // preconditions in async methods that await.  Mike & Manuel are aware. (10/6/2011, bug 290222)
    Contract.Assert(asyncWaiter != null);    // Ditto

    // If the wait has already completed, run the task.
    if (asyncWaiter.IsCompleted)
    {
        Contract.Assert(asyncWaiter.IsRanToCompletion, "The semaphore wait should always complete successfully.");
        RunReadWriteTask(readWriteTask);
    }
    else  // Otherwise, wait for our turn, and then run the task.
    {
        // The stream and the task travel in the continuation state so the lambda captures nothing.
        asyncWaiter.ContinueWith((t, state) =>
            {
                Contract.Assert(t.IsRanToCompletion, "The semaphore wait should always complete successfully.");
                var tuple = (Tuple<Stream, ReadWriteTask>)state;
                tuple.Item1.RunReadWriteTask(tuple.Item2); // RunReadWriteTask(readWriteTask);
            }, Tuple.Create<Stream, ReadWriteTask>(this, readWriteTask),
            default(CancellationToken),
            TaskContinuationOptions.ExecuteSynchronously,
            TaskScheduler.Default);
    }
}

// Records readWriteTask as the active operation and schedules it on TaskScheduler.Default.
private void RunReadWriteTask(ReadWriteTask readWriteTask)
{
    Contract.Requires(readWriteTask != null);
    Contract.Assert(_activeReadWriteTask == null, "Expected no other readers or writers");

    // Schedule the task.  ScheduleAndStart must happen after the write to _activeReadWriteTask to avoid a race.
    // Internally, we're able to directly call ScheduleAndStart rather than Start, avoiding
    // two interlocked operations.  However, if ReadWriteTask is ever changed to use
    // a cancellation token, this should be changed to use Start.
    _activeReadWriteTask = readWriteTask; // store the task so that EndXx can validate it's given the right one
    readWriteTask.m_taskScheduler = TaskScheduler.Default;
    readWriteTask.ScheduleAndStart(needsProtection: false);
}
#endif

// Completes a write started by BeginWrite.
// Throws:
//   ArgumentNullException     - asyncResult is null.
//   ArgumentException         - no operation is active, or the active operation is a read.
//   InvalidOperationException - asyncResult is not the active operation.
// Any exception thrown by the underlying Write is propagated.
public virtual void EndWrite(IAsyncResult asyncResult)
{
    if (asyncResult==null)
        throw new ArgumentNullException("asyncResult");
    Contract.EndContractBlock();

#if !NEW_EXPERIMENTAL_ASYNC_IO
    BlockingEndWrite(asyncResult);
#else

    // Mango did not do Async IO.
    if(CompatibilitySwitches.IsAppEarlierThanWindowsPhone8)
    {
        BlockingEndWrite(asyncResult);
        return;
    }

    var writeTask = _activeReadWriteTask;
    if (writeTask == null)
    {
        throw new ArgumentException(Environment.GetResourceString("InvalidOperation_WrongAsyncResultOrEndWriteCalledMultiple"));
    }
    else if (writeTask != asyncResult)
    {
        throw new InvalidOperationException(Environment.GetResourceString("InvalidOperation_WrongAsyncResultOrEndWriteCalledMultiple"));
    }
    else if (writeTask._isRead)
    {
        throw new ArgumentException(Environment.GetResourceString("InvalidOperation_WrongAsyncResultOrEndWriteCalledMultiple"));
    }

    try
    {
        writeTask.GetAwaiter().GetResult(); // block until completion, then propagate any exceptions
        Contract.Assert(writeTask.Status == TaskStatus.RanToCompletion);
    }
    finally
    {
        // Whether the write succeeded or threw, clear the active operation and let the next one in.
        _activeReadWriteTask = null;
        Contract.Assert(_asyncActiveSemaphore != null, "Must have been initialized in order to get here.");
        _asyncActiveSemaphore.Release();
    }
#endif
}

#if NEW_EXPERIMENTAL_ASYNC_IO
// Task used by BeginRead / BeginWrite to do Read / Write asynchronously.
// A single instance of this task serves four purposes:
// 1. The work item scheduled to run the Read / Write operation
// 2. The state holding the arguments to be passed to Read / Write
// 3.
//    The IAsyncResult returned from BeginRead / BeginWrite
// 4. The completion action that runs to invoke the user-provided callback.
// This last item is a bit tricky.  Before the AsyncCallback is invoked, the
// IAsyncResult must have completed, so we can't just invoke the handler
// from within the task, since it is the IAsyncResult, and thus it's not
// yet completed.  Instead, we use AddCompletionAction to install this
// task as its own completion handler.  That saves the need to allocate
// a separate completion handler, it guarantees that the task will
// have completed by the time the handler is invoked, and it allows
// the handler to be invoked synchronously upon the completion of the
// task.  This all enables BeginRead / BeginWrite to be implemented
// with a single allocation.
private sealed class ReadWriteTask : Task<int>, ITaskCompletionAction
{
    internal readonly bool _isRead;     // true for BeginRead, false for BeginWrite; checked by EndRead/EndWrite
    internal Stream _stream;            // stream whose Read/Write the task body invokes
    internal byte [] _buffer;
    internal int _offset;
    internal int _count;
    private AsyncCallback _callback;    // user callback; null when none was supplied or once it has been invoked
    private ExecutionContext _context;  // context captured at construction, used to run the callback

    internal void ClearBeginState() // Used to allow the args to Read/Write to be made available for GC
    {
        _stream = null;
        _buffer = null;
    }

    // Creates the task without starting it.
    //   function - the body to run; its int result becomes the task result (bytes read; 0 for writes).
    //   state    - the user's async state object, handed to the base task.
    //   callback - optional completion callback.
    [SecuritySafeCritical] // necessary for EC.Capture
    [MethodImpl(MethodImplOptions.NoInlining)]
    public ReadWriteTask(
        bool isRead,
        Func<object, int> function, object state,
        Stream stream, byte[] buffer, int offset, int count, AsyncCallback callback) :
        base(function, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach)
    {
        Contract.Requires(function != null);
        Contract.Requires(stream != null);
        Contract.Requires(buffer != null);
        Contract.EndContractBlock();

        StackCrawlMark stackMark = StackCrawlMark.LookForMyCaller;

        // Store the arguments
        _isRead = isRead;
        _stream = stream;
        _buffer = buffer;
        _offset = offset;
        _count = count;

        // If a callback was provided, we need to:
        // - Store the user-provided handler
        // - Capture an ExecutionContext under which to invoke the handler
        // - Add this task as its own completion handler so that the Invoke method
        //   will run the callback when this task completes.
        if (callback != null)
        {
            _callback = callback;
            _context = ExecutionContext.Capture(ref stackMark,
                ExecutionContext.CaptureOptions.OptimizeDefaultCase | ExecutionContext.CaptureOptions.IgnoreSyncCtx);
            base.AddCompletionAction(this);
        }
    }

    // ContextCallback target: invokes the stored callback with the task as its IAsyncResult,
    // clearing the callback field first.
    [SecurityCritical] // necessary for CoreCLR
    private static void InvokeAsyncCallback(object completedTask)
    {
        var rwc = (ReadWriteTask)completedTask;
        var callback = rwc._callback;
        rwc._callback = null;
        callback(rwc);
    }

    // Cached delegate for InvokeAsyncCallback, created on first use.
    [SecurityCritical] // necessary for CoreCLR
    private static ContextCallback s_invokeAsyncCallback;

    [SecuritySafeCritical] // necessary for ExecutionContext.Run
    void ITaskCompletionAction.Invoke(Task completingTask)
    {
        // Get the ExecutionContext.  If there is none, just run the callback
        // directly, passing in the completed task as the IAsyncResult.
        // If there is one, process it with ExecutionContext.Run.
        var context = _context;
        if (context == null)
        {
            var callback = _callback;
            _callback = null;
            callback(completingTask);
        }
        else
        {
            _context = null;

            var invokeAsyncCallback = s_invokeAsyncCallback;
            if (invokeAsyncCallback == null) s_invokeAsyncCallback = invokeAsyncCallback = InvokeAsyncCallback; // benign race

            using(context) ExecutionContext.Run(context, invokeAsyncCallback, this, true);
        }
    }
}
#endif

#if FEATURE_ASYNC_IO
// Writes asynchronously with CancellationToken.None.
[HostProtection(ExternalThreading = true)]
[ComVisible(false)]
public Task WriteAsync(Byte[] buffer, int offset, int count)
{
    return WriteAsync(buffer, offset, count, CancellationToken.None);
}

// Default implementation on top of BeginWrite/EndWrite.  The token is only checked up front;
// it is not passed on to the Begin/End pair.
[HostProtection(ExternalThreading = true)]
[ComVisible(false)]
public virtual Task WriteAsync(Byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
    // If cancellation was requested, bail early with an already completed task.
    // Otherwise, return a task that represents the Begin/End methods.
    return cancellationToken.IsCancellationRequested
                ? Task.FromCancellation(cancellationToken)
                : BeginEndWriteAsync(buffer, offset, count);
}

// Wraps BeginWrite/EndWrite in a task.  The stream and the arguments are passed explicitly
// so that both lambdas are non-capturing.
private Task BeginEndWriteAsync(Byte[] buffer, Int32 offset, Int32 count)
{
    return TaskFactory<VoidTaskResult>.FromAsyncTrim(
                this, new ReadWriteParameters { Buffer=buffer, Offset=offset, Count=count },
                (stream, args, callback, state) => stream.BeginWrite(args.Buffer, args.Offset, args.Count, callback, state), // cached by compiler
                (stream, asyncResult) => // cached by compiler
                {
                    stream.EndWrite(asyncResult);
                    return default(VoidTaskResult);
                });
}
#endif // FEATURE_ASYNC_IO

public abstract long Seek(long offset, SeekOrigin origin);

public abstract void SetLength(long value);

public abstract int Read([In, Out] byte[] buffer, int offset, int count);

// Reads one byte from the stream by calling Read(byte[], int, int).
// Will return an unsigned byte cast to an int or -1 on end of stream.
// This implementation does not perform well because it allocates a new
// byte[] each time you call it, and should be overridden by any
// subclass that maintains an internal buffer.  Then, it can help perf
// significantly for people who are reading one byte at a time.
public virtual int ReadByte()
{
    Contract.Ensures(Contract.Result<int>() >= -1);
    Contract.Ensures(Contract.Result<int>() < 256);

    byte[] oneByteArray = new byte[1];
    int r = Read(oneByteArray, 0, 1);
    if (r==0)
        return -1;
    return oneByteArray[0];
}

public abstract void Write(byte[] buffer, int offset, int count);

// Writes one byte from the stream by calling Write(byte[], int, int).
// This implementation does not perform well because it allocates a new
// byte[] each time you call it, and should be overridden by any
// subclass that maintains an internal buffer.  Then, it can help perf
// significantly for people who are writing one byte at a time.
public virtual void WriteByte(byte value)
{
    byte[] oneByteArray = new byte[1];
    oneByteArray[0] = value;
    Write(oneByteArray, 0, 1);
}

// Returns stream itself if it is already a SyncStream, otherwise a new SyncStream wrapping it.
// Throws ArgumentNullException when stream is null.
[HostProtection(Synchronization=true)]
public static Stream Synchronized(Stream stream)
{
    if (stream==null)
        throw new ArgumentNullException("stream");
    Contract.Ensures(Contract.Result<Stream>() != null);
    Contract.EndContractBlock();
    if (stream is SyncStream)
        return stream;

    return new SyncStream(stream);
}

#if !FEATURE_PAL || MONO // This method shouldn't have been exposed in Dev10 (we revised object invariants after locking down).
[Obsolete("Do not call or override this method.")]
protected virtual void ObjectInvariant()
{
}
#endif

// Performs the read synchronously and returns an already-completed IAsyncResult.
// An IOException from Read is captured in the result and rethrown by BlockingEndRead;
// other exception types propagate to the caller immediately.
internal IAsyncResult BlockingBeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
{
    Contract.Ensures(Contract.Result<IAsyncResult>() != null);

    // To avoid a race with a stream's position pointer & generating race
    // conditions with internal buffer indexes in our own streams that
    // don't natively support async IO operations when there are multiple
    // async requests outstanding, we will block the application's main
    // thread and do the IO synchronously.
    // This can't perform well - use a different approach.
    SynchronousAsyncResult asyncResult;
    try {
        int numRead = Read(buffer, offset, count);
        asyncResult = new SynchronousAsyncResult(numRead, state);
    }
    catch (IOException ex) {
        asyncResult = new SynchronousAsyncResult(ex, state, isWrite: false);
    }

    if (callback != null) {
        callback(asyncResult);
    }

    return asyncResult;
}

// Completes a read started by BlockingBeginRead; see SynchronousAsyncResult.EndRead.
internal static int BlockingEndRead(IAsyncResult asyncResult)
{
    Contract.Ensures(Contract.Result<int>() >= 0);

    return SynchronousAsyncResult.EndRead(asyncResult);
}

// Performs the write synchronously and returns an already-completed IAsyncResult.
// An IOException from Write is captured in the result and rethrown by BlockingEndWrite;
// other exception types propagate to the caller immediately.
internal IAsyncResult BlockingBeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
{
    Contract.Ensures(Contract.Result<IAsyncResult>() != null);

    // To avoid a race with a stream's position pointer & generating race
    // conditions with internal buffer indexes in our own streams that
    // don't natively support async IO operations when there are multiple
    // async requests outstanding, we will block the application's main
    // thread and do the IO synchronously.
    // This can't perform well - use a different approach.
    SynchronousAsyncResult asyncResult;
    try {
        Write(buffer, offset, count);
        asyncResult = new SynchronousAsyncResult(state);
    }
    catch (IOException ex) {
        asyncResult = new SynchronousAsyncResult(ex, state, isWrite: true);
    }

    if (callback != null) {
        callback(asyncResult);
    }

    return asyncResult;
}

// Completes a write started by BlockingBeginWrite; see SynchronousAsyncResult.EndWrite.
internal static void BlockingEndWrite(IAsyncResult asyncResult)
{
    SynchronousAsyncResult.EndWrite(asyncResult);
}

// Stream behind Stream.Null: reads return 0 bytes / -1, writes are discarded,
// and Dispose does nothing.
[Serializable]
private sealed class NullStream : Stream
{
    internal NullStream() {}

    public override bool CanRead {
        [Pure]
        get { return true; }
    }

    public override bool CanWrite {
        [Pure]
        get { return true; }
    }

    public override bool CanSeek {
        [Pure]
        get { return true; }
    }

    public override long Length {
        get { return 0; }
    }

    // Always 0; the setter ignores its value.
    public override long Position {
        get { return 0; }
        set {}
    }

    protected override void Dispose(bool disposing)
    {
        // Do nothing - we don't want NullStream singleton (static) to be closable
    }

    public override void Flush()
    {
    }

#if FEATURE_ASYNC_IO
    [ComVisible(false)]
    public override Task FlushAsync(CancellationToken cancellationToken)
    {
        return cancellationToken.IsCancellationRequested ?
            Task.FromCancellation(cancellationToken) :
            Task.CompletedTask;
    }
#endif // FEATURE_ASYNC_IO

    // The Begin/End overrides use the synchronous Blocking* helpers directly.
    [HostProtection(ExternalThreading = true)]
    public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
    {
        if (!CanRead) __Error.ReadNotSupported();

        return BlockingBeginRead(buffer, offset, count, callback, state);
    }

    public override int EndRead(IAsyncResult asyncResult)
    {
        if (asyncResult == null)
            throw new ArgumentNullException("asyncResult");
        Contract.EndContractBlock();

        return BlockingEndRead(asyncResult);
    }

    [HostProtection(ExternalThreading = true)]
    public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
    {
        if (!CanWrite) __Error.WriteNotSupported();

        return BlockingBeginWrite(buffer, offset, count, callback, state);
    }

    public override void EndWrite(IAsyncResult asyncResult)
    {
        if (asyncResult == null)
            throw new ArgumentNullException("asyncResult");
        Contract.EndContractBlock();

        BlockingEndWrite(asyncResult);
    }

    public override int Read([In, Out] byte[] buffer, int offset, int count)
    {
        return 0;
    }

#if FEATURE_ASYNC_IO
    // Returns a cached, already-completed task whose result is 0.
    // Note that the cancellation token is not consulted here.
    [ComVisible(false)]
    public override Task<int> ReadAsync(Byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        var nullReadTask = s_nullReadTask;
        if (nullReadTask == null)
            s_nullReadTask = nullReadTask = new Task<int>(false, 0, (TaskCreationOptions)InternalTaskOptions.DoNotDispose, CancellationToken.None); // benign race
        return nullReadTask;
    }
    private static Task<int> s_nullReadTask;
#endif //FEATURE_ASYNC_IO

    public override int ReadByte()
    {
        return -1;
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
    }

#if FEATURE_ASYNC_IO
    [ComVisible(false)]
    public override Task WriteAsync(Byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        return cancellationToken.IsCancellationRequested ?
            Task.FromCancellation(cancellationToken) :
            Task.CompletedTask;
    }
#endif // FEATURE_ASYNC_IO

    public override void WriteByte(byte value)
    {
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        return 0;
    }

    public override void SetLength(long length)
    {
    }
}

/// <summary>Used as the IAsyncResult object when using asynchronous IO methods on the base Stream class.</summary>
internal sealed class SynchronousAsyncResult : IAsyncResult
{
    private readonly Object _stateObject;
    private readonly bool _isWrite;             // distinguishes write results from read results for EndRead/EndWrite
    private ManualResetEvent _waitHandle;       // created on first access to AsyncWaitHandle
    private ExceptionDispatchInfo _exceptionInfo; // exception captured during the synchronous IO, if any

    private bool _endXxxCalled;                 // set by EndRead/EndWrite to detect a second call
    private Int32 _bytesRead;

    // Result of a successful read.
    internal SynchronousAsyncResult(Int32 bytesRead, Object asyncStateObject)
    {
        _bytesRead = bytesRead;
        _stateObject = asyncStateObject;
        //_isWrite = false;
    }

    // Result of a successful write.
    internal SynchronousAsyncResult(Object asyncStateObject)
    {
        _stateObject = asyncStateObject;
        _isWrite = true;
    }

    // Result of a failed read or write; the exception is rethrown by ThrowIfError.
    internal SynchronousAsyncResult(Exception ex, Object asyncStateObject, bool isWrite)
    {
        _exceptionInfo = ExceptionDispatchInfo.Capture(ex);
        _stateObject = asyncStateObject;
        _isWrite = isWrite;
    }

    public bool IsCompleted {
        // We never hand out objects of this type to the user before the synchronous IO completed:
        get { return true; }
    }

    // Lazily created event that is already in the signaled state.
    public WaitHandle AsyncWaitHandle {
        get {
            return LazyInitializer.EnsureInitialized(ref _waitHandle, () => new ManualResetEvent(true));
        }
    }

    public Object AsyncState {
        get { return _stateObject; }
    }

    public bool CompletedSynchronously {
        get { return true; }
    }

    // Rethrows the captured exception, if there is one.
    internal void ThrowIfError()
    {
        if (_exceptionInfo != null)
            _exceptionInfo.Throw();
    }

    // Returns the byte count of a read result.  Reports an error through __Error when
    // asyncResult is not a read result of this type or when End was already called on it,
    // and rethrows any exception captured during the read.
    internal static Int32 EndRead(IAsyncResult asyncResult)
    {
        SynchronousAsyncResult ar = asyncResult as SynchronousAsyncResult;
        if (ar == null || ar._isWrite)
            __Error.WrongAsyncResult();

        if (ar._endXxxCalled)
            __Error.EndReadCalledTwice();

        ar._endXxxCalled = true;

        ar.ThrowIfError();
        return ar._bytesRead;
    }

    // Write counterpart of EndRead.
    internal static void EndWrite(IAsyncResult asyncResult)
    {
        SynchronousAsyncResult ar = asyncResult as SynchronousAsyncResult;
        if (ar == null || !ar._isWrite)
            __Error.WrongAsyncResult();

        if (ar._endXxxCalled)
            __Error.EndWriteCalledTwice();

        ar._endXxxCalled = true;

        ar.ThrowIfError();
    }
}   // class SynchronousAsyncResult

// SyncStream is a wrapper around a stream that takes
// a lock for every operation making it thread safe.
[Serializable]
internal sealed class SyncStream : Stream, IDisposable
{
    private Stream _stream;             // wrapped stream; also the object every lock is taken on
    [NonSerialized]
    private bool? _overridesBeginRead;  // null until BeginRead first checks the wrapped stream's type
    [NonSerialized]
    private bool? _overridesBeginWrite; // null until BeginWrite first checks the wrapped stream's type

    internal SyncStream(Stream stream)
    {
        if (stream == null)
            throw new ArgumentNullException("stream");
        Contract.EndContractBlock();
        _stream = stream;
    }

    // The capability properties forward to the wrapped stream without taking the lock.
    public override bool CanRead {
        [Pure]
        get { return _stream.CanRead; }
    }

    public override bool CanWrite {
        [Pure]
        get { return _stream.CanWrite; }
    }

    public override bool CanSeek {
        [Pure]
        get { return _stream.CanSeek; }
    }

    [ComVisible(false)]
    public override bool CanTimeout {
        [Pure]
        get {
            return _stream.CanTimeout;
        }
    }

    public override long Length {
        get {
            lock(_stream) {
                return _stream.Length;
            }
        }
    }

    public override long Position {
        get {
            lock(_stream) {
                return _stream.Position;
            }
        }
        set {
            lock(_stream) {
                _stream.Position = value;
            }
        }
    }

    // The timeout properties forward to the wrapped stream without taking the lock.
    [ComVisible(false)]
    public override int ReadTimeout {
        get {
            return _stream.ReadTimeout;
        }
        set {
            _stream.ReadTimeout = value;
        }
    }

    [ComVisible(false)]
    public override int WriteTimeout {
        get {
            return _stream.WriteTimeout;
        }
        set {
            _stream.WriteTimeout = value;
        }
    }

    // In the off chance that some wrapped stream has different
    // semantics for Close vs. Dispose, let's preserve that.
public override void Close() { lock(_stream) { try { _stream.Close(); } finally { base.Dispose(true); } } } protected override void Dispose(bool disposing) { lock(_stream) { try { // Explicitly pick up a potentially methodimpl'ed Dispose if (disposing) ((IDisposable)_stream).Dispose(); } finally { base.Dispose(disposing); } } } public override void Flush() { lock(_stream) _stream.Flush(); } public override int Read([In, Out]byte[] bytes, int offset, int count) { lock(_stream) return _stream.Read(bytes, offset, count); } public override int ReadByte() { lock(_stream) return _stream.ReadByte(); } private static bool OverridesBeginMethod(Stream stream, string methodName) { Contract.Requires(stream != null, "Expected a non-null stream."); Contract.Requires(methodName == "BeginRead" || methodName == "BeginWrite", "Expected BeginRead or BeginWrite as the method name to check."); // Get all of the methods on the underlying stream var methods = stream.GetType().GetMethods(BindingFlags.Public | BindingFlags.Instance); // If any of the methods have the desired name and are defined on the base Stream // Type, then the method was not overridden. If none of them were defined on the // base Stream, then it must have been overridden. foreach (var method in methods) { if (method.DeclaringType == typeof(Stream) && method.Name == methodName) { return false; } } return true; } [HostProtection(ExternalThreading=true)] public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state) { // Lazily-initialize whether the wrapped stream overrides BeginRead if (_overridesBeginRead == null) { _overridesBeginRead = OverridesBeginMethod(_stream, "BeginRead"); } lock (_stream) { // If the Stream does have its own BeginRead implementation, then we must use that override. 
// If it doesn't, then we'll use the base implementation, but we'll make sure that the logic // which ensures only one asynchronous operation does so with an asynchronous wait rather // than a synchronous wait. A synchronous wait will result in a deadlock condition, because // the EndXx method for the outstanding async operation won't be able to acquire the lock on // _stream due to this call blocked while holding the lock. return _overridesBeginRead.Value ? _stream.BeginRead(buffer, offset, count, callback, state) : _stream.BeginReadInternal(buffer, offset, count, callback, state, serializeAsynchronously: true); } } public override int EndRead(IAsyncResult asyncResult) { if (asyncResult == null) throw new ArgumentNullException("asyncResult"); Contract.Ensures(Contract.Result() >= 0); Contract.EndContractBlock(); lock(_stream) return _stream.EndRead(asyncResult); } public override long Seek(long offset, SeekOrigin origin) { lock(_stream) return _stream.Seek(offset, origin); } public override void SetLength(long length) { lock(_stream) _stream.SetLength(length); } public override void Write(byte[] bytes, int offset, int count) { lock(_stream) _stream.Write(bytes, offset, count); } public override void WriteByte(byte b) { lock(_stream) _stream.WriteByte(b); } [HostProtection(ExternalThreading=true)] public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state) { // Lazily-initialize whether the wrapped stream overrides BeginWrite if (_overridesBeginWrite == null) { _overridesBeginWrite = OverridesBeginMethod(_stream, "BeginWrite"); } lock (_stream) { // If the Stream does have its own BeginWrite implementation, then we must use that override. // If it doesn't, then we'll use the base implementation, but we'll make sure that the logic // which ensures only one asynchronous operation does so with an asynchronous wait rather // than a synchronous wait. 
A synchronous wait will result in a deadlock condition, because // the EndXx method for the outstanding async operation won't be able to acquire the lock on // _stream due to this call blocked while holding the lock. return _overridesBeginWrite.Value ? _stream.BeginWrite(buffer, offset, count, callback, state) : _stream.BeginWriteInternal(buffer, offset, count, callback, state, serializeAsynchronously: true); } } public override void EndWrite(IAsyncResult asyncResult) { if (asyncResult == null) throw new ArgumentNullException("asyncResult"); Contract.EndContractBlock(); lock(_stream) _stream.EndWrite(asyncResult); } } } #if CONTRACTS_FULL [ContractClassFor(typeof(Stream))] internal abstract class StreamContract : Stream { public override long Seek(long offset, SeekOrigin origin) { Contract.Ensures(Contract.Result() >= 0); throw new NotImplementedException(); } public override void SetLength(long value) { throw new NotImplementedException(); } public override int Read(byte[] buffer, int offset, int count) { Contract.Ensures(Contract.Result() >= 0); Contract.Ensures(Contract.Result() <= count); throw new NotImplementedException(); } public override void Write(byte[] buffer, int offset, int count) { throw new NotImplementedException(); } public override long Position { get { Contract.Ensures(Contract.Result() >= 0); throw new NotImplementedException(); } set { throw new NotImplementedException(); } } public override void Flush() { throw new NotImplementedException(); } public override bool CanRead { get { throw new NotImplementedException(); } } public override bool CanWrite { get { throw new NotImplementedException(); } } public override bool CanSeek { get { throw new NotImplementedException(); } } public override long Length { get { Contract.Ensures(Contract.Result() >= 0); throw new NotImplementedException(); } } } #endif // CONTRACTS_FULL }