2009-07-11 Michael Barker <mike@middlesoft.co.uk>
[mono.git] / mcs / class / System.ServiceModel / System.ServiceModel.Channels / TcpDuplexSessionChannel.cs
1 // 
2 // TcpDuplexSessionChannel.cs
3 // 
4 // Author: 
5 //      Marcos Cobena (marcoscobena@gmail.com)
6 //      Atsushi Enomoto  <atsushi@ximian.com>
7 // 
8 // Copyright 2007 Marcos Cobena (http://www.youcannoteatbits.org/)
9 //
10 // Copyright (C) 2009 Novell, Inc (http://www.novell.com)
11 //
12 // Permission is hereby granted, free of charge, to any person obtaining
13 // a copy of this software and associated documentation files (the
14 // "Software"), to deal in the Software without restriction, including
15 // without limitation the rights to use, copy, modify, merge, publish,
16 // distribute, sublicense, and/or sell copies of the Software, and to
17 // permit persons to whom the Software is furnished to do so, subject to
18 // the following conditions:
19 // 
20 // The above copyright notice and this permission notice shall be
21 // included in all copies or substantial portions of the Software.
22 // 
23 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
27 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
28 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
29 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
30 //
31
32 using System;
33 using System.Collections.Generic;
34 using System.IO;
35 using System.Net;
36 using System.Net.Sockets;
37 using System.Runtime.Serialization;
38 using System.Runtime.Serialization.Formatters.Binary;
39 using System.ServiceModel.Channels;
40 using System.Text;
41 using System.Threading;
42 using System.Xml;
43
44 namespace System.ServiceModel.Channels
45 {
46         internal class TcpDuplexSessionChannel : DuplexChannelBase, IDuplexSessionChannel
47         {
48                 class TcpDuplexSession : DuplexSessionBase
49                 {
50                         TcpDuplexSessionChannel owner;
51
52                         internal TcpDuplexSession (TcpDuplexSessionChannel owner)
53                         {
54                                 this.owner = owner;
55                         }
56
57                         public override TimeSpan DefaultCloseTimeout {
58                                 get { return owner.DefaultCloseTimeout; }
59                         }
60
61                         public override void Close (TimeSpan timeout)
62                         {
63                                 owner.DiscardSession ();
64                         }
65                 }
66
67                 TcpChannelInfo info;
68                 TcpClient client;
69                 bool is_service_side;
70                 EndpointAddress local_address;
71                 TcpBinaryFrameManager frame;
72                 TcpDuplexSession session; // do not use this directly. Use Session instead.
73                 
74                 public TcpDuplexSessionChannel (ChannelFactoryBase factory, TcpChannelInfo info, EndpointAddress address, Uri via)
75                         : base (factory, address, via)
76                 {
77                         is_service_side = false;
78                         this.info = info;
79                 }
80                 
81                 public TcpDuplexSessionChannel (ChannelListenerBase listener, TcpChannelInfo info, TcpClient client)
82                         : base (listener)
83                 {
84                         is_service_side = true;
85                         this.client = client;
86                         this.info = info;
87                 }
88                 
89                 public MessageEncoder Encoder {
90                         get { return info.MessageEncoder; }
91                 }
92
93                 public override EndpointAddress LocalAddress {
94                         get { return local_address; }
95                 }
96                 
97                 public IDuplexSession Session {
98                         get {
99                                 if (session == null)
100                                         session = new TcpDuplexSession (this);
101                                 return session;
102                         }
103                 }
104
105                 void DiscardSession ()
106                 {
107                         frame.ProcessEndRecordInitiator ();
108                         session = null;
109                 }
110
111                 public override void Send (Message message)
112                 {
113                         Send (message, DefaultSendTimeout);
114                 }
115                 
116                 public override void Send (Message message, TimeSpan timeout)
117                 {
118                         if (timeout <= TimeSpan.Zero)
119                                 throw new ArgumentException (String.Format ("Timeout value must be positive value. It was {0}", timeout));
120
121                         if (!is_service_side) {
122                                 if (message.Headers.To == null)
123                                         message.Headers.To = RemoteAddress.Uri;
124                                 if (message.Headers.ReplyTo == null)
125                                         message.Headers.ReplyTo = new EndpointAddress (Constants.WsaAnonymousUri);
126                         } else {
127                                 if (message.Headers.RelatesTo == null)
128                                         message.Headers.RelatesTo = OperationContext.Current.IncomingMessageHeaders.MessageId;
129                         }
130
131                         client.SendTimeout = (int) timeout.TotalMilliseconds;
132                         frame.WriteSizedMessage (message);
133                         // FIXME: should EndRecord be sent here?
134                         //if (is_service_side && client.Available > 0)
135                         //      frame.ProcessEndRecordRecipient ();
136                 }
137                 
138                 public override Message Receive ()
139                 {
140                         return Receive (DefaultReceiveTimeout);
141                 }
142                 
143                 public override Message Receive (TimeSpan timeout)
144                 {
145                         if (timeout <= TimeSpan.Zero)
146                                 throw new ArgumentException (String.Format ("Timeout value must be positive value. It was {0}", timeout));
147                         client.ReceiveTimeout = (int) timeout.TotalMilliseconds;
148                         return frame.ReadSizedMessage ();
149                 }
150                 
151                 public override bool TryReceive (TimeSpan timeout, out Message message)
152                 {
153                         try {
154                                 DateTime start = DateTime.Now;
155                                 message = Receive (timeout);
156                                 if (message != null)
157                                         return true;
158                                 // received EndRecord, so close the session and return false instead.
159                                 // (Closing channel here might not be a good idea, but right now I have no better way.)
160                                 Close (timeout - (DateTime.Now - start));
161                                 return false;
162                         } catch (TimeoutException) {
163                                 message = null;
164                                 return false;
165                         }
166                 }
167                 
168                 public override bool WaitForMessage (TimeSpan timeout)
169                 {
170                         if (client.Available > 0)
171                                 return true;
172
173                         DateTime start = DateTime.Now;
174                         do {
175                                 Thread.Sleep (50);
176                                 if (client.Available > 0)
177                                         return true;
178                         } while (DateTime.Now - start < timeout);
179                         return false;
180                 }
181                 
182                 // CommunicationObject
183                 
184                 [MonoTODO]
185                 protected override void OnAbort ()
186                 {
187                         Close (TimeSpan.FromTicks (1));
188                 }
189
190                 [MonoTODO]
191                 protected override IAsyncResult OnBeginClose (TimeSpan timeout,
192                         AsyncCallback callback, object state)
193                 {
194                         throw new NotImplementedException ();
195                 }
196
197                 [MonoTODO]
198                 protected override IAsyncResult OnBeginOpen (TimeSpan timeout,
199                         AsyncCallback callback, object state)
200                 {
201                         throw new NotImplementedException ();
202                 }
203
204                 [MonoTODO]
205                 protected override void OnClose (TimeSpan timeout)
206                 {
207                         if (!is_service_side)
208                                 if (session != null)
209                                         session.Close (timeout);
210
211                         if (client != null)
212                                 client.Close ();
213                 }
214                 
215                 [MonoTODO]
216                 protected override void OnEndClose (IAsyncResult result)
217                 {
218                         throw new NotImplementedException ();
219                 }
220
221                 [MonoTODO]
222                 protected override void OnEndOpen (IAsyncResult result)
223                 {
224                         throw new NotImplementedException ();
225                 }
226                 
227                 [MonoTODO]
228                 protected override void OnOpen (TimeSpan timeout)
229                 {
230                         if (! is_service_side) {
231                                 int explicitPort = RemoteAddress.Uri.Port;
232                                 client = new TcpClient (RemoteAddress.Uri.Host, explicitPort <= 0 ? TcpTransportBindingElement.DefaultPort : explicitPort);
233                                                         //RemoteAddress.Uri.Port);
234                                 
235                                 NetworkStream ns = client.GetStream ();
236                                 frame = new TcpBinaryFrameManager (TcpBinaryFrameManager.DuplexMode, ns, is_service_side) {
237                                         Encoder = this.Encoder,
238                                         Via = RemoteAddress.Uri };
239                                 frame.ProcessPreambleInitiator ();
240                                 frame.ProcessPreambleAckInitiator ();
241                         } else {
242                                 // server side
243                                 Stream s = client.GetStream ();
244
245                                 frame = new TcpBinaryFrameManager (TcpBinaryFrameManager.DuplexMode, s, is_service_side) { Encoder = this.Encoder };
246
247                                 // FIXME: use retrieved record properties in the request processing.
248
249                                 frame.ProcessPreambleRecipient ();
250                                 frame.ProcessPreambleAckRecipient ();
251                         }
252                 }
253                 
254                 class MyBinaryWriter : BinaryWriter
255                 {
256                         public MyBinaryWriter (Stream s)
257                                 : base (s)
258                         {
259                         }
260                         
261                         public void WriteBytes (byte [] bytes)
262                         {
263                                 Write7BitEncodedInt (bytes.Length);
264                                 Write (bytes);
265                         }
266                 }
267         }
268
269         // seealso: [MC-NMF] Windows Protocol document.
270         class TcpBinaryFrameManager
271         {
272                 class MyBinaryReader : BinaryReader
273                 {
274                         public MyBinaryReader (Stream s)
275                                 : base (s)
276                         {
277                         }
278
279                         public int ReadVariableInt ()
280                         {
281                                 return Read7BitEncodedInt ();
282                         }
283                 }
284
285                 class MyBinaryWriter : BinaryWriter
286                 {
287                         public MyBinaryWriter (Stream s)
288                                 : base (s)
289                         {
290                         }
291
292                         public void WriteVariableInt (int value)
293                         {
294                                 Write7BitEncodedInt (value);
295                         }
296
297                         public int GetSizeOfLength (int value)
298                         {
299                                 int x = 0;
300                                 do {
301                                         value /= 0x100;
302                                         x++;
303                                 } while (value != 0);
304                                 return x;
305                         }
306                 }
307
308                 class MyXmlBinaryWriterSession : XmlBinaryWriterSession
309                 {
310                         public override bool TryAdd (XmlDictionaryString value, out int key)
311                         {
312                                 if (!base.TryAdd (value, out key))
313                                         return false;
314                                 List.Add (value);
315                                 return true;
316                         }
317
318                         public List<XmlDictionaryString> List = new List<XmlDictionaryString> ();
319                 }
320
321                 public const byte VersionRecord = 0;
322                 public const byte ModeRecord = 1;
323                 public const byte ViaRecord = 2;
324                 public const byte KnownEncodingRecord = 3;
325                 public const byte ExtendingEncodingRecord = 4;
326                 public const byte UnsizedEnvelopeRecord = 5;
327                 public const byte SizedEnvelopeRecord = 6;
328                 public const byte EndRecord = 7;
329                 public const byte FaultRecord = 8;
330                 public const byte UpgradeRequestRecord = 9;
331                 public const byte UpgradeResponseRecord = 0xA;
332                 public const byte PreambleAckRecord = 0xB;
333                 public const byte PreambleEndRecord = 0xC;
334
335                 public const byte SingletonUnsizedMode = 1;
336                 public const byte DuplexMode = 2;
337                 public const byte SimplexMode = 3;
338                 public const byte SingletonSizedMode = 4;
339                 MyBinaryReader reader;
340                 MyBinaryWriter writer;
341
342                 public TcpBinaryFrameManager (int mode, Stream s, bool isServiceSide)
343                 {
344                         this.mode = mode;
345                         this.s = s;
346                         this.is_service_side = isServiceSide;
347                         reader = new MyBinaryReader (s);
348                         ResetWriteBuffer ();
349
350                         EncodingRecord = 8; // FIXME: it should depend on mode.
351                 }
352
353                 Stream s;
354                 MemoryStream buffer;
355                 bool is_service_side;
356
357                 int mode;
358
359                 public byte EncodingRecord { get; set; }
360
361                 public Uri Via { get; set; }
362
363                 public MessageEncoder Encoder { get; set; }
364
365                 void ResetWriteBuffer ()
366                 {
367                         this.buffer = new MemoryStream ();
368                         writer = new MyBinaryWriter (buffer);
369                 }
370
371                 public byte [] ReadSizedChunk ()
372                 {
373                         int length = reader.ReadVariableInt ();
374                         
375                         if (length > 65536)
376                                 throw new InvalidOperationException ("The message is too large.");
377
378                         byte [] buffer = new byte [length];
379                         for (int readSize = 0; readSize < length; )
380                                 readSize += reader.Read (buffer, readSize, length - readSize);
381                         return buffer;
382                 }
383
384                 public void WriteSizedChunk (byte [] data)
385                 {
386                         writer.WriteVariableInt (data.Length);
387                         writer.Write (data, 0, data.Length);
388                 }
389
390                 public void ProcessPreambleInitiator ()
391                 {
392                         ResetWriteBuffer ();
393
394                         buffer.WriteByte (VersionRecord);
395                         buffer.WriteByte (1);
396                         buffer.WriteByte (0);
397                         buffer.WriteByte (ModeRecord);
398                         buffer.WriteByte ((byte) mode);
399                         buffer.WriteByte (ViaRecord);
400                         writer.Write (Via.ToString ());
401                         buffer.WriteByte (KnownEncodingRecord); // FIXME
402                         buffer.WriteByte ((byte) EncodingRecord);
403                         buffer.WriteByte (PreambleEndRecord);
404                         buffer.Flush ();
405                         s.Write (buffer.GetBuffer (), 0, (int) buffer.Position);
406                         s.Flush ();
407                 }
408
409                 public void ProcessPreambleAckInitiator ()
410                 {
411                         int b = s.ReadByte ();
412                         switch (b) {
413                         case PreambleAckRecord:
414                                 return; // success
415                         case FaultRecord:
416                                 throw new FaultException (reader.ReadString ());
417                         default:
418                                 throw new ProtocolException (String.Format ("Preamble Ack Record is expected, got {0:X}", b));
419                         }
420                 }
421
422                 public void ProcessPreambleAckRecipient ()
423                 {
424                         s.WriteByte (PreambleAckRecord);
425                 }
426
427                 public void ProcessPreambleRecipient ()
428                 {
429                         bool preambleEnd = false;
430                         while (!preambleEnd) {
431                                 int b = s.ReadByte ();
432                                 switch (b) {
433                                 case VersionRecord:
434                                         if (s.ReadByte () != 1)
435                                                 throw new ProtocolException ("Major version must be 1");
436                                         if (s.ReadByte () != 0)
437                                                 throw new ProtocolException ("Minor version must be 0");
438                                         break;
439                                 case ModeRecord:
440                                         if (s.ReadByte () != mode)
441                                                 throw new ProtocolException (String.Format ("Duplex mode is expected to be {0:X}", mode));
442                                         break;
443                                 case ViaRecord:
444                                         Via = new Uri (reader.ReadString ());
445                                         break;
446                                 case KnownEncodingRecord:
447                                         EncodingRecord = (byte) s.ReadByte ();
448                                         break;
449                                 case ExtendingEncodingRecord:
450                                         throw new NotImplementedException ("ExtendingEncodingRecord");
451                                 case UpgradeRequestRecord:
452                                         throw new NotImplementedException ("UpgradeRequetRecord");
453                                 case UpgradeResponseRecord:
454                                         throw new NotImplementedException ("UpgradeResponseRecord");
455                                 case PreambleEndRecord:
456                                         preambleEnd = true;
457                                         break;
458                                 default:
459                                         throw new ProtocolException (String.Format ("Unexpected record type {0:X2}", b));
460                                 }
461                         }
462                 }
463
464                 XmlBinaryReaderSession reader_session;
465                 int reader_session_items;
466
467                 public Message ReadSizedMessage ()
468                 {
469                         // FIXME: implement full [MC-NMF].
470
471                         var packetType = s.ReadByte ();
472                         if (packetType == EndRecord)
473                                 return null;
474                         if (packetType != SizedEnvelopeRecord)
475                                 throw new NotImplementedException (String.Format ("Packet type {0:X} is not implemented", packetType));
476
477                         byte [] buffer = ReadSizedChunk ();
478
479                         var ms = new MemoryStream (buffer, 0, buffer.Length);
480
481                         // FIXME: turned out that it could be either in-band dictionary ([MC-NBFSE]), or a mere xml body ([MC-NBFS]).
482                         if (EncodingRecord != 8)
483                                 throw new NotImplementedException (String.Format ("Message encoding {0:X} is not implemented yet", EncodingRecord));
484
485                         // Encoding type 8:
486                         // the returned buffer consists of a serialized reader 
487                         // session and the binary xml body. 
488
489                         var session = reader_session ?? new XmlBinaryReaderSession ();
490                         reader_session = session;
491                         byte [] rsbuf = new TcpBinaryFrameManager (0, ms, is_service_side).ReadSizedChunk ();
492                         using (var rms = new MemoryStream (rsbuf, 0, rsbuf.Length)) {
493                                 var rbr = new BinaryReader (rms, Encoding.UTF8);
494                                 while (rms.Position < rms.Length)
495                                         session.Add (reader_session_items++, rbr.ReadString ());
496                         }
497                         var benc = Encoder as BinaryMessageEncoder;
498                         if (benc != null)
499                                 benc.CurrentReaderSession = session;
500
501                         // FIXME: supply maxSizeOfHeaders.
502                         Message msg = Encoder.ReadMessage (ms, 0x10000);
503                         if (benc != null)
504                                 benc.CurrentReaderSession = null;
505
506 //                      if (!is_service_side)
507 //                              if (s.Read (eof_buffer, 0, 1) == 1)
508 //                                      if (eof_buffer [0] != EndRecord)
509 //                                              throw new ProtocolException (String.Format ("Expected EndRecord message, got {0:X02}", eof_buffer [0]));
510 //
511
512                         return msg;
513                 }
514
515                 byte [] eof_buffer = new byte [1];
516                 MyXmlBinaryWriterSession writer_session;
517
518                 public void WriteSizedMessage (Message message)
519                 {
520                         ResetWriteBuffer ();
521
522                         // FIXME: implement full [MC-NMF] protocol.
523
524                         if (EncodingRecord != 8)
525                                 throw new NotImplementedException (String.Format ("Message encoding {0:X} is not implemented yet", EncodingRecord));
526
527                         buffer.WriteByte (SizedEnvelopeRecord);
528
529                         MemoryStream ms = new MemoryStream ();
530                         var session = writer_session ?? new MyXmlBinaryWriterSession ();
531                         writer_session = session;
532                         int writer_session_count = session.List.Count;
533                         var benc = Encoder as BinaryMessageEncoder;
534                         try {
535                                 if (benc != null)
536                                         benc.CurrentWriterSession = session;
537                                 Encoder.WriteMessage (message, ms);
538                         } finally {
539                                 benc.CurrentWriterSession = null;
540                         }
541
542                         // dictionary
543                         MemoryStream msd = new MemoryStream ();
544                         BinaryWriter dw = new BinaryWriter (msd);
545                         for (int i = writer_session_count; i < session.List.Count; i++)
546                                 dw.Write (session.List [i].Value);
547                         dw.Flush ();
548
549                         int length = (int) (msd.Position + ms.Position);
550                         var msda = msd.ToArray ();
551                         int sizeOfLength = writer.GetSizeOfLength (msda.Length);
552
553                         writer.WriteVariableInt (length + sizeOfLength); // dictionary array also involves the size of itself.
554                         WriteSizedChunk (msda);
555                         // message body
556                         var arr = ms.GetBuffer ();
557                         writer.Write (arr, 0, (int) ms.Position);
558
559                         writer.Flush ();
560
561                         s.Write (buffer.GetBuffer (), 0, (int) buffer.Position);
562                         s.Flush ();
563                 }
564
565                 public void ProcessEndRecordInitiator ()
566                 {
567                         s.WriteByte (EndRecord); // it is required
568                         s.Flush ();
569                 }
570
571                 public void ProcessEndRecordRecipient ()
572                 {
573                         int b;
574                         if ((b = s.ReadByte ()) != EndRecord)
575                                 throw new ProtocolException (String.Format ("EndRecord message was expected, got {0:X}", b));
576                 }
577         }
578 }