bool disposed;
bool headersSent;
object locker = new object ();
+ bool initRead;
+ bool read_eof;
+ bool complete_request_written;
+ long max_buffer_size;
public WebConnectionStream (WebConnection cnc)
{
isRead = true;
pending = new ManualResetEvent (true);
+ this.request = cnc.Data.request;
this.cnc = cnc;
+ string contentType = cnc.Data.Headers ["Transfer-Encoding"];
+ bool chunkedRead = (contentType != null && contentType.ToLower ().IndexOf ("chunked") != -1);
string clength = cnc.Data.Headers ["Content-Length"];
- if (clength != null && clength != "") {
+ if (!chunkedRead && clength != null && clength != "") {
+
try {
contentLength = Int32.Parse (clength);
} catch {
this.request = request;
allowBuffering = request.InternalAllowBuffering;
sendChunked = request.SendChunked;
- if (allowBuffering)
+ if (allowBuffering) {
writeBuffer = new MemoryStream ();
+ max_buffer_size = request.ContentLength;
+ } else {
+ max_buffer_size = -1;
+ }
if (sendChunked)
pending = new ManualResetEvent (true);
}
+ internal bool CompleteRequestWritten {
+ get { return complete_request_written; }
+ }
+
internal bool SendChunked {
set { sendChunked = value; }
}
internal void ReadAll ()
{
- if (!isRead || totalRead >= contentLength || nextReadCalled) {
- if (!nextReadCalled) {
+ if (!isRead || read_eof || totalRead >= contentLength || nextReadCalled) {
+ if (isRead && !nextReadCalled) {
nextReadCalled = true;
cnc.NextRead ();
}
cnc.NextRead ();
}
-
- static void CallbackWrapper (IAsyncResult r)
+
+ void WriteCallbackWrapper (IAsyncResult r)
{
- WebAsyncResult result = (WebAsyncResult) r.AsyncState;
- result.InnerAsyncResult = r;
- result.DoCallback ();
+ WebAsyncResult result;
+ if (r.AsyncState != null) {
+ result = (WebAsyncResult) r.AsyncState;
+ result.InnerAsyncResult = r;
+ result.DoCallback ();
+ } else {
+ EndWrite (r);
+ }
+ }
+
+ void ReadCallbackWrapper (IAsyncResult r)
+ {
+ WebAsyncResult result;
+ if (r.AsyncState != null) {
+ result = (WebAsyncResult) r.AsyncState;
+ result.InnerAsyncResult = r;
+ result.DoCallback ();
+ } else {
+ EndRead (r);
+ }
}
public override int Read (byte [] buffer, int offset, int size)
if (totalRead >= contentLength)
return 0;
- IAsyncResult res = BeginRead (buffer, offset, size, null, null);
+ AsyncCallback cb = new AsyncCallback (ReadCallbackWrapper);
+ WebAsyncResult res = (WebAsyncResult) BeginRead (buffer, offset, size, cb, null);
+ if (!res.IsCompleted && !res.WaitUntilComplete (request.ReadWriteTimeout, false)) {
+ nextReadCalled = true;
+ cnc.Close (true);
+ throw new IOException ("Read timed out.");
+ }
+
return EndRead (res);
}
}
if (cb != null)
- cb = new AsyncCallback (CallbackWrapper);
+ cb = new AsyncCallback (ReadCallbackWrapper);
if (contentLength != Int32.MaxValue && contentLength - totalRead < size)
size = contentLength - totalRead;
- result.InnerAsyncResult = cnc.BeginRead (buffer, offset, size, cb, result);
+ if (!read_eof) {
+ result.InnerAsyncResult = cnc.BeginRead (buffer, offset, size, cb, result);
+ } else {
+ result.SetCompleted (true, result.NBytes);
+ result.DoCallback ();
+ }
return result;
}
public override int EndRead (IAsyncResult r)
{
WebAsyncResult result = (WebAsyncResult) r;
+ if (result.EndCalled) {
+ int xx = result.NBytes;
+ return (xx >= 0) ? xx : 0;
+ }
+
+ result.EndCalled = true;
if (!result.IsCompleted) {
- int nbytes = cnc.EndRead (result);
- bool finished = (nbytes == -1);
- if (finished && result.NBytes > 0)
+ int nbytes = -1;
+ try {
+ nbytes = cnc.EndRead (result);
+ } catch (Exception exc) {
+ lock (locker) {
+ pendingReads--;
+ if (pendingReads == 0)
+ pending.Set ();
+ }
+
+ nextReadCalled = true;
+ cnc.Close (true);
+ result.SetCompleted (false, exc);
+ throw;
+ }
+
+ if (nbytes < 0) {
nbytes = 0;
+ read_eof = true;
+ }
- result.SetCompleted (false, nbytes + result.NBytes);
totalRead += nbytes;
- if (finished || nbytes == 0)
+ result.SetCompleted (false, nbytes + result.NBytes);
+ result.DoCallback ();
+ if (nbytes == 0)
contentLength = totalRead;
}
if (totalRead >= contentLength && !nextReadCalled)
ReadAll ();
- return result.NBytes;
+ int nb = result.NBytes;
+ return (nb >= 0) ? nb : 0;
}
public override IAsyncResult BeginWrite (byte [] buffer, int offset, int size,
WebAsyncResult result = new WebAsyncResult (cb, state);
if (allowBuffering) {
+ if (max_buffer_size >= 0) {
+ long avail = max_buffer_size - writeBuffer.Length;
+ if (size > avail) {
+ if (requestWritten)
+ throw new ProtocolViolationException (
+ "The number of bytes to be written is greater than " +
+ "the specified ContentLength.");
+ }
+ }
writeBuffer.Write (buffer, offset, size);
if (!sendChunked) {
result.SetCompleted (true, 0);
AsyncCallback callback = null;
if (cb != null)
- callback = new AsyncCallback (CallbackWrapper);
+ callback = new AsyncCallback (WriteCallbackWrapper);
if (sendChunked) {
WriteRequest ();
if (r == null)
throw new ArgumentNullException ("r");
- if (allowBuffering && !sendChunked)
- return;
-
WebAsyncResult result = r as WebAsyncResult;
if (result == null)
throw new ArgumentException ("Invalid IAsyncResult");
+ if (result.EndCalled)
+ return;
+
+ result.EndCalled = true;
+
+ if (allowBuffering && !sendChunked)
+ return;
+
if (result.GotException)
throw result.Exception;
- cnc.EndWrite (result.InnerAsyncResult);
+ try {
+ cnc.EndWrite (result.InnerAsyncResult);
+ result.SetCompleted (false, 0);
+ } catch (Exception e) {
+ result.SetCompleted (false, e);
+ }
+
if (sendChunked) {
lock (locker) {
pendingWrites--;
if (isRead)
throw new NotSupportedException ("This stream does not allow writing");
- IAsyncResult res = BeginWrite (buffer, offset, size, null, null);
+ AsyncCallback cb = new AsyncCallback (WriteCallbackWrapper);
+ WebAsyncResult res = (WebAsyncResult) BeginWrite (buffer, offset, size, cb, null);
+ if (!res.IsCompleted && !res.WaitUntilComplete (request.ReadWriteTimeout, false)) {
+ nextReadCalled = true;
+ cnc.Close (true);
+ throw new IOException ("Write timed out.");
+ }
+
EndWrite (res);
}
throw new WebException ("Not connected", null, WebExceptionStatus.SendFailure, null);
cnc.Write (buffer, offset, size);
+ if (!initRead) {
+ initRead = true;
+ WebConnection.InitRead (cnc);
+ }
} else {
headers = new byte [size];
Buffer.BlockCopy (buffer, offset, headers, 0, size);
if (cnc.Data.StatusCode != 0 && cnc.Data.StatusCode != 100)
return;
- cnc.Write (bytes, 0, length);
+ IAsyncResult result = null;
+ if (length > 0)
+ result = cnc.BeginWrite (bytes, 0, length, null, null);
+
+ if (!initRead) {
+ initRead = true;
+ WebConnection.InitRead (cnc);
+ }
+
+ if (length > 0)
+ complete_request_written = cnc.EndWrite (result);
+ else
+ complete_request_written = true;
}
internal void InternalClose ()
{
disposed = true;
}
-
+
+ internal void ForceCloseConnection ()
+ {
+ if (!disposed) {
+ disposed = true;
+ cnc.Close (true);
+ }
+ }
+
public override void Close ()
{
if (sendChunked) {
if (!nextReadCalled) {
CheckComplete ();
// If we have not read all the contents
- if (!nextReadCalled)
+ if (!nextReadCalled) {
+ nextReadCalled = true;
cnc.Close (true);
+ }
+ }
+ return;
+ } else if (!allowBuffering) {
+ complete_request_written = true;
+ if (!initRead) {
+ initRead = true;
+ WebConnection.InitRead (cnc);
}
return;
}
- if (!allowBuffering || disposed)
+ if (disposed)
return;
- disposed = true;
-
long length = request.ContentLength;
if (length != -1 && length > writeBuffer.Length)
throw new IOException ("Cannot close the stream until all bytes are written");
WriteRequest ();
+ disposed = true;
}
public override long Seek (long a, SeekOrigin b)