Bypass BeginInvoke for asynch. operations
authorGonzalo Paniagua Javier <gonzalo.mono@gmail.com>
Sun, 26 Sep 2010 05:54:22 +0000 (01:54 -0400)
committerGonzalo Paniagua Javier <gonzalo.mono@gmail.com>
Sun, 26 Sep 2010 05:58:38 +0000 (01:58 -0400)
The code involved in a delegate BeginInvoke is too heavy and of no use
for socket asynchronous operations. With these changes, we bypass all
that and send the work items directly to the asynchronous IO queue,
improving performance and lowering memory use.

mcs/class/System/System.Net.Sockets/Socket.cs
mcs/class/System/System.Net.Sockets/Socket_2_1.cs
mono/metadata/icall-def.h
mono/metadata/socket-io.h
mono/metadata/threadpool.c
mono/metadata/threadpool.h

index 191ae5911e98e5086c1b8ae5b90423c3a91f04ca..26b640f79cd84df7f3df1ea2283d2cec2b0d1802 100644 (file)
@@ -525,13 +525,12 @@ namespace System.Net.Sockets
                        }
 
                        e.curSocket = this;
-                       e.Worker.Init (this, null, e.AcceptCallback, SocketOperation.Accept);
-                       SocketAsyncCall sac = new SocketAsyncCall (e.Worker.Accept);
-                       sac.BeginInvoke (null, e.Worker.result);
+                       Worker w = e.Worker;
+                       w.Init (this, null, e.AcceptCallback, SocketOperation.Accept);
+                       socket_pool_queue (w.Accept, w.result);
                        return true;
                }
 #endif
-
                // Creates a new system socket, returning the handle
                [MethodImplAttribute(MethodImplOptions.InternalCall)]
                private extern static IntPtr Accept_internal(IntPtr sock, out int error, bool blocking);
@@ -611,9 +610,8 @@ namespace System.Net.Sockets
 
                        SocketAsyncResult req = new SocketAsyncResult (this, state, callback, SocketOperation.Accept);
                        Worker worker = new Worker (req);
-                       SocketAsyncCall sac = new SocketAsyncCall (worker.Accept);
-                       sac.BeginInvoke (null, req);
-                       return(req);
+                       socket_pool_queue (worker.Accept, req);
+                       return req;
                }
 
                public IAsyncResult BeginAccept (int receiveSize,
@@ -628,15 +626,12 @@ namespace System.Net.Sockets
 
                        SocketAsyncResult req = new SocketAsyncResult (this, state, callback, SocketOperation.AcceptReceive);
                        Worker worker = new Worker (req);
-                       SocketAsyncCall sac = new SocketAsyncCall (worker.AcceptReceive);
-                       
                        req.Buffer = new byte[receiveSize];
                        req.Offset = 0;
                        req.Size = receiveSize;
                        req.SockFlags = SocketFlags.None;
-
-                       sac.BeginInvoke (null, req);
-                       return(req);
+                       socket_pool_queue (worker.AcceptReceive, req);
+                       return req;
                }
 
                public IAsyncResult BeginAccept (Socket acceptSocket,
@@ -668,15 +663,12 @@ namespace System.Net.Sockets
                        
                        SocketAsyncResult req = new SocketAsyncResult (this, state, callback, SocketOperation.AcceptReceive);
                        Worker worker = new Worker (req);
-                       SocketAsyncCall sac = new SocketAsyncCall (worker.AcceptReceive);
-                       
                        req.Buffer = new byte[receiveSize];
                        req.Offset = 0;
                        req.Size = receiveSize;
                        req.SockFlags = SocketFlags.None;
                        req.AcceptSocket = acceptSocket;
-
-                       sac.BeginInvoke (null, req);
+                       socket_pool_queue (worker.AcceptReceive, req);
                        return(req);
                }
 
@@ -721,8 +713,7 @@ namespace System.Net.Sockets
                                // continue asynch
                                connected = false;
                                Worker worker = new Worker (req);
-                               SocketAsyncCall sac = new SocketAsyncCall (worker.Connect);
-                               sac.BeginInvoke (null, req);
+                               socket_pool_queue (worker.Connect, req);
                        }
 
                        return(req);
@@ -772,8 +763,7 @@ namespace System.Net.Sockets
                        
                        connected = false;
                        Worker worker = new Worker (req);
-                       SocketAsyncCall sac = new SocketAsyncCall (worker.Connect);
-                       sac.BeginInvoke (null, req);
+                       socket_pool_queue (worker.Connect, req);
                        
                        return(req);
                }
@@ -810,8 +800,7 @@ namespace System.Net.Sockets
                        req.ReuseSocket = reuseSocket;
                        
                        Worker worker = new Worker (req);
-                       SocketAsyncCall sac = new SocketAsyncCall (worker.Disconnect);
-                       sac.BeginInvoke (null, req);
+                       socket_pool_queue (worker.Disconnect, req);
                        
                        return(req);
                }
@@ -840,14 +829,13 @@ namespace System.Net.Sockets
                        req.Size = size;
                        req.SockFlags = socket_flags;
                        Worker worker = new Worker (req);
+                       int count;
                        lock (readQ) {
                                readQ.Enqueue (worker);
-                               if (readQ.Count == 1) {
-                                       SocketAsyncCall sac = new SocketAsyncCall (worker.Receive);
-                                       sac.BeginInvoke (null, req);
-                               }
+                               count = readQ.Count;
                        }
-
+                       if (count == 1)
+                               socket_pool_queue (worker.Receive, req);
                        return req;
                }
 
@@ -884,15 +872,14 @@ namespace System.Net.Sockets
                        req.Buffers = buffers;
                        req.SockFlags = socketFlags;
                        Worker worker = new Worker (req);
+                       int count;
                        lock(readQ) {
                                readQ.Enqueue (worker);
-                               if (readQ.Count == 1) {
-                                       SocketAsyncCall sac = new SocketAsyncCall (worker.ReceiveGeneric);
-                                       sac.BeginInvoke (null, req);
-                               }
+                               count = readQ.Count;
                        }
-                       
-                       return(req);
+                       if (count == 1)
+                               socket_pool_queue (worker.ReceiveGeneric, req);
+                       return req;
                }
                
                [CLSCompliant (false)]
@@ -937,13 +924,13 @@ namespace System.Net.Sockets
                        req.SockFlags = socket_flags;
                        req.EndPoint = remote_end;
                        Worker worker = new Worker (req);
+                       int count;
                        lock (readQ) {
                                readQ.Enqueue (worker);
-                               if (readQ.Count == 1) {
-                                       SocketAsyncCall sac = new SocketAsyncCall (worker.ReceiveFrom);
-                                       sac.BeginInvoke (null, req);
-                               }
+                               count = readQ.Count;
                        }
+                       if (count == 1)
+                               socket_pool_queue (worker.ReceiveFrom, req);
                        return req;
                }
 
@@ -998,13 +985,13 @@ namespace System.Net.Sockets
                        req.Size = size;
                        req.SockFlags = socket_flags;
                        Worker worker = new Worker (req);
+                       int count;
                        lock (writeQ) {
                                writeQ.Enqueue (worker);
-                               if (writeQ.Count == 1) {
-                                       SocketAsyncCall sac = new SocketAsyncCall (worker.Send);
-                                       sac.BeginInvoke (null, req);
-                               }
+                               count = writeQ.Count;
                        }
+                       if (count == 1)
+                               socket_pool_queue (worker.Send, req);
                        return req;
                }
 
@@ -1044,15 +1031,14 @@ namespace System.Net.Sockets
                        req.Buffers = buffers;
                        req.SockFlags = socketFlags;
                        Worker worker = new Worker (req);
+                       int count;
                        lock (writeQ) {
                                writeQ.Enqueue (worker);
-                               if (writeQ.Count == 1) {
-                                       SocketAsyncCall sac = new SocketAsyncCall (worker.SendGeneric);
-                                       sac.BeginInvoke (null, req);
-                               }
+                               count = writeQ.Count;
                        }
-                       
-                       return(req);
+                       if (count == 1)
+                               socket_pool_queue (worker.SendGeneric, req);
+                       return req;
                }
 
                [CLSCompliant (false)]
@@ -1172,13 +1158,13 @@ namespace System.Net.Sockets
                        req.SockFlags = socket_flags;
                        req.EndPoint = remote_end;
                        Worker worker = new Worker (req);
+                       int count;
                        lock (writeQ) {
                                writeQ.Enqueue (worker);
-                               if (writeQ.Count == 1) {
-                                       SocketAsyncCall sac = new SocketAsyncCall (worker.SendTo);
-                                       sac.BeginInvoke (null, req);
-                               }
+                               count = writeQ.Count;
                        }
+                       if (count == 1)
+                               socket_pool_queue (worker.SendTo, req);
                        return req;
                }
 
@@ -1294,8 +1280,7 @@ namespace System.Net.Sockets
 
                        e.curSocket = this;
                        e.Worker.Init (this, null, e.DisconnectCallback, SocketOperation.Disconnect);
-                       SocketAsyncCall sac = new SocketAsyncCall (e.Worker.Disconnect);
-                       sac.BeginInvoke (null, e.Worker.result);
+                       socket_pool_queue (e.Worker.Disconnect, e.Worker.result);
                        return true;
                }
 #endif
@@ -1736,13 +1721,13 @@ namespace System.Net.Sockets
                        res.EndPoint = e.RemoteEndPoint;
                        res.SockFlags = e.SocketFlags;
                        Worker worker = new Worker (e);
+                       int count;
                        lock (readQ) {
                                readQ.Enqueue (worker);
-                               if (readQ.Count == 1) {
-                                       SocketAsyncCall sac = new SocketAsyncCall (e.Worker.ReceiveFrom);
-                                       sac.BeginInvoke (null, res);
-                               }
+                               count = readQ.Count;
                        }
+                       if (count == 1)
+                               socket_pool_queue (e.Worker.ReceiveFrom, res);
                        return true;
                }
 #endif
@@ -2074,14 +2059,13 @@ namespace System.Net.Sockets
                        res.SockFlags = e.SocketFlags;
                        res.EndPoint = e.RemoteEndPoint;
                        Worker worker = new Worker (e);
+                       int count;
                        lock (writeQ) {
                                writeQ.Enqueue (worker);
-                               if (writeQ.Count == 1) {
-                                       SocketAsyncCall sac = new SocketAsyncCall (e.Worker.SendTo);
-                                       sac.BeginInvoke (null, res);
-                               }
+                               count = writeQ.Count;
                        }
-                       // We always return true for now
+                       if (count == 1)
+                               socket_pool_queue (e.Worker.SendTo, res);
                        return true;
                }
 #endif
index 29792ab7668f6f6f9c9ac0f6759821acb4b2d294..ab8403534653e713a7abaece2684cb4d40a7c78d 100644 (file)
@@ -123,8 +123,13 @@ namespace System.Net.Sockets {
                        public void Init (Socket sock, object state, AsyncCallback callback, SocketOperation operation)
                        {
                                this.Sock = sock;
-                               this.blocking = sock.blocking;
-                               this.handle = sock.socket;
+                               if (sock != null) {
+                                       this.blocking = sock.blocking;
+                                       this.handle = sock.socket;
+                               } else {
+                                       this.blocking = true;
+                                       this.handle = IntPtr.Zero;
+                               }
                                this.state = state;
                                this.callback = callback;
                                this.operation = operation;
@@ -242,7 +247,7 @@ namespace System.Net.Sockets {
                                        }
 
                                        if (sac != null)
-                                               sac.BeginInvoke (null, worker.result);
+                                               Socket.socket_pool_queue (sac, worker.result);
                                }
 
                                if (callback != null)
@@ -618,8 +623,7 @@ namespace System.Net.Sockets {
                                        }
 
                                        if (result.Size > 0) {
-                                               SocketAsyncCall sac = new SocketAsyncCall (this.Send);
-                                               sac.BeginInvoke (null, result);
+                                               Socket.socket_pool_queue (this.Send, result);
                                                return; // Have to finish writing everything. See bug #74475.
                                        }
                                        result.Total = send_so_far;
@@ -640,8 +644,7 @@ namespace System.Net.Sockets {
 
                                        UpdateSendValues (total);
                                        if (result.Size > 0) {
-                                               SocketAsyncCall sac = new SocketAsyncCall (this.SendTo);
-                                               sac.BeginInvoke (null, result);
+                                               Socket.socket_pool_queue (this.SendTo, result);
                                                return; // Have to finish writing everything. See bug #74475.
                                        }
                                        result.Total = send_so_far;
@@ -1191,13 +1194,16 @@ namespace System.Net.Sockets {
                        }
                        res.SockFlags = e.SocketFlags;
                        Worker worker = new Worker (e);
+                       int count;
                        lock (readQ) {
                                readQ.Enqueue (worker);
-                               if (readQ.Count == 1) {
-                                       SocketAsyncCall sac = new SocketAsyncCall (e.Worker.Receive);
-                                       sac.BeginInvoke (null, res);
-                               }
+                               count = readQ.Count;
+                       }
+                       if (count == 1) {
+                               // Receive takes care of ReceiveGeneric
+                               socket_pool_queue (e.Worker.Receive, res);
                        }
+
                        return true;
                }
 
@@ -1222,12 +1228,14 @@ namespace System.Net.Sockets {
                        }
                        res.SockFlags = e.SocketFlags;
                        Worker worker = new Worker (e);
+                       int count;
                        lock (writeQ) {
                                writeQ.Enqueue (worker);
-                               if (writeQ.Count == 1) {
-                                       SocketAsyncCall sac = new SocketAsyncCall (e.Worker.Send);
-                                       sac.BeginInvoke (null, res);
-                               }
+                               count = writeQ.Count;
+                       }
+                       if (count == 1) {
+                               // Send takes care of SendGeneric
+                               socket_pool_queue (e.Worker.Send, res);
                        }
                        return true;
                }
@@ -1722,6 +1730,9 @@ namespace System.Net.Sockets {
                        end_point = req.EndPoint;
                        return req.Total;
                }
+
+               [MethodImplAttribute(MethodImplOptions.InternalCall)]
+               static extern void socket_pool_queue (SocketAsyncCall d, SocketAsyncResult r);
        }
 }
 
index 9ff14881dd9e3c461964316d96b6c36897b946eb..d28632b230e3160a14c6c234a50f4bd814bdea31 100644 (file)
@@ -456,6 +456,7 @@ ICALL(SOCK_18, "SetSocketOption_internal(intptr,System.Net.Sockets.SocketOptionL
 ICALL(SOCK_19, "Shutdown_internal(intptr,System.Net.Sockets.SocketShutdown,int&)", ves_icall_System_Net_Sockets_Socket_Shutdown_internal)
 ICALL(SOCK_20, "Socket_internal(System.Net.Sockets.AddressFamily,System.Net.Sockets.SocketType,System.Net.Sockets.ProtocolType,int&)", ves_icall_System_Net_Sockets_Socket_Socket_internal)
 ICALL(SOCK_21, "WSAIoctl(intptr,int,byte[],byte[],int&)", ves_icall_System_Net_Sockets_Socket_WSAIoctl)
+ICALL(SOCK_22, "socket_pool_queue", icall_append_io_job)
 
 ICALL_TYPE(SOCKEX, "System.Net.Sockets.SocketException", SOCKEX_1)
 ICALL(SOCKEX_1, "WSAGetLastError_internal", ves_icall_System_Net_Sockets_SocketException_WSAGetLastError_internal)
index 3e0db2c1ecca2c4779cb33602b3536a894e28605..1d815d1081a6a7821920818e32f97f20ad3ca369 100644 (file)
@@ -180,6 +180,7 @@ typedef struct _MonoSocketAsyncResult {
        gint error;
        gint operation;
        MonoAsyncResult *ares;
+       gint32 end_called;
 } MonoSocketAsyncResult;
 
 typedef struct
index b9e41789dfbcb3c2bbbbcda0d102fd2f955792bc..a7f85eb7cb102aeef13c5ad1d53b2944502cb68e 100644 (file)
@@ -1254,6 +1254,19 @@ mono_thread_pool_init ()
 #endif
 }
 
+void
+icall_append_io_job (MonoObject *target, MonoSocketAsyncResult *state)
+{
+       MonoDomain *domain = mono_domain_get ();
+       MonoAsyncResult *ares;
+
+       /* Don't call mono_async_result_new() to avoid capturing the context */
+       ares = (MonoAsyncResult *) mono_object_new (domain, mono_defaults.asyncresult_class);
+       MONO_OBJECT_SETREF (ares, async_delegate, target);
+       MONO_OBJECT_SETREF (ares, async_state, state);
+       socket_io_add (ares, state);
+}
+
 MonoAsyncResult *
 mono_thread_pool_add (MonoObject *target, MonoMethodMessage *msg, MonoDelegate *async_callback,
                      MonoObject *state)
index 003ba6a124f5ea35b5809483a501394d971b4040..c860b7ebfca3face33fa6e8292aab026fc691878 100644 (file)
@@ -3,11 +3,13 @@
 
 #include <mono/metadata/object-internals.h>
 #include <mono/metadata/reflection.h>
+#include <mono/metadata/socket-io.h>
 
 /* No managed code here */
 void mono_thread_pool_init (void) MONO_INTERNAL;
 
 void icall_append_job (MonoObject *ar) MONO_INTERNAL;
+void icall_append_io_job (MonoObject *target, MonoSocketAsyncResult *state) MONO_INTERNAL;
 MonoAsyncResult *
 mono_thread_pool_add     (MonoObject *target, MonoMethodMessage *msg, 
                          MonoDelegate *async_callback, MonoObject *state) MONO_INTERNAL;