Fix wrong async completion handling in UDP channel receive.
[mono.git] / mcs / class / System.ServiceModel.Discovery / System.ServiceModel.Discovery.Udp / UdpDuplexChannel.cs
1 //
2 // Author: Atsushi Enomoto <atsushi@ximian.com>
3 //
4 // Copyright (C) 2010 Novell, Inc (http://www.novell.com)
5 //
6 // Permission is hereby granted, free of charge, to any person obtaining
7 // a copy of this software and associated documentation files (the
8 // "Software"), to deal in the Software without restriction, including
9 // without limitation the rights to use, copy, modify, merge, publish,
10 // distribute, sublicense, and/or sell copies of the Software, and to
11 // permit persons to whom the Software is furnished to do so, subject to
12 // the following conditions:
13 // 
14 // The above copyright notice and this permission notice shall be
15 // included in all copies or substantial portions of the Software.
16 // 
17 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
21 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
22 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
23 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24 //
25 using System;
26 using System.Collections.Generic;
27 using System.IO;
28 using System.Linq;
29 using System.Net;
30 using System.Net.NetworkInformation;
31 using System.Net.Sockets;
32 using System.ServiceModel;
33 using System.ServiceModel.Channels;
34 using System.ServiceModel.Discovery;
35 using System.Threading;
36 using System.Xml;
37
38 namespace System.ServiceModel.Discovery.Udp
39 {
40         internal class UdpDuplexChannel : ChannelBase, IDuplexChannel
41         {
42                 // channel factory
43                 public UdpDuplexChannel (UdpChannelFactory factory, BindingContext context, EndpointAddress address, Uri via)
44                         : base (factory)
45                 {
46                         if (factory == null)
47                                 throw new ArgumentNullException ("factory");
48                         if (context == null)
49                                 throw new ArgumentNullException ("context");
50                         if (address == null)
51                                 throw new ArgumentNullException ("address");
52
53                         binding_element = factory.Source;
54                         RemoteAddress = address;
55                         Via = via;
56                         FillMessageEncoder (context);
57                 }
58                 
59                 public UdpDuplexChannel (UdpChannelListener listener)
60                         : base (listener)
61                 {
62                         binding_element = listener.Source;
63                         LocalAddress = new EndpointAddress (listener.Uri);
64                         FillMessageEncoder (listener.Context);
65                 }
66                 
67                 MessageEncoder message_encoder;
68                 UdpClient client;
69                 IPAddress multicast_address;
70                 UdpTransportBindingElement binding_element;
71                 
72                 // for servers
73                 public EndpointAddress LocalAddress { get; private set; }
74                 // for clients
75                 public EndpointAddress RemoteAddress { get; private set; }
76                 
77                 public Uri Via { get; private set; }
78
79                 void FillMessageEncoder (BindingContext ctx)
80                 {
81                         var mbe = (MessageEncodingBindingElement) ctx.Binding.Elements.FirstOrDefault (be => be is MessageEncodingBindingElement);
82                         if (mbe == null)
83                                 mbe = new TextMessageEncodingBindingElement ();
84                         message_encoder = mbe.CreateMessageEncoderFactory ().Encoder;
85                 }
86                 
87                 public void Send (Message message)
88                 {
89                         Send (message, DefaultSendTimeout);
90                 }
91
92                 static readonly Random rnd = new Random ();
93
94                 UdpClient GetSenderClient (Message message)
95                 {
96                         if (RemoteAddress != null)
97                                 return client;
98                                 
99                         var rmp = message.Properties [RemoteEndpointMessageProperty.Name] as RemoteEndpointMessageProperty;
100                         if (rmp == null)
101                                 throw new ArgumentException ("This duplex channel from the channel listener cannot send messages without RemoteEndpointMessageProperty");
102                         var cli = new UdpClient ();
103                         cli.Connect (IPAddress.Parse (rmp.Address), rmp.Port);
104                         return cli;
105                 }
106
107                 public void Send (Message message, TimeSpan timeout)
108                 {
109                         if (State != CommunicationState.Opened)
110                                 throw new InvalidOperationException ("The UDP channel must be opened before sending a message.");
111
112                         var cli = GetSenderClient (message);
113                         try {
114                                 SendCore (cli, message, timeout);
115                         } finally {
116                                 if (cli != client)
117                                         cli.Close ();
118                         }
119                 }
120
121                 void SendCore (UdpClient cli, Message message, TimeSpan timeout)
122                 {
123                         var ms = new MemoryStream ();
124                         message_encoder.WriteMessage (message, ms);
125                         // It seems .NET sends the same Message a couple of times so that the receivers don't miss it. So, do the same hack.
126                         for (int i = 0; i < 3; i++) {
127                                 // FIXME: use MaxAnnouncementDelay. It is fixed now.
128                                 Thread.Sleep (rnd.Next (50, 500));
129                                 cli.Send (ms.GetBuffer (), (int) ms.Length);
130                         }
131                 }
132
133                 public bool WaitForMessage (TimeSpan timeout)
134                 {
135                         throw new NotImplementedException ();
136                 }
137
138                 public Message Receive ()
139                 {
140                         return Receive (DefaultReceiveTimeout);
141                 }
142
143                 public Message Receive (TimeSpan timeout)
144                 {
145                         Message msg;
146                         if (!TryReceive (timeout, out msg))
147                                 throw new TimeoutException ();
148                         return msg;
149                 }
150
151                 public bool TryReceive (TimeSpan timeout, out Message msg)
152                 {
153                         DateTime start = DateTime.Now;
154                         ThrowIfDisposedOrNotOpen ();
155                         msg = null;
156
157                         if (client == null) // could be invoked while being closed.
158                                 return false;
159
160                         byte [] bytes = null;
161                         IPEndPoint ip = new IPEndPoint (IPAddress.Any, 0);
162                         ManualResetEvent wait = new ManualResetEvent (false);
163                         var ar = client.BeginReceive (delegate (IAsyncResult result) {
164                                 try {
165                                         UdpClient cli = (UdpClient) result.AsyncState;
166                                         try {
167                                                 bytes = cli.EndReceive (result, ref ip);
168                                         } catch (ObjectDisposedException) {
169                                                 if (State == CommunicationState.Opened)
170                                                         throw;
171                                                 // Otherwise, called during shutdown. Ignore it.
172                                         }
173                                 } finally {
174                                         wait.Set ();
175                                 }
176                         }, client);
177
178                         if (!ar.IsCompleted && !wait.WaitOne (timeout))
179                                 return false;
180                         if (bytes == null || bytes.Length == 0)
181                                 return false;
182
183                         // Clients will send the same message many times, and this receiver has to 
184
185                         // FIXME: give maxSizeOfHeaders
186                         msg = message_encoder.ReadMessage (new MemoryStream (bytes), int.MaxValue);
187                         var id = msg.Headers.MessageId;
188                         if (message_ids.Contains (id))
189                                 return TryReceive (timeout - (DateTime.Now - start), out msg);
190                         if (id != null) {
191                                 message_ids.Enqueue (id);
192                                 if (message_ids.Count >= binding_element.TransportSettings.DuplicateMessageHistoryLength)
193                                         message_ids.Dequeue ();
194                         }
195                         msg.Properties.Add ("Via", LocalAddress.Uri);
196                         msg.Properties.Add ("Encoder", message_encoder);
197                         msg.Properties.Add (RemoteEndpointMessageProperty.Name, new RemoteEndpointMessageProperty (ip.Address.ToString (), ip.Port));
198                         return true;
199                 }
200
201                 Queue<UniqueId> message_ids = new Queue<UniqueId> ();
202
203                 protected override void OnAbort ()
204                 {
205                         OnClose (TimeSpan.Zero);
206                 }
207                 
208                 Action<TimeSpan> open_delegate, close_delegate;
209                 
210                 protected override IAsyncResult OnBeginClose (TimeSpan timeout, AsyncCallback callback, object state)
211                 {
212                         if (close_delegate == null)
213                                 close_delegate = new Action<TimeSpan> (OnClose);
214                         return close_delegate.BeginInvoke (timeout, callback, state);
215                 }
216                 
217                 protected override void OnEndClose (IAsyncResult result)
218                 {
219                         close_delegate.EndInvoke (result);
220                 }
221                 
222                 protected override IAsyncResult OnBeginOpen (TimeSpan timeout, AsyncCallback callback, object state)
223                 {
224                         if (open_delegate == null)
225                                 open_delegate = new Action<TimeSpan> (OnOpen);
226                         return open_delegate.BeginInvoke (timeout, callback, state);
227                 }
228                 
229                 protected override void OnEndOpen (IAsyncResult result)
230                 {
231                         open_delegate.EndInvoke (result);
232                 }
233                 
234                 protected override void OnClose (TimeSpan timeout)
235                 {
236                         if (client != null) {
237                                 if (multicast_address != null) {
238                                         client.DropMulticastGroup (multicast_address, LocalAddress.Uri.Port);
239                                         multicast_address = null;
240                                 }
241                                 client.Close ();
242                         }
243                         client = null;
244                 }
245                 
246                 protected override void OnOpen (TimeSpan timeout)
247                 {
248                         if (RemoteAddress != null) {
249                                 client = new UdpClient ();
250                                 var uri = Via ?? RemoteAddress.Uri;
251                                 client.Connect (uri.Host, uri.Port);
252                         } else {
253                                 var ip = IPAddress.Parse (LocalAddress.Uri.Host);
254                                 bool isMulticast = NetworkInterface.GetAllNetworkInterfaces ().Any (nic => nic.SupportsMulticast && nic.GetIPProperties ().MulticastAddresses.Any (mca => mca.Address.Equals (ip)));
255                                 int port = LocalAddress.Uri.Port;
256                                 if (isMulticast) {
257                                         multicast_address = ip;
258                                         client = new UdpClient (new IPEndPoint (IPAddress.Any, port));
259                                         client.JoinMulticastGroup (ip, binding_element.TransportSettings.TimeToLive);
260                                 }
261                                 else
262                                         client = new UdpClient (new IPEndPoint (ip, port));
263                         }
264
265                         client.EnableBroadcast = true;
266
267                         // FIXME: apply UdpTransportSetting here.
268                 }
269                 
270                 Func<TimeSpan,Message> receive_delegate;
271                 
272                 public IAsyncResult BeginReceive (AsyncCallback callback, object state)
273                 {
274                         return BeginReceive (DefaultReceiveTimeout, callback, state);
275                 }
276                 
277                 public IAsyncResult BeginReceive (TimeSpan timeout, AsyncCallback callback, object state)
278                 {
279                         if (receive_delegate == null)
280                                 receive_delegate = new Func<TimeSpan,Message> (Receive);
281                         return receive_delegate.BeginInvoke (timeout, callback, state);
282                 }
283                 
284                 public Message EndReceive (IAsyncResult result)
285                 {
286                         return receive_delegate.EndInvoke (result);
287                 }
288                 
289                 delegate bool TryReceiveDelegate (TimeSpan timeout, out Message msg);
290                 TryReceiveDelegate try_receive_delegate;
291
292                 public IAsyncResult BeginTryReceive (TimeSpan timeout, AsyncCallback callback, object state)
293                 {
294                         if (try_receive_delegate == null)
295                                 try_receive_delegate = new TryReceiveDelegate (TryReceive);
296                         Message dummy;
297                         return try_receive_delegate.BeginInvoke (timeout, out dummy, callback, state);
298                 }
299                 
300                 public bool EndTryReceive (IAsyncResult result, out Message msg)
301                 {
302                         return try_receive_delegate.EndInvoke (out msg, result);
303                 }
304
305                 Func<TimeSpan,bool> wait_delegate;
306                 
307                 public IAsyncResult BeginWaitForMessage (TimeSpan timeout, AsyncCallback callback, object state)
308                 {
309                         if (wait_delegate == null)
310                                 wait_delegate = new Func<TimeSpan,bool> (WaitForMessage);
311                         return wait_delegate.BeginInvoke (timeout, callback, state);
312                 }
313                 
314                 public bool EndWaitForMessage (IAsyncResult result)
315                 {
316                         return wait_delegate.EndInvoke (result);
317                 }
318
319                 Action<Message,TimeSpan> send_delegate;
320                 
321                 public IAsyncResult BeginSend (Message message, AsyncCallback callback, object state)
322                 {
323                         return BeginSend (message, DefaultSendTimeout, callback, state);
324                 }
325                 
326                 public IAsyncResult BeginSend (Message message, TimeSpan timeout, AsyncCallback callback, object state)
327                 {
328                         if (send_delegate == null)
329                                 send_delegate = new Action<Message,TimeSpan> (Send);
330                         return send_delegate.BeginInvoke (message, timeout, callback, state);
331                 }
332                 
333                 public void EndSend (IAsyncResult result)
334                 {
335                         send_delegate.EndInvoke (result);
336                 }
337         }
338 }