2 // Copyright (c) Microsoft Corporation. All rights reserved.
5 namespace System.ServiceModel.Channels
8 using System.Collections.Generic;
9 using System.Diagnostics.CodeAnalysis;
10 using System.Globalization;
12 using System.Net.Sockets;
14 using System.Runtime.Diagnostics;
15 using System.ServiceModel.Diagnostics;
16 using System.Threading;
19 internal abstract class UdpOutputChannel : OutputChannel, IOutputChannel
21 private bool cleanedUp;
22 private volatile AsyncWaitHandle retransmissionDoneWaitHandle;
23 private UdpRetransmissionSettings retransmitSettings;
24 private volatile Dictionary<UniqueId, IUdpRetransmitter> retransmitList;
25 private SynchronizedRandom randomNumberGenerator;
28 public UdpOutputChannel(
29 ChannelManagerBase factory,
30 MessageEncoder encoder,
31 BufferManager bufferManager,
32 UdpSocket[] sendSockets,
33 UdpRetransmissionSettings retransmissionSettings,
38 Fx.Assert(encoder != null, "encoder shouldn't be null");
39 Fx.Assert(bufferManager != null, "buffer manager shouldn't be null");
40 Fx.Assert(sendSockets != null, "sendSockets can't be null");
41 Fx.Assert(sendSockets.Length > 0, "sendSockets can't be empty");
42 Fx.Assert(retransmissionSettings != null, "retransmissionSettings can't be null");
43 Fx.Assert(via != null, "via can't be null");
45 this.BufferManager = bufferManager;
46 this.IsMulticast = isMulticast;
47 this.Encoder = encoder;
48 this.retransmitSettings = retransmissionSettings;
49 this.SendSockets = sendSockets;
52 if (this.retransmitSettings.Enabled)
54 this.retransmitList = new Dictionary<UniqueId, IUdpRetransmitter>();
55 this.randomNumberGenerator = new SynchronizedRandom(AppDomain.CurrentDomain.GetHashCode() | Environment.TickCount);
59 private interface IUdpRetransmitter
61 bool IsMulticast { get; }
63 void CancelRetransmission();
66 public override EndpointAddress RemoteAddress
71 public override Uri Via
73 get { return this.via; }
76 internal bool IsMulticast
82 internal TimeSpan InternalSendTimeout
84 get { return this.DefaultSendTimeout; }
87 protected BufferManager BufferManager
93 protected MessageEncoder Encoder
99 protected UdpSocket[] SendSockets
105 [SuppressMessage("Microsoft.StyleCop.CSharp.ReadabilityRules", "SA1100:DoNotPrefixCallsWithBaseUnlessLocalImplementationExists", Justification = "StyleCop 4.5 does not validate this rule properly.")]
106 public override T GetProperty<T>()
108 if (typeof(T) == typeof(IOutputChannel))
110 return (T)(object)this;
113 T messageEncoderProperty = this.Encoder.GetProperty<T>();
114 if (messageEncoderProperty != null)
116 return messageEncoderProperty;
119 return base.GetProperty<T>();
122 internal void CancelRetransmission(UniqueId messageId)
124 if (messageId != null && this.retransmitList != null)
128 if (this.retransmitList != null)
130 IUdpRetransmitter retransmitter;
131 if (this.retransmitList.TryGetValue(messageId, out retransmitter))
133 this.retransmitList.Remove(messageId);
134 retransmitter.CancelRetransmission();
141 protected static void LogMessage(ref Message message, ArraySegment<byte> messageData)
143 using (XmlDictionaryReader xmlDictionaryReader = XmlDictionaryReader.CreateTextReader(messageData.Array, messageData.Offset, messageData.Count, null, XmlDictionaryReaderQuotas.Max, null))
145 MessageLogger.LogMessage(ref message, xmlDictionaryReader, MessageLoggingSource.TransportSend);
149 protected override void AddHeadersTo(Message message)
151 Fx.Assert(message != null, "Message can't be null");
153 if (message is NullMessage)
158 if (message.Version.Addressing != AddressingVersion.None)
160 if (message.Headers.MessageId == null)
162 message.Headers.MessageId = new UniqueId();
167 if (this.retransmitSettings.Enabled == true)
169 // we should only get here if some channel above us starts producing messages that don't match the encoder's message version.
170 throw FxTrace.Exception.AsError(new ProtocolException(SR.RetransmissionRequiresAddressingOnMessage(message.Version.Addressing.ToString())));
175 protected override IAsyncResult OnBeginSend(Message message, TimeSpan timeout, AsyncCallback callback, object state)
177 if (message is NullMessage)
179 return new CompletedAsyncResult(callback, state);
182 return new SendAsyncResult(this, message, timeout, callback, state);
185 protected override void OnEndSend(IAsyncResult result)
187 if (result is CompletedAsyncResult)
189 CompletedAsyncResult.End(result);
193 SendAsyncResult.End(result);
197 protected override void OnSend(Message message, TimeSpan timeout)
199 if (message is NullMessage)
204 TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
206 IPEndPoint remoteEndPoint;
207 UdpSocket[] sendSockets;
208 Exception exceptionToBeThrown;
209 sendSockets = this.GetSendSockets(message, out remoteEndPoint, out exceptionToBeThrown);
211 if (exceptionToBeThrown != null)
213 throw FxTrace.Exception.AsError(exceptionToBeThrown);
216 if (timeoutHelper.RemainingTime() <= TimeSpan.Zero)
218 throw FxTrace.Exception.AsError(new TimeoutException(SR.SendTimedOut(remoteEndPoint, timeout)));
221 bool returnBuffer = false;
222 ArraySegment<byte> messageData = default(ArraySegment<byte>);
224 bool sendingMulticast = UdpUtility.IsMulticastAddress(remoteEndPoint.Address);
225 SynchronousRetransmissionHelper retransmitHelper = null;
226 RetransmitIterator retransmitIterator = null;
228 bool shouldRetransmit = this.ShouldRetransmitMessage(sendingMulticast);
232 if (shouldRetransmit)
234 retransmitIterator = this.CreateRetransmitIterator(sendingMulticast);
235 retransmitHelper = new SynchronousRetransmissionHelper(sendingMulticast);
236 this.RetransmitStarting(message.Headers.MessageId, retransmitHelper);
239 messageData = this.EncodeMessage(message);
242 this.TransmitMessage(messageData, sendSockets, remoteEndPoint, timeoutHelper);
244 if (shouldRetransmit)
246 while (retransmitIterator.MoveNext())
248 // wait for currentDelay time, then retransmit
249 if (retransmitIterator.CurrentDelay > 0)
251 retransmitHelper.Wait(retransmitIterator.CurrentDelay);
254 if (retransmitHelper.IsCanceled)
260 // since we only invoke the encoder once just before the initial send of the message
261 // we need to handle logging the message in the retransmission case
262 if (MessageLogger.LogMessagesAtTransportLevel)
264 UdpOutputChannel.LogMessage(ref message, messageData);
267 this.TransmitMessage(messageData, sendSockets, remoteEndPoint, timeoutHelper);
275 this.BufferManager.ReturnBuffer(messageData.Array);
278 if (shouldRetransmit)
280 this.RetransmitStopping(message.Headers.MessageId);
282 if (retransmitHelper != null)
284 retransmitHelper.Dispose();
290 protected abstract UdpSocket[] GetSendSockets(Message message, out IPEndPoint remoteEndPoint, out Exception exceptionToBeThrown);
292 protected override void OnAbort()
294 this.Cleanup(true, TimeSpan.Zero);
297 [SuppressMessage("Microsoft.StyleCop.CSharp.ReadabilityRules", "SA1100:DoNotPrefixCallsWithBaseUnlessLocalImplementationExists", Justification = "If BeginClose is overridden we still pass base.BeginClose here")]
298 protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
300 return new CloseAsyncResult(
307 protected override void OnClose(TimeSpan timeout)
309 TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
310 this.Cleanup(false, timeoutHelper.RemainingTime());
313 protected override void OnEndClose(IAsyncResult result)
315 CloseAsyncResult.End(result);
318 protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
320 this.OnOpen(timeout);
321 return new CompletedAsyncResult(callback, state);
324 protected override void OnEndOpen(IAsyncResult result)
326 CompletedAsyncResult.End(result);
329 protected override void OnOpen(TimeSpan timeout)
331 for (int i = 0; i < this.SendSockets.Length; i++)
333 this.SendSockets[i].Open();
337 protected ArraySegment<byte> EncodeMessage(Message message)
339 return this.Encoder.WriteMessage(message, int.MaxValue, this.BufferManager);
342 protected ObjectDisposedException CreateObjectDisposedException()
344 return new ObjectDisposedException(null, SR.ObjectDisposed(this.GetType().Name));
347 private RetransmitIterator CreateRetransmitIterator(bool sendingMulticast)
349 Fx.Assert(this.retransmitSettings.Enabled, "CreateRetransmitCalculator called when no retransmission set to happen");
350 int lowerBound = this.retransmitSettings.GetDelayLowerBound();
351 int upperBound = this.retransmitSettings.GetDelayUpperBound();
352 int currentDelay = this.randomNumberGenerator.Next(lowerBound, upperBound);
354 int maxDelay = this.retransmitSettings.GetMaxDelayPerRetransmission();
355 int maxRetransmitCount = sendingMulticast ? this.retransmitSettings.MaxMulticastRetransmitCount : this.retransmitSettings.MaxUnicastRetransmitCount;
357 return new RetransmitIterator(currentDelay, maxDelay, maxRetransmitCount);
360 private void RetransmitStarting(UniqueId messageId, IUdpRetransmitter retransmitter)
362 Fx.Assert(this.retransmitSettings.Enabled, "RetransmitStarting called when retransmission is disabled");
368 if (this.retransmitList.ContainsKey(messageId))
370 // someone is sending a message with the same MessageId
371 // while a retransmission is still in progress for that ID.
372 throw FxTrace.Exception.AsError(new InvalidOperationException(SR.RecycledMessageIdDuringRetransmission(messageId)));
376 this.retransmitList[messageId] = retransmitter;
381 private void RetransmitStopping(UniqueId messageId)
383 Fx.Assert(this.retransmitSettings.Enabled, "RetransmitStopping called when retransmission is disabled");
387 // Cleanup sets retransmitList to null, so check before using...
388 if (this.retransmitList != null)
390 this.retransmitList.Remove(messageId);
392 // if we are closing down, then we need to unblock the Cleanup code
393 // this.retransmissionDoneEvent only != null if on cleaning up; abort case means that it == null.
394 if (this.retransmitList.Count == 0 && this.retransmissionDoneWaitHandle != null)
396 this.retransmissionDoneWaitHandle.Set();
402 private bool ShouldRetransmitMessage(bool sendingMulticast)
404 if (sendingMulticast)
406 return this.retransmitSettings.MaxMulticastRetransmitCount > 0;
410 return this.retransmitSettings.MaxUnicastRetransmitCount > 0;
414 private void TransmitMessage(ArraySegment<byte> messageBytes, UdpSocket[] sockets, IPEndPoint remoteEndpoint, TimeoutHelper timeoutHelper)
416 Fx.Assert(messageBytes.Array != null, "message data array can't be null");
417 Fx.Assert(sockets != null, "sockets can't be null");
418 Fx.Assert(sockets.Length > 0, "sockets must contain at least one item");
419 Fx.Assert(remoteEndpoint != null, "remoteEndPoint can't be null");
421 for (int i = 0; i < sockets.Length; i++)
423 if (timeoutHelper.RemainingTime() <= TimeSpan.Zero)
425 throw FxTrace.Exception.AsError(new TimeoutException(SR.SendTimedOut(remoteEndpoint, timeoutHelper.OriginalTimeout)));
428 sockets[i].SendTo(messageBytes.Array, messageBytes.Offset, messageBytes.Count, remoteEndpoint);
432 // we're guaranteed by CommunicationObject that at most ONE of Close or BeginClose will be called once.
433 private void Cleanup(bool aborting, TimeSpan timeout)
435 bool needToWait = false;
449 if (!aborting && this.retransmitList != null && this.retransmitList.Count > 0)
452 this.retransmissionDoneWaitHandle = new AsyncWaitHandle(EventResetMode.ManualReset);
456 // copied this call here in order to avoid releasing then retaking lock
457 this.CleanupAfterWait(aborting);
463 if (!this.retransmissionDoneWaitHandle.Wait(timeout))
465 throw FxTrace.Exception.AsError(new TimeoutException(SR.TimeoutOnOperation(timeout)));
470 this.retransmissionDoneWaitHandle = null;
472 // another thread could have called Abort while Close() was waiting for retransmission to complete.
478 this.CleanupAfterWait(aborting);
483 // must be called from within this.ThisLock
484 private void CleanupAfterWait(bool aborting)
486 Fx.Assert(!this.cleanedUp, "We should only clean up once");
488 if (this.retransmitList != null)
490 foreach (IUdpRetransmitter retransmitter in this.retransmitList.Values)
492 retransmitter.CancelRetransmission();
495 if (aborting && this.retransmissionDoneWaitHandle != null)
497 // If another thread has called close and is waiting for retransmission to complete,
498 // we need to make sure that thread gets unblocked.
499 this.retransmissionDoneWaitHandle.Set();
502 this.retransmitList = null;
505 for (int i = 0; i < this.SendSockets.Length; i++)
507 this.SendSockets[i].Close();
510 this.cleanedUp = true;
513 private class RetransmitIterator
515 private int maxDelay;
516 private int retransmitCount;
517 private int initialDelay;
519 internal RetransmitIterator(int initialDelay, int maxDelay, int retransmitCount)
521 Fx.Assert(initialDelay >= 0, "initialDelay cannot be negative");
522 Fx.Assert(maxDelay >= initialDelay, "maxDelay must be >= initialDelay");
523 Fx.Assert(retransmitCount > 0, "retransmitCount must be > 0");
525 this.CurrentDelay = -1;
526 this.initialDelay = initialDelay;
527 this.maxDelay = maxDelay;
528 this.retransmitCount = retransmitCount;
531 public int CurrentDelay
537 // should be called before each retransmission to determine if
538 // another one is needed.
539 public bool MoveNext()
541 if (this.CurrentDelay < 0)
543 this.CurrentDelay = this.initialDelay;
547 bool shouldContinue = --this.retransmitCount > 0;
549 if (shouldContinue && this.CurrentDelay < this.maxDelay)
551 this.CurrentDelay = Math.Min(this.CurrentDelay * 2, this.maxDelay);
554 return shouldContinue;
558 private sealed class SynchronousRetransmissionHelper : IUdpRetransmitter, IDisposable
560 private ManualResetEvent cancelEvent;
561 private object thisLock;
562 private bool cleanedUp;
564 public SynchronousRetransmissionHelper(bool isMulticast)
566 this.thisLock = new object();
567 this.IsMulticast = isMulticast;
568 this.cancelEvent = new ManualResetEvent(false);
571 public bool IsMulticast
577 public bool IsCanceled
583 public void Wait(int millisecondsTimeout)
585 if (this.ResetEvent())
587 // Dispose should only be called by the same thread that
588 // is calling this function, making it so that we don't need a lock here...
589 this.cancelEvent.WaitOne(millisecondsTimeout);
593 public void CancelRetransmission()
597 this.IsCanceled = true;
601 this.cancelEvent.Set();
606 public void Dispose()
612 this.cleanedUp = true;
613 this.cancelEvent.Dispose();
618 private bool ResetEvent()
622 if (!this.IsCanceled && !this.cleanedUp)
624 this.cancelEvent.Reset();
633 private class SendAsyncResult : AsyncResult, IUdpRetransmitter
635 private static AsyncCallback onSocketSendComplete = Fx.ThunkCallback(new AsyncCallback(OnSocketSendComplete));
636 private static Action<object> onRetransmitMessage = new Action<object>(OnRetransmitMessage);
638 private UdpOutputChannel channel;
639 private ArraySegment<byte> messageData;
640 private TimeoutHelper timeoutHelper;
641 private IPEndPoint remoteEndpoint;
642 private int currentSocket;
643 private UdpSocket[] sendSockets;
644 private IOThreadTimer retransmitTimer;
645 private RetransmitIterator retransmitIterator;
646 private Message message;
647 private bool retransmissionEnabled;
649 public SendAsyncResult(UdpOutputChannel channel, Message message, TimeSpan timeout, AsyncCallback callback, object state)
650 : base(callback, state)
652 this.timeoutHelper = new TimeoutHelper(timeout);
654 this.channel = channel;
655 this.message = message;
656 bool throwing = true;
657 bool completedSynchronously = false;
661 this.Initialize(message);
663 completedSynchronously = this.BeginTransmitMessage();
665 if (completedSynchronously && this.retransmissionEnabled)
667 // initial send completed sync, now we need to start the retransmission process...
668 completedSynchronously = this.BeginRetransmission();
681 if (completedSynchronously)
683 this.CompleteAndCleanup(true, null);
687 private enum RetransmitState
693 public bool IsCanceled
699 public bool IsMulticast
705 public static void End(IAsyncResult result)
707 AsyncResult.End<SendAsyncResult>(result);
710 // tries to terminate retransmission early, but won't cancel async IO immediately
711 public void CancelRetransmission()
713 this.IsCanceled = true;
716 private static void OnSocketSendComplete(IAsyncResult result)
718 if (result.CompletedSynchronously)
723 SendAsyncResult thisPtr = (SendAsyncResult)result.AsyncState;
724 bool completeSelf = false;
725 Exception completionException = null;
729 completeSelf = thisPtr.ContinueTransmitting(result);
731 if (completeSelf && thisPtr.retransmissionEnabled)
733 completeSelf = thisPtr.BeginRetransmission();
743 completionException = e;
749 thisPtr.CompleteAndCleanup(false, completionException);
753 private static void OnRetransmitMessage(object state)
755 SendAsyncResult thisPtr = (SendAsyncResult)state;
756 bool completeSelf = false;
757 Exception completionException = null;
761 completeSelf = thisPtr.ContinueRetransmission(RetransmitState.WaitCompleted);
770 completionException = e;
776 thisPtr.CompleteAndCleanup(false, completionException);
780 private bool BeginTransmitMessage()
782 this.currentSocket = 0;
783 return this.ContinueTransmitting(null);
786 private bool ContinueTransmitting(IAsyncResult socketAsyncResult)
788 while (this.currentSocket < this.sendSockets.Length)
790 if (socketAsyncResult == null)
792 socketAsyncResult = this.sendSockets[this.currentSocket].BeginSendTo(
793 this.messageData.Array,
794 this.messageData.Offset,
795 this.messageData.Count,
797 onSocketSendComplete,
800 if (!socketAsyncResult.CompletedSynchronously)
806 this.sendSockets[this.currentSocket].EndSendTo(socketAsyncResult);
808 // check for timeout after calling socket.EndSendTo
809 // so that we don't leave the socket in a bad state/leak async results
810 this.ThrowIfTimedOut();
814 // don't send on the next socket and return true to cause Complete to be called.
818 this.currentSocket++;
819 socketAsyncResult = null;
825 private bool BeginRetransmission()
827 // BeginRetransmission should only be called in the case where transmission of the message
828 // completes synchronously.
829 return this.ContinueRetransmission(RetransmitState.TransmitCompleted);
832 private bool ContinueRetransmission(RetransmitState state)
834 this.ThrowIfTimedOut();
840 case RetransmitState.TransmitCompleted:
841 if (!this.retransmitIterator.MoveNext())
843 // We are done retransmitting
847 if (this.retransmitIterator.CurrentDelay > 0)
849 this.retransmitTimer.Set(this.retransmitIterator.CurrentDelay);
853 state = RetransmitState.WaitCompleted;
855 case RetransmitState.WaitCompleted:
858 this.channel.ThrowIfAborted();
862 // since we only invoke the encoder once just before the initial send of the message
863 // we need to handle logging the message in the retransmission case
864 if (MessageLogger.LogMessagesAtTransportLevel)
866 UdpOutputChannel.LogMessage(ref this.message, this.messageData);
870 if (!this.BeginTransmitMessage())
875 state = RetransmitState.TransmitCompleted;
879 Fx.Assert("Unknown RetransmitState value encountered");
885 private void Initialize(Message message)
887 Exception exceptionToThrow;
888 this.sendSockets = this.channel.GetSendSockets(message, out this.remoteEndpoint, out exceptionToThrow);
890 if (exceptionToThrow != null)
892 throw FxTrace.Exception.AsError(exceptionToThrow);
895 this.IsMulticast = UdpUtility.IsMulticastAddress(this.remoteEndpoint.Address);
897 if (this.channel.ShouldRetransmitMessage(this.IsMulticast))
899 this.retransmissionEnabled = true;
900 this.channel.RetransmitStarting(this.message.Headers.MessageId, this);
901 this.retransmitTimer = new IOThreadTimer(onRetransmitMessage, this, false);
902 this.retransmitIterator = this.channel.CreateRetransmitIterator(this.IsMulticast);
905 this.messageData = this.channel.EncodeMessage(message);
908 private void ThrowIfTimedOut()
910 if (this.timeoutHelper.RemainingTime() <= TimeSpan.Zero)
912 throw FxTrace.Exception.AsError(new TimeoutException(SR.TimeoutOnOperation(this.timeoutHelper.OriginalTimeout)));
916 private void Cleanup()
918 if (this.retransmissionEnabled)
920 this.channel.RetransmitStopping(this.message.Headers.MessageId);
921 this.retransmitTimer.Cancel();
924 if (this.messageData.Array != null)
926 this.channel.BufferManager.ReturnBuffer(this.messageData.Array);
927 this.messageData = default(ArraySegment<byte>);
931 private void CompleteAndCleanup(bool completedSynchronously, Exception completionException)
934 this.Complete(completedSynchronously, completionException);
938 // Control flow for async path
939 // We use this mechanism to avoid initializing two async objects as logically cleanup+close is one operation.
940 // At any point in the Begin* methods, we may go async. The steps are:
943 private class CloseAsyncResult : AsyncResult
945 private static Action<object, TimeoutException> completeCleanupCallback = new Action<object, TimeoutException>(CompleteCleanup);
947 private UdpOutputChannel channel;
948 private TimeoutHelper timeoutHelper;
950 public CloseAsyncResult(UdpOutputChannel channel, TimeSpan timeout, AsyncCallback callback, object state)
951 : base(callback, state)
953 this.channel = channel;
954 this.timeoutHelper = new TimeoutHelper(timeout);
956 if (this.BeginCleanup())
962 internal static void End(IAsyncResult result)
964 AsyncResult.End<CloseAsyncResult>(result);
967 private static void CompleteCleanup(object state, TimeoutException exception)
969 CloseAsyncResult thisPtr = (CloseAsyncResult)state;
970 Exception completionException = null;
972 if (exception != null)
974 Fx.Assert(exception.GetType() == typeof(TimeoutException), "Exception on callback should always be TimeoutException");
975 throw FxTrace.Exception.AsError(new TimeoutException(SR.TimeoutOnOperation(thisPtr.timeoutHelper.OriginalTimeout)));
980 lock (thisPtr.channel.ThisLock)
982 thisPtr.channel.retransmissionDoneWaitHandle = null;
984 // another thread could have called Abort while Close() was waiting for retransmission to complete.
985 if (!thisPtr.channel.cleanedUp)
987 // never aborting here
988 thisPtr.channel.CleanupAfterWait(false);
999 completionException = e;
1002 thisPtr.Complete(false, completionException);
1005 private bool BeginCleanup()
1007 bool needToWait = false;
1009 if (this.channel.cleanedUp)
1014 lock (this.channel.ThisLock)
1016 if (this.channel.cleanedUp)
1021 // we're never aborting in this case...
1022 if (this.channel.retransmitList != null && this.channel.retransmitList.Count > 0)
1025 this.channel.retransmissionDoneWaitHandle = new AsyncWaitHandle(EventResetMode.ManualReset);
1029 this.channel.CleanupAfterWait(false);
1032 // we're guaranteed by CommunicationObject that at most ONE of Close or BeginClose will be called once.
1033 // we don't null out retransmissionDoneEvent in the abort case; should be safe to use here.
1034 return !needToWait || this.channel.retransmissionDoneWaitHandle.WaitAsync(completeCleanupCallback, this, this.timeoutHelper.RemainingTime());