[Socket] Use SemaphoreSlim instead of Queue (#3633)
authorLudovic Henry <ludovic@xamarin.com>
Sun, 25 Sep 2016 17:54:07 +0000 (19:54 +0200)
committerGitHub <noreply@github.com>
Sun, 25 Sep 2016 17:54:07 +0000 (19:54 +0200)
This uses the same approach as corefx's FileStream. This simplifies the code, as well as make the intent of the queue clearer.

mcs/class/System/System.Net.Sockets/Socket.cs
mcs/class/System/System.Net.Sockets/SocketAsyncResult.cs

index d17dc34f6fdaf7b7b63bb1d95b38b7a6315699ee..7ac0ba39f4696065f56a9491b7c4a61191580d02 100644 (file)
@@ -86,8 +86,8 @@ namespace System.Net.Sockets
                 */
                internal EndPoint seed_endpoint = null;
 
-               internal Queue<KeyValuePair<IntPtr, IOSelectorJob>> readQ = new Queue<KeyValuePair<IntPtr, IOSelectorJob>> (2);
-               internal Queue<KeyValuePair<IntPtr, IOSelectorJob>> writeQ = new Queue<KeyValuePair<IntPtr, IOSelectorJob>> (2);
+               internal SemaphoreSlim ReadSem = new SemaphoreSlim (1, 1);
+               internal SemaphoreSlim WriteSem = new SemaphoreSlim (1, 1);
 
                internal bool is_blocking = true;
                internal bool is_bound;
@@ -706,7 +706,7 @@ namespace System.Net.Sockets
 
                        InitSocketAsyncEventArgs (e, AcceptAsyncCallback, e, SocketOperation.Accept);
 
-                       QueueIOSelectorJob (readQ, e.socket_async_result.Handle, new IOSelectorJob (IOOperation.Read, BeginAcceptCallback, e.socket_async_result));
+                       QueueIOSelectorJob (ReadSem, e.socket_async_result.Handle, new IOSelectorJob (IOOperation.Read, BeginAcceptCallback, e.socket_async_result));
 
                        return true;
                }
@@ -739,7 +739,7 @@ namespace System.Net.Sockets
 
                        SocketAsyncResult sockares = new SocketAsyncResult (this, callback, state, SocketOperation.Accept);
 
-                       QueueIOSelectorJob (readQ, sockares.Handle, new IOSelectorJob (IOOperation.Read, BeginAcceptCallback, sockares));
+                       QueueIOSelectorJob (ReadSem, sockares.Handle, new IOSelectorJob (IOOperation.Read, BeginAcceptCallback, sockares));
 
                        return sockares;
                }
@@ -792,7 +792,7 @@ namespace System.Net.Sockets
                                AcceptSocket = acceptSocket,
                        };
 
-                       QueueIOSelectorJob (readQ, sockares.Handle, new IOSelectorJob (IOOperation.Read, BeginAcceptReceiveCallback, sockares));
+                       QueueIOSelectorJob (ReadSem, sockares.Handle, new IOSelectorJob (IOOperation.Read, BeginAcceptReceiveCallback, sockares));
 
                        return sockares;
                }
@@ -1553,7 +1553,7 @@ namespace System.Net.Sockets
 
                                e.socket_async_result.Buffers = e.BufferList;
 
-                               QueueIOSelectorJob (readQ, e.socket_async_result.Handle, new IOSelectorJob (IOOperation.Read, BeginReceiveGenericCallback, e.socket_async_result));
+                               QueueIOSelectorJob (ReadSem, e.socket_async_result.Handle, new IOSelectorJob (IOOperation.Read, BeginReceiveGenericCallback, e.socket_async_result));
                        } else {
                                InitSocketAsyncEventArgs (e, ReceiveAsyncCallback, e, SocketOperation.Receive);
 
@@ -1561,7 +1561,7 @@ namespace System.Net.Sockets
                                e.socket_async_result.Offset = e.Offset;
                                e.socket_async_result.Size = e.Count;
 
-                               QueueIOSelectorJob (readQ, e.socket_async_result.Handle, new IOSelectorJob (IOOperation.Read, BeginReceiveCallback, e.socket_async_result));
+                               QueueIOSelectorJob (ReadSem, e.socket_async_result.Handle, new IOSelectorJob (IOOperation.Read, BeginReceiveCallback, e.socket_async_result));
                        }
 
                        return true;
@@ -1602,7 +1602,7 @@ namespace System.Net.Sockets
                                SockFlags = socketFlags,
                        };
 
-                       QueueIOSelectorJob (readQ, sockares.Handle, new IOSelectorJob (IOOperation.Read, BeginReceiveCallback, sockares));
+                       QueueIOSelectorJob (ReadSem, sockares.Handle, new IOSelectorJob (IOOperation.Read, BeginReceiveCallback, sockares));
 
                        return sockares;
                }
@@ -1637,7 +1637,7 @@ namespace System.Net.Sockets
                                SockFlags = socketFlags,
                        };
 
-                       QueueIOSelectorJob (readQ, sockares.Handle, new IOSelectorJob (IOOperation.Read, BeginReceiveGenericCallback, sockares));
+                       QueueIOSelectorJob (ReadSem, sockares.Handle, new IOSelectorJob (IOOperation.Read, BeginReceiveGenericCallback, sockares));
 
                        return sockares;
                }
@@ -1777,7 +1777,7 @@ namespace System.Net.Sockets
                        e.socket_async_result.EndPoint = e.RemoteEndPoint;
                        e.socket_async_result.SockFlags = e.SocketFlags;
 
-                       QueueIOSelectorJob (readQ, e.socket_async_result.Handle, new IOSelectorJob (IOOperation.Read, BeginReceiveFromCallback, e.socket_async_result));
+                       QueueIOSelectorJob (ReadSem, e.socket_async_result.Handle, new IOSelectorJob (IOOperation.Read, BeginReceiveFromCallback, e.socket_async_result));
 
                        return true;
                }
@@ -1816,7 +1816,7 @@ namespace System.Net.Sockets
                                EndPoint = remote_end,
                        };
 
-                       QueueIOSelectorJob (readQ, sockares.Handle, new IOSelectorJob (IOOperation.Read, BeginReceiveFromCallback, sockares));
+                       QueueIOSelectorJob (ReadSem, sockares.Handle, new IOSelectorJob (IOOperation.Read, BeginReceiveFromCallback, sockares));
 
                        return sockares;
                }
@@ -2019,7 +2019,7 @@ m_Handle, buffer, offset + sent, size - sent, socketFlags, out nativeError);
 
                                e.socket_async_result.Buffers = e.BufferList;
 
-                               QueueIOSelectorJob (writeQ, e.socket_async_result.Handle, new IOSelectorJob (IOOperation.Write, BeginSendGenericCallback, e.socket_async_result));
+                               QueueIOSelectorJob (WriteSem, e.socket_async_result.Handle, new IOSelectorJob (IOOperation.Write, BeginSendGenericCallback, e.socket_async_result));
                        } else {
                                InitSocketAsyncEventArgs (e, SendAsyncCallback, e, SocketOperation.Send);
 
@@ -2027,7 +2027,7 @@ m_Handle, buffer, offset + sent, size - sent, socketFlags, out nativeError);
                                e.socket_async_result.Offset = e.Offset;
                                e.socket_async_result.Size = e.Count;
 
-                               QueueIOSelectorJob (writeQ, e.socket_async_result.Handle, new IOSelectorJob (IOOperation.Write, s => BeginSendCallback ((SocketAsyncResult) s, 0), e.socket_async_result));
+                               QueueIOSelectorJob (WriteSem, e.socket_async_result.Handle, new IOSelectorJob (IOOperation.Write, s => BeginSendCallback ((SocketAsyncResult) s, 0), e.socket_async_result));
                        }
 
                        return true;
@@ -2070,7 +2070,7 @@ m_Handle, buffer, offset + sent, size - sent, socketFlags, out nativeError);
                                SockFlags = socketFlags,
                        };
 
-                       QueueIOSelectorJob (writeQ, sockares.Handle, new IOSelectorJob (IOOperation.Write, s => BeginSendCallback ((SocketAsyncResult) s, 0), sockares));
+                       QueueIOSelectorJob (WriteSem, sockares.Handle, new IOSelectorJob (IOOperation.Write, s => BeginSendCallback ((SocketAsyncResult) s, 0), sockares));
 
                        return sockares;
                }
@@ -2127,7 +2127,7 @@ m_Handle, buffer, offset + sent, size - sent, socketFlags, out nativeError);
                                SockFlags = socketFlags,
                        };
 
-                       QueueIOSelectorJob (writeQ, sockares.Handle, new IOSelectorJob (IOOperation.Write, BeginSendGenericCallback, sockares));
+                       QueueIOSelectorJob (WriteSem, sockares.Handle, new IOSelectorJob (IOOperation.Write, BeginSendGenericCallback, sockares));
 
                        return sockares;
                }
@@ -2245,7 +2245,7 @@ m_Handle, buffer, offset + sent, size - sent, socketFlags, out nativeError);
                        e.socket_async_result.SockFlags = e.SocketFlags;
                        e.socket_async_result.EndPoint = e.RemoteEndPoint;
 
-                       QueueIOSelectorJob (writeQ, e.socket_async_result.Handle, new IOSelectorJob (IOOperation.Write, s => BeginSendToCallback ((SocketAsyncResult) s, 0), e.socket_async_result));
+                       QueueIOSelectorJob (WriteSem, e.socket_async_result.Handle, new IOSelectorJob (IOOperation.Write, s => BeginSendToCallback ((SocketAsyncResult) s, 0), e.socket_async_result));
 
                        return true;
                }
@@ -2281,7 +2281,7 @@ m_Handle, buffer, offset + sent, size - sent, socketFlags, out nativeError);
                                EndPoint = remote_end,
                        };
 
-                       QueueIOSelectorJob (writeQ, sockares.Handle, new IOSelectorJob (IOOperation.Write, s => BeginSendToCallback ((SocketAsyncResult) s, 0), sockares));
+                       QueueIOSelectorJob (WriteSem, sockares.Handle, new IOSelectorJob (IOOperation.Write, s => BeginSendToCallback ((SocketAsyncResult) s, 0), sockares));
 
                        return sockares;
                }
@@ -2848,16 +2848,16 @@ m_Handle, buffer, offset + sent, size - sent, socketFlags, out nativeError);
                        return sockares;
                }
 
-               void QueueIOSelectorJob (Queue<KeyValuePair<IntPtr, IOSelectorJob>> queue, IntPtr handle, IOSelectorJob job)
+               void QueueIOSelectorJob (SemaphoreSlim sem, IntPtr handle, IOSelectorJob job)
                {
-                       int count;
-                       lock (queue) {
-                               queue.Enqueue (new KeyValuePair<IntPtr, IOSelectorJob> (handle, job));
-                               count = queue.Count;
-                       }
+                       sem.WaitAsync ().ContinueWith (t => {
+                               if (CleanedUp) {
+                                       job.MarkDisposed ();
+                                       return;
+                               }
 
-                       if (count == 1)
                                IOSelector.Add (handle, job);
+                       });
                }
 
                void InitSocketAsyncEventArgs (SocketAsyncEventArgs e, AsyncCallback callback, object state, SocketOperation operation)
index b0e806cccd35e368b4755d95e505599d014f2912..c63b1781660d77a48c9dd8b8f77dc1aae07f7a8a 100644 (file)
@@ -150,42 +150,20 @@ namespace System.Net.Sockets
                                ThreadPool.UnsafeQueueUserWorkItem (_ => callback (this), null);
                        }
 
-                       Queue<KeyValuePair<IntPtr, IOSelectorJob>> queue = null;
                        switch (operation) {
                        case SocketOperation.Receive:
                        case SocketOperation.ReceiveFrom:
                        case SocketOperation.ReceiveGeneric:
                        case SocketOperation.Accept:
-                               queue = socket.readQ;
+                               socket.ReadSem.Release ();
                                break;
                        case SocketOperation.Send:
                        case SocketOperation.SendTo:
                        case SocketOperation.SendGeneric:
-                               queue = socket.writeQ;
+                               socket.WriteSem.Release ();
                                break;
                        }
 
-                       if (queue != null) {
-                               lock (queue) {
-                                       /* queue.Count will only be 0 if the socket is closed while receive/send/accept
-                                        * operation(s) are pending and at least one call to this method is waiting
-                                        * on the lock while another one calls CompleteAllOnDispose() */
-                                       if (queue.Count > 0)
-                                               queue.Dequeue (); /* remove ourselves */
-                                       if (queue.Count > 0) {
-                                               if (!socket.CleanedUp) {
-                                                       IOSelector.Add (queue.Peek ().Key, queue.Peek ().Value);
-                                               } else {
-                                                       /* CompleteAllOnDispose */
-                                                       KeyValuePair<IntPtr, IOSelectorJob> [] jobs = queue.ToArray ();
-                                                       for (int i = 0; i < jobs.Length; i++)
-                                                               ThreadPool.QueueUserWorkItem (j => ((IOSelectorJob) j).MarkDisposed (), jobs [i].Value);
-                                                       queue.Clear ();
-                                               }
-                                       }
-                               }
-                       }
-
                        // IMPORTANT: 'callback', if any is scheduled from unmanaged code
                }