2 // TcpDuplexSessionChannel.cs
5 // Marcos Cobena (marcoscobena@gmail.com)
6 // Atsushi Enomoto <atsushi@ximian.com>
8 // Copyright 2007 Marcos Cobena (http://www.youcannoteatbits.org/)
10 // Copyright (C) 2009 Novell, Inc (http://www.novell.com)
12 // Permission is hereby granted, free of charge, to any person obtaining
13 // a copy of this software and associated documentation files (the
14 // "Software"), to deal in the Software without restriction, including
15 // without limitation the rights to use, copy, modify, merge, publish,
16 // distribute, sublicense, and/or sell copies of the Software, and to
17 // permit persons to whom the Software is furnished to do so, subject to
18 // the following conditions:
20 // The above copyright notice and this permission notice shall be
21 // included in all copies or substantial portions of the Software.
23 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24 // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
27 // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
28 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
29 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
33 using System.Collections.Generic;
36 using System.Net.Sockets;
37 using System.Runtime.Serialization;
38 using System.Runtime.Serialization.Formatters.Binary;
39 using System.ServiceModel.Channels;
43 namespace System.ServiceModel.Channels
// Duplex session channel that talks over a TCP connection. It extends
// DuplexChannelBase and implements IDuplexSessionChannel. The same class is
// used for both the client side and the service side; which one is decided by
// the constructor that is called (see the is_service_side assignments there).
45 internal class TcpDuplexSessionChannel : DuplexChannelBase, IDuplexSessionChannel
// Session object for the channel. It keeps a back-reference to the channel
// that owns it and derives from DuplexSessionBase.
47 class TcpDuplexSession : DuplexSessionBase
// The channel this session belongs to.
49 TcpDuplexSessionChannel owner;
// Takes the owning channel.
// NOTE(review): the constructor body is not visible in this listing; presumably it stores owner — confirm.
51 internal TcpDuplexSession (TcpDuplexSessionChannel owner)
// Reports the owning channel's DefaultCloseTimeout.
56 public override TimeSpan DefaultCloseTimeout {
57 get { return owner.DefaultCloseTimeout; }
// Closing the session is not implemented yet; the visible body throws
// NotImplementedException.
60 public override void Close (TimeSpan timeout)
62 // FIXME: what to do here?
63 throw new NotImplementedException ();
// Channel state.
// NOTE(review): other members used by this class (client, info, is_service_side,
// timeout) are presumably declared on lines not visible in this listing — confirm.
// Value returned by the LocalAddress property.
70 EndpointAddress local_address;
// Listener supplied to the service-side constructor; WaitForMessage accepts a
// TcpClient from it.
71 TcpListener tcp_listener;
// Framing helper used to exchange the preamble and sized messages on the
// connection stream.
73 TcpBinaryFrameManager frame;
// Session instance exposed through the Session property.
74 TcpDuplexSession session;
// Client-side constructor: forwards factory, address and via to the base
// class, marks the channel as not service-side and creates its session object.
// NOTE(review): the info argument is presumably stored on a line not visible here — confirm.
76 public TcpDuplexSessionChannel (ChannelFactoryBase factory, TcpChannelInfo info, EndpointAddress address, Uri via)
77 : base (factory, address, via)
79 is_service_side = false;
81 session = new TcpDuplexSession (this);
// Service-side constructor: marks the channel as service-side, keeps the
// supplied TcpListener and timeout, and creates its session object.
// NOTE(review): the base-constructor call and the handling of info are not
// visible in this listing — confirm.
84 public TcpDuplexSessionChannel (ChannelListenerBase listener, TcpChannelInfo info, TcpListener tcpListener, TimeSpan timeout)
87 is_service_side = true;
88 tcp_listener = tcpListener;
90 session = new TcpDuplexSession (this);
91 this.timeout = timeout;
// Message encoder taken from the channel info (info.MessageEncoder).
94 public MessageEncoder Encoder {
95 get { return info.MessageEncoder; }
// Local endpoint address; returns the local_address field.
98 public override EndpointAddress LocalAddress {
99 get { return local_address; }
// The duplex session created for this channel in its constructor.
102 public IDuplexSession Session {
103 get { return session; }
// Sends the message using DefaultSendTimeout as the timeout.
106 public override void Send (Message message)
108 Send (message, DefaultSendTimeout);
// Writes the message to the connection as a sized-message frame.
// message: the message to send. timeout: applied as the TcpClient send timeout.
111 public override void Send (Message message, TimeSpan timeout)
113 // FIXME: add MessageID and ReplyTo (might not be here; it's likely in session channel in common)
// On the client side, a message without a To header is addressed to the
// remote endpoint URI before it is written.
115 if (!is_service_side && message.Headers.To == null)
116 message.Headers.To = RemoteAddress.Uri;
// NOTE(review): TotalMilliseconds is a double; the (int) cast may not hold the
// value for very large timeouts (e.g. TimeSpan.MaxValue) — confirm callers
// never pass one.
117 client.SendTimeout = (int) timeout.TotalMilliseconds;
118 frame.WriteSizedMessage (message);
119 // FIXME: should EndRecord be sent here?
120 //if (is_service_side && client.Available > 0)
121 // frame.ProcessEndRecordRecipient ();
// Asynchronous receive is not implemented: the begin half throws
// NotImplementedException.
125 public override IAsyncResult BeginTryReceive (TimeSpan timeout, AsyncCallback callback, object state)
127 throw new NotImplementedException ();
// Asynchronous receive is not implemented: the end half throws
// NotImplementedException as well.
131 public override bool EndTryReceive (IAsyncResult result, out Message message)
133 throw new NotImplementedException ();
// Receives the next message using DefaultReceiveTimeout as the timeout.
136 public override Message Receive ()
138 return Receive (DefaultReceiveTimeout);
// Reads the next sized message from the connection through the frame manager.
// timeout: applied as the TcpClient receive timeout, in milliseconds.
// Returns the message produced by frame.ReadSizedMessage ().
141 public override Message Receive (TimeSpan timeout)
// NOTE(review): same (int) cast of TotalMilliseconds as in Send — very large
// timeouts may not fit in an int; confirm.
143 client.ReceiveTimeout = (int) timeout.TotalMilliseconds;
// NOTE(review): s is not used on any line visible in this listing.
144 Stream s = client.GetStream ();
146 return frame.ReadSizedMessage ();
// Attempts to receive a message within the timeout by delegating to
// Receive (timeout); a TimeoutException raised by that call is caught here.
// NOTE(review): the statements that produce the bool result (and assign
// message in the timeout path) are not visible in this listing — confirm.
149 public override bool TryReceive (TimeSpan timeout, out Message message)
152 message = Receive (timeout);
154 } catch (TimeoutException) {
// Accepts a TCP connection from the listener, wraps its stream in a
// duplex-mode frame manager that uses this channel's encoder, and processes
// the connection preamble as the recipient. A TimeoutException is caught.
// NOTE(review): the return statements are not visible in this listing, and per
// the FIXME below the timeout argument is not honoured yet.
160 public override bool WaitForMessage (TimeSpan timeout)
162 // FIXME: use timeout
164 client = tcp_listener.AcceptTcpClient ();
165 Stream s = client.GetStream ();
167 frame = new TcpBinaryFrameManager (TcpBinaryFrameManager.DuplexMode, s) { Encoder = this.Encoder };
168 frame.ProcessPreambleRecipient ();
170 // FIXME: use retrieved record properties in the request processing.
173 } catch (TimeoutException) {
178 // CommunicationObject
// Abort is not implemented; it throws NotImplementedException.
181 protected override void OnAbort ()
183 throw new NotImplementedException ();
// Asynchronous close is not implemented; it throws NotImplementedException.
187 protected override IAsyncResult OnBeginClose (TimeSpan timeout,
188 AsyncCallback callback, object state)
190 throw new NotImplementedException ();
// Asynchronous open is not implemented; it throws NotImplementedException.
194 protected override IAsyncResult OnBeginOpen (TimeSpan timeout,
195 AsyncCallback callback, object state)
197 throw new NotImplementedException ();
// Synchronous close.
// NOTE(review): the body is not visible in this listing — confirm what it does
// with the TCP client.
201 protected override void OnClose (TimeSpan timeout)
// End half of the asynchronous close; not implemented.
207 protected override void OnEndClose (IAsyncResult result)
209 throw new NotImplementedException ();
// End half of the asynchronous open; not implemented.
213 protected override void OnEndOpen (IAsyncResult result)
215 throw new NotImplementedException ();
// Opens the channel. Only the client side does work here: it connects a
// TcpClient to the remote endpoint, builds a duplex-mode frame manager over
// the network stream (with this channel's encoder and the remote URI as Via),
// and sends the preamble as the initiator.
// NOTE(review): the timeout argument is not used on any visible line.
219 protected override void OnOpen (TimeSpan timeout)
221 if (! is_service_side) {
// A port value <= 0 in the URI means no explicit port was given, so the
// transport's DefaultPort is used instead.
222 int explicitPort = RemoteAddress.Uri.Port;
223 client = new TcpClient (RemoteAddress.Uri.Host, explicitPort <= 0 ? TcpTransportBindingElement.DefaultPort : explicitPort);
224 //RemoteAddress.Uri.Port);
226 NetworkStream ns = client.GetStream ();
227 frame = new TcpBinaryFrameManager (TcpBinaryFrameManager.DuplexMode, ns) {
228 Encoder = this.Encoder,
229 Via = RemoteAddress.Uri };
230 frame.ProcessPreambleInitiator ();
// BinaryWriter subclass that adds a WriteBytes helper.
// NOTE(review): no line visible in this listing calls WriteBytes — confirm it
// is still needed.
236 class MyBinaryWriter : BinaryWriter
// Wraps the given stream.
238 public MyBinaryWriter (Stream s)
// Writes the array length as a 7-bit encoded integer.
// NOTE(review): presumably the bytes themselves are written on a following
// line that is not visible here — confirm.
243 public void WriteBytes (byte [] bytes)
245 Write7BitEncodedInt (bytes.Length);
// Reads and writes the framing records (preamble, sized envelopes, end record)
// on a stream.
251 // seealso: [MC-NMF] Windows Protocol document.
252 class TcpBinaryFrameManager
// BinaryReader subclass that exposes the 7-bit encoded integer reader under
// the name ReadVariableInt.
254 class MyBinaryReader : BinaryReader
// Wraps the given stream.
256 public MyBinaryReader (Stream s)
// Returns the next 7-bit encoded (variable-length) integer from the stream.
261 public int ReadVariableInt ()
263 return Read7BitEncodedInt ();
// BinaryWriter subclass that exposes the 7-bit encoded integer writer under
// the name WriteVariableInt.
267 class MyBinaryWriter : BinaryWriter
// Wraps the given stream.
269 public MyBinaryWriter (Stream s)
// Writes value as a 7-bit encoded (variable-length) integer.
274 public void WriteVariableInt (int value)
276 Write7BitEncodedInt (value);
// Writer session that keeps a list of dictionary strings alongside the base
// XmlBinaryWriterSession behaviour. WriteSizedMessage iterates List to emit
// the strings together with the message.
280 class MyXmlBinaryWriterSession : XmlBinaryWriterSession
// Delegates to the base TryAdd first.
// NOTE(review): the statements after the check (presumably the early return
// and the List.Add call) are not visible in this listing — confirm.
282 public override bool TryAdd (XmlDictionaryString value, out int key)
284 if (!base.TryAdd (value, out key))
// Strings collected by this session.
290 public List<XmlDictionaryString> List = new List<XmlDictionaryString> ();
// Record type identifiers: the byte that introduces each framing record.
293 public const byte VersionRecord = 0;
294 public const byte ModeRecord = 1;
295 public const byte ViaRecord = 2;
296 public const byte KnownEncodingRecord = 3;
297 public const byte ExtendingEncodingRecord = 4;
298 public const byte UnsizedEnvelopeRecord = 5;
299 public const byte SizedEnvelopeRecord = 6;
300 public const byte EndRecord = 7;
301 public const byte FaultRecord = 8;
302 public const byte UpgradeRequestRecord = 9;
303 public const byte UpgradeResponseRecord = 0xA;
304 public const byte PreambleAckRecord = 0xB;
305 public const byte PreambleEndRecord = 0xC;
// Mode values: the byte that follows ModeRecord in the preamble. The channel
// in this file constructs its frame manager with DuplexMode.
307 public const byte SingletonUnsizedMode = 1;
308 public const byte DuplexMode = 2;
309 public const byte SimplexMode = 3;
310 public const byte SingletonSizedMode = 4;
// Reader and writer used for variable-length integers, strings and byte
// blocks; the constructor builds both over the same stream.
311 MyBinaryReader reader;
312 MyBinaryWriter writer;
// Creates a frame manager for the given mode over stream s.
// NOTE(review): the assignments of the mode and s fields are presumably on
// lines not visible in this listing — confirm.
314 public TcpBinaryFrameManager (int mode, Stream s)
318 reader = new MyBinaryReader (s);
319 writer = new MyBinaryWriter (s);
// 8 is the only encoding value ReadSizedMessage and WriteSizedMessage accept.
321 EncodingRecord = 8; // FIXME: it should depend on mode.
// Encoding value sent after KnownEncodingRecord by the initiator and
// overwritten with the received value by the recipient.
328 public byte EncodingRecord { get; set; }
// URI sent in the via record by the initiator and set from the received
// record by the recipient.
330 public Uri Via { get; set; }
// Message encoder used to read and write the message bodies.
332 public MessageEncoder Encoder { get; set; }
// Reads a length-prefixed chunk: a variable-length integer giving the size,
// followed by that many bytes.
// Throws InvalidOperationException ("The message is too large.") when the
// size check fails.
// NOTE(review): the size-check condition and the return statement are not
// visible in this listing — confirm the limit.
334 public byte [] ReadSizedChunk ()
336 int length = reader.ReadVariableInt ();
339 throw new InvalidOperationException ("The message is too large.");
341 byte [] buffer = new byte [length];
// NOTE(review): BinaryReader.Read returns the number of bytes actually read,
// which can be less than requested; the result is ignored here, so a short
// read would go unnoticed — consider looping until length bytes are read.
342 reader.Read (buffer, 0, length);
// Writes a length-prefixed chunk: the array length as a variable-length
// integer, followed by the bytes of data.
347 public void WriteSizedChunk (byte [] data)
349 writer.WriteVariableInt (data.Length);
350 writer.Write (data, 0, data.Length);
// Sends the connection preamble as the initiating side, then waits for the
// peer's reply. Records are written in this order: version, mode, via,
// known encoding, preamble end.
// Throws FaultException or ProtocolException when the reply is not an ack.
353 public void ProcessPreambleInitiator ()
// NOTE(review): the version number bytes that follow this record type are
// presumably written on lines not visible in this listing — confirm.
355 s.WriteByte (VersionRecord);
358 s.WriteByte (ModeRecord);
359 s.WriteByte ((byte) mode);
// The via URI is written with BinaryWriter.Write (string), i.e. as a
// length-prefixed string.
360 s.WriteByte (ViaRecord);
361 writer.Write (Via.ToString ());
362 s.WriteByte (KnownEncodingRecord); // FIXME
363 s.WriteByte ((byte) EncodingRecord);
364 s.WriteByte (PreambleEndRecord);
// Read the reply record type and dispatch on it.
367 int b = s.ReadByte ();
369 case PreambleAckRecord:
// NOTE(review): presumably the FaultRecord case (its label is not visible
// here); the fault text is read from the stream as a string.
372 throw new FaultException (reader.ReadString ());
// Any other reply is a protocol violation.
374 throw new ProtocolException (String.Format ("Preamble Ack Record is expected, got {0:X}", b));
// Reads the connection preamble as the receiving side: consumes records one
// at a time until the preamble end record arrives, then replies with a
// preamble ack record.
// Throws ProtocolException for a wrong version, a mode that differs from this
// manager's mode, or an unexpected record type; extending-encoding and
// upgrade records throw NotImplementedException.
// NOTE(review): several case labels and break statements are not visible in
// this listing — confirm the mapping of checks to record types.
378 public void ProcessPreambleRecipient ()
380 bool preambleEnd = false;
381 while (!preambleEnd) {
382 int b = s.ReadByte ();
// Version check: only major 1, minor 0 is accepted.
385 if (s.ReadByte () != 1)
386 throw new ProtocolException ("Major version must be 1");
387 if (s.ReadByte () != 0)
388 throw new ProtocolException ("Minor version must be 0");
// The peer's mode byte must equal the mode this manager was created with.
391 if (s.ReadByte () != mode)
392 throw new ProtocolException (String.Format ("Duplex mode is expected to be {0:X}", mode));
// The via URI arrives as a length-prefixed string.
395 Via = new Uri (reader.ReadString ());
// Remember the encoding the peer announced.
397 case KnownEncodingRecord:
398 EncodingRecord = (byte) s.ReadByte ();
400 case ExtendingEncodingRecord:
401 throw new NotImplementedException ();
402 case UpgradeRequestRecord:
403 throw new NotImplementedException ();
404 case UpgradeResponseRecord:
405 throw new NotImplementedException ();
406 case PreambleEndRecord:
410 throw new ProtocolException (String.Format ("Unexpected record type {0:X2}", b));
// Acknowledge the preamble once the loop has finished.
413 s.WriteByte (PreambleAckRecord);
// Reads one sized-envelope record from the stream and decodes it into a
// Message with the configured encoder.
// Throws NotImplementedException when the record is not a sized envelope or
// when EncodingRecord is not 8.
416 public Message ReadSizedMessage ()
418 // FIXME: implement [MC-NMF] correctly. Currently it is a guessed protocol hack.
420 var packetType = s.ReadByte ();
421 if (packetType != SizedEnvelopeRecord)
422 throw new NotImplementedException (String.Format ("Packet type {0:X} is not implemented", packetType));
// The whole envelope payload is read into memory and decoded from there.
424 byte [] buffer = ReadSizedChunk ();
426 var ms = new MemoryStream (buffer, 0, buffer.Length);
428 // FIXME: turned out that it could be either in-band dictionary ([MC-NBFSE]), or a mere xml body ([MC-NBFS]).
429 if (EncodingRecord != 8)
430 throw new NotImplementedException (String.Format ("Message encoding {0:X} is not implemented yet", EncodingRecord));
433 // the returned buffer consists of a serialized reader
434 // session and the binary xml body.
// A temporary frame manager over ms reads the nested length-prefixed chunk
// that holds the session strings; this advances ms to the start of the body.
436 var session = new XmlBinaryReaderSession ();
437 byte [] rsbuf = new TcpBinaryFrameManager (0, ms).ReadSizedChunk ();
// Each string in the chunk is added to the reader session under an
// incrementing key.
// NOTE(review): the declaration of count is not visible in this listing.
439 using (var rms = new MemoryStream (rsbuf, 0, rsbuf.Length)) {
440 var rbr = new BinaryReader (rms, Encoding.UTF8);
441 while (rms.Position < rms.Length)
442 session.Add (count++, rbr.ReadString ());
// NOTE(review): the as-cast yields null if Encoder is not a
// BinaryMessageEncoder, and benc is dereferenced below with no null check on
// any visible line — confirm it is guarded.
444 var benc = Encoder as BinaryMessageEncoder;
// The session is handed to the encoder only for the duration of ReadMessage
// and cleared afterwards.
446 benc.CurrentReaderSession = session;
447 // FIXME: supply maxSizeOfHeaders.
448 Message msg = Encoder.ReadMessage (ms, 0x10000);
450 benc.CurrentReaderSession = null;
// Encodes the message and writes it to the stream as one sized-envelope
// record: record type, total length, the dictionary-string chunk, the encoded
// message bytes, and finally an end record.
// Throws NotImplementedException when EncodingRecord is not 8.
456 public void WriteSizedMessage (Message message)
458 // FIXME: implement full [MC-NMF] protocol.
460 if (EncodingRecord != 8)
461 throw new NotImplementedException (String.Format ("Message encoding {0:X} is not implemented yet", EncodingRecord));
463 s.WriteByte (SizedEnvelopeRecord);
// The message is encoded into memory first so its size is known before any
// of it is written; the writer session collects the dictionary strings used.
465 MemoryStream ms = new MemoryStream ();
466 var session = new MyXmlBinaryWriterSession ();
// NOTE(review): the as-cast yields null if Encoder is not a
// BinaryMessageEncoder, and benc is dereferenced below with no null check on
// any visible line — confirm it is guarded.
467 var benc = Encoder as BinaryMessageEncoder;
470 benc.CurrentWriterSession = session;
471 Encoder.WriteMessage (message, ms);
473 benc.CurrentWriterSession = null;
// The collected dictionary strings are serialized into msd.
// NOTE(review): the loop body is not visible in this listing; presumably it
// writes each string with dw — confirm.
477 MemoryStream msd = new MemoryStream ();
478 BinaryWriter dw = new BinaryWriter (msd);
479 foreach (var ds in session.List)
// NOTE(review): WriteSizedChunk also emits a length prefix for msd, and
// ReadSizedMessage reads that nested prefix out of the outer chunk; the total
// written here does not appear to include those prefix bytes — verify the
// framing length.
482 writer.WriteVariableInt ((int) (msd.Position + ms.Position));
483 WriteSizedChunk (msd.ToArray ());
// Only the bytes written so far are sent (GetBuffer may return a larger array).
485 var arr = ms.GetBuffer ();
486 writer.Write (arr, 0, (int) ms.Position);
488 writer.Write (EndRecord);
// Reads one byte from the stream and requires it to be the end record.
// Throws ProtocolException when any other value is read.
// NOTE(review): the declaration of b is not visible in this listing.
492 public void ProcessEndRecordRecipient ()
495 if ((b = s.ReadByte ()) != EndRecord)
496 throw new ProtocolException (String.Format ("EndRequest message was expected, got {0:X}", b));