3 // Copyright (c) Microsoft Corporation. All rights reserved.
6 /*============================================================
10 ** <OWNER>gpaperin</OWNER>
13 ** Purpose: Abstract base class for all Streams. Provides
14 ** default implementations of asynchronous reads & writes, in
15 ** terms of the synchronous reads & writes (and vice versa).
18 ===========================================================*/
20 using System.Threading;
22 using System.Threading.Tasks;
26 using System.Runtime.InteropServices;
27 #if NEW_EXPERIMENTAL_ASYNC_IO
28 using System.Runtime.CompilerServices;
30 using System.Runtime.ExceptionServices;
31 using System.Security;
32 using System.Security.Permissions;
33 using System.Diagnostics.Contracts;
34 using System.Reflection;
40 [ContractClass(typeof(StreamContract))]
42 #if FEATURE_REMOTING || MONO
43 public abstract class Stream : MarshalByRefObject, IDisposable {
44 #else // FEATURE_REMOTING
45 public abstract class Stream : IDisposable {
46 #endif // FEATURE_REMOTING
48 public static readonly Stream Null = new NullStream();
50 //We pick a value that is the largest multiple of 4096 that is still smaller than the large object heap threshold (85K).
51 // The CopyTo/CopyToAsync buffer is short-lived and is likely to be collected at Gen0, and it offers a significant
52 // improvement in Copy performance.
53 private const int _DefaultCopyBufferSize = 81920;
55 #if NEW_EXPERIMENTAL_ASYNC_IO
56 // To implement Async IO operations on streams that don't support async IO
59 private ReadWriteTask _activeReadWriteTask;
61 private SemaphoreSlim _asyncActiveSemaphore;
63 internal SemaphoreSlim EnsureAsyncActiveSemaphoreInitialized()
65 // Lazily-initialize _asyncActiveSemaphore. As we're never accessing the SemaphoreSlim's
66 // WaitHandle, we don't need to worry about Disposing it.
67 return LazyInitializer.EnsureInitialized(ref _asyncActiveSemaphore, () => new SemaphoreSlim(1, 1));
71 public abstract bool CanRead {
76 // If CanSeek is false, Position, Seek, Length, and SetLength should throw.
77 public abstract bool CanSeek {
83 public virtual bool CanTimeout {
90 public abstract bool CanWrite {
95 public abstract long Length {
99 public abstract long Position {
105 public virtual int ReadTimeout {
107 Contract.Ensures(Contract.Result<int>() >= 0);
108 throw new InvalidOperationException(Environment.GetResourceString("InvalidOperation_TimeoutsNotSupported"));
111 throw new InvalidOperationException(Environment.GetResourceString("InvalidOperation_TimeoutsNotSupported"));
116 public virtual int WriteTimeout {
118 Contract.Ensures(Contract.Result<int>() >= 0);
119 throw new InvalidOperationException(Environment.GetResourceString("InvalidOperation_TimeoutsNotSupported"));
122 throw new InvalidOperationException(Environment.GetResourceString("InvalidOperation_TimeoutsNotSupported"));
127 [HostProtection(ExternalThreading = true)]
129 public Task CopyToAsync(Stream destination)
131 return CopyToAsync(destination, _DefaultCopyBufferSize);
134 [HostProtection(ExternalThreading = true)]
136 public Task CopyToAsync(Stream destination, Int32 bufferSize)
138 return CopyToAsync(destination, bufferSize, CancellationToken.None);
141 [HostProtection(ExternalThreading = true)]
143 public virtual Task CopyToAsync(Stream destination, Int32 bufferSize, CancellationToken cancellationToken)
145 if (destination == null)
146 throw new ArgumentNullException("destination");
148 throw new ArgumentOutOfRangeException("bufferSize", Environment.GetResourceString("ArgumentOutOfRange_NeedPosNum"));
149 if (!CanRead && !CanWrite)
150 throw new ObjectDisposedException(null, Environment.GetResourceString("ObjectDisposed_StreamClosed"));
151 if (!destination.CanRead && !destination.CanWrite)
152 throw new ObjectDisposedException("destination", Environment.GetResourceString("ObjectDisposed_StreamClosed"));
154 throw new NotSupportedException(Environment.GetResourceString("NotSupported_UnreadableStream"));
155 if (!destination.CanWrite)
156 throw new NotSupportedException(Environment.GetResourceString("NotSupported_UnwritableStream"));
157 Contract.EndContractBlock();
159 return CopyToAsyncInternal(destination, bufferSize, cancellationToken);
162 private async Task CopyToAsyncInternal(Stream destination, Int32 bufferSize, CancellationToken cancellationToken)
164 Contract.Requires(destination != null);
165 Contract.Requires(bufferSize > 0);
166 Contract.Requires(CanRead);
167 Contract.Requires(destination.CanWrite);
169 byte[] buffer = new byte[bufferSize];
171 while ((bytesRead = await ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false)) != 0)
173 await destination.WriteAsync(buffer, 0, bytesRead, cancellationToken).ConfigureAwait(false);
176 #endif // FEATURE_ASYNC_IO
178 // Reads the bytes from the current stream and writes the bytes to
179 // the destination stream until all bytes are read, starting at
180 // the current position.
181 public void CopyTo(Stream destination)
183 if (destination == null)
184 throw new ArgumentNullException("destination");
185 if (!CanRead && !CanWrite)
186 throw new ObjectDisposedException(null, Environment.GetResourceString("ObjectDisposed_StreamClosed"));
187 if (!destination.CanRead && !destination.CanWrite)
188 throw new ObjectDisposedException("destination", Environment.GetResourceString("ObjectDisposed_StreamClosed"));
190 throw new NotSupportedException(Environment.GetResourceString("NotSupported_UnreadableStream"));
191 if (!destination.CanWrite)
192 throw new NotSupportedException(Environment.GetResourceString("NotSupported_UnwritableStream"));
193 Contract.EndContractBlock();
195 InternalCopyTo(destination, _DefaultCopyBufferSize);
198 public void CopyTo(Stream destination, int bufferSize)
200 if (destination == null)
201 throw new ArgumentNullException("destination");
203 throw new ArgumentOutOfRangeException("bufferSize",
204 Environment.GetResourceString("ArgumentOutOfRange_NeedPosNum"));
205 if (!CanRead && !CanWrite)
206 throw new ObjectDisposedException(null, Environment.GetResourceString("ObjectDisposed_StreamClosed"));
207 if (!destination.CanRead && !destination.CanWrite)
208 throw new ObjectDisposedException("destination", Environment.GetResourceString("ObjectDisposed_StreamClosed"));
210 throw new NotSupportedException(Environment.GetResourceString("NotSupported_UnreadableStream"));
211 if (!destination.CanWrite)
212 throw new NotSupportedException(Environment.GetResourceString("NotSupported_UnwritableStream"));
213 Contract.EndContractBlock();
215 InternalCopyTo(destination, bufferSize);
218 private void InternalCopyTo(Stream destination, int bufferSize)
220 Contract.Requires(destination != null);
221 Contract.Requires(CanRead);
222 Contract.Requires(destination.CanWrite);
223 Contract.Requires(bufferSize > 0);
225 byte[] buffer = new byte[bufferSize];
227 while ((read = Read(buffer, 0, buffer.Length)) != 0)
228 destination.Write(buffer, 0, read);
232 // Stream used to require that all cleanup logic went into Close(),
233 // which was thought up before we invented IDisposable. However, we
234 // need to follow the IDisposable pattern so that users can write
235 // sensible subclasses without needing to inspect all their base
236 // classes, and without worrying about version brittleness, from a
237 // base class switching to the Dispose pattern. We're moving
238 // Stream to the Dispose(bool) pattern - that's where all subclasses
239 // should put their cleanup starting in V2.
240 public virtual void Close()
242 /* These are correct, but we'd have to fix PipeStream & NetworkStream very carefully.
243 Contract.Ensures(CanRead == false);
244 Contract.Ensures(CanWrite == false);
245 Contract.Ensures(CanSeek == false);
249 GC.SuppressFinalize(this);
252 public void Dispose()
254 /* These are correct, but we'd have to fix PipeStream & NetworkStream very carefully.
255 Contract.Ensures(CanRead == false);
256 Contract.Ensures(CanWrite == false);
257 Contract.Ensures(CanSeek == false);
264 protected virtual void Dispose(bool disposing)
266 // Note: Never change this to call other virtual methods on Stream
267 // like Write, since the state on subclasses has already been
268 // torn down. This is the last code to run on cleanup for a stream.
271 public abstract void Flush();
274 [HostProtection(ExternalThreading=true)]
276 public Task FlushAsync()
278 return FlushAsync(CancellationToken.None);
281 [HostProtection(ExternalThreading=true)]
283 public virtual Task FlushAsync(CancellationToken cancellationToken)
285 return Task.Factory.StartNew(state => ((Stream)state).Flush(), this,
286 cancellationToken, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
288 #endif // FEATURE_ASYNC_IO
290 [Obsolete("CreateWaitHandle will be removed eventually. Please use \"new ManualResetEvent(false)\" instead.")]
291 protected virtual WaitHandle CreateWaitHandle()
293 Contract.Ensures(Contract.Result<WaitHandle>() != null);
294 return new ManualResetEvent(false);
297 [HostProtection(ExternalThreading=true)]
298 public virtual IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
300 Contract.Ensures(Contract.Result<IAsyncResult>() != null);
301 return BeginReadInternal(buffer, offset, count, callback, state, serializeAsynchronously: false);
304 [HostProtection(ExternalThreading = true)]
305 internal IAsyncResult BeginReadInternal(byte[] buffer, int offset, int count, AsyncCallback callback, Object state, bool serializeAsynchronously)
307 Contract.Ensures(Contract.Result<IAsyncResult>() != null);
308 if (!CanRead) __Error.ReadNotSupported();
310 #if !NEW_EXPERIMENTAL_ASYNC_IO
311 return BlockingBeginRead(buffer, offset, count, callback, state);
314 // Mango did not do Async IO.
315 if(CompatibilitySwitches.IsAppEarlierThanWindowsPhone8)
317 return BlockingBeginRead(buffer, offset, count, callback, state);
320 // To avoid a race with a stream's position pointer & generating ----
321 // conditions with internal buffer indexes in our own streams that
322 // don't natively support async IO operations when there are multiple
323 // async requests outstanding, we will block the application's main
324 // thread if it does a second IO request until the first one completes.
325 var semaphore = EnsureAsyncActiveSemaphoreInitialized();
326 Task semaphoreTask = null;
327 if (serializeAsynchronously)
329 semaphoreTask = semaphore.WaitAsync();
336 // Create the task to asynchronously do a Read. This task serves both
337 // as the asynchronous work item and as the IAsyncResult returned to the user.
338 var asyncResult = new ReadWriteTask(true /*isRead*/, delegate
340 // The ReadWriteTask stores all of the parameters to pass to Read.
341 // As we're currently inside of it, we can get the current task
342 // and grab the parameters from it.
343 var thisTask = Task.InternalCurrent as ReadWriteTask;
344 Contract.Assert(thisTask != null, "Inside ReadWriteTask, InternalCurrent should be the ReadWriteTask");
346 // Do the Read and return the number of bytes read
347 var bytesRead = thisTask._stream.Read(thisTask._buffer, thisTask._offset, thisTask._count);
348 thisTask.ClearBeginState(); // just to help alleviate some memory pressure
350 }, state, this, buffer, offset, count, callback);
353 if (semaphoreTask != null)
354 RunReadWriteTaskWhenReady(semaphoreTask, asyncResult);
356 RunReadWriteTask(asyncResult);
359 return asyncResult; // return it
363 public virtual int EndRead(IAsyncResult asyncResult)
365 if (asyncResult == null)
366 throw new ArgumentNullException("asyncResult");
367 Contract.Ensures(Contract.Result<int>() >= 0);
368 Contract.EndContractBlock();
370 #if !NEW_EXPERIMENTAL_ASYNC_IO
371 return BlockingEndRead(asyncResult);
373 // Mango did not do async IO.
374 if(CompatibilitySwitches.IsAppEarlierThanWindowsPhone8)
376 return BlockingEndRead(asyncResult);
379 var readTask = _activeReadWriteTask;
381 if (readTask == null)
383 throw new ArgumentException(Environment.GetResourceString("InvalidOperation_WrongAsyncResultOrEndReadCalledMultiple"));
385 else if (readTask != asyncResult)
387 throw new InvalidOperationException(Environment.GetResourceString("InvalidOperation_WrongAsyncResultOrEndReadCalledMultiple"));
389 else if (!readTask._isRead)
391 throw new ArgumentException(Environment.GetResourceString("InvalidOperation_WrongAsyncResultOrEndReadCalledMultiple"));
396 return readTask.GetAwaiter().GetResult(); // block until completion, then get result / propagate any exception
400 _activeReadWriteTask = null;
401 Contract.Assert(_asyncActiveSemaphore != null, "Must have been initialized in order to get here.");
402 _asyncActiveSemaphore.Release();
408 [HostProtection(ExternalThreading = true)]
410 public Task<int> ReadAsync(Byte[] buffer, int offset, int count)
412 return ReadAsync(buffer, offset, count, CancellationToken.None);
415 [HostProtection(ExternalThreading = true)]
417 public virtual Task<int> ReadAsync(Byte[] buffer, int offset, int count, CancellationToken cancellationToken)
419 // If cancellation was requested, bail early with an already completed task.
420 // Otherwise, return a task that represents the Begin/End methods.
421 return cancellationToken.IsCancellationRequested
422 ? Task.FromCancellation<int>(cancellationToken)
423 : BeginEndReadAsync(buffer, offset, count);
426 private Task<Int32> BeginEndReadAsync(Byte[] buffer, Int32 offset, Int32 count)
428 return TaskFactory<Int32>.FromAsyncTrim(
429 this, new ReadWriteParameters { Buffer = buffer, Offset = offset, Count = count },
430 (stream, args, callback, state) => stream.BeginRead(args.Buffer, args.Offset, args.Count, callback, state), // cached by compiler
431 (stream, asyncResult) => stream.EndRead(asyncResult)); // cached by compiler
434 private struct ReadWriteParameters // struct for arguments to Read and Write calls
436 internal byte[] Buffer;
440 #endif //FEATURE_ASYNC_IO
444 [HostProtection(ExternalThreading=true)]
445 public virtual IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
447 Contract.Ensures(Contract.Result<IAsyncResult>() != null);
448 return BeginWriteInternal(buffer, offset, count, callback, state, serializeAsynchronously: false);
451 [HostProtection(ExternalThreading = true)]
452 internal IAsyncResult BeginWriteInternal(byte[] buffer, int offset, int count, AsyncCallback callback, Object state, bool serializeAsynchronously)
454 Contract.Ensures(Contract.Result<IAsyncResult>() != null);
455 if (!CanWrite) __Error.WriteNotSupported();
456 #if !NEW_EXPERIMENTAL_ASYNC_IO
457 return BlockingBeginWrite(buffer, offset, count, callback, state);
460 // Mango did not do Async IO.
461 if(CompatibilitySwitches.IsAppEarlierThanWindowsPhone8)
463 return BlockingBeginWrite(buffer, offset, count, callback, state);
466 // To avoid a race with a stream's position pointer & generating ----
467 // conditions with internal buffer indexes in our own streams that
468 // don't natively support async IO operations when there are multiple
469 // async requests outstanding, we will block the application's main
470 // thread if it does a second IO request until the first one completes.
471 var semaphore = EnsureAsyncActiveSemaphoreInitialized();
472 Task semaphoreTask = null;
473 if (serializeAsynchronously)
475 semaphoreTask = semaphore.WaitAsync(); // kick off the asynchronous wait, but don't block
479 semaphore.Wait(); // synchronously wait here
482 // Create the task to asynchronously do a Write. This task serves both
483 // as the asynchronous work item and as the IAsyncResult returned to the user.
484 var asyncResult = new ReadWriteTask(false /*isRead*/, delegate
486 // The ReadWriteTask stores all of the parameters to pass to Write.
487 // As we're currently inside of it, we can get the current task
488 // and grab the parameters from it.
489 var thisTask = Task.InternalCurrent as ReadWriteTask;
490 Contract.Assert(thisTask != null, "Inside ReadWriteTask, InternalCurrent should be the ReadWriteTask");
493 thisTask._stream.Write(thisTask._buffer, thisTask._offset, thisTask._count);
494 thisTask.ClearBeginState(); // just to help alleviate some memory pressure
495 return 0; // not used, but signature requires a value be returned
496 }, state, this, buffer, offset, count, callback);
499 if (semaphoreTask != null)
500 RunReadWriteTaskWhenReady(semaphoreTask, asyncResult);
502 RunReadWriteTask(asyncResult);
504 return asyncResult; // return it
508 #if NEW_EXPERIMENTAL_ASYNC_IO
509 private void RunReadWriteTaskWhenReady(Task asyncWaiter, ReadWriteTask readWriteTask)
511 Contract.Assert(readWriteTask != null); // Should be Contract.Requires, but CCRewrite is doing a poor job with
512 // preconditions in async methods that await. Mike & Manuel are aware. (10/6/2011, bug 290222)
513 Contract.Assert(asyncWaiter != null); // Ditto
515 // If the wait has already complete, run the task.
516 if (asyncWaiter.IsCompleted)
518 Contract.Assert(asyncWaiter.IsRanToCompletion, "The semaphore wait should always complete successfully.");
519 RunReadWriteTask(readWriteTask);
521 else // Otherwise, wait for our turn, and then run the task.
523 asyncWaiter.ContinueWith((t, state) =>
525 Contract.Assert(t.IsRanToCompletion, "The semaphore wait should always complete successfully.");
526 var tuple = (Tuple<Stream,ReadWriteTask>)state;
527 tuple.Item1.RunReadWriteTask(tuple.Item2); // RunReadWriteTask(readWriteTask);
528 }, Tuple.Create<Stream,ReadWriteTask>(this, readWriteTask),
529 default(CancellationToken),
530 TaskContinuationOptions.ExecuteSynchronously,
531 TaskScheduler.Default);
535 private void RunReadWriteTask(ReadWriteTask readWriteTask)
537 Contract.Requires(readWriteTask != null);
538 Contract.Assert(_activeReadWriteTask == null, "Expected no other readers or writers");
540 // Schedule the task. ScheduleAndStart must happen after the write to _activeReadWriteTask to avoid a race.
541 // Internally, we're able to directly call ScheduleAndStart rather than Start, avoiding
542 // two interlocked operations. However, if ReadWriteTask is ever changed to use
543 // a cancellation token, this should be changed to use Start.
544 _activeReadWriteTask = readWriteTask; // store the task so that EndXx can validate it's given the right one
545 readWriteTask.m_taskScheduler = TaskScheduler.Default;
546 readWriteTask.ScheduleAndStart(needsProtection: false);
550 public virtual void EndWrite(IAsyncResult asyncResult)
552 if (asyncResult==null)
553 throw new ArgumentNullException("asyncResult");
554 Contract.EndContractBlock();
556 #if !NEW_EXPERIMENTAL_ASYNC_IO
557 BlockingEndWrite(asyncResult);
560 // Mango did not do Async IO.
561 if(CompatibilitySwitches.IsAppEarlierThanWindowsPhone8)
563 BlockingEndWrite(asyncResult);
567 var writeTask = _activeReadWriteTask;
568 if (writeTask == null)
570 throw new ArgumentException(Environment.GetResourceString("InvalidOperation_WrongAsyncResultOrEndWriteCalledMultiple"));
572 else if (writeTask != asyncResult)
574 throw new InvalidOperationException(Environment.GetResourceString("InvalidOperation_WrongAsyncResultOrEndWriteCalledMultiple"));
576 else if (writeTask._isRead)
578 throw new ArgumentException(Environment.GetResourceString("InvalidOperation_WrongAsyncResultOrEndWriteCalledMultiple"));
583 writeTask.GetAwaiter().GetResult(); // block until completion, then propagate any exceptions
584 Contract.Assert(writeTask.Status == TaskStatus.RanToCompletion);
588 _activeReadWriteTask = null;
589 Contract.Assert(_asyncActiveSemaphore != null, "Must have been initialized in order to get here.");
590 _asyncActiveSemaphore.Release();
595 #if NEW_EXPERIMENTAL_ASYNC_IO
596 // Task used by BeginRead / BeginWrite to do Read / Write asynchronously.
597 // A single instance of this task serves four purposes:
598 // 1. The work item scheduled to run the Read / Write operation
599 // 2. The state holding the arguments to be passed to Read / Write
600 // 3. The IAsyncResult returned from BeginRead / BeginWrite
601 // 4. The completion action that runs to invoke the user-provided callback.
602 // This last item is a bit tricky. Before the AsyncCallback is invoked, the
603 // IAsyncResult must have completed, so we can't just invoke the handler
604 // from within the task, since it is the IAsyncResult, and thus it's not
605 // yet completed. Instead, we use AddCompletionAction to install this
606 // task as its own completion handler. That saves the need to allocate
607 // a separate completion handler, it guarantees that the task will
608 // have completed by the time the handler is invoked, and it allows
609 // the handler to be invoked synchronously upon the completion of the
610 // task. This all enables BeginRead / BeginWrite to be implemented
611 // with a single allocation.
612 private sealed class ReadWriteTask : Task<int>, ITaskCompletionAction
614 internal readonly bool _isRead;
615 internal Stream _stream;
616 internal byte [] _buffer;
617 internal int _offset;
619 private AsyncCallback _callback;
620 private ExecutionContext _context;
622 internal void ClearBeginState() // Used to allow the args to Read/Write to be made available for GC
628 [SecuritySafeCritical] // necessary for EC.Capture
629 [MethodImpl(MethodImplOptions.NoInlining)]
630 public ReadWriteTask(
632 Func<object,int> function, object state,
633 Stream stream, byte[] buffer, int offset, int count, AsyncCallback callback) :
634 base(function, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach)
636 Contract.Requires(function != null);
637 Contract.Requires(stream != null);
638 Contract.Requires(buffer != null);
639 Contract.EndContractBlock();
641 StackCrawlMark stackMark = StackCrawlMark.LookForMyCaller;
643 // Store the arguments
650 // If a callback was provided, we need to:
651 // - Store the user-provided handler
652 // - Capture an ExecutionContext under which to invoke the handler
653 // - Add this task as its own completion handler so that the Invoke method
654 // will run the callback when this task completes.
655 if (callback != null)
657 _callback = callback;
658 _context = ExecutionContext.Capture(ref stackMark,
659 ExecutionContext.CaptureOptions.OptimizeDefaultCase | ExecutionContext.CaptureOptions.IgnoreSyncCtx);
660 base.AddCompletionAction(this);
664 [SecurityCritical] // necessary for CoreCLR
665 private static void InvokeAsyncCallback(object completedTask)
667 var rwc = (ReadWriteTask)completedTask;
668 var callback = rwc._callback;
669 rwc._callback = null;
673 [SecurityCritical] // necessary for CoreCLR
674 private static ContextCallback s_invokeAsyncCallback;
676 [SecuritySafeCritical] // necessary for ExecutionContext.Run
677 void ITaskCompletionAction.Invoke(Task completingTask)
679 // Get the ExecutionContext. If there is none, just run the callback
680 // directly, passing in the completed task as the IAsyncResult.
681 // If there is one, process it with ExecutionContext.Run.
682 var context = _context;
685 var callback = _callback;
687 callback(completingTask);
693 var invokeAsyncCallback = s_invokeAsyncCallback;
694 if (invokeAsyncCallback == null) s_invokeAsyncCallback = invokeAsyncCallback = InvokeAsyncCallback; // benign ----
696 using(context) ExecutionContext.Run(context, invokeAsyncCallback, this, true);
703 [HostProtection(ExternalThreading = true)]
705 public Task WriteAsync(Byte[] buffer, int offset, int count)
707 return WriteAsync(buffer, offset, count, CancellationToken.None);
710 [HostProtection(ExternalThreading = true)]
712 public virtual Task WriteAsync(Byte[] buffer, int offset, int count, CancellationToken cancellationToken)
714 // If cancellation was requested, bail early with an already completed task.
715 // Otherwise, return a task that represents the Begin/End methods.
716 return cancellationToken.IsCancellationRequested
717 ? Task.FromCancellation(cancellationToken)
718 : BeginEndWriteAsync(buffer, offset, count);
722 private Task BeginEndWriteAsync(Byte[] buffer, Int32 offset, Int32 count)
724 return TaskFactory<VoidTaskResult>.FromAsyncTrim(
725 this, new ReadWriteParameters { Buffer=buffer, Offset=offset, Count=count },
726 (stream, args, callback, state) => stream.BeginWrite(args.Buffer, args.Offset, args.Count, callback, state), // cached by compiler
727 (stream, asyncResult) => // cached by compiler
729 stream.EndWrite(asyncResult);
730 return default(VoidTaskResult);
733 #endif // FEATURE_ASYNC_IO
735 public abstract long Seek(long offset, SeekOrigin origin);
737 public abstract void SetLength(long value);
739 public abstract int Read([In, Out] byte[] buffer, int offset, int count);
741 // Reads one byte from the stream by calling Read(byte[], int, int).
742 // Will return an unsigned byte cast to an int or -1 on end of stream.
743 // This implementation does not perform well because it allocates a new
744 // byte[] each time you call it, and should be overridden by any
745 // subclass that maintains an internal buffer. Then, it can help perf
746 // significantly for people who are reading one byte at a time.
747 public virtual int ReadByte()
749 Contract.Ensures(Contract.Result<int>() >= -1);
750 Contract.Ensures(Contract.Result<int>() < 256);
752 byte[] oneByteArray = new byte[1];
753 int r = Read(oneByteArray, 0, 1);
756 return oneByteArray[0];
759 public abstract void Write(byte[] buffer, int offset, int count);
761 // Writes one byte from the stream by calling Write(byte[], int, int).
762 // This implementation does not perform well because it allocates a new
763 // byte[] each time you call it, and should be overridden by any
764 // subclass that maintains an internal buffer. Then, it can help perf
765 // significantly for people who are writing one byte at a time.
766 public virtual void WriteByte(byte value)
768 byte[] oneByteArray = new byte[1];
769 oneByteArray[0] = value;
770 Write(oneByteArray, 0, 1);
773 [HostProtection(Synchronization=true)]
774 public static Stream Synchronized(Stream stream)
777 throw new ArgumentNullException("stream");
778 Contract.Ensures(Contract.Result<Stream>() != null);
779 Contract.EndContractBlock();
780 if (stream is SyncStream)
783 return new SyncStream(stream);
786 #if !FEATURE_PAL || MONO // This method shouldn't have been exposed in Dev10 (we revised object invariants after locking down).
787 [Obsolete("Do not call or override this method.")]
788 protected virtual void ObjectInvariant()
793 internal IAsyncResult BlockingBeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
795 Contract.Ensures(Contract.Result<IAsyncResult>() != null);
797 // To avoid a race with a stream's position pointer & generating ----
798 // conditions with internal buffer indexes in our own streams that
799 // don't natively support async IO operations when there are multiple
800 // async requests outstanding, we will block the application's main
801 // thread and do the IO synchronously.
802 // This can't perform well - use a different approach.
803 SynchronousAsyncResult asyncResult;
805 int numRead = Read(buffer, offset, count);
806 asyncResult = new SynchronousAsyncResult(numRead, state);
808 catch (IOException ex) {
809 asyncResult = new SynchronousAsyncResult(ex, state, isWrite: false);
812 if (callback != null) {
813 callback(asyncResult);
819 internal static int BlockingEndRead(IAsyncResult asyncResult)
821 Contract.Ensures(Contract.Result<int>() >= 0);
823 return SynchronousAsyncResult.EndRead(asyncResult);
826 internal IAsyncResult BlockingBeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
828 Contract.Ensures(Contract.Result<IAsyncResult>() != null);
830 // To avoid a race with a stream's position pointer & generating ----
831 // conditions with internal buffer indexes in our own streams that
832 // don't natively support async IO operations when there are multiple
833 // async requests outstanding, we will block the application's main
834 // thread and do the IO synchronously.
835 // This can't perform well - use a different approach.
836 SynchronousAsyncResult asyncResult;
838 Write(buffer, offset, count);
839 asyncResult = new SynchronousAsyncResult(state);
841 catch (IOException ex) {
842 asyncResult = new SynchronousAsyncResult(ex, state, isWrite: true);
845 if (callback != null) {
846 callback(asyncResult);
852 internal static void BlockingEndWrite(IAsyncResult asyncResult)
854 SynchronousAsyncResult.EndWrite(asyncResult);
858 private sealed class NullStream : Stream
860 internal NullStream() {}
862 public override bool CanRead {
867 public override bool CanWrite {
872 public override bool CanSeek {
877 public override long Length {
881 public override long Position {
886 protected override void Dispose(bool disposing)
888 // Do nothing - we don't want NullStream singleton (static) to be closable
891 public override void Flush()
897 public override Task FlushAsync(CancellationToken cancellationToken)
899 return cancellationToken.IsCancellationRequested ?
900 Task.FromCancellation(cancellationToken) :
903 #endif // FEATURE_ASYNC_IO
905 [HostProtection(ExternalThreading = true)]
906 public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
908 if (!CanRead) __Error.ReadNotSupported();
910 return BlockingBeginRead(buffer, offset, count, callback, state);
913 public override int EndRead(IAsyncResult asyncResult)
915 if (asyncResult == null)
916 throw new ArgumentNullException("asyncResult");
917 Contract.EndContractBlock();
919 return BlockingEndRead(asyncResult);
922 [HostProtection(ExternalThreading = true)]
923 public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
925 if (!CanWrite) __Error.WriteNotSupported();
927 return BlockingBeginWrite(buffer, offset, count, callback, state);
930 public override void EndWrite(IAsyncResult asyncResult)
932 if (asyncResult == null)
933 throw new ArgumentNullException("asyncResult");
934 Contract.EndContractBlock();
936 BlockingEndWrite(asyncResult);
939 public override int Read([In, Out] byte[] buffer, int offset, int count)
946 public override Task<int> ReadAsync(Byte[] buffer, int offset, int count, CancellationToken cancellationToken)
948 var nullReadTask = s_nullReadTask;
949 if (nullReadTask == null)
950 s_nullReadTask = nullReadTask = new Task<int>(false, 0, (TaskCreationOptions)InternalTaskOptions.DoNotDispose, CancellationToken.None); // benign ----
953 private static Task<int> s_nullReadTask;
954 #endif //FEATURE_ASYNC_IO
956 public override int ReadByte()
961 public override void Write(byte[] buffer, int offset, int count)
967 public override Task WriteAsync(Byte[] buffer, int offset, int count, CancellationToken cancellationToken)
969 return cancellationToken.IsCancellationRequested ?
970 Task.FromCancellation(cancellationToken) :
973 #endif // FEATURE_ASYNC_IO
975 public override void WriteByte(byte value)
979 public override long Seek(long offset, SeekOrigin origin)
984 public override void SetLength(long length)
990 /// <summary>Used as the IAsyncResult object when using asynchronous IO methods on the base Stream class.</summary>
991 internal sealed class SynchronousAsyncResult : IAsyncResult {
993 private readonly Object _stateObject;
994 private readonly bool _isWrite;
995 private ManualResetEvent _waitHandle;
996 private ExceptionDispatchInfo _exceptionInfo;
998 private bool _endXxxCalled;
999 private Int32 _bytesRead;
1001 internal SynchronousAsyncResult(Int32 bytesRead, Object asyncStateObject) {
1002 _bytesRead = bytesRead;
1003 _stateObject = asyncStateObject;
1007 internal SynchronousAsyncResult(Object asyncStateObject) {
1008 _stateObject = asyncStateObject;
1012 internal SynchronousAsyncResult(Exception ex, Object asyncStateObject, bool isWrite) {
1013 _exceptionInfo = ExceptionDispatchInfo.Capture(ex);
1014 _stateObject = asyncStateObject;
1018 public bool IsCompleted {
1019 // We never hand out objects of this type to the user before the synchronous IO completed:
1020 get { return true; }
1023 public WaitHandle AsyncWaitHandle {
1025 return LazyInitializer.EnsureInitialized(ref _waitHandle, () => new ManualResetEvent(true));
1029 public Object AsyncState {
1030 get { return _stateObject; }
1033 public bool CompletedSynchronously {
1034 get { return true; }
1037 internal void ThrowIfError() {
1038 if (_exceptionInfo != null)
1039 _exceptionInfo.Throw();
1042 internal static Int32 EndRead(IAsyncResult asyncResult) {
1044 SynchronousAsyncResult ar = asyncResult as SynchronousAsyncResult;
1045 if (ar == null || ar._isWrite)
1046 __Error.WrongAsyncResult();
1048 if (ar._endXxxCalled)
1049 __Error.EndReadCalledTwice();
1051 ar._endXxxCalled = true;
1054 return ar._bytesRead;
1057 internal static void EndWrite(IAsyncResult asyncResult) {
1059 SynchronousAsyncResult ar = asyncResult as SynchronousAsyncResult;
1060 if (ar == null || !ar._isWrite)
1061 __Error.WrongAsyncResult();
1063 if (ar._endXxxCalled)
1064 __Error.EndWriteCalledTwice();
1066 ar._endXxxCalled = true;
1070 } // class SynchronousAsyncResult
1073 // SyncStream is a wrapper around a stream that takes
1074 // a lock for every operation making it thread safe.
1076 internal sealed class SyncStream : Stream, IDisposable
1078 private Stream _stream;
1080 private bool? _overridesBeginRead;
1082 private bool? _overridesBeginWrite;
1084 internal SyncStream(Stream stream)
1087 throw new ArgumentNullException("stream");
1088 Contract.EndContractBlock();
1092 public override bool CanRead {
1094 get { return _stream.CanRead; }
1097 public override bool CanWrite {
1099 get { return _stream.CanWrite; }
1102 public override bool CanSeek {
1104 get { return _stream.CanSeek; }
1108 public override bool CanTimeout {
1111 return _stream.CanTimeout;
1115 public override long Length {
1118 return _stream.Length;
1123 public override long Position {
1126 return _stream.Position;
1131 _stream.Position = value;
1137 public override int ReadTimeout {
1139 return _stream.ReadTimeout;
1142 _stream.ReadTimeout = value;
1147 public override int WriteTimeout {
1149 return _stream.WriteTimeout;
1152 _stream.WriteTimeout = value;
1156 // In the off chance that some wrapped stream has different
1157 // semantics for Close vs. Dispose, let's preserve that.
1158 public override void Close()
1170 protected override void Dispose(bool disposing)
1174 // Explicitly pick up a potentially methodimpl'ed Dispose
1176 ((IDisposable)_stream).Dispose();
1179 base.Dispose(disposing);
1184 public override void Flush()
1190 public override int Read([In, Out]byte[] bytes, int offset, int count)
1193 return _stream.Read(bytes, offset, count);
1196 public override int ReadByte()
1199 return _stream.ReadByte();
1202 private static bool OverridesBeginMethod(Stream stream, string methodName)
1204 Contract.Requires(stream != null, "Expected a non-null stream.");
1205 Contract.Requires(methodName == "BeginRead" || methodName == "BeginWrite",
1206 "Expected BeginRead or BeginWrite as the method name to check.");
1208 // Get all of the methods on the underlying stream
1209 var methods = stream.GetType().GetMethods(BindingFlags.Public | BindingFlags.Instance);
1211 // If any of the methods have the desired name and are defined on the base Stream
1212 // Type, then the method was not overridden. If none of them were defined on the
1213 // base Stream, then it must have been overridden.
1214 foreach (var method in methods)
1216 if (method.DeclaringType == typeof(Stream) &&
1217 method.Name == methodName)
1225 [HostProtection(ExternalThreading=true)]
1226 public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
1228 // Lazily-initialize whether the wrapped stream overrides BeginRead
1229 if (_overridesBeginRead == null)
1231 _overridesBeginRead = OverridesBeginMethod(_stream, "BeginRead");
1236 // If the Stream does have its own BeginRead implementation, then we must use that override.
1237 // If it doesn't, then we'll use the base implementation, but we'll make sure that the logic
1238 // which ensures only one asynchronous operation does so with an asynchronous wait rather
1239 // than a synchronous wait. A synchronous wait will result in a deadlock condition, because
1240 // the EndXx method for the outstanding async operation won't be able to acquire the lock on
1241 // _stream due to this call blocked while holding the lock.
1242 return _overridesBeginRead.Value ?
1243 _stream.BeginRead(buffer, offset, count, callback, state) :
1244 _stream.BeginReadInternal(buffer, offset, count, callback, state, serializeAsynchronously: true);
1248 public override int EndRead(IAsyncResult asyncResult)
1250 if (asyncResult == null)
1251 throw new ArgumentNullException("asyncResult");
1252 Contract.Ensures(Contract.Result<int>() >= 0);
1253 Contract.EndContractBlock();
1256 return _stream.EndRead(asyncResult);
1259 public override long Seek(long offset, SeekOrigin origin)
1262 return _stream.Seek(offset, origin);
1265 public override void SetLength(long length)
1268 _stream.SetLength(length);
1271 public override void Write(byte[] bytes, int offset, int count)
1274 _stream.Write(bytes, offset, count);
1277 public override void WriteByte(byte b)
1280 _stream.WriteByte(b);
1283 [HostProtection(ExternalThreading=true)]
1284 public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
1286 // Lazily-initialize whether the wrapped stream overrides BeginWrite
1287 if (_overridesBeginWrite == null)
1289 _overridesBeginWrite = OverridesBeginMethod(_stream, "BeginWrite");
1294 // If the Stream does have its own BeginWrite implementation, then we must use that override.
1295 // If it doesn't, then we'll use the base implementation, but we'll make sure that the logic
1296 // which ensures only one asynchronous operation does so with an asynchronous wait rather
1297 // than a synchronous wait. A synchronous wait will result in a deadlock condition, because
1298 // the EndXx method for the outstanding async operation won't be able to acquire the lock on
1299 // _stream due to this call blocked while holding the lock.
1300 return _overridesBeginWrite.Value ?
1301 _stream.BeginWrite(buffer, offset, count, callback, state) :
1302 _stream.BeginWriteInternal(buffer, offset, count, callback, state, serializeAsynchronously: true);
1306 public override void EndWrite(IAsyncResult asyncResult)
1308 if (asyncResult == null)
1309 throw new ArgumentNullException("asyncResult");
1310 Contract.EndContractBlock();
1313 _stream.EndWrite(asyncResult);
1319 [ContractClassFor(typeof(Stream))]
1320 internal abstract class StreamContract : Stream
1322 public override long Seek(long offset, SeekOrigin origin)
1324 Contract.Ensures(Contract.Result<long>() >= 0);
1325 throw new NotImplementedException();
1328 public override void SetLength(long value)
1330 throw new NotImplementedException();
1333 public override int Read(byte[] buffer, int offset, int count)
1335 Contract.Ensures(Contract.Result<int>() >= 0);
1336 Contract.Ensures(Contract.Result<int>() <= count);
1337 throw new NotImplementedException();
1340 public override void Write(byte[] buffer, int offset, int count)
1342 throw new NotImplementedException();
1345 public override long Position {
1347 Contract.Ensures(Contract.Result<long>() >= 0);
1348 throw new NotImplementedException();
1351 throw new NotImplementedException();
1355 public override void Flush()
1357 throw new NotImplementedException();
1360 public override bool CanRead {
1361 get { throw new NotImplementedException(); }
1364 public override bool CanWrite {
1365 get { throw new NotImplementedException(); }
1368 public override bool CanSeek {
1369 get { throw new NotImplementedException(); }
1372 public override long Length
1375 Contract.Ensures(Contract.Result<long>() >= 0);
1376 throw new NotImplementedException();
1380 #endif // CONTRACTS_FULL