3 // Copyright (c) Microsoft Corporation. All rights reserved.
6 /*============================================================
10 ** <OWNER>gpaperin</OWNER>
13 ** Purpose: Abstract base class for all Streams. Provides
14 ** default implementations of asynchronous reads & writes, in
15 ** terms of the synchronous reads & writes (and vice versa).
18 ===========================================================*/
20 using System.Threading;
22 using System.Threading.Tasks;
26 using System.Runtime.InteropServices;
27 #if NEW_EXPERIMENTAL_ASYNC_IO
28 using System.Runtime.CompilerServices;
30 using System.Runtime.ExceptionServices;
31 using System.Security;
32 using System.Security.Permissions;
33 using System.Diagnostics.Contracts;
34 using System.Reflection;
40 [ContractClass(typeof(StreamContract))]
43 public abstract class Stream : MarshalByRefObject, IDisposable {
44 #else // FEATURE_REMOTING
45 public abstract class Stream : IDisposable {
46 #endif // FEATURE_REMOTING
48 public static readonly Stream Null = new NullStream();
50 //We pick a value that is the largest multiple of 4096 that is still smaller than the large object heap threshold (85K).
51 // The CopyTo/CopyToAsync buffer is short-lived and is likely to be collected at Gen0, and it offers a significant
52 // improvement in Copy performance.
53 private const int _DefaultCopyBufferSize = 81920;
55 #if NEW_EXPERIMENTAL_ASYNC_IO
56 // To implement Async IO operations on streams that don't support async IO
59 private ReadWriteTask _activeReadWriteTask;
61 private SemaphoreSlim _asyncActiveSemaphore;
63 internal SemaphoreSlim EnsureAsyncActiveSemaphoreInitialized()
65 // Lazily-initialize _asyncActiveSemaphore. As we're never accessing the SemaphoreSlim's
66 // WaitHandle, we don't need to worry about Disposing it.
67 return LazyInitializer.EnsureInitialized(ref _asyncActiveSemaphore, () => new SemaphoreSlim(1, 1));
71 public abstract bool CanRead {
76 // If CanSeek is false, Position, Seek, Length, and SetLength should throw.
77 public abstract bool CanSeek {
83 public virtual bool CanTimeout {
90 public abstract bool CanWrite {
95 public abstract long Length {
99 public abstract long Position {
105 public virtual int ReadTimeout {
107 Contract.Ensures(Contract.Result<int>() >= 0);
108 throw new InvalidOperationException(Environment.GetResourceString("InvalidOperation_TimeoutsNotSupported"));
111 throw new InvalidOperationException(Environment.GetResourceString("InvalidOperation_TimeoutsNotSupported"));
116 public virtual int WriteTimeout {
118 Contract.Ensures(Contract.Result<int>() >= 0);
119 throw new InvalidOperationException(Environment.GetResourceString("InvalidOperation_TimeoutsNotSupported"));
122 throw new InvalidOperationException(Environment.GetResourceString("InvalidOperation_TimeoutsNotSupported"));
127 [HostProtection(ExternalThreading = true)]
129 public Task CopyToAsync(Stream destination)
131 return CopyToAsync(destination, _DefaultCopyBufferSize);
134 [HostProtection(ExternalThreading = true)]
136 public Task CopyToAsync(Stream destination, Int32 bufferSize)
138 return CopyToAsync(destination, bufferSize, CancellationToken.None);
141 [HostProtection(ExternalThreading = true)]
143 public virtual Task CopyToAsync(Stream destination, Int32 bufferSize, CancellationToken cancellationToken)
145 if (destination == null)
146 throw new ArgumentNullException("destination");
148 throw new ArgumentOutOfRangeException("bufferSize", Environment.GetResourceString("ArgumentOutOfRange_NeedPosNum"));
149 if (!CanRead && !CanWrite)
150 throw new ObjectDisposedException(null, Environment.GetResourceString("ObjectDisposed_StreamClosed"));
151 if (!destination.CanRead && !destination.CanWrite)
152 throw new ObjectDisposedException("destination", Environment.GetResourceString("ObjectDisposed_StreamClosed"));
154 throw new NotSupportedException(Environment.GetResourceString("NotSupported_UnreadableStream"));
155 if (!destination.CanWrite)
156 throw new NotSupportedException(Environment.GetResourceString("NotSupported_UnwritableStream"));
157 Contract.EndContractBlock();
159 return CopyToAsyncInternal(destination, bufferSize, cancellationToken);
162 private async Task CopyToAsyncInternal(Stream destination, Int32 bufferSize, CancellationToken cancellationToken)
164 Contract.Requires(destination != null);
165 Contract.Requires(bufferSize > 0);
166 Contract.Requires(CanRead);
167 Contract.Requires(destination.CanWrite);
169 byte[] buffer = new byte[bufferSize];
171 while ((bytesRead = await ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false)) != 0)
173 await destination.WriteAsync(buffer, 0, bytesRead, cancellationToken).ConfigureAwait(false);
176 #endif // FEATURE_ASYNC_IO
178 // Reads the bytes from the current stream and writes the bytes to
179 // the destination stream until all bytes are read, starting at
180 // the current position.
181 public void CopyTo(Stream destination)
183 if (destination == null)
184 throw new ArgumentNullException("destination");
185 if (!CanRead && !CanWrite)
186 throw new ObjectDisposedException(null, Environment.GetResourceString("ObjectDisposed_StreamClosed"));
187 if (!destination.CanRead && !destination.CanWrite)
188 throw new ObjectDisposedException("destination", Environment.GetResourceString("ObjectDisposed_StreamClosed"));
190 throw new NotSupportedException(Environment.GetResourceString("NotSupported_UnreadableStream"));
191 if (!destination.CanWrite)
192 throw new NotSupportedException(Environment.GetResourceString("NotSupported_UnwritableStream"));
193 Contract.EndContractBlock();
195 InternalCopyTo(destination, _DefaultCopyBufferSize);
198 public void CopyTo(Stream destination, int bufferSize)
200 if (destination == null)
201 throw new ArgumentNullException("destination");
203 throw new ArgumentOutOfRangeException("bufferSize",
204 Environment.GetResourceString("ArgumentOutOfRange_NeedPosNum"));
205 if (!CanRead && !CanWrite)
206 throw new ObjectDisposedException(null, Environment.GetResourceString("ObjectDisposed_StreamClosed"));
207 if (!destination.CanRead && !destination.CanWrite)
208 throw new ObjectDisposedException("destination", Environment.GetResourceString("ObjectDisposed_StreamClosed"));
210 throw new NotSupportedException(Environment.GetResourceString("NotSupported_UnreadableStream"));
211 if (!destination.CanWrite)
212 throw new NotSupportedException(Environment.GetResourceString("NotSupported_UnwritableStream"));
213 Contract.EndContractBlock();
215 InternalCopyTo(destination, bufferSize);
218 private void InternalCopyTo(Stream destination, int bufferSize)
220 Contract.Requires(destination != null);
221 Contract.Requires(CanRead);
222 Contract.Requires(destination.CanWrite);
223 Contract.Requires(bufferSize > 0);
225 byte[] buffer = new byte[bufferSize];
227 while ((read = Read(buffer, 0, buffer.Length)) != 0)
228 destination.Write(buffer, 0, read);
232 // Stream used to require that all cleanup logic went into Close(),
233 // which was thought up before we invented IDisposable. However, we
234 // need to follow the IDisposable pattern so that users can write
235 // sensible subclasses without needing to inspect all their base
236 // classes, and without worrying about version brittleness, from a
237 // base class switching to the Dispose pattern. We're moving
238 // Stream to the Dispose(bool) pattern - that's where all subclasses
239 // should put their cleanup starting in V2.
240 public virtual void Close()
242 /* These are correct, but we'd have to fix PipeStream & NetworkStream very carefully.
243 Contract.Ensures(CanRead == false);
244 Contract.Ensures(CanWrite == false);
245 Contract.Ensures(CanSeek == false);
249 GC.SuppressFinalize(this);
252 public void Dispose()
254 /* These are correct, but we'd have to fix PipeStream & NetworkStream very carefully.
255 Contract.Ensures(CanRead == false);
256 Contract.Ensures(CanWrite == false);
257 Contract.Ensures(CanSeek == false);
264 protected virtual void Dispose(bool disposing)
266 // Note: Never change this to call other virtual methods on Stream
267 // like Write, since the state on subclasses has already been
268 // torn down. This is the last code to run on cleanup for a stream.
271 public abstract void Flush();
274 [HostProtection(ExternalThreading=true)]
276 public Task FlushAsync()
278 return FlushAsync(CancellationToken.None);
281 [HostProtection(ExternalThreading=true)]
283 public virtual Task FlushAsync(CancellationToken cancellationToken)
285 return Task.Factory.StartNew(state => ((Stream)state).Flush(), this,
286 cancellationToken, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
288 #endif // FEATURE_ASYNC_IO
290 [Obsolete("CreateWaitHandle will be removed eventually. Please use \"new ManualResetEvent(false)\" instead.")]
291 protected virtual WaitHandle CreateWaitHandle()
293 Contract.Ensures(Contract.Result<WaitHandle>() != null);
294 return new ManualResetEvent(false);
297 [HostProtection(ExternalThreading=true)]
298 public virtual IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
300 Contract.Ensures(Contract.Result<IAsyncResult>() != null);
301 return BeginReadInternal(buffer, offset, count, callback, state, serializeAsynchronously: false);
304 [HostProtection(ExternalThreading = true)]
305 internal IAsyncResult BeginReadInternal(byte[] buffer, int offset, int count, AsyncCallback callback, Object state, bool serializeAsynchronously)
307 Contract.Ensures(Contract.Result<IAsyncResult>() != null);
308 if (!CanRead) __Error.ReadNotSupported();
310 #if !NEW_EXPERIMENTAL_ASYNC_IO
311 return BlockingBeginRead(buffer, offset, count, callback, state);
314 // Mango did not do Async IO.
315 if(CompatibilitySwitches.IsAppEarlierThanWindowsPhone8)
317 return BlockingBeginRead(buffer, offset, count, callback, state);
320 // To avoid a race with a stream's position pointer & generating ----
321 // conditions with internal buffer indexes in our own streams that
322 // don't natively support async IO operations when there are multiple
323 // async requests outstanding, we will block the application's main
324 // thread if it does a second IO request until the first one completes.
325 var semaphore = EnsureAsyncActiveSemaphoreInitialized();
326 Task semaphoreTask = null;
327 if (serializeAsynchronously)
329 semaphoreTask = semaphore.WaitAsync();
336 // Create the task to asynchronously do a Read. This task serves both
337 // as the asynchronous work item and as the IAsyncResult returned to the user.
338 var asyncResult = new ReadWriteTask(true /*isRead*/, delegate
340 // The ReadWriteTask stores all of the parameters to pass to Read.
341 // As we're currently inside of it, we can get the current task
342 // and grab the parameters from it.
343 var thisTask = Task.InternalCurrent as ReadWriteTask;
344 Contract.Assert(thisTask != null, "Inside ReadWriteTask, InternalCurrent should be the ReadWriteTask");
346 // Do the Read and return the number of bytes read
347 var bytesRead = thisTask._stream.Read(thisTask._buffer, thisTask._offset, thisTask._count);
348 thisTask.ClearBeginState(); // just to help alleviate some memory pressure
350 }, state, this, buffer, offset, count, callback);
353 if (semaphoreTask != null)
354 RunReadWriteTaskWhenReady(semaphoreTask, asyncResult);
356 RunReadWriteTask(asyncResult);
359 return asyncResult; // return it
363 public virtual int EndRead(IAsyncResult asyncResult)
365 if (asyncResult == null)
366 throw new ArgumentNullException("asyncResult");
367 Contract.Ensures(Contract.Result<int>() >= 0);
368 Contract.EndContractBlock();
370 #if !NEW_EXPERIMENTAL_ASYNC_IO
371 return BlockingEndRead(asyncResult);
373 // Mango did not do async IO.
374 if(CompatibilitySwitches.IsAppEarlierThanWindowsPhone8)
376 return BlockingEndRead(asyncResult);
379 var readTask = _activeReadWriteTask;
381 if (readTask == null)
383 throw new ArgumentException(Environment.GetResourceString("InvalidOperation_WrongAsyncResultOrEndReadCalledMultiple"));
385 else if (readTask != asyncResult)
387 throw new InvalidOperationException(Environment.GetResourceString("InvalidOperation_WrongAsyncResultOrEndReadCalledMultiple"));
389 else if (!readTask._isRead)
391 throw new ArgumentException(Environment.GetResourceString("InvalidOperation_WrongAsyncResultOrEndReadCalledMultiple"));
396 return readTask.GetAwaiter().GetResult(); // block until completion, then get result / propagate any exception
400 _activeReadWriteTask = null;
401 Contract.Assert(_asyncActiveSemaphore != null, "Must have been initialized in order to get here.");
402 _asyncActiveSemaphore.Release();
408 [HostProtection(ExternalThreading = true)]
410 public Task<int> ReadAsync(Byte[] buffer, int offset, int count)
412 return ReadAsync(buffer, offset, count, CancellationToken.None);
415 [HostProtection(ExternalThreading = true)]
417 public virtual Task<int> ReadAsync(Byte[] buffer, int offset, int count, CancellationToken cancellationToken)
419 // If cancellation was requested, bail early with an already completed task.
420 // Otherwise, return a task that represents the Begin/End methods.
421 return cancellationToken.IsCancellationRequested
422 ? Task.FromCancellation<int>(cancellationToken)
423 : BeginEndReadAsync(buffer, offset, count);
426 private Task<Int32> BeginEndReadAsync(Byte[] buffer, Int32 offset, Int32 count)
428 return TaskFactory<Int32>.FromAsyncTrim(
429 this, new ReadWriteParameters { Buffer = buffer, Offset = offset, Count = count },
430 (stream, args, callback, state) => stream.BeginRead(args.Buffer, args.Offset, args.Count, callback, state), // cached by compiler
431 (stream, asyncResult) => stream.EndRead(asyncResult)); // cached by compiler
434 private struct ReadWriteParameters // struct for arguments to Read and Write calls
436 internal byte[] Buffer;
440 #endif //FEATURE_ASYNC_IO
444 [HostProtection(ExternalThreading=true)]
445 public virtual IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
447 Contract.Ensures(Contract.Result<IAsyncResult>() != null);
448 return BeginWriteInternal(buffer, offset, count, callback, state, serializeAsynchronously: false);
451 [HostProtection(ExternalThreading = true)]
452 internal IAsyncResult BeginWriteInternal(byte[] buffer, int offset, int count, AsyncCallback callback, Object state, bool serializeAsynchronously)
454 Contract.Ensures(Contract.Result<IAsyncResult>() != null);
455 if (!CanWrite) __Error.WriteNotSupported();
456 #if !NEW_EXPERIMENTAL_ASYNC_IO
457 return BlockingBeginWrite(buffer, offset, count, callback, state);
460 // Mango did not do Async IO.
461 if(CompatibilitySwitches.IsAppEarlierThanWindowsPhone8)
463 return BlockingBeginWrite(buffer, offset, count, callback, state);
466 // To avoid a race with a stream's position pointer & generating ----
467 // conditions with internal buffer indexes in our own streams that
468 // don't natively support async IO operations when there are multiple
469 // async requests outstanding, we will block the application's main
470 // thread if it does a second IO request until the first one completes.
471 var semaphore = EnsureAsyncActiveSemaphoreInitialized();
472 Task semaphoreTask = null;
473 if (serializeAsynchronously)
475 semaphoreTask = semaphore.WaitAsync(); // kick off the asynchronous wait, but don't block
479 semaphore.Wait(); // synchronously wait here
482 // Create the task to asynchronously do a Write. This task serves both
483 // as the asynchronous work item and as the IAsyncResult returned to the user.
484 var asyncResult = new ReadWriteTask(false /*isRead*/, delegate
486 // The ReadWriteTask stores all of the parameters to pass to Write.
487 // As we're currently inside of it, we can get the current task
488 // and grab the parameters from it.
489 var thisTask = Task.InternalCurrent as ReadWriteTask;
490 Contract.Assert(thisTask != null, "Inside ReadWriteTask, InternalCurrent should be the ReadWriteTask");
493 thisTask._stream.Write(thisTask._buffer, thisTask._offset, thisTask._count);
494 thisTask.ClearBeginState(); // just to help alleviate some memory pressure
495 return 0; // not used, but signature requires a value be returned
496 }, state, this, buffer, offset, count, callback);
499 if (semaphoreTask != null)
500 RunReadWriteTaskWhenReady(semaphoreTask, asyncResult);
502 RunReadWriteTask(asyncResult);
504 return asyncResult; // return it
508 #if NEW_EXPERIMENTAL_ASYNC_IO
509 private void RunReadWriteTaskWhenReady(Task asyncWaiter, ReadWriteTask readWriteTask)
511 Contract.Assert(readWriteTask != null); // Should be Contract.Requires, but CCRewrite is doing a poor job with
512 // preconditions in async methods that await. Mike & Manuel are aware. (10/6/2011, bug 290222)
513 Contract.Assert(asyncWaiter != null); // Ditto
515 // If the wait has already complete, run the task.
516 if (asyncWaiter.IsCompleted)
518 Contract.Assert(asyncWaiter.IsRanToCompletion, "The semaphore wait should always complete successfully.");
519 RunReadWriteTask(readWriteTask);
521 else // Otherwise, wait for our turn, and then run the task.
523 asyncWaiter.ContinueWith((t, state) =>
525 Contract.Assert(t.IsRanToCompletion, "The semaphore wait should always complete successfully.");
526 var tuple = (Tuple<Stream,ReadWriteTask>)state;
527 tuple.Item1.RunReadWriteTask(tuple.Item2); // RunReadWriteTask(readWriteTask);
528 }, Tuple.Create<Stream,ReadWriteTask>(this, readWriteTask),
529 default(CancellationToken),
530 TaskContinuationOptions.ExecuteSynchronously,
531 TaskScheduler.Default);
535 private void RunReadWriteTask(ReadWriteTask readWriteTask)
537 Contract.Requires(readWriteTask != null);
538 Contract.Assert(_activeReadWriteTask == null, "Expected no other readers or writers");
540 // Schedule the task. ScheduleAndStart must happen after the write to _activeReadWriteTask to avoid a race.
541 // Internally, we're able to directly call ScheduleAndStart rather than Start, avoiding
542 // two interlocked operations. However, if ReadWriteTask is ever changed to use
543 // a cancellation token, this should be changed to use Start.
544 _activeReadWriteTask = readWriteTask; // store the task so that EndXx can validate it's given the right one
545 readWriteTask.m_taskScheduler = TaskScheduler.Default;
546 readWriteTask.ScheduleAndStart(needsProtection: false);
550 public virtual void EndWrite(IAsyncResult asyncResult)
552 if (asyncResult==null)
553 throw new ArgumentNullException("asyncResult");
554 Contract.EndContractBlock();
556 #if !NEW_EXPERIMENTAL_ASYNC_IO
557 BlockingEndWrite(asyncResult);
560 // Mango did not do Async IO.
561 if(CompatibilitySwitches.IsAppEarlierThanWindowsPhone8)
563 BlockingEndWrite(asyncResult);
567 var writeTask = _activeReadWriteTask;
568 if (writeTask == null)
570 throw new ArgumentException(Environment.GetResourceString("InvalidOperation_WrongAsyncResultOrEndWriteCalledMultiple"));
572 else if (writeTask != asyncResult)
574 throw new InvalidOperationException(Environment.GetResourceString("InvalidOperation_WrongAsyncResultOrEndWriteCalledMultiple"));
576 else if (writeTask._isRead)
578 throw new ArgumentException(Environment.GetResourceString("InvalidOperation_WrongAsyncResultOrEndWriteCalledMultiple"));
583 writeTask.GetAwaiter().GetResult(); // block until completion, then propagate any exceptions
584 Contract.Assert(writeTask.Status == TaskStatus.RanToCompletion);
588 _activeReadWriteTask = null;
589 Contract.Assert(_asyncActiveSemaphore != null, "Must have been initialized in order to get here.");
590 _asyncActiveSemaphore.Release();
595 #if NEW_EXPERIMENTAL_ASYNC_IO
596 // Task used by BeginRead / BeginWrite to do Read / Write asynchronously.
597 // A single instance of this task serves four purposes:
598 // 1. The work item scheduled to run the Read / Write operation
599 // 2. The state holding the arguments to be passed to Read / Write
600 // 3. The IAsyncResult returned from BeginRead / BeginWrite
601 // 4. The completion action that runs to invoke the user-provided callback.
602 // This last item is a bit tricky. Before the AsyncCallback is invoked, the
603 // IAsyncResult must have completed, so we can't just invoke the handler
604 // from within the task, since it is the IAsyncResult, and thus it's not
605 // yet completed. Instead, we use AddCompletionAction to install this
606 // task as its own completion handler. That saves the need to allocate
607 // a separate completion handler, it guarantees that the task will
608 // have completed by the time the handler is invoked, and it allows
609 // the handler to be invoked synchronously upon the completion of the
610 // task. This all enables BeginRead / BeginWrite to be implemented
611 // with a single allocation.
612 private sealed class ReadWriteTask : Task<int>, ITaskCompletionAction
614 internal readonly bool _isRead;
615 internal Stream _stream;
616 internal byte [] _buffer;
617 internal int _offset;
619 private AsyncCallback _callback;
620 private ExecutionContext _context;
622 internal void ClearBeginState() // Used to allow the args to Read/Write to be made available for GC
628 [SecuritySafeCritical] // necessary for EC.Capture
629 [MethodImpl(MethodImplOptions.NoInlining)]
630 public ReadWriteTask(
632 Func<object,int> function, object state,
633 Stream stream, byte[] buffer, int offset, int count, AsyncCallback callback) :
634 base(function, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach)
636 Contract.Requires(function != null);
637 Contract.Requires(stream != null);
638 Contract.Requires(buffer != null);
639 Contract.EndContractBlock();
641 StackCrawlMark stackMark = StackCrawlMark.LookForMyCaller;
643 // Store the arguments
650 // If a callback was provided, we need to:
651 // - Store the user-provided handler
652 // - Capture an ExecutionContext under which to invoke the handler
653 // - Add this task as its own completion handler so that the Invoke method
654 // will run the callback when this task completes.
655 if (callback != null)
657 _callback = callback;
658 _context = ExecutionContext.Capture(ref stackMark,
659 ExecutionContext.CaptureOptions.OptimizeDefaultCase | ExecutionContext.CaptureOptions.IgnoreSyncCtx);
660 base.AddCompletionAction(this);
664 [SecurityCritical] // necessary for CoreCLR
665 private static void InvokeAsyncCallback(object completedTask)
667 var rwc = (ReadWriteTask)completedTask;
668 var callback = rwc._callback;
669 rwc._callback = null;
673 [SecurityCritical] // necessary for CoreCLR
674 private static ContextCallback s_invokeAsyncCallback;
676 [SecuritySafeCritical] // necessary for ExecutionContext.Run
677 void ITaskCompletionAction.Invoke(Task completingTask)
679 // Get the ExecutionContext. If there is none, just run the callback
680 // directly, passing in the completed task as the IAsyncResult.
681 // If there is one, process it with ExecutionContext.Run.
682 var context = _context;
685 var callback = _callback;
687 callback(completingTask);
693 var invokeAsyncCallback = s_invokeAsyncCallback;
694 if (invokeAsyncCallback == null) s_invokeAsyncCallback = invokeAsyncCallback = InvokeAsyncCallback; // benign ----
696 using(context) ExecutionContext.Run(context, invokeAsyncCallback, this, true);
703 [HostProtection(ExternalThreading = true)]
705 public Task WriteAsync(Byte[] buffer, int offset, int count)
707 return WriteAsync(buffer, offset, count, CancellationToken.None);
710 [HostProtection(ExternalThreading = true)]
712 public virtual Task WriteAsync(Byte[] buffer, int offset, int count, CancellationToken cancellationToken)
714 // If cancellation was requested, bail early with an already completed task.
715 // Otherwise, return a task that represents the Begin/End methods.
716 return cancellationToken.IsCancellationRequested
717 ? Task.FromCancellation(cancellationToken)
718 : BeginEndWriteAsync(buffer, offset, count);
722 private Task BeginEndWriteAsync(Byte[] buffer, Int32 offset, Int32 count)
724 return TaskFactory<VoidTaskResult>.FromAsyncTrim(
725 this, new ReadWriteParameters { Buffer=buffer, Offset=offset, Count=count },
726 (stream, args, callback, state) => stream.BeginWrite(args.Buffer, args.Offset, args.Count, callback, state), // cached by compiler
727 (stream, asyncResult) => // cached by compiler
729 stream.EndWrite(asyncResult);
730 return default(VoidTaskResult);
733 #endif // FEATURE_ASYNC_IO
735 public abstract long Seek(long offset, SeekOrigin origin);
737 public abstract void SetLength(long value);
739 public abstract int Read([In, Out] byte[] buffer, int offset, int count);
741 // Reads one byte from the stream by calling Read(byte[], int, int).
742 // Will return an unsigned byte cast to an int or -1 on end of stream.
743 // This implementation does not perform well because it allocates a new
744 // byte[] each time you call it, and should be overridden by any
745 // subclass that maintains an internal buffer. Then, it can help perf
746 // significantly for people who are reading one byte at a time.
748 [TargetedPatchingOptOut("Performance critical to inline across NGen image boundaries")]
750 public virtual int ReadByte()
752 Contract.Ensures(Contract.Result<int>() >= -1);
753 Contract.Ensures(Contract.Result<int>() < 256);
755 byte[] oneByteArray = new byte[1];
756 int r = Read(oneByteArray, 0, 1);
759 return oneByteArray[0];
762 public abstract void Write(byte[] buffer, int offset, int count);
764 // Writes one byte from the stream by calling Write(byte[], int, int).
765 // This implementation does not perform well because it allocates a new
766 // byte[] each time you call it, and should be overridden by any
767 // subclass that maintains an internal buffer. Then, it can help perf
768 // significantly for people who are writing one byte at a time.
769 public virtual void WriteByte(byte value)
771 byte[] oneByteArray = new byte[1];
772 oneByteArray[0] = value;
773 Write(oneByteArray, 0, 1);
776 [HostProtection(Synchronization=true)]
777 public static Stream Synchronized(Stream stream)
780 throw new ArgumentNullException("stream");
781 Contract.Ensures(Contract.Result<Stream>() != null);
782 Contract.EndContractBlock();
783 if (stream is SyncStream)
786 return new SyncStream(stream);
789 #if !FEATURE_PAL // This method shouldn't have been exposed in Dev10 (we revised object invariants after locking down).
790 [Obsolete("Do not call or override this method.")]
791 protected virtual void ObjectInvariant()
796 internal IAsyncResult BlockingBeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
798 Contract.Ensures(Contract.Result<IAsyncResult>() != null);
800 // To avoid a race with a stream's position pointer & generating ----
801 // conditions with internal buffer indexes in our own streams that
802 // don't natively support async IO operations when there are multiple
803 // async requests outstanding, we will block the application's main
804 // thread and do the IO synchronously.
805 // This can't perform well - use a different approach.
806 SynchronousAsyncResult asyncResult;
808 int numRead = Read(buffer, offset, count);
809 asyncResult = new SynchronousAsyncResult(numRead, state);
811 catch (IOException ex) {
812 asyncResult = new SynchronousAsyncResult(ex, state, isWrite: false);
815 if (callback != null) {
816 callback(asyncResult);
822 internal static int BlockingEndRead(IAsyncResult asyncResult)
824 Contract.Ensures(Contract.Result<int>() >= 0);
826 return SynchronousAsyncResult.EndRead(asyncResult);
829 internal IAsyncResult BlockingBeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
831 Contract.Ensures(Contract.Result<IAsyncResult>() != null);
833 // To avoid a race with a stream's position pointer & generating ----
834 // conditions with internal buffer indexes in our own streams that
835 // don't natively support async IO operations when there are multiple
836 // async requests outstanding, we will block the application's main
837 // thread and do the IO synchronously.
838 // This can't perform well - use a different approach.
839 SynchronousAsyncResult asyncResult;
841 Write(buffer, offset, count);
842 asyncResult = new SynchronousAsyncResult(state);
844 catch (IOException ex) {
845 asyncResult = new SynchronousAsyncResult(ex, state, isWrite: true);
848 if (callback != null) {
849 callback(asyncResult);
855 internal static void BlockingEndWrite(IAsyncResult asyncResult)
857 SynchronousAsyncResult.EndWrite(asyncResult);
861 private sealed class NullStream : Stream
863 internal NullStream() {}
865 public override bool CanRead {
870 public override bool CanWrite {
875 public override bool CanSeek {
880 public override long Length {
884 public override long Position {
889 protected override void Dispose(bool disposing)
891 // Do nothing - we don't want NullStream singleton (static) to be closable
894 public override void Flush()
900 public override Task FlushAsync(CancellationToken cancellationToken)
902 return cancellationToken.IsCancellationRequested ?
903 Task.FromCancellation(cancellationToken) :
906 #endif // FEATURE_ASYNC_IO
908 [HostProtection(ExternalThreading = true)]
909 public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
911 if (!CanRead) __Error.ReadNotSupported();
913 return BlockingBeginRead(buffer, offset, count, callback, state);
916 public override int EndRead(IAsyncResult asyncResult)
918 if (asyncResult == null)
919 throw new ArgumentNullException("asyncResult");
920 Contract.EndContractBlock();
922 return BlockingEndRead(asyncResult);
925 [HostProtection(ExternalThreading = true)]
926 public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
928 if (!CanWrite) __Error.WriteNotSupported();
930 return BlockingBeginWrite(buffer, offset, count, callback, state);
933 public override void EndWrite(IAsyncResult asyncResult)
935 if (asyncResult == null)
936 throw new ArgumentNullException("asyncResult");
937 Contract.EndContractBlock();
939 BlockingEndWrite(asyncResult);
942 public override int Read([In, Out] byte[] buffer, int offset, int count)
949 public override Task<int> ReadAsync(Byte[] buffer, int offset, int count, CancellationToken cancellationToken)
951 var nullReadTask = s_nullReadTask;
952 if (nullReadTask == null)
953 s_nullReadTask = nullReadTask = new Task<int>(false, 0, (TaskCreationOptions)InternalTaskOptions.DoNotDispose, CancellationToken.None); // benign ----
956 private static Task<int> s_nullReadTask;
957 #endif //FEATURE_ASYNC_IO
959 public override int ReadByte()
964 public override void Write(byte[] buffer, int offset, int count)
970 public override Task WriteAsync(Byte[] buffer, int offset, int count, CancellationToken cancellationToken)
972 return cancellationToken.IsCancellationRequested ?
973 Task.FromCancellation(cancellationToken) :
976 #endif // FEATURE_ASYNC_IO
978 public override void WriteByte(byte value)
982 public override long Seek(long offset, SeekOrigin origin)
987 public override void SetLength(long length)
993 /// <summary>Used as the IAsyncResult object when using asynchronous IO methods on the base Stream class.</summary>
994 internal sealed class SynchronousAsyncResult : IAsyncResult {
996 private readonly Object _stateObject;
997 private readonly bool _isWrite;
998 private ManualResetEvent _waitHandle;
999 private ExceptionDispatchInfo _exceptionInfo;
1001 private bool _endXxxCalled;
1002 private Int32 _bytesRead;
1004 internal SynchronousAsyncResult(Int32 bytesRead, Object asyncStateObject) {
1005 _bytesRead = bytesRead;
1006 _stateObject = asyncStateObject;
1010 internal SynchronousAsyncResult(Object asyncStateObject) {
1011 _stateObject = asyncStateObject;
1015 internal SynchronousAsyncResult(Exception ex, Object asyncStateObject, bool isWrite) {
1016 _exceptionInfo = ExceptionDispatchInfo.Capture(ex);
1017 _stateObject = asyncStateObject;
1021 public bool IsCompleted {
1022 // We never hand out objects of this type to the user before the synchronous IO completed:
1023 get { return true; }
1026 public WaitHandle AsyncWaitHandle {
1028 return LazyInitializer.EnsureInitialized(ref _waitHandle, () => new ManualResetEvent(true));
1032 public Object AsyncState {
1033 get { return _stateObject; }
1036 public bool CompletedSynchronously {
1037 get { return true; }
1040 internal void ThrowIfError() {
1041 if (_exceptionInfo != null)
1042 _exceptionInfo.Throw();
1045 internal static Int32 EndRead(IAsyncResult asyncResult) {
1047 SynchronousAsyncResult ar = asyncResult as SynchronousAsyncResult;
1048 if (ar == null || ar._isWrite)
1049 __Error.WrongAsyncResult();
1051 if (ar._endXxxCalled)
1052 __Error.EndReadCalledTwice();
1054 ar._endXxxCalled = true;
1057 return ar._bytesRead;
1060 internal static void EndWrite(IAsyncResult asyncResult) {
1062 SynchronousAsyncResult ar = asyncResult as SynchronousAsyncResult;
1063 if (ar == null || !ar._isWrite)
1064 __Error.WrongAsyncResult();
1066 if (ar._endXxxCalled)
1067 __Error.EndWriteCalledTwice();
1069 ar._endXxxCalled = true;
1073 } // class SynchronousAsyncResult
1076 // SyncStream is a wrapper around a stream that takes
1077 // a lock for every operation making it thread safe.
1079 internal sealed class SyncStream : Stream, IDisposable
1081 private Stream _stream;
1083 private bool? _overridesBeginRead;
1085 private bool? _overridesBeginWrite;
1087 internal SyncStream(Stream stream)
1090 throw new ArgumentNullException("stream");
1091 Contract.EndContractBlock();
1095 public override bool CanRead {
1097 get { return _stream.CanRead; }
1100 public override bool CanWrite {
1102 get { return _stream.CanWrite; }
1105 public override bool CanSeek {
1107 get { return _stream.CanSeek; }
1111 public override bool CanTimeout {
1114 return _stream.CanTimeout;
1118 public override long Length {
1121 return _stream.Length;
1126 public override long Position {
1129 return _stream.Position;
1134 _stream.Position = value;
1140 public override int ReadTimeout {
1142 return _stream.ReadTimeout;
1145 _stream.ReadTimeout = value;
1150 public override int WriteTimeout {
1152 return _stream.WriteTimeout;
1155 _stream.WriteTimeout = value;
1159 // In the off chance that some wrapped stream has different
1160 // semantics for Close vs. Dispose, let's preserve that.
1161 public override void Close()
1173 protected override void Dispose(bool disposing)
1177 // Explicitly pick up a potentially methodimpl'ed Dispose
1179 ((IDisposable)_stream).Dispose();
1182 base.Dispose(disposing);
1187 public override void Flush()
1193 public override int Read([In, Out]byte[] bytes, int offset, int count)
1196 return _stream.Read(bytes, offset, count);
1199 public override int ReadByte()
1202 return _stream.ReadByte();
1205 private static bool OverridesBeginMethod(Stream stream, string methodName)
1207 Contract.Requires(stream != null, "Expected a non-null stream.");
1208 Contract.Requires(methodName == "BeginRead" || methodName == "BeginWrite",
1209 "Expected BeginRead or BeginWrite as the method name to check.");
1211 // Get all of the methods on the underlying stream
1212 var methods = stream.GetType().GetMethods(BindingFlags.Public | BindingFlags.Instance);
1214 // If any of the methods have the desired name and are defined on the base Stream
1215 // Type, then the method was not overridden. If none of them were defined on the
1216 // base Stream, then it must have been overridden.
1217 foreach (var method in methods)
1219 if (method.DeclaringType == typeof(Stream) &&
1220 method.Name == methodName)
1228 [HostProtection(ExternalThreading=true)]
1229 public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
1231 // Lazily-initialize whether the wrapped stream overrides BeginRead
1232 if (_overridesBeginRead == null)
1234 _overridesBeginRead = OverridesBeginMethod(_stream, "BeginRead");
1239 // If the Stream does have its own BeginRead implementation, then we must use that override.
1240 // If it doesn't, then we'll use the base implementation, but we'll make sure that the logic
1241 // which ensures only one asynchronous operation does so with an asynchronous wait rather
1242 // than a synchronous wait. A synchronous wait will result in a deadlock condition, because
1243 // the EndXx method for the outstanding async operation won't be able to acquire the lock on
1244 // _stream due to this call blocked while holding the lock.
1245 return _overridesBeginRead.Value ?
1246 _stream.BeginRead(buffer, offset, count, callback, state) :
1247 _stream.BeginReadInternal(buffer, offset, count, callback, state, serializeAsynchronously: true);
1251 public override int EndRead(IAsyncResult asyncResult)
1253 if (asyncResult == null)
1254 throw new ArgumentNullException("asyncResult");
1255 Contract.Ensures(Contract.Result<int>() >= 0);
1256 Contract.EndContractBlock();
1259 return _stream.EndRead(asyncResult);
1262 public override long Seek(long offset, SeekOrigin origin)
1265 return _stream.Seek(offset, origin);
1268 public override void SetLength(long length)
1271 _stream.SetLength(length);
1274 public override void Write(byte[] bytes, int offset, int count)
1277 _stream.Write(bytes, offset, count);
1280 public override void WriteByte(byte b)
1283 _stream.WriteByte(b);
1286 [HostProtection(ExternalThreading=true)]
1287 public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
1289 // Lazily-initialize whether the wrapped stream overrides BeginWrite
1290 if (_overridesBeginWrite == null)
1292 _overridesBeginWrite = OverridesBeginMethod(_stream, "BeginWrite");
1297 // If the Stream does have its own BeginWrite implementation, then we must use that override.
1298 // If it doesn't, then we'll use the base implementation, but we'll make sure that the logic
1299 // which ensures only one asynchronous operation does so with an asynchronous wait rather
1300 // than a synchronous wait. A synchronous wait will result in a deadlock condition, because
1301 // the EndXx method for the outstanding async operation won't be able to acquire the lock on
1302 // _stream due to this call blocked while holding the lock.
1303 return _overridesBeginWrite.Value ?
1304 _stream.BeginWrite(buffer, offset, count, callback, state) :
1305 _stream.BeginWriteInternal(buffer, offset, count, callback, state, serializeAsynchronously: true);
1309 public override void EndWrite(IAsyncResult asyncResult)
1311 if (asyncResult == null)
1312 throw new ArgumentNullException("asyncResult");
1313 Contract.EndContractBlock();
1316 _stream.EndWrite(asyncResult);
1322 [ContractClassFor(typeof(Stream))]
1323 internal abstract class StreamContract : Stream
1325 public override long Seek(long offset, SeekOrigin origin)
1327 Contract.Ensures(Contract.Result<long>() >= 0);
1328 throw new NotImplementedException();
1331 public override void SetLength(long value)
1333 throw new NotImplementedException();
1336 public override int Read(byte[] buffer, int offset, int count)
1338 Contract.Ensures(Contract.Result<int>() >= 0);
1339 Contract.Ensures(Contract.Result<int>() <= count);
1340 throw new NotImplementedException();
1343 public override void Write(byte[] buffer, int offset, int count)
1345 throw new NotImplementedException();
1348 public override long Position {
1350 Contract.Ensures(Contract.Result<long>() >= 0);
1351 throw new NotImplementedException();
1354 throw new NotImplementedException();
1358 public override void Flush()
1360 throw new NotImplementedException();
1363 public override bool CanRead {
1364 get { throw new NotImplementedException(); }
1367 public override bool CanWrite {
1368 get { throw new NotImplementedException(); }
1371 public override bool CanSeek {
1372 get { throw new NotImplementedException(); }
1375 public override long Length
1378 Contract.Ensures(Contract.Result<long>() >= 0);
1379 throw new NotImplementedException();
1383 #endif // CONTRACTS_FULL