Updates referencesource to .NET 4.7
[mono.git] / mcs / class / referencesource / System.ServiceModel.Channels / System / ServiceModel / Channels / UdpOutputChannel.cs
1 // <copyright>
2 // Copyright (c) Microsoft Corporation.  All rights reserved.
3 // </copyright> 
4
5 namespace System.ServiceModel.Channels
6 {
7     using System;
8     using System.Collections.Generic;
9     using System.Diagnostics.CodeAnalysis;
10     using System.Globalization;
11     using System.Net;
12     using System.Net.Sockets;
13     using System.Runtime;
14     using System.Runtime.Diagnostics;
15     using System.ServiceModel.Diagnostics;
16     using System.Threading;
17     using System.Xml;
18
19     internal abstract class UdpOutputChannel : OutputChannel, IOutputChannel
20     {
21         private bool cleanedUp;
22         private volatile AsyncWaitHandle retransmissionDoneWaitHandle;
23         private UdpRetransmissionSettings retransmitSettings;
24         private volatile Dictionary<UniqueId, IUdpRetransmitter> retransmitList;
25         private SynchronizedRandom randomNumberGenerator;
26         private Uri via;
27
28         public UdpOutputChannel(
29             ChannelManagerBase factory,
30             MessageEncoder encoder,
31             BufferManager bufferManager,
32             UdpSocket[] sendSockets,
33             UdpRetransmissionSettings retransmissionSettings,
34             Uri via,
35             bool isMulticast)
36             : base(factory)
37         {
38             Fx.Assert(encoder != null, "encoder shouldn't be null");
39             Fx.Assert(bufferManager != null, "buffer manager shouldn't be null");
40             Fx.Assert(sendSockets != null, "sendSockets can't be null");
41             Fx.Assert(sendSockets.Length > 0, "sendSockets can't be empty");
42             Fx.Assert(retransmissionSettings != null, "retransmissionSettings can't be null");
43             Fx.Assert(via != null, "via can't be null");
44
45             this.BufferManager = bufferManager;
46             this.IsMulticast = isMulticast;
47             this.Encoder = encoder;
48             this.retransmitSettings = retransmissionSettings;
49             this.SendSockets = sendSockets;
50             this.via = via;
51
52             if (this.retransmitSettings.Enabled)
53             {
54                 this.retransmitList = new Dictionary<UniqueId, IUdpRetransmitter>();
55                 this.randomNumberGenerator = new SynchronizedRandom(AppDomain.CurrentDomain.GetHashCode() | Environment.TickCount);
56             }
57         }
58
59         private interface IUdpRetransmitter
60         {
61             bool IsMulticast { get; }
62
63             void CancelRetransmission();
64         }
65
66         public override EndpointAddress RemoteAddress
67         {
68             get { return null; }
69         }
70
71         public override Uri Via
72         {
73             get { return this.via; }
74         }
75
76         internal bool IsMulticast
77         {
78             get;
79             private set;
80         }
81
82         internal TimeSpan InternalSendTimeout
83         {
84             get { return this.DefaultSendTimeout; }
85         }
86
87         protected BufferManager BufferManager
88         {
89             get;
90             private set;
91         }
92
93         protected MessageEncoder Encoder
94         {
95             get;
96             private set;
97         }
98
99         protected UdpSocket[] SendSockets
100         {
101             get;
102             private set;
103         }
104
105         [SuppressMessage("Microsoft.StyleCop.CSharp.ReadabilityRules", "SA1100:DoNotPrefixCallsWithBaseUnlessLocalImplementationExists", Justification = "StyleCop 4.5 does not validate this rule properly.")]
106         public override T GetProperty<T>()
107         {
108             if (typeof(T) == typeof(IOutputChannel))
109             {
110                 return (T)(object)this;
111             }
112
113             T messageEncoderProperty = this.Encoder.GetProperty<T>();
114             if (messageEncoderProperty != null)
115             {
116                 return messageEncoderProperty;
117             }
118
119             return base.GetProperty<T>();
120         }
121
122         internal void CancelRetransmission(UniqueId messageId)
123         {
124             if (messageId != null && this.retransmitList != null)
125             {
126                 lock (this.ThisLock)
127                 {
128                     if (this.retransmitList != null)
129                     {
130                         IUdpRetransmitter retransmitter;
131                         if (this.retransmitList.TryGetValue(messageId, out retransmitter))
132                         {
133                             this.retransmitList.Remove(messageId);
134                             retransmitter.CancelRetransmission();
135                         }
136                     }
137                 }
138             }
139         }
140
141         protected static void LogMessage(ref Message message, ArraySegment<byte> messageData)
142         {
143             using (XmlDictionaryReader xmlDictionaryReader = XmlDictionaryReader.CreateTextReader(messageData.Array, messageData.Offset, messageData.Count, null, XmlDictionaryReaderQuotas.Max, null))
144             {
145                 MessageLogger.LogMessage(ref message, xmlDictionaryReader, MessageLoggingSource.TransportSend);
146             }
147         }
148
149         protected override void AddHeadersTo(Message message)
150         {
151             Fx.Assert(message != null, "Message can't be null");
152             
153             if (message is NullMessage)
154             {
155                 return; 
156             }
157
158             if (message.Version.Addressing != AddressingVersion.None)
159             {
160                 if (message.Headers.MessageId == null)
161                 {
162                     message.Headers.MessageId = new UniqueId();
163                 }
164             }
165             else
166             {
167                 if (this.retransmitSettings.Enabled == true)
168                 {
169                     // we should only get here if some channel above us starts producing messages that don't match the encoder's message version.
170                     throw FxTrace.Exception.AsError(new ProtocolException(SR.RetransmissionRequiresAddressingOnMessage(message.Version.Addressing.ToString())));
171                 }
172             }
173         }
174
175         protected override IAsyncResult OnBeginSend(Message message, TimeSpan timeout, AsyncCallback callback, object state)
176         {
177             if (message is NullMessage)
178             {
179                 return new CompletedAsyncResult(callback, state); 
180             }
181             
182             return new SendAsyncResult(this, message, timeout, callback, state);
183         }
184
185         protected override void OnEndSend(IAsyncResult result)
186         {
187             if (result is CompletedAsyncResult)
188             {
189                 CompletedAsyncResult.End(result); 
190             }
191             else 
192             {
193                 SendAsyncResult.End(result);
194             }
195         }
196
197         protected override void OnSend(Message message, TimeSpan timeout)
198         {
199             if (message is NullMessage)
200             {
201                 return; 
202             }
203         
204             TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
205
206             IPEndPoint remoteEndPoint;
207             UdpSocket[] sendSockets;
208             Exception exceptionToBeThrown;
209             sendSockets = this.GetSendSockets(message, out remoteEndPoint, out exceptionToBeThrown);
210
211             if (exceptionToBeThrown != null)
212             {
213                 throw FxTrace.Exception.AsError(exceptionToBeThrown);
214             }
215
216             if (timeoutHelper.RemainingTime() <= TimeSpan.Zero)
217             {
218                 throw FxTrace.Exception.AsError(new TimeoutException(SR.SendTimedOut(remoteEndPoint, timeout)));
219             }
220
221             bool returnBuffer = false;
222             ArraySegment<byte> messageData = default(ArraySegment<byte>);
223
224             bool sendingMulticast = UdpUtility.IsMulticastAddress(remoteEndPoint.Address);
225             SynchronousRetransmissionHelper retransmitHelper = null;
226             RetransmitIterator retransmitIterator = null;
227
228             bool shouldRetransmit = this.ShouldRetransmitMessage(sendingMulticast);
229             
230             try
231             {
232                 if (shouldRetransmit)
233                 {
234                     retransmitIterator = this.CreateRetransmitIterator(sendingMulticast);
235                     retransmitHelper = new SynchronousRetransmissionHelper(sendingMulticast);
236                     this.RetransmitStarting(message.Headers.MessageId, retransmitHelper);
237                 }
238
239                 messageData = this.EncodeMessage(message);
240                 returnBuffer = true;
241
242                 this.TransmitMessage(messageData, sendSockets, remoteEndPoint, timeoutHelper);
243
244                 if (shouldRetransmit)
245                 {
246                     while (retransmitIterator.MoveNext())
247                     {
248                         // wait for currentDelay time, then retransmit
249                         if (retransmitIterator.CurrentDelay > 0)
250                         {
251                             retransmitHelper.Wait(retransmitIterator.CurrentDelay);
252                         }
253
254                         if (retransmitHelper.IsCanceled)
255                         {
256                             ThrowIfAborted();
257                             return;
258                         }
259
260                         // since we only invoke the encoder once just before the initial send of the message
261                         // we need to handle logging the message in the retransmission case
262                         if (MessageLogger.LogMessagesAtTransportLevel)
263                         {
264                             UdpOutputChannel.LogMessage(ref message, messageData);
265                         }
266
267                         this.TransmitMessage(messageData, sendSockets, remoteEndPoint, timeoutHelper);
268                     }
269                 }
270             }
271             finally
272             {
273                 if (returnBuffer)
274                 {
275                     this.BufferManager.ReturnBuffer(messageData.Array);
276                 }
277
278                 if (shouldRetransmit)
279                 {
280                     this.RetransmitStopping(message.Headers.MessageId);
281
282                     if (retransmitHelper != null)
283                     {
284                         retransmitHelper.Dispose();
285                     }
286                 }
287             }
288         }
289
290         protected abstract UdpSocket[] GetSendSockets(Message message, out IPEndPoint remoteEndPoint, out Exception exceptionToBeThrown);
291
292         protected override void OnAbort()
293         {
294             this.Cleanup(true, TimeSpan.Zero);
295         }
296
297         [SuppressMessage("Microsoft.StyleCop.CSharp.ReadabilityRules", "SA1100:DoNotPrefixCallsWithBaseUnlessLocalImplementationExists", Justification = "If BeginClose is overridden we still pass base.BeginClose here")]
298         protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
299         {
300             return new CloseAsyncResult(
301                 this,
302                 timeout,
303                 callback,
304                 state);
305         }
306
307         protected override void OnClose(TimeSpan timeout)
308         {
309             TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
310             this.Cleanup(false, timeoutHelper.RemainingTime());
311         }
312
313         protected override void OnEndClose(IAsyncResult result)
314         {
315             CloseAsyncResult.End(result);
316         }
317
318         protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
319         {
320             this.OnOpen(timeout);
321             return new CompletedAsyncResult(callback, state);
322         }
323
324         protected override void OnEndOpen(IAsyncResult result)
325         {
326             CompletedAsyncResult.End(result);
327         }
328
329         protected override void OnOpen(TimeSpan timeout)
330         {
331             for (int i = 0; i < this.SendSockets.Length; i++)
332             {
333                 this.SendSockets[i].Open();
334             }
335         }
336
337         protected ArraySegment<byte> EncodeMessage(Message message)
338         {
339             return this.Encoder.WriteMessage(message, int.MaxValue, this.BufferManager);
340         }
341
342         protected ObjectDisposedException CreateObjectDisposedException()
343         {
344             return new ObjectDisposedException(null, SR.ObjectDisposed(this.GetType().Name));
345         }
346
347         private RetransmitIterator CreateRetransmitIterator(bool sendingMulticast)
348         {
349             Fx.Assert(this.retransmitSettings.Enabled, "CreateRetransmitCalculator called when no retransmission set to happen");
350             int lowerBound = this.retransmitSettings.GetDelayLowerBound();
351             int upperBound = this.retransmitSettings.GetDelayUpperBound();
352             int currentDelay = this.randomNumberGenerator.Next(lowerBound, upperBound);
353
354             int maxDelay = this.retransmitSettings.GetMaxDelayPerRetransmission();
355             int maxRetransmitCount = sendingMulticast ? this.retransmitSettings.MaxMulticastRetransmitCount : this.retransmitSettings.MaxUnicastRetransmitCount;
356
357             return new RetransmitIterator(currentDelay, maxDelay, maxRetransmitCount);
358         }
359
360         private void RetransmitStarting(UniqueId messageId, IUdpRetransmitter retransmitter)
361         {
362             Fx.Assert(this.retransmitSettings.Enabled, "RetransmitStarting called when retransmission is disabled");
363
364             lock (this.ThisLock)
365             {
366                 ThrowIfDisposed();
367
368                 if (this.retransmitList.ContainsKey(messageId))
369                 {
370                     // someone is sending a message with the same MessageId 
371                     // while a retransmission is still in progress for that ID.
372                     throw FxTrace.Exception.AsError(new InvalidOperationException(SR.RecycledMessageIdDuringRetransmission(messageId)));
373                 }
374                 else
375                 {
376                     this.retransmitList[messageId] = retransmitter;
377                 }
378             }
379         }
380
381         private void RetransmitStopping(UniqueId messageId)
382         {
383             Fx.Assert(this.retransmitSettings.Enabled, "RetransmitStopping called when retransmission is disabled");
384
385             lock (this.ThisLock)
386             {
387                 // Cleanup sets retransmitList to null, so check before using...
388                 if (this.retransmitList != null)
389                 {
390                     this.retransmitList.Remove(messageId);
391
392                     // if we are closing down, then we need to unblock the Cleanup code 
393                     //  this.retransmissionDoneEvent only != null if on cleaning up; abort case means that it == null. 
394                     if (this.retransmitList.Count == 0 && this.retransmissionDoneWaitHandle != null)
395                     {
396                         this.retransmissionDoneWaitHandle.Set();
397                     }
398                 }
399             }
400         }
401
402         private bool ShouldRetransmitMessage(bool sendingMulticast)
403         {
404             if (sendingMulticast)
405             {
406                 return this.retransmitSettings.MaxMulticastRetransmitCount > 0;
407             }
408             else
409             {
410                 return this.retransmitSettings.MaxUnicastRetransmitCount > 0;
411             }
412         }
413
414         private void TransmitMessage(ArraySegment<byte> messageBytes, UdpSocket[] sockets, IPEndPoint remoteEndpoint, TimeoutHelper timeoutHelper)
415         {
416             Fx.Assert(messageBytes.Array != null, "message data array can't be null");
417             Fx.Assert(sockets != null, "sockets can't be null");
418             Fx.Assert(sockets.Length > 0, "sockets must contain at least one item");
419             Fx.Assert(remoteEndpoint != null, "remoteEndPoint can't be null");
420
421             for (int i = 0; i < sockets.Length; i++)
422             {
423                 if (timeoutHelper.RemainingTime() <= TimeSpan.Zero)
424                 {
425                     throw FxTrace.Exception.AsError(new TimeoutException(SR.SendTimedOut(remoteEndpoint, timeoutHelper.OriginalTimeout)));
426                 }
427
428                 sockets[i].SendTo(messageBytes.Array, messageBytes.Offset, messageBytes.Count, remoteEndpoint);
429             }
430         }
431
432         // we're guaranteed by CommunicationObject that at most ONE of Close or BeginClose will be called once. 
433         private void Cleanup(bool aborting, TimeSpan timeout)
434         {
435             bool needToWait = false;
436
437             if (this.cleanedUp)
438             {
439                 return;
440             }
441
442             lock (this.ThisLock)
443             {
444                 if (this.cleanedUp)
445                 {
446                     return;
447                 }
448
449                 if (!aborting && this.retransmitList != null && this.retransmitList.Count > 0)
450                 {
451                     needToWait = true;
452                     this.retransmissionDoneWaitHandle = new AsyncWaitHandle(EventResetMode.ManualReset);
453                 }
454                 else
455                 {
456                     // copied this call here in order to avoid releasing then retaking lock 
457                     this.CleanupAfterWait(aborting);
458                 }
459             }
460
461             if (needToWait)
462             {
463                 if (!this.retransmissionDoneWaitHandle.Wait(timeout))
464                 {
465                     throw FxTrace.Exception.AsError(new TimeoutException(SR.TimeoutOnOperation(timeout)));
466                 }
467
468                 lock (this.ThisLock)
469                 {
470                     this.retransmissionDoneWaitHandle = null;
471
472                     // another thread could have called Abort while Close() was waiting for retransmission to complete.
473                     if (this.cleanedUp)
474                     {
475                         return;
476                     }
477
478                     this.CleanupAfterWait(aborting);
479                 }
480             }
481         }
482
483         // must be called from within this.ThisLock
484         private void CleanupAfterWait(bool aborting)
485         {
486             Fx.Assert(!this.cleanedUp, "We should only clean up once");
487
488             if (this.retransmitList != null)
489             {
490                 foreach (IUdpRetransmitter retransmitter in this.retransmitList.Values)
491                 {
492                     retransmitter.CancelRetransmission();
493                 }
494
495                 if (aborting && this.retransmissionDoneWaitHandle != null)
496                 {
497                     // If another thread has called close and is waiting for retransmission to complete,
498                     // we need to make sure that thread gets unblocked.
499                     this.retransmissionDoneWaitHandle.Set();
500                 }
501
502                 this.retransmitList = null;
503             }
504
505             for (int i = 0; i < this.SendSockets.Length; i++)
506             {
507                 this.SendSockets[i].Close();
508             }                
509
510             this.cleanedUp = true;
511         }
512
513         private class RetransmitIterator
514         {
515             private int maxDelay;
516             private int retransmitCount;
517             private int initialDelay;
518
519             internal RetransmitIterator(int initialDelay, int maxDelay, int retransmitCount)
520             {
521                 Fx.Assert(initialDelay >= 0, "initialDelay cannot be negative");
522                 Fx.Assert(maxDelay >= initialDelay, "maxDelay must be >= initialDelay");
523                 Fx.Assert(retransmitCount > 0, "retransmitCount must be > 0");
524
525                 this.CurrentDelay = -1;
526                 this.initialDelay = initialDelay;
527                 this.maxDelay = maxDelay;
528                 this.retransmitCount = retransmitCount;
529             }
530
531             public int CurrentDelay
532             {
533                 get;
534                 private set;
535             }
536
537             // should be called before each retransmission to determine if 
538             // another one is needed.
539             public bool MoveNext()
540             {
541                 if (this.CurrentDelay < 0)
542                 {
543                     this.CurrentDelay = this.initialDelay;
544                     return true;
545                 }
546
547                 bool shouldContinue = --this.retransmitCount > 0;
548
549                 if (shouldContinue && this.CurrentDelay < this.maxDelay)
550                 {
551                     this.CurrentDelay = Math.Min(this.CurrentDelay * 2, this.maxDelay);
552                 }
553
554                 return shouldContinue;
555             }
556         }
557
558         private sealed class SynchronousRetransmissionHelper : IUdpRetransmitter, IDisposable
559         {
560             private ManualResetEvent cancelEvent;
561             private object thisLock;
562             private bool cleanedUp;
563
564             public SynchronousRetransmissionHelper(bool isMulticast)
565             {
566                 this.thisLock = new object();
567                 this.IsMulticast = isMulticast;
568                 this.cancelEvent = new ManualResetEvent(false);
569             }
570
571             public bool IsMulticast
572             {
573                 get;
574                 private set;
575             }
576
577             public bool IsCanceled
578             {
579                 get;
580                 private set;
581             }
582
583             public void Wait(int millisecondsTimeout)
584             {
585                 if (this.ResetEvent())
586                 {
587                     // Dispose should only be called by the same thread that
588                     // is calling this function, making it so that we don't need a lock here...
589                     this.cancelEvent.WaitOne(millisecondsTimeout);
590                 }
591             }
592
593             public void CancelRetransmission()
594             {
595                 lock (this.thisLock)
596                 {
597                     this.IsCanceled = true;
598
599                     if (!this.cleanedUp)
600                     {
601                         this.cancelEvent.Set();
602                     }
603                 }
604             }
605
606             public void Dispose()
607             {
608                 lock (this.thisLock)
609                 {
610                     if (!this.cleanedUp)
611                     {
612                         this.cleanedUp = true;
613                         this.cancelEvent.Dispose();
614                     }
615                 }
616             }
617
618             private bool ResetEvent()
619             {
620                 lock (this.thisLock)
621                 {
622                     if (!this.IsCanceled && !this.cleanedUp)
623                     {
624                         this.cancelEvent.Reset();
625                         return true;
626                     }
627                 }
628
629                 return false;
630             }
631         }
632
633         private class SendAsyncResult : AsyncResult, IUdpRetransmitter
634         {
635             private static AsyncCallback onSocketSendComplete = Fx.ThunkCallback(new AsyncCallback(OnSocketSendComplete));
636             private static Action<object> onRetransmitMessage = new Action<object>(OnRetransmitMessage);
637
638             private UdpOutputChannel channel;
639             private ArraySegment<byte> messageData;
640             private TimeoutHelper timeoutHelper;
641             private IPEndPoint remoteEndpoint;
642             private int currentSocket;
643             private UdpSocket[] sendSockets;
644             private IOThreadTimer retransmitTimer;
645             private RetransmitIterator retransmitIterator;
646             private Message message;
647             private bool retransmissionEnabled;
648
649             public SendAsyncResult(UdpOutputChannel channel, Message message, TimeSpan timeout, AsyncCallback callback, object state)
650                 : base(callback, state)
651             {
652                 this.timeoutHelper = new TimeoutHelper(timeout);
653
654                 this.channel = channel;
655                 this.message = message;
656                 bool throwing = true;
657                 bool completedSynchronously = false;
658
659                 try
660                 {
661                     this.Initialize(message);
662
663                     completedSynchronously = this.BeginTransmitMessage();
664
665                     if (completedSynchronously && this.retransmissionEnabled)
666                     {
667                         // initial send completed sync, now we need to start the retransmission process...
668                         completedSynchronously = this.BeginRetransmission();
669                     }
670
671                     throwing = false;
672                 }
673                 finally
674                 {
675                     if (throwing)
676                     {
677                         this.Cleanup();
678                     }
679                 }
680
681                 if (completedSynchronously)
682                 {
683                     this.CompleteAndCleanup(true, null);
684                 }
685             }
686
687             private enum RetransmitState
688             {
689                 WaitCompleted,
690                 TransmitCompleted,
691             }
692
693             public bool IsCanceled
694             {
695                 get;
696                 private set;
697             }
698
699             public bool IsMulticast
700             {
701                 get;
702                 private set;
703             }
704
705             public static void End(IAsyncResult result)
706             {
707                 AsyncResult.End<SendAsyncResult>(result);
708             }
709
710             // tries to terminate retransmission early, but won't cancel async IO immediately
711             public void CancelRetransmission()
712             {
713                 this.IsCanceled = true;
714             }
715
716             private static void OnSocketSendComplete(IAsyncResult result)
717             {
718                 if (result.CompletedSynchronously)
719                 {
720                     return;
721                 }
722
723                 SendAsyncResult thisPtr = (SendAsyncResult)result.AsyncState;
724                 bool completeSelf = false;
725                 Exception completionException = null;
726
727                 try
728                 {
729                     completeSelf = thisPtr.ContinueTransmitting(result);
730
731                     if (completeSelf && thisPtr.retransmissionEnabled)
732                     {
733                         completeSelf = thisPtr.BeginRetransmission();
734                     }
735                 }
736                 catch (Exception e)
737                 {
738                     if (Fx.IsFatal(e))
739                     {
740                         throw;
741                     }
742
743                     completionException = e;
744                     completeSelf = true;
745                 }
746
747                 if (completeSelf)
748                 {
749                     thisPtr.CompleteAndCleanup(false, completionException);
750                 }
751             }
752
753             private static void OnRetransmitMessage(object state)
754             {
755                 SendAsyncResult thisPtr = (SendAsyncResult)state;
756                 bool completeSelf = false;
757                 Exception completionException = null;
758
759                 try
760                 {
761                     completeSelf = thisPtr.ContinueRetransmission(RetransmitState.WaitCompleted);
762                 }
763                 catch (Exception e)
764                 {
765                     if (Fx.IsFatal(e))
766                     {
767                         throw;
768                     }
769
770                     completionException = e;
771                     completeSelf = true;
772                 }
773
774                 if (completeSelf)
775                 {
776                     thisPtr.CompleteAndCleanup(false, completionException);
777                 }
778             }
779
780             private bool BeginTransmitMessage()
781             {
782                 this.currentSocket = 0;
783                 return this.ContinueTransmitting(null);
784             }
785
786             private bool ContinueTransmitting(IAsyncResult socketAsyncResult)
787             {
788                 while (this.currentSocket < this.sendSockets.Length)
789                 {
790                     if (socketAsyncResult == null)
791                     {
792                         socketAsyncResult = this.sendSockets[this.currentSocket].BeginSendTo(
793                             this.messageData.Array,
794                             this.messageData.Offset,
795                             this.messageData.Count,
796                             this.remoteEndpoint,
797                             onSocketSendComplete,
798                             this);
799
800                         if (!socketAsyncResult.CompletedSynchronously)
801                         {
802                             return false;
803                         }
804                     }
805
806                     this.sendSockets[this.currentSocket].EndSendTo(socketAsyncResult);
807
808                     // check for timeout after calling socket.EndSendTo 
809                     // so that we don't leave the socket in a bad state/leak async results
810                     this.ThrowIfTimedOut();
811
812                     if (this.IsCanceled)
813                     {
814                         // don't send on the next socket and return true to cause Complete to be called.
815                         return true;
816                     }
817
818                     this.currentSocket++;
819                     socketAsyncResult = null;
820                 }
821
822                 return true;
823             }
824
825             private bool BeginRetransmission()
826             {
827                 // BeginRetransmission should only be called in the case where transmission of the message
828                 // completes synchronously.
829                 return this.ContinueRetransmission(RetransmitState.TransmitCompleted);
830             }
831
832             private bool ContinueRetransmission(RetransmitState state)
833             {
834                 this.ThrowIfTimedOut();
835
836                 while (true)
837                 {
838                     switch (state)
839                     {
840                         case RetransmitState.TransmitCompleted:
841                             if (!this.retransmitIterator.MoveNext())
842                             {
843                                 // We are done retransmitting
844                                 return true;
845                             }
846
847                             if (this.retransmitIterator.CurrentDelay > 0)
848                             {
849                                 this.retransmitTimer.Set(this.retransmitIterator.CurrentDelay);
850                                 return false;
851                             }
852
853                             state = RetransmitState.WaitCompleted;
854                             break;
855                         case RetransmitState.WaitCompleted:
856                             if (this.IsCanceled)
857                             {
858                                 this.channel.ThrowIfAborted();
859                                 return true;
860                             }
861
862                             // since we only invoke the encoder once just before the initial send of the message
863                             // we need to handle logging the message in the retransmission case
864                             if (MessageLogger.LogMessagesAtTransportLevel)
865                             {
866                                 UdpOutputChannel.LogMessage(ref this.message, this.messageData);
867                             }
868
869                             // !completedSync
870                             if (!this.BeginTransmitMessage())
871                             {
872                                 return false;
873                             }
874
875                             state = RetransmitState.TransmitCompleted;
876                             break;
877
878                         default:
879                             Fx.Assert("Unknown RetransmitState value encountered");
880                             return true;
881                     }
882                 }
883             }
884
885             private void Initialize(Message message)
886             {
887                 Exception exceptionToThrow;
888                 this.sendSockets = this.channel.GetSendSockets(message, out this.remoteEndpoint, out exceptionToThrow);
889
890                 if (exceptionToThrow != null)
891                 {
892                     throw FxTrace.Exception.AsError(exceptionToThrow);
893                 }
894
895                 this.IsMulticast = UdpUtility.IsMulticastAddress(this.remoteEndpoint.Address);
896
897                 if (this.channel.ShouldRetransmitMessage(this.IsMulticast))
898                 {
899                     this.retransmissionEnabled = true;
900                     this.channel.RetransmitStarting(this.message.Headers.MessageId, this);
901                     this.retransmitTimer = new IOThreadTimer(onRetransmitMessage, this, false);
902                     this.retransmitIterator = this.channel.CreateRetransmitIterator(this.IsMulticast);
903                 }
904
905                 this.messageData = this.channel.EncodeMessage(message);
906             }
907
908             private void ThrowIfTimedOut()
909             {
910                 if (this.timeoutHelper.RemainingTime() <= TimeSpan.Zero)
911                 {
912                     throw FxTrace.Exception.AsError(new TimeoutException(SR.TimeoutOnOperation(this.timeoutHelper.OriginalTimeout)));
913                 }
914             }
915
916             private void Cleanup()
917             {
918                 if (this.retransmissionEnabled)
919                 {
920                     this.channel.RetransmitStopping(this.message.Headers.MessageId);
921                     this.retransmitTimer.Cancel();
922                 }
923
924                 if (this.messageData.Array != null)
925                 {
926                     this.channel.BufferManager.ReturnBuffer(this.messageData.Array);
927                     this.messageData = default(ArraySegment<byte>);
928                 }
929             }
930
931             private void CompleteAndCleanup(bool completedSynchronously, Exception completionException)
932             {
933                 this.Cleanup();
934                 this.Complete(completedSynchronously, completionException);
935             }
936         }
937
938         // Control flow for async path
939         // We use this mechanism to avoid initializing two async objects as logically cleanup+close is one operation. 
940         // At any point in the Begin* methods, we may go async. The steps are: 
941         // - Cleanup channel
942         // - Close channel
943         private class CloseAsyncResult : AsyncResult
944         {
945             private static Action<object, TimeoutException> completeCleanupCallback = new Action<object, TimeoutException>(CompleteCleanup);
946
947             private UdpOutputChannel channel;
948             private TimeoutHelper timeoutHelper;
949
950             public CloseAsyncResult(UdpOutputChannel channel, TimeSpan timeout, AsyncCallback callback, object state)
951                 : base(callback, state)
952             {
953                 this.channel = channel;
954                 this.timeoutHelper = new TimeoutHelper(timeout);
955
956                 if (this.BeginCleanup())
957                 {
958                     this.Complete(true);
959                 }
960             }
961
962             internal static void End(IAsyncResult result)
963             {
964                 AsyncResult.End<CloseAsyncResult>(result);
965             }
966
967             private static void CompleteCleanup(object state, TimeoutException exception)
968             {
969                 CloseAsyncResult thisPtr = (CloseAsyncResult)state;
970                 Exception completionException = null;
971
972                 if (exception != null)
973                 {
974                     Fx.Assert(exception.GetType() == typeof(TimeoutException), "Exception on callback should always be TimeoutException");
975                     throw FxTrace.Exception.AsError(new TimeoutException(SR.TimeoutOnOperation(thisPtr.timeoutHelper.OriginalTimeout)));
976                 }
977
978                 try
979                 {
980                     lock (thisPtr.channel.ThisLock)
981                     {
982                         thisPtr.channel.retransmissionDoneWaitHandle = null;
983
984                         // another thread could have called Abort while Close() was waiting for retransmission to complete.
985                         if (!thisPtr.channel.cleanedUp)
986                         {
987                             // never aborting here
988                             thisPtr.channel.CleanupAfterWait(false);
989                         }
990                     }
991                 }
992                 catch (Exception e)
993                 {
994                     if (Fx.IsFatal(e))
995                     {
996                         throw;
997                     }
998
999                     completionException = e;
1000                 }
1001
1002                 thisPtr.Complete(false, completionException);
1003             }
1004
1005             private bool BeginCleanup()
1006             {
1007                 bool needToWait = false;
1008
1009                 if (this.channel.cleanedUp)
1010                 {
1011                     return true;
1012                 }
1013
1014                 lock (this.channel.ThisLock)
1015                 {
1016                     if (this.channel.cleanedUp)
1017                     {
1018                         return true;
1019                     }
1020
1021                     // we're never aborting in this case...
1022                     if (this.channel.retransmitList != null && this.channel.retransmitList.Count > 0)
1023                     {
1024                         needToWait = true;
1025                         this.channel.retransmissionDoneWaitHandle = new AsyncWaitHandle(EventResetMode.ManualReset);
1026                     }
1027                     else
1028                     {
1029                         this.channel.CleanupAfterWait(false);
1030                     }
1031
1032                     // we're guaranteed by CommunicationObject that at most ONE of Close or BeginClose will be called once. 
1033                     // we don't null out retransmissionDoneEvent in the abort case; should be safe to use here. 
1034                     return !needToWait || this.channel.retransmissionDoneWaitHandle.WaitAsync(completeCleanupCallback, this, this.timeoutHelper.RemainingTime());
1035                 }
1036             }
1037         }
1038     }
1039 }