Merge pull request #946 from akoeplinger/fix-mono-parallel
[mono.git] / mcs / class / corlib / System.IO / MemoryStream.cs
index dcf631fece9ac1e5a3e5f3dcfbc27a1b206d1c13..4ce86df30b616f0b9ff96498f8d7fa783d76a698 100644 (file)
@@ -4,14 +4,12 @@
 // Authors:    Marcin Szczepanski (marcins@zipworld.com.au)
 //             Patrik Torstensson
 //             Gonzalo Paniagua Javier (gonzalo@ximian.com)
+//             Marek Safar (marek.safar@gmail.com)
 //
 // (c) 2001,2002 Marcin Szczepanski, Patrik Torstensson
 // (c) 2003 Ximian, Inc. (http://www.ximian.com)
-// Copyright (C) 2004 Novell (http://www.novell.com)
-//
-
-//
 // Copyright (C) 2004 Novell, Inc (http://www.novell.com)
+// Copyright 2011 Xamarin, Inc (http://www.xamarin.com)
 //
 // Permission is hereby granted, free of charge, to any person obtaining
 // a copy of this software and associated documentation files (the
 
 using System.Globalization;
 using System.Runtime.InteropServices;
+using System.Threading;
+#if NET_4_5
+using System.Threading.Tasks;
+#endif
 
 namespace System.IO
 {
        [Serializable]
-#if NET_2_0
        [ComVisible (true)]
-#endif
-       [MonoTODO ("Serialization format not compatible with .NET")]
+       [MonoLimitation ("Serialization format not compatible with .NET")]
        public class MemoryStream : Stream
        {
                bool canWrite;
@@ -54,6 +54,11 @@ namespace System.IO
                bool expandable;
                bool streamClosed;
                int position;
+               int dirty_bytes;
+#if NET_4_5
+               [NonSerialized]
+               Task<int> read_task;
+#endif
 
                public MemoryStream () : this (0)
                {
@@ -154,8 +159,6 @@ namespace System.IO
 
                        set {
                                CheckIfClosedThrowDisposed ();
-                               if (value == capacity)
-                                       return; // LAMENESS: see MemoryStreamTest.ConstructorFive
 
                                if (!expandable)
                                        throw new NotSupportedException ("Cannot expand this MemoryStream");
@@ -164,12 +167,17 @@ namespace System.IO
                                        throw new ArgumentOutOfRangeException ("value",
                                        "New capacity cannot be negative or less than the current capacity " + value + " " + capacity);
 
+                               if (internalBuffer != null && value == internalBuffer.Length)
+                                       return;
+
                                byte [] newBuffer = null;
                                if (value != 0) {
                                        newBuffer = new byte [value];
-                                       Buffer.BlockCopy (internalBuffer, 0, newBuffer, 0, length);
+                                       if (internalBuffer != null)
+                                               Buffer.BlockCopy (internalBuffer, 0, newBuffer, 0, length);
                                }
 
+                               dirty_bytes = 0; // discard any dirty area beyond previous length
                                internalBuffer = newBuffer; // It's null when capacity is set to 0
                                capacity = value;
                        }
@@ -209,11 +217,7 @@ namespace System.IO
                        }
                }
 
-#if NET_2_0
                protected override void Dispose (bool disposing)
-#else
-               public override void Close ()
-#endif
                {
                        streamClosed = true;
                        expandable = false;
@@ -234,8 +238,6 @@ namespace System.IO
 
                public override int Read ([In,Out] byte [] buffer, int offset, int count)
                {
-                       CheckIfClosedThrowDisposed ();
-
                        if (buffer == null)
                                throw new ArgumentNullException ("buffer");
 
@@ -246,6 +248,8 @@ namespace System.IO
                                throw new ArgumentException ("offset+count",
                                                              "The size of the buffer is less than offset + count.");
 
+                       CheckIfClosedThrowDisposed ();
+
                        if (position >= length || count == 0)
                                return 0;
 
@@ -317,6 +321,18 @@ namespace System.IO
                        return minimum;
                }
 
+               void Expand (int newSize)
+               {
+                       // We don't need to take into account the dirty bytes when incrementing the
+                       // Capacity, as changing it will only preserve the valid clear region.
+                       if (newSize > capacity)
+                               Capacity = CalculateNewCapacity (newSize);
+                       else if (dirty_bytes > 0) {
+                               Array.Clear (internalBuffer, length, dirty_bytes);
+                               dirty_bytes = 0;
+                       }
+               }
+
                public override void SetLength (long value)
                {
                        if (!expandable && value > capacity)
@@ -337,12 +353,11 @@ namespace System.IO
                                throw new ArgumentOutOfRangeException ();
 
                        int newSize = (int) value + initialIndex;
-                       if (newSize > capacity)
-                               Capacity = CalculateNewCapacity (newSize);
-                       else if (newSize < length)
-                               // zeroize present data (so we don't get it 
-                               // back if we expand the stream using Seek)
-                               Array.Clear (internalBuffer, newSize, length - newSize);
+
+                       if (newSize > length)
+                               Expand (newSize);
+                       else if (newSize < length) // Postpone the call to Array.Clear till expand time
+                               dirty_bytes += length - newSize;
 
                        length = newSize;
                        if (position > length)
@@ -361,11 +376,6 @@ namespace System.IO
 
                public override void Write (byte [] buffer, int offset, int count)
                {
-                       CheckIfClosedThrowDisposed ();
-
-                       if (!canWrite)
-                               throw new NotSupportedException ("Cannot write to this stream.");
-
                        if (buffer == null)
                                throw new ArgumentNullException ("buffer");
                        
@@ -376,9 +386,14 @@ namespace System.IO
                                throw new ArgumentException ("offset+count",
                                                             "The size of the buffer is less than offset + count.");
 
+                       CheckIfClosedThrowDisposed ();
+
+                       if (!CanWrite)
+                               throw new NotSupportedException ("Cannot write to this stream.");
+
                        // reordered to avoid possible integer overflow
-                       if (position > capacity - count)
-                               Capacity = CalculateNewCapacity (position + count);
+                       if (position > length - count)
+                               Expand (position + count);
 
                        Buffer.BlockCopy (buffer, offset, internalBuffer, position, count);
                        position += count;
@@ -392,11 +407,10 @@ namespace System.IO
                        if (!canWrite)
                                throw new NotSupportedException ("Cannot write to this stream.");
 
-                       if (position >= capacity)
-                               Capacity = CalculateNewCapacity (position + 1);
-
-                       if (position >= length)
+                       if (position >= length) {
+                               Expand (position + 1);
                                length = position + 1;
+                       }
 
                        internalBuffer [position++] = value;
                }
@@ -410,5 +424,77 @@ namespace System.IO
 
                        stream.Write (internalBuffer, initialIndex, length - initialIndex);
                }
+
+#if NET_4_5
+
+               public override Task CopyToAsync (Stream destination, int bufferSize, CancellationToken cancellationToken)
+               {
+                       // TODO: Specialization but what for?
+                       return base.CopyToAsync (destination, bufferSize, cancellationToken);
+               }
+
+               public override Task FlushAsync (CancellationToken cancellationToken)
+               {
+                       if (cancellationToken.IsCancellationRequested)
+                               return TaskConstants.Canceled;
+
+                       try {
+                               Flush ();
+                               return TaskConstants.Finished;
+                       } catch (Exception ex) {
+                               return Task<object>.FromException (ex);
+                       }
+               }
+
+               public override Task<int> ReadAsync (byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+               {
+                       if (buffer == null)
+                               throw new ArgumentNullException ("buffer");
+
+                       if (offset < 0 || count < 0)
+                               throw new ArgumentOutOfRangeException ("offset or count less than zero.");
+
+                       if (buffer.Length - offset < count )
+                               throw new ArgumentException ("offset+count",
+                                                            "The size of the buffer is less than offset + count.");
+                       if (cancellationToken.IsCancellationRequested)
+                               return TaskConstants<int>.Canceled;
+
+                       try {
+                               count = Read (buffer, offset, count);
+
+                               // Try not to allocate a new task for every buffer read
+                               if (read_task == null || read_task.Result != count)
+                                       read_task = Task<int>.FromResult (count);
+
+                               return read_task;
+                       } catch (Exception ex) {
+                               return Task<int>.FromException (ex);
+                       }
+               }
+
+               public override Task WriteAsync (byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+               {
+                       if (buffer == null)
+                               throw new ArgumentNullException ("buffer");
+                       
+                       if (offset < 0 || count < 0)
+                               throw new ArgumentOutOfRangeException ();
+
+                       if (buffer.Length - offset < count)
+                               throw new ArgumentException ("offset+count",
+                                                            "The size of the buffer is less than offset + count.");
+
+                       if (cancellationToken.IsCancellationRequested)
+                               return TaskConstants.Canceled;
+
+                       try {
+                               Write (buffer, offset, count);
+                               return TaskConstants.Finished;
+                       } catch (Exception ex) {
+                               return Task<object>.FromException (ex);
+                       }
+               }
+#endif
        }               
 }