1 //------------------------------------------------------------
2 // Copyright (c) Microsoft Corporation. All rights reserved.
3 //------------------------------------------------------------
5 namespace System.ServiceModel.Channels
7 using System.Collections.Generic;
8 using System.Diagnostics;
10 using System.ServiceModel;
11 using System.ServiceModel.Diagnostics;
12 using System.Threading;
14 abstract class RequestChannel : ChannelBase, IRequestChannel
16 bool manualAddressing;
17 List<IRequestBase> outstandingRequests = new List<IRequestBase>();
20 ManualResetEvent closedEvent;
23 protected RequestChannel(ChannelManagerBase channelFactory, EndpointAddress to, Uri via, bool manualAddressing)
24 : base(channelFactory)
26 if (!manualAddressing)
30 throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("to");
34 this.manualAddressing = manualAddressing;
39 protected bool ManualAddressing
43 return this.manualAddressing;
47 public EndpointAddress RemoteAddress
63 protected void AbortPendingRequests()
65 IRequestBase[] requestsToAbort = CopyPendingRequests(false);
67 if (requestsToAbort != null)
69 foreach (IRequestBase request in requestsToAbort)
76 protected IAsyncResult BeginWaitForPendingRequests(TimeSpan timeout, AsyncCallback callback, object state)
78 IRequestBase[] pendingRequests = SetupWaitForPendingRequests();
79 return new WaitForPendingRequestsAsyncResult(timeout, this, pendingRequests, callback, state);
82 protected void EndWaitForPendingRequests(IAsyncResult result)
84 WaitForPendingRequestsAsyncResult.End(result);
89 lock (outstandingRequests)
94 if (closedEvent != null)
96 this.closedEvent.Close();
102 IRequestBase[] SetupWaitForPendingRequests()
104 return this.CopyPendingRequests(true);
107 protected void WaitForPendingRequests(TimeSpan timeout)
109 IRequestBase[] pendingRequests = SetupWaitForPendingRequests();
110 if (pendingRequests != null)
112 if (!closedEvent.WaitOne(timeout, false))
114 foreach (IRequestBase request in pendingRequests)
123 IRequestBase[] CopyPendingRequests(bool createEventIfNecessary)
125 IRequestBase[] requests = null;
127 lock (outstandingRequests)
129 if (outstandingRequests.Count > 0)
131 requests = new IRequestBase[outstandingRequests.Count];
132 outstandingRequests.CopyTo(requests);
133 outstandingRequests.Clear();
135 if (createEventIfNecessary && closedEvent == null)
137 closedEvent = new ManualResetEvent(false);
145 protected void FaultPendingRequests()
147 IRequestBase[] requestsToFault = CopyPendingRequests(false);
149 if (requestsToFault != null)
151 foreach (IRequestBase request in requestsToFault)
158 public override T GetProperty<T>()
160 if (typeof(T) == typeof(IRequestChannel))
162 return (T)(object)this;
165 T baseProperty = base.GetProperty<T>();
166 if (baseProperty != null)
174 protected override void OnAbort()
176 AbortPendingRequests();
179 void ReleaseRequest(IRequestBase request)
183 // Synchronization of OnReleaseRequest is the
184 // responsibility of the concrete implementation of request.
185 request.OnReleaseRequest();
188 lock (outstandingRequests)
190 // Remove supports the connection having been removed, so don't need extra Contains() check,
191 // even though this may have been removed by Abort()
192 outstandingRequests.Remove(request);
193 if (outstandingRequests.Count == 0)
195 if (!closed && closedEvent != null)
203 void TrackRequest(IRequestBase request)
205 lock (outstandingRequests)
207 ThrowIfDisposedOrNotOpen(); // make sure that we haven't already snapshot our collection
208 outstandingRequests.Add(request);
212 public IAsyncResult BeginRequest(Message message, AsyncCallback callback, object state)
214 return this.BeginRequest(message, this.DefaultSendTimeout, callback, state);
217 public IAsyncResult BeginRequest(Message message, TimeSpan timeout, AsyncCallback callback, object state)
220 throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("message");
222 if (timeout < TimeSpan.Zero)
223 throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
224 new ArgumentOutOfRangeException("timeout", timeout, SR.GetString(SR.SFxTimeoutOutOfRange0)));
226 ThrowIfDisposedOrNotOpen();
228 AddHeadersTo(message);
229 IAsyncRequest asyncRequest = CreateAsyncRequest(message, callback, state);
230 TrackRequest(asyncRequest);
232 bool throwing = true;
235 asyncRequest.BeginSendRequest(message, timeout);
242 ReleaseRequest(asyncRequest);
249 protected abstract IRequest CreateRequest(Message message);
250 protected abstract IAsyncRequest CreateAsyncRequest(Message message, AsyncCallback callback, object state);
252 public Message EndRequest(IAsyncResult result)
256 throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("result");
259 IAsyncRequest asyncRequest = result as IAsyncRequest;
261 if (asyncRequest == null)
263 throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgument("result", SR.GetString(SR.InvalidAsyncResult));
268 Message reply = asyncRequest.End();
270 if (DiagnosticUtility.ShouldTraceInformation)
272 TraceUtility.TraceEvent(TraceEventType.Information, TraceCode.RequestChannelReplyReceived,
273 SR.GetString(SR.TraceCodeRequestChannelReplyReceived), reply);
280 ReleaseRequest(asyncRequest);
284 public Message Request(Message message)
286 return this.Request(message, this.DefaultSendTimeout);
289 public Message Request(Message message, TimeSpan timeout)
293 throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("message");
296 if (timeout < TimeSpan.Zero)
297 throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
298 new ArgumentOutOfRangeException("timeout", timeout, SR.GetString(SR.SFxTimeoutOutOfRange0)));
300 ThrowIfDisposedOrNotOpen();
302 AddHeadersTo(message);
303 IRequest request = CreateRequest(message);
304 TrackRequest(request);
308 TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
310 TimeSpan savedTimeout = timeoutHelper.RemainingTime();
313 request.SendRequest(message, savedTimeout);
315 catch (TimeoutException timeoutException)
317 throw TraceUtility.ThrowHelperError(new TimeoutException(SR.GetString(SR.RequestChannelSendTimedOut, savedTimeout),
318 timeoutException), message);
321 savedTimeout = timeoutHelper.RemainingTime();
325 reply = request.WaitForReply(savedTimeout);
327 catch (TimeoutException timeoutException)
329 throw TraceUtility.ThrowHelperError(new TimeoutException(SR.GetString(SR.RequestChannelWaitForReplyTimedOut, savedTimeout),
330 timeoutException), message);
334 if (DiagnosticUtility.ShouldTraceInformation)
336 TraceUtility.TraceEvent(TraceEventType.Information, TraceCode.RequestChannelReplyReceived,
337 SR.GetString(SR.TraceCodeRequestChannelReplyReceived), reply);
344 ReleaseRequest(request);
348 protected virtual void AddHeadersTo(Message message)
350 if (!manualAddressing && to != null)
356 class WaitForPendingRequestsAsyncResult : AsyncResult
358 static WaitOrTimerCallback completeWaitCallBack = new WaitOrTimerCallback(OnCompleteWaitCallBack);
359 IRequestBase[] pendingRequests;
360 RequestChannel requestChannel;
362 RegisteredWaitHandle waitHandle;
364 public WaitForPendingRequestsAsyncResult(TimeSpan timeout, RequestChannel requestChannel, IRequestBase[] pendingRequests, AsyncCallback callback, object state)
365 : base(callback, state)
367 this.requestChannel = requestChannel;
368 this.pendingRequests = pendingRequests;
369 this.timeout = timeout;
371 if (this.timeout == TimeSpan.Zero || this.pendingRequests == null)
379 this.waitHandle = ThreadPool.RegisterWaitForSingleObject(this.requestChannel.closedEvent, completeWaitCallBack, this, TimeoutHelper.ToMilliseconds(timeout), true);
385 if (pendingRequests != null)
387 foreach (IRequestBase request in pendingRequests)
389 request.Abort(this.requestChannel);
396 if (requestChannel.closedEvent != null)
398 if (waitHandle != null)
400 waitHandle.Unregister(requestChannel.closedEvent);
402 requestChannel.FinishClose();
406 static void OnCompleteWaitCallBack(object state, bool timedOut)
408 WaitForPendingRequestsAsyncResult thisPtr = (WaitForPendingRequestsAsyncResult)state;
409 Exception completionException = null;
414 thisPtr.AbortRequests();
416 thisPtr.CleanupEvents();
418 #pragma warning suppress 56500 // Microsoft, transferring exception to another thread
425 completionException = e;
428 thisPtr.Complete(false, completionException);
431 public static void End(IAsyncResult result)
433 AsyncResult.End<WaitForPendingRequestsAsyncResult>(result);
438 interface IRequestBase
440 void Abort(RequestChannel requestChannel);
441 void Fault(RequestChannel requestChannel);
442 void OnReleaseRequest();
445 interface IRequest : IRequestBase
447 void SendRequest(Message message, TimeSpan timeout);
448 Message WaitForReply(TimeSpan timeout);
451 interface IAsyncRequest : IAsyncResult, IRequestBase
453 void BeginSendRequest(Message message, TimeSpan timeout);