2 // TcpDuplexSessionChannel.cs
5 // Marcos Cobena (marcoscobena@gmail.com)
6 // Atsushi Enomoto <atsushi@ximian.com>
8 // Copyright 2007 Marcos Cobena (http://www.youcannoteatbits.org/)
10 // Copyright (C) 2009 Novell, Inc (http://www.novell.com)
12 // Permission is hereby granted, free of charge, to any person obtaining
13 // a copy of this software and associated documentation files (the
14 // "Software"), to deal in the Software without restriction, including
15 // without limitation the rights to use, copy, modify, merge, publish,
16 // distribute, sublicense, and/or sell copies of the Software, and to
17 // permit persons to whom the Software is furnished to do so, subject to
18 // the following conditions:
20 // The above copyright notice and this permission notice shall be
21 // included in all copies or substantial portions of the Software.
23 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
27 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
28 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
29 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
33 using System.Collections.Generic;
36 using System.Net.Sockets;
37 using System.Runtime.Serialization;
38 using System.Runtime.Serialization.Formatters.Binary;
39 using System.ServiceModel.Channels;
41 using System.Threading;
44 namespace System.ServiceModel.Channels
46 internal class TcpDuplexSessionChannel : DuplexChannelBase, IDuplexSessionChannel
// Session object handed out by the TCP duplex channel; the visible members
// all delegate to the owning channel.
48 class TcpDuplexSession : DuplexSessionBase
// Channel that the visible members delegate to.
50 TcpDuplexSessionChannel owner;
// Takes the owning channel.
// NOTE(review): the constructor body is not visible in this excerpt;
// presumably it stores `owner` in the field above - confirm.
52 internal TcpDuplexSession (TcpDuplexSessionChannel owner)
// Reports the owning channel's default close timeout.
57 public override TimeSpan DefaultCloseTimeout {
58 get { return owner.DefaultCloseTimeout; }
// Closes the session by asking the owning channel to discard it.
// `timeout` is not used by the visible code.
61 public override void Close (TimeSpan timeout)
63 owner.DiscardSession ();
// Value returned by the LocalAddress property.
70 EndpointAddress local_address;
// Frame manager used for the preamble handshake and for reading/writing
// message and end records on the connection.
71 TcpBinaryFrameManager frame;
72 TcpDuplexSession session; // do not use this directly. Use Session instead.
// Creates the factory-side channel: forwards factory, address and via to the
// base class and marks the channel as not being on the service side.
// NOTE(review): no use of `info` is visible in this excerpt; presumably it is
// stored for the Encoder property - confirm.
74 public TcpDuplexSessionChannel (ChannelFactoryBase factory, TcpChannelInfo info, EndpointAddress address, Uri via)
75 : base (factory, address, via)
77 is_service_side = false;
// Creates the listener-side channel around an existing TcpClient and marks
// the channel as being on the service side.
// NOTE(review): the base-constructor call and the handling of `listener`,
// `info` and `client` are not visible in this excerpt.
81 public TcpDuplexSessionChannel (ChannelListenerBase listener, TcpChannelInfo info, TcpClient client)
84 is_service_side = true;
// Message encoder taken from the channel info.
public MessageEncoder Encoder {
	get {
		MessageEncoder configuredEncoder = info.MessageEncoder;
		return configuredEncoder;
	}
}
// Local endpoint address of this channel, as held in local_address.
public override EndpointAddress LocalAddress {
	get {
		EndpointAddress address = local_address;
		return address;
	}
}
// Duplex session of this channel. The visible code creates a
// TcpDuplexSession bound to this channel and stores it in `session`.
// NOTE(review): the rest of the getter is not visible in this excerpt;
// presumably the session is created on first use and then reused - confirm.
97 public IDuplexSession Session {
100 session = new TcpDuplexSession (this);
// Ends the session: has the frame manager write the End record to the peer.
// Called from TcpDuplexSession.Close.
// NOTE(review): one line after the visible statement is missing from this excerpt.
105 void DiscardSession ()
107 frame.ProcessEndRecordInitiator ();
// Sends a message, applying the channel's default send timeout.
public override void Send (Message message)
{
	TimeSpan defaultTimeout = DefaultSendTimeout;
	Send (message, defaultTimeout);
}
// Sends a message to the peer as a sized envelope record.
//
// message: the message to send. Addressing headers that are still null are
//          filled in first: on the client side To (the remote address) and
//          ReplyTo (the WS-Addressing anonymous URI); on the service side
//          RelatesTo (the id of the message currently being processed).
// timeout: must be positive; it is applied as the socket send timeout.
//
// Throws ArgumentException when timeout is zero or negative.
public override void Send (Message message, TimeSpan timeout)
{
	if (timeout <= TimeSpan.Zero)
		throw new ArgumentException (String.Format ("Timeout value must be positive value. It was {0}", timeout));

	if (!is_service_side) {
		if (message.Headers.To == null)
			message.Headers.To = RemoteAddress.Uri;
		if (message.Headers.ReplyTo == null)
			message.Headers.ReplyTo = new EndpointAddress (Constants.WsaAnonymousUri);
	} else {
		if (message.Headers.RelatesTo == null) {
			// There is no current operation context when the service sends
			// outside of an operation; leave RelatesTo unset in that case
			// rather than failing with a NullReferenceException.
			OperationContext context = OperationContext.Current;
			if (context != null)
				message.Headers.RelatesTo = context.IncomingMessageHeaders.MessageId;
		}
	}

	// The socket timeout is a whole number of milliseconds and 0 means
	// "wait forever". Clamp so that very large timeouts do not overflow the
	// cast and sub-millisecond timeouts do not silently become infinite.
	double totalMilliseconds = timeout.TotalMilliseconds;
	int timeoutMilliseconds = totalMilliseconds >= int.MaxValue ? int.MaxValue : (int) totalMilliseconds;
	if (timeoutMilliseconds < 1)
		timeoutMilliseconds = 1;
	client.SendTimeout = timeoutMilliseconds;

	frame.WriteSizedMessage (message);
	// FIXME: should EndRecord be sent here?
	//if (is_service_side && client.Available > 0)
	//	frame.ProcessEndRecordRecipient ();
}
// Receives a message, applying the channel's default receive timeout.
public override Message Receive ()
{
	TimeSpan defaultTimeout = DefaultReceiveTimeout;
	return Receive (defaultTimeout);
}
// Receives the next message from the peer.
//
// timeout: must be positive; it is applied as the socket receive timeout.
//
// Returns whatever the frame manager's ReadSizedMessage returns.
// Throws ArgumentException when timeout is zero or negative.
public override Message Receive (TimeSpan timeout)
{
	if (timeout <= TimeSpan.Zero)
		throw new ArgumentException (String.Format ("Timeout value must be positive value. It was {0}", timeout));

	// The socket timeout is a whole number of milliseconds and 0 means
	// "wait forever". Clamp so that very large timeouts do not overflow the
	// cast and sub-millisecond timeouts do not silently become infinite.
	double totalMilliseconds = timeout.TotalMilliseconds;
	int timeoutMilliseconds = totalMilliseconds >= int.MaxValue ? int.MaxValue : (int) totalMilliseconds;
	if (timeoutMilliseconds < 1)
		timeoutMilliseconds = 1;
	client.ReceiveTimeout = timeoutMilliseconds;

	return frame.ReadSizedMessage ();
}
// Attempts to receive a message within `timeout`, handling TimeoutException
// locally instead of letting it propagate.
// NOTE(review): the `try` line, the return statements and the catch body are
// not visible in this excerpt.
151 public override bool TryReceive (TimeSpan timeout, out Message message)
// Remember the start time so the time spent in Receive can be deducted from
// the timeout passed to Close below.
154 DateTime start = DateTime.Now;
155 message = Receive (timeout);
158 // received EndRecord, so close the session and return false instead.
159 // (Closing channel here might not be a good idea, but right now I have no better way.)
// NOTE(review): the remaining time is computed from wall-clock DateTime.Now
// and can be zero or negative when Receive used up the whole timeout -
// confirm that Close tolerates such a value.
160 Close (timeout - (DateTime.Now - start));
// NOTE(review): handler body not visible; presumably it reports failure - confirm.
162 } catch (TimeoutException) {
// Polls the connection for readable data for up to `timeout`.
// NOTE(review): the return statements, the head of the do-loop and any delay
// between polls are not visible in this excerpt.
168 public override bool WaitForMessage (TimeSpan timeout)
// First check: data is already available on the connection.
170 if (client.Available > 0)
173 DateTime start = DateTime.Now;
// Re-check for available data on each loop iteration.
176 if (client.Available > 0)
// Keep looping until `timeout` has elapsed since `start`.
178 } while (DateTime.Now - start < timeout);
182 // CommunicationObject
// Aborts the channel by closing it with a timeout of a single tick.
protected override void OnAbort ()
{
	TimeSpan singleTick = TimeSpan.FromTicks (1);
	Close (singleTick);
}
// Asynchronous close is not implemented; always throws NotImplementedException.
protected override IAsyncResult OnBeginClose (TimeSpan timeout, AsyncCallback callback, object state)
{
	var notImplemented = new NotImplementedException ();
	throw notImplemented;
}
// Asynchronous open is not implemented; always throws NotImplementedException.
protected override IAsyncResult OnBeginOpen (TimeSpan timeout, AsyncCallback callback, object state)
{
	var notImplemented = new NotImplementedException ();
	throw notImplemented;
}
// Closes the channel. In the visible code the session is closed with the
// given timeout (TcpDuplexSession.Close asks the channel to discard the
// session, which writes the End record).
// NOTE(review): lines between and after the visible statements are missing
// from this excerpt, so the exact guard around session.Close and any further
// cleanup cannot be confirmed here.
205 protected override void OnClose (TimeSpan timeout)
207 if (!is_service_side)
209 session.Close (timeout);
// Asynchronous close is not implemented; always throws NotImplementedException.
protected override void OnEndClose (IAsyncResult result)
{
	var notImplemented = new NotImplementedException ();
	throw notImplemented;
}
// Asynchronous open is not implemented; always throws NotImplementedException.
protected override void OnEndOpen (IAsyncResult result)
{
	var notImplemented = new NotImplementedException ();
	throw notImplemented;
}
// Opens the channel: builds the frame manager over the connection stream and
// runs the preamble handshake.
// Initiator half: connects a TcpClient to the remote address, sends the
// preamble and reads the acknowledgement.
// Recipient half: reads the peer's preamble from the existing client's stream
// and writes the acknowledgement.
// NOTE(review): the line separating the two halves (presumably an `else`) and
// two further lines are not visible in this excerpt. `timeout` is not used by
// the visible code.
228 protected override void OnOpen (TimeSpan timeout)
230 if (! is_service_side) {
231 int explicitPort = RemoteAddress.Uri.Port;
// Fall back to the transport's default port when the URI reports no positive port.
232 client = new TcpClient (RemoteAddress.Uri.Host, explicitPort <= 0 ? TcpTransportBindingElement.DefaultPort : explicitPort);
233 //RemoteAddress.Uri.Port);
235 NetworkStream ns = client.GetStream ();
// The initiator also supplies Via, which is written into the preamble's Via record.
236 frame = new TcpBinaryFrameManager (TcpBinaryFrameManager.DuplexMode, ns, is_service_side) {
237 Encoder = this.Encoder,
238 Via = RemoteAddress.Uri };
239 frame.ProcessPreambleInitiator ();
240 frame.ProcessPreambleAckInitiator ();
243 Stream s = client.GetStream ();
// No Via is supplied here; ProcessPreambleRecipient sets it from the received preamble.
245 frame = new TcpBinaryFrameManager (TcpBinaryFrameManager.DuplexMode, s, is_service_side) { Encoder = this.Encoder };
247 // FIXME: use retrieved record properties in the request processing.
249 frame.ProcessPreambleRecipient ();
250 frame.ProcessPreambleAckRecipient ();
// BinaryWriter subclass that adds a helper for writing a byte array preceded
// by its length.
254 class MyBinaryWriter : BinaryWriter
// NOTE(review): base-constructor call and body are not visible in this
// excerpt; presumably `s` is passed to BinaryWriter - confirm.
256 public MyBinaryWriter (Stream s)
// Writes the array length as a 7-bit encoded integer.
// NOTE(review): the line that writes the bytes themselves is not visible here.
261 public void WriteBytes (byte [] bytes)
263 Write7BitEncodedInt (bytes.Length);
269 // See also: the [MC-NMF] (.NET Message Framing Protocol) specification in the Windows Protocols documentation.
270 class TcpBinaryFrameManager
// BinaryReader subclass that exposes the protected 7-bit encoded integer
// read under a public name.
272 class MyBinaryReader : BinaryReader
// NOTE(review): base-constructor call and body are not visible in this
// excerpt; presumably `s` is passed to BinaryReader - confirm.
274 public MyBinaryReader (Stream s)
// Reads one 7-bit encoded integer from the stream.
279 public int ReadVariableInt ()
281 return Read7BitEncodedInt ();
// BinaryWriter subclass that exposes the protected 7-bit encoded integer
// write and a helper about the size of such an encoding.
285 class MyBinaryWriter : BinaryWriter
// NOTE(review): base-constructor call and body are not visible in this
// excerpt; presumably `s` is passed to BinaryWriter - confirm.
287 public MyBinaryWriter (Stream s)
// Writes `value` as a 7-bit encoded integer.
292 public void WriteVariableInt (int value)
294 Write7BitEncodedInt (value);
// WriteSizedMessage adds this method's result to the envelope length to
// account for the dictionary chunk's own length prefix.
// NOTE(review): the loop body is not visible in this excerpt; presumably it
// counts the bytes the 7-bit encoding of `value` occupies - confirm.
297 public int GetSizeOfLength (int value)
303 } while (value != 0);
// XmlBinaryWriterSession that also exposes dictionary strings through List so
// that WriteSizedMessage can send the ones added since the previous message.
308 class MyXmlBinaryWriterSession : XmlBinaryWriterSession
// Delegates to the base TryAdd.
// NOTE(review): what happens after the base call (both on failure and on
// success) is not visible in this excerpt; presumably successful additions
// are appended to List - confirm.
310 public override bool TryAdd (XmlDictionaryString value, out int key)
312 if (!base.TryAdd (value, out key))
// WriteSizedMessage reads entries from this list starting at the count it
// observed before encoding the message.
318 public List<XmlDictionaryString> List = new List<XmlDictionaryString> ();
// Record type codes: the byte that introduces each framing record on the wire.
321 public const byte VersionRecord = 0;
322 public const byte ModeRecord = 1;
323 public const byte ViaRecord = 2;
324 public const byte KnownEncodingRecord = 3;
325 public const byte ExtendingEncodingRecord = 4;
326 public const byte UnsizedEnvelopeRecord = 5;
327 public const byte SizedEnvelopeRecord = 6;
328 public const byte EndRecord = 7;
329 public const byte FaultRecord = 8;
330 public const byte UpgradeRequestRecord = 9;
331 public const byte UpgradeResponseRecord = 0xA;
332 public const byte PreambleAckRecord = 0xB;
333 public const byte PreambleEndRecord = 0xC;
// Mode values: the payload byte of the Mode record. The duplex session
// channel constructs its frame manager with DuplexMode.
335 public const byte SingletonUnsizedMode = 1;
336 public const byte DuplexMode = 2;
337 public const byte SimplexMode = 3;
338 public const byte SingletonSizedMode = 4;
// Reader over the underlying stream (created in the constructor).
339 MyBinaryReader reader;
// Writer over the in-memory write buffer (created in ResetWriteBuffer).
340 MyBinaryWriter writer;
// Creates a frame manager for the given framing mode over stream `s`.
// NOTE(review): several constructor lines are not visible in this excerpt;
// presumably they store `mode` and `s` and set up the write buffer - confirm.
342 public TcpBinaryFrameManager (int mode, Stream s, bool isServiceSide)
346 this.is_service_side = isServiceSide;
347 reader = new MyBinaryReader (s);
// 8 is the only encoding value ReadSizedMessage and WriteSizedMessage accept.
350 EncodingRecord = 8; // FIXME: it should depend on mode.
// Set from the constructor argument; passed on to the nested frame manager
// that ReadSizedMessage creates.
355 bool is_service_side;
// Encoding code written into / read from the preamble's KnownEncoding record.
359 public byte EncodingRecord { get; set; }
// URI written into the preamble's Via record by the initiator, or set from
// the received Via record by the recipient.
361 public Uri Via { get; set; }
// Encoder used to read and write message bodies.
363 public MessageEncoder Encoder { get; set; }
// Replaces the write buffer with a fresh, empty MemoryStream and attaches a
// new writer to it.
void ResetWriteBuffer ()
{
	var freshBuffer = new MemoryStream ();
	this.buffer = freshBuffer;
	this.writer = new MyBinaryWriter (this.buffer);
}
// Reads one length-prefixed chunk from the stream: a 7-bit encoded length
// followed by that many bytes.
// Throws InvalidOperationException ("The message is too large.") when the
// size check fails.
// NOTE(review): the condition guarding the throw and the return statement are
// not visible in this excerpt.
371 public byte [] ReadSizedChunk ()
373 int length = reader.ReadVariableInt ();
376 throw new InvalidOperationException ("The message is too large.");
378 byte [] buffer = new byte [length];
// A single Read may deliver fewer bytes than requested, so keep reading until
// the chunk is complete.
// NOTE(review): Read returns 0 at end of stream, in which case readSize never
// advances and this loop would not terminate - confirm and consider throwing.
379 for (int readSize = 0; readSize < length; )
380 readSize += reader.Read (buffer, readSize, length - readSize);
// Appends one length-prefixed chunk to the write buffer: the length of `data`
// as a 7-bit encoded integer, followed by the bytes of `data`.
public void WriteSizedChunk (byte [] data)
{
	int chunkLength = data.Length;
	writer.WriteVariableInt (chunkLength);
	writer.Write (data, 0, chunkLength);
}
// Builds the initiator's preamble in the write buffer and sends it to the
// peer: Version (1.0), Mode, Via, KnownEncoding and PreambleEnd records.
// NOTE(review): lines at the start of the body and around the final write are
// not visible in this excerpt; presumably they reset the buffer and flush -
// confirm.
390 public void ProcessPreambleInitiator ()
// Version record: major 1, minor 0.
394 buffer.WriteByte (VersionRecord);
395 buffer.WriteByte (1);
396 buffer.WriteByte (0);
397 buffer.WriteByte (ModeRecord);
398 buffer.WriteByte ((byte) mode);
399 buffer.WriteByte (ViaRecord);
// BinaryWriter.Write (string) emits a length-prefixed string.
400 writer.Write (Via.ToString ());
401 buffer.WriteByte (KnownEncodingRecord); // FIXME
402 buffer.WriteByte ((byte) EncodingRecord);
403 buffer.WriteByte (PreambleEndRecord);
// Send only the bytes written so far; GetBuffer exposes the whole backing array.
405 s.Write (buffer.GetBuffer (), 0, (int) buffer.Position);
// Reads the peer's reply to the preamble and dispatches on its record type.
// Throws FaultException carrying the string read from the stream, or
// ProtocolException for a record type that is not expected here.
// NOTE(review): the switch header and the remaining case labels are not
// visible in this excerpt; presumably the FaultException belongs to a
// FaultRecord case and the ProtocolException to the default case - confirm.
409 public void ProcessPreambleAckInitiator ()
411 int b = s.ReadByte ();
413 case PreambleAckRecord:
416 throw new FaultException (reader.ReadString ());
418 throw new ProtocolException (String.Format ("Preamble Ack Record is expected, got {0:X}", b));
// Acknowledges the peer's preamble by writing a single PreambleAck record
// byte to the stream.
public void ProcessPreambleAckRecipient ()
{
	byte acknowledgement = PreambleAckRecord;
	s.WriteByte (acknowledgement);
}
// Reads the peer's preamble record by record until the PreambleEnd record is
// seen, validating version and mode and capturing Via and the encoding code.
// Throws ProtocolException on a version/mode mismatch or an unexpected record
// type, and NotImplementedException for record types that are not supported.
// NOTE(review): the switch header and the case labels for the version, mode
// and via records are not visible in this excerpt; the grouping below is
// inferred from the messages and from ProcessPreambleInitiator - confirm.
427 public void ProcessPreambleRecipient ()
429 bool preambleEnd = false;
430 while (!preambleEnd) {
// Record type byte.
431 int b = s.ReadByte ();
// Only version 1.0 is accepted.
434 if (s.ReadByte () != 1)
435 throw new ProtocolException ("Major version must be 1");
436 if (s.ReadByte () != 0)
437 throw new ProtocolException ("Minor version must be 0");
// The peer's mode must equal the mode this frame manager was created with.
440 if (s.ReadByte () != mode)
441 throw new ProtocolException (String.Format ("Duplex mode is expected to be {0:X}", mode));
// Via arrives as a length-prefixed string.
444 Via = new Uri (reader.ReadString ());
446 case KnownEncodingRecord:
447 EncodingRecord = (byte) s.ReadByte ();
449 case ExtendingEncodingRecord:
450 throw new NotImplementedException ("ExtendingEncodingRecord");
451 case UpgradeRequestRecord:
452 throw new NotImplementedException ("UpgradeRequetRecord");
453 case UpgradeResponseRecord:
454 throw new NotImplementedException ("UpgradeResponseRecord");
455 case PreambleEndRecord:
459 throw new ProtocolException (String.Format ("Unexpected record type {0:X2}", b));
// Reader session reused across messages so that dictionary strings received
// earlier stay registered.
464 XmlBinaryReaderSession reader_session;
// Key assigned to the next dictionary string added to reader_session.
465 int reader_session_items;
// Reads the next record from the stream and, for a sized envelope, decodes it
// into a Message with the configured Encoder.
// Throws NotImplementedException for record types other than End and
// SizedEnvelope, and for encoding codes other than 8.
// NOTE(review): several lines (the EndRecord branch body, the guards around
// `benc`, the return statement) are not visible in this excerpt.
467 public Message ReadSizedMessage ()
469 // FIXME: implement full [MC-NMF].
// NOTE(review): ReadByte returns -1 at end of stream; that value is neither
// record type and would reach the NotImplementedException below - confirm
// whether a closed connection should be reported differently.
471 var packetType = s.ReadByte ();
472 if (packetType == EndRecord)
474 if (packetType != SizedEnvelopeRecord)
475 throw new NotImplementedException (String.Format ("Packet type {0:X} is not implemented", packetType));
// The envelope payload is one length-prefixed chunk.
477 byte [] buffer = ReadSizedChunk ();
479 var ms = new MemoryStream (buffer, 0, buffer.Length);
481 // FIXME: turned out that it could be either in-band dictionary ([MC-NBFSE]), or a mere xml body ([MC-NBFS]).
482 if (EncodingRecord != 8)
483 throw new NotImplementedException (String.Format ("Message encoding {0:X} is not implemented yet", EncodingRecord));
486 // the returned buffer consists of a serialized reader
487 // session and the binary xml body.
489 var session = reader_session ?? new XmlBinaryReaderSession ();
490 reader_session = session;
// A throw-away frame manager over the payload is used only to read the nested
// length-prefixed dictionary chunk; this advances `ms` to the message body.
491 byte [] rsbuf = new TcpBinaryFrameManager (0, ms, is_service_side).ReadSizedChunk ();
492 using (var rms = new MemoryStream (rsbuf, 0, rsbuf.Length)) {
493 var rbr = new BinaryReader (rms, Encoding.UTF8);
// Register every string in the dictionary chunk under the next free key.
494 while (rms.Position < rms.Length)
495 session.Add (reader_session_items++, rbr.ReadString ());
// NOTE(review): `as` yields null when Encoder is not a BinaryMessageEncoder;
// any null guard around the uses of `benc` is not visible in this excerpt.
497 var benc = Encoder as BinaryMessageEncoder;
// Expose the session to the encoder only while this message is decoded.
499 benc.CurrentReaderSession = session;
501 // FIXME: supply maxSizeOfHeaders.
502 Message msg = Encoder.ReadMessage (ms, 0x10000);
504 benc.CurrentReaderSession = null;
506 // if (!is_service_side)
507 // if (s.Read (eof_buffer, 0, 1) == 1)
508 // if (eof_buffer [0] != EndRecord)
509 // throw new ProtocolException (String.Format ("Expected EndRecord message, got {0:X02}", eof_buffer [0]));
// Within the visible code this buffer is referenced only by the commented-out
// EndRecord check in ReadSizedMessage.
515 byte [] eof_buffer = new byte [1];
// Writer session reused across messages; its List grows as new dictionary
// strings are added.
516 MyXmlBinaryWriterSession writer_session;
// Encodes `message` and sends it as one SizedEnvelope record whose payload is
// a length-prefixed chunk of newly added dictionary strings followed by the
// encoded message body.
// Throws NotImplementedException for encoding codes other than 8.
// NOTE(review): several lines (buffer reset, guards around `benc`, flushes)
// are not visible in this excerpt.
518 public void WriteSizedMessage (Message message)
522 // FIXME: implement full [MC-NMF] protocol.
524 if (EncodingRecord != 8)
525 throw new NotImplementedException (String.Format ("Message encoding {0:X} is not implemented yet", EncodingRecord));
527 buffer.WriteByte (SizedEnvelopeRecord);
529 MemoryStream ms = new MemoryStream ();
530 var session = writer_session ?? new MyXmlBinaryWriterSession ();
531 writer_session = session;
// Remember how many strings the session held before encoding, so that only
// the ones added by this message are sent.
532 int writer_session_count = session.List.Count;
// NOTE(review): `as` yields null when Encoder is not a BinaryMessageEncoder;
// any null guard around the uses of `benc` is not visible in this excerpt.
533 var benc = Encoder as BinaryMessageEncoder;
// Expose the session to the encoder only while this message is encoded.
536 benc.CurrentWriterSession = session;
537 Encoder.WriteMessage (message, ms);
539 benc.CurrentWriterSession = null;
// Serialize the strings added during encoding as length-prefixed strings.
543 MemoryStream msd = new MemoryStream ();
544 BinaryWriter dw = new BinaryWriter (msd);
545 for (int i = writer_session_count; i < session.List.Count; i++)
546 dw.Write (session.List [i].Value);
// Envelope length = dictionary bytes + body bytes + the size of the
// dictionary chunk's own length prefix.
549 int length = (int) (msd.Position + ms.Position);
550 var msda = msd.ToArray ();
551 int sizeOfLength = writer.GetSizeOfLength (msda.Length);
553 writer.WriteVariableInt (length + sizeOfLength); // dictionary array also involves the size of itself.
554 WriteSizedChunk (msda);
// Append only the bytes written to `ms`; GetBuffer exposes the whole backing array.
556 var arr = ms.GetBuffer ();
557 writer.Write (arr, 0, (int) ms.Position);
// Send the assembled record to the peer.
561 s.Write (buffer.GetBuffer (), 0, (int) buffer.Position);
// Tells the peer the session is over by writing a single End record byte to
// the stream. Called from TcpDuplexSessionChannel.DiscardSession.
// NOTE(review): one line after the visible statement is missing from this
// excerpt.
565 public void ProcessEndRecordInitiator ()
567 s.WriteByte (EndRecord); // it is required
// Reads one byte from the stream and requires it to be the End record.
// Throws ProtocolException when any other value is read.
// NOTE(review): the declaration of `b` is not visible in this excerpt.
571 public void ProcessEndRecordRecipient ()
574 if ((b = s.ReadByte ()) != EndRecord)
575 throw new ProtocolException (String.Format ("EndRecord message was expected, got {0:X}", b));