3 // Copyright (c) Microsoft Corporation. All rights reserved.
6 /*============================================================
10 ** <OWNER>gpaperin</OWNER>
13 ** Purpose: Abstract base class for all Streams. Provides
14 ** default implementations of asynchronous reads & writes, in
15 ** terms of the synchronous reads & writes (and vice versa).
18 ===========================================================*/
20 using System.Threading;
22 using System.Threading.Tasks;
26 using System.Runtime.InteropServices;
27 #if NEW_EXPERIMENTAL_ASYNC_IO
28 using System.Runtime.CompilerServices;
30 using System.Runtime.ExceptionServices;
31 using System.Security;
32 using System.Security.Permissions;
33 using System.Diagnostics.Contracts;
34 using System.Reflection;
40 [ContractClass(typeof(StreamContract))]
42 #if FEATURE_REMOTING || MONO
43 public abstract partial class Stream : MarshalByRefObject, IDisposable {
44 #else // FEATURE_REMOTING
45 public abstract class Stream : IDisposable {
46 #endif // FEATURE_REMOTING
48 public static readonly Stream Null = new NullStream();
50 //We pick a value that is the largest multiple of 4096 that is still smaller than the large object heap threshold (85K).
51 // The CopyTo/CopyToAsync buffer is short-lived and is likely to be collected at Gen0, and it offers a significant
52 // improvement in Copy performance.
53 private const int _DefaultCopyBufferSize = 81920;
55 #if NEW_EXPERIMENTAL_ASYNC_IO
56 // To implement Async IO operations on streams that don't support async IO
59 private ReadWriteTask _activeReadWriteTask;
61 private SemaphoreSlim _asyncActiveSemaphore;
// Returns the semaphore used by the Begin/End emulation to admit one
// operation at a time, creating it on first use.
internal SemaphoreSlim EnsureAsyncActiveSemaphoreInitialized()
{
    // The SemaphoreSlim's WaitHandle is never accessed, so the lazily
    // created instance does not need to be disposed.
    Func<SemaphoreSlim> createSemaphore = () => new SemaphoreSlim(1, 1);
    return LazyInitializer.EnsureInitialized(ref _asyncActiveSemaphore, createSemaphore);
}
71 public abstract bool CanRead {
76 // If CanSeek is false, Position, Seek, Length, and SetLength should throw.
77 public abstract bool CanSeek {
83 public virtual bool CanTimeout {
90 public abstract bool CanWrite {
95 public abstract long Length {
99 public abstract long Position {
// Read timeout of the stream. The base implementation does not support
// timeouts: both accessors throw InvalidOperationException.
public virtual int ReadTimeout
{
    get
    {
        Contract.Ensures(Contract.Result<int>() >= 0);
        string message = Environment.GetResourceString("InvalidOperation_TimeoutsNotSupported");
        throw new InvalidOperationException(message);
    }
    set
    {
        string message = Environment.GetResourceString("InvalidOperation_TimeoutsNotSupported");
        throw new InvalidOperationException(message);
    }
}
// Write timeout of the stream. The base implementation does not support
// timeouts: both accessors throw InvalidOperationException.
public virtual int WriteTimeout
{
    get
    {
        Contract.Ensures(Contract.Result<int>() >= 0);
        string message = Environment.GetResourceString("InvalidOperation_TimeoutsNotSupported");
        throw new InvalidOperationException(message);
    }
    set
    {
        string message = Environment.GetResourceString("InvalidOperation_TimeoutsNotSupported");
        throw new InvalidOperationException(message);
    }
}
// Asynchronously copies this stream into destination using the default
// copy buffer size.
[HostProtection(ExternalThreading = true)]
public Task CopyToAsync(Stream destination)
{
    int defaultBufferSize = _DefaultCopyBufferSize;
    return CopyToAsync(destination, defaultBufferSize);
}
// Asynchronously copies this stream into destination with the given buffer
// size and no cancellation support.
[HostProtection(ExternalThreading = true)]
public Task CopyToAsync(Stream destination, Int32 bufferSize)
{
    CancellationToken noCancellation = CancellationToken.None;
    return CopyToAsync(destination, bufferSize, noCancellation);
}
// Validates the arguments and the state of both streams, then starts the
// asynchronous copy loop.
//
// Throws:
//   ArgumentNullException       - destination is null.
//   ArgumentOutOfRangeException - bufferSize is not positive.
//   ObjectDisposedException     - this stream or destination reports neither
//                                 CanRead nor CanWrite.
//   NotSupportedException       - this stream is not readable, or destination
//                                 is not writable.
[HostProtection(ExternalThreading = true)]
public virtual Task CopyToAsync(Stream destination, Int32 bufferSize, CancellationToken cancellationToken)
{
    if (destination == null)
    {
        throw new ArgumentNullException("destination");
    }
    if (bufferSize <= 0)
    {
        throw new ArgumentOutOfRangeException("bufferSize", Environment.GetResourceString("ArgumentOutOfRange_NeedPosNum"));
    }
    if (!(CanRead || CanWrite))
    {
        throw new ObjectDisposedException(null, Environment.GetResourceString("ObjectDisposed_StreamClosed"));
    }
    if (!(destination.CanRead || destination.CanWrite))
    {
        throw new ObjectDisposedException("destination", Environment.GetResourceString("ObjectDisposed_StreamClosed"));
    }
    if (!CanRead)
    {
        throw new NotSupportedException(Environment.GetResourceString("NotSupported_UnreadableStream"));
    }
    if (!destination.CanWrite)
    {
        throw new NotSupportedException(Environment.GetResourceString("NotSupported_UnwritableStream"));
    }
    Contract.EndContractBlock();

    return CopyToAsyncInternal(destination, bufferSize, cancellationToken);
}
// Copy loop behind CopyToAsync: reads into a single buffer of bufferSize
// bytes and writes each chunk to destination until a read returns 0.
private async Task CopyToAsyncInternal(Stream destination, Int32 bufferSize, CancellationToken cancellationToken)
{
    Contract.Requires(destination != null);
    Contract.Requires(bufferSize > 0);
    Contract.Requires(CanRead);
    Contract.Requires(destination.CanWrite);

    byte[] chunk = new byte[bufferSize];
    for (;;)
    {
        int received = await ReadAsync(chunk, 0, chunk.Length, cancellationToken).ConfigureAwait(false);
        if (received == 0)
        {
            break;
        }

        await destination.WriteAsync(chunk, 0, received, cancellationToken).ConfigureAwait(false);
    }
}
176 #endif // FEATURE_ASYNC_IO
// Reads the bytes from the current stream and writes the bytes to
// the destination stream until all bytes are read, starting at
// the current position. Uses the default copy buffer size.
//
// Throws:
//   ArgumentNullException   - destination is null.
//   ObjectDisposedException - this stream or destination reports neither
//                             CanRead nor CanWrite.
//   NotSupportedException   - this stream is not readable, or destination
//                             is not writable.
public void CopyTo(Stream destination)
{
    // Delegate to the buffer-size overload instead of repeating its
    // validation chain here (mirrors how CopyToAsync(Stream) is written).
    // The default buffer size is positive, so the overload's range check
    // never fires and the exceptions thrown, and their order, are unchanged.
    CopyTo(destination, _DefaultCopyBufferSize);
}
// Synchronously copies this stream into destination through a buffer of
// bufferSize bytes.
//
// Throws:
//   ArgumentNullException       - destination is null.
//   ArgumentOutOfRangeException - bufferSize is not positive.
//   ObjectDisposedException     - this stream or destination reports neither
//                                 CanRead nor CanWrite.
//   NotSupportedException       - this stream is not readable, or destination
//                                 is not writable.
public void CopyTo(Stream destination, int bufferSize)
{
    if (destination == null)
    {
        throw new ArgumentNullException("destination");
    }
    if (bufferSize <= 0)
    {
        throw new ArgumentOutOfRangeException("bufferSize",
            Environment.GetResourceString("ArgumentOutOfRange_NeedPosNum"));
    }
    if (!(CanRead || CanWrite))
    {
        throw new ObjectDisposedException(null, Environment.GetResourceString("ObjectDisposed_StreamClosed"));
    }
    if (!(destination.CanRead || destination.CanWrite))
    {
        throw new ObjectDisposedException("destination", Environment.GetResourceString("ObjectDisposed_StreamClosed"));
    }
    if (!CanRead)
    {
        throw new NotSupportedException(Environment.GetResourceString("NotSupported_UnreadableStream"));
    }
    if (!destination.CanWrite)
    {
        throw new NotSupportedException(Environment.GetResourceString("NotSupported_UnwritableStream"));
    }
    Contract.EndContractBlock();

    InternalCopyTo(destination, bufferSize);
}
// Synchronous copy loop behind CopyTo: reads into one buffer of bufferSize
// bytes and writes each chunk to destination until Read returns 0.
private void InternalCopyTo(Stream destination, int bufferSize)
{
    Contract.Requires(destination != null);
    Contract.Requires(CanRead);
    Contract.Requires(destination.CanWrite);
    Contract.Requires(bufferSize > 0);

    byte[] chunk = new byte[bufferSize];
    int received = Read(chunk, 0, chunk.Length);
    while (received != 0)
    {
        destination.Write(chunk, 0, received);
        received = Read(chunk, 0, chunk.Length);
    }
}
235 // Stream used to require that all cleanup logic went into Close(),
236 // which was thought up before we invented IDisposable. However, we
237 // need to follow the IDisposable pattern so that users can write
238 // sensible subclasses without needing to inspect all their base
239 // classes, and without worrying about version brittleness, from a
240 // base class switching to the Dispose pattern. We're moving
241 // Stream to the Dispose(bool) pattern - that's where all subclasses
242 // should put their cleanup starting in V2.
243 public virtual void Close()
245 /* These are correct, but we'd have to fix PipeStream & NetworkStream very carefully.
246 Contract.Ensures(CanRead == false);
247 Contract.Ensures(CanWrite == false);
248 Contract.Ensures(CanSeek == false);
252 GC.SuppressFinalize(this);
255 public void Dispose()
257 /* These are correct, but we'd have to fix PipeStream & NetworkStream very carefully.
258 Contract.Ensures(CanRead == false);
259 Contract.Ensures(CanWrite == false);
260 Contract.Ensures(CanSeek == false);
267 protected virtual void Dispose(bool disposing)
269 // Note: Never change this to call other virtual methods on Stream
270 // like Write, since the state on subclasses has already been
271 // torn down. This is the last code to run on cleanup for a stream.
274 public abstract void Flush();
// Asynchronously flushes the stream without cancellation support.
[HostProtection(ExternalThreading=true)]
public Task FlushAsync()
{
    CancellationToken noCancellation = CancellationToken.None;
    return FlushAsync(noCancellation);
}
// Default asynchronous flush: queues a call to the synchronous Flush on
// TaskScheduler.Default. The stream is passed as the task state so the
// delegate does not capture anything.
[HostProtection(ExternalThreading=true)]
public virtual Task FlushAsync(CancellationToken cancellationToken)
{
    Action<object> flushWork = state => ((Stream)state).Flush();
    return Task.Factory.StartNew(
        flushWork,
        this,
        cancellationToken,
        TaskCreationOptions.DenyChildAttach,
        TaskScheduler.Default);
}
291 #endif // FEATURE_ASYNC_IO
// Obsolete factory for a wait handle; returns a new, unsignaled
// ManualResetEvent.
[Obsolete("CreateWaitHandle will be removed eventually. Please use \"new ManualResetEvent(false)\" instead.")]
protected virtual WaitHandle CreateWaitHandle()
{
    Contract.Ensures(Contract.Result<WaitHandle>() != null);

    WaitHandle handle = new ManualResetEvent(false);
    return handle;
}
// APM entry point for reads. Forwards to BeginReadInternal with
// serializeAsynchronously set to false.
[HostProtection(ExternalThreading=true)]
public virtual IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
{
    Contract.Ensures(Contract.Result<IAsyncResult>() != null);

    const bool waitAsynchronously = false;
    return BeginReadInternal(buffer, offset, count, callback, state, waitAsynchronously);
}
// Shared implementation of BeginRead.
//
// Without NEW_EXPERIMENTAL_ASYNC_IO (or for apps older than Windows Phone 8)
// the read is performed synchronously via BlockingBeginRead. Otherwise a
// ReadWriteTask wrapping the synchronous Read is created and scheduled once
// the per-stream semaphore has been acquired.
//
// serializeAsynchronously: true to wait for the semaphore asynchronously,
// false to block the calling thread until it is available.
[HostProtection(ExternalThreading = true)]
internal IAsyncResult BeginReadInternal(byte[] buffer, int offset, int count, AsyncCallback callback, Object state, bool serializeAsynchronously)
{
    Contract.Ensures(Contract.Result<IAsyncResult>() != null);
    if (!CanRead) __Error.ReadNotSupported();

#if !NEW_EXPERIMENTAL_ASYNC_IO
    return BlockingBeginRead(buffer, offset, count, callback, state);
#else
    // Mango did not do Async IO.
    if (CompatibilitySwitches.IsAppEarlierThanWindowsPhone8)
    {
        return BlockingBeginRead(buffer, offset, count, callback, state);
    }

    // To avoid a race with a stream's position pointer & generating race
    // conditions with internal buffer indexes in our own streams that
    // don't natively support async IO operations when there are multiple
    // async requests outstanding, we will block the application's main
    // thread if it does a second IO request until the first one completes.
    SemaphoreSlim gate = EnsureAsyncActiveSemaphoreInitialized();
    Task pendingWait;
    if (!serializeAsynchronously)
    {
        gate.Wait(); // synchronously wait here
        pendingWait = null;
    }
    else
    {
        pendingWait = gate.WaitAsync(); // kick off the asynchronous wait, but don't block
    }

    // The task is both the asynchronous work item and the IAsyncResult
    // handed back to the caller.
    var readTask = new ReadWriteTask(true /*isRead*/, delegate
    {
        // The ReadWriteTask carries the Read arguments; since this delegate
        // runs inside it, the current task is that ReadWriteTask.
        var self = Task.InternalCurrent as ReadWriteTask;
        Contract.Assert(self != null, "Inside ReadWriteTask, InternalCurrent should be the ReadWriteTask");

        // Do the Read and return the number of bytes read
        int received = self._stream.Read(self._buffer, self._offset, self._count);
        self.ClearBeginState(); // just to help alleviate some memory pressure
        return received;
    }, state, this, buffer, offset, count, callback);

    // Schedule it: immediately if the semaphore is already held, otherwise
    // once the asynchronous wait completes.
    if (pendingWait == null)
    {
        RunReadWriteTask(readTask);
    }
    else
    {
        RunReadWriteTaskWhenReady(pendingWait, readTask);
    }

    return readTask;
#endif
}
// Completes a read started by BeginRead and returns the number of bytes read.
//
// With NEW_EXPERIMENTAL_ASYNC_IO the supplied asyncResult must be the
// currently active ReadWriteTask and must be a read; the call then blocks
// until the task finishes, propagates any exception, and finally clears the
// active task and releases the semaphore.
public virtual int EndRead(IAsyncResult asyncResult)
{
    if (asyncResult == null)
    {
        throw new ArgumentNullException("asyncResult");
    }
    Contract.Ensures(Contract.Result<int>() >= 0);
    Contract.EndContractBlock();

#if !NEW_EXPERIMENTAL_ASYNC_IO
    return BlockingEndRead(asyncResult);
#else
    // Mango did not do async IO.
    if (CompatibilitySwitches.IsAppEarlierThanWindowsPhone8)
    {
        return BlockingEndRead(asyncResult);
    }

    ReadWriteTask activeTask = _activeReadWriteTask;

    if (activeTask == null)
    {
        throw new ArgumentException(Environment.GetResourceString("InvalidOperation_WrongAsyncResultOrEndReadCalledMultiple"));
    }
    if (activeTask != asyncResult)
    {
        throw new InvalidOperationException(Environment.GetResourceString("InvalidOperation_WrongAsyncResultOrEndReadCalledMultiple"));
    }
    if (!activeTask._isRead)
    {
        throw new ArgumentException(Environment.GetResourceString("InvalidOperation_WrongAsyncResultOrEndReadCalledMultiple"));
    }

    try
    {
        // block until completion, then get result / propagate any exception
        int bytesRead = activeTask.GetAwaiter().GetResult();
        return bytesRead;
    }
    finally
    {
        _activeReadWriteTask = null;
        Contract.Assert(_asyncActiveSemaphore != null, "Must have been initialized in order to get here.");
        _asyncActiveSemaphore.Release();
    }
#endif
}
// Asynchronously reads from the stream without cancellation support.
[HostProtection(ExternalThreading = true)]
public Task<int> ReadAsync(Byte[] buffer, int offset, int count)
{
    CancellationToken noCancellation = CancellationToken.None;
    return ReadAsync(buffer, offset, count, noCancellation);
}
// Default asynchronous read. The token is only checked up front: when
// cancellation has already been requested a cancelled task is returned,
// otherwise the read is expressed through BeginRead/EndRead.
[HostProtection(ExternalThreading = true)]
public virtual Task<int> ReadAsync(Byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        return Task.FromCancellation<int>(cancellationToken);
    }

    return BeginEndReadAsync(buffer, offset, count);
}
// Wraps the BeginRead/EndRead pair in a Task<Int32>. The stream and the
// call arguments are passed to FromAsyncTrim explicitly so that neither
// lambda captures anything (both can be cached by the compiler).
private Task<Int32> BeginEndReadAsync(Byte[] buffer, Int32 offset, Int32 count)
{
    var readArgs = new ReadWriteParameters { Buffer = buffer, Offset = offset, Count = count };

    return TaskFactory<Int32>.FromAsyncTrim(
        this,
        readArgs,
        (stream, args, callback, state) => stream.BeginRead(args.Buffer, args.Offset, args.Count, callback, state), // cached by compiler
        (stream, asyncResult) => stream.EndRead(asyncResult)); // cached by compiler
}
437 private struct ReadWriteParameters // struct for arguments to Read and Write calls
439 internal byte[] Buffer;
443 #endif //FEATURE_ASYNC_IO
// APM entry point for writes. Forwards to BeginWriteInternal with
// serializeAsynchronously set to false.
[HostProtection(ExternalThreading=true)]
public virtual IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
{
    Contract.Ensures(Contract.Result<IAsyncResult>() != null);

    const bool waitAsynchronously = false;
    return BeginWriteInternal(buffer, offset, count, callback, state, waitAsynchronously);
}
// Shared implementation of BeginWrite.
//
// Without NEW_EXPERIMENTAL_ASYNC_IO (or for apps older than Windows Phone 8)
// the write is performed synchronously via BlockingBeginWrite. Otherwise a
// ReadWriteTask wrapping the synchronous Write is created and scheduled once
// the per-stream semaphore has been acquired.
//
// serializeAsynchronously: true to wait for the semaphore asynchronously,
// false to block the calling thread until it is available.
[HostProtection(ExternalThreading = true)]
internal IAsyncResult BeginWriteInternal(byte[] buffer, int offset, int count, AsyncCallback callback, Object state, bool serializeAsynchronously)
{
    Contract.Ensures(Contract.Result<IAsyncResult>() != null);
    if (!CanWrite) __Error.WriteNotSupported();
#if !NEW_EXPERIMENTAL_ASYNC_IO
    return BlockingBeginWrite(buffer, offset, count, callback, state);
#else
    // Mango did not do Async IO.
    if (CompatibilitySwitches.IsAppEarlierThanWindowsPhone8)
    {
        return BlockingBeginWrite(buffer, offset, count, callback, state);
    }

    // To avoid a race with a stream's position pointer & generating race
    // conditions with internal buffer indexes in our own streams that
    // don't natively support async IO operations when there are multiple
    // async requests outstanding, we will block the application's main
    // thread if it does a second IO request until the first one completes.
    SemaphoreSlim gate = EnsureAsyncActiveSemaphoreInitialized();
    Task pendingWait;
    if (!serializeAsynchronously)
    {
        gate.Wait(); // synchronously wait here
        pendingWait = null;
    }
    else
    {
        pendingWait = gate.WaitAsync(); // kick off the asynchronous wait, but don't block
    }

    // The task is both the asynchronous work item and the IAsyncResult
    // handed back to the caller.
    var writeTask = new ReadWriteTask(false /*isRead*/, delegate
    {
        // The ReadWriteTask carries the Write arguments; since this delegate
        // runs inside it, the current task is that ReadWriteTask.
        var self = Task.InternalCurrent as ReadWriteTask;
        Contract.Assert(self != null, "Inside ReadWriteTask, InternalCurrent should be the ReadWriteTask");

        // Do the Write
        self._stream.Write(self._buffer, self._offset, self._count);
        self.ClearBeginState(); // just to help alleviate some memory pressure
        return 0; // not used, but signature requires a value be returned
    }, state, this, buffer, offset, count, callback);

    // Schedule it: immediately if the semaphore is already held, otherwise
    // once the asynchronous wait completes.
    if (pendingWait == null)
    {
        RunReadWriteTask(writeTask);
    }
    else
    {
        RunReadWriteTaskWhenReady(pendingWait, writeTask);
    }

    return writeTask;
#endif
}
511 #if NEW_EXPERIMENTAL_ASYNC_IO
// Starts readWriteTask once asyncWaiter (the semaphore wait) has completed:
// right away when it is already complete, otherwise from a continuation.
private void RunReadWriteTaskWhenReady(Task asyncWaiter, ReadWriteTask readWriteTask)
{
    // These should be Contract.Requires, but CCRewrite is doing a poor job
    // with preconditions in async methods that await. (10/6/2011, bug 290222)
    Contract.Assert(readWriteTask != null);
    Contract.Assert(asyncWaiter != null);

    if (!asyncWaiter.IsCompleted)
    {
        // Not our turn yet: run the task when the wait finishes. The stream
        // and the task travel in the continuation state so the lambda does
        // not capture anything.
        asyncWaiter.ContinueWith((t, state) =>
            {
                Contract.Assert(t.IsRanToCompletion, "The semaphore wait should always complete successfully.");
                var pair = (Tuple<Stream,ReadWriteTask>)state;
                pair.Item1.RunReadWriteTask(pair.Item2); // RunReadWriteTask(readWriteTask);
            },
            Tuple.Create<Stream,ReadWriteTask>(this, readWriteTask),
            default(CancellationToken),
            TaskContinuationOptions.ExecuteSynchronously,
            TaskScheduler.Default);
        return;
    }

    // The wait has already completed, so run the task now.
    Contract.Assert(asyncWaiter.IsRanToCompletion, "The semaphore wait should always complete successfully.");
    RunReadWriteTask(readWriteTask);
}
// Records readWriteTask as the active operation and schedules it on
// TaskScheduler.Default.
private void RunReadWriteTask(ReadWriteTask readWriteTask)
{
    Contract.Requires(readWriteTask != null);
    Contract.Assert(_activeReadWriteTask == null, "Expected no other readers or writers");

    // Publish the task first so that EndRead/EndWrite can validate the
    // IAsyncResult they are given; scheduling must come after this write to
    // avoid a race.
    this._activeReadWriteTask = readWriteTask;

    // ScheduleAndStart is called directly rather than Start, which avoids
    // two interlocked operations. If ReadWriteTask is ever changed to use a
    // cancellation token, this should be changed to use Start.
    readWriteTask.m_taskScheduler = TaskScheduler.Default;
    readWriteTask.ScheduleAndStart(needsProtection: false);
}
// Completes a write started by BeginWrite.
//
// With NEW_EXPERIMENTAL_ASYNC_IO the supplied asyncResult must be the
// currently active ReadWriteTask and must be a write; the call then blocks
// until the task finishes, propagates any exception, and finally clears the
// active task and releases the semaphore.
public virtual void EndWrite(IAsyncResult asyncResult)
{
    if (asyncResult == null)
    {
        throw new ArgumentNullException("asyncResult");
    }
    Contract.EndContractBlock();

#if !NEW_EXPERIMENTAL_ASYNC_IO
    BlockingEndWrite(asyncResult);
#else
    // Mango did not do Async IO.
    if (CompatibilitySwitches.IsAppEarlierThanWindowsPhone8)
    {
        BlockingEndWrite(asyncResult);
        return;
    }

    ReadWriteTask activeTask = _activeReadWriteTask;
    if (activeTask == null)
    {
        throw new ArgumentException(Environment.GetResourceString("InvalidOperation_WrongAsyncResultOrEndWriteCalledMultiple"));
    }
    if (activeTask != asyncResult)
    {
        throw new InvalidOperationException(Environment.GetResourceString("InvalidOperation_WrongAsyncResultOrEndWriteCalledMultiple"));
    }
    if (activeTask._isRead)
    {
        throw new ArgumentException(Environment.GetResourceString("InvalidOperation_WrongAsyncResultOrEndWriteCalledMultiple"));
    }

    try
    {
        // block until completion, then propagate any exceptions
        activeTask.GetAwaiter().GetResult();
        Contract.Assert(activeTask.Status == TaskStatus.RanToCompletion);
    }
    finally
    {
        _activeReadWriteTask = null;
        Contract.Assert(_asyncActiveSemaphore != null, "Must have been initialized in order to get here.");
        _asyncActiveSemaphore.Release();
    }
#endif
}
598 #if NEW_EXPERIMENTAL_ASYNC_IO
599 // Task used by BeginRead / BeginWrite to do Read / Write asynchronously.
600 // A single instance of this task serves four purposes:
601 // 1. The work item scheduled to run the Read / Write operation
602 // 2. The state holding the arguments to be passed to Read / Write
603 // 3. The IAsyncResult returned from BeginRead / BeginWrite
604 // 4. The completion action that runs to invoke the user-provided callback.
605 // This last item is a bit tricky. Before the AsyncCallback is invoked, the
606 // IAsyncResult must have completed, so we can't just invoke the handler
607 // from within the task, since it is the IAsyncResult, and thus it's not
608 // yet completed. Instead, we use AddCompletionAction to install this
609 // task as its own completion handler. That saves the need to allocate
610 // a separate completion handler, it guarantees that the task will
611 // have completed by the time the handler is invoked, and it allows
612 // the handler to be invoked synchronously upon the completion of the
613 // task. This all enables BeginRead / BeginWrite to be implemented
614 // with a single allocation.
615 private sealed class ReadWriteTask : Task<int>, ITaskCompletionAction
617 internal readonly bool _isRead;
618 internal Stream _stream;
619 internal byte [] _buffer;
620 internal int _offset;
622 private AsyncCallback _callback;
623 private ExecutionContext _context;
625 internal void ClearBeginState() // Used to allow the args to Read/Write to be made available for GC
631 [SecuritySafeCritical] // necessary for EC.Capture
632 [MethodImpl(MethodImplOptions.NoInlining)]
633 public ReadWriteTask(
635 Func<object,int> function, object state,
636 Stream stream, byte[] buffer, int offset, int count, AsyncCallback callback) :
637 base(function, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach)
639 Contract.Requires(function != null);
640 Contract.Requires(stream != null);
641 Contract.Requires(buffer != null);
642 Contract.EndContractBlock();
644 StackCrawlMark stackMark = StackCrawlMark.LookForMyCaller;
646 // Store the arguments
653 // If a callback was provided, we need to:
654 // - Store the user-provided handler
655 // - Capture an ExecutionContext under which to invoke the handler
656 // - Add this task as its own completion handler so that the Invoke method
657 // will run the callback when this task completes.
658 if (callback != null)
660 _callback = callback;
661 _context = ExecutionContext.Capture(ref stackMark,
662 ExecutionContext.CaptureOptions.OptimizeDefaultCase | ExecutionContext.CaptureOptions.IgnoreSyncCtx);
663 base.AddCompletionAction(this);
667 [SecurityCritical] // necessary for CoreCLR
668 private static void InvokeAsyncCallback(object completedTask)
670 var rwc = (ReadWriteTask)completedTask;
671 var callback = rwc._callback;
672 rwc._callback = null;
676 [SecurityCritical] // necessary for CoreCLR
677 private static ContextCallback s_invokeAsyncCallback;
679 [SecuritySafeCritical] // necessary for ExecutionContext.Run
680 void ITaskCompletionAction.Invoke(Task completingTask)
682 // Get the ExecutionContext. If there is none, just run the callback
683 // directly, passing in the completed task as the IAsyncResult.
684 // If there is one, process it with ExecutionContext.Run.
685 var context = _context;
688 var callback = _callback;
690 callback(completingTask);
696 var invokeAsyncCallback = s_invokeAsyncCallback;
697 if (invokeAsyncCallback == null) s_invokeAsyncCallback = invokeAsyncCallback = InvokeAsyncCallback; // benign ----
699 using(context) ExecutionContext.Run(context, invokeAsyncCallback, this, true);
// Asynchronously writes to the stream without cancellation support.
[HostProtection(ExternalThreading = true)]
public Task WriteAsync(Byte[] buffer, int offset, int count)
{
    CancellationToken noCancellation = CancellationToken.None;
    return WriteAsync(buffer, offset, count, noCancellation);
}
// Default asynchronous write. The token is only checked up front: when
// cancellation has already been requested a cancelled task is returned,
// otherwise the write is expressed through BeginWrite/EndWrite.
[HostProtection(ExternalThreading = true)]
public virtual Task WriteAsync(Byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        return Task.FromCancellation(cancellationToken);
    }

    return BeginEndWriteAsync(buffer, offset, count);
}
// Wraps the BeginWrite/EndWrite pair in a Task. The stream and the call
// arguments are passed to FromAsyncTrim explicitly so that neither lambda
// captures anything (both can be cached by the compiler).
private Task BeginEndWriteAsync(Byte[] buffer, Int32 offset, Int32 count)
{
    var writeArgs = new ReadWriteParameters { Buffer=buffer, Offset=offset, Count=count };

    return TaskFactory<VoidTaskResult>.FromAsyncTrim(
        this,
        writeArgs,
        (stream, args, callback, state) => stream.BeginWrite(args.Buffer, args.Offset, args.Count, callback, state), // cached by compiler
        (stream, asyncResult) => // cached by compiler
        {
            stream.EndWrite(asyncResult);
            return default(VoidTaskResult);
        });
}
736 #endif // FEATURE_ASYNC_IO
738 public abstract long Seek(long offset, SeekOrigin origin);
740 public abstract void SetLength(long value);
742 public abstract int Read([In, Out] byte[] buffer, int offset, int count);
// Reads a single byte through Read(byte[], int, int).
// Returns the byte as an int in the range 0..255, or -1 at end of stream.
// A new one-element array is allocated on every call, so subclasses that
// keep an internal buffer should override this; doing so helps callers
// that read one byte at a time.
public virtual int ReadByte()
{
    Contract.Ensures(Contract.Result<int>() >= -1);
    Contract.Ensures(Contract.Result<int>() < 256);

    byte[] single = new byte[1];
    return Read(single, 0, 1) == 0 ? -1 : single[0];
}
762 public abstract void Write(byte[] buffer, int offset, int count);
// Writes a single byte through Write(byte[], int, int).
// A new one-element array is allocated on every call, so subclasses that
// keep an internal buffer should override this; doing so helps callers
// that write one byte at a time.
public virtual void WriteByte(byte value)
{
    Write(new byte[] { value }, 0, 1);
}
// Returns a SyncStream wrapper around stream. A stream that already is a
// SyncStream is returned unchanged.
// Throws ArgumentNullException when stream is null.
[HostProtection(Synchronization=true)]
public static Stream Synchronized(Stream stream)
{
    if (stream == null)
    {
        throw new ArgumentNullException("stream");
    }
    Contract.Ensures(Contract.Result<Stream>() != null);
    Contract.EndContractBlock();

    return (stream is SyncStream) ? stream : new SyncStream(stream);
}
789 #if !FEATURE_PAL || MONO // This method shouldn't have been exposed in Dev10 (we revised object invariants after locking down).
790 [Obsolete("Do not call or override this method.")]
791 protected virtual void ObjectInvariant()
// BeginRead implementation that performs the read synchronously on the
// calling thread and returns an already-completed SynchronousAsyncResult.
// An IOException thrown by Read is captured in the result rather than
// thrown here. The callback, if any, is invoked before returning.
internal IAsyncResult BlockingBeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
{
    Contract.Ensures(Contract.Result<IAsyncResult>() != null);

    // To avoid a race with a stream's position pointer & generating race
    // conditions with internal buffer indexes in our own streams that
    // don't natively support async IO operations when there are multiple
    // async requests outstanding, we will block the application's main
    // thread and do the IO synchronously.
    // This can't perform well - use a different approach.
    SynchronousAsyncResult completed;
    try
    {
        int bytesRead = Read(buffer, offset, count);
        completed = new SynchronousAsyncResult(bytesRead, state);
    }
    catch (IOException ioError)
    {
        completed = new SynchronousAsyncResult(ioError, state, isWrite: false);
    }

    if (callback != null)
    {
        callback(completed);
    }

    return completed;
}
// Completes a read started by BlockingBeginRead by delegating to
// SynchronousAsyncResult.EndRead.
internal static int BlockingEndRead(IAsyncResult asyncResult)
{
    Contract.Ensures(Contract.Result<int>() >= 0);

    int bytesRead = SynchronousAsyncResult.EndRead(asyncResult);
    return bytesRead;
}
// BeginWrite implementation that performs the write synchronously on the
// calling thread and returns an already-completed SynchronousAsyncResult.
// An IOException thrown by Write is captured in the result rather than
// thrown here. The callback, if any, is invoked before returning.
internal IAsyncResult BlockingBeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
{
    Contract.Ensures(Contract.Result<IAsyncResult>() != null);

    // To avoid a race with a stream's position pointer & generating race
    // conditions with internal buffer indexes in our own streams that
    // don't natively support async IO operations when there are multiple
    // async requests outstanding, we will block the application's main
    // thread and do the IO synchronously.
    // This can't perform well - use a different approach.
    SynchronousAsyncResult completed;
    try
    {
        Write(buffer, offset, count);
        completed = new SynchronousAsyncResult(state);
    }
    catch (IOException ioError)
    {
        completed = new SynchronousAsyncResult(ioError, state, isWrite: true);
    }

    if (callback != null)
    {
        callback(completed);
    }

    return completed;
}
// Completes a write started by BlockingBeginWrite by delegating to
// SynchronousAsyncResult.EndWrite.
internal static void BlockingEndWrite(IAsyncResult asyncResult)
{
    IAsyncResult completed = asyncResult;
    SynchronousAsyncResult.EndWrite(completed);
}
861 private sealed class NullStream : Stream
863 internal NullStream() {}
865 public override bool CanRead {
870 public override bool CanWrite {
875 public override bool CanSeek {
880 public override long Length {
884 public override long Position {
889 protected override void Dispose(bool disposing)
891 // Do nothing - we don't want NullStream singleton (static) to be closable
894 public override void Flush()
900 public override Task FlushAsync(CancellationToken cancellationToken)
902 return cancellationToken.IsCancellationRequested ?
903 Task.FromCancellation(cancellationToken) :
906 #endif // FEATURE_ASYNC_IO
// Performs the read synchronously via BlockingBeginRead.
[HostProtection(ExternalThreading = true)]
public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
{
    if (!CanRead)
    {
        __Error.ReadNotSupported();
    }

    IAsyncResult completed = BlockingBeginRead(buffer, offset, count, callback, state);
    return completed;
}
// Completes a read started by BeginRead via BlockingEndRead.
// Throws ArgumentNullException when asyncResult is null.
public override int EndRead(IAsyncResult asyncResult)
{
    if (asyncResult == null)
    {
        throw new ArgumentNullException("asyncResult");
    }
    Contract.EndContractBlock();

    int bytesRead = BlockingEndRead(asyncResult);
    return bytesRead;
}
// Performs the write synchronously via BlockingBeginWrite.
[HostProtection(ExternalThreading = true)]
public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
{
    if (!CanWrite)
    {
        __Error.WriteNotSupported();
    }

    IAsyncResult completed = BlockingBeginWrite(buffer, offset, count, callback, state);
    return completed;
}
// Completes a write started by BeginWrite via BlockingEndWrite.
// Throws ArgumentNullException when asyncResult is null.
public override void EndWrite(IAsyncResult asyncResult)
{
    if (asyncResult == null)
    {
        throw new ArgumentNullException("asyncResult");
    }
    Contract.EndContractBlock();

    IAsyncResult completed = asyncResult;
    BlockingEndWrite(completed);
}
942 public override int Read([In, Out] byte[] buffer, int offset, int count)
949 public override Task<int> ReadAsync(Byte[] buffer, int offset, int count, CancellationToken cancellationToken)
951 var nullReadTask = s_nullReadTask;
952 if (nullReadTask == null)
953 s_nullReadTask = nullReadTask = new Task<int>(false, 0, (TaskCreationOptions)InternalTaskOptions.DoNotDispose, CancellationToken.None); // benign ----
956 private static Task<int> s_nullReadTask;
957 #endif //FEATURE_ASYNC_IO
959 public override int ReadByte()
964 public override void Write(byte[] buffer, int offset, int count)
970 public override Task WriteAsync(Byte[] buffer, int offset, int count, CancellationToken cancellationToken)
972 return cancellationToken.IsCancellationRequested ?
973 Task.FromCancellation(cancellationToken) :
976 #endif // FEATURE_ASYNC_IO
978 public override void WriteByte(byte value)
982 public override long Seek(long offset, SeekOrigin origin)
987 public override void SetLength(long length)
/// <summary>Used as the IAsyncResult object when using asynchronous IO methods on the base Stream class.</summary>
internal sealed class SynchronousAsyncResult : IAsyncResult
{
    // State supplied by the caller of BeginRead/BeginWrite.
    private readonly Object _stateObject;
    // True when this result belongs to a write, false for a read.
    private readonly bool _isWrite;
    // Created lazily, already signaled, the first time AsyncWaitHandle is read.
    private ManualResetEvent _waitHandle;
    // Exception captured during the synchronous operation, if any.
    private ExceptionDispatchInfo _exceptionInfo;

    // Set once EndRead/EndWrite has consumed this result.
    private bool _endXxxCalled;
    private Int32 _bytesRead;

    // Result of a completed read. _isWrite keeps its default value (false).
    internal SynchronousAsyncResult(Int32 bytesRead, Object asyncStateObject)
    {
        _stateObject = asyncStateObject;
        _bytesRead = bytesRead;
    }

    // Result of a completed write.
    internal SynchronousAsyncResult(Object asyncStateObject)
    {
        _stateObject = asyncStateObject;
        _isWrite = true;
    }

    // Result of a failed read or write; ex is rethrown from EndRead/EndWrite.
    internal SynchronousAsyncResult(Exception ex, Object asyncStateObject, bool isWrite)
    {
        _stateObject = asyncStateObject;
        _isWrite = isWrite;
        _exceptionInfo = ExceptionDispatchInfo.Capture(ex);
    }

    public bool IsCompleted
    {
        // We never hand out objects of this type to the user before the synchronous IO completed:
        get { return true; }
    }

    public WaitHandle AsyncWaitHandle
    {
        get
        {
            Func<ManualResetEvent> createSignaled = () => new ManualResetEvent(true);
            return LazyInitializer.EnsureInitialized(ref _waitHandle, createSignaled);
        }
    }

    public Object AsyncState
    {
        get { return _stateObject; }
    }

    public bool CompletedSynchronously
    {
        get { return true; }
    }

    // Rethrows the captured exception, if there is one.
    internal void ThrowIfError()
    {
        if (_exceptionInfo == null)
        {
            return;
        }

        _exceptionInfo.Throw();
    }

    // Validates that asyncResult is an unconsumed read result, marks it as
    // consumed, rethrows any captured exception and returns the byte count.
    internal static Int32 EndRead(IAsyncResult asyncResult)
    {
        SynchronousAsyncResult result = asyncResult as SynchronousAsyncResult;
        if (result == null || result._isWrite)
        {
            __Error.WrongAsyncResult();
        }

        if (result._endXxxCalled)
        {
            __Error.EndReadCalledTwice();
        }

        result._endXxxCalled = true;

        result.ThrowIfError();
        return result._bytesRead;
    }

    // Validates that asyncResult is an unconsumed write result, marks it as
    // consumed and rethrows any captured exception.
    internal static void EndWrite(IAsyncResult asyncResult)
    {
        SynchronousAsyncResult result = asyncResult as SynchronousAsyncResult;
        if (result == null || !result._isWrite)
        {
            __Error.WrongAsyncResult();
        }

        if (result._endXxxCalled)
        {
            __Error.EndWriteCalledTwice();
        }

        result._endXxxCalled = true;

        result.ThrowIfError();
    }
}   // class SynchronousAsyncResult
1076 // SyncStream is a wrapper around a stream that takes
1077 // a lock for every operation making it thread safe.
// Every member visible here forwards to the wrapped _stream.
// NOTE(review): the lock statements described above are not visible in this extract.
1079 internal sealed class SyncStream : Stream, IDisposable
// The wrapped stream that all members delegate to.
1081 private Stream _stream;
// Cached answer to "does the wrapped stream's type override BeginRead?"; null until BeginRead
// computes it on first use.
1083 private bool? _overridesBeginRead;
// Cached answer to "does the wrapped stream's type override BeginWrite?"; null until BeginWrite
// computes it on first use.
1085 private bool? _overridesBeginWrite;
// Wraps the given stream. Throws ArgumentNullException("stream"); the guarding condition is not
// visible in this extract - presumably stream == null.
1087 internal SyncStream(Stream stream)
1090 throw new ArgumentNullException("stream");
1091 Contract.EndContractBlock();
// Capability and state properties: each accessor forwards to the same member of _stream.
1095 public override bool CanRead {
1097 get { return _stream.CanRead; }
1100 public override bool CanWrite {
1102 get { return _stream.CanWrite; }
1105 public override bool CanSeek {
1107 get { return _stream.CanSeek; }
1111 public override bool CanTimeout {
1114 return _stream.CanTimeout;
1118 public override long Length {
1121 return _stream.Length;
// Reads and writes _stream.Position.
1126 public override long Position {
1129 return _stream.Position;
1134 _stream.Position = value;
// Reads and writes _stream.ReadTimeout.
1140 public override int ReadTimeout {
1142 return _stream.ReadTimeout;
1145 _stream.ReadTimeout = value;
// Reads and writes _stream.WriteTimeout.
1150 public override int WriteTimeout {
1152 return _stream.WriteTimeout;
1155 _stream.WriteTimeout = value;
1159 // In the off chance that some wrapped stream has different
1160 // semantics for Close vs. Dispose, let's preserve that.
// NOTE(review): the body of Close is not visible in this extract.
1161 public override void Close()
// Disposes the wrapped stream through its IDisposable interface and then calls the base
// implementation. NOTE(review): any condition on 'disposing' and any try/finally structure are
// not visible in this extract.
1173 protected override void Dispose(bool disposing)
1177 // Explicitly pick up a potentially methodimpl'ed Dispose
1179 ((IDisposable)_stream).Dispose();
1182 base.Dispose(disposing);
// NOTE(review): the body of Flush is not visible in this extract.
1187 public override void Flush()
// Forwards to _stream.Read with the same arguments and returns its result.
1193 public override int Read([In, Out]byte[] bytes, int offset, int count)
1196 return _stream.Read(bytes, offset, count);
// Forwards to _stream.ReadByte and returns its result.
1199 public override int ReadByte()
1202 return _stream.ReadByte();
// Uses reflection over the stream's runtime type to decide whether that type overrides the named
// Begin method. methodName is expected to be "BeginRead" or "BeginWrite" (see the contract below).
// NOTE(review): the return statements are not visible in this extract; the comment inside the
// method describes the intended result.
1205 private static bool OverridesBeginMethod(Stream stream, string methodName)
1207 Contract.Requires(stream != null, "Expected a non-null stream.");
1208 Contract.Requires(methodName == "BeginRead" || methodName == "BeginWrite",
1209 "Expected BeginRead or BeginWrite as the method name to check.");
1211 // Get all of the methods on the underlying stream
1212 var methods = stream.GetType().GetMethods(BindingFlags.Public | BindingFlags.Instance);
1214 // If any of the methods have the desired name and are defined on the base Stream
1215 // Type, then the method was not overridden. If none of them were defined on the
1216 // base Stream, then it must have been overridden.
1217 foreach (var method in methods)
// DeclaringType is compared with typeof(Stream): a match means the method found on the runtime
// type is still the one declared by the base class.
1219 if (method.DeclaringType == typeof(Stream) &&
1220 method.Name == methodName)
// Begins an asynchronous read on the wrapped stream, choosing between the stream's own BeginRead
// and the base-class BeginReadInternal according to the cached override check.
1228 [HostProtection(ExternalThreading=true)]
1229 public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
1231 // Lazily-initialize whether the wrapped stream overrides BeginRead
1232 if (_overridesBeginRead == null)
1234 _overridesBeginRead = OverridesBeginMethod(_stream, "BeginRead");
1239 // If the Stream does have its own BeginRead implementation, then we must use that override.
1240 // If it doesn't, then we'll use the base implementation, but we'll make sure that the logic
1241 // which ensures only one asynchronous operation does so with an asynchronous wait rather
1242 // than a synchronous wait. A synchronous wait will result in a deadlock condition, because
1243 // the EndXx method for the outstanding async operation won't be able to acquire the lock on
1244 // _stream due to this call blocked while holding the lock.
1245 return _overridesBeginRead.Value ?
1246 _stream.BeginRead(buffer, offset, count, callback, state) :
1247 _stream.BeginReadInternal(buffer, offset, count, callback, state, serializeAsynchronously: true);
// Completes an asynchronous read by forwarding to _stream.EndRead. Throws ArgumentNullException
// when asyncResult is null; the contract states that the result is non-negative.
1251 public override int EndRead(IAsyncResult asyncResult)
1253 if (asyncResult == null)
1254 throw new ArgumentNullException("asyncResult");
1255 Contract.Ensures(Contract.Result<int>() >= 0);
1256 Contract.EndContractBlock();
1259 return _stream.EndRead(asyncResult);
// Forwards to _stream.Seek and returns its result.
1262 public override long Seek(long offset, SeekOrigin origin)
1265 return _stream.Seek(offset, origin);
// Forwards to _stream.SetLength.
1268 public override void SetLength(long length)
1271 _stream.SetLength(length);
// Forwards to _stream.Write with the same arguments.
1274 public override void Write(byte[] bytes, int offset, int count)
1277 _stream.Write(bytes, offset, count);
// Forwards to _stream.WriteByte.
1280 public override void WriteByte(byte b)
1283 _stream.WriteByte(b);
// Begins an asynchronous write on the wrapped stream, choosing between the stream's own BeginWrite
// and the base-class BeginWriteInternal according to the cached override check.
1286 [HostProtection(ExternalThreading=true)]
1287 public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
1289 // Lazily-initialize whether the wrapped stream overrides BeginWrite
1290 if (_overridesBeginWrite == null)
1292 _overridesBeginWrite = OverridesBeginMethod(_stream, "BeginWrite");
1297 // If the Stream does have its own BeginWrite implementation, then we must use that override.
1298 // If it doesn't, then we'll use the base implementation, but we'll make sure that the logic
1299 // which ensures only one asynchronous operation does so with an asynchronous wait rather
1300 // than a synchronous wait. A synchronous wait will result in a deadlock condition, because
1301 // the EndXx method for the outstanding async operation won't be able to acquire the lock on
1302 // _stream due to this call blocked while holding the lock.
1303 return _overridesBeginWrite.Value ?
1304 _stream.BeginWrite(buffer, offset, count, callback, state) :
1305 _stream.BeginWriteInternal(buffer, offset, count, callback, state, serializeAsynchronously: true);
// Completes an asynchronous write by forwarding to _stream.EndWrite. Throws ArgumentNullException
// when asyncResult is null.
1309 public override void EndWrite(IAsyncResult asyncResult)
1311 if (asyncResult == null)
1312 throw new ArgumentNullException("asyncResult");
1313 Contract.EndContractBlock();
1316 _stream.EndWrite(asyncResult);
// Code Contracts companion class for Stream (linked through ContractClassFor). The members only
// declare postconditions with Contract.Ensures; every visible body then throws
// NotImplementedException, so none of them performs any stream work.
1322 [ContractClassFor(typeof(Stream))]
1323 internal abstract class StreamContract : Stream
// Postcondition: the returned position is non-negative.
1325 public override long Seek(long offset, SeekOrigin origin)
1327 Contract.Ensures(Contract.Result<long>() >= 0);
1328 throw new NotImplementedException();
// No contract is declared in the visible lines.
1331 public override void SetLength(long value)
1333 throw new NotImplementedException();
// Postconditions: the number of bytes read is between 0 and count inclusive.
1336 public override int Read(byte[] buffer, int offset, int count)
1338 Contract.Ensures(Contract.Result<int>() >= 0);
1339 Contract.Ensures(Contract.Result<int>() <= count);
1340 throw new NotImplementedException();
// No contract is declared in the visible lines.
1343 public override void Write(byte[] buffer, int offset, int count)
1345 throw new NotImplementedException();
// Postcondition: the value read is non-negative.
// NOTE(review): the get/set accessor headers are not visible in this extract; the second throw
// presumably belongs to the setter.
1348 public override long Position {
1350 Contract.Ensures(Contract.Result<long>() >= 0);
1351 throw new NotImplementedException();
1354 throw new NotImplementedException();
// No contract is declared in the visible lines.
1358 public override void Flush()
1360 throw new NotImplementedException();
// The capability getters carry no contracts.
1363 public override bool CanRead {
1364 get { throw new NotImplementedException(); }
1367 public override bool CanWrite {
1368 get { throw new NotImplementedException(); }
1371 public override bool CanSeek {
1372 get { throw new NotImplementedException(); }
// Postcondition: the reported length is non-negative.
1375 public override long Length
1378 Contract.Ensures(Contract.Result<long>() >= 0);
1379 throw new NotImplementedException();
1383 #endif // CONTRACTS_FULL