2009-06-26 Atsushi Enomoto <atsushi@ximian.com>
[mono.git] / mcs / class / System.ServiceModel / System.ServiceModel.Channels / TcpDuplexSessionChannel.cs
1 // 
2 // TcpDuplexSessionChannel.cs
3 // 
4 // Author: 
5 //      Marcos Cobena (marcoscobena@gmail.com)
6 //      Atsushi Enomoto  <atsushi@ximian.com>
7 // 
8 // Copyright 2007 Marcos Cobena (http://www.youcannoteatbits.org/)
9 //
10 // Copyright (C) 2009 Novell, Inc (http://www.novell.com)
11 //
12 // Permission is hereby granted, free of charge, to any person obtaining
13 // a copy of this software and associated documentation files (the
14 // "Software"), to deal in the Software without restriction, including
15 // without limitation the rights to use, copy, modify, merge, publish,
16 // distribute, sublicense, and/or sell copies of the Software, and to
17 // permit persons to whom the Software is furnished to do so, subject to
18 // the following conditions:
19 // 
20 // The above copyright notice and this permission notice shall be
21 // included in all copies or substantial portions of the Software.
22 // 
23 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
27 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
28 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
29 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
30 //
31
32 using System;
33 using System.Collections.Generic;
34 using System.IO;
35 using System.Net;
36 using System.Net.Sockets;
37 using System.Runtime.Serialization;
38 using System.Runtime.Serialization.Formatters.Binary;
39 using System.ServiceModel.Channels;
40 using System.Text;
41 using System.Threading;
42 using System.Xml;
43
44 namespace System.ServiceModel.Channels
45 {
/// <summary>
/// Duplex session channel that exchanges messages over a <see cref="TcpClient"/>
/// connection, using <see cref="TcpBinaryFrameManager"/> for the framing.
/// One instance is either the initiating (client) side, which connects in
/// <see cref="OnOpen"/>, or the service side, which wraps an already accepted
/// <see cref="TcpClient"/>.
/// </summary>
internal class TcpDuplexSessionChannel : DuplexChannelBase, IDuplexSessionChannel
{
        /// <summary>
        /// Session object exposed through <see cref="TcpDuplexSessionChannel.Session"/>.
        /// Closing it asks the owning channel to discard the session.
        /// </summary>
        class TcpDuplexSession : DuplexSessionBase
        {
                TcpDuplexSessionChannel owner;

                internal TcpDuplexSession (TcpDuplexSessionChannel owner)
                {
                        this.owner = owner;
                }

                public override TimeSpan DefaultCloseTimeout {
                        get { return owner.DefaultCloseTimeout; }
                }

                // The timeout is not used: discarding the session is a single End record write.
                public override void Close (TimeSpan timeout)
                {
                        owner.DiscardSession ();
                }
        }

        // Interval between polls of the socket in WaitForMessage.
        const int PollIntervalMilliseconds = 50;

        TcpChannelInfo info;
        TcpClient client;
        bool is_service_side;
        EndpointAddress local_address; // never assigned in this class, so LocalAddress returns null.
        TimeSpan timeout; // stored by the service-side constructor; not read anywhere in this class.
        TcpBinaryFrameManager frame; // created in OnOpen; null until the channel is opened.
        TcpDuplexSession session; // do not use this directly. Use Session instead.

        /// <summary>Creates the initiating (client) side of the channel.</summary>
        public TcpDuplexSessionChannel (ChannelFactoryBase factory, TcpChannelInfo info, EndpointAddress address, Uri via)
                : base (factory, address, via)
        {
                is_service_side = false;
                this.info = info;
        }

        /// <summary>Creates the service side of the channel around an accepted connection.</summary>
        public TcpDuplexSessionChannel (ChannelListenerBase listener, TcpChannelInfo info, TcpClient client, TimeSpan timeout)
                : base (listener)
        {
                is_service_side = true;
                this.client = client;
                this.info = info;
                this.timeout = timeout;
        }

        public MessageEncoder Encoder {
                get { return info.MessageEncoder; }
        }

        public override EndpointAddress LocalAddress {
                get { return local_address; }
        }

        /// <summary>Returns the session of this channel, creating it on first use.</summary>
        public IDuplexSession Session {
                get {
                        if (session == null)
                                session = new TcpDuplexSession (this);
                        return session;
                }
        }

        // Sends the End record to the peer and forgets the current session.
        void DiscardSession ()
        {
                // The frame manager only exists after OnOpen; without it there
                // is no stream to write the End record to.
                if (frame != null)
                        frame.ProcessEndRecordInitiator ();
                session = null;
        }

        // Converts a (positive) timeout into the millisecond value expected by
        // TcpClient.SendTimeout / ReceiveTimeout.
        static int ToSocketTimeout (TimeSpan timeout)
        {
                double milliseconds = timeout.TotalMilliseconds;
                // Large values such as TimeSpan.MaxValue do not fit into an int;
                // an unchecked cast would produce a garbage (typically negative) value.
                if (milliseconds >= int.MaxValue)
                        return int.MaxValue;
                // A positive sub-millisecond timeout would truncate to 0, which the
                // socket treats as "no timeout"; round it up instead.
                if (milliseconds < 1)
                        return 1;
                return (int) milliseconds;
        }

        public override void Send (Message message)
        {
                Send (message, DefaultSendTimeout);
        }

        /// <summary>
        /// Fills in missing addressing headers and writes the message as a sized envelope.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
        /// <exception cref="ArgumentException"><paramref name="timeout"/> is zero or negative.</exception>
        public override void Send (Message message, TimeSpan timeout)
        {
                if (message == null)
                        throw new ArgumentNullException ("message");
                if (timeout <= TimeSpan.Zero)
                        throw new ArgumentException (String.Format ("Timeout value must be positive value. It was {0}", timeout));

                if (!is_service_side) {
                        if (message.Headers.To == null)
                                message.Headers.To = RemoteAddress.Uri;
                        if (message.Headers.ReplyTo == null)
                                message.Headers.ReplyTo = new EndpointAddress (Constants.WsaAnonymousUri);
                } else {
                        if (message.Headers.RelatesTo == null) {
                                // There is no operation context when the service sends
                                // outside of a request dispatch; leave RelatesTo unset then.
                                OperationContext context = OperationContext.Current;
                                if (context != null && context.IncomingMessageHeaders != null)
                                        message.Headers.RelatesTo = context.IncomingMessageHeaders.MessageId;
                        }
                }

                client.SendTimeout = ToSocketTimeout (timeout);
                frame.WriteSizedMessage (message);
                // FIXME: should EndRecord be sent here?
                //if (is_service_side && client.Available > 0)
                //      frame.ProcessEndRecordRecipient ();
        }

        public override Message Receive ()
        {
                return Receive (DefaultReceiveTimeout);
        }

        /// <summary>
        /// Reads the next sized message from the connection.
        /// Returns null when the frame manager reports an End record.
        /// </summary>
        /// <exception cref="ArgumentException"><paramref name="timeout"/> is zero or negative.</exception>
        public override Message Receive (TimeSpan timeout)
        {
                if (timeout <= TimeSpan.Zero)
                        throw new ArgumentException (String.Format ("Timeout value must be positive value. It was {0}", timeout));
                client.ReceiveTimeout = ToSocketTimeout (timeout);
                return frame.ReadSizedMessage ();
        }

        /// <summary>
        /// Receives a message; returns false (with a null message) on timeout or
        /// when the peer ended the session, in which case the channel is closed.
        /// </summary>
        public override bool TryReceive (TimeSpan timeout, out Message message)
        {
                try {
                        // UtcNow: elapsed time must not be affected by local clock adjustments.
                        DateTime start = DateTime.UtcNow;
                        message = Receive (timeout);
                        if (message != null)
                                return true;
                        // received EndRecord, so close the session and return false instead.
                        // (Closing channel here might not be a good idea, but right now I have no better way.)
                        TimeSpan remaining = timeout - (DateTime.UtcNow - start);
                        // Never hand a negative timeout to Close when the receive used up the budget.
                        Close (remaining > TimeSpan.Zero ? remaining : TimeSpan.Zero);
                        return false;
                } catch (TimeoutException) {
                        message = null;
                        return false;
                }
        }

        /// <summary>
        /// Polls the socket until data is available or the timeout elapses.
        /// The socket is checked at least once after the first poll interval.
        /// </summary>
        public override bool WaitForMessage (TimeSpan timeout)
        {
                if (client.Available > 0)
                        return true;

                DateTime start = DateTime.UtcNow;
                do {
                        Thread.Sleep (PollIntervalMilliseconds);
                        if (client.Available > 0)
                                return true;
                } while (DateTime.UtcNow - start < timeout);
                return false;
        }

        // CommunicationObject

        [MonoTODO]
        protected override void OnAbort ()
        {
                if (client != null)
                        client.Close ();
        }

        [MonoTODO]
        protected override IAsyncResult OnBeginClose (TimeSpan timeout,
                AsyncCallback callback, object state)
        {
                throw new NotImplementedException ();
        }

        [MonoTODO]
        protected override IAsyncResult OnBeginOpen (TimeSpan timeout,
                AsyncCallback callback, object state)
        {
                throw new NotImplementedException ();
        }

        /// <summary>
        /// Ends the session (initiator side only, and only if a session object
        /// exists) and closes the connection.
        /// </summary>
        [MonoTODO]
        protected override void OnClose (TimeSpan timeout)
        {
                try {
                        if (!is_service_side && session != null)
                                session.Close (timeout);
                } finally {
                        // Release the socket even when sending the End record failed.
                        if (client != null)
                                client.Close ();
                }
        }

        [MonoTODO]
        protected override void OnEndClose (IAsyncResult result)
        {
                throw new NotImplementedException ();
        }

        [MonoTODO]
        protected override void OnEndOpen (IAsyncResult result)
        {
                throw new NotImplementedException ();
        }

        /// <summary>
        /// Initiator side: connects to the remote address and performs the
        /// preamble handshake. Service side: reads the preamble from the
        /// accepted connection and acknowledges it.
        /// </summary>
        [MonoTODO]
        protected override void OnOpen (TimeSpan timeout)
        {
                if (! is_service_side) {
                        int explicitPort = RemoteAddress.Uri.Port;
                        client = new TcpClient (RemoteAddress.Uri.Host, explicitPort <= 0 ? TcpTransportBindingElement.DefaultPort : explicitPort);

                        try {
                                NetworkStream ns = client.GetStream ();
                                frame = new TcpBinaryFrameManager (TcpBinaryFrameManager.DuplexMode, ns, is_service_side) {
                                        Encoder = this.Encoder,
                                        Via = RemoteAddress.Uri };
                                frame.ProcessPreambleInitiator ();
                                frame.ProcessPreambleAckInitiator ();
                        } catch {
                                // The connection was created here, so it must not
                                // outlive a failed handshake.
                                client.Close ();
                                client = null;
                                frame = null;
                                throw;
                        }
                } else {
                        // server side
                        Stream s = client.GetStream ();

                        frame = new TcpBinaryFrameManager (TcpBinaryFrameManager.DuplexMode, s, is_service_side) { Encoder = this.Encoder };

                        // FIXME: use retrieved record properties in the request processing.

                        frame.ProcessPreambleRecipient ();
                        frame.ProcessPreambleAckRecipient ();
                }
        }

        // Writer that emits a byte array prefixed with its 7-bit encoded length.
        // Not referenced elsewhere in this class.
        class MyBinaryWriter : BinaryWriter
        {
                public MyBinaryWriter (Stream s)
                        : base (s)
                {
                }

                public void WriteBytes (byte [] bytes)
                {
                        Write7BitEncodedInt (bytes.Length);
                        Write (bytes);
                }
        }
}
271
272         // seealso: [MC-NMF] Windows Protocol document.
/// <summary>
/// Reads and writes the framing records (preamble, sized envelope, End
/// record) on a stream, following the [MC-NMF] protocol as far as it is
/// implemented here. Only encoding record value 8 is supported for
/// message bodies.
/// </summary>
class TcpBinaryFrameManager
{
        // BinaryReader exposing the protected 7-bit encoded integer reader.
        class MyBinaryReader : BinaryReader
        {
                public MyBinaryReader (Stream s)
                        : base (s)
                {
                }

                public int ReadVariableInt ()
                {
                        return Read7BitEncodedInt ();
                }
        }

        // BinaryWriter exposing the protected 7-bit encoded integer writer.
        class MyBinaryWriter : BinaryWriter
        {
                public MyBinaryWriter (Stream s)
                        : base (s)
                {
                }

                public void WriteVariableInt (int value)
                {
                        Write7BitEncodedInt (value);
                }

                /// <summary>
                /// Returns the number of bytes WriteVariableInt emits for <paramref name="value"/>.
                /// </summary>
                public int GetSizeOfLength (int value)
                {
                        // The 7-bit encoding carries 7 payload bits per byte, so the
                        // size grows at every multiple of 0x80 (not 0x100).
                        uint rest = (uint) value;
                        int x = 0;
                        do {
                                rest >>= 7;
                                x++;
                        } while (rest != 0);
                        return x;
                }
        }

        // Writer session that additionally remembers, in insertion order,
        // every string that was successfully added.
        class MyXmlBinaryWriterSession : XmlBinaryWriterSession
        {
                public override bool TryAdd (XmlDictionaryString value, out int key)
                {
                        if (!base.TryAdd (value, out key))
                                return false;
                        List.Add (value);
                        return true;
                }

                public List<XmlDictionaryString> List = new List<XmlDictionaryString> ();
        }

        // Record types.
        public const byte VersionRecord = 0;
        public const byte ModeRecord = 1;
        public const byte ViaRecord = 2;
        public const byte KnownEncodingRecord = 3;
        public const byte ExtendingEncodingRecord = 4;
        public const byte UnsizedEnvelopeRecord = 5;
        public const byte SizedEnvelopeRecord = 6;
        public const byte EndRecord = 7;
        public const byte FaultRecord = 8;
        public const byte UpgradeRequestRecord = 9;
        public const byte UpgradeResponseRecord = 0xA;
        public const byte PreambleAckRecord = 0xB;
        public const byte PreambleEndRecord = 0xC;

        // Values of the mode record.
        public const byte SingletonUnsizedMode = 1;
        public const byte DuplexMode = 2;
        public const byte SimplexMode = 3;
        public const byte SingletonSizedMode = 4;

        // Upper bound accepted by ReadSizedChunk for a single chunk.
        const int MaxChunkSize = 65536;

        MyBinaryReader reader;
        MyBinaryWriter writer;

        /// <summary>
        /// Creates a frame manager working on <paramref name="s"/>.
        /// </summary>
        /// <param name="mode">Mode value sent in, or expected from, the preamble.</param>
        /// <param name="s">Stream the records are read from and written to.</param>
        /// <param name="isServiceSide">Whether this is the service side of the connection.</param>
        public TcpBinaryFrameManager (int mode, Stream s, bool isServiceSide)
        {
                this.mode = mode;
                this.s = s;
                this.is_service_side = isServiceSide;
                reader = new MyBinaryReader (s);
                ResetWriteBuffer ();

                EncodingRecord = 8; // FIXME: it should depend on mode.
        }

        Stream s;
        MemoryStream buffer; // outgoing records are assembled here and then written to s in one call.
        bool is_service_side;

        int mode;

        public byte EncodingRecord { get; set; }

        public Uri Via { get; set; }

        public MessageEncoder Encoder { get; set; }

        // Starts a fresh outgoing buffer together with a writer on top of it.
        void ResetWriteBuffer ()
        {
                this.buffer = new MemoryStream ();
                writer = new MyBinaryWriter (buffer);
        }

        /// <summary>
        /// Reads a chunk that is prefixed with its 7-bit encoded length.
        /// </summary>
        /// <exception cref="InvalidOperationException">The length is negative or larger than 65536.</exception>
        /// <exception cref="EndOfStreamException">The stream ended before the whole chunk was read.</exception>
        public byte [] ReadSizedChunk ()
        {
                return ReadSizedChunkFrom (reader);
        }

        // Shared implementation of ReadSizedChunk; also used to read the
        // dictionary chunk embedded at the start of a sized envelope.
        static byte [] ReadSizedChunkFrom (MyBinaryReader source)
        {
                int length = source.ReadVariableInt ();

                // A malformed length prefix can decode to a negative number.
                if (length < 0)
                        throw new InvalidOperationException ("The message size is invalid.");
                if (length > MaxChunkSize)
                        throw new InvalidOperationException ("The message is too large.");

                byte [] chunk = new byte [length];
                for (int readSize = 0; readSize < length; ) {
                        int got = source.Read (chunk, readSize, length - readSize);
                        // Read returns 0 once the stream has ended; without this
                        // check the loop would never terminate.
                        if (got <= 0)
                                throw new EndOfStreamException (String.Format ("Unexpected end of stream: {0} bytes expected, got {1}", length, readSize));
                        readSize += got;
                }
                return chunk;
        }

        /// <summary>
        /// Appends <paramref name="data"/>, prefixed with its 7-bit encoded
        /// length, to the outgoing buffer.
        /// </summary>
        public void WriteSizedChunk (byte [] data)
        {
                writer.WriteVariableInt (data.Length);
                writer.Write (data, 0, data.Length);
        }

        /// <summary>
        /// Sends the preamble: version 1.0, mode, via, known encoding and the
        /// preamble end record.
        /// </summary>
        /// <exception cref="InvalidOperationException"><see cref="Via"/> has not been set.</exception>
        public void ProcessPreambleInitiator ()
        {
                if (Via == null)
                        throw new InvalidOperationException ("Via must be set before sending the preamble.");

                ResetWriteBuffer ();

                buffer.WriteByte (VersionRecord);
                buffer.WriteByte (1);
                buffer.WriteByte (0);
                buffer.WriteByte (ModeRecord);
                buffer.WriteByte ((byte) mode);
                buffer.WriteByte (ViaRecord);
                writer.Write (Via.ToString ());
                buffer.WriteByte (KnownEncodingRecord); // FIXME
                buffer.WriteByte ((byte) EncodingRecord);
                buffer.WriteByte (PreambleEndRecord);
                writer.Flush ();
                s.Write (buffer.GetBuffer (), 0, (int) buffer.Position);
                s.Flush ();
        }

        /// <summary>
        /// Reads the peer's answer to the preamble.
        /// </summary>
        /// <exception cref="FaultException">The peer answered with a fault record.</exception>
        /// <exception cref="ProtocolException">Anything other than an ack or fault record was read.</exception>
        public void ProcessPreambleAckInitiator ()
        {
                int b = s.ReadByte ();
                switch (b) {
                case PreambleAckRecord:
                        return; // success
                case FaultRecord:
                        throw new FaultException (reader.ReadString ());
                default:
                        throw new ProtocolException (String.Format ("Preamble Ack Record is expected, got {0:X}", b));
                }
        }

        /// <summary>Acknowledges the preamble received from the initiator.</summary>
        public void ProcessPreambleAckRecipient ()
        {
                s.WriteByte (PreambleAckRecord);
                // Flush like the other writers do, so the ack is not held back
                // if the stream happens to buffer.
                s.Flush ();
        }

        /// <summary>
        /// Reads preamble records until the preamble end record, validating
        /// version and mode and storing via and encoding.
        /// </summary>
        /// <exception cref="ProtocolException">Version, mode or record type is not the expected one.</exception>
        /// <exception cref="NotImplementedException">An extending-encoding or upgrade record was received.</exception>
        public void ProcessPreambleRecipient ()
        {
                bool preambleEnd = false;
                while (!preambleEnd) {
                        // ReadByte yields -1 at end of stream, which ends up in the default case.
                        int b = s.ReadByte ();
                        switch (b) {
                        case VersionRecord:
                                if (s.ReadByte () != 1)
                                        throw new ProtocolException ("Major version must be 1");
                                if (s.ReadByte () != 0)
                                        throw new ProtocolException ("Minor version must be 0");
                                break;
                        case ModeRecord:
                                if (s.ReadByte () != mode)
                                        throw new ProtocolException (String.Format ("Duplex mode is expected to be {0:X}", mode));
                                break;
                        case ViaRecord:
                                Via = new Uri (reader.ReadString ());
                                break;
                        case KnownEncodingRecord:
                                EncodingRecord = (byte) s.ReadByte ();
                                break;
                        case ExtendingEncodingRecord:
                                throw new NotImplementedException ("ExtendingEncodingRecord");
                        case UpgradeRequestRecord:
                                throw new NotImplementedException ("UpgradeRequestRecord");
                        case UpgradeResponseRecord:
                                throw new NotImplementedException ("UpgradeResponseRecord");
                        case PreambleEndRecord:
                                preambleEnd = true;
                                break;
                        default:
                                throw new ProtocolException (String.Format ("Unexpected record type {0:X2}", b));
                        }
                }
        }

        // Dictionary strings received so far; kept across messages.
        XmlBinaryReaderSession reader_session;
        // Next key to assign in reader_session.
        int reader_session_items;

        /// <summary>
        /// Reads one record. Returns null for an End record; otherwise expects a
        /// sized envelope and decodes it with <see cref="Encoder"/>.
        /// </summary>
        /// <exception cref="NotImplementedException">
        /// The record is not a sized envelope, or the encoding is not 8.
        /// </exception>
        public Message ReadSizedMessage ()
        {
                // FIXME: implement full [MC-NMF].

                var packetType = s.ReadByte ();
                if (packetType == EndRecord)
                        return null;
                if (packetType != SizedEnvelopeRecord)
                        throw new NotImplementedException (String.Format ("Packet type {0:X} is not implemented", packetType));

                byte [] envelope = ReadSizedChunk ();

                var ms = new MemoryStream (envelope, 0, envelope.Length);

                // FIXME: turned out that it could be either in-band dictionary ([MC-NBFSE]), or a mere xml body ([MC-NBFS]).
                if (EncodingRecord != 8)
                        throw new NotImplementedException (String.Format ("Message encoding {0:X} is not implemented yet", EncodingRecord));

                // Encoding type 8:
                // the returned buffer consists of a serialized reader
                // session and the binary xml body.

                var session = reader_session ?? new XmlBinaryReaderSession ();
                reader_session = session;
                // The dictionary is itself a sized chunk at the start of the
                // envelope; reading it leaves ms positioned at the xml body.
                byte [] rsbuf = ReadSizedChunkFrom (new MyBinaryReader (ms));
                using (var rms = new MemoryStream (rsbuf, 0, rsbuf.Length)) {
                        var rbr = new BinaryReader (rms, Encoding.UTF8);
                        while (rms.Position < rms.Length)
                                session.Add (reader_session_items++, rbr.ReadString ());
                }

                var benc = Encoder as BinaryMessageEncoder;
                Message msg;
                try {
                        if (benc != null)
                                benc.CurrentReaderSession = session;
                        // FIXME: supply maxSizeOfHeaders.
                        msg = Encoder.ReadMessage (ms, 0x10000);
                } finally {
                        // Do not leave the session attached to the encoder when
                        // decoding fails.
                        if (benc != null)
                                benc.CurrentReaderSession = null;
                }

//              if (!is_service_side)
//                      if (s.Read (eof_buffer, 0, 1) == 1)
//                              if (eof_buffer [0] != EndRecord)
//                                      throw new ProtocolException (String.Format ("Expected EndRecord message, got {0:X02}", eof_buffer [0]));
//

                return msg;
        }

        // Only referenced by the disabled EndRecord check in ReadSizedMessage.
        byte [] eof_buffer = new byte [1];
        // Dictionary strings handed out by the encoder so far; kept across messages.
        MyXmlBinaryWriterSession writer_session;
        // Number of leading entries of writer_session.List already sent to the peer.
        int writer_session_sent;

        /// <summary>
        /// Encodes <paramref name="message"/> and sends it as a sized envelope:
        /// the dictionary strings not yet sent to the peer, followed by the
        /// encoded body.
        /// </summary>
        /// <exception cref="NotImplementedException">The encoding is not 8.</exception>
        public void WriteSizedMessage (Message message)
        {
                ResetWriteBuffer ();

                // FIXME: implement full [MC-NMF] protocol.

                if (EncodingRecord != 8)
                        throw new NotImplementedException (String.Format ("Message encoding {0:X} is not implemented yet", EncodingRecord));

                buffer.WriteByte (SizedEnvelopeRecord);

                MemoryStream ms = new MemoryStream ();
                var session = writer_session ?? new MyXmlBinaryWriterSession ();
                writer_session = session;
                var benc = Encoder as BinaryMessageEncoder;
                try {
                        if (benc != null)
                                benc.CurrentWriterSession = session;
                        Encoder.WriteMessage (message, ms);
                } finally {
                        // benc is null for any other encoder; dereferencing it here
                        // would also hide an exception thrown by WriteMessage.
                        if (benc != null)
                                benc.CurrentWriterSession = null;
                }

                // dictionary: ReadSizedMessage appends every received string
                // under the next free key, so only the strings that were not
                // sent before may be written here.
                int sessionCount = session.List.Count;
                MemoryStream msd = new MemoryStream ();
                BinaryWriter dw = new BinaryWriter (msd);
                for (int i = writer_session_sent; i < sessionCount; i++)
                        dw.Write (session.List [i].Value);
                dw.Flush ();

                int length = (int) (msd.Position + ms.Position);
                var msda = msd.ToArray ();
                int sizeOfLength = writer.GetSizeOfLength (msda.Length);

                writer.WriteVariableInt (length + sizeOfLength); // dictionary array also involves the size of itself.
                WriteSizedChunk (msda);
                // message body
                var arr = ms.GetBuffer ();
                writer.Write (arr, 0, (int) ms.Position);

                writer.Flush ();

                s.Write (buffer.GetBuffer (), 0, (int) buffer.Position);
                s.Flush ();

                // Mark the strings as sent only once the envelope went out.
                writer_session_sent = sessionCount;
        }

        /// <summary>Sends the End record.</summary>
        public void ProcessEndRecordInitiator ()
        {
                s.WriteByte (EndRecord); // it is required
                s.Flush ();
        }

        /// <summary>Reads one byte and requires it to be the End record.</summary>
        /// <exception cref="ProtocolException">Something else was read.</exception>
        public void ProcessEndRecordRecipient ()
        {
                int b;
                if ((b = s.ReadByte ()) != EndRecord)
                        throw new ProtocolException (String.Format ("EndRecord message was expected, got {0:X}", b));
        }
}
581 }