3 // Copyright (c) Microsoft Corporation. All rights reserved.
6 /*============================================================
8 ** Class: BufferedStream2
11 ===========================================================*/
13 using System.Diagnostics;
14 using System.Globalization;
15 using System.Runtime.Versioning;
16 using System.Threading;
17 using System.Runtime.InteropServices;
18 using System.Runtime.Remoting.Messaging;
19 using System.Runtime.CompilerServices;
21 using Microsoft.Win32;
22 using Microsoft.Win32.SafeHandles;
23 using System.Security.Permissions;
27 // This abstract implementation adds thread safe buffering on top
28 // of the underlying stream. For most streams, having this intermediate
29 // buffer translates to better performance due to the costly nature of
30 // underlying IO, P/Invoke (such as disk IO). This also improves the locking
31 // efficiency when operating under heavy concurrency. The synchronization
32 // technique used in this implementation is specifically optimized for IO
35 // The main differences between this implementation and the existing System.IO.BufferedStream
36 // - the design allows for inheritance as opposed to wrapping streams
37 // - it is thread safe, though currently only synchronous Write is optimized
40 [HostProtection(Synchronization=true)]
41 internal abstract class BufferedStream2 : Stream
// Buffer configuration plus the read/write cursors used by the buffered operations below.
43 protected internal const int DefaultBufferSize = 32*1024; //32KB or 64KB seems to give the best throughput
45 protected int bufferSize; // Length of internal buffer, if it's allocated.
46 private byte[] _buffer; // Internal buffer. Alloc on first use.
48 // At present only concurrent buffer writing is optimized implicitly
49 // while reading relies on explicit locking.
51 // Ideally we want these fields to be volatile
52 private /*volatile*/ int _pendingBufferCopy; // How many buffer writes are pending.
53 private /*volatile*/ int _writePos; // Write pointer within shared buffer.
55 // Should we use a separate buffer for reading Vs writing?
56 private /*volatile*/ int _readPos; // Read pointer within shared buffer.
57 private /*volatile*/ int _readLen; // Number of bytes read in buffer from file.
59 // Ideally we want this field to be volatile but Interlocked operations
60 // on 64bit int are not guaranteed to be atomic especially on 32bit platforms
61 protected long pos; // Cache current location in the underlying stream.
63 // Derived streams should override CanRead/CanWrite/CanSeek to enable/disable functionality as desired
// Writes count bytes from array, beginning at offset.
// A write of at least bufferSize bytes made while the buffer is empty (_writePos == 0)
// is passed straight to WriteCore. Otherwise the method reserves the range
// [oldPos, newPos) of the shared buffer with Interlocked.Add and copies the caller's
// data into it; a reservation that runs past bufferSize is released and the buffered
// data is flushed through WriteCore instead.
// Throws ArgumentNullException ("array"), ArgumentOutOfRangeException ("offset", "count")
// and ArgumentException (when array.Length - offset < count).
// NOTE(review): some lines of this method (braces and a few short guard/branch lines)
// are not visible in this excerpt -- confirm the exact control flow against the full file.
65 [ResourceExposure(ResourceScope.None)]
66 [ResourceConsumption(ResourceScope.AppDomain, ResourceScope.AppDomain)]
67 public override void Write(byte[] array, int offset, int count)
70 throw new ArgumentNullException("array", SR.GetString(SR.ArgumentNull_Buffer));
72 throw new ArgumentOutOfRangeException("offset", SR.GetString(SR.ArgumentOutOfRange_NeedNonNegNum));
74 throw new ArgumentOutOfRangeException("count", SR.GetString(SR.ArgumentOutOfRange_NeedNonNegNum));
75 if (array.Length - offset < count)
76 throw new ArgumentException(SR.GetString(SR.Argument_InvalidOffLen));
78 Debug.Assert((_readPos==0 && _readLen==0 && _writePos >= 0) || (_writePos==0 && _readPos <= _readLen), "We're either reading or writing, but not both.");
81 // Ensure we can write to the stream, and ready buffer for writing.
82 if (!CanWrite) __Error.WriteNotSupported();
83 if (_readPos < _readLen) FlushRead();
89 // Don't allocate a buffer then call memcpy for 0 bytes.
94 // Avoid contention around spilling over the buffer, the locking mechanism here is a bit unconventional.
95 // Let's call this a YieldLock. It is closer to a spin lock than a semaphore but not quite a spin lock.
96 // Forced thread context switching is better than a tight spin lock here for several reasons.
97 // We utilize less CPU, yield to other threads (potentially the one doing the write, this is
98 // especially important under heavy thread/processor contention environment) and also yield to
99 // runtime thread aborts (important when run from a high pri thread like finalizer).
100 if (_writePos > bufferSize) {
106 // For input chunk larger than internal buffer size, write directly
107 // It is okay to have a race here with the _writePos check, which means
108 // we have a loose order between flushing the internal cache Vs writing
109 // this larger chunk but that is fine. This step will nicely optimize
110 // repeated writing of larger chunks by skipping the interlocked operation
111 if ((_writePos == 0) && (count >= bufferSize)) {
112 WriteCore(array, offset, count, true);
116 // We should review whether we need critical region markers for hosts.
117 Thread.BeginCriticalRegion();
// Count this thread as an in-flight buffer copy, then atomically claim [oldPos, newPos).
119 Interlocked.Increment(ref _pendingBufferCopy);
120 int newPos = Interlocked.Add(ref _writePos, count);
121 int oldPos = (newPos - count);
// The claimed range overruns the buffer: withdraw the in-flight count before flushing.
124 if (newPos > bufferSize) {
125 Interlocked.Decrement(ref _pendingBufferCopy);
126 Thread.EndCriticalRegion();
128 // Though the lock below is not necessary for correctness, when operating in a heavy
129 // thread contention environment, augmenting the YieldLock technique with a critical
130 // section around write seems to be giving slightly better performance while
131 // not having noticeable impact in the less contended situations.
132 // Perhaps we can build a technique that keeps track of the contention?
135 // Make sure we didn't get pre-empted by another thread
136 if (_writePos > bufferSize) {
137 if ((oldPos <= bufferSize) && (oldPos > 0)) {
// Wait until no buffer copies are pending before writing out the first oldPos bytes.
138 while (_pendingBufferCopy != 0) {
141 WriteCore(_buffer, 0, oldPos, true);
// First use: install a new buffer only if _buffer is still null.
149 Interlocked.CompareExchange(ref _buffer, new byte[bufferSize], null);
151 // Copy user data into buffer, to write at a later date.
152 Buffer.BlockCopy(array, offset, _buffer, oldPos, count);
154 Interlocked.Decrement(ref _pendingBufferCopy);
155 Thread.EndCriticalRegion();
162 #if _ENABLE_STREAM_FACTORING
// Logical position in the stream. The getter requires CanSeek and returns the cached
// underlying position (pos) adjusted by (_readPos - _readLen + _writePos). The setter is
// synchronized, rejects negative values with ArgumentOutOfRangeException, flushes
// pending buffered writes and then seeks from the beginning of the stream.
// NOTE(review): a few lines of the accessors are not visible in this excerpt.
163 public override long Position
165 // Making the getter thread safe is not very useful anyways
167 if (!CanSeek) __Error.SeekNotSupported();
168 Debug.Assert((_readPos==0 && _readLen==0 && _writePos >= 0) || (_writePos==0 && _readPos <= _readLen), "We're either reading or writing, but not both.");
170 // Compensate for buffer that we read from the handle (_readLen) Vs what the user
171 // read so far from the internal buffer (_readPos). Of course add any unwritten
173 return pos + (_readPos - _readLen + _writePos);
176 [MethodImplAttribute(MethodImplOptions.Synchronized)]
178 if (value < 0) throw new ArgumentOutOfRangeException("value", Helper.GetResourceString("ArgumentOutOfRange_NeedNonNegNum"));
180 if (_writePos > 0) FlushWrite(false);
184 Seek(value, SeekOrigin.Begin);
// Synchronized read of up to count bytes into array at offset.
// When the read buffer is empty, pending writes are flushed first; a request of at
// least bufferSize bytes (or any request on a non-seekable stream) is then read directly
// into the caller's array, otherwise the internal buffer is filled with a single ReadCore
// call and the request is served from it.
// Throws ArgumentNullException ("array"), ArgumentOutOfRangeException ("offset", "count")
// and ArgumentException (when array.Length - offset < count).
// NOTE(review): the branch headers, cursor updates and final return are not visible in
// this excerpt -- confirm against the full file.
188 [MethodImplAttribute(MethodImplOptions.Synchronized)]
189 public override int Read(/*[In, Out]*/ byte[] array, int offset, int count)
192 throw new ArgumentNullException("array", Helper.GetResourceString("ArgumentNull_Buffer"));
194 throw new ArgumentOutOfRangeException("offset", Helper.GetResourceString("ArgumentOutOfRange_NeedNonNegNum"));
196 throw new ArgumentOutOfRangeException("count", Helper.GetResourceString("ArgumentOutOfRange_NeedNonNegNum"));
197 if (array.Length - offset < count)
198 throw new ArgumentException(Helper.GetResourceString("Argument_InvalidOffLen"));
200 Debug.Assert((_readPos==0 && _readLen==0 && _writePos >= 0) || (_writePos==0 && _readPos <= _readLen), "We're either reading or writing, but not both.");
202 bool isBlocked = false;
// n: number of buffered bytes the caller has not consumed yet.
203 int n = _readLen - _readPos;
205 // If the read buffer is empty, read into either user's array or our
206 // buffer, depending on number of bytes user asked for and buffer size.
208 if (!CanRead) __Error.ReadNotSupported();
209 if (_writePos > 0) FlushWrite(false);
210 if (!CanSeek || (count >= bufferSize)) {
211 n = ReadCore(array, offset, count);
212 // Throw away read buffer.
217 if (_buffer == null) _buffer = new byte[bufferSize];
218 n = ReadCore(_buffer, 0, bufferSize);
219 if (n == 0) return 0;
// Remember that ReadCore returned fewer bytes than a full buffer.
220 isBlocked = n < bufferSize;
225 // Now copy min of count or numBytesAvailable (ie, near EOF) to array.
226 if (n > count) n = count;
227 Buffer.BlockCopy(_buffer, _readPos, array, offset, n);
230 // We may have read less than the number of bytes the user asked
231 // for, but that is part of the Stream contract. Reading again for
232 // more data may cause us to block if we're using a device with
233 // no clear end of file, such as a serial port or pipe. If we
234 // blocked here & this code was used with redirected pipes for a
235 // process's standard output, this can lead to deadlocks involving
236 // two processes. But leave this here for files to avoid what would
237 // probably be a breaking change. --
// Begins an asynchronous read of up to numBytes bytes into array at offset.
// Pending buffered writes are flushed first. With an empty read buffer
// (_readPos == _readLen), a request smaller than bufferSize fills the internal buffer
// through BeginReadCore/EndRead, copies the data out and returns an already-completed
// BufferedStreamAsyncResult; a larger request goes directly to BeginReadCore. With
// buffered data available, up to numBytes bytes are copied from the buffer; the visible
// code then either returns a completed result or issues BeginReadCore for the rest.
// Throws ArgumentNullException ("array"), ArgumentOutOfRangeException ("offset",
// "numBytes") and ArgumentException (when array.Length - offset < numBytes).
// NOTE(review): several branch headers and cursor updates are not visible in this
// excerpt, including the condition that selects between the last two returns.
242 [HostProtection(ExternalThreading=true)]
243 [MethodImplAttribute(MethodImplOptions.Synchronized)]
244 public override IAsyncResult BeginRead(byte[] array, int offset, int numBytes, AsyncCallback userCallback, Object stateObject)
247 throw new ArgumentNullException("array");
249 throw new ArgumentOutOfRangeException("offset", Helper.GetResourceString("ArgumentOutOfRange_NeedNonNegNum"));
251 throw new ArgumentOutOfRangeException("numBytes", Helper.GetResourceString("ArgumentOutOfRange_NeedNonNegNum"));
252 if (array.Length - offset < numBytes)
253 throw new ArgumentException(Helper.GetResourceString("Argument_InvalidOffLen"));
255 if (!CanRead) __Error.ReadNotSupported();
257 Debug.Assert((_readPos==0 && _readLen==0 && _writePos >= 0) || (_writePos==0 && _readPos <= _readLen), "We're either reading or writing, but not both.");
259 if (_writePos > 0) FlushWrite(false);
260 if (_readPos == _readLen) {
261 // I can't see how to handle buffering of async requests when
262 // filling the buffer asynchronously, without a lot of complexity.
263 // The problems I see are issuing an async read, we do an async
264 // read to fill the buffer, then someone issues another read
265 // (either synchronously or asynchronously) before the first one
266 // returns. This would involve some sort of complex buffer locking
267 // that we probably don't want to get into, at least not in V1.
268 // If we did a sync read to fill the buffer, we could avoid the
269 // problem, and any async read less than 64K gets turned into a
270 // synchronous read by NT anyways... --
272 if (numBytes < bufferSize) {
273 if (_buffer == null) _buffer = new byte[bufferSize];
// Fill the buffer and wait for the result right here, so the small request
// can be answered from the buffer.
274 IAsyncResult bufferRead = BeginReadCore(_buffer, 0, bufferSize, null, null, 0);
275 _readLen = EndRead(bufferRead);
277 if (n > numBytes) n = numBytes;
278 Buffer.BlockCopy(_buffer, 0, array, offset, n);
282 return BufferedStreamAsyncResult.Complete(n, userCallback, stateObject, false);
285 // Here we're making our position pointer inconsistent
286 // with our read buffer. Throw away the read buffer's contents.
289 return BeginReadCore(array, offset, numBytes, userCallback, stateObject, 0);
// Buffered data is available: hand out as much of it as the caller asked for.
292 int n = _readLen - _readPos;
293 if (n > numBytes) n = numBytes;
294 Buffer.BlockCopy(_buffer, _readPos, array, offset, n);
298 return BufferedStreamAsyncResult.Complete(n, userCallback, stateObject, false);
300 // For streams with no clear EOF like serial ports or pipes
301 // we cannot read more data without causing an app to block
302 // incorrectly. Pipes don't go down this path
303 // though. This code needs to be fixed.
304 // Throw away read buffer.
308 // WARNING: all state on asyncResult objects must be set before
309 // we call ReadFile in BeginReadCore, since the OS can run our
310 // callback & the user's callback before ReadFile returns.
311 return BeginReadCore(array, offset + n, numBytes - n, userCallback, stateObject, n);
// Completes a read started by BeginRead and returns the number of bytes read.
// Throws ArgumentNullException for a null asyncResult. A BufferedStreamAsyncResult
// carries its byte count in _numBytes; the other visible return hands asyncResult to
// EndReadCore.
// NOTE(review): the conditions guarding WrongAsyncResult() and the early return are not
// visible in this excerpt -- presumably a null check on bsar and a check of its _isWrite
// flag; confirm against the full file.
315 public unsafe override int EndRead(IAsyncResult asyncResult)
317 if (asyncResult == null)
318 throw new ArgumentNullException("asyncResult");
320 BufferedStreamAsyncResult bsar = asyncResult as BufferedStreamAsyncResult;
323 __Error.WrongAsyncResult();
324 return bsar._numBytes;
327 return EndReadCore(asyncResult);
// Begins an asynchronous write of numBytes bytes from array at offset.
// Unconsumed read data is dropped via FlushRead first. The visible code has two
// outcomes: the data is copied into the internal buffer at _writePos and an
// already-completed BufferedStreamAsyncResult is returned, or pending buffered data is
// flushed and the request is passed to BeginWriteCore.
// Throws ArgumentNullException ("array"), ArgumentOutOfRangeException ("offset",
// "numBytes") and ArgumentException (when array.Length - offset < numBytes).
// NOTE(review): the condition choosing between the two outcomes is not visible here --
// presumably whether numBytes fits in the n bytes left in the buffer; confirm.
331 [HostProtection(ExternalThreading=true)]
332 [MethodImplAttribute(MethodImplOptions.Synchronized)]
333 public override IAsyncResult BeginWrite(byte[] array, int offset, int numBytes, AsyncCallback userCallback, Object stateObject)
336 throw new ArgumentNullException("array");
338 throw new ArgumentOutOfRangeException("offset", Helper.GetResourceString("ArgumentOutOfRange_NeedNonNegNum"));
340 throw new ArgumentOutOfRangeException("numBytes", Helper.GetResourceString("ArgumentOutOfRange_NeedNonNegNum"));
341 if (array.Length - offset < numBytes)
342 throw new ArgumentException(Helper.GetResourceString("Argument_InvalidOffLen"));
344 if (!CanWrite) __Error.WriteNotSupported();
346 Debug.Assert((_readPos==0 && _readLen==0 && _writePos >= 0) || (_writePos==0 && _readPos <= _readLen), "We're either reading or writing, but not both.");
349 if (_readPos < _readLen) FlushRead();
// n: free space remaining in the write buffer.
354 int n = bufferSize - _writePos;
356 if (_buffer == null) _buffer = new byte[bufferSize];
357 Buffer.BlockCopy(array, offset, _buffer, _writePos, numBytes);
358 _writePos += numBytes;
360 return BufferedStreamAsyncResult.Complete(numBytes, userCallback, stateObject, true);
363 if (_writePos > 0) FlushWrite(false);
364 return BeginWriteCore(array, offset, numBytes, userCallback, stateObject);
// Completes a write started by BeginWrite. Throws ArgumentNullException for a null
// asyncResult; the visible code then casts to BufferedStreamAsyncResult and calls
// EndWriteCore.
// NOTE(review): the condition under which EndWriteCore runs is not visible in this
// excerpt -- presumably only when asyncResult is not a BufferedStreamAsyncResult; confirm.
367 public unsafe override void EndWrite(IAsyncResult asyncResult)
369 if (asyncResult == null)
370 throw new ArgumentNullException("asyncResult");
372 BufferedStreamAsyncResult bsar = asyncResult as BufferedStreamAsyncResult;
374 EndWriteCore(asyncResult);
377 // Reads a byte from the file stream. Returns the byte cast to an int
378 // or -1 if reading from the end of the stream.
// Synchronized. When the read buffer is exhausted, pending writes are flushed and the
// buffer is refilled with one ReadCore call of bufferSize bytes.
// NOTE(review): the cursor updates and return statements are not visible in this excerpt.
379 [MethodImplAttribute(MethodImplOptions.Synchronized)]
380 public override int ReadByte()
// CanRead is only consulted while nothing has been buffered for reading.
382 if (_readLen == 0 && !CanRead)
383 __Error.ReadNotSupported();
385 Debug.Assert((_readPos==0 && _readLen==0 && _writePos >= 0) || (_writePos==0 && _readPos <= _readLen), "We're either reading or writing, but not both.");
// Buffer exhausted: flush pending writes, then refill from the underlying stream.
387 if (_readPos == _readLen) {
388 if (_writePos > 0) FlushWrite(false);
390 Debug.Assert(bufferSize > 0, "bufferSize > 0");
391 if (_buffer == null) _buffer = new byte[bufferSize];
392 _readLen = ReadCore(_buffer, 0, bufferSize);
// Still nothing available after the refill attempt.
396 if (_readPos == _readLen)
399 int result = _buffer[_readPos];
// Synchronized single-byte write: stores value in the internal buffer at _writePos.
// The block entered when _writePos == 0 contains the write-support check, the handling of
// leftover read data and the buffer allocation; a full buffer (_writePos == bufferSize)
// is handled before the byte is stored.
// NOTE(review): the guard lines, the action taken on a full buffer and the _writePos
// increment are not visible in this excerpt -- confirm against the full file.
404 [MethodImplAttribute(MethodImplOptions.Synchronized)]
405 public override void WriteByte(byte value)
407 if (_writePos == 0) {
409 __Error.WriteNotSupported();
410 if (_readPos < _readLen)
414 Debug.Assert(bufferSize > 0, "bufferSize > 0");
416 _buffer = new byte[bufferSize];
419 if (_writePos == bufferSize)
422 _buffer[_writePos] = value;
// Synchronized seek. Repositions the underlying stream through SeekCore and then tries
// to keep any buffered read data that is still valid for the new position, compacting it
// to the front of the buffer.
// Throws ArgumentException for an origin outside SeekOrigin.Begin..SeekOrigin.End and
// reports SeekNotSupported when CanSeek is false.
// NOTE(review): some branch headers, cursor resets and the return statement are not
// visible in this excerpt.
427 [MethodImplAttribute(MethodImplOptions.Synchronized)]
428 public override long Seek(long offset, SeekOrigin origin)
430 if (origin<SeekOrigin.Begin || origin>SeekOrigin.End)
431 throw new ArgumentException(Helper.GetResourceString("Argument_InvalidSeekOrigin"));
433 if (!CanSeek) __Error.SeekNotSupported();
435 Debug.Assert((_readPos==0 && _readLen==0 && _writePos >= 0) || (_writePos==0 && _readPos <= _readLen), "We're either reading or writing, but not both.");
437 // If we've got bytes in our buffer to write, write them out.
438 // If we've read in and consumed some bytes, we'll have to adjust
439 // our seek positions ONLY IF we're seeking relative to the current
440 // position in the stream. This simulates doing a seek to the new
441 // position, then a read for the number of bytes we have in our buffer.
445 else if (origin == SeekOrigin.Current) {
446 // Don't call FlushRead here, which would have caused an infinite
447 // loop. Simply adjust the seek origin. This isn't necessary
448 // if we're seeking relative to the beginning or end of the stream.
449 offset -= (_readLen - _readPos);
// oldPos: logical position before the seek (underlying position minus unread buffered bytes).
452 long oldPos = pos + (_readPos - _readLen);
453 long curPos = SeekCore(offset, origin);
455 // We now must update the read buffer. We can in some cases simply
456 // update _readPos within the buffer, copy around the buffer so our
457 // Position property is still correct, and avoid having to do more
458 // reads from the disk. Otherwise, discard the buffer's contents.
460 // We can optimize the following condition:
461 // oldPos - _readPos <= curPos < oldPos + _readLen - _readPos
// Target equals the old logical position: slide the unread bytes to the front of the buffer.
462 if (oldPos == curPos) {
464 Buffer.BlockCopy(_buffer, _readPos, _buffer, 0, _readLen - _readPos);
465 _readLen -= _readPos;
468 // If we still have buffered data, we must update the stream's
469 // position so our Position property is correct.
471 SeekCore(_readLen, SeekOrigin.Current);
// Target lies inside the buffered window: drop everything before it and keep the rest.
473 else if (oldPos - _readPos < curPos && curPos < oldPos + _readLen - _readPos) {
474 int diff = (int)(curPos - oldPos);
475 Buffer.BlockCopy(_buffer, _readPos+diff, _buffer, 0, _readLen - (_readPos + diff));
476 _readLen -= (_readPos + diff);
479 SeekCore(_readLen, SeekOrigin.Current);
482 // Lose the read buffer.
486 Debug.Assert(_readLen >= 0 && _readPos <= _readLen, "_readLen should be nonnegative, and _readPos should be less than or equal _readLen");
487 Debug.Assert(curPos == Position, "Seek optimization: curPos != Position! Buffer math was mangled.");
491 #endif //_ENABLE_STREAM_FACTORING
// Synchronized flush of the buffered state.
// NOTE(review): only the branch for unread buffered data (_readPos < _readLen) is
// visible in this excerpt; the preceding branch and both branch bodies are not shown.
493 [MethodImplAttribute(MethodImplOptions.Synchronized)]
494 public override void Flush()
499 else if (_readPos < _readLen)
509 // Reading is done by blocks from the file, but someone could read
510 // 1 byte from the buffer then write. At that point, the OS's file
511 // pointer is out of sync with the stream's position. All write
512 // functions should call this function to preserve the position in the file.
// Synchronized. The seek logic is compiled only when _ENABLE_STREAM_FACTORING is defined.
// NOTE(review): the lines that reset the read cursors are not visible in this excerpt.
513 [MethodImplAttribute(MethodImplOptions.Synchronized)]
514 protected void FlushRead() {
515 #if _ENABLE_STREAM_FACTORING
516 Debug.Assert(_writePos == 0, "BufferedStream: Write buffer must be empty in FlushRead!");
// Move the underlying stream by (_readPos - _readLen) relative to its current position,
// i.e. back over the buffered bytes the caller has not consumed.
518 if (_readPos - _readLen != 0) {
519 Debug.Assert(CanSeek, "BufferedStream will lose buffered read data now.");
521 SeekCore(_readPos - _readLen, SeekOrigin.Current);
525 #endif //_ENABLE_STREAM_FACTORING
528 // Writes are buffered. Anytime the buffer fills up
529 // (_writePos + delta > bufferSize) or the buffer switches to reading
530 // and there is left over data (_writePos > 0), this function must be called.
// Synchronized. Writes the first _writePos bytes of the internal buffer through
// WriteCore, forwarding blockForWrite unchanged.
// NOTE(review): any guard around the write and the reset of _writePos are not visible
// in this excerpt.
531 [MethodImplAttribute(MethodImplOptions.Synchronized)]
532 protected void FlushWrite(bool blockForWrite) {
533 Debug.Assert(_readPos == 0 && _readLen == 0, "BufferedStream: Read buffer must be empty in FlushWrite!");
536 WriteCore(_buffer, 0, _writePos, blockForWrite);
// Flushes buffered writes and then calls base.Dispose(disposing). The disposing flag is
// passed to FlushWrite as its blockForWrite argument.
// NOTE(review): the guard around FlushWrite and any try/finally structure are not
// visible in this excerpt.
540 protected override void Dispose(bool disposing)
543 // Flush data to disk iff we were writing.
545 // With our Whidbey async IO & overlapped support for AD unloads,
546 // we don't strictly need to block here to release resources
547 // if the underlying IO is overlapped since that support
548 // takes care of the pinning & freeing the
549 // overlapped struct. We need to do this when called from
550 // Close so that the handle is closed when Close returns, but
551 // we don't need to call EndWrite from the finalizer.
552 // Additionally, if we do call EndWrite, we block forever
553 // because AD unloads prevent us from running the managed
554 // callback from the IO completion port. Blocking here when
555 // called from the finalizer during AD unload is clearly wrong,
556 // but we can't use any sort of test for whether the AD is
557 // unloading because if we weren't unloading, an AD unload
558 // could happen on a separate thread before we call EndWrite.
559 FlushWrite(disposing);
563 // Don't set the buffer to null, to avoid a NullReferenceException
564 // when users have a race condition in their code (ie, they call
565 // Close when calling another method on Stream like Read).
571 base.Dispose(disposing);
579 #if _ENABLE_STREAM_FACTORING
// Protected accessor related to the write cursor of the shared buffer.
// NOTE(review): the accessor bodies are not visible in this excerpt; the only visible
// statement is a commented-out Interlocked.Exchange on _writePos.
580 protected int BufferedWritePosition
582 // Making the getter thread safe is not very useful anyways
587 // Interlocked.Exchange(ref _writePos, value);
// Protected accessor related to the read cursor of the shared buffer.
// NOTE(review): the accessor bodies are not visible in this excerpt; the only visible
// statement is a commented-out Interlocked.Exchange on _readPos.
591 protected int BufferedReadPosition
593 // Making the getter thread safe is not very useful anyways
598 // Interlocked.Exchange(ref _readPos, value);
601 #endif //_ENABLE_STREAM_FACTORING
// Protected accessor for the cached position of the underlying stream (pos).
// The setter stores the new value with Interlocked.Exchange, which keeps the 64-bit
// store atomic. NOTE(review): the getter body is not visible in this excerpt.
603 protected long UnderlyingStreamPosition
605 // Making the getter thread safe is not very useful anyways
610 Interlocked.Exchange(ref pos, value);
// Atomically adds posDelta to the cached underlying-stream position (pos) and returns
// the updated value.
614 protected long AddUnderlyingStreamPosition(long posDelta)
616 return Interlocked.Add(ref pos, posDelta);
// Synchronized helper available to derived classes and to code in this assembly.
// NOTE(review): the body is not visible in this excerpt; the name suggests it drops the
// buffered state -- confirm against the full file.
619 [MethodImplAttribute(MethodImplOptions.Synchronized)]
620 protected internal void DiscardBuffer()
// Convenience overload: forwards to the abstract WriteCore overload that takes an
// out streamPos argument.
// NOTE(review): the declaration of streamPos and any use of it afterwards are not
// visible in this excerpt.
627 private void WriteCore(byte[] buffer, int offset, int count, bool blockForWrite)
630 WriteCore(buffer, offset, count, blockForWrite, out streamPos);
// Implemented by derived streams to perform the actual write of count bytes from buffer
// at offset. NOTE(review): streamPos presumably reports the resulting position in the
// underlying stream, and blockForWrite presumably selects a blocking write -- confirm
// with an implementer.
632 protected abstract void WriteCore(byte[] buffer, int offset, int count, bool blockForWrite, out long streamPos);
634 #if _ENABLE_STREAM_FACTORING
// Convenience overload: forwards to the abstract ReadCore overload that takes an
// out streamPos argument and returns its result.
// NOTE(review): the declaration of streamPos is not visible in this excerpt.
635 private int ReadCore(byte[] buffer, int offset, int count)
638 return ReadCore(buffer, offset, count, out streamPos);
// Convenience overload: forwards to the abstract EndReadCore overload that takes an
// out streamPos argument and returns its result.
// NOTE(review): the declaration of streamPos is not visible in this excerpt.
641 private int EndReadCore(IAsyncResult asyncResult)
644 return EndReadCore(asyncResult, out streamPos);
// Convenience overload: forwards to the abstract EndWriteCore overload that takes an
// out streamPos argument.
// NOTE(review): the declaration of streamPos is not visible in this excerpt.
647 private void EndWriteCore(IAsyncResult asyncResult)
650 EndWriteCore(asyncResult, out streamPos);
653 // Derived streams should implement the following core methods
// These are the unbuffered primitives the buffered Read/BeginRead/EndRead/BeginWrite/
// EndWrite/Seek implementations in this class call into.
// NOTE(review): the meaning of the out streamPos arguments is not shown here --
// presumably the resulting position in the underlying stream; confirm with an implementer.
654 protected abstract int ReadCore(byte[] buffer, int offset, int count, out long streamPos);
655 [ResourceExposure(ResourceScope.None)]
656 [ResourceConsumption(ResourceScope.AppDomain, ResourceScope.AppDomain)]
// numBufferedBytesRead: BeginRead passes the number of bytes it already copied out of the buffer.
657 protected abstract IAsyncResult BeginReadCore(byte[] bytes, int offset, int numBytes, AsyncCallback userCallback, Object stateObject, int numBufferedBytesRead);
658 protected abstract int EndReadCore(IAsyncResult asyncResult, out long streamPos);
659 [ResourceExposure(ResourceScope.None)]
660 [ResourceConsumption(ResourceScope.AppDomain, ResourceScope.AppDomain)]
661 protected abstract IAsyncResult BeginWriteCore(byte[] bytes, int offset, int numBytes, AsyncCallback userCallback, Object stateObject);
662 protected abstract void EndWriteCore(IAsyncResult asyncResult, out long streamPos);
663 protected abstract long SeekCore(long offset, SeekOrigin origin);
664 #endif //_ENABLE_STREAM_FACTORING
667 #if _ENABLE_STREAM_FACTORING
// IAsyncResult implementation used for Begin* requests that were satisfied from the
// internal buffer. Complete() creates an instance, records the byte count, callback,
// state object and direction, and invokes the user callback when one was supplied.
// NOTE(review): the bodies of IsCompleted, AsyncWaitHandle and CompletedSynchronously
// and the return statement of Complete are not visible in this excerpt.
669 unsafe internal sealed class BufferedStreamAsyncResult : IAsyncResult
671 // User code callback
672 internal AsyncCallback _userCallback;
673 internal Object _userStateObject;
674 internal int _numBytes; // number of bytes read OR written
675 //internal int _errorCode;
677 internal bool _isWrite; // Whether this is a read or a write
679 public Object AsyncState
681 get { return _userStateObject; }
684 public bool IsCompleted
689 public WaitHandle AsyncWaitHandle
694 public bool CompletedSynchronously
// Builds a result for numBufferedBytes bytes and runs the user callback (if any) on
// the calling thread.
699 internal static IAsyncResult Complete(int numBufferedBytes, AsyncCallback userCallback, Object userStateObject, bool isWrite)
702 BufferedStreamAsyncResult asyncResult = new BufferedStreamAsyncResult();
703 asyncResult._numBytes = numBufferedBytes;
704 asyncResult._userCallback = userCallback;
705 asyncResult._userStateObject = userStateObject;
706 asyncResult._isWrite = isWrite;
707 if (userCallback != null) {
708 userCallback(asyncResult);
713 #endif //_ENABLE_STREAM_FACTORING