2 // TcpDuplexSessionChannel.cs
5 // Marcos Cobena (marcoscobena@gmail.com)
6 // Atsushi Enomoto <atsushi@ximian.com>
8 // Copyright 2007 Marcos Cobena (http://www.youcannoteatbits.org/)
10 // Copyright (C) 2009 Novell, Inc (http://www.novell.com)
12 // Permission is hereby granted, free of charge, to any person obtaining
13 // a copy of this software and associated documentation files (the
14 // "Software"), to deal in the Software without restriction, including
15 // without limitation the rights to use, copy, modify, merge, publish,
16 // distribute, sublicense, and/or sell copies of the Software, and to
17 // permit persons to whom the Software is furnished to do so, subject to
18 // the following conditions:
20 // The above copyright notice and this permission notice shall be
21 // included in all copies or substantial portions of the Software.
23 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
27 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
28 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
29 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
33 using System.Collections.Generic;
36 using System.Net.Sockets;
37 using System.Runtime.Serialization;
38 using System.Runtime.Serialization.Formatters.Binary;
39 using System.ServiceModel.Channels;
41 using System.Threading;
44 namespace System.ServiceModel.Channels
// Duplex session channel carried over a TCP connection. It derives from DuplexChannelBase,
// implements IDuplexSessionChannel, and delegates all wire framing to TcpBinaryFrameManager.
46 internal class TcpDuplexSessionChannel : DuplexChannelBase, IDuplexSessionChannel
// Session object for the channel. It holds a reference to the owning channel and
// forwards its timeout query and its Close call to that channel.
48 class TcpDuplexSession : DuplexSessionBase
// Channel this session belongs to.
50 TcpDuplexSessionChannel owner;
// NOTE(review): the constructor body is not visible in this extract; presumably it stores `owner` - confirm.
52 internal TcpDuplexSession (TcpDuplexSessionChannel owner)
// Mirrors the owning channel's default close timeout.
57 public override TimeSpan DefaultCloseTimeout {
58 get { return owner.DefaultCloseTimeout; }
// Closes the session by asking the owning channel to discard it.
// The timeout argument is not used on the visible line.
61 public override void Close (TimeSpan timeout)
63 owner.DiscardSession ();
// Address returned by the LocalAddress property.
70 EndpointAddress local_address;
// Frame reader/writer created in OnOpen; Send and Receive go through it.
72 TcpBinaryFrameManager frame;
73 TcpDuplexSession session; // do not use this directly. Use Session instead.
// Constructor used with a channel factory: passes the factory, remote address and via URI
// to the base class and marks this channel as the non-service (initiating) side.
// NOTE(review): any further assignments (e.g. of `info`) are not visible in this extract.
75 public TcpDuplexSessionChannel (ChannelFactoryBase factory, TcpChannelInfo info, EndpointAddress address, Uri via)
76 : base (factory, address, via)
78 is_service_side = false;
// Constructor used with a channel listener: marks this channel as the service side and
// records the timeout supplied by the caller.
// NOTE(review): the base-constructor call and the assignments of `info` and `client`
// are not visible in this extract - confirm against the full file.
82 public TcpDuplexSessionChannel (ChannelListenerBase listener, TcpChannelInfo info, TcpClient client, TimeSpan timeout)
85 is_service_side = true;
88 this.timeout = timeout;
// Message encoder for this channel, taken from the channel info.
91 public MessageEncoder Encoder {
92 get { return info.MessageEncoder; }
// Local endpoint address of this channel (the local_address field).
95 public override EndpointAddress LocalAddress {
96 get { return local_address; }
// Session associated with this channel. The visible line creates a TcpDuplexSession bound
// to this channel and stores it in the `session` field.
// NOTE(review): the rest of the getter is missing from this extract; presumably the
// instance is created only when `session` is null and then returned - confirm.
99 public IDuplexSession Session {
102 session = new TcpDuplexSession (this);
// Invoked from TcpDuplexSession.Close: has the frame manager send the End record to the peer.
// NOTE(review): any further statements (e.g. resetting `session`) are not visible in this extract.
107 void DiscardSession ()
109 frame.ProcessEndRecordInitiator ();
// Sends `message` using DefaultSendTimeout.
113 public override void Send (Message message)
115 Send (message, DefaultSendTimeout);
// Sends `message` to the peer as a sized envelope, using `timeout` as the socket send timeout.
// Throws ArgumentException when `timeout` is zero or negative.
118 public override void Send (Message message, TimeSpan timeout)
120 if (timeout <= TimeSpan.Zero)
121 throw new ArgumentException (String.Format ("Timeout value must be positive value. It was {0}", timeout));
// On the non-service side, fill in addressing headers the caller left unset:
// To defaults to the remote URI and ReplyTo to Constants.WsaAnonymousUri.
123 if (!is_service_side) {
124 if (message.Headers.To == null)
125 message.Headers.To = RemoteAddress.Uri;
126 if (message.Headers.ReplyTo == null)
127 message.Headers.ReplyTo = new EndpointAddress (Constants.WsaAnonymousUri);
// NOTE(review): a line is missing from this extract just above; presumably the RelatesTo
// defaulting below belongs to the service-side branch - confirm. OperationContext.Current
// is dereferenced without a null check.
129 if (message.Headers.RelatesTo == null)
130 message.Headers.RelatesTo = OperationContext.Current.IncomingMessageHeaders.MessageId;
// NOTE(review): the (int) cast is not range-checked, so a very large timeout such as
// TimeSpan.MaxValue does not fit in the int that SendTimeout expects.
133 client.SendTimeout = (int) timeout.TotalMilliseconds;
134 frame.WriteSizedMessage (message);
135 // FIXME: should EndRecord be sent here?
136 //if (is_service_side && client.Available > 0)
137 // frame.ProcessEndRecordRecipient ();
// Receives one message using DefaultReceiveTimeout.
140 public override Message Receive ()
142 return Receive (DefaultReceiveTimeout);
// Receives one message from the peer, waiting at most `timeout` for socket data.
//
// timeout: maximum time to block; must be greater than TimeSpan.Zero.
// Returns whatever the frame manager's ReadSizedMessage produces for the next record.
// Throws ArgumentException when `timeout` is zero or negative.
public override Message Receive (TimeSpan timeout)
{
	if (timeout <= TimeSpan.Zero)
		throw new ArgumentException (String.Format ("Timeout value must be positive value. It was {0}", timeout));

	// ReceiveTimeout is an int number of milliseconds in which 0 means "wait forever".
	// A bare (int) cast is out of range for large values such as TimeSpan.MaxValue and
	// truncates a sub-millisecond timeout to 0 (i.e. infinite), so clamp the value
	// into [1, int.MaxValue] instead.
	double milliseconds = timeout.TotalMilliseconds;
	client.ReceiveTimeout = milliseconds >= int.MaxValue ? int.MaxValue : Math.Max (1, (int) milliseconds);

	return frame.ReadSizedMessage ();
}
// Attempts to receive a message within `timeout`, reporting the outcome as a bool instead
// of letting a TimeoutException escape. The visible lines call Receive, close the channel
// with the remaining time when an End record was received, and catch TimeoutException.
// NOTE(review): the try/if lines and all return statements are missing from this extract,
// so the exact return values are not documented here.
// NOTE(review): elapsed time is measured with DateTime.Now (local wall-clock time).
153 public override bool TryReceive (TimeSpan timeout, out Message message)
156 DateTime start = DateTime.Now;
157 message = Receive (timeout);
160 // received EndRecord, so close the session and return false instead.
161 // (Closing channel here might not be a good idea, but right now I have no better way.)
162 Close (timeout - (DateTime.Now - start));
164 } catch (TimeoutException) {
// Checks client.Available for pending data, first once and then repeatedly in a loop that
// runs until `timeout` has elapsed (measured with DateTime.Now).
// NOTE(review): the return statements and the rest of the loop body (including any delay
// between polls) are missing from this extract.
170 public override bool WaitForMessage (TimeSpan timeout)
172 if (client.Available > 0)
175 DateTime start = DateTime.Now;
178 if (client.Available > 0)
180 } while (DateTime.Now - start < timeout);
184 // CommunicationObject
// Abort hook of the communication-object lifecycle.
// NOTE(review): the method body is not visible in this extract.
187 protected override void OnAbort ()
// Asynchronous close is not implemented; always throws NotImplementedException.
194 protected override IAsyncResult OnBeginClose (TimeSpan timeout,
195 AsyncCallback callback, object state)
197 throw new NotImplementedException ();
// Asynchronous open is not implemented; always throws NotImplementedException.
201 protected override IAsyncResult OnBeginOpen (TimeSpan timeout,
202 AsyncCallback callback, object state)
204 throw new NotImplementedException ();
// Synchronous close. The visible lines test whether this is the non-service side and close
// the session with the given timeout.
// NOTE(review): a line is missing between the `if` and the call, so it is unclear from this
// extract whether the call is guarded by the `if` or by a null check. `session` is the raw
// field, which is only assigned by the Session property - confirm it cannot be null here.
208 protected override void OnClose (TimeSpan timeout)
210 if (!is_service_side)
212 session.Close (timeout);
// Completion half of the asynchronous close; not implemented, always throws NotImplementedException.
219 protected override void OnEndClose (IAsyncResult result)
221 throw new NotImplementedException ();
// Completion half of the asynchronous open; not implemented, always throws NotImplementedException.
225 protected override void OnEndOpen (IAsyncResult result)
227 throw new NotImplementedException ();
// Opens the channel: creates the frame manager over the TCP stream and performs the preamble
// handshake for this side. The `timeout` argument is not used on the visible lines.
231 protected override void OnOpen (TimeSpan timeout)
// Non-service side: connect to the remote host, falling back to the transport's default
// port when the URI carries no usable port.
233 if (! is_service_side) {
234 int explicitPort = RemoteAddress.Uri.Port;
235 client = new TcpClient (RemoteAddress.Uri.Host, explicitPort <= 0 ? TcpTransportBindingElement.DefaultPort : explicitPort);
236 //RemoteAddress.Uri.Port);
238 NetworkStream ns = client.GetStream ();
239 frame = new TcpBinaryFrameManager (TcpBinaryFrameManager.DuplexMode, ns, is_service_side) {
240 Encoder = this.Encoder,
241 Via = RemoteAddress.Uri };
// Send the preamble, then wait for the peer's acknowledgement.
242 frame.ProcessPreambleInitiator ();
243 frame.ProcessPreambleAckInitiator ();
// NOTE(review): the line separating the two branches is missing from this extract;
// presumably the lines below are the service-side branch - confirm.
246 Stream s = client.GetStream ();
248 frame = new TcpBinaryFrameManager (TcpBinaryFrameManager.DuplexMode, s, is_service_side) { Encoder = this.Encoder };
250 // FIXME: use retrieved record properties in the request processing.
// Read the peer's preamble, then acknowledge it.
252 frame.ProcessPreambleRecipient ();
253 frame.ProcessPreambleAckRecipient ();
// BinaryWriter subclass that exposes a helper for writing a byte array.
257 class MyBinaryWriter : BinaryWriter
// NOTE(review): the constructor's base call/body is not visible in this extract.
259 public MyBinaryWriter (Stream s)
// Writes the array length as a 7-bit encoded int.
// NOTE(review): the statement that writes the bytes themselves is not visible in this extract.
264 public void WriteBytes (byte [] bytes)
266 Write7BitEncodedInt (bytes.Length);
// Reads and writes the framing records (preamble records, sized envelopes, End record)
// that wrap messages on the underlying stream.
272 // See also: the [MC-NMF] Windows Protocol document.
273 class TcpBinaryFrameManager
// BinaryReader subclass that makes the protected 7-bit encoded int reader callable
// from the frame manager.
275 class MyBinaryReader : BinaryReader
277 public MyBinaryReader (Stream s)
// Reads a variable-length (7-bit encoded) integer from the stream.
282 public int ReadVariableInt ()
284 return Read7BitEncodedInt ();
// BinaryWriter subclass that makes the protected 7-bit encoded int writer callable
// from the frame manager.
288 class MyBinaryWriter : BinaryWriter
290 public MyBinaryWriter (Stream s)
// Writes `value` as a variable-length (7-bit encoded) integer.
295 public void WriteVariableInt (int value)
297 Write7BitEncodedInt (value);
// NOTE(review): only the loop tail is visible in this extract. Judging by the name and by
// its use in WriteSizedMessage, it presumably returns how many bytes the variable-length
// encoding of `value` occupies - confirm.
300 public int GetSizeOfLength (int value)
306 } while (value != 0);
// Writer session that exposes a `List` of dictionary strings, which WriteSizedMessage
// iterates when building the dictionary part of an envelope.
311 class MyXmlBinaryWriterSession : XmlBinaryWriterSession
// Delegates to the base implementation first.
// NOTE(review): the remaining lines are missing from this extract; presumably a
// successfully added string is also appended to `List` - confirm.
313 public override bool TryAdd (XmlDictionaryString value, out int key)
315 if (!base.TryAdd (value, out key))
// Dictionary strings recorded by this session.
321 public List<XmlDictionaryString> List = new List<XmlDictionaryString> ();
// Record type codes: the byte that introduces each framing record on the wire.
324 public const byte VersionRecord = 0;
325 public const byte ModeRecord = 1;
326 public const byte ViaRecord = 2;
327 public const byte KnownEncodingRecord = 3;
328 public const byte ExtendingEncodingRecord = 4;
329 public const byte UnsizedEnvelopeRecord = 5;
330 public const byte SizedEnvelopeRecord = 6;
331 public const byte EndRecord = 7;
332 public const byte FaultRecord = 8;
333 public const byte UpgradeRequestRecord = 9;
334 public const byte UpgradeResponseRecord = 0xA;
335 public const byte PreambleAckRecord = 0xB;
336 public const byte PreambleEndRecord = 0xC;
// Mode values written after ModeRecord in the preamble.
338 public const byte SingletonUnsizedMode = 1;
339 public const byte DuplexMode = 2;
340 public const byte SimplexMode = 3;
341 public const byte SingletonSizedMode = 4;
// Binary reader over the underlying stream.
342 MyBinaryReader reader;
// Binary writer over the current in-memory write buffer (see ResetWriteBuffer).
343 MyBinaryWriter writer;
// Creates a frame manager over stream `s`. The visible lines store the service-side flag,
// create the binary reader on the stream, and default EncodingRecord to 8.
// NOTE(review): the assignments of `mode` and `s` and any buffer setup are not visible in
// this extract. The value 8 is presumably the [MC-NMF] known-encoding code for binary with
// an in-band dictionary - confirm against the specification.
345 public TcpBinaryFrameManager (int mode, Stream s, bool isServiceSide)
349 this.is_service_side = isServiceSide;
350 reader = new MyBinaryReader (s);
353 EncodingRecord = 8; // FIXME: it should depend on mode.
// True when this frame manager was created for the service side of the connection.
358 bool is_service_side;
// Encoding code written after KnownEncodingRecord / read from the peer's preamble.
362 public byte EncodingRecord { get; set; }
// Via URI written into (or read from) the preamble's Via record.
364 public Uri Via { get; set; }
// Encoder used to turn messages into bytes and back.
366 public MessageEncoder Encoder { get; set; }
// Starts a fresh in-memory write buffer and points `writer` at it.
368 void ResetWriteBuffer ()
370 this.buffer = new MemoryStream ();
371 writer = new MyBinaryWriter (buffer);
// Reads a length-prefixed chunk: a variable-length int giving the size, followed by that
// many bytes. Throws InvalidOperationException when the size check rejects the length.
// NOTE(review): the size-check condition and the return statement are missing from this extract.
374 public byte [] ReadSizedChunk ()
376 int length = reader.ReadVariableInt ();
379 throw new InvalidOperationException ("The message is too large.");
381 byte [] buffer = new byte [length];
// Read may deliver fewer bytes than requested, so keep reading until the chunk is complete.
// NOTE(review): if Read returns 0 (end of stream) readSize never advances and this loop
// does not terminate.
382 for (int readSize = 0; readSize < length; )
383 readSize += reader.Read (buffer, readSize, length - readSize);
// Writes `data` to the current write buffer as a length-prefixed chunk: the length as a
// variable-length int, then the bytes.
387 public void WriteSizedChunk (byte [] data)
389 writer.WriteVariableInt (data.Length);
390 writer.Write (data, 0, data.Length);
// Builds the initiator's preamble in the write buffer and sends it to the stream:
// version 1.0, mode, via URI, known encoding, then the preamble-end marker.
// NOTE(review): the lines preceding the first write (presumably resetting the write
// buffer) are not visible in this extract.
393 public void ProcessPreambleInitiator ()
397 buffer.WriteByte (VersionRecord);
// Major version, then minor version.
398 buffer.WriteByte (1);
399 buffer.WriteByte (0);
400 buffer.WriteByte (ModeRecord);
401 buffer.WriteByte ((byte) mode);
402 buffer.WriteByte (ViaRecord);
// BinaryWriter.Write(string) emits the string with a length prefix.
403 writer.Write (Via.ToString ());
404 buffer.WriteByte (KnownEncodingRecord); // FIXME
405 buffer.WriteByte ((byte) EncodingRecord);
406 buffer.WriteByte (PreambleEndRecord);
// Send only the bytes actually written, not the whole backing array.
408 s.Write (buffer.GetBuffer (), 0, (int) buffer.Position);
// Reads the peer's one-byte reply to the preamble. A PreambleAckRecord is the expected
// reply; the visible throw statements raise FaultException with a string read from the
// stream, or ProtocolException naming the unexpected byte.
// NOTE(review): the switch header and some case labels are missing from this extract;
// presumably the FaultException belongs to a FaultRecord case - confirm.
412 public void ProcessPreambleAckInitiator ()
414 int b = s.ReadByte ();
416 case PreambleAckRecord:
419 throw new FaultException (reader.ReadString ());
421 throw new ProtocolException (String.Format ("Preamble Ack Record is expected, got {0:X}", b));
// Acknowledges the initiator's preamble by writing a single PreambleAckRecord byte.
425 public void ProcessPreambleAckRecipient ()
427 s.WriteByte (PreambleAckRecord);
// Reads the initiator's preamble record by record until the preamble-end record arrives.
// Throws ProtocolException on a wrong version, wrong mode or unknown record type, and
// NotImplementedException for the extending-encoding and upgrade records.
// NOTE(review): the switch header and the case labels for the version, mode and via
// records are missing from this extract; the grouping below is inferred from the messages.
430 public void ProcessPreambleRecipient ()
432 bool preambleEnd = false;
433 while (!preambleEnd) {
434 int b = s.ReadByte ();
// Only version 1.0 is accepted.
437 if (s.ReadByte () != 1)
438 throw new ProtocolException ("Major version must be 1");
439 if (s.ReadByte () != 0)
440 throw new ProtocolException ("Minor version must be 0");
// The peer's mode must match the mode this frame manager was created with.
443 if (s.ReadByte () != mode)
444 throw new ProtocolException (String.Format ("Duplex mode is expected to be {0:X}", mode));
// Via URI, read as a length-prefixed string.
447 Via = new Uri (reader.ReadString ());
449 case KnownEncodingRecord:
450 EncodingRecord = (byte) s.ReadByte ();
452 case ExtendingEncodingRecord:
453 throw new NotImplementedException ("ExtendingEncodingRecord");
454 case UpgradeRequestRecord:
455 throw new NotImplementedException ("UpgradeRequetRecord");
456 case UpgradeResponseRecord:
457 throw new NotImplementedException ("UpgradeResponseRecord");
458 case PreambleEndRecord:
462 throw new ProtocolException (String.Format ("Unexpected record type {0:X2}", b));
// Reader session kept across received messages; dictionary strings that arrive in-band
// are added to it.
467 XmlBinaryReaderSession reader_session;
// Next key to use when adding a string to reader_session.
468 int reader_session_items;
// Reads the next record from the stream and, for a sized envelope, decodes it into a Message.
// Throws NotImplementedException for any other non-End packet type, or when EncodingRecord
// is not 8.
470 public Message ReadSizedMessage ()
472 // FIXME: implement full [MC-NMF].
474 var packetType = s.ReadByte ();
// NOTE(review): the statement executed for an End record is not visible in this extract.
475 if (packetType == EndRecord)
477 if (packetType != SizedEnvelopeRecord)
478 throw new NotImplementedException (String.Format ("Packet type {0:X} is not implemented", packetType));
// Whole envelope payload, read as one length-prefixed chunk.
480 byte [] buffer = ReadSizedChunk ();
482 var ms = new MemoryStream (buffer, 0, buffer.Length);
484 // FIXME: turned out that it could be either in-band dictionary ([MC-NBFSE]), or a mere xml body ([MC-NBFS]).
485 if (EncodingRecord != 8)
486 throw new NotImplementedException (String.Format ("Message encoding {0:X} is not implemented yet", EncodingRecord));
489 // the returned buffer consists of a serialized reader
490 // session and the binary xml body.
492 var session = reader_session ?? new XmlBinaryReaderSession ();
493 reader_session = session;
// The payload begins with its own length-prefixed chunk holding the dictionary strings;
// a throwaway frame manager over `ms` is used only to read that chunk.
494 byte [] rsbuf = new TcpBinaryFrameManager (0, ms, is_service_side).ReadSizedChunk ();
495 using (var rms = new MemoryStream (rsbuf, 0, rsbuf.Length)) {
496 var rbr = new BinaryReader (rms, Encoding.UTF8);
// Each string in the chunk is registered under the next sequential key.
497 while (rms.Position < rms.Length)
498 session.Add (reader_session_items++, rbr.ReadString ());
// NOTE(review): `benc` is null when Encoder is not a BinaryMessageEncoder; lines around the
// two CurrentReaderSession assignments are missing from this extract, so it is unclear
// whether they are guarded (null check / try-finally) - confirm.
500 var benc = Encoder as BinaryMessageEncoder;
502 benc.CurrentReaderSession = session;
504 // FIXME: supply maxSizeOfHeaders.
// `ms` is now positioned just past the dictionary chunk, i.e. at the message body.
505 Message msg = Encoder.ReadMessage (ms, 0x10000);
507 benc.CurrentReaderSession = null;
509 // if (!is_service_side)
510 // if (s.Read (eof_buffer, 0, 1) == 1)
511 // if (eof_buffer [0] != EndRecord)
512 // throw new ProtocolException (String.Format ("Expected EndRecord message, got {0:X02}", eof_buffer [0]));
// One-byte scratch buffer; within the visible lines it is referenced only by commented-out code.
518 byte [] eof_buffer = new byte [1];
// Writer session kept across sent messages; it records the dictionary strings used so far.
519 MyXmlBinaryWriterSession writer_session;
// Encodes `message` and writes it to the stream as a sized envelope: the record type byte,
// the total payload length, a length-prefixed dictionary chunk, then the encoded body.
// Throws NotImplementedException when EncodingRecord is not 8.
521 public void WriteSizedMessage (Message message)
525 // FIXME: implement full [MC-NMF] protocol.
527 if (EncodingRecord != 8)
528 throw new NotImplementedException (String.Format ("Message encoding {0:X} is not implemented yet", EncodingRecord));
530 buffer.WriteByte (SizedEnvelopeRecord);
// Encode the body into a temporary stream with the shared writer session attached.
532 MemoryStream ms = new MemoryStream ();
533 var session = writer_session ?? new MyXmlBinaryWriterSession ();
534 writer_session = session;
// Number of dictionary strings known before this message was encoded.
535 int writer_session_count = session.List.Count;
// NOTE(review): `benc` is null when Encoder is not a BinaryMessageEncoder; lines around the
// two CurrentWriterSession assignments are missing from this extract, so it is unclear
// whether they are guarded - confirm.
536 var benc = Encoder as BinaryMessageEncoder;
539 benc.CurrentWriterSession = session;
540 Encoder.WriteMessage (message, ms);
542 benc.CurrentWriterSession = null;
// Serialize dictionary strings into their own stream.
// NOTE(review): the loop body is missing from this extract; presumably only the strings
// added since writer_session_count are written - confirm.
546 MemoryStream msd = new MemoryStream ();
547 BinaryWriter dw = new BinaryWriter (msd);
548 foreach (var ds in session.List)
// Payload length = dictionary bytes + body bytes + the bytes of the dictionary's own length prefix.
552 int length = (int) (msd.Position + ms.Position);
553 var msda = msd.ToArray ();
554 int sizeOfLength = writer.GetSizeOfLength (msda.Length);
556 writer.WriteVariableInt (length + sizeOfLength); // dictionary array also involves the size of itself.
557 WriteSizedChunk (msda);
// Append only the bytes actually written to `ms`, not its whole backing array.
559 var arr = ms.GetBuffer ();
560 writer.Write (arr, 0, (int) ms.Position);
// Push the assembled record to the underlying stream.
564 s.Write (buffer.GetBuffer (), 0, (int) buffer.Position);
// Tells the peer the session is ending by writing a single EndRecord byte to the stream.
// NOTE(review): any statement following the write (e.g. a flush) is not visible in this extract.
568 public void ProcessEndRecordInitiator ()
570 s.WriteByte (EndRecord); // it is required
// Reads one byte from the stream and throws ProtocolException unless it is EndRecord.
// NOTE(review): the declaration of `b` is not visible in this extract.
574 public void ProcessEndRecordRecipient ()
577 if ((b = s.ReadByte ()) != EndRecord)
578 throw new ProtocolException (String.Format ("EndRecord message was expected, got {0:X}", b));