//
-// System.IO/Stream.cs
+// System.IO.Stream.cs
//
// Authors:
// Dietmar Maurer (dietmar@ximian.com)
// Miguel de Icaza (miguel@ximian.com)
// Gonzalo Paniagua Javier (gonzalo@ximian.com)
+// Marek Safar (marek.safar@gmail.com)
//
// (C) 2001, 2002 Ximian, Inc. http://www.ximian.com
// (c) 2004 Novell, Inc. (http://www.novell.com)
-//
-
-//
-// Copyright (C) 2004 Novell, Inc (http://www.novell.com)
+// Copyright 2011 Xamarin, Inc (http://www.xamarin.com)
//
// Permission is hereby granted, free of charge, to any person obtaining
// a copy of this software and associated documentation files (the
using System.Threading;
using System.Runtime.Remoting.Messaging;
using System.Runtime.InteropServices;
+#if NET_4_5
+using System.Threading.Tasks;
+#endif
namespace System.IO
{
[Serializable]
+ [ComVisible (true)]
+#if NET_2_1
+ public abstract class Stream : IDisposable
+#else
public abstract class Stream : MarshalByRefObject, IDisposable
+#endif
{
public static readonly Stream Null = new NullStream ();
+ [NonSerialized]
+ Func<byte[], int, int, int> async_read;
+ [NonSerialized]
+ Action<byte[], int, int> async_write;
+ [NonSerialized]
+ AutoResetEvent async_event;
+
protected Stream ()
{
}
get;
}
-#if NET_2_0
+ [ComVisible (false)]
public virtual bool CanTimeout {
get {
return false;
}
}
-#endif
public abstract long Length
{
}
-#if NET_2_0
- // 2.0 version of Dispose.
public void Dispose ()
{
Close ();
}
- // 2.0 version of Dispose.
protected virtual void Dispose (bool disposing)
{
- // nothing.
+ if (async_event != null && disposing) {
+ async_event.Close ();
+ async_event = null;
+ }
}
- //
- // 2.0 version of Close (): calls Dispose (true)
- //
public virtual void Close ()
{
Dispose (true);
+ GC.SuppressFinalize (this);
}
+ [ComVisible (false)]
public virtual int ReadTimeout {
get {
throw new InvalidOperationException ("Timeouts are not supported on this stream.");
}
}
+ [ComVisible (false)]
public virtual int WriteTimeout {
get {
throw new InvalidOperationException ("Timeouts are not supported on this stream.");
throw new InvalidOperationException ("Timeouts are not supported on this stream.");
}
}
-#else
- // 1.1 version of Close
- public virtual void Close ()
- {
- // nothing
- }
-#endif
- void IDisposable.Dispose ()
+ public static Stream Synchronized (Stream stream)
{
- Close ();
+ return new SynchronizedStream (stream);
}
+ [Obsolete ("CreateWaitHandle is due for removal. Use \"new ManualResetEvent(false)\" instead.")]
protected virtual WaitHandle CreateWaitHandle()
{
return new ManualResetEvent (false);
Write (buffer, 0, 1);
}
- public virtual IAsyncResult
- BeginRead (byte [] buffer, int offset, int count, AsyncCallback cback, object state)
+ public virtual IAsyncResult BeginRead (byte[] buffer, int offset, int count, AsyncCallback callback, object state)
{
if (!CanRead)
throw new NotSupportedException ("This stream does not support reading");
- // Creating a class derived from Stream that doesn't override BeginRead
- // shows that it actually calls Read and does everything synchronously.
- // Just put this in the Read override:
- // Console.WriteLine ("Read");
- // Console.WriteLine (Environment.StackTrace);
- // Thread.Sleep (10000);
- // return 10;
-
- StreamAsyncResult result = new StreamAsyncResult (state);
- try {
- int nbytes = Read (buffer, offset, count);
- result.SetComplete (null, nbytes);
- } catch (Exception e) {
- result.SetComplete (e, 0);
+ if (async_event == null) {
+ lock (this) {
+ if (async_event == null)
+ async_event = new AutoResetEvent (true);
+ }
}
- if (cback != null)
- cback (result);
-
- return result;
+ async_event.WaitOne ();
+ async_read = Read;
+ return async_read.BeginInvoke (buffer, offset, count, callback, state);
}
- delegate void WriteDelegate (byte [] buffer, int offset, int count);
-
- public virtual IAsyncResult
- BeginWrite (byte [] buffer, int offset, int count, AsyncCallback cback, object state)
+ public virtual IAsyncResult BeginWrite (byte[] buffer, int offset, int count, AsyncCallback callback, object state)
{
if (!CanWrite)
throw new NotSupportedException ("This stream does not support writing");
-
- // Creating a class derived from Stream that doesn't override BeginWrite
- // shows that it actually calls Write and does everything synchronously except
- // when invoking the callback, which is done from the ThreadPool.
- // Just put this in the Write override:
- // Console.WriteLine ("Write");
- // Console.WriteLine (Environment.StackTrace);
- // Thread.Sleep (10000);
-
- StreamAsyncResult result = new StreamAsyncResult (state);
+
+ if (async_event == null) {
+ lock (this) {
+ if (async_event == null)
+ async_event = new AutoResetEvent (true);
+ }
+ }
+
+ async_event.WaitOne ();
+ async_write = Write;
+ return async_write.BeginInvoke (buffer, offset, count, callback, state);
+ }
+
+ public virtual int EndRead (IAsyncResult asyncResult)
+ {
+ if (asyncResult == null)
+ throw new ArgumentNullException ("asyncResult");
+
+ if (async_read == null)
+ throw new ArgumentException ("EndRead cannot be called multiple times");
+
+ try {
+ return async_read.EndInvoke (asyncResult);
+ } finally {
+ async_read = null;
+ async_event.Set ();
+ }
+ }
+
+ public virtual void EndWrite (IAsyncResult asyncResult)
+ {
+ if (asyncResult == null)
+ throw new ArgumentNullException ("asyncResult");
+
+ if (async_write == null)
+ throw new ArgumentException ("EndWrite cannot be called multiple times");
+
try {
- Write (buffer, offset, count);
- result.SetComplete (null);
- } catch (Exception e) {
- result.SetComplete (e);
+ async_write.EndInvoke (asyncResult);
+ } finally {
+ async_write = null;
+ async_event.Set ();
}
+ }
- if (cback != null)
- cback.BeginInvoke (result, null, null);
+#if MOONLIGHT || NET_4_0 || MOBILE
+ public void CopyTo (Stream destination)
+ {
+ CopyTo (destination, 16*1024);
+ }
- return result;
+ public void CopyTo (Stream destination, int bufferSize)
+ {
+ if (destination == null)
+ throw new ArgumentNullException ("destination");
+ if (!CanRead)
+ throw new NotSupportedException ("This stream does not support reading");
+ if (!destination.CanWrite)
+ throw new NotSupportedException ("This destination stream does not support writing");
+ if (bufferSize <= 0)
+ throw new ArgumentOutOfRangeException ("bufferSize");
+
+ var buffer = new byte [bufferSize];
+ int nread;
+ while ((nread = Read (buffer, 0, bufferSize)) != 0)
+ destination.Write (buffer, 0, nread);
}
+
+#if NET_4_5
+ [ObsoleteAttribute("Do not call or override this method")]
+#endif
+ protected virtual void ObjectInvariant ()
+ {
+ }
+#endif
- public virtual int EndRead (IAsyncResult async_result)
+#if NET_4_5
+
+ public Task CopyToAsync (Stream destination)
{
- if (async_result == null)
- throw new ArgumentNullException ("async_result");
+ return CopyToAsync (destination, 16 * 1024, CancellationToken.None);
+ }
- StreamAsyncResult result = async_result as StreamAsyncResult;
- if (result == null || result.NBytes == -1)
- throw new ArgumentException ("Invalid IAsyncResult", "async_result");
+ public Task CopyToAsync (Stream destination, int bufferSize)
+ {
+ return CopyToAsync (destination, bufferSize, CancellationToken.None);
+ }
- if (result.Done)
- throw new InvalidOperationException ("EndRead already called.");
+ public virtual Task CopyToAsync (Stream destination, int bufferSize, CancellationToken cancellationToken)
+ {
+ if (destination == null)
+ throw new ArgumentNullException ("destination");
+ if (!CanRead)
+ throw new NotSupportedException ("This stream does not support reading");
+ if (!destination.CanWrite)
+ throw new NotSupportedException ("This destination stream does not support writing");
+ if (bufferSize <= 0)
+ throw new ArgumentOutOfRangeException ("bufferSize");
+
+ if (cancellationToken.IsCancellationRequested)
+ return TaskConstants.Canceled;
+
+ return CopyToAsync (destination, new byte[bufferSize], cancellationToken);
+ }
+
+ async Task CopyToAsync (Stream destination, byte[] buffer, CancellationToken cancellationToken)
+ {
+ int nread;
+ while ((nread = await ReadAsync (buffer, 0, buffer.Length).ConfigureAwait (false)) != 0)
+ await destination.WriteAsync (buffer, 0, nread, cancellationToken).ConfigureAwait (false);
+ }
- result.Done = true;
- if (result.Exception != null)
- throw result.Exception;
+ public Task FlushAsync ()
+ {
+ return FlushAsync (CancellationToken.None);
+ }
+
+ public virtual Task FlushAsync (CancellationToken cancellationToken)
+ {
+ if (cancellationToken.IsCancellationRequested)
+ return TaskConstants.Canceled;
- return result.NBytes;
+ return Task.Factory.StartNew (l => ((Stream) l).Flush (), this, cancellationToken);
}
- public virtual void EndWrite (IAsyncResult async_result)
+ public Task<int> ReadAsync (byte[] buffer, int offset, int count)
{
- if (async_result == null)
- throw new ArgumentNullException ("async_result");
+ return ReadAsync (buffer, offset, count, CancellationToken.None);
+ }
+
+ public virtual Task<int> ReadAsync (byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+ {
+ if (cancellationToken.IsCancellationRequested)
+ return TaskConstants<int>.Canceled;
- StreamAsyncResult result = async_result as StreamAsyncResult;
- if (result == null || result.NBytes != -1)
- throw new ArgumentException ("Invalid IAsyncResult", "async_result");
+ return Task<int>.Factory.FromAsync (BeginRead, EndRead, buffer, offset, count, null);
+ }
- if (result.Done)
- throw new InvalidOperationException ("EndWrite already called.");
+ public Task WriteAsync (byte[] buffer, int offset, int count)
+ {
+ return WriteAsync (buffer, offset, count, CancellationToken.None);
+ }
- result.Done = true;
- if (result.Exception != null)
- throw result.Exception;
+ public virtual Task WriteAsync (byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+ {
+ return Task.Factory.FromAsync (BeginWrite, EndWrite, buffer, offset, count, null);
}
+#endif
}
class NullStream : Stream
{
}
- public override int Read (byte[] buffer,
- int offset,
- int count)
+ public override int Read (byte[] buffer, int offset, int count)
{
return 0;
}
return -1;
}
- public override long Seek (long offset,
- SeekOrigin origin)
+ public override long Seek (long offset, SeekOrigin origin)
{
return 0;
}
{
}
- public override void Write (byte[] buffer,
- int offset,
- int count)
+ public override void Write (byte[] buffer, int offset, int count)
+ {
+ }
+
+ public override void WriteByte (byte value)
+ {
+ }
+ }
+
+ class SynchronizedStream : Stream {
+ Stream source;
+ object slock;
+
+ internal SynchronizedStream (Stream source)
+ {
+ this.source = source;
+ slock = new object ();
+ }
+
+ public override bool CanRead
+ {
+ get {
+ lock (slock)
+ return source.CanRead;
+ }
+ }
+
+ public override bool CanSeek
+ {
+ get {
+ lock (slock)
+ return source.CanSeek;
+ }
+ }
+
+ public override bool CanWrite
+ {
+ get {
+ lock (slock)
+ return source.CanWrite;
+ }
+ }
+
+ public override long Length
+ {
+ get {
+ lock (slock)
+ return source.Length;
+ }
+ }
+
+ public override long Position
+ {
+ get {
+ lock (slock)
+ return source.Position;
+ }
+ set {
+ lock (slock)
+ source.Position = value;
+ }
+ }
+
+ public override void Flush ()
+ {
+ lock (slock)
+ source.Flush ();
+ }
+
+ public override int Read (byte[] buffer, int offset, int count)
+ {
+ lock (slock)
+ return source.Read (buffer, offset, count);
+ }
+
+ public override int ReadByte ()
+ {
+ lock (slock)
+ return source.ReadByte ();
+ }
+
+ public override long Seek (long offset, SeekOrigin origin)
+ {
+ lock (slock)
+ return source.Seek (offset, origin);
+ }
+
+ public override void SetLength (long value)
+ {
+ lock (slock)
+ source.SetLength (value);
+ }
+
+ public override void Write (byte[] buffer, int offset, int count)
{
+ lock (slock)
+ source.Write (buffer, offset, count);
}
public override void WriteByte (byte value)
{
+ lock (slock)
+ source.WriteByte (value);
}
}
}