2009-05-26 Atsushi Enomoto <atsushi@ximian.com>
[mono.git] / mcs / class / System.ServiceModel / System.ServiceModel.Channels / TcpDuplexSessionChannel.cs
1 // 
2 // TcpDuplexSessionChannel.cs
3 // 
4 // Author: 
5 //      Marcos Cobena (marcoscobena@gmail.com)
6 //      Atsushi Enomoto  <atsushi@ximian.com>
7 // 
8 // Copyright 2007 Marcos Cobena (http://www.youcannoteatbits.org/)
9 //
10 // Copyright (C) 2009 Novell, Inc (http://www.novell.com)
11 //
12 // Permission is hereby granted, free of charge, to any person obtaining
13 // a copy of this software and associated documentation files (the
14 // "Software"), to deal in the Software without restriction, including
15 // without limitation the rights to use, copy, modify, merge, publish,
16 // distribute, sublicense, and/or sell copies of the Software, and to
17 // permit persons to whom the Software is furnished to do so, subject to
18 // the following conditions:
19 // 
20 // The above copyright notice and this permission notice shall be
21 // included in all copies or substantial portions of the Software.
22 // 
23 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
27 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
28 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
29 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
30 //
31
32 using System;
33 using System.Collections.Generic;
34 using System.IO;
35 using System.Net;
36 using System.Net.Sockets;
37 using System.Runtime.Serialization;
38 using System.Runtime.Serialization.Formatters.Binary;
39 using System.ServiceModel.Channels;
40 using System.Text;
41 using System.Xml;
42
43 namespace System.ServiceModel.Channels
44 {
45         internal class TcpDuplexSessionChannel : DuplexChannelBase, IDuplexSessionChannel
46         {
47                 class TcpDuplexSession : DuplexSessionBase
48                 {
49                         TcpDuplexSessionChannel owner;
50
51                         internal TcpDuplexSession (TcpDuplexSessionChannel owner)
52                         {
53                                 this.owner = owner;
54                         }
55
56                         public override TimeSpan DefaultCloseTimeout {
57                                 get { return owner.DefaultCloseTimeout; }
58                         }
59
60                         public override void Close (TimeSpan timeout)
61                         {
62                                 // FIXME: what to do here?
63                                 throw new NotImplementedException ();
64                         }
65                 }
66
67                 TcpChannelInfo info;
68                 TcpClient client;
69                 bool is_service_side;
70                 EndpointAddress local_address;
71                 TcpListener tcp_listener;
72                 TimeSpan timeout;
73                 TcpBinaryFrameManager frame;
74                 TcpDuplexSession session;
75                 
76                 public TcpDuplexSessionChannel (ChannelFactoryBase factory, TcpChannelInfo info, EndpointAddress address, Uri via)
77                         : base (factory, address, via)
78                 {
79                         is_service_side = false;
80                         this.info = info;
81                         session = new TcpDuplexSession (this);
82                 }
83                 
84                 public TcpDuplexSessionChannel (ChannelListenerBase listener, TcpChannelInfo info, TcpListener tcpListener, TimeSpan timeout)
85                         : base (listener)
86                 {
87                         is_service_side = true;
88                         tcp_listener = tcpListener;
89                         this.info = info;
90                         session = new TcpDuplexSession (this);
91                         this.timeout = timeout;
92                 }
93                 
94                 public MessageEncoder Encoder {
95                         get { return info.MessageEncoder; }
96                 }
97
98                 public override EndpointAddress LocalAddress {
99                         get { return local_address; }
100                 }
101                 
102                 public IDuplexSession Session {
103                         get { return session; }
104                 }
105
106                 public override void Send (Message message)
107                 {
108                         Send (message, DefaultSendTimeout);
109                 }
110                 
111                 public override void Send (Message message, TimeSpan timeout)
112                 {
113                         if (!is_service_side && message.Headers.To == null)
114                                 message.Headers.To = RemoteAddress.Uri;
115                         if (!is_service_side && message.Headers.ReplyTo == null)
116                                 message.Headers.ReplyTo = new EndpointAddress (Constants.WsaAnonymousUri);
117
118                         client.SendTimeout = (int) timeout.TotalMilliseconds;
119                         frame.WriteSizedMessage (message);
120                         // FIXME: should EndRecord be sent here?
121                         //if (is_service_side && client.Available > 0)
122                         //      frame.ProcessEndRecordRecipient ();
123                 }
124                 
125                 [MonoTODO]
126                 public override IAsyncResult BeginTryReceive (TimeSpan timeout, AsyncCallback callback, object state)
127                 {
128                         throw new NotImplementedException ();
129                 }
130                 
131                 [MonoTODO]
132                 public override bool EndTryReceive (IAsyncResult result, out Message message)
133                 {
134                         throw new NotImplementedException ();
135                 }
136                 
137                 public override Message Receive ()
138                 {
139                         return Receive (DefaultReceiveTimeout);
140                 }
141                 
142                 public override Message Receive (TimeSpan timeout)
143                 {
144                         client.ReceiveTimeout = (int) timeout.TotalMilliseconds;
145                         Stream s = client.GetStream ();
146
147                         return frame.ReadSizedMessage ();
148                 }
149                 
150                 public override bool TryReceive (TimeSpan timeout, out Message message)
151                 {
152                         try {
153                                 message = Receive (timeout);
154                                 return true;
155                         } catch (TimeoutException) {
156                                 message = null;
157                                 return false;
158                         }
159                 }
160                 
161                 public override bool WaitForMessage (TimeSpan timeout)
162                 {
163                         // FIXME: use timeout
164                         try {
165                                 client = tcp_listener.AcceptTcpClient ();
166                                 Stream s = client.GetStream ();
167
168                                 frame = new TcpBinaryFrameManager (TcpBinaryFrameManager.DuplexMode, s) { Encoder = this.Encoder };
169                                 frame.ProcessPreambleRecipient ();
170
171                                 // FIXME: use retrieved record properties in the request processing.
172
173                                 return true;
174                         } catch (TimeoutException) {
175                                 return false;
176                         }
177                 }
178                 
179                 // CommunicationObject
180                 
181                 [MonoTODO]
182                 protected override void OnAbort ()
183                 {
184                         throw new NotImplementedException ();
185                 }
186
187                 [MonoTODO]
188                 protected override IAsyncResult OnBeginClose (TimeSpan timeout,
189                         AsyncCallback callback, object state)
190                 {
191                         throw new NotImplementedException ();
192                 }
193
194                 [MonoTODO]
195                 protected override IAsyncResult OnBeginOpen (TimeSpan timeout,
196                         AsyncCallback callback, object state)
197                 {
198                         throw new NotImplementedException ();
199                 }
200
201                 [MonoTODO]
202                 protected override void OnClose (TimeSpan timeout)
203                 {
204                         client.Close ();
205                 }
206                 
207                 [MonoTODO]
208                 protected override void OnEndClose (IAsyncResult result)
209                 {
210                         throw new NotImplementedException ();
211                 }
212
213                 [MonoTODO]
214                 protected override void OnEndOpen (IAsyncResult result)
215                 {
216                         throw new NotImplementedException ();
217                 }
218                 
219                 [MonoTODO]
220                 protected override void OnOpen (TimeSpan timeout)
221                 {
222                         if (! is_service_side) {
223                                 int explicitPort = RemoteAddress.Uri.Port;
224                                 client = new TcpClient (RemoteAddress.Uri.Host, explicitPort <= 0 ? TcpTransportBindingElement.DefaultPort : explicitPort);
225                                                         //RemoteAddress.Uri.Port);
226                                 
227                                 NetworkStream ns = client.GetStream ();
228                                 frame = new TcpBinaryFrameManager (TcpBinaryFrameManager.DuplexMode, ns) {
229                                         Encoder = this.Encoder,
230                                         Via = RemoteAddress.Uri };
231                                 frame.ProcessPreambleInitiator ();
232                         } else {
233                                 // server side
234                         }
235                 }
236                 
237                 class MyBinaryWriter : BinaryWriter
238                 {
239                         public MyBinaryWriter (Stream s)
240                                 : base (s)
241                         {
242                         }
243                         
244                         public void WriteBytes (byte [] bytes)
245                         {
246                                 Write7BitEncodedInt (bytes.Length);
247                                 Write (bytes);
248                         }
249                 }
250         }
251
252         // seealso: [MC-NMF] Windows Protocol document.
253         class TcpBinaryFrameManager
254         {
255                 class MyBinaryReader : BinaryReader
256                 {
257                         public MyBinaryReader (Stream s)
258                                 : base (s)
259                         {
260                         }
261
262                         public int ReadVariableInt ()
263                         {
264                                 return Read7BitEncodedInt ();
265                         }
266                 }
267
268                 class MyBinaryWriter : BinaryWriter
269                 {
270                         public MyBinaryWriter (Stream s)
271                                 : base (s)
272                         {
273                         }
274
275                         public void WriteVariableInt (int value)
276                         {
277                                 Write7BitEncodedInt (value);
278                         }
279                 }
280
281                 class MyXmlBinaryWriterSession : XmlBinaryWriterSession
282                 {
283                         public override bool TryAdd (XmlDictionaryString value, out int key)
284                         {
285                                 if (!base.TryAdd (value, out key))
286                                         return false;
287                                 List.Add (value);
288                                 return true;
289                         }
290
291                         public List<XmlDictionaryString> List = new List<XmlDictionaryString> ();
292                 }
293
294                 public const byte VersionRecord = 0;
295                 public const byte ModeRecord = 1;
296                 public const byte ViaRecord = 2;
297                 public const byte KnownEncodingRecord = 3;
298                 public const byte ExtendingEncodingRecord = 4;
299                 public const byte UnsizedEnvelopeRecord = 5;
300                 public const byte SizedEnvelopeRecord = 6;
301                 public const byte EndRecord = 7;
302                 public const byte FaultRecord = 8;
303                 public const byte UpgradeRequestRecord = 9;
304                 public const byte UpgradeResponseRecord = 0xA;
305                 public const byte PreambleAckRecord = 0xB;
306                 public const byte PreambleEndRecord = 0xC;
307
308                 public const byte SingletonUnsizedMode = 1;
309                 public const byte DuplexMode = 2;
310                 public const byte SimplexMode = 3;
311                 public const byte SingletonSizedMode = 4;
312                 MyBinaryReader reader;
313                 MyBinaryWriter writer;
314
315                 public TcpBinaryFrameManager (int mode, Stream s)
316                 {
317                         this.mode = mode;
318                         this.s = s;
319                         reader = new MyBinaryReader (s);
320                         writer = new MyBinaryWriter (s);
321
322                         EncodingRecord = 8; // FIXME: it should depend on mode.
323                 }
324
325                 Stream s;
326
327                 int mode;
328
329                 public byte EncodingRecord { get; set; }
330
331                 public Uri Via { get; set; }
332
333                 public MessageEncoder Encoder { get; set; }
334
335                 public byte [] ReadSizedChunk ()
336                 {
337                         int length = reader.ReadVariableInt ();
338                         
339                         if (length > 65536)
340                                 throw new InvalidOperationException ("The message is too large.");
341
342                         byte [] buffer = new byte [length];
343                         for (int readSize = 0; readSize < length; )
344                                 readSize += reader.Read (buffer, readSize, length - readSize);
345                         return buffer;
346                 }
347
348                 public void WriteSizedChunk (byte [] data)
349                 {
350                         writer.WriteVariableInt (data.Length);
351                         writer.Write (data, 0, data.Length);
352                 }
353
354                 public void ProcessPreambleInitiator ()
355                 {
356                         s.WriteByte (VersionRecord);
357                         s.WriteByte (1);
358                         s.WriteByte (0);
359                         s.WriteByte (ModeRecord);
360                         s.WriteByte ((byte) mode);
361                         s.WriteByte (ViaRecord);
362                         writer.Write (Via.ToString ());
363                         s.WriteByte (KnownEncodingRecord); // FIXME
364                         s.WriteByte ((byte) EncodingRecord);
365                         s.WriteByte (PreambleEndRecord);
366                         s.Flush ();
367
368                         int b = s.ReadByte ();
369                         switch (b) {
370                         case PreambleAckRecord:
371                                 return; // success
372                         case FaultRecord:
373                                 throw new FaultException (reader.ReadString ());
374                         default:
375                                 throw new ProtocolException (String.Format ("Preamble Ack Record is expected, got {0:X}", b));
376                         }
377                 }
378
379                 public void ProcessPreambleRecipient ()
380                 {
381                         bool preambleEnd = false;
382                         while (!preambleEnd) {
383                                 int b = s.ReadByte ();
384                                 switch (b) {
385                                 case VersionRecord:
386                                         if (s.ReadByte () != 1)
387                                                 throw new ProtocolException ("Major version must be 1");
388                                         if (s.ReadByte () != 0)
389                                                 throw new ProtocolException ("Minor version must be 0");
390                                         break;
391                                 case ModeRecord:
392                                         if (s.ReadByte () != mode)
393                                                 throw new ProtocolException (String.Format ("Duplex mode is expected to be {0:X}", mode));
394                                         break;
395                                 case ViaRecord:
396                                         Via = new Uri (reader.ReadString ());
397                                         break;
398                                 case KnownEncodingRecord:
399                                         EncodingRecord = (byte) s.ReadByte ();
400                                         break;
401                                 case ExtendingEncodingRecord:
402                                         throw new NotImplementedException ();
403                                 case UpgradeRequestRecord:
404                                         throw new NotImplementedException ();
405                                 case UpgradeResponseRecord:
406                                         throw new NotImplementedException ();
407                                 case PreambleEndRecord:
408                                         preambleEnd = true;
409                                         break;
410                                 default:
411                                         throw new ProtocolException (String.Format ("Unexpected record type {0:X2}", b));
412                                 }
413                         }
414                         s.WriteByte (PreambleAckRecord);
415                 }
416
417                 public Message ReadSizedMessage ()
418                 {
419                         // FIXME: implement [MC-NMF] correctly. Currently it is a guessed protocol hack.
420
421                         var packetType = s.ReadByte ();
422                         if (packetType != SizedEnvelopeRecord)
423                                 throw new NotImplementedException (String.Format ("Packet type {0:X} is not implemented", packetType));
424
425                         byte [] buffer = ReadSizedChunk ();
426
427                         var ms = new MemoryStream (buffer, 0, buffer.Length);
428
429                         // FIXME: turned out that it could be either in-band dictionary ([MC-NBFSE]), or a mere xml body ([MC-NBFS]).
430                         if (EncodingRecord != 8)
431                                 throw new NotImplementedException (String.Format ("Message encoding {0:X} is not implemented yet", EncodingRecord));
432
433                         // Encoding type 8:
434                         // the returned buffer consists of a serialized reader 
435                         // session and the binary xml body. 
436
437                         var session = new XmlBinaryReaderSession ();
438                         byte [] rsbuf = new TcpBinaryFrameManager (0, ms).ReadSizedChunk ();
439                         int count = 0;
440                         using (var rms = new MemoryStream (rsbuf, 0, rsbuf.Length)) {
441                                 var rbr = new BinaryReader (rms, Encoding.UTF8);
442                                 while (rms.Position < rms.Length)
443                                         session.Add (count++, rbr.ReadString ());
444                         }
445                         var benc = Encoder as BinaryMessageEncoder;
446                         if (benc != null)
447                                 benc.CurrentReaderSession = session;
448                         // FIXME: supply maxSizeOfHeaders.
449                         Message msg = Encoder.ReadMessage (ms, 0x10000);
450                         if (benc != null)
451                                 benc.CurrentReaderSession = null;
452                         s.Flush ();
453
454                         return msg;
455                 }
456
457                 public void WriteSizedMessage (Message message)
458                 {
459                         // FIXME: implement full [MC-NMF] protocol.
460
461                         if (EncodingRecord != 8)
462                                 throw new NotImplementedException (String.Format ("Message encoding {0:X} is not implemented yet", EncodingRecord));
463
464                         s.WriteByte (SizedEnvelopeRecord);
465
466                         MemoryStream ms = new MemoryStream ();
467                         var session = new MyXmlBinaryWriterSession ();
468                         var benc = Encoder as BinaryMessageEncoder;
469                         try {
470                                 if (benc != null)
471                                         benc.CurrentWriterSession = session;
472                                 Encoder.WriteMessage (message, ms);
473                         } finally {
474                                 benc.CurrentWriterSession = null;
475                         }
476
477                         // dictionary
478                         MemoryStream msd = new MemoryStream ();
479                         BinaryWriter dw = new BinaryWriter (msd);
480                         foreach (var ds in session.List)
481                                 dw.Write (ds.Value);
482                         dw.Flush ();
483                         writer.WriteVariableInt ((int) (msd.Position + ms.Position));
484                         WriteSizedChunk (msd.ToArray ());
485                         // message body
486                         var arr = ms.GetBuffer ();
487                         writer.Write (arr, 0, (int) ms.Position);
488
489                         writer.Write (EndRecord);
490                         writer.Flush ();
491                 }
492
493                 public void ProcessEndRecordRecipient ()
494                 {
495                         int b;
496                         if ((b = s.ReadByte ()) != EndRecord)
497                                 throw new ProtocolException (String.Format ("EndRequest message was expected, got {0:X}", b));
498                 }
499         }
500 }