2009-05-21 Atsushi Enomoto <atsushi@ximian.com>
[mono.git] / mcs / class / System.ServiceModel / System.ServiceModel.Channels / TcpDuplexSessionChannel.cs
1 // 
2 // TcpDuplexSessionChannel.cs
3 // 
4 // Author: 
5 //      Marcos Cobena (marcoscobena@gmail.com)
6 //      Atsushi Enomoto  <atsushi@ximian.com>
7 // 
8 // Copyright 2007 Marcos Cobena (http://www.youcannoteatbits.org/)
9 //
10 // Copyright (C) 2009 Novell, Inc (http://www.novell.com)
11 //
12 // Permission is hereby granted, free of charge, to any person obtaining
13 // a copy of this software and associated documentation files (the
14 // "Software"), to deal in the Software without restriction, including
15 // without limitation the rights to use, copy, modify, merge, publish,
16 // distribute, sublicense, and/or sell copies of the Software, and to
17 // permit persons to whom the Software is furnished to do so, subject to
18 // the following conditions:
19 // 
20 // The above copyright notice and this permission notice shall be
21 // included in all copies or substantial portions of the Software.
22 // 
23 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
27 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
28 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
29 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
30 //
31
32 using System;
33 using System.Collections.Generic;
34 using System.IO;
35 using System.Net;
36 using System.Net.Sockets;
37 using System.Runtime.Serialization;
38 using System.Runtime.Serialization.Formatters.Binary;
39 using System.ServiceModel.Channels;
40 using System.Text;
41 using System.Xml;
42
43 namespace System.ServiceModel.Channels
44 {
45         internal class TcpDuplexSessionChannel : DuplexChannelBase, IDuplexSessionChannel
46         {
47                 class TcpDuplexSession : DuplexSessionBase
48                 {
49                         TcpDuplexSessionChannel owner;
50
51                         internal TcpDuplexSession (TcpDuplexSessionChannel owner)
52                         {
53                                 this.owner = owner;
54                         }
55
56                         public override TimeSpan DefaultCloseTimeout {
57                                 get { return owner.DefaultCloseTimeout; }
58                         }
59
60                         public override void Close (TimeSpan timeout)
61                         {
62                                 // FIXME: what to do here?
63                                 throw new NotImplementedException ();
64                         }
65                 }
66
67                 TcpChannelInfo info;
68                 TcpClient client;
69                 bool is_service_side;
70                 EndpointAddress local_address;
71                 TcpListener tcp_listener;
72                 TimeSpan timeout;
73                 TcpBinaryFrameManager frame;
74                 TcpDuplexSession session;
75                 
76                 public TcpDuplexSessionChannel (ChannelFactoryBase factory, TcpChannelInfo info, EndpointAddress address, Uri via)
77                         : base (factory, address, via)
78                 {
79                         is_service_side = false;
80                         this.info = info;
81                         session = new TcpDuplexSession (this);
82                 }
83                 
84                 public TcpDuplexSessionChannel (ChannelListenerBase listener, TcpChannelInfo info, TcpListener tcpListener, TimeSpan timeout)
85                         : base (listener)
86                 {
87                         is_service_side = true;
88                         tcp_listener = tcpListener;
89                         this.info = info;
90                         session = new TcpDuplexSession (this);
91                         this.timeout = timeout;
92                 }
93                 
94                 public MessageEncoder Encoder {
95                         get { return info.MessageEncoder; }
96                 }
97
98                 public override EndpointAddress LocalAddress {
99                         get { return local_address; }
100                 }
101                 
102                 public IDuplexSession Session {
103                         get { return session; }
104                 }
105
106                 public override void Send (Message message)
107                 {
108                         Send (message, DefaultSendTimeout);
109                 }
110                 
111                 public override void Send (Message message, TimeSpan timeout)
112                 {
113                         // FIXME: add MessageID and ReplyTo (might not be here; it's likely in session channel in common)
114
115                         if (!is_service_side && message.Headers.To == null)
116                                 message.Headers.To = RemoteAddress.Uri;
117                         client.SendTimeout = (int) timeout.TotalMilliseconds;
118                         frame.WriteSizedMessage (message);
119                         // FIXME: should EndRecord be sent here?
120                         //if (is_service_side && client.Available > 0)
121                         //      frame.ProcessEndRecordRecipient ();
122                 }
123                 
124                 [MonoTODO]
125                 public override IAsyncResult BeginTryReceive (TimeSpan timeout, AsyncCallback callback, object state)
126                 {
127                         throw new NotImplementedException ();
128                 }
129                 
130                 [MonoTODO]
131                 public override bool EndTryReceive (IAsyncResult result, out Message message)
132                 {
133                         throw new NotImplementedException ();
134                 }
135                 
136                 public override Message Receive ()
137                 {
138                         return Receive (DefaultReceiveTimeout);
139                 }
140                 
141                 public override Message Receive (TimeSpan timeout)
142                 {
143                         client.ReceiveTimeout = (int) timeout.TotalMilliseconds;
144                         Stream s = client.GetStream ();
145
146                         return frame.ReadSizedMessage ();
147                 }
148                 
149                 public override bool TryReceive (TimeSpan timeout, out Message message)
150                 {
151                         try {
152                                 message = Receive (timeout);
153                                 return true;
154                         } catch (TimeoutException) {
155                                 message = null;
156                                 return false;
157                         }
158                 }
159                 
160                 public override bool WaitForMessage (TimeSpan timeout)
161                 {
162                         // FIXME: use timeout
163                         try {
164                                 client = tcp_listener.AcceptTcpClient ();
165                                 Stream s = client.GetStream ();
166
167                                 frame = new TcpBinaryFrameManager (TcpBinaryFrameManager.DuplexMode, s) { Encoder = this.Encoder };
168                                 frame.ProcessPreambleRecipient ();
169
170                                 // FIXME: use retrieved record properties in the request processing.
171
172                                 return true;
173                         } catch (TimeoutException) {
174                                 return false;
175                         }
176                 }
177                 
178                 // CommunicationObject
179                 
180                 [MonoTODO]
181                 protected override void OnAbort ()
182                 {
183                         throw new NotImplementedException ();
184                 }
185
186                 [MonoTODO]
187                 protected override IAsyncResult OnBeginClose (TimeSpan timeout,
188                         AsyncCallback callback, object state)
189                 {
190                         throw new NotImplementedException ();
191                 }
192
193                 [MonoTODO]
194                 protected override IAsyncResult OnBeginOpen (TimeSpan timeout,
195                         AsyncCallback callback, object state)
196                 {
197                         throw new NotImplementedException ();
198                 }
199
200                 [MonoTODO]
201                 protected override void OnClose (TimeSpan timeout)
202                 {
203                         client.Close ();
204                 }
205                 
206                 [MonoTODO]
207                 protected override void OnEndClose (IAsyncResult result)
208                 {
209                         throw new NotImplementedException ();
210                 }
211
212                 [MonoTODO]
213                 protected override void OnEndOpen (IAsyncResult result)
214                 {
215                         throw new NotImplementedException ();
216                 }
217                 
218                 [MonoTODO]
219                 protected override void OnOpen (TimeSpan timeout)
220                 {
221                         if (! is_service_side) {
222                                 int explicitPort = RemoteAddress.Uri.Port;
223                                 client = new TcpClient (RemoteAddress.Uri.Host, explicitPort <= 0 ? TcpTransportBindingElement.DefaultPort : explicitPort);
224                                                         //RemoteAddress.Uri.Port);
225                                 
226                                 NetworkStream ns = client.GetStream ();
227                                 frame = new TcpBinaryFrameManager (TcpBinaryFrameManager.DuplexMode, ns) {
228                                         Encoder = this.Encoder,
229                                         Via = RemoteAddress.Uri };
230                                 frame.ProcessPreambleInitiator ();
231                         } else {
232                                 // server side
233                         }
234                 }
235                 
236                 class MyBinaryWriter : BinaryWriter
237                 {
238                         public MyBinaryWriter (Stream s)
239                                 : base (s)
240                         {
241                         }
242                         
243                         public void WriteBytes (byte [] bytes)
244                         {
245                                 Write7BitEncodedInt (bytes.Length);
246                                 Write (bytes);
247                         }
248                 }
249         }
250
251         // seealso: [MC-NMF] Windows Protocol document.
252         class TcpBinaryFrameManager
253         {
254                 class MyBinaryReader : BinaryReader
255                 {
256                         public MyBinaryReader (Stream s)
257                                 : base (s)
258                         {
259                         }
260
261                         public int ReadVariableInt ()
262                         {
263                                 return Read7BitEncodedInt ();
264                         }
265                 }
266
267                 class MyBinaryWriter : BinaryWriter
268                 {
269                         public MyBinaryWriter (Stream s)
270                                 : base (s)
271                         {
272                         }
273
274                         public void WriteVariableInt (int value)
275                         {
276                                 Write7BitEncodedInt (value);
277                         }
278                 }
279
280                 class MyXmlBinaryWriterSession : XmlBinaryWriterSession
281                 {
282                         public override bool TryAdd (XmlDictionaryString value, out int key)
283                         {
284                                 if (!base.TryAdd (value, out key))
285                                         return false;
286                                 List.Add (value);
287                                 return true;
288                         }
289
290                         public List<XmlDictionaryString> List = new List<XmlDictionaryString> ();
291                 }
292
293                 public const byte VersionRecord = 0;
294                 public const byte ModeRecord = 1;
295                 public const byte ViaRecord = 2;
296                 public const byte KnownEncodingRecord = 3;
297                 public const byte ExtendingEncodingRecord = 4;
298                 public const byte UnsizedEnvelopeRecord = 5;
299                 public const byte SizedEnvelopeRecord = 6;
300                 public const byte EndRecord = 7;
301                 public const byte FaultRecord = 8;
302                 public const byte UpgradeRequestRecord = 9;
303                 public const byte UpgradeResponseRecord = 0xA;
304                 public const byte PreambleAckRecord = 0xB;
305                 public const byte PreambleEndRecord = 0xC;
306
307                 public const byte SingletonUnsizedMode = 1;
308                 public const byte DuplexMode = 2;
309                 public const byte SimplexMode = 3;
310                 public const byte SingletonSizedMode = 4;
311                 MyBinaryReader reader;
312                 MyBinaryWriter writer;
313
314                 public TcpBinaryFrameManager (int mode, Stream s)
315                 {
316                         this.mode = mode;
317                         this.s = s;
318                         reader = new MyBinaryReader (s);
319                         writer = new MyBinaryWriter (s);
320
321                         EncodingRecord = 8; // FIXME: it should depend on mode.
322                 }
323
324                 Stream s;
325
326                 int mode;
327
328                 public byte EncodingRecord { get; set; }
329
330                 public Uri Via { get; set; }
331
332                 public MessageEncoder Encoder { get; set; }
333
334                 public byte [] ReadSizedChunk ()
335                 {
336                         int length = reader.ReadVariableInt ();
337                         
338                         if (length > 65536)
339                                 throw new InvalidOperationException ("The message is too large.");
340
341                         byte [] buffer = new byte [length];
342                         reader.Read (buffer, 0, length);
343                         
344                         return buffer;
345                 }
346
347                 public void WriteSizedChunk (byte [] data)
348                 {
349                         writer.WriteVariableInt (data.Length);
350                         writer.Write (data, 0, data.Length);
351                 }
352
353                 public void ProcessPreambleInitiator ()
354                 {
355                         s.WriteByte (VersionRecord);
356                         s.WriteByte (1);
357                         s.WriteByte (0);
358                         s.WriteByte (ModeRecord);
359                         s.WriteByte ((byte) mode);
360                         s.WriteByte (ViaRecord);
361                         writer.Write (Via.ToString ());
362                         s.WriteByte (KnownEncodingRecord); // FIXME
363                         s.WriteByte ((byte) EncodingRecord);
364                         s.WriteByte (PreambleEndRecord);
365                         s.Flush ();
366
367                         int b = s.ReadByte ();
368                         switch (b) {
369                         case PreambleAckRecord:
370                                 return; // success
371                         case FaultRecord:
372                                 throw new FaultException (reader.ReadString ());
373                         default:
374                                 throw new ProtocolException (String.Format ("Preamble Ack Record is expected, got {0:X}", b));
375                         }
376                 }
377
378                 public void ProcessPreambleRecipient ()
379                 {
380                         bool preambleEnd = false;
381                         while (!preambleEnd) {
382                                 int b = s.ReadByte ();
383                                 switch (b) {
384                                 case VersionRecord:
385                                         if (s.ReadByte () != 1)
386                                                 throw new ProtocolException ("Major version must be 1");
387                                         if (s.ReadByte () != 0)
388                                                 throw new ProtocolException ("Minor version must be 0");
389                                         break;
390                                 case ModeRecord:
391                                         if (s.ReadByte () != mode)
392                                                 throw new ProtocolException (String.Format ("Duplex mode is expected to be {0:X}", mode));
393                                         break;
394                                 case ViaRecord:
395                                         Via = new Uri (reader.ReadString ());
396                                         break;
397                                 case KnownEncodingRecord:
398                                         EncodingRecord = (byte) s.ReadByte ();
399                                         break;
400                                 case ExtendingEncodingRecord:
401                                         throw new NotImplementedException ();
402                                 case UpgradeRequestRecord:
403                                         throw new NotImplementedException ();
404                                 case UpgradeResponseRecord:
405                                         throw new NotImplementedException ();
406                                 case PreambleEndRecord:
407                                         preambleEnd = true;
408                                         break;
409                                 default:
410                                         throw new ProtocolException (String.Format ("Unexpected record type {0:X2}", b));
411                                 }
412                         }
413                         s.WriteByte (PreambleAckRecord);
414                 }
415
416                 public Message ReadSizedMessage ()
417                 {
418                         // FIXME: implement [MC-NMF] correctly. Currently it is a guessed protocol hack.
419
420                         var packetType = s.ReadByte ();
421                         if (packetType != SizedEnvelopeRecord)
422                                 throw new NotImplementedException (String.Format ("Packet type {0:X} is not implemented", packetType));
423
424                         byte [] buffer = ReadSizedChunk ();
425
426                         var ms = new MemoryStream (buffer, 0, buffer.Length);
427
428                         // FIXME: turned out that it could be either in-band dictionary ([MC-NBFSE]), or a mere xml body ([MC-NBFS]).
429                         if (EncodingRecord != 8)
430                                 throw new NotImplementedException (String.Format ("Message encoding {0:X} is not implemented yet", EncodingRecord));
431
432                         // Encoding type 8:
433                         // the returned buffer consists of a serialized reader 
434                         // session and the binary xml body. 
435
436                         var session = new XmlBinaryReaderSession ();
437                         byte [] rsbuf = new TcpBinaryFrameManager (0, ms).ReadSizedChunk ();
438                         int count = 0;
439                         using (var rms = new MemoryStream (rsbuf, 0, rsbuf.Length)) {
440                                 var rbr = new BinaryReader (rms, Encoding.UTF8);
441                                 while (rms.Position < rms.Length)
442                                         session.Add (count++, rbr.ReadString ());
443                         }
444                         var benc = Encoder as BinaryMessageEncoder;
445                         if (benc != null)
446                                 benc.CurrentReaderSession = session;
447                         // FIXME: supply maxSizeOfHeaders.
448                         Message msg = Encoder.ReadMessage (ms, 0x10000);
449                         if (benc != null)
450                                 benc.CurrentReaderSession = null;
451                         s.Flush ();
452
453                         return msg;
454                 }
455
456                 public void WriteSizedMessage (Message message)
457                 {
458                         // FIXME: implement full [MC-NMF] protocol.
459
460                         if (EncodingRecord != 8)
461                                 throw new NotImplementedException (String.Format ("Message encoding {0:X} is not implemented yet", EncodingRecord));
462
463                         s.WriteByte (SizedEnvelopeRecord);
464
465                         MemoryStream ms = new MemoryStream ();
466                         var session = new MyXmlBinaryWriterSession ();
467                         var benc = Encoder as BinaryMessageEncoder;
468                         try {
469                                 if (benc != null)
470                                         benc.CurrentWriterSession = session;
471                                 Encoder.WriteMessage (message, ms);
472                         } finally {
473                                 benc.CurrentWriterSession = null;
474                         }
475
476                         // dictionary
477                         MemoryStream msd = new MemoryStream ();
478                         BinaryWriter dw = new BinaryWriter (msd);
479                         foreach (var ds in session.List)
480                                 dw.Write (ds.Value);
481                         dw.Flush ();
482                         writer.WriteVariableInt ((int) (msd.Position + ms.Position));
483                         WriteSizedChunk (msd.ToArray ());
484                         // message body
485                         var arr = ms.GetBuffer ();
486                         writer.Write (arr, 0, (int) ms.Position);
487
488                         writer.Write (EndRecord);
489                         writer.Flush ();
490                 }
491
492                 public void ProcessEndRecordRecipient ()
493                 {
494                         int b;
495                         if ((b = s.ReadByte ()) != EndRecord)
496                                 throw new ProtocolException (String.Format ("EndRequest message was expected, got {0:X}", b));
497                 }
498         }
499 }