*/
internal EndPoint seed_endpoint = null;
- internal Queue<SocketAsyncWorker> readQ = new Queue<SocketAsyncWorker> (2);
- internal Queue<SocketAsyncWorker> writeQ = new Queue<SocketAsyncWorker> (2);
+ internal Queue<KeyValuePair<IntPtr, IOSelectorJob>> readQ = new Queue<KeyValuePair<IntPtr, IOSelectorJob>> (2);
+ internal Queue<KeyValuePair<IntPtr, IOSelectorJob>> writeQ = new Queue<KeyValuePair<IntPtr, IOSelectorJob>> (2);
internal bool is_blocking = true;
internal bool is_bound;
this.address_family = addressFamily;
this.socket_type = socketType;
this.protocol_type = protocolType;
-
- int error;
- var handle = Socket_internal (addressFamily, socketType, protocolType, out error);
- this.safe_handle = new SafeSocketHandle (handle, true);
+ int error;
+ this.safe_handle = new SafeSocketHandle (Socket_internal (addressFamily, socketType, protocolType, out error), true);
if (error != 0)
throw new SocketException (error);
throw new InvalidOperationException ("AcceptSocket: The socket must not be bound or connected.");
}
- e.curSocket = this;
- e.Worker.Init (this, e, SocketOperation.Accept);
-
- SocketAsyncResult sockares = e.Worker.result;
+ InitSocketAsyncEventArgs (e, AcceptAsyncCallback, e, SocketOperation.Accept);
- QueueSocketAsyncResult (readQ, e.Worker, sockares);
+ QueueIOSelectorJob (readQ, e.socket_async_result.Handle, new IOSelectorJob (IOOperation.Read, BeginAcceptCallback, e.socket_async_result));
return true;
}
+ static AsyncCallback AcceptAsyncCallback = new AsyncCallback (ares => {
+ SocketAsyncEventArgs e = (SocketAsyncEventArgs) ((SocketAsyncResult) ares).AsyncState;
+
+ if (Interlocked.Exchange (ref e.in_progress, 0) != 1)
+ throw new InvalidOperationException ("No operation in progress");
+
+ try {
+ e.AcceptSocket = e.current_socket.EndAccept (ares);
+ } catch (SocketException ex) {
+ e.SocketError = ex.SocketErrorCode;
+ } catch (ObjectDisposedException) {
+ e.SocketError = SocketError.OperationAborted;
+ } finally {
+ if (e.AcceptSocket == null)
+ e.AcceptSocket = new Socket (e.current_socket.AddressFamily, e.current_socket.SocketType, e.current_socket.ProtocolType, null);
+ e.Complete ();
+ }
+ });
+
public IAsyncResult BeginAccept(AsyncCallback callback, object state)
{
ThrowIfDisposedAndClosed ();
if (!is_bound || !is_listening)
throw new InvalidOperationException ();
- SocketAsyncResult sockares = new SocketAsyncResult (this, state, callback, SocketOperation.Accept);
+ SocketAsyncResult sockares = new SocketAsyncResult (this, callback, state, SocketOperation.Accept);
- QueueSocketAsyncResult (readQ, sockares.Worker, sockares);
+ QueueIOSelectorJob (readQ, sockares.Handle, new IOSelectorJob (IOOperation.Read, BeginAcceptCallback, sockares));
return sockares;
}
+ static IOAsyncCallback BeginAcceptCallback = new IOAsyncCallback (ares => {
+ SocketAsyncResult sockares = (SocketAsyncResult) ares;
+ Socket socket = null;
+
+ try {
+ socket = sockares.socket.Accept ();
+ } catch (Exception e) {
+ sockares.Complete (e);
+ return;
+ }
+
+ sockares.Complete (socket);
+ });
+
public IAsyncResult BeginAccept (int receiveSize, AsyncCallback callback, object state)
{
ThrowIfDisposedAndClosed ();
if (receiveSize < 0)
throw new ArgumentOutOfRangeException ("receiveSize", "receiveSize is less than zero");
- SocketAsyncResult sockares = new SocketAsyncResult (this, state, callback, SocketOperation.AcceptReceive) {
+ SocketAsyncResult sockares = new SocketAsyncResult (this, callback, state, SocketOperation.AcceptReceive) {
Buffer = new byte [receiveSize],
Offset = 0,
Size = receiveSize,
SockFlags = SocketFlags.None,
};
- QueueSocketAsyncResult (readQ, sockares.Worker, sockares);
+ QueueIOSelectorJob (readQ, sockares.Handle, new IOSelectorJob (IOOperation.Read, BeginAcceptReceiveCallback, sockares));
return sockares;
}
if (acceptSocket.ProtocolType != ProtocolType.Tcp)
throw new SocketException ((int)SocketError.InvalidArgument);
}
-
- SocketAsyncResult sockares = new SocketAsyncResult (this, state, callback, SocketOperation.AcceptReceive) {
+
+ SocketAsyncResult sockares = new SocketAsyncResult (this, callback, state, SocketOperation.AcceptReceive) {
Buffer = new byte [receiveSize],
Offset = 0,
Size = receiveSize,
AcceptSocket = acceptSocket,
};
- QueueSocketAsyncResult (readQ, sockares.Worker, sockares);
+ QueueIOSelectorJob (readQ, sockares.Handle, new IOSelectorJob (IOOperation.Read, BeginAcceptReceiveCallback, sockares));
return sockares;
}
+ static IOAsyncCallback BeginAcceptReceiveCallback = new IOAsyncCallback (ares => {
+ SocketAsyncResult sockares = (SocketAsyncResult) ares;
+ Socket acc_socket = null;
+
+ try {
+ if (sockares.AcceptSocket == null) {
+ acc_socket = sockares.socket.Accept ();
+ } else {
+ acc_socket = sockares.AcceptSocket;
+ sockares.socket.Accept (acc_socket);
+ }
+ } catch (Exception e) {
+ sockares.Complete (e);
+ return;
+ }
+
+ /* It seems the MS runtime special-cases 0-length requested receive data. See bug 464201. */
+ int total = 0;
+ if (sockares.Size > 0) {
+ try {
+ SocketError error;
+ total = acc_socket.Receive_nochecks (sockares.Buffer, sockares.Offset, sockares.Size, sockares.SockFlags, out error);
+ if (error != 0) {
+ sockares.Complete (new SocketException ((int) error));
+ return;
+ }
+ } catch (Exception e) {
+ sockares.Complete (e);
+ return;
+ }
+ }
+
+ sockares.Complete (acc_socket, total);
+ });
+
public Socket EndAccept (IAsyncResult result)
{
int bytes;
buffer = sockares.Buffer;
bytesTransferred = sockares.Total;
- return sockares.Socket;
+ return sockares.AcceptedSocket;
}
static SafeSocketHandle Accept_internal (SafeSocketHandle safeHandle, out int error, bool blocking)
if (e.RemoteEndPoint == null)
throw new ArgumentNullException ("remoteEP");
- e.curSocket = this;
- e.Worker.Init (this, e, SocketOperation.Connect);
-
- SocketAsyncResult result = e.Worker.result;
+ InitSocketAsyncEventArgs (e, ConnectAsyncCallback, e, SocketOperation.Connect);
try {
IPAddress [] addresses;
- IAsyncResult ares;
+ SocketAsyncResult ares;
if (!GetCheckedIPs (e, out addresses)) {
- result.EndPoint = e.RemoteEndPoint;
- ares = BeginConnect (e.RemoteEndPoint, SocketAsyncEventArgs.Dispatcher, e);
+ e.socket_async_result.EndPoint = e.RemoteEndPoint;
+ ares = (SocketAsyncResult) BeginConnect (e.RemoteEndPoint, ConnectAsyncCallback, e);
} else {
DnsEndPoint dep = (e.RemoteEndPoint as DnsEndPoint);
- result.Addresses = addresses;
- result.Port = dep.Port;
- ares = BeginConnect (addresses, dep.Port, SocketAsyncEventArgs.Dispatcher, e);
+ e.socket_async_result.Addresses = addresses;
+ e.socket_async_result.Port = dep.Port;
+ ares = (SocketAsyncResult) BeginConnect (addresses, dep.Port, ConnectAsyncCallback, e);
}
if (ares.IsCompleted && ares.CompletedSynchronously) {
- ((SocketAsyncResult) ares).CheckIfThrowDelayedException ();
+ ares.CheckIfThrowDelayedException ();
return false;
}
} catch (Exception exc) {
- result.Complete (exc, true);
+ e.socket_async_result.Complete (exc, true);
return false;
}
return true;
}
+ static AsyncCallback ConnectAsyncCallback = new AsyncCallback (ares => {
+ SocketAsyncEventArgs e = (SocketAsyncEventArgs) ((SocketAsyncResult) ares).AsyncState;
+
+ if (Interlocked.Exchange (ref e.in_progress, 0) != 1)
+ throw new InvalidOperationException ("No operation in progress");
+
+ try {
+ e.current_socket.EndConnect (ares);
+ } catch (SocketException se) {
+ e.SocketError = se.SocketErrorCode;
+ } catch (ObjectDisposedException) {
+ e.SocketError = SocketError.OperationAborted;
+ } finally {
+ e.Complete ();
+ }
+ });
+
public IAsyncResult BeginConnect (IPAddress address, int port, AsyncCallback callback, object state)
{
ThrowIfDisposedAndClosed ();
if (end_point == null)
throw new ArgumentNullException ("end_point");
- SocketAsyncResult sockares = new SocketAsyncResult (this, state, callback, SocketOperation.Connect) {
+ SocketAsyncResult sockares = new SocketAsyncResult (this, callback, state, SocketOperation.Connect) {
EndPoint = end_point,
};
// an error. Better to just close the socket and move on.
connect_in_progress = false;
safe_handle.Dispose ();
- var handle = Socket_internal (address_family, socket_type, protocol_type, out error);
- safe_handle = new SafeSocketHandle (handle, true);
+ safe_handle = new SafeSocketHandle (Socket_internal (address_family, socket_type, protocol_type, out error), true);
if (error != 0)
throw new SocketException (error);
}
is_bound = false;
connect_in_progress = true;
- socket_pool_queue (SocketAsyncWorker.Dispatcher, sockares);
+ IOSelector.Add (sockares.Handle, new IOSelectorJob (IOOperation.Write, BeginConnectCallback, sockares));
return sockares;
}
if (is_listening)
throw new InvalidOperationException ();
- SocketAsyncResult sockares = new SocketAsyncResult (this, state, callback, SocketOperation.Connect) {
+ SocketAsyncResult sockares = new SocketAsyncResult (this, callback, state, SocketOperation.Connect) {
Addresses = addresses,
Port = port,
};
internal IAsyncResult BeginMConnect (SocketAsyncResult sockares)
{
- IAsyncResult ares = null;
+ SocketAsyncResult ares = null;
Exception exc = null;
+ AsyncCallback callback;
for (int i = sockares.CurrentAddress; i < sockares.Addresses.Length; i++) {
try {
sockares.CurrentAddress++;
- ares = BeginConnect (new IPEndPoint (sockares.Addresses [i], sockares.Port), null, sockares);
+ ares = (SocketAsyncResult) BeginConnect (new IPEndPoint (sockares.Addresses [i], sockares.Port), null, sockares);
if (ares.IsCompleted && ares.CompletedSynchronously) {
- ((SocketAsyncResult) ares).CheckIfThrowDelayedException ();
- sockares.DoMConnectCallback ();
+ ares.CheckIfThrowDelayedException ();
+
+ callback = ares.AsyncCallback;
+ if (callback != null)
+ ThreadPool.UnsafeQueueUserWorkItem (_ => callback (ares), null);
}
break;
return sockares;
}
+ static IOAsyncCallback BeginConnectCallback = new IOAsyncCallback (ares => {
+ SocketAsyncResult sockares = (SocketAsyncResult) ares;
+
+ if (sockares.EndPoint == null) {
+ sockares.Complete (new SocketException ((int)SocketError.AddressNotAvailable));
+ return;
+ }
+
+ SocketAsyncResult mconnect = sockares.AsyncState as SocketAsyncResult;
+ bool is_mconnect = mconnect != null && mconnect.Addresses != null;
+
+ try {
+ EndPoint ep = sockares.EndPoint;
+ int error_code = (int) sockares.socket.GetSocketOption (SocketOptionLevel.Socket, SocketOptionName.Error);
+
+ if (error_code == 0) {
+ if (is_mconnect)
+ sockares = mconnect;
+
+ sockares.socket.seed_endpoint = ep;
+ sockares.socket.is_connected = true;
+ sockares.socket.is_bound = true;
+ sockares.socket.connect_in_progress = false;
+ sockares.error = 0;
+ sockares.Complete ();
+ return;
+ }
+
+ if (!is_mconnect) {
+ sockares.socket.connect_in_progress = false;
+ sockares.Complete (new SocketException (error_code));
+ return;
+ }
+
+ if (mconnect.CurrentAddress >= mconnect.Addresses.Length) {
+ mconnect.Complete (new SocketException (error_code));
+ return;
+ }
+
+ mconnect.socket.BeginMConnect (mconnect);
+ } catch (Exception e) {
+ sockares.socket.connect_in_progress = false;
+
+ if (is_mconnect)
+ sockares = mconnect;
+
+ sockares.Complete (e);
+ return;
+ }
+ });
+
public void EndConnect (IAsyncResult result)
{
ThrowIfDisposedAndClosed ();
ThrowIfDisposedAndClosed ();
- e.curSocket = this;
- e.Worker.Init (this, e, SocketOperation.Disconnect);
-
- SocketAsyncResult sockares = e.Worker.result;
+ InitSocketAsyncEventArgs (e, DisconnectAsyncCallback, e, SocketOperation.Disconnect);
- socket_pool_queue (SocketAsyncWorker.Dispatcher, sockares);
+ IOSelector.Add (e.socket_async_result.Handle, new IOSelectorJob (IOOperation.Write, BeginDisconnectCallback, e.socket_async_result));
return true;
}
+ static AsyncCallback DisconnectAsyncCallback = new AsyncCallback (ares => {
+ SocketAsyncEventArgs e = (SocketAsyncEventArgs) ((SocketAsyncResult) ares).AsyncState;
+
+ if (Interlocked.Exchange (ref e.in_progress, 0) != 1)
+ throw new InvalidOperationException ("No operation in progress");
+
+ try {
+ e.current_socket.EndDisconnect (ares);
+ } catch (SocketException ex) {
+ e.SocketError = ex.SocketErrorCode;
+ } catch (ObjectDisposedException) {
+ e.SocketError = SocketError.OperationAborted;
+ } finally {
+ e.Complete ();
+ }
+ });
public IAsyncResult BeginDisconnect (bool reuseSocket, AsyncCallback callback, object state)
{
ThrowIfDisposedAndClosed ();
- SocketAsyncResult sockares = new SocketAsyncResult (this, state, callback, SocketOperation.Disconnect) {
+ SocketAsyncResult sockares = new SocketAsyncResult (this, callback, state, SocketOperation.Disconnect) {
ReuseSocket = reuseSocket,
};
- socket_pool_queue (SocketAsyncWorker.Dispatcher, sockares);
+ IOSelector.Add (sockares.Handle, new IOSelectorJob (IOOperation.Write, BeginDisconnectCallback, sockares));
return sockares;
}
+ static IOAsyncCallback BeginDisconnectCallback = new IOAsyncCallback (ares => {
+ SocketAsyncResult sockares = (SocketAsyncResult) ares;
+
+ try {
+ sockares.socket.Disconnect (sockares.ReuseSocket);
+ } catch (Exception e) {
+ sockares.Complete (e);
+ return;
+ }
+
+ sockares.Complete ();
+ });
+
public void EndDisconnect (IAsyncResult asyncResult)
{
ThrowIfDisposedAndClosed ();
if (e.Buffer == null && e.BufferList == null)
throw new NullReferenceException ("Either e.Buffer or e.BufferList must be valid buffers.");
- e.curSocket = this;
- e.Worker.Init (this, e, e.Buffer != null ? SocketOperation.Receive : SocketOperation.ReceiveGeneric);
+ if (e.Buffer == null) {
+ InitSocketAsyncEventArgs (e, ReceiveAsyncCallback, e, SocketOperation.ReceiveGeneric);
- SocketAsyncResult sockares = e.Worker.result;
- sockares.SockFlags = e.SocketFlags;
+ e.socket_async_result.Buffers = e.BufferList;
- if (e.Buffer != null) {
- sockares.Buffer = e.Buffer;
- sockares.Offset = e.Offset;
- sockares.Size = e.Count;
+ QueueIOSelectorJob (readQ, e.socket_async_result.Handle, new IOSelectorJob (IOOperation.Read, BeginReceiveGenericCallback, e.socket_async_result));
} else {
- sockares.Buffers = e.BufferList;
- }
+ InitSocketAsyncEventArgs (e, ReceiveAsyncCallback, e, SocketOperation.Receive);
- // Receive takes care of ReceiveGeneric
- QueueSocketAsyncResult (readQ, e.Worker, sockares);
+ e.socket_async_result.Buffer = e.Buffer;
+ e.socket_async_result.Offset = e.Offset;
+ e.socket_async_result.Size = e.Count;
+
+ QueueIOSelectorJob (readQ, e.socket_async_result.Handle, new IOSelectorJob (IOOperation.Read, BeginReceiveCallback, e.socket_async_result));
+ }
return true;
}
+ static AsyncCallback ReceiveAsyncCallback = new AsyncCallback (ares => {
+ SocketAsyncEventArgs e = (SocketAsyncEventArgs) ((SocketAsyncResult) ares).AsyncState;
+
+ if (Interlocked.Exchange (ref e.in_progress, 0) != 1)
+ throw new InvalidOperationException ("No operation in progress");
+
+ try {
+ e.BytesTransferred = e.current_socket.EndReceive (ares);
+ } catch (SocketException se){
+ e.SocketError = se.SocketErrorCode;
+ } catch (ObjectDisposedException) {
+ e.SocketError = SocketError.OperationAborted;
+ } finally {
+ e.Complete ();
+ }
+ });
+
public IAsyncResult BeginReceive (byte[] buffer, int offset, int size, SocketFlags socket_flags, AsyncCallback callback, object state)
{
ThrowIfDisposedAndClosed ();
ThrowIfBufferNull (buffer);
ThrowIfBufferOutOfRange (buffer, offset, size);
- SocketAsyncResult sockares = new SocketAsyncResult (this, state, callback, SocketOperation.Receive) {
+ SocketAsyncResult sockares = new SocketAsyncResult (this, callback, state, SocketOperation.Receive) {
Buffer = buffer,
Offset = offset,
Size = size,
SockFlags = socket_flags,
};
- QueueSocketAsyncResult (readQ, sockares.Worker, sockares);
+ QueueIOSelectorJob (readQ, sockares.Handle, new IOSelectorJob (IOOperation.Read, BeginReceiveCallback, sockares));
return sockares;
}
return BeginReceive (buffer, offset, size, flags, callback, state);
}
+ static IOAsyncCallback BeginReceiveCallback = new IOAsyncCallback (ares => {
+ SocketAsyncResult sockares = (SocketAsyncResult) ares;
+ int total = 0;
+
+ try {
+ total = Receive_internal (sockares.socket.safe_handle, sockares.Buffer, sockares.Offset, sockares.Size, sockares.SockFlags, out sockares.error);
+ } catch (Exception e) {
+ sockares.Complete (e);
+ return;
+ }
+
+ sockares.Complete (total);
+ });
+
[CLSCompliant (false)]
public IAsyncResult BeginReceive (IList<ArraySegment<byte>> buffers, SocketFlags socketFlags, AsyncCallback callback, object state)
{
if (buffers == null)
throw new ArgumentNullException ("buffers");
- SocketAsyncResult sockares = new SocketAsyncResult (this, state, callback, SocketOperation.ReceiveGeneric) {
+ SocketAsyncResult sockares = new SocketAsyncResult (this, callback, state, SocketOperation.ReceiveGeneric) {
Buffers = buffers,
SockFlags = socketFlags,
};
- QueueSocketAsyncResult (readQ, sockares.Worker, sockares);
+ QueueIOSelectorJob (readQ, sockares.Handle, new IOSelectorJob (IOOperation.Read, BeginReceiveGenericCallback, sockares));
return sockares;
}
return BeginReceive (buffers, socketFlags, callback, state);
}
+ static IOAsyncCallback BeginReceiveGenericCallback = new IOAsyncCallback (ares => {
+ SocketAsyncResult sockares = (SocketAsyncResult) ares;
+ int total = 0;
+
+ try {
+ total = sockares.socket.Receive (sockares.Buffers, sockares.SockFlags);
+ } catch (Exception e) {
+ sockares.Complete (e);
+ return;
+ }
+
+ sockares.Complete (total);
+ });
+
public int EndReceive (IAsyncResult result)
{
SocketError error;
return sockares.Total;
}
- internal int Receive_nochecks (byte [] buf, int offset, int size, SocketFlags flags, out SocketError error)
+ int Receive_nochecks (byte [] buf, int offset, int size, SocketFlags flags, out SocketError error)
{
int nativeError;
int ret = Receive_internal (safe_handle, buf, offset, size, flags, out nativeError);
[MethodImplAttribute (MethodImplOptions.InternalCall)]
extern static int Receive_internal (IntPtr sock, WSABUF[] bufarray, SocketFlags flags, out int error);
- internal static int Receive_internal (SafeSocketHandle safeHandle, byte[] buffer, int offset, int count, SocketFlags flags, out int error)
+ static int Receive_internal (SafeSocketHandle safeHandle, byte[] buffer, int offset, int count, SocketFlags flags, out int error)
{
try {
safeHandle.RegisterForBlockingSyscall ();
{
ThrowIfDisposedAndClosed ();
ThrowIfBufferNull (buffer);
- ThrowIfBufferOutOfRange (buffer, 0, buffer.Length);
-
- if (remoteEP == null)
- throw new ArgumentNullException ("remoteEP");
- return ReceiveFrom_nochecks (buffer, 0, buffer.Length, SocketFlags.None, ref remoteEP);
+ return ReceiveFrom (buffer, 0, buffer.Length, SocketFlags.None, ref remoteEP);
}
public int ReceiveFrom (byte [] buffer, SocketFlags flags, ref EndPoint remoteEP)
{
ThrowIfDisposedAndClosed ();
ThrowIfBufferNull (buffer);
- ThrowIfBufferOutOfRange (buffer, 0, buffer.Length);
-
- if (remoteEP == null)
- throw new ArgumentNullException ("remoteEP");
- return ReceiveFrom_nochecks (buffer, 0, buffer.Length, flags, ref remoteEP);
+ return ReceiveFrom (buffer, 0, buffer.Length, flags, ref remoteEP);
}
public int ReceiveFrom (byte [] buffer, int size, SocketFlags flags, ref EndPoint remoteEP)
ThrowIfBufferNull (buffer);
ThrowIfBufferOutOfRange (buffer, 0, size);
- if (remoteEP == null)
- throw new ArgumentNullException ("remoteEP");
-
- return ReceiveFrom_nochecks (buffer, 0, size, flags, ref remoteEP);
+ return ReceiveFrom (buffer, 0, size, flags, ref remoteEP);
}
public int ReceiveFrom (byte [] buffer, int offset, int size, SocketFlags flags, ref EndPoint remoteEP)
if (remoteEP == null)
throw new ArgumentNullException ("remoteEP");
- return ReceiveFrom_nochecks (buffer, offset, size, flags, ref remoteEP);
+ int error;
+ return ReceiveFrom_nochecks_exc (buffer, offset, size, flags, ref remoteEP, true, out error);
}
public bool ReceiveFromAsync (SocketAsyncEventArgs e)
if (e.RemoteEndPoint == null)
throw new ArgumentNullException ("remoteEP", "Value cannot be null.");
- e.curSocket = this;
- e.Worker.Init (this, e, SocketOperation.ReceiveFrom);
+ InitSocketAsyncEventArgs (e, ReceiveFromAsyncCallback, e, SocketOperation.ReceiveFrom);
- SocketAsyncResult sockares = e.Worker.result;
- sockares.Buffer = e.Buffer;
- sockares.Offset = e.Offset;
- sockares.Size = e.Count;
- sockares.EndPoint = e.RemoteEndPoint;
- sockares.SockFlags = e.SocketFlags;
+ e.socket_async_result.Buffer = e.Buffer;
+ e.socket_async_result.Offset = e.Offset;
+ e.socket_async_result.Size = e.Count;
+ e.socket_async_result.EndPoint = e.RemoteEndPoint;
+ e.socket_async_result.SockFlags = e.SocketFlags;
- QueueSocketAsyncResult (readQ, e.Worker, sockares);
+ QueueIOSelectorJob (readQ, e.socket_async_result.Handle, new IOSelectorJob (IOOperation.Read, BeginReceiveFromCallback, e.socket_async_result));
return true;
}
+ static AsyncCallback ReceiveFromAsyncCallback = new AsyncCallback (ares => {
+ SocketAsyncEventArgs e = (SocketAsyncEventArgs) ((SocketAsyncResult) ares).AsyncState;
+
+ if (Interlocked.Exchange (ref e.in_progress, 0) != 1)
+ throw new InvalidOperationException ("No operation in progress");
+
+ try {
+ e.BytesTransferred = e.current_socket.EndReceiveFrom (ares, ref e.remote_ep);
+ } catch (SocketException ex) {
+ e.SocketError = ex.SocketErrorCode;
+ } catch (ObjectDisposedException) {
+ e.SocketError = SocketError.OperationAborted;
+ } finally {
+ e.Complete ();
+ }
+ });
+
public IAsyncResult BeginReceiveFrom (byte[] buffer, int offset, int size, SocketFlags socket_flags, ref EndPoint remote_end, AsyncCallback callback, object state)
{
ThrowIfDisposedAndClosed ();
if (remote_end == null)
throw new ArgumentNullException ("remote_end");
- SocketAsyncResult sockares = new SocketAsyncResult (this, state, callback, SocketOperation.ReceiveFrom) {
+ SocketAsyncResult sockares = new SocketAsyncResult (this, callback, state, SocketOperation.ReceiveFrom) {
Buffer = buffer,
Offset = offset,
Size = size,
EndPoint = remote_end,
};
- QueueSocketAsyncResult (readQ, sockares.Worker, sockares);
+ QueueIOSelectorJob (readQ, sockares.Handle, new IOSelectorJob (IOOperation.Read, BeginReceiveFromCallback, sockares));
return sockares;
}
+ static IOAsyncCallback BeginReceiveFromCallback = new IOAsyncCallback (ares => {
+ SocketAsyncResult sockares = (SocketAsyncResult) ares;
+ int total = 0;
+
+ try {
+ int error;
+ total = sockares.socket.ReceiveFrom_nochecks_exc (sockares.Buffer, sockares.Offset, sockares.Size, sockares.SockFlags, ref sockares.EndPoint, true, out error);
+ } catch (Exception e) {
+ sockares.Complete (e);
+ return;
+ }
+
+ sockares.Complete (total);
+ });
+
public int EndReceiveFrom(IAsyncResult result, ref EndPoint end_point)
{
ThrowIfDisposedAndClosed ();
return sockares.Total;
}
- internal int ReceiveFrom_nochecks (byte [] buf, int offset, int size, SocketFlags flags, ref EndPoint remote_end)
- {
- int error;
- return ReceiveFrom_nochecks_exc (buf, offset, size, flags, ref remote_end, true, out error);
- }
-
internal int ReceiveFrom_nochecks_exc (byte [] buf, int offset, int size, SocketFlags flags, ref EndPoint remote_end, bool throwOnError, out int error)
{
SocketAddress sockaddr = remote_end.Serialize();
return ret;
}
- internal int Send_nochecks (byte [] buf, int offset, int size, SocketFlags flags, out SocketError error)
+ int Send_nochecks (byte [] buf, int offset, int size, SocketFlags flags, out SocketError error)
{
if (size == 0) {
error = SocketError.Success;
if (e.Buffer == null && e.BufferList == null)
throw new NullReferenceException ("Either e.Buffer or e.BufferList must be valid buffers.");
- e.curSocket = this;
- e.Worker.Init (this, e, e.Buffer != null ? SocketOperation.Send : SocketOperation.SendGeneric);
+ if (e.Buffer == null) {
+ InitSocketAsyncEventArgs (e, SendAsyncCallback, e, SocketOperation.SendGeneric);
- SocketAsyncResult sockares = e.Worker.result;
- sockares.SockFlags = e.SocketFlags;
+ e.socket_async_result.Buffers = e.BufferList;
- if (e.Buffer != null) {
- sockares.Buffer = e.Buffer;
- sockares.Offset = e.Offset;
- sockares.Size = e.Count;
+ QueueIOSelectorJob (writeQ, e.socket_async_result.Handle, new IOSelectorJob (IOOperation.Write, BeginSendGenericCallback, e.socket_async_result));
} else {
- sockares.Buffers = e.BufferList;
- }
+ InitSocketAsyncEventArgs (e, SendAsyncCallback, e, SocketOperation.Send);
+
+ e.socket_async_result.Buffer = e.Buffer;
+ e.socket_async_result.Offset = e.Offset;
+ e.socket_async_result.Size = e.Count;
- // Send takes care of SendGeneric
- QueueSocketAsyncResult (writeQ, e.Worker, sockares);
+ QueueIOSelectorJob (writeQ, e.socket_async_result.Handle, new IOSelectorJob (IOOperation.Write, s => BeginSendCallback ((SocketAsyncResult) s, 0), e.socket_async_result));
+ }
return true;
}
+ static AsyncCallback SendAsyncCallback = new AsyncCallback (ares => {
+ SocketAsyncEventArgs e = (SocketAsyncEventArgs) ((SocketAsyncResult) ares).AsyncState;
+
+ if (Interlocked.Exchange (ref e.in_progress, 0) != 1)
+ throw new InvalidOperationException ("No operation in progress");
+
+ try {
+ e.BytesTransferred = e.current_socket.EndSend (ares);
+ } catch (SocketException se){
+ e.SocketError = se.SocketErrorCode;
+ } catch (ObjectDisposedException) {
+ e.SocketError = SocketError.OperationAborted;
+ } finally {
+ e.Complete ();
+ }
+ });
+
public IAsyncResult BeginSend (byte[] buffer, int offset, int size, SocketFlags socketFlags, out SocketError errorCode, AsyncCallback callback, object state)
{
if (!is_connected) {
if (!is_connected)
throw new SocketException ((int)SocketError.NotConnected);
- SocketAsyncResult sockares = new SocketAsyncResult (this, state, callback, SocketOperation.Send) {
+ SocketAsyncResult sockares = new SocketAsyncResult (this, callback, state, SocketOperation.Send) {
Buffer = buffer,
Offset = offset,
Size = size,
SockFlags = socket_flags,
};
- QueueSocketAsyncResult (writeQ, sockares.Worker, sockares);
+ QueueIOSelectorJob (writeQ, sockares.Handle, new IOSelectorJob (IOOperation.Write, s => BeginSendCallback ((SocketAsyncResult) s, 0), sockares));
return sockares;
}
+ static void BeginSendCallback (SocketAsyncResult sockares, int sent_so_far)
+ {
+ int total = 0;
+
+ try {
+ total = Socket.Send_internal (sockares.socket.safe_handle, sockares.Buffer, sockares.Offset, sockares.Size, sockares.SockFlags, out sockares.error);
+ } catch (Exception e) {
+ sockares.Complete (e);
+ return;
+ }
+
+ if (sockares.error == 0) {
+ sent_so_far += total;
+ sockares.Offset += total;
+ sockares.Size -= total;
+
+ if (sockares.socket.is_disposed) {
+ sockares.Complete (total);
+ return;
+ }
+
+ if (sockares.Size > 0) {
+ IOSelector.Add (sockares.Handle, new IOSelectorJob (IOOperation.Write, s => BeginSendCallback ((SocketAsyncResult) s, sent_so_far), sockares));
+ return; // Have to finish writing everything. See bug #74475.
+ }
+
+ sockares.Total = sent_so_far;
+ }
+
+ sockares.Complete (total);
+ }
+
public IAsyncResult BeginSend (IList<ArraySegment<byte>> buffers, SocketFlags socketFlags, AsyncCallback callback, object state)
{
ThrowIfDisposedAndClosed ();
if (!is_connected)
throw new SocketException ((int)SocketError.NotConnected);
- SocketAsyncResult sockares = new SocketAsyncResult (this, state, callback, SocketOperation.SendGeneric) {
+ SocketAsyncResult sockares = new SocketAsyncResult (this, callback, state, SocketOperation.SendGeneric) {
Buffers = buffers,
SockFlags = socketFlags,
};
- QueueSocketAsyncResult (writeQ, sockares.Worker, sockares);
+ QueueIOSelectorJob (writeQ, sockares.Handle, new IOSelectorJob (IOOperation.Write, BeginSendGenericCallback, sockares));
return sockares;
}
return BeginSend (buffers, socketFlags, callback, state);
}
+ static IOAsyncCallback BeginSendGenericCallback = new IOAsyncCallback (ares => {
+ SocketAsyncResult sockares = (SocketAsyncResult) ares;
+ int total = 0;
+
+ try {
+ total = sockares.socket.Send (sockares.Buffers, sockares.SockFlags);
+ } catch (Exception e) {
+ sockares.Complete (e);
+ return;
+ }
+
+ sockares.Complete (total);
+ });
+
public int EndSend (IAsyncResult result)
{
SocketError error;
[MethodImplAttribute (MethodImplOptions.InternalCall)]
extern static int Send_internal (IntPtr sock, WSABUF[] bufarray, SocketFlags flags, out int error);
- internal static int Send_internal (SafeSocketHandle safeHandle, byte[] buf, int offset, int count, SocketFlags flags, out int error)
+ static int Send_internal (SafeSocketHandle safeHandle, byte[] buf, int offset, int count, SocketFlags flags, out int error)
{
try {
safeHandle.RegisterForBlockingSyscall ();
{
ThrowIfDisposedAndClosed ();
ThrowIfBufferNull (buffer);
- ThrowIfBufferOutOfRange (buffer, 0, buffer.Length);
-
- if (remote_end == null)
- throw new ArgumentNullException ("remote_end");
- return SendTo_nochecks (buffer, 0, buffer.Length, SocketFlags.None, remote_end);
+ return SendTo (buffer, 0, buffer.Length, SocketFlags.None, remote_end);
}
public int SendTo (byte [] buffer, SocketFlags flags, EndPoint remote_end)
{
ThrowIfDisposedAndClosed ();
ThrowIfBufferNull (buffer);
- ThrowIfBufferOutOfRange (buffer, 0, buffer.Length);
-
- if (remote_end == null)
- throw new ArgumentNullException ("remote_end");
- return SendTo_nochecks (buffer, 0, buffer.Length, flags, remote_end);
+ return SendTo (buffer, 0, buffer.Length, flags, remote_end);
}
public int SendTo (byte [] buffer, int size, SocketFlags flags, EndPoint remote_end)
{
- ThrowIfDisposedAndClosed ();
- ThrowIfBufferNull (buffer);
- ThrowIfBufferOutOfRange (buffer, 0, size);
-
- if (remote_end == null)
- throw new ArgumentNullException ("remote_end");
-
- return SendTo_nochecks (buffer, 0, size, flags, remote_end);
+ return SendTo (buffer, 0, size, flags, remote_end);
}
public int SendTo (byte [] buffer, int offset, int size, SocketFlags flags, EndPoint remote_end)
return SendTo_nochecks (buffer, offset, size, flags, remote_end);
}
- internal int SendTo_nochecks (byte [] buffer, int offset, int size, SocketFlags flags, EndPoint remote_end)
- {
- int error;
- int ret = SendTo_internal (safe_handle, buffer, offset, size, flags, remote_end.Serialize (), out error);
-
- SocketError err = (SocketError) error;
- if (err != 0) {
- if (err != SocketError.WouldBlock && err != SocketError.InProgress)
- is_connected = false;
- throw new SocketException (error);
- }
-
- is_connected = true;
- is_bound = true;
- seed_endpoint = remote_end;
-
- return ret;
- }
-
public bool SendToAsync (SocketAsyncEventArgs e)
{
// NO check is made whether e != null in MS.NET (NRE is thrown in such case)
if (e.RemoteEndPoint == null)
throw new ArgumentNullException ("remoteEP", "Value cannot be null.");
- e.curSocket = this;
- e.Worker.Init (this, e, SocketOperation.SendTo);
+ InitSocketAsyncEventArgs (e, SendToAsyncCallback, e, SocketOperation.SendTo);
- SocketAsyncResult sockares = e.Worker.result;
- sockares.Buffer = e.Buffer;
- sockares.Offset = e.Offset;
- sockares.Size = e.Count;
- sockares.SockFlags = e.SocketFlags;
- sockares.EndPoint = e.RemoteEndPoint;
+ e.socket_async_result.Buffer = e.Buffer;
+ e.socket_async_result.Offset = e.Offset;
+ e.socket_async_result.Size = e.Count;
+ e.socket_async_result.SockFlags = e.SocketFlags;
+ e.socket_async_result.EndPoint = e.RemoteEndPoint;
- QueueSocketAsyncResult (writeQ, e.Worker, sockares);
+ QueueIOSelectorJob (writeQ, e.socket_async_result.Handle, new IOSelectorJob (IOOperation.Write, s => BeginSendToCallback ((SocketAsyncResult) s, 0), e.socket_async_result));
return true;
}
+ static AsyncCallback SendToAsyncCallback = new AsyncCallback (ares => {
+ SocketAsyncEventArgs e = (SocketAsyncEventArgs) ((SocketAsyncResult) ares).AsyncState;
+
+ if (Interlocked.Exchange (ref e.in_progress, 0) != 1)
+ throw new InvalidOperationException ("No operation in progress");
+
+ try {
+ e.BytesTransferred = e.current_socket.EndSendTo (ares);
+ } catch (SocketException ex) {
+ e.SocketError = ex.SocketErrorCode;
+ } catch (ObjectDisposedException) {
+ e.SocketError = SocketError.OperationAborted;
+ } finally {
+ e.Complete ();
+ }
+ });
public IAsyncResult BeginSendTo(byte[] buffer, int offset, int size, SocketFlags socket_flags, EndPoint remote_end, AsyncCallback callback, object state)
{
ThrowIfBufferNull (buffer);
ThrowIfBufferOutOfRange (buffer, offset, size);
- SocketAsyncResult sockares = new SocketAsyncResult (this, state, callback, SocketOperation.SendTo) {
+ SocketAsyncResult sockares = new SocketAsyncResult (this, callback, state, SocketOperation.SendTo) {
Buffer = buffer,
Offset = offset,
Size = size,
EndPoint = remote_end,
};
- QueueSocketAsyncResult (writeQ, sockares.Worker, sockares);
+ QueueIOSelectorJob (writeQ, sockares.Handle, new IOSelectorJob (IOOperation.Write, s => BeginSendToCallback ((SocketAsyncResult) s, 0), sockares));
return sockares;
}
+ static void BeginSendToCallback (SocketAsyncResult sockares, int sent_so_far)
+ {
+ int total = 0;
+ try {
+ total = sockares.socket.SendTo_nochecks (sockares.Buffer, sockares.Offset, sockares.Size, sockares.SockFlags, sockares.EndPoint);
+
+ if (sockares.error == 0) {
+ sent_so_far += total;
+ sockares.Offset += total;
+ sockares.Size -= total;
+ }
+
+ if (sockares.Size > 0) {
+ IOSelector.Add (sockares.Handle, new IOSelectorJob (IOOperation.Write, s => BeginSendToCallback ((SocketAsyncResult) s, sent_so_far), sockares));
+ return; // Have to finish writing everything. See bug #74475.
+ }
+
+ sockares.Total = sent_so_far;
+ } catch (Exception e) {
+ sockares.Complete (e);
+ return;
+ }
+
+ sockares.Complete ();
+ }
+
public int EndSendTo (IAsyncResult result)
{
ThrowIfDisposedAndClosed ();
return sockares.Total;
}
+ int SendTo_nochecks (byte [] buffer, int offset, int size, SocketFlags flags, EndPoint remote_end)
+ {
+ int error;
+ int ret = SendTo_internal (safe_handle, buffer, offset, size, flags, remote_end.Serialize (), out error);
+
+ SocketError err = (SocketError) error;
+ if (err != 0) {
+ if (err != SocketError.WouldBlock && err != SocketError.InProgress)
+ is_connected = false;
+ throw new SocketException (error);
+ }
+
+ is_connected = true;
+ is_bound = true;
+ seed_endpoint = remote_end;
+
+ return ret;
+ }
+
static int SendTo_internal (SafeSocketHandle safeHandle, byte[] buffer, int offset, int count, SocketFlags flags, SocketAddress sa, out int error)
{
try {
}
[MethodImplAttribute (MethodImplOptions.InternalCall)]
- extern static void Shutdown_internal (IntPtr socket, SocketShutdown how, out int error);
+ internal extern static void Shutdown_internal (IntPtr socket, SocketShutdown how, out int error);
#endregion
return sockares;
}
- void QueueSocketAsyncResult (Queue<SocketAsyncWorker> queue, SocketAsyncWorker worker, SocketAsyncResult sockares)
+ void QueueIOSelectorJob (Queue<KeyValuePair<IntPtr, IOSelectorJob>> queue, IntPtr handle, IOSelectorJob job)
{
int count;
lock (queue) {
- queue.Enqueue (worker);
+ queue.Enqueue (new KeyValuePair<IntPtr, IOSelectorJob> (handle, job));
count = queue.Count;
}
if (count == 1)
- socket_pool_queue (SocketAsyncWorker.Dispatcher, sockares);
+ IOSelector.Add (handle, job);
+ }
+
+ void InitSocketAsyncEventArgs (SocketAsyncEventArgs e, AsyncCallback callback, object state, SocketOperation operation)
+ {
+ e.socket_async_result.Init (this, callback, state, operation);
+
+ e.current_socket = this;
+ e.SetLastOperation (SocketOperationToSocketAsyncOperation (operation));
+ e.SocketError = SocketError.Success;
+ e.BytesTransferred = 0;
+ }
+
+ SocketAsyncOperation SocketOperationToSocketAsyncOperation (SocketOperation op)
+ {
+ switch (op) {
+ case SocketOperation.Connect:
+ return SocketAsyncOperation.Connect;
+ case SocketOperation.Accept:
+ return SocketAsyncOperation.Accept;
+ case SocketOperation.Disconnect:
+ return SocketAsyncOperation.Disconnect;
+ case SocketOperation.Receive:
+ case SocketOperation.ReceiveGeneric:
+ return SocketAsyncOperation.Receive;
+ case SocketOperation.ReceiveFrom:
+ return SocketAsyncOperation.ReceiveFrom;
+ case SocketOperation.Send:
+ case SocketOperation.SendGeneric:
+ return SocketAsyncOperation.Send;
+ case SocketOperation.SendTo:
+ return SocketAsyncOperation.SendTo;
+ default:
+ throw new NotImplementedException (String.Format ("Operation {0} is not implemented", op));
+ }
}
[StructLayout (LayoutKind.Sequential)]
internal static extern void cancel_blocking_socket_operation (Thread thread);
[MethodImplAttribute(MethodImplOptions.InternalCall)]
- internal static extern void socket_pool_queue (SocketAsyncCallback d, SocketAsyncResult r);
+ internal static extern bool SupportsPortReuse ();
}
}