2 // TcpDuplexSessionChannel.cs
5 // Marcos Cobena (marcoscobena@gmail.com)
6 // Atsushi Enomoto <atsushi@ximian.com>
8 // Copyright 2007 Marcos Cobena (http://www.youcannoteatbits.org/)
10 // Copyright (C) 2009 Novell, Inc (http://www.novell.com)
12 // Permission is hereby granted, free of charge, to any person obtaining
13 // a copy of this software and associated documentation files (the
14 // "Software"), to deal in the Software without restriction, including
15 // without limitation the rights to use, copy, modify, merge, publish,
16 // distribute, sublicense, and/or sell copies of the Software, and to
17 // permit persons to whom the Software is furnished to do so, subject to
18 // the following conditions:
20 // The above copyright notice and this permission notice shall be
21 // included in all copies or substantial portions of the Software.
23 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
27 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
28 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
29 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
33 using System.Collections.Generic;
36 using System.Net.Sockets;
37 using System.Runtime.Serialization;
38 using System.Runtime.Serialization.Formatters.Binary;
39 using System.ServiceModel.Channels;
41 using System.Threading;
44 namespace System.ServiceModel.Channels
46 internal class TcpDuplexSessionChannel : DuplexChannelBase, IDuplexSessionChannel
48 class TcpDuplexSession : DuplexSessionBase
50 TcpDuplexSessionChannel owner;
52 internal TcpDuplexSession (TcpDuplexSessionChannel owner)
57 public override TimeSpan DefaultCloseTimeout {
58 get { return owner.DefaultCloseTimeout; }
61 public override void Close (TimeSpan timeout)
63 // FIXME: what to do here?
64 throw new NotImplementedException ();
71 EndpointAddress local_address;
72 TcpListener tcp_listener;
74 TcpBinaryFrameManager frame;
75 TcpDuplexSession session;
77 public TcpDuplexSessionChannel (ChannelFactoryBase factory, TcpChannelInfo info, EndpointAddress address, Uri via)
78 : base (factory, address, via)
80 is_service_side = false;
82 session = new TcpDuplexSession (this);
85 public TcpDuplexSessionChannel (ChannelListenerBase listener, TcpChannelInfo info, TcpListener tcpListener, TimeSpan timeout)
88 is_service_side = true;
89 tcp_listener = tcpListener;
91 session = new TcpDuplexSession (this);
92 this.timeout = timeout;
95 public MessageEncoder Encoder {
96 get { return info.MessageEncoder; }
99 public override EndpointAddress LocalAddress {
100 get { return local_address; }
103 public IDuplexSession Session {
104 get { return session; }
107 public override void Send (Message message)
109 Send (message, DefaultSendTimeout);
112 public override void Send (Message message, TimeSpan timeout)
114 if (timeout <= TimeSpan.Zero)
115 throw new ArgumentException (String.Format ("Timeout value must be positive value. It was {0}", timeout));
117 if (!is_service_side) {
118 if (message.Headers.To == null)
119 message.Headers.To = RemoteAddress.Uri;
120 if (message.Headers.ReplyTo == null)
121 message.Headers.ReplyTo = new EndpointAddress (Constants.WsaAnonymousUri);
123 if (message.Headers.RelatesTo == null)
124 message.Headers.RelatesTo = OperationContext.Current.IncomingMessageHeaders.MessageId;
127 client.SendTimeout = (int) timeout.TotalMilliseconds;
128 frame.WriteSizedMessage (message);
129 // FIXME: should EndRecord be sent here?
130 //if (is_service_side && client.Available > 0)
131 // frame.ProcessEndRecordRecipient ();
135 public override IAsyncResult BeginTryReceive (TimeSpan timeout, AsyncCallback callback, object state)
137 throw new NotImplementedException ();
141 public override bool EndTryReceive (IAsyncResult result, out Message message)
143 throw new NotImplementedException ();
146 public override Message Receive ()
148 return Receive (DefaultReceiveTimeout);
151 public override Message Receive (TimeSpan timeout)
153 if (timeout <= TimeSpan.Zero)
154 throw new ArgumentException (String.Format ("Timeout value must be positive value. It was {0}", timeout));
155 client.ReceiveTimeout = (int) timeout.TotalMilliseconds;
156 return frame.ReadSizedMessage ();
159 public override bool TryReceive (TimeSpan timeout, out Message message)
162 message = Receive (timeout);
164 } catch (TimeoutException) {
170 public override bool WaitForMessage (TimeSpan timeout)
172 if (client.Available > 0)
175 DateTime start = DateTime.Now;
178 if (client.Available > 0)
180 } while (DateTime.Now - start < timeout);
184 // CommunicationObject
187 protected override void OnAbort ()
194 protected override IAsyncResult OnBeginClose (TimeSpan timeout,
195 AsyncCallback callback, object state)
197 throw new NotImplementedException ();
201 protected override IAsyncResult OnBeginOpen (TimeSpan timeout,
202 AsyncCallback callback, object state)
204 throw new NotImplementedException ();
208 protected override void OnClose (TimeSpan timeout)
215 protected override void OnEndClose (IAsyncResult result)
217 throw new NotImplementedException ();
221 protected override void OnEndOpen (IAsyncResult result)
223 throw new NotImplementedException ();
227 protected override void OnOpen (TimeSpan timeout)
229 if (! is_service_side) {
230 int explicitPort = RemoteAddress.Uri.Port;
231 client = new TcpClient (RemoteAddress.Uri.Host, explicitPort <= 0 ? TcpTransportBindingElement.DefaultPort : explicitPort);
232 //RemoteAddress.Uri.Port);
234 NetworkStream ns = client.GetStream ();
235 frame = new TcpBinaryFrameManager (TcpBinaryFrameManager.DuplexMode, ns, is_service_side) {
236 Encoder = this.Encoder,
237 Via = RemoteAddress.Uri };
240 client = tcp_listener.AcceptTcpClient ();
241 Stream s = client.GetStream ();
243 frame = new TcpBinaryFrameManager (TcpBinaryFrameManager.DuplexMode, s, is_service_side) { Encoder = this.Encoder };
245 // FIXME: use retrieved record properties in the request processing.
250 class MyBinaryWriter : BinaryWriter
252 public MyBinaryWriter (Stream s)
257 public void WriteBytes (byte [] bytes)
259 Write7BitEncodedInt (bytes.Length);
265 // seealso: [MC-NMF] Windows Protocol document.
266 class TcpBinaryFrameManager
268 class MyBinaryReader : BinaryReader
270 public MyBinaryReader (Stream s)
275 public int ReadVariableInt ()
277 return Read7BitEncodedInt ();
281 class MyBinaryWriter : BinaryWriter
283 public MyBinaryWriter (Stream s)
288 public void WriteVariableInt (int value)
290 Write7BitEncodedInt (value);
293 public int GetSizeOfLength (int value)
299 } while (value != 0);
304 class MyXmlBinaryWriterSession : XmlBinaryWriterSession
306 public override bool TryAdd (XmlDictionaryString value, out int key)
308 if (!base.TryAdd (value, out key))
314 public List<XmlDictionaryString> List = new List<XmlDictionaryString> ();
317 public const byte VersionRecord = 0;
318 public const byte ModeRecord = 1;
319 public const byte ViaRecord = 2;
320 public const byte KnownEncodingRecord = 3;
321 public const byte ExtendingEncodingRecord = 4;
322 public const byte UnsizedEnvelopeRecord = 5;
323 public const byte SizedEnvelopeRecord = 6;
324 public const byte EndRecord = 7;
325 public const byte FaultRecord = 8;
326 public const byte UpgradeRequestRecord = 9;
327 public const byte UpgradeResponseRecord = 0xA;
328 public const byte PreambleAckRecord = 0xB;
329 public const byte PreambleEndRecord = 0xC;
331 public const byte SingletonUnsizedMode = 1;
332 public const byte DuplexMode = 2;
333 public const byte SimplexMode = 3;
334 public const byte SingletonSizedMode = 4;
335 MyBinaryReader reader;
336 MyBinaryWriter writer;
338 public TcpBinaryFrameManager (int mode, Stream s, bool isServiceSide)
342 this.is_service_side = isServiceSide;
343 reader = new MyBinaryReader (s);
346 EncodingRecord = 8; // FIXME: it should depend on mode.
351 bool is_service_side;
355 public byte EncodingRecord { get; set; }
357 public Uri Via { get; set; }
359 public MessageEncoder Encoder { get; set; }
361 void ResetWriteBuffer ()
363 this.buffer = new MemoryStream ();
364 writer = new MyBinaryWriter (buffer);
367 public byte [] ReadSizedChunk ()
369 int length = reader.ReadVariableInt ();
372 throw new InvalidOperationException ("The message is too large.");
374 byte [] buffer = new byte [length];
375 for (int readSize = 0; readSize < length; )
376 readSize += reader.Read (buffer, readSize, length - readSize);
380 public void WriteSizedChunk (byte [] data)
382 writer.WriteVariableInt (data.Length);
383 writer.Write (data, 0, data.Length);
386 void ProcessPreambleInitiator ()
388 buffer.WriteByte (VersionRecord);
389 buffer.WriteByte (1);
390 buffer.WriteByte (0);
391 buffer.WriteByte (ModeRecord);
392 buffer.WriteByte ((byte) mode);
393 buffer.WriteByte (ViaRecord);
394 writer.Write (Via.ToString ());
395 buffer.WriteByte (KnownEncodingRecord); // FIXME
396 buffer.WriteByte ((byte) EncodingRecord);
397 buffer.WriteByte (PreambleEndRecord);
401 void ProcessPreambleAckInitiator ()
403 int b = s.ReadByte ();
405 case PreambleAckRecord:
408 throw new FaultException (reader.ReadString ());
410 throw new ProtocolException (String.Format ("Preamble Ack Record is expected, got {0:X}", b));
414 void ProcessPreambleAckRecipient ()
416 s.WriteByte (PreambleAckRecord);
419 void ProcessPreambleRecipient (bool allowEndRecord)
421 bool preambleEnd = false;
422 while (!preambleEnd) {
423 int b = s.ReadByte ();
426 if (s.ReadByte () != 1)
427 throw new ProtocolException ("Major version must be 1");
428 if (s.ReadByte () != 0)
429 throw new ProtocolException ("Minor version must be 0");
432 if (s.ReadByte () != mode)
433 throw new ProtocolException (String.Format ("Duplex mode is expected to be {0:X}", mode));
436 Via = new Uri (reader.ReadString ());
438 case KnownEncodingRecord:
439 EncodingRecord = (byte) s.ReadByte ();
441 case ExtendingEncodingRecord:
442 throw new NotImplementedException ();
443 case UpgradeRequestRecord:
444 throw new NotImplementedException ();
445 case UpgradeResponseRecord:
446 throw new NotImplementedException ();
447 case PreambleEndRecord:
455 throw new ProtocolException (String.Format ("Unexpected record type {0:X2}", b));
460 bool message_already_read_once;
462 public Message ReadSizedMessage ()
464 // FIXME: implement full [MC-NMF].
465 if (is_service_side) {
466 ProcessPreambleRecipient (message_already_read_once);
467 message_already_read_once = true;
468 ProcessPreambleAckRecipient ();
471 var packetType = s.ReadByte ();
472 if (packetType != SizedEnvelopeRecord)
473 throw new NotImplementedException (String.Format ("Packet type {0:X} is not implemented", packetType));
475 byte [] buffer = ReadSizedChunk ();
477 var ms = new MemoryStream (buffer, 0, buffer.Length);
479 // FIXME: turned out that it could be either in-band dictionary ([MC-NBFSE]), or a mere xml body ([MC-NBFS]).
480 if (EncodingRecord != 8)
481 throw new NotImplementedException (String.Format ("Message encoding {0:X} is not implemented yet", EncodingRecord));
484 // the returned buffer consists of a serialized reader
485 // session and the binary xml body.
487 var session = new XmlBinaryReaderSession ();
488 byte [] rsbuf = new TcpBinaryFrameManager (0, ms, is_service_side).ReadSizedChunk ();
490 using (var rms = new MemoryStream (rsbuf, 0, rsbuf.Length)) {
491 var rbr = new BinaryReader (rms, Encoding.UTF8);
492 while (rms.Position < rms.Length)
493 session.Add (count++, rbr.ReadString ());
495 var benc = Encoder as BinaryMessageEncoder;
497 benc.CurrentReaderSession = session;
498 // FIXME: supply maxSizeOfHeaders.
499 Message msg = Encoder.ReadMessage (ms, 0x10000);
501 benc.CurrentReaderSession = null;
503 if (!is_service_side)
504 if (s.Read (eof_buffer, 0, 1) == 1)
505 if (eof_buffer [0] != EndRecord)
506 throw new ProtocolException (String.Format ("Expected EndRecord message, got {0:X02}", eof_buffer [0]));
511 byte [] eof_buffer = new byte [1];
513 public void WriteSizedMessage (Message message)
517 if (!is_service_side)
518 ProcessPreambleInitiator ();
520 // FIXME: implement full [MC-NMF] protocol.
522 if (EncodingRecord != 8)
523 throw new NotImplementedException (String.Format ("Message encoding {0:X} is not implemented yet", EncodingRecord));
525 buffer.WriteByte (SizedEnvelopeRecord);
527 MemoryStream ms = new MemoryStream ();
528 var session = new MyXmlBinaryWriterSession ();
529 var benc = Encoder as BinaryMessageEncoder;
532 benc.CurrentWriterSession = session;
533 Encoder.WriteMessage (message, ms);
535 benc.CurrentWriterSession = null;
539 MemoryStream msd = new MemoryStream ();
540 BinaryWriter dw = new BinaryWriter (msd);
541 foreach (var ds in session.List)
545 int length = (int) (msd.Position + ms.Position);
546 var msda = msd.ToArray ();
547 int sizeOfLength = writer.GetSizeOfLength (msda.Length);
549 writer.WriteVariableInt (length + sizeOfLength); // dictionary array also involves the size of itself.
550 WriteSizedChunk (msda);
552 var arr = ms.GetBuffer ();
553 writer.Write (arr, 0, (int) ms.Position);
557 s.Write (buffer.GetBuffer (), 0, (int) buffer.Position);
560 // It is processed at *this* late.
561 if (!is_service_side)
562 ProcessPreambleAckInitiator ();
564 s.WriteByte (EndRecord); // it is required
568 public void ProcessEndRecordRecipient ()
571 if ((b = s.ReadByte ()) != EndRecord)
572 throw new ProtocolException (String.Format ("EndRecord message was expected, got {0:X}", b));