//
using System;
+using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Net.Sockets;
{
internal class TcpDuplexSessionChannel : DuplexChannelBase, IDuplexSessionChannel
{
+ class TcpDuplexSession : DuplexSessionBase
+ {
+ TcpDuplexSessionChannel owner;
+
+ internal TcpDuplexSession (TcpDuplexSessionChannel owner)
+ {
+ this.owner = owner;
+ }
+
+ public override TimeSpan DefaultCloseTimeout {
+ get { return owner.DefaultCloseTimeout; }
+ }
+
+ public override void Close (TimeSpan timeout)
+ {
+ // FIXME: what to do here?
+ throw new NotImplementedException ();
+ }
+ }
+
TcpChannelInfo info;
TcpClient client;
bool is_service_side;
TcpListener tcp_listener;
TimeSpan timeout;
TcpBinaryFrameManager frame;
+ TcpDuplexSession session;
public TcpDuplexSessionChannel (ChannelFactoryBase factory, TcpChannelInfo info, EndpointAddress address, Uri via)
: base (factory, address, via)
{
is_service_side = false;
this.info = info;
+ session = new TcpDuplexSession (this);
}
- public TcpDuplexSessionChannel (ChannelListenerBase listener, TcpChannelInfo info, TcpClient acceptedRequest, TimeSpan timeout)
+ public TcpDuplexSessionChannel (ChannelListenerBase listener, TcpChannelInfo info, TcpListener tcpListener, TimeSpan timeout)
: base (listener)
{
is_service_side = true;
+ tcp_listener = tcpListener;
this.info = info;
- this.client = acceptedRequest;
+ session = new TcpDuplexSession (this);
this.timeout = timeout;
-
- Stream s = client.GetStream ();
-
- frame = new TcpBinaryFrameManager (TcpBinaryFrameManager.DuplexMode, s);
- frame.ProcessPreambleRecipient ();
-
- // FIXME: use retrieved record properties in the request processing.
}
public MessageEncoder Encoder {
get { return local_address; }
}
- // FIXME: implement
public IDuplexSession Session {
- get { throw new NotImplementedException (); }
- }
-
- [MonoTODO]
- public override IAsyncResult BeginSend (Message message, AsyncCallback callback, object state)
- {
- throw new NotImplementedException ();
- }
-
- [MonoTODO]
- public override IAsyncResult BeginSend (Message message, TimeSpan timeout, AsyncCallback callback, object state)
- {
- throw new NotImplementedException ();
- }
-
- [MonoTODO]
- public override void EndSend (IAsyncResult result)
- {
- throw new NotImplementedException ();
+ get { return session; }
}
public override void Send (Message message)
public override void Send (Message message, TimeSpan timeout)
{
- client.SendTimeout = (int) timeout.TotalMilliseconds;
- MemoryStream ms = new MemoryStream ();
- BinaryFormatter bf = new BinaryFormatter ();
-
- try
- {
- NetworkStream stream = client.GetStream ();
- MyBinaryWriter bw = new MyBinaryWriter (stream);
- bw.Write ((byte) 6);
- Encoder.WriteMessage (message, ms);
- bw.WriteBytes (ms.ToArray ());
- bw.Write ((byte) 7);
- bw.Flush ();
+ if (!is_service_side && message.Headers.To == null)
+ message.Headers.To = RemoteAddress.Uri;
+ if (!is_service_side && message.Headers.ReplyTo == null)
+ message.Headers.ReplyTo = new EndpointAddress (Constants.WsaAnonymousUri);
- stream.ReadByte (); // 7
- }
- catch (Exception e)
- {
- throw e;
- }
- }
-
- [MonoTODO]
- public override IAsyncResult BeginReceive (AsyncCallback callback, object state)
- {
- throw new NotImplementedException ();
- }
-
- [MonoTODO]
- public override IAsyncResult BeginReceive (TimeSpan timeout, AsyncCallback callback, object state)
- {
- throw new NotImplementedException ();
+ client.SendTimeout = (int) timeout.TotalMilliseconds;
+ frame.WriteSizedMessage (message);
+ // FIXME: should EndRecord be sent here?
+ //if (is_service_side && client.Available > 0)
+ // frame.ProcessEndRecordRecipient ();
}
[MonoTODO]
throw new NotImplementedException ();
}
- [MonoTODO]
- public override IAsyncResult BeginWaitForMessage (TimeSpan timeout, AsyncCallback callback, object state)
- {
- throw new NotImplementedException ();
- }
-
- [MonoTODO]
- public override Message EndReceive (IAsyncResult result)
- {
- throw new NotImplementedException ();
- }
-
[MonoTODO]
public override bool EndTryReceive (IAsyncResult result, out Message message)
{
throw new NotImplementedException ();
}
- [MonoTODO]
- public override bool EndWaitForMessage (IAsyncResult result)
- {
- throw new NotImplementedException ();
- }
-
public override Message Receive ()
{
return Receive (DefaultReceiveTimeout);
{
client.ReceiveTimeout = (int) timeout.TotalMilliseconds;
Stream s = client.GetStream ();
- var packetType = s.ReadByte (); // 6
- if (packetType != 6)
- throw new NotImplementedException (String.Format ("Packet type {0:X} is not implemented", packetType));
-
- // FIXME: implement [MC-NMF] correctly. Currently it is a guessed protocol hack.
- byte [] buffer = frame.ReadSizedChunk ();
-
- var ms = new MemoryStream (buffer, 0, buffer.Length);
- // The returned buffer consists of a serialized reader
- // session and the binary xml body.
- // FIXME: turned out that it could be either in-band dictionary ([MC-NBFSE]), or a mere xml body ([MC-NBFS]).
-
- var session = new XmlBinaryReaderSession ();
- byte [] rsbuf = new TcpBinaryFrameManager (0, ms).ReadSizedChunk ();
- int count = 0;
- using (var rms = new MemoryStream (rsbuf, 0, rsbuf.Length)) {
- var rbr = new BinaryReader (rms, Encoding.UTF8);
- while (rms.Position < rms.Length)
- session.Add (count++, rbr.ReadString ());
- }
- var benc = Encoder as BinaryMessageEncoder;
- if (benc != null)
- benc.CurrentBinarySession = session;
- // FIXME: supply maxSizeOfHeaders.
- Message msg = Encoder.ReadMessage (ms, 0x10000);
- if (benc != null)
- benc.CurrentBinarySession = null;
-// s.ReadByte (); // 7
-// s.WriteByte (7);
- s.Flush ();
- return msg;
+ return frame.ReadSizedMessage ();
}
public override bool TryReceive (TimeSpan timeout, out Message message)
public override bool WaitForMessage (TimeSpan timeout)
{
- client.ReceiveTimeout = (int) timeout.TotalMilliseconds;
+ // FIXME: use timeout
try {
- client.GetStream ();
+ client = tcp_listener.AcceptTcpClient ();
+ Stream s = client.GetStream ();
+
+ frame = new TcpBinaryFrameManager (TcpBinaryFrameManager.DuplexMode, s) { Encoder = this.Encoder };
+ frame.ProcessPreambleRecipient ();
+
+ // FIXME: use retrieved record properties in the request processing.
+
return true;
} catch (TimeoutException) {
return false;
//RemoteAddress.Uri.Port);
NetworkStream ns = client.GetStream ();
- frame = new TcpBinaryFrameManager (TcpBinaryFrameManager.DuplexMode, ns);
- // FIXME: it still results in SocketException (remote host closes the connection).
- frame.Via = RemoteAddress.Uri;
+ frame = new TcpBinaryFrameManager (TcpBinaryFrameManager.DuplexMode, ns) {
+ Encoder = this.Encoder,
+ Via = RemoteAddress.Uri };
frame.ProcessPreambleInitiator ();
+ } else {
+ // server side
}
- // Service side.
- /*
- else
- Console.WriteLine ("Server side.");
- */
}
class MyBinaryWriter : BinaryWriter
}
}
+ class MyBinaryWriter : BinaryWriter
+ {
+ public MyBinaryWriter (Stream s)
+ : base (s)
+ {
+ }
+
+ public void WriteVariableInt (int value)
+ {
+ Write7BitEncodedInt (value);
+ }
+ }
+
+ class MyXmlBinaryWriterSession : XmlBinaryWriterSession
+ {
+ public override bool TryAdd (XmlDictionaryString value, out int key)
+ {
+ if (!base.TryAdd (value, out key))
+ return false;
+ List.Add (value);
+ return true;
+ }
+
+ public List<XmlDictionaryString> List = new List<XmlDictionaryString> ();
+ }
+
public const byte VersionRecord = 0;
public const byte ModeRecord = 1;
public const byte ViaRecord = 2;
public const byte SimplexMode = 3;
public const byte SingletonSizedMode = 4;
MyBinaryReader reader;
- BinaryWriter writer;
+ MyBinaryWriter writer;
public TcpBinaryFrameManager (int mode, Stream s)
{
this.mode = mode;
this.s = s;
reader = new MyBinaryReader (s);
- writer = new BinaryWriter (s);
+ writer = new MyBinaryWriter (s);
+
+ EncodingRecord = 8; // FIXME: it should depend on mode.
}
Stream s;
int mode;
- int encoding_record = 3; // SOAP12, UTF-8
+
+ public byte EncodingRecord { get; set; }
+
+ public Uri Via { get; set; }
+
+ public MessageEncoder Encoder { get; set; }
public byte [] ReadSizedChunk ()
{
throw new InvalidOperationException ("The message is too large.");
byte [] buffer = new byte [length];
- reader.Read (buffer, 0, length);
-
+ for (int readSize = 0; readSize < length; )
+ readSize += reader.Read (buffer, readSize, length - readSize);
return buffer;
}
- public Uri Via { get; set; }
+ public void WriteSizedChunk (byte [] data)
+ {
+ writer.WriteVariableInt (data.Length);
+ writer.Write (data, 0, data.Length);
+ }
public void ProcessPreambleInitiator ()
{
s.WriteByte (ViaRecord);
writer.Write (Via.ToString ());
s.WriteByte (KnownEncodingRecord); // FIXME
- s.WriteByte ((byte) encoding_record);
+ s.WriteByte ((byte) EncodingRecord);
s.WriteByte (PreambleEndRecord);
s.Flush ();
case FaultRecord:
throw new FaultException (reader.ReadString ());
default:
- throw new ArgumentException (String.Format ("Preamble Ack Record is expected, got {0:X}", b));
+ throw new ProtocolException (String.Format ("Preamble Ack Record is expected, got {0:X}", b));
}
}
switch (b) {
case VersionRecord:
if (s.ReadByte () != 1)
- throw new ArgumentException ("Major version must be 1");
+ throw new ProtocolException ("Major version must be 1");
if (s.ReadByte () != 0)
- throw new ArgumentException ("Minor version must be 0");
+ throw new ProtocolException ("Minor version must be 0");
break;
case ModeRecord:
if (s.ReadByte () != mode)
- throw new ArgumentException (String.Format ("Duplex mode is expected to be {0:X}", mode));
+ throw new ProtocolException (String.Format ("Duplex mode is expected to be {0:X}", mode));
break;
case ViaRecord:
Via = new Uri (reader.ReadString ());
break;
case KnownEncodingRecord:
- encoding_record = s.ReadByte ();
+ EncodingRecord = (byte) s.ReadByte ();
break;
case ExtendingEncodingRecord:
throw new NotImplementedException ();
preambleEnd = true;
break;
default:
- throw new ArgumentException (String.Format ("Unexpected record type {0:X2}", b));
+ throw new ProtocolException (String.Format ("Unexpected record type {0:X2}", b));
}
}
s.WriteByte (PreambleAckRecord);
}
+
+ public Message ReadSizedMessage ()
+ {
+ // FIXME: implement [MC-NMF] correctly. Currently it is a guessed protocol hack.
+
+ var packetType = s.ReadByte ();
+ if (packetType != SizedEnvelopeRecord)
+ throw new NotImplementedException (String.Format ("Packet type {0:X} is not implemented", packetType));
+
+ byte [] buffer = ReadSizedChunk ();
+
+ var ms = new MemoryStream (buffer, 0, buffer.Length);
+
+ // FIXME: turned out that it could be either in-band dictionary ([MC-NBFSE]), or a mere xml body ([MC-NBFS]).
+ if (EncodingRecord != 8)
+ throw new NotImplementedException (String.Format ("Message encoding {0:X} is not implemented yet", EncodingRecord));
+
+ // Encoding type 8:
+ // the returned buffer consists of a serialized reader
+ // session and the binary xml body.
+
+ var session = new XmlBinaryReaderSession ();
+ byte [] rsbuf = new TcpBinaryFrameManager (0, ms).ReadSizedChunk ();
+ int count = 0;
+ using (var rms = new MemoryStream (rsbuf, 0, rsbuf.Length)) {
+ var rbr = new BinaryReader (rms, Encoding.UTF8);
+ while (rms.Position < rms.Length)
+ session.Add (count++, rbr.ReadString ());
+ }
+ var benc = Encoder as BinaryMessageEncoder;
+ if (benc != null)
+ benc.CurrentReaderSession = session;
+ // FIXME: supply maxSizeOfHeaders.
+ Message msg = Encoder.ReadMessage (ms, 0x10000);
+ if (benc != null)
+ benc.CurrentReaderSession = null;
+ s.Flush ();
+
+ return msg;
+ }
+
+ public void WriteSizedMessage (Message message)
+ {
+ // FIXME: implement full [MC-NMF] protocol.
+
+ if (EncodingRecord != 8)
+ throw new NotImplementedException (String.Format ("Message encoding {0:X} is not implemented yet", EncodingRecord));
+
+ s.WriteByte (SizedEnvelopeRecord);
+
+ MemoryStream ms = new MemoryStream ();
+ var session = new MyXmlBinaryWriterSession ();
+ var benc = Encoder as BinaryMessageEncoder;
+ try {
+ if (benc != null)
+ benc.CurrentWriterSession = session;
+ Encoder.WriteMessage (message, ms);
+ } finally {
+ benc.CurrentWriterSession = null;
+ }
+
+ // dictionary
+ MemoryStream msd = new MemoryStream ();
+ BinaryWriter dw = new BinaryWriter (msd);
+ foreach (var ds in session.List)
+ dw.Write (ds.Value);
+ dw.Flush ();
+ writer.WriteVariableInt ((int) (msd.Position + ms.Position));
+ WriteSizedChunk (msd.ToArray ());
+ // message body
+ var arr = ms.GetBuffer ();
+ writer.Write (arr, 0, (int) ms.Position);
+
+ writer.Write (EndRecord);
+ writer.Flush ();
+ }
+
+ public void ProcessEndRecordRecipient ()
+ {
+ int b;
+ if ((b = s.ReadByte ()) != EndRecord)
+ throw new ProtocolException (String.Format ("EndRequest message was expected, got {0:X}", b));
+ }
}
}