*/
internal EndPoint seed_endpoint = null;
- internal Queue<KeyValuePair<IntPtr, IOSelectorJob>> readQ = new Queue<KeyValuePair<IntPtr, IOSelectorJob>> (2);
- internal Queue<KeyValuePair<IntPtr, IOSelectorJob>> writeQ = new Queue<KeyValuePair<IntPtr, IOSelectorJob>> (2);
+ internal SemaphoreSlim ReadSem = new SemaphoreSlim (1, 1);
+ internal SemaphoreSlim WriteSem = new SemaphoreSlim (1, 1);
internal bool is_blocking = true;
internal bool is_bound;
InitSocketAsyncEventArgs (e, AcceptAsyncCallback, e, SocketOperation.Accept);
- QueueIOSelectorJob (readQ, e.socket_async_result.Handle, new IOSelectorJob (IOOperation.Read, BeginAcceptCallback, e.socket_async_result));
+ QueueIOSelectorJob (ReadSem, e.socket_async_result.Handle, new IOSelectorJob (IOOperation.Read, BeginAcceptCallback, e.socket_async_result));
return true;
}
SocketAsyncResult sockares = new SocketAsyncResult (this, callback, state, SocketOperation.Accept);
- QueueIOSelectorJob (readQ, sockares.Handle, new IOSelectorJob (IOOperation.Read, BeginAcceptCallback, sockares));
+ QueueIOSelectorJob (ReadSem, sockares.Handle, new IOSelectorJob (IOOperation.Read, BeginAcceptCallback, sockares));
return sockares;
}
AcceptSocket = acceptSocket,
};
- QueueIOSelectorJob (readQ, sockares.Handle, new IOSelectorJob (IOOperation.Read, BeginAcceptReceiveCallback, sockares));
+ QueueIOSelectorJob (ReadSem, sockares.Handle, new IOSelectorJob (IOOperation.Read, BeginAcceptReceiveCallback, sockares));
return sockares;
}
e.socket_async_result.Buffers = e.BufferList;
- QueueIOSelectorJob (readQ, e.socket_async_result.Handle, new IOSelectorJob (IOOperation.Read, BeginReceiveGenericCallback, e.socket_async_result));
+ QueueIOSelectorJob (ReadSem, e.socket_async_result.Handle, new IOSelectorJob (IOOperation.Read, BeginReceiveGenericCallback, e.socket_async_result));
} else {
InitSocketAsyncEventArgs (e, ReceiveAsyncCallback, e, SocketOperation.Receive);
e.socket_async_result.Offset = e.Offset;
e.socket_async_result.Size = e.Count;
- QueueIOSelectorJob (readQ, e.socket_async_result.Handle, new IOSelectorJob (IOOperation.Read, BeginReceiveCallback, e.socket_async_result));
+ QueueIOSelectorJob (ReadSem, e.socket_async_result.Handle, new IOSelectorJob (IOOperation.Read, BeginReceiveCallback, e.socket_async_result));
}
return true;
SockFlags = socketFlags,
};
- QueueIOSelectorJob (readQ, sockares.Handle, new IOSelectorJob (IOOperation.Read, BeginReceiveCallback, sockares));
+ QueueIOSelectorJob (ReadSem, sockares.Handle, new IOSelectorJob (IOOperation.Read, BeginReceiveCallback, sockares));
return sockares;
}
SockFlags = socketFlags,
};
- QueueIOSelectorJob (readQ, sockares.Handle, new IOSelectorJob (IOOperation.Read, BeginReceiveGenericCallback, sockares));
+ QueueIOSelectorJob (ReadSem, sockares.Handle, new IOSelectorJob (IOOperation.Read, BeginReceiveGenericCallback, sockares));
return sockares;
}
e.socket_async_result.EndPoint = e.RemoteEndPoint;
e.socket_async_result.SockFlags = e.SocketFlags;
- QueueIOSelectorJob (readQ, e.socket_async_result.Handle, new IOSelectorJob (IOOperation.Read, BeginReceiveFromCallback, e.socket_async_result));
+ QueueIOSelectorJob (ReadSem, e.socket_async_result.Handle, new IOSelectorJob (IOOperation.Read, BeginReceiveFromCallback, e.socket_async_result));
return true;
}
EndPoint = remote_end,
};
- QueueIOSelectorJob (readQ, sockares.Handle, new IOSelectorJob (IOOperation.Read, BeginReceiveFromCallback, sockares));
+ QueueIOSelectorJob (ReadSem, sockares.Handle, new IOSelectorJob (IOOperation.Read, BeginReceiveFromCallback, sockares));
return sockares;
}
e.socket_async_result.Buffers = e.BufferList;
- QueueIOSelectorJob (writeQ, e.socket_async_result.Handle, new IOSelectorJob (IOOperation.Write, BeginSendGenericCallback, e.socket_async_result));
+ QueueIOSelectorJob (WriteSem, e.socket_async_result.Handle, new IOSelectorJob (IOOperation.Write, BeginSendGenericCallback, e.socket_async_result));
} else {
InitSocketAsyncEventArgs (e, SendAsyncCallback, e, SocketOperation.Send);
e.socket_async_result.Offset = e.Offset;
e.socket_async_result.Size = e.Count;
- QueueIOSelectorJob (writeQ, e.socket_async_result.Handle, new IOSelectorJob (IOOperation.Write, s => BeginSendCallback ((SocketAsyncResult) s, 0), e.socket_async_result));
+ QueueIOSelectorJob (WriteSem, e.socket_async_result.Handle, new IOSelectorJob (IOOperation.Write, s => BeginSendCallback ((SocketAsyncResult) s, 0), e.socket_async_result));
}
return true;
SockFlags = socketFlags,
};
- QueueIOSelectorJob (writeQ, sockares.Handle, new IOSelectorJob (IOOperation.Write, s => BeginSendCallback ((SocketAsyncResult) s, 0), sockares));
+ QueueIOSelectorJob (WriteSem, sockares.Handle, new IOSelectorJob (IOOperation.Write, s => BeginSendCallback ((SocketAsyncResult) s, 0), sockares));
return sockares;
}
SockFlags = socketFlags,
};
- QueueIOSelectorJob (writeQ, sockares.Handle, new IOSelectorJob (IOOperation.Write, BeginSendGenericCallback, sockares));
+ QueueIOSelectorJob (WriteSem, sockares.Handle, new IOSelectorJob (IOOperation.Write, BeginSendGenericCallback, sockares));
return sockares;
}
e.socket_async_result.SockFlags = e.SocketFlags;
e.socket_async_result.EndPoint = e.RemoteEndPoint;
- QueueIOSelectorJob (writeQ, e.socket_async_result.Handle, new IOSelectorJob (IOOperation.Write, s => BeginSendToCallback ((SocketAsyncResult) s, 0), e.socket_async_result));
+ QueueIOSelectorJob (WriteSem, e.socket_async_result.Handle, new IOSelectorJob (IOOperation.Write, s => BeginSendToCallback ((SocketAsyncResult) s, 0), e.socket_async_result));
return true;
}
EndPoint = remote_end,
};
- QueueIOSelectorJob (writeQ, sockares.Handle, new IOSelectorJob (IOOperation.Write, s => BeginSendToCallback ((SocketAsyncResult) s, 0), sockares));
+ QueueIOSelectorJob (WriteSem, sockares.Handle, new IOSelectorJob (IOOperation.Write, s => BeginSendToCallback ((SocketAsyncResult) s, 0), sockares));
return sockares;
}
return sockares;
}
- void QueueIOSelectorJob (Queue<KeyValuePair<IntPtr, IOSelectorJob>> queue, IntPtr handle, IOSelectorJob job)
+ void QueueIOSelectorJob (SemaphoreSlim sem, IntPtr handle, IOSelectorJob job)
{
- int count;
- lock (queue) {
- queue.Enqueue (new KeyValuePair<IntPtr, IOSelectorJob> (handle, job));
- count = queue.Count;
- }
+ sem.WaitAsync ().ContinueWith (t => {
+ if (CleanedUp) {
+ job.MarkDisposed ();
+ return;
+ }
- if (count == 1)
IOSelector.Add (handle, job);
+ });
}
void InitSocketAsyncEventArgs (SocketAsyncEventArgs e, AsyncCallback callback, object state, SocketOperation operation)