1 //------------------------------------------------------------------------------
2 // <copyright file="ConnectionPool.cs" company="Microsoft">
3 // Copyright (c) Microsoft Corporation. All rights reserved.
5 //------------------------------------------------------------------------------
10 using System.Net.Sockets;
11 using System.Collections;
12 using System.Diagnostics;
13 using System.Globalization;
14 using System.Runtime.InteropServices;
15 using System.Security;
16 using System.Security.Permissions;
17 using System.Threading;
// Completion callback used by the pool; AsyncThread invokes it with the owning object
// and the exception when an asynchronously queued request fails.
19 internal delegate void GeneralAsyncDelegate(object request, object state);
// Factory callback; Create() invokes it with this pool to obtain a new PooledStream.
20 internal delegate PooledStream CreateConnectionDelegate(ConnectionPool pool);
24 /// Implements basic ConnectionPooling by pooling PooledStreams
// Pool of PooledStream objects associated with one ServicePoint.
// NOTE(review): this listing elides lines (see the gaps in the listing numbers), so some
// declarations referenced below (e.g. the State type) are not visible here.
27 internal class ConnectionPool {
// Static timer callbacks; each wrapper casts the timer context back to a ConnectionPool
// and forwards to the matching instance method.
34 private static TimerThread.Callback s_CleanupCallback = new TimerThread.Callback(CleanupCallbackWrapper);
35 private static TimerThread.Callback s_CancelErrorCallback = new TimerThread.Callback(CancelErrorCallbackWrapper);
// Timer queue created with the ErrorWait duration; Abort() schedules the error-reset timer on it.
36 private static TimerThread.Queue s_CancelErrorQueue = TimerThread.GetOrCreateQueue(ErrorWait);
// Maximum count passed to the Semaphore constructor in Initialize().
38 private const int MaxQueueSize = (int)0x00100000;
40 // The order of these is important; we want the WaitAny call to be signaled
41 // for a free object before a creation signal. Only the index first signaled
42 // object is returned from the WaitAny call.
// Indices into m_WaitHandles (see Initialize()).
43 private const int SemaphoreHandleIndex = (int)0x0;
44 private const int ErrorHandleIndex = (int)0x1;
45 private const int CreationHandleIndex = (int)0x2;
// Compared against the WaitHandle.WaitAny result in GetConnection().
47 private const int WaitTimeout = (int)0x102;
// Not referenced by any line visible in this listing.
48 private const int WaitAbandoned = (int)0x80;
50 private const int ErrorWait = 5 * 1000; // 5 seconds
// Queue on which the periodic cleanup timer is created; assigned in the constructor only
// when idleTimeout > 0.
52 private readonly TimerThread.Queue m_CleanupQueue;
// Lifecycle state: set to Initializing in the constructor and Running in Initialize();
// GetConnection and PutConnection branch on it.
54 private State m_State;
// Stacks of idle streams: PutNew pushes onto m_StackNew, CleanupCallback moves entries
// from m_StackNew to m_StackOld and destroys entries popped from m_StackOld.
55 private InterlockedStack m_StackOld;
56 private InterlockedStack m_StackNew;
// Number of callers currently waiting for a stream: incremented in GetConnection,
// decremented in Get.
58 private int m_WaitCount;
// [semaphore, error event, creation mutex], indexed by the *HandleIndex constants.
59 private WaitHandle[] m_WaitHandles;
// Exception created by Abort() and thrown from Get() when the error handle is signaled.
61 private Exception m_ResError;
// Set to true by Abort(), reset to false by CancelErrorCallback(); while true,
// UserCreateRequest() does not create new streams.
62 private volatile bool m_ErrorOccured;
// Timer created by Abort(); cancelled in CancelErrorCallback().
64 private TimerThread.Timer m_ErrorTimer;
// Every stream produced by Create() until Destroy() removes it; m_TotalObjects caches its Count.
66 private ArrayList m_ObjectList;
67 private int m_TotalObjects;
// Pending AsyncConnectionPoolRequest items; the queue instance is also used as the lock
// object that guards m_AsyncThread.
69 private Queue m_QueuedRequests;
// Background thread servicing m_QueuedRequests; null when no such thread is running.
70 private Thread m_AsyncThread;
// Values captured from the constructor arguments.
72 private int m_MaxPoolSize;
73 private int m_MinPoolSize;
74 private ServicePoint m_ServicePoint;
75 private CreateConnectionDelegate m_CreateConnectionCallback;
// Typed accessor for the creation mutex stored at m_WaitHandles[CreationHandleIndex].
// NOTE(review): the getter header and closing braces are elided in this listing.
77 private Mutex CreationMutex {
79 return (Mutex) m_WaitHandles[CreationHandleIndex];
// Typed accessor for the error event stored at m_WaitHandles[ErrorHandleIndex].
// NOTE(review): the getter header and closing braces are elided in this listing.
83 private ManualResetEvent ErrorEvent {
85 return (ManualResetEvent) m_WaitHandles[ErrorHandleIndex];
// Typed accessor for the free-connection semaphore stored at m_WaitHandles[SemaphoreHandleIndex].
// NOTE(review): the getter header and closing braces are elided in this listing.
89 private Semaphore Semaphore {
91 return (Semaphore) m_WaitHandles[SemaphoreHandleIndex];
96 /// <para>Constructor - binds pool with a servicePoint and sets up a cleanup Timer to remove Idle Connections</para>
// Stores the pool configuration and, when idleTimeout is positive, starts the cleanup timer.
//   servicePoint             - service point this pool is associated with
//   maxPoolSize              - upper bound on pooled streams; 0 is treated as "no limit"
//                              by Get() and UserCreateRequest()
//   minPoolSize              - CleanupCallback stops destroying idle streams at this count
//   idleTimeout              - selects the cleanup timer duration; units are not shown here
//   createConnectionCallback - factory later invoked by Create()
// NOTE(review): listing lines 105-107 are elided, so any call made there (for example to
// Initialize()) is not visible.
98 internal ConnectionPool(ServicePoint servicePoint, int maxPoolSize, int minPoolSize, int idleTimeout, CreateConnectionDelegate createConnectionCallback) : base() {
99 m_State = State.Initializing;
101 m_CreateConnectionCallback = createConnectionCallback;
102 m_MaxPoolSize = maxPoolSize;
103 m_MinPoolSize = minPoolSize;
104 m_ServicePoint = servicePoint;
// With idleTimeout <= 0 no cleanup queue or timer is created.
108 if (idleTimeout > 0) {
109 // special case: if the timeout value is 1 then the timer thread should have a duration
110 // of 1 to avoid having the timer callback run constantly
// Otherwise the duration is idleTimeout / 2 (integer division).
111 m_CleanupQueue = TimerThread.GetOrCreateQueue(idleTimeout == 1 ? 1 : (idleTimeout / 2));
112 m_CleanupQueue.CreateTimer(s_CleanupCallback, this);
117 /// <para>Internal init stuff, creates stacks, queue, wait handles etc</para>
// Allocates the pool's working state (idle stacks, request queue, wait handles, object
// list) and then marks the pool as Running.
119 private void Initialize() {
120 m_StackOld = new InterlockedStack();
121 m_StackNew = new InterlockedStack();
123 m_QueuedRequests = new Queue();
// Slot order must match SemaphoreHandleIndex / ErrorHandleIndex / CreationHandleIndex.
125 m_WaitHandles = new WaitHandle[3];
// Semaphore starts at 0: no idle connection is available yet.
126 m_WaitHandles[SemaphoreHandleIndex] = new Semaphore(0, MaxQueueSize);
// Error event starts unsignaled.
127 m_WaitHandles[ErrorHandleIndex] = new ManualResetEvent(false);
128 m_WaitHandles[CreationHandleIndex] = new Mutex();
130 m_ErrorTimer = null; // No error yet.
132 m_ObjectList = new ArrayList();
133 m_State = State.Running;
138 /// <para>Async state object, used for storing state on async calls</para>
// State bag for one queued asynchronous request for a pooled stream.
140 private class AsyncConnectionPoolRequest {
// NOTE(review): listing line 142 is elided; the visible lines do not assign Pool from the
// pool argument.
141 public AsyncConnectionPoolRequest(ConnectionPool pool, object owningObject, GeneralAsyncDelegate asyncCallback, int creationTimeout) {
143 OwningObject = owningObject;
144 AsyncCallback = asyncCallback;
145 CreationTimeout = creationTimeout;
// Object on whose behalf the stream is requested; passed to Get/Activate/PutConnection.
147 public object OwningObject;
// Callback passed to PooledStream.Activate and invoked directly on failure in AsyncThread.
148 public GeneralAsyncDelegate AsyncCallback;
// Not read or written by any line visible in this listing.
149 public bool Completed;
150 public ConnectionPool Pool;
// Timeout passed to WaitHandle.WaitAny while waiting for a stream.
151 public int CreationTimeout;
155 /// <para>Queues an AsyncConnectionPoolRequest to our queue of requests needing
156 /// a pooled stream. If an AsyncThread is not created, we create one,
157 /// and let it process the queued items</para>
// Enqueues the request and, if no worker thread exists, starts one running AsyncThread.
159 private void QueueRequest(AsyncConnectionPoolRequest asyncRequest) {
// The queue object doubles as the lock that protects both the queue and m_AsyncThread.
160 lock(m_QueuedRequests) {
161 m_QueuedRequests.Enqueue(asyncRequest);
162 if (m_AsyncThread == null) {
163 m_AsyncThread = new Thread(new ThreadStart(AsyncThread));
// Marked as a background thread before it is started.
164 m_AsyncThread.IsBackground = true;
165 m_AsyncThread.Start();
171 /// <para>Processes async queued requests that are blocked on needing a free pooled stream
172 /// works as follows:
173 /// 1. while there are blocked requests, take one out of the queue
174 /// 2. Wait for a free connection, when one becomes avail, then notify the request that its there
175 /// 3. repeat 1 until there are no more queued requests
176 /// 4. if there are no more requests waiting for a free stream, then close down this thread
// Worker-thread body: drains m_QueuedRequests, waiting for a stream for each request and
// activating it; a failure is reported through the request's callback.
// NOTE(review): several lines are elided in this listing (180, 186-187, 190, 193, 195-196,
// 201, 203-205, 209+), including the statement that opens the try matching the catch below.
179 private void AsyncThread() {
181 while (m_QueuedRequests.Count > 0) {
182 bool continueLoop = true;
183 AsyncConnectionPoolRequest asyncState = null;
// Dequeue under the same lock QueueRequest uses to enqueue.
184 lock (m_QueuedRequests) {
185 asyncState = (AsyncConnectionPoolRequest) m_QueuedRequests.Dequeue();
// Local copy: Get() may replace this array (by ref) with one that omits the creation mutex.
188 WaitHandle [] localWaitHandles = m_WaitHandles;
189 PooledStream PooledStream = null;
// Keep waiting until a stream is obtained or Get() clears continueLoop.
191 while ((PooledStream == null) && continueLoop) {
192 int result = WaitHandle.WaitAny(localWaitHandles, asyncState.CreationTimeout, false);
// NOTE(review): the assignment target for this call sits on elided listing line 193;
// presumably the result is stored in PooledStream - confirm.
194 Get(asyncState.OwningObject, result, ref continueLoop, ref localWaitHandles);
197 PooledStream.Activate(asyncState.OwningObject, asyncState.AsyncCallback);
198 } catch (Exception e) {
// On failure, hand back any stream we obtained with canReuse == false, then pass the
// exception to the requester's callback.
199 if(PooledStream != null){
200 PutConnection(PooledStream, asyncState.OwningObject, asyncState.CreationTimeout, false);
202 asyncState.AsyncCallback(asyncState.OwningObject, e);
// Clear the thread reference only while holding the queue lock and only if the queue is
// empty, so QueueRequest knows to start a new thread for later requests.
206 lock(m_QueuedRequests) {
207 if (m_QueuedRequests.Count == 0) {
208 m_AsyncThread = null;
216 /// <para>Count of total pooled streams associated with this pool, including streams that are being used</para>
// Returns m_TotalObjects, which Create() and Destroy() set to m_ObjectList.Count.
// NOTE(review): the property header (listing line 218) is elided; other members refer to
// this value as Count.
219 get { return(m_TotalObjects); }
223 /// <para>Our ServicePoint, used for IP resolution</para>
// Read accessor for the ServicePoint supplied to the constructor.
225 internal ServicePoint ServicePoint {
227 return m_ServicePoint;
232 /// <para>Our Max Size of outstanding pooled streams</para>
// Read accessor for the maximum pool size supplied to the constructor; callers in this
// file treat 0 as "no limit".
234 internal int MaxPoolSize {
236 return m_MaxPoolSize;
241 /// <para>Our Min Size of the pool to remove idled items down to</para>
// Read accessor for the minimum pool size supplied to the constructor; CleanupCallback
// destroys idle streams only while Count exceeds this value.
243 internal int MinPoolSize {
245 return m_MinPoolSize;
250 /// <para>An Error occurred usually due to an abort</para>
// True between Abort() and the next CancelErrorCallback() that successfully cancels the
// error timer; backed by the volatile field m_ErrorOccured.
252 private bool ErrorOccurred {
253 get { return m_ErrorOccured; }
// Static timer entry point: recovers the pool from the timer context, runs its
// CleanupCallback, and creates the next cleanup timer on the pool's cleanup queue.
// NOTE(review): lines around the two calls are elided (listing 259-261, 263-265), so any
// try/finally or guard surrounding them is not visible.
256 private static void CleanupCallbackWrapper(TimerThread.Timer timer, int timeNoticed, object context)
258 ConnectionPool pThis = (ConnectionPool) context;
262 pThis.CleanupCallback();
// Re-arm: a new timer is created with the same callback and context.
266 pThis.m_CleanupQueue.CreateTimer(s_CleanupCallback, context);
271 /// Cleans up everything in both the old and new stack. If a connection is in use
272 /// then it will be on neither stack and it is the responsibility of the object
273 /// using that connection to clean it up when it is finished using it. This does
274 /// not clean up the ConnectionPool object and new connections can still be
275 /// created if needed in the future should this ConnectionPool object be reused
277 /// preconditions: none
279 /// postconditions: any connections not currently in use by an object will be
280 /// gracefully terminated and purged from this connection pool
// Destroys idle connections, taking one semaphore count per connection removed.
// NOTE(review): the enclosing loop and else-branch lines are elided in this listing
// (listing 283-284, 286-287, 291, 299, 304-305, 308-309, 311-315); only one iteration's
// worth of logic is visible.
282 internal void ForceCleanup()
285 Logging.Enter(Logging.Web, "ConnectionPool::ForceCleanup");
288 // If WaitOne returns false, all connections in the pool are in use
289 // so no cleanup should be performed. The last object owning
290 // a connection from the pool will perform final cleanup.
// Zero timeout: poll the semaphore without blocking.
292 if (Semaphore.WaitOne(0, false)) {
293 // Try to clean up from new stack first, if there isn't anything on new
294 // then try old. When we lock the Semaphore, it gives us a license to
295 // remove only one connection from the pool but it can be from either
296 // stack since if the Semaphore is locked by another thread it means that
297 // there must have been more than one connection available in either stack
298 PooledStream pooledStream = (PooledStream)m_StackNew.Pop();
300 // no streams in stack new, there must therefore be one in stack old since we
301 // were able to acquire the semaphore
302 if(pooledStream == null) {
303 pooledStream = (PooledStream)m_StackOld.Pop();
306 Debug.Assert(pooledStream != null, "Acquired Semaphore with no connections in either stack");
// The semaphore count taken above is not released: the connection is gone for good.
307 Destroy(pooledStream);
310 // couldn't get semaphore, nothing to do here
316 Logging.Exit(Logging.Web, "ConnectionPool::ForceCleanup");
321 /// <para>This is called by a timer, to check for needed cleanup of idle pooled streams</para>
// Periodic pruning pass run from the cleanup timer: destroys idle streams from the old
// stack while Count is above MinPoolSize, then moves idle streams from the new stack to
// the old stack.
// NOTE(review): loop headers, else branches and break statements are partly elided in
// this listing, so the exact exit conditions of both phases are not fully visible.
323 private void CleanupCallback()
325 // Called when the cleanup-timer ticks over.
327 // This is the automatic pruning method. Every period, we will perform a two-step
328 // process. First, for the objects above MinPool, we will obtain the semaphore for
329 // the object and then destroy it if it was on the old stack. We will continue this
330 // until we either reach MinPool size, or we are unable to obtain a free object, or
331 // until we have exhausted all the objects on the old stack. After that, push all
332 // objects on the new stack to the old stack. So, every period the objects on the
333 // old stack are destroyed and the objects on the new stack are pushed to the old
334 // stack. All objects that are currently out and in use are not on either stack.
335 // With this logic, an object is pruned if unused for at least one period but not
336 // more than two periods.
338 // Destroy free objects above MinPool size from old stack.
339 while(Count > MinPoolSize) { // While above MinPoolSize...
341 // acquiring the Semaphore gives us a license to remove one and only
342 // one connection from the pool
343 if (Semaphore.WaitOne(0, false) ) { // != WaitTimeout
344 // We obtained an object from the semaphore.
345 PooledStream pooledStream = (PooledStream) m_StackOld.Pop();
347 if (null != pooledStream) {
348 // If we obtained one from the old stack, destroy it.
349 Destroy(pooledStream);
352 // Else we exhausted the old stack, so break
353 // and release the Semaphore to indicate that
354 // no connection was actually removed so whatever
355 // we had locked is still available.
356 Semaphore.ReleaseSemaphore();
363 // Push to the old-stack. For each free object, move object from new stack
364 // to old stack. The Semaphore guarantees that we are allowed to handle
365 // one connection at a time so moving a connection between stacks is safe since
366 // one connection is reserved for the duration of this loop and we only touch
367 // one connection at a time on the new stack
368 if(Semaphore.WaitOne(0, false)) { // != WaitTimeout
370 PooledStream pooledStream = (PooledStream) m_StackNew.Pop();
// A null pop means the new stack is empty (the statement taken in that case is elided).
372 if (null == pooledStream)
375 GlobalLog.Assert(!pooledStream.IsEmancipated, "Pooled object not in pool.");
376 GlobalLog.Assert(pooledStream.CanBePooled, "Pooled object is not poolable.");
378 m_StackOld.Push(pooledStream);
380 // no connections were actually destroyed so signal that a connection is now
381 // available since we are no longer reserving a connection by holding the
383 Semaphore.ReleaseSemaphore();
388 /// <para>Creates a new PooledStream, performs checks as well on the new stream</para>
// Creates a stream through the supplied factory callback, validates it, and registers it
// in m_ObjectList.
//   createConnectionCallback - factory invoked with this pool
// NOTE(review): the try/catch framing, the null check guarding the first throw, and the
// return statement are elided in this listing (listing 393-394, 396-397, 411-412, 414,
// 417-419, 421+); how the caught exception is ultimately handled is therefore not visible.
390 private PooledStream Create(CreateConnectionDelegate createConnectionCallback) {
391 GlobalLog.Enter("ConnectionPool#" + ValidationHelper.HashString(this) + "::Create");
392 PooledStream newObj = null;
395 newObj = createConnectionCallback(this);
398 throw new InternalException(); // Create succeeded, but null object
400 if (!newObj.CanBePooled)
401 throw new InternalException(); // Create succeeded, but non-poolable object
// Called with a null owner for the freshly created stream.
403 newObj.PrePush(null);
// m_ObjectList is mutated only under its SyncRoot; m_TotalObjects is refreshed from its
// Count in the same critical section.
405 lock (m_ObjectList.SyncRoot) {
406 m_ObjectList.Add(newObj);
407 m_TotalObjects = m_ObjectList.Count;
410 GlobalLog.Print("Create pooledStream#"+ValidationHelper.HashString(newObj));
413 GlobalLog.Print("Pool Exception: " + e.Message);
415 newObj = null; // set to null, so we do not return bad new object
416 // Failed to create instance
420 GlobalLog.Leave("ConnectionPool#" + ValidationHelper.HashString(this) + "::Create",ValidationHelper.HashString(newObj));
426 /// <para>Destroys a pooled stream from the pool</para>
// Unregisters the stream from m_ObjectList, refreshes m_TotalObjects, and disposes the
// stream. A null argument skips the removal.
// NOTE(review): lines between the visible statements are elided (listing 430, 432-433,
// 437-440, 442+), so any try/finally structure around them is not visible.
428 private void Destroy(PooledStream pooledStream) {
429 GlobalLog.Print("Destroy pooledStream#"+ValidationHelper.HashString(pooledStream));
431 if (null != pooledStream) {
// Same lock as Create(): list mutation and the cached count stay consistent.
434 lock (m_ObjectList.SyncRoot) {
435 m_ObjectList.Remove(pooledStream);
436 m_TotalObjects = m_ObjectList.Count;
441 pooledStream.Dispose();
// Static timer entry point: casts the timer context to the owning pool and forwards to
// its CancelErrorCallback. The timer and timeNoticed arguments are not used in the
// visible lines.
446 private static void CancelErrorCallbackWrapper(TimerThread.Timer timer, int timeNoticed, object context)
448 ((ConnectionPool) context).CancelErrorCallback();
452 /// <para>Called on error, after we waited a set amount of time from aborting</para>
// Clears the error state if the pending error timer can be cancelled.
// NOTE(review): listing lines 458 and 460-463 are elided; further statements in the same
// branch (for example resetting the error event) may exist but are not visible.
454 private void CancelErrorCallback()
// Read the field once into a local before testing and cancelling it.
456 TimerThread.Timer timer = m_ErrorTimer;
457 if (timer != null && timer.Cancel())
459 m_ErrorOccured = false;
467 /// <para>Retrieves a pooled stream from the pool proper
468 /// this works by first attempting to find something in the pool on the New stack
469 /// and then trying the Old stack if nothing is available there </para>
// Pops an idle stream, trying the new stack first and then the old stack, and marks it
// as taken by owningObject via PostPop.
// NOTE(review): the condition guarding the old-stack pop (listing 475) and the return
// statement are elided in this listing.
471 private PooledStream GetFromPool(object owningObject) {
472 PooledStream res = null;
473 GlobalLog.Enter("ConnectionPool#" + ValidationHelper.HashString(this) + "::GetFromPool");
474 res = (PooledStream) m_StackNew.Pop();
476 res = (PooledStream) m_StackOld.Pop();
479 // The semaphore guaranteed that a connection was available so if res is
480 // null it means that this contract has been violated somewhere
481 GlobalLog.Assert(res != null, "GetFromPool called with nothing in the pool!");
484 res.PostPop(owningObject);
485 GlobalLog.Print("GetFromGeneralPool pooledStream#"+ValidationHelper.HashString(res));
488 GlobalLog.Leave("ConnectionPool#" + ValidationHelper.HashString(this) + "::GetFromPool",ValidationHelper.HashString(res));
493 /// <para>Retrieves the pooled stream out of the pool, does this by using the result
494 /// of a WaitAny as input, and then based on whether it has a mutex, event, semaphore,
495 /// or timeout decides what action to take</para>
// Interprets a WaitHandle.WaitAny result and acts on it.
//   owningObject - object the stream is being acquired for
//   result       - WaitAny return value (a handle index or the timeout value)
//   continueLoop - set to false when the caller should stop waiting
//   waitHandles  - may be replaced by a two-element array without the creation mutex
// Throws WebException on timeout and the stashed m_ResError when the error handle fired.
// NOTE(review): the switch header, the timeout case label, the default/semaphore case
// label, and the try/finally around creation are elided in this listing.
497 private PooledStream Get(object owningObject, int result, ref bool continueLoop, ref WaitHandle [] waitHandles) {
498 PooledStream pooledStream = null;
499 GlobalLog.Enter("ConnectionPool#" + ValidationHelper.HashString(this) + "::Get", result.ToString());
502 // From the WaitAny docs: "If more than one object became signaled during
503 // the call, this is the array index of the signaled object with the
504 // smallest index value of all the signaled objects." This is important
505 // so that the free object signal will be returned before a creation
// Timeout path (per the log text below): stop waiting and throw.
510 Interlocked.Decrement(ref m_WaitCount);
511 continueLoop = false;
512 GlobalLog.Leave("ConnectionPool#" + ValidationHelper.HashString(this) + "::Get","throw Timeout WebException");
// NOTE(review): the message is built with WebExceptionStatus.ConnectFailure while the
// exception status is Timeout - confirm this mismatch is intended.
513 throw new WebException(NetRes.GetWebStatusString("net_timeout", WebExceptionStatus.ConnectFailure), WebExceptionStatus.Timeout);
516 case ErrorHandleIndex:
517 // Throw the error that PoolCreateRequest stashed.
518 int newWaitCount = Interlocked.Decrement(ref m_WaitCount);
519 continueLoop = false;
// Capture the exception before the error state may be cleared below.
520 Exception exceptionToThrow = m_ResError;
// The last waiter to observe the error also cancels the error state.
521 if (newWaitCount == 0) {
522 CancelErrorCallback();
524 throw exceptionToThrow;
526 // The creation mutex signaled, which means no connections are available in
527 // the connection pool. This means you might be able to create a connection.
528 case CreationHandleIndex:
531 // try creating a new connection
532 pooledStream = UserCreateRequest();
534 if (null != pooledStream) {
535 pooledStream.PostPop(owningObject);
536 Interlocked.Decrement(ref m_WaitCount);
537 continueLoop = false;
541 // If we were not able to create an object, check to see if
542 // we reached MaxPoolSize. If so, we will no longer wait on
543 // the CreationHandle, but instead wait for a free object or
546 // Consider changing: if we receive the CreationHandle midway into the wait
547 // period and re-wait, we will be waiting on the full period
// MaxPoolSize == 0 is treated as "no limit" and never takes this branch.
548 if (Count >= MaxPoolSize && 0 != MaxPoolSize) {
549 if (!ReclaimEmancipatedObjects()) {
550 // modify handle array not to wait on creation mutex anymore
// Literal indices 0 and 1 correspond to SemaphoreHandleIndex and ErrorHandleIndex.
551 waitHandles = new WaitHandle[2];
552 waitHandles[0] = m_WaitHandles[0];
553 waitHandles[1] = m_WaitHandles[1];
// WaitAny acquired the creation mutex for this thread; release it.
560 CreationMutex.ReleaseMutex();
565 // the semaphore was signaled which can only happen
566 // when a connection has been placed in the pool
567 // so there is guaranteed available inventory
568 Interlocked.Decrement(ref m_WaitCount);
569 pooledStream = GetFromPool(owningObject);
570 continueLoop = false;
573 GlobalLog.Leave("ConnectionPool#" + ValidationHelper.HashString(this) + "::Get",ValidationHelper.HashString(pooledStream));
578 /// <para>Aborts the queued requests to the pool</para>
// Puts the pool into the error state: stashes a RequestCanceled WebException (only if no
// error is already stored), sets the error flag, and schedules the timer whose callback
// clears the error state.
// NOTE(review): listing lines 585-586 are elided; a statement there (for example
// signaling the error event) is not visible.
580 internal void Abort() {
581 if (m_ResError == null) {
582 m_ResError = new WebException(
583 NetRes.GetWebStatusString("net_requestaborted", WebExceptionStatus.RequestCanceled),
584 WebExceptionStatus.RequestCanceled);
587 m_ErrorOccured = true;
// The timer runs on the queue created with the ErrorWait duration.
588 m_ErrorTimer = s_CancelErrorQueue.CreateTimer(s_CancelErrorCallback, this);
592 /// <para>Attempts to create a PooledStream, by trying to get a pooled Connection,
593 /// or by creating its own new one</para>
// Obtains a PooledStream for owningObject, either from the pool or by creating one.
//   owningObject    - object the stream is acquired for
//   asyncCallback   - when non-null, the request may complete asynchronously
//   creationTimeout - timeout passed to WaitHandle.WaitAny on the blocking path
// Throws InternalException when the pool is not in the Running state.
// NOTE(review): the declaration of result, the sync/async branching, the try/catch around
// Activate, and the return statements are elided in this listing.
595 internal PooledStream GetConnection(object owningObject,
596 GeneralAsyncDelegate asyncCallback,
597 int creationTimeout) {
599 PooledStream stream = null;
600 bool continueLoop = true;
601 bool async = (asyncCallback != null) ? true : false;
603 GlobalLog.Enter("ConnectionPool#" + ValidationHelper.HashString(this) + "::GetConnection");
605 if(m_State != State.Running) {
606 throw new InternalException();
// Register as a waiter; Get() decrements this on every terminal outcome visible there.
609 Interlocked.Increment(ref m_WaitCount);
// Local copy: Get() may swap in an array without the creation mutex.
610 WaitHandle[] localWaitHandles = m_WaitHandles;
// Zero-timeout poll: try to obtain a stream without blocking.
613 result = WaitHandle.WaitAny(localWaitHandles, 0, false);
614 if (result != WaitTimeout) {
615 stream = Get(owningObject, result, ref continueLoop, ref localWaitHandles);
// Nothing available immediately: queue the request for the background thread.
617 if (stream == null) {
618 GlobalLog.Print("GetConnection:"+ValidationHelper.HashString(this)+" going async");
619 AsyncConnectionPoolRequest asyncState = new AsyncConnectionPoolRequest(this, owningObject, asyncCallback, creationTimeout);
620 QueueRequest(asyncState);
623 // loop while we don't have an error/timeout and we haven't gotten a stream yet
624 while ((stream == null) && continueLoop) {
625 result = WaitHandle.WaitAny(localWaitHandles, creationTimeout, false);
626 stream = Get(owningObject, result, ref continueLoop, ref localWaitHandles);
630 if (null != stream) {
631 // if there is already a stream, then we're not going async
632 if (!stream.IsInitalizing) {
633 asyncCallback = null;
637 // If activate returns false, it is going to finish asynchronously
638 // and therefore the stream will be returned in a callback and
639 // we should not return it here (return null)
640 if (stream.Activate(owningObject, asyncCallback) == false)
// Hands the stream back with canReuse == false (the enclosing handler line is elided).
644 PutConnection(stream,owningObject,creationTimeout, false);
648 throw new InternalException();
651 GlobalLog.Leave("ConnectionPool#" + ValidationHelper.HashString(this) + "::GetConnection", ValidationHelper.HashString(stream));
657 /// Attempts to return a PooledStream to the pool. Default is that it can be reused if it can
// Convenience overload: returns the stream to the pool with canReuse == true.
661 internal void PutConnection(PooledStream pooledStream, object owningObject, int creationTimeout)
664 PutConnection(pooledStream, owningObject, creationTimeout, true);
669 /// Attempts to return a PooledStream to the pool. If canReuse is false, then the
670 /// connection will be destroyed even if it is marked as reusable and a new connection will
671 /// be created. If it is true, then the connection will still be checked to ensure that
672 /// it can be pooled and will be cleaned up if it can not for another reason.
// Returns a stream to the pool, or destroys it when it cannot be reused or the pool is
// shutting down.
//   pooledStream    - stream being returned; must not be null
//   owningObject    - previous owner, passed to PrePush
//   creationTimeout - timeout for acquiring the creation mutex when a replacement is made
//   canReuse        - false forces destruction even if the stream reports CanBePooled
// Throws ArgumentNullException when pooledStream is null.
// NOTE(review): the try opening, several else branches, and the statement executed when
// the creation mutex wait fails are elided in this listing.
675 internal void PutConnection(PooledStream pooledStream, object owningObject, int creationTimeout, bool canReuse) {
676 GlobalLog.Print("ConnectionPool#" + ValidationHelper.HashString(this) + "::PutConnection");
677 if (pooledStream == null) {
678 throw new ArgumentNullException("pooledStream");
681 pooledStream.PrePush(owningObject);
683 if (m_State != State.ShuttingDown) {
684 pooledStream.Deactivate();
686 // cancel our error status, if we have no new requests waiting anymore
687 if (m_WaitCount == 0) {
688 CancelErrorCallback();
691 if (canReuse && pooledStream.CanBePooled) {
692 PutNew(pooledStream);
696 Destroy(pooledStream);
697 } finally { // Make sure to release the mutex even under error conditions.
698 // Make sure we recreate a new pooled stream, if there are requests for a stream
700 if (m_WaitCount > 0) {
// Creation is serialized through the creation mutex, bounded by creationTimeout.
701 if (!CreationMutex.WaitOne(creationTimeout, false)) {
// The parameter is reused to hold the replacement stream (null if none was created).
705 pooledStream = UserCreateRequest();
706 if (null != pooledStream) {
707 PutNew(pooledStream);
710 CreationMutex.ReleaseMutex();
718 // If we're shutting down, we destroy the object.
719 Destroy(pooledStream);
725 /// <para>Places a new/reusable stream in the new stack of the pool</para>
// Pushes an idle, poolable stream onto the new stack and releases one semaphore count so
// a waiter can pick it up.
727 private void PutNew(PooledStream pooledStream) {
728 GlobalLog.Enter("ConnectionPool#" + ValidationHelper.HashString(this) + "::PutNew", "#"+ValidationHelper.HashString(pooledStream));
730 GlobalLog.Assert(null != pooledStream, "Why are we adding a null object to the pool?");
731 GlobalLog.Assert(pooledStream.CanBePooled, "Non-poolable object in pool.");
// Push happens before the release, so the stream is on the stack by the time a waiter is
// signaled.
733 m_StackNew.Push(pooledStream);
734 // ensure that the semaphore's count is incremented to signal an available connection is in
736 Semaphore.ReleaseSemaphore();
737 GlobalLog.Leave("ConnectionPool#" + ValidationHelper.HashString(this) + "::PutNew");
742 /// <para>Reclaim any pooled Streams that have seen their users/WebRequests GCed away</para>
// Scans every registered stream and returns to the pool those that report IsEmancipated.
// Returns true if at least one such stream was found.
// NOTE(review): the declaration of locked, the try/finally around the monitor, and the
// guards on the Monitor.Exit call are elided in this listing.
744 private bool ReclaimEmancipatedObjects() {
745 bool emancipatedObjectFound = false;
746 GlobalLog.Enter("ConnectionPool#" + ValidationHelper.HashString(this) + "::ReclaimEmancipatedObjects");
748 lock(m_ObjectList.SyncRoot) {
// Iterate over a snapshot: PutConnection below may end up modifying m_ObjectList through
// Destroy or Create.
750 object[] objectList = m_ObjectList.ToArray();
751 if (null != objectList) {
753 for (int i = 0; i < objectList.Length; ++i) {
754 PooledStream pooledStream = (PooledStream) objectList[i];
756 if (null != pooledStream) {
// Non-blocking attempt to take the stream's monitor; the outcome is written to locked.
760 Monitor.TryEnter(pooledStream, ref locked);
763 if (pooledStream.IsEmancipated) {
765 GlobalLog.Print("EmancipatedObject pooledStream#"+ValidationHelper.HashString(pooledStream));
// Returned with a null owner and an infinite creation timeout (3-argument overload, so
// canReuse == true).
766 PutConnection(pooledStream, null, Timeout.Infinite);
767 emancipatedObjectFound = true;
773 Monitor.Exit(pooledStream);
779 GlobalLog.Leave("ConnectionPool#" + ValidationHelper.HashString(this) + "::ReclaimEmancipatedObjects",emancipatedObjectFound.ToString());
780 return emancipatedObjectFound;
784 /// <para>Creates a new PooledStream if allowable</para>
// Attempts to create a new stream on behalf of a caller. Creation is attempted only when
// no error is pending and the pool is below MaxPoolSize (or MaxPoolSize is 0).
// NOTE(review): the return statement and closing braces are elided in this listing.
786 private PooledStream UserCreateRequest() {
787 // called by user when they were not able to obtain a free object but
788 // instead obtained creation mutex
789 GlobalLog.Enter("ConnectionPool#" + ValidationHelper.HashString(this) + "::UserCreateRequest");
791 PooledStream pooledStream = null;
793 if (!ErrorOccurred) {
794 if (Count < MaxPoolSize || 0 == MaxPoolSize) {
795 // If the total object count is odd, the condition below short-circuits and a new
796 // object is created without attempting reclamation. If the count is even, emancipated
// objects are reclaimed first and a new object is created only when none were found.
799 if ((Count & 0x1) == 0x1 || !ReclaimEmancipatedObjects())
800 pooledStream = Create(m_CreateConnectionCallback);
803 GlobalLog.Leave("ConnectionPool#" + ValidationHelper.HashString(this) + "::UserCreateRequest", ValidationHelper.HashString(pooledStream));
810 /// <para>Used to Pool streams in a thread safe manner</para>
// Stack wrapper whose Push and Pop run under a lock on the inner stack's SyncRoot.
// NOTE(review): listing lines 814-816, 818-819 and 821-822 are elided; the _count field
// used by Push/Pop and any conditional-compilation directives are not visible.
812 sealed internal class InterlockedStack {
813 private readonly Stack _stack = new Stack();
// Tracks objects currently on the stack so Push can assert against a double push.
817 private readonly Hashtable doublepush = new Hashtable();
820 internal InterlockedStack() {
// Pushes a non-null object onto the stack under the SyncRoot lock.
// Throws ArgumentNullException when pooledStream is null.
// NOTE(review): listing lines 827, 830, 832 and 834 are elided; the bookkeeping and
// assertions below may be wrapped in conditional-compilation directives - confirm.
823 internal void Push(Object pooledStream) {
824 GlobalLog.Assert(null != pooledStream, "push null");
825 if (null == pooledStream) { throw new ArgumentNullException("pooledStream"); }
826 lock(_stack.SyncRoot) {
// Record the object (mapped to the stack depth before the push) to detect double pushes.
828 GlobalLog.Assert(null == doublepush[pooledStream], "object already in stack");
829 doublepush[pooledStream] = _stack.Count;
831 _stack.Push(pooledStream);
833 GlobalLog.Assert(_count+1 == _stack.Count, "push count mishandle");
// Cache the new depth for the next consistency assertion.
835 _count = _stack.Count;
839 internal Object Pop() {
840 lock(_stack.SyncRoot) {
841 object pooledStream = null;
842 if (0 <_stack.Count) {
843 pooledStream = _stack.Pop();
845 GlobalLog.Assert(_count-1 == _stack.Count, "pop count mishandle");
846 doublepush.Remove(pooledStream);
848 _count = _stack.Count;