2009-06-12 Bill Holmes <billholmes54@gmail.com>
[mono.git] / mcs / class / System.ServiceModel / System.ServiceModel.Channels / TcpDuplexSessionChannel.cs
1 // 
2 // TcpDuplexSessionChannel.cs
3 // 
4 // Author: 
5 //      Marcos Cobena (marcoscobena@gmail.com)
6 //      Atsushi Enomoto  <atsushi@ximian.com>
7 // 
8 // Copyright 2007 Marcos Cobena (http://www.youcannoteatbits.org/)
9 //
10 // Copyright (C) 2009 Novell, Inc (http://www.novell.com)
11 //
12 // Permission is hereby granted, free of charge, to any person obtaining
13 // a copy of this software and associated documentation files (the
14 // "Software"), to deal in the Software without restriction, including
15 // without limitation the rights to use, copy, modify, merge, publish,
16 // distribute, sublicense, and/or sell copies of the Software, and to
17 // permit persons to whom the Software is furnished to do so, subject to
18 // the following conditions:
19 // 
20 // The above copyright notice and this permission notice shall be
21 // included in all copies or substantial portions of the Software.
22 // 
23 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
27 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
28 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
29 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
30 //
31
32 using System;
33 using System.Collections.Generic;
34 using System.IO;
35 using System.Net;
36 using System.Net.Sockets;
37 using System.Runtime.Serialization;
38 using System.Runtime.Serialization.Formatters.Binary;
39 using System.ServiceModel.Channels;
40 using System.Text;
41 using System.Threading;
42 using System.Xml;
43
44 namespace System.ServiceModel.Channels
45 {
46         internal class TcpDuplexSessionChannel : DuplexChannelBase, IDuplexSessionChannel
47         {
48                 class TcpDuplexSession : DuplexSessionBase
49                 {
50                         TcpDuplexSessionChannel owner;
51
52                         internal TcpDuplexSession (TcpDuplexSessionChannel owner)
53                         {
54                                 this.owner = owner;
55                         }
56
57                         public override TimeSpan DefaultCloseTimeout {
58                                 get { return owner.DefaultCloseTimeout; }
59                         }
60
61                         public override void Close (TimeSpan timeout)
62                         {
63                                 // FIXME: what to do here?
64                                 throw new NotImplementedException ();
65                         }
66                 }
67
68                 TcpChannelInfo info;
69                 TcpClient client;
70                 bool is_service_side;
71                 EndpointAddress local_address;
72                 TcpListener tcp_listener;
73                 TimeSpan timeout;
74                 TcpBinaryFrameManager frame;
75                 TcpDuplexSession session;
76                 
77                 public TcpDuplexSessionChannel (ChannelFactoryBase factory, TcpChannelInfo info, EndpointAddress address, Uri via)
78                         : base (factory, address, via)
79                 {
80                         is_service_side = false;
81                         this.info = info;
82                         session = new TcpDuplexSession (this);
83                 }
84                 
85                 public TcpDuplexSessionChannel (ChannelListenerBase listener, TcpChannelInfo info, TcpListener tcpListener, TimeSpan timeout)
86                         : base (listener)
87                 {
88                         is_service_side = true;
89                         tcp_listener = tcpListener;
90                         this.info = info;
91                         session = new TcpDuplexSession (this);
92                         this.timeout = timeout;
93                 }
94                 
95                 public MessageEncoder Encoder {
96                         get { return info.MessageEncoder; }
97                 }
98
99                 public override EndpointAddress LocalAddress {
100                         get { return local_address; }
101                 }
102                 
103                 public IDuplexSession Session {
104                         get { return session; }
105                 }
106
107                 public override void Send (Message message)
108                 {
109                         Send (message, DefaultSendTimeout);
110                 }
111                 
112                 public override void Send (Message message, TimeSpan timeout)
113                 {
114                         if (timeout <= TimeSpan.Zero)
115                                 throw new ArgumentException (String.Format ("Timeout value must be positive value. It was {0}", timeout));
116
117                         if (!is_service_side) {
118                                 if (message.Headers.To == null)
119                                         message.Headers.To = RemoteAddress.Uri;
120                                 if (message.Headers.ReplyTo == null)
121                                         message.Headers.ReplyTo = new EndpointAddress (Constants.WsaAnonymousUri);
122                         } else {
123                                 if (message.Headers.RelatesTo == null)
124                                         message.Headers.RelatesTo = OperationContext.Current.IncomingMessageHeaders.MessageId;
125                         }
126
127                         client.SendTimeout = (int) timeout.TotalMilliseconds;
128                         frame.WriteSizedMessage (message);
129                         // FIXME: should EndRecord be sent here?
130                         //if (is_service_side && client.Available > 0)
131                         //      frame.ProcessEndRecordRecipient ();
132                 }
133                 
134                 [MonoTODO]
135                 public override IAsyncResult BeginTryReceive (TimeSpan timeout, AsyncCallback callback, object state)
136                 {
137                         throw new NotImplementedException ();
138                 }
139                 
140                 [MonoTODO]
141                 public override bool EndTryReceive (IAsyncResult result, out Message message)
142                 {
143                         throw new NotImplementedException ();
144                 }
145                 
146                 public override Message Receive ()
147                 {
148                         return Receive (DefaultReceiveTimeout);
149                 }
150                 
151                 public override Message Receive (TimeSpan timeout)
152                 {
153                         if (timeout <= TimeSpan.Zero)
154                                 throw new ArgumentException (String.Format ("Timeout value must be positive value. It was {0}", timeout));
155                         client.ReceiveTimeout = (int) timeout.TotalMilliseconds;
156                         return frame.ReadSizedMessage ();
157                 }
158                 
159                 public override bool TryReceive (TimeSpan timeout, out Message message)
160                 {
161                         try {
162                                 message = Receive (timeout);
163                                 return true;
164                         } catch (TimeoutException) {
165                                 message = null;
166                                 return false;
167                         }
168                 }
169                 
170                 public override bool WaitForMessage (TimeSpan timeout)
171                 {
172                         if (client.Available > 0)
173                                 return true;
174
175                         DateTime start = DateTime.Now;
176                         do {
177                                 Thread.Sleep (50);
178                                 if (client.Available > 0)
179                                         return true;
180                         } while (DateTime.Now - start < timeout);
181                         return false;
182                 }
183                 
184                 // CommunicationObject
185                 
186                 [MonoTODO]
187                 protected override void OnAbort ()
188                 {
189                         if (client != null)
190                                 client.Close ();
191                 }
192
193                 [MonoTODO]
194                 protected override IAsyncResult OnBeginClose (TimeSpan timeout,
195                         AsyncCallback callback, object state)
196                 {
197                         throw new NotImplementedException ();
198                 }
199
200                 [MonoTODO]
201                 protected override IAsyncResult OnBeginOpen (TimeSpan timeout,
202                         AsyncCallback callback, object state)
203                 {
204                         throw new NotImplementedException ();
205                 }
206
207                 [MonoTODO]
208                 protected override void OnClose (TimeSpan timeout)
209                 {
210                         if (client != null)
211                                 client.Close ();
212                 }
213                 
214                 [MonoTODO]
215                 protected override void OnEndClose (IAsyncResult result)
216                 {
217                         throw new NotImplementedException ();
218                 }
219
220                 [MonoTODO]
221                 protected override void OnEndOpen (IAsyncResult result)
222                 {
223                         throw new NotImplementedException ();
224                 }
225                 
226                 [MonoTODO]
227                 protected override void OnOpen (TimeSpan timeout)
228                 {
229                         if (! is_service_side) {
230                                 int explicitPort = RemoteAddress.Uri.Port;
231                                 client = new TcpClient (RemoteAddress.Uri.Host, explicitPort <= 0 ? TcpTransportBindingElement.DefaultPort : explicitPort);
232                                                         //RemoteAddress.Uri.Port);
233                                 
234                                 NetworkStream ns = client.GetStream ();
235                                 frame = new TcpBinaryFrameManager (TcpBinaryFrameManager.DuplexMode, ns, is_service_side) {
236                                         Encoder = this.Encoder,
237                                         Via = RemoteAddress.Uri };
238                         } else {
239                                 // server side
240                                 client = tcp_listener.AcceptTcpClient ();
241                                 Stream s = client.GetStream ();
242
243                                 frame = new TcpBinaryFrameManager (TcpBinaryFrameManager.DuplexMode, s, is_service_side) { Encoder = this.Encoder };
244
245                                 // FIXME: use retrieved record properties in the request processing.
246
247                         }
248                 }
249                 
250                 class MyBinaryWriter : BinaryWriter
251                 {
252                         public MyBinaryWriter (Stream s)
253                                 : base (s)
254                         {
255                         }
256                         
257                         public void WriteBytes (byte [] bytes)
258                         {
259                                 Write7BitEncodedInt (bytes.Length);
260                                 Write (bytes);
261                         }
262                 }
263         }
264
265         // seealso: [MC-NMF] Windows Protocol document.
266         class TcpBinaryFrameManager
267         {
268                 class MyBinaryReader : BinaryReader
269                 {
270                         public MyBinaryReader (Stream s)
271                                 : base (s)
272                         {
273                         }
274
275                         public int ReadVariableInt ()
276                         {
277                                 return Read7BitEncodedInt ();
278                         }
279                 }
280
281                 class MyBinaryWriter : BinaryWriter
282                 {
283                         public MyBinaryWriter (Stream s)
284                                 : base (s)
285                         {
286                         }
287
288                         public void WriteVariableInt (int value)
289                         {
290                                 Write7BitEncodedInt (value);
291                         }
292
293                         public int GetSizeOfLength (int value)
294                         {
295                                 int x = 0;
296                                 do {
297                                         value /= 0x100;
298                                         x++;
299                                 } while (value != 0);
300                                 return x;
301                         }
302                 }
303
304                 class MyXmlBinaryWriterSession : XmlBinaryWriterSession
305                 {
306                         public override bool TryAdd (XmlDictionaryString value, out int key)
307                         {
308                                 if (!base.TryAdd (value, out key))
309                                         return false;
310                                 List.Add (value);
311                                 return true;
312                         }
313
314                         public List<XmlDictionaryString> List = new List<XmlDictionaryString> ();
315                 }
316
317                 public const byte VersionRecord = 0;
318                 public const byte ModeRecord = 1;
319                 public const byte ViaRecord = 2;
320                 public const byte KnownEncodingRecord = 3;
321                 public const byte ExtendingEncodingRecord = 4;
322                 public const byte UnsizedEnvelopeRecord = 5;
323                 public const byte SizedEnvelopeRecord = 6;
324                 public const byte EndRecord = 7;
325                 public const byte FaultRecord = 8;
326                 public const byte UpgradeRequestRecord = 9;
327                 public const byte UpgradeResponseRecord = 0xA;
328                 public const byte PreambleAckRecord = 0xB;
329                 public const byte PreambleEndRecord = 0xC;
330
331                 public const byte SingletonUnsizedMode = 1;
332                 public const byte DuplexMode = 2;
333                 public const byte SimplexMode = 3;
334                 public const byte SingletonSizedMode = 4;
335                 MyBinaryReader reader;
336                 MyBinaryWriter writer;
337
338                 public TcpBinaryFrameManager (int mode, Stream s, bool isServiceSide)
339                 {
340                         this.mode = mode;
341                         this.s = s;
342                         this.is_service_side = isServiceSide;
343                         reader = new MyBinaryReader (s);
344                         ResetWriteBuffer ();
345
346                         EncodingRecord = 8; // FIXME: it should depend on mode.
347                 }
348
349                 Stream s;
350                 MemoryStream buffer;
351                 bool is_service_side;
352
353                 int mode;
354
355                 public byte EncodingRecord { get; set; }
356
357                 public Uri Via { get; set; }
358
359                 public MessageEncoder Encoder { get; set; }
360
361                 void ResetWriteBuffer ()
362                 {
363                         this.buffer = new MemoryStream ();
364                         writer = new MyBinaryWriter (buffer);
365                 }
366
367                 public byte [] ReadSizedChunk ()
368                 {
369                         int length = reader.ReadVariableInt ();
370                         
371                         if (length > 65536)
372                                 throw new InvalidOperationException ("The message is too large.");
373
374                         byte [] buffer = new byte [length];
375                         for (int readSize = 0; readSize < length; )
376                                 readSize += reader.Read (buffer, readSize, length - readSize);
377                         return buffer;
378                 }
379
380                 public void WriteSizedChunk (byte [] data)
381                 {
382                         writer.WriteVariableInt (data.Length);
383                         writer.Write (data, 0, data.Length);
384                 }
385
386                 void ProcessPreambleInitiator ()
387                 {
388                         buffer.WriteByte (VersionRecord);
389                         buffer.WriteByte (1);
390                         buffer.WriteByte (0);
391                         buffer.WriteByte (ModeRecord);
392                         buffer.WriteByte ((byte) mode);
393                         buffer.WriteByte (ViaRecord);
394                         writer.Write (Via.ToString ());
395                         buffer.WriteByte (KnownEncodingRecord); // FIXME
396                         buffer.WriteByte ((byte) EncodingRecord);
397                         buffer.WriteByte (PreambleEndRecord);
398                         buffer.Flush ();
399                 }
400
401                 void ProcessPreambleAckInitiator ()
402                 {
403                         int b = s.ReadByte ();
404                         switch (b) {
405                         case PreambleAckRecord:
406                                 return; // success
407                         case FaultRecord:
408                                 throw new FaultException (reader.ReadString ());
409                         default:
410                                 throw new ProtocolException (String.Format ("Preamble Ack Record is expected, got {0:X}", b));
411                         }
412                 }
413
414                 void ProcessPreambleAckRecipient ()
415                 {
416                         s.WriteByte (PreambleAckRecord);
417                 }
418
419                 void ProcessPreambleRecipient (bool allowEndRecord)
420                 {
421                         bool preambleEnd = false;
422                         while (!preambleEnd) {
423                                 int b = s.ReadByte ();
424                                 switch (b) {
425                                 case VersionRecord:
426                                         if (s.ReadByte () != 1)
427                                                 throw new ProtocolException ("Major version must be 1");
428                                         if (s.ReadByte () != 0)
429                                                 throw new ProtocolException ("Minor version must be 0");
430                                         break;
431                                 case ModeRecord:
432                                         if (s.ReadByte () != mode)
433                                                 throw new ProtocolException (String.Format ("Duplex mode is expected to be {0:X}", mode));
434                                         break;
435                                 case ViaRecord:
436                                         Via = new Uri (reader.ReadString ());
437                                         break;
438                                 case KnownEncodingRecord:
439                                         EncodingRecord = (byte) s.ReadByte ();
440                                         break;
441                                 case ExtendingEncodingRecord:
442                                         throw new NotImplementedException ();
443                                 case UpgradeRequestRecord:
444                                         throw new NotImplementedException ();
445                                 case UpgradeResponseRecord:
446                                         throw new NotImplementedException ();
447                                 case PreambleEndRecord:
448                                         preambleEnd = true;
449                                         break;
450                                 case EndRecord:
451                                         if (allowEndRecord)
452                                                 break;
453                                         goto default;
454                                 default:
455                                         throw new ProtocolException (String.Format ("Unexpected record type {0:X2}", b));
456                                 }
457                         }
458                 }
459
460                 bool message_already_read_once;
461
462                 public Message ReadSizedMessage ()
463                 {
464                         // FIXME: implement full [MC-NMF].
465                         if (is_service_side) {
466                                 ProcessPreambleRecipient (message_already_read_once);
467                                 message_already_read_once = true;
468                                 ProcessPreambleAckRecipient ();
469                         }
470
471                         var packetType = s.ReadByte ();
472                         if (packetType != SizedEnvelopeRecord)
473                                 throw new NotImplementedException (String.Format ("Packet type {0:X} is not implemented", packetType));
474
475                         byte [] buffer = ReadSizedChunk ();
476
477                         var ms = new MemoryStream (buffer, 0, buffer.Length);
478
479                         // FIXME: turned out that it could be either in-band dictionary ([MC-NBFSE]), or a mere xml body ([MC-NBFS]).
480                         if (EncodingRecord != 8)
481                                 throw new NotImplementedException (String.Format ("Message encoding {0:X} is not implemented yet", EncodingRecord));
482
483                         // Encoding type 8:
484                         // the returned buffer consists of a serialized reader 
485                         // session and the binary xml body. 
486
487                         var session = new XmlBinaryReaderSession ();
488                         byte [] rsbuf = new TcpBinaryFrameManager (0, ms, is_service_side).ReadSizedChunk ();
489                         int count = 0;
490                         using (var rms = new MemoryStream (rsbuf, 0, rsbuf.Length)) {
491                                 var rbr = new BinaryReader (rms, Encoding.UTF8);
492                                 while (rms.Position < rms.Length)
493                                         session.Add (count++, rbr.ReadString ());
494                         }
495                         var benc = Encoder as BinaryMessageEncoder;
496                         if (benc != null)
497                                 benc.CurrentReaderSession = session;
498                         // FIXME: supply maxSizeOfHeaders.
499                         Message msg = Encoder.ReadMessage (ms, 0x10000);
500                         if (benc != null)
501                                 benc.CurrentReaderSession = null;
502
503                         if (!is_service_side)
504                                 if (s.Read (eof_buffer, 0, 1) == 1)
505                                         if (eof_buffer [0] != EndRecord)
506                                                 throw new ProtocolException (String.Format ("Expected EndRecord message, got {0:X02}", eof_buffer [0]));
507
508                         return msg;
509                 }
510
511                 byte [] eof_buffer = new byte [1];
512
513                 public void WriteSizedMessage (Message message)
514                 {
515                         ResetWriteBuffer ();
516
517                         if (!is_service_side)
518                                 ProcessPreambleInitiator ();
519
520                         // FIXME: implement full [MC-NMF] protocol.
521
522                         if (EncodingRecord != 8)
523                                 throw new NotImplementedException (String.Format ("Message encoding {0:X} is not implemented yet", EncodingRecord));
524
525                         buffer.WriteByte (SizedEnvelopeRecord);
526
527                         MemoryStream ms = new MemoryStream ();
528                         var session = new MyXmlBinaryWriterSession ();
529                         var benc = Encoder as BinaryMessageEncoder;
530                         try {
531                                 if (benc != null)
532                                         benc.CurrentWriterSession = session;
533                                 Encoder.WriteMessage (message, ms);
534                         } finally {
535                                 benc.CurrentWriterSession = null;
536                         }
537
538                         // dictionary
539                         MemoryStream msd = new MemoryStream ();
540                         BinaryWriter dw = new BinaryWriter (msd);
541                         foreach (var ds in session.List)
542                                 dw.Write (ds.Value);
543                         dw.Flush ();
544
545                         int length = (int) (msd.Position + ms.Position);
546                         var msda = msd.ToArray ();
547                         int sizeOfLength = writer.GetSizeOfLength (msda.Length);
548
549                         writer.WriteVariableInt (length + sizeOfLength); // dictionary array also involves the size of itself.
550                         WriteSizedChunk (msda);
551                         // message body
552                         var arr = ms.GetBuffer ();
553                         writer.Write (arr, 0, (int) ms.Position);
554
555                         writer.Flush ();
556
557                         s.Write (buffer.GetBuffer (), 0, (int) buffer.Position);
558                         s.Flush ();
559
560                         // It is processed at *this* late.
561                         if (!is_service_side)
562                                 ProcessPreambleAckInitiator ();
563
564                         s.WriteByte (EndRecord); // it is required
565                         s.Flush ();
566                 }
567
568                 public void ProcessEndRecordRecipient ()
569                 {
570                         int b;
571                         if ((b = s.ReadByte ()) != EndRecord)
572                                 throw new ProtocolException (String.Format ("EndRecord message was expected, got {0:X}", b));
573                 }
574         }
575 }