2009-05-26 Atsushi Enomoto <atsushi@ximian.com>
[mono.git] / mcs / class / System.ServiceModel / System.ServiceModel.Channels / TcpDuplexSessionChannel.cs
index b8e41a0b08752128338d42705c189fb37278f576..cd3dbde1132cc0b97b25ecb8d048949a81c2ac08 100644 (file)
@@ -30,6 +30,7 @@
 //
 
 using System;
+using System.Collections.Generic;
 using System.IO;
 using System.Net;
 using System.Net.Sockets;
@@ -43,6 +44,26 @@ namespace System.ServiceModel.Channels
 {
        internal class TcpDuplexSessionChannel : DuplexChannelBase, IDuplexSessionChannel
        {
+               class TcpDuplexSession : DuplexSessionBase
+               {
+                       TcpDuplexSessionChannel owner;
+
+                       internal TcpDuplexSession (TcpDuplexSessionChannel owner)
+                       {
+                               this.owner = owner;
+                       }
+
+                       public override TimeSpan DefaultCloseTimeout {
+                               get { return owner.DefaultCloseTimeout; }
+                       }
+
+                       public override void Close (TimeSpan timeout)
+                       {
+                               // FIXME: what to do here?
+                               throw new NotImplementedException ();
+                       }
+               }
+
                TcpChannelInfo info;
                TcpClient client;
                bool is_service_side;
@@ -50,28 +71,24 @@ namespace System.ServiceModel.Channels
                TcpListener tcp_listener;
                TimeSpan timeout;
                TcpBinaryFrameManager frame;
+               TcpDuplexSession session;
                
                public TcpDuplexSessionChannel (ChannelFactoryBase factory, TcpChannelInfo info, EndpointAddress address, Uri via)
                        : base (factory, address, via)
                {
                        is_service_side = false;
                        this.info = info;
+                       session = new TcpDuplexSession (this);
                }
                
-               public TcpDuplexSessionChannel (ChannelListenerBase listener, TcpChannelInfo info, TcpClient acceptedRequest, TimeSpan timeout)
+               public TcpDuplexSessionChannel (ChannelListenerBase listener, TcpChannelInfo info, TcpListener tcpListener, TimeSpan timeout)
                        : base (listener)
                {
                        is_service_side = true;
+                       tcp_listener = tcpListener;
                        this.info = info;
-                       this.client = acceptedRequest;
+                       session = new TcpDuplexSession (this);
                        this.timeout = timeout;
-
-                       Stream s = client.GetStream ();
-
-                       frame = new TcpBinaryFrameManager (TcpBinaryFrameManager.DuplexMode, s);
-                       frame.ProcessPreambleRecipient ();
-
-                       // FIXME: use retrieved record properties in the request processing.
                }
                
                public MessageEncoder Encoder {
@@ -82,27 +99,8 @@ namespace System.ServiceModel.Channels
                        get { return local_address; }
                }
                
-               // FIXME: implement
                public IDuplexSession Session {
-                       get { throw new NotImplementedException (); }
-               }
-               
-               [MonoTODO]
-               public override IAsyncResult BeginSend (Message message, AsyncCallback callback, object state)
-               {
-                       throw new NotImplementedException ();
-               }
-               
-               [MonoTODO]
-               public override IAsyncResult BeginSend (Message message, TimeSpan timeout, AsyncCallback callback, object state)
-               {
-                       throw new NotImplementedException ();
-               }
-               
-               [MonoTODO]
-               public override void EndSend (IAsyncResult result)
-               {
-                       throw new NotImplementedException ();
+                       get { return session; }
                }
 
                public override void Send (Message message)
@@ -112,38 +110,16 @@ namespace System.ServiceModel.Channels
                
                public override void Send (Message message, TimeSpan timeout)
                {
-                       client.SendTimeout = (int) timeout.TotalMilliseconds;
-                       MemoryStream ms = new MemoryStream ();
-                       BinaryFormatter bf = new BinaryFormatter ();
-                       
-                       try
-                       {
-                               NetworkStream stream = client.GetStream ();
-                               MyBinaryWriter bw = new MyBinaryWriter (stream);
-                               bw.Write ((byte) 6);
-                               Encoder.WriteMessage (message, ms);
-                               bw.WriteBytes (ms.ToArray ());
-                               bw.Write ((byte) 7);
-                               bw.Flush ();
+                       if (!is_service_side && message.Headers.To == null)
+                               message.Headers.To = RemoteAddress.Uri;
+                       if (!is_service_side && message.Headers.ReplyTo == null)
+                               message.Headers.ReplyTo = new EndpointAddress (Constants.WsaAnonymousUri);
 
-                               stream.ReadByte (); // 7
-                       }
-                       catch (Exception e)
-                       {
-                               throw e;
-                       }
-               }
-               
-               [MonoTODO]
-               public override IAsyncResult BeginReceive (AsyncCallback callback, object state)
-               {
-                       throw new NotImplementedException ();
-               }
-               
-               [MonoTODO]
-               public override IAsyncResult BeginReceive (TimeSpan timeout, AsyncCallback callback, object state)
-               {
-                       throw new NotImplementedException ();
+                       client.SendTimeout = (int) timeout.TotalMilliseconds;
+                       frame.WriteSizedMessage (message);
+                       // FIXME: should EndRecord be sent here?
+                       //if (is_service_side && client.Available > 0)
+                       //      frame.ProcessEndRecordRecipient ();
                }
                
                [MonoTODO]
@@ -152,30 +128,12 @@ namespace System.ServiceModel.Channels
                        throw new NotImplementedException ();
                }
                
-               [MonoTODO]
-               public override IAsyncResult BeginWaitForMessage (TimeSpan timeout, AsyncCallback callback, object state)
-               {
-                       throw new NotImplementedException ();
-               }
-               
-               [MonoTODO]
-               public override Message EndReceive (IAsyncResult result)
-               {
-                       throw new NotImplementedException ();
-               }
-               
                [MonoTODO]
                public override bool EndTryReceive (IAsyncResult result, out Message message)
                {
                        throw new NotImplementedException ();
                }
                
-               [MonoTODO]
-               public override bool EndWaitForMessage (IAsyncResult result)
-               {
-                       throw new NotImplementedException ();
-               }
-               
                public override Message Receive ()
                {
                        return Receive (DefaultReceiveTimeout);
@@ -185,38 +143,8 @@ namespace System.ServiceModel.Channels
                {
                        client.ReceiveTimeout = (int) timeout.TotalMilliseconds;
                        Stream s = client.GetStream ();
-                       var packetType = s.ReadByte (); // 6
-                       if (packetType != 6)
-                               throw new NotImplementedException (String.Format ("Packet type {0:X} is not implemented", packetType));
-
-                       // FIXME: implement [MC-NMF] correctly. Currently it is a guessed protocol hack.
-                       byte [] buffer = frame.ReadSizedChunk ();
-
-                       var ms = new MemoryStream (buffer, 0, buffer.Length);
-                       // The returned buffer consists of a serialized reader 
-                       // session and the binary xml body. 
-                       // FIXME: turned out that it could be either in-band dictionary ([MC-NBFSE]), or a mere xml body ([MC-NBFS]).
-
-                       var session = new XmlBinaryReaderSession ();
-                       byte [] rsbuf = new TcpBinaryFrameManager (0, ms).ReadSizedChunk ();
-                       int count = 0;
-                       using (var rms = new MemoryStream (rsbuf, 0, rsbuf.Length)) {
-                               var rbr = new BinaryReader (rms, Encoding.UTF8);
-                               while (rms.Position < rms.Length)
-                                       session.Add (count++, rbr.ReadString ());
-                       }
-                       var benc = Encoder as BinaryMessageEncoder;
-                       if (benc != null)
-                               benc.CurrentBinarySession = session;
-                       // FIXME: supply maxSizeOfHeaders.
-                       Message msg = Encoder.ReadMessage (ms, 0x10000);
-                       if (benc != null)
-                               benc.CurrentBinarySession = null;
-//                     s.ReadByte (); // 7
-//                     s.WriteByte (7);
-                       s.Flush ();
 
-                       return msg;
+                       return frame.ReadSizedMessage ();
                }
                
                public override bool TryReceive (TimeSpan timeout, out Message message)
@@ -232,9 +160,16 @@ namespace System.ServiceModel.Channels
                
                public override bool WaitForMessage (TimeSpan timeout)
                {
-                       client.ReceiveTimeout = (int) timeout.TotalMilliseconds;
+                       // FIXME: use timeout
                        try {
-                               client.GetStream ();
+                               client = tcp_listener.AcceptTcpClient ();
+                               Stream s = client.GetStream ();
+
+                               frame = new TcpBinaryFrameManager (TcpBinaryFrameManager.DuplexMode, s) { Encoder = this.Encoder };
+                               frame.ProcessPreambleRecipient ();
+
+                               // FIXME: use retrieved record properties in the request processing.
+
                                return true;
                        } catch (TimeoutException) {
                                return false;
@@ -290,16 +225,13 @@ namespace System.ServiceModel.Channels
                                                        //RemoteAddress.Uri.Port);
                                
                                NetworkStream ns = client.GetStream ();
-                               frame = new TcpBinaryFrameManager (TcpBinaryFrameManager.DuplexMode, ns);
-                               // FIXME: it still results in SocketException (remote host closes the connection).
-                               frame.Via = RemoteAddress.Uri;
+                               frame = new TcpBinaryFrameManager (TcpBinaryFrameManager.DuplexMode, ns) {
+                                       Encoder = this.Encoder,
+                                       Via = RemoteAddress.Uri };
                                frame.ProcessPreambleInitiator ();
+                       } else {
+                               // server side
                        }
-                       // Service side.
-                       /*
-                       else
-                               Console.WriteLine ("Server side.");
-                       */
                }
                
                class MyBinaryWriter : BinaryWriter
@@ -333,6 +265,32 @@ namespace System.ServiceModel.Channels
                        }
                }
 
+               class MyBinaryWriter : BinaryWriter
+               {
+                       public MyBinaryWriter (Stream s)
+                               : base (s)
+                       {
+                       }
+
+                       public void WriteVariableInt (int value)
+                       {
+                               Write7BitEncodedInt (value);
+                       }
+               }
+
+               class MyXmlBinaryWriterSession : XmlBinaryWriterSession
+               {
+                       public override bool TryAdd (XmlDictionaryString value, out int key)
+                       {
+                               if (!base.TryAdd (value, out key))
+                                       return false;
+                               List.Add (value);
+                               return true;
+                       }
+
+                       public List<XmlDictionaryString> List = new List<XmlDictionaryString> ();
+               }
+
                public const byte VersionRecord = 0;
                public const byte ModeRecord = 1;
                public const byte ViaRecord = 2;
@@ -352,20 +310,27 @@ namespace System.ServiceModel.Channels
                public const byte SimplexMode = 3;
                public const byte SingletonSizedMode = 4;
                MyBinaryReader reader;
-               BinaryWriter writer;
+               MyBinaryWriter writer;
 
                public TcpBinaryFrameManager (int mode, Stream s)
                {
                        this.mode = mode;
                        this.s = s;
                        reader = new MyBinaryReader (s);
-                       writer = new BinaryWriter (s);
+                       writer = new MyBinaryWriter (s);
+
+                       EncodingRecord = 8; // FIXME: it should depend on mode.
                }
 
                Stream s;
 
                int mode;
-               int encoding_record = 3; // SOAP12, UTF-8
+
+               public byte EncodingRecord { get; set; }
+
+               public Uri Via { get; set; }
+
+               public MessageEncoder Encoder { get; set; }
 
                public byte [] ReadSizedChunk ()
                {
@@ -375,12 +340,16 @@ namespace System.ServiceModel.Channels
                                throw new InvalidOperationException ("The message is too large.");
 
                        byte [] buffer = new byte [length];
-                       reader.Read (buffer, 0, length);
-                       
+                       for (int readSize = 0; readSize < length; )
+                               readSize += reader.Read (buffer, readSize, length - readSize);
                        return buffer;
                }
 
-               public Uri Via { get; set; }
+               public void WriteSizedChunk (byte [] data)
+               {
+                       writer.WriteVariableInt (data.Length);
+                       writer.Write (data, 0, data.Length);
+               }
 
                public void ProcessPreambleInitiator ()
                {
@@ -392,7 +361,7 @@ namespace System.ServiceModel.Channels
                        s.WriteByte (ViaRecord);
                        writer.Write (Via.ToString ());
                        s.WriteByte (KnownEncodingRecord); // FIXME
-                       s.WriteByte ((byte) encoding_record);
+                       s.WriteByte ((byte) EncodingRecord);
                        s.WriteByte (PreambleEndRecord);
                        s.Flush ();
 
@@ -403,7 +372,7 @@ namespace System.ServiceModel.Channels
                        case FaultRecord:
                                throw new FaultException (reader.ReadString ());
                        default:
-                               throw new ArgumentException (String.Format ("Preamble Ack Record is expected, got {0:X}", b));
+                               throw new ProtocolException (String.Format ("Preamble Ack Record is expected, got {0:X}", b));
                        }
                }
 
@@ -415,19 +384,19 @@ namespace System.ServiceModel.Channels
                                switch (b) {
                                case VersionRecord:
                                        if (s.ReadByte () != 1)
-                                               throw new ArgumentException ("Major version must be 1");
+                                               throw new ProtocolException ("Major version must be 1");
                                        if (s.ReadByte () != 0)
-                                               throw new ArgumentException ("Minor version must be 0");
+                                               throw new ProtocolException ("Minor version must be 0");
                                        break;
                                case ModeRecord:
                                        if (s.ReadByte () != mode)
-                                               throw new ArgumentException (String.Format ("Duplex mode is expected to be {0:X}", mode));
+                                               throw new ProtocolException (String.Format ("Duplex mode is expected to be {0:X}", mode));
                                        break;
                                case ViaRecord:
                                        Via = new Uri (reader.ReadString ());
                                        break;
                                case KnownEncodingRecord:
-                                       encoding_record = s.ReadByte ();
+                                       EncodingRecord = (byte) s.ReadByte ();
                                        break;
                                case ExtendingEncodingRecord:
                                        throw new NotImplementedException ();
@@ -439,10 +408,93 @@ namespace System.ServiceModel.Channels
                                        preambleEnd = true;
                                        break;
                                default:
-                                       throw new ArgumentException (String.Format ("Unexpected record type {0:X2}", b));
+                                       throw new ProtocolException (String.Format ("Unexpected record type {0:X2}", b));
                                }
                        }
                        s.WriteByte (PreambleAckRecord);
                }
+
+               public Message ReadSizedMessage ()
+               {
+                       // FIXME: implement [MC-NMF] correctly. Currently it is a guessed protocol hack.
+
+                       var packetType = s.ReadByte ();
+                       if (packetType != SizedEnvelopeRecord)
+                               throw new NotImplementedException (String.Format ("Packet type {0:X} is not implemented", packetType));
+
+                       byte [] buffer = ReadSizedChunk ();
+
+                       var ms = new MemoryStream (buffer, 0, buffer.Length);
+
+                       // FIXME: turned out that it could be either in-band dictionary ([MC-NBFSE]), or a mere xml body ([MC-NBFS]).
+                       if (EncodingRecord != 8)
+                               throw new NotImplementedException (String.Format ("Message encoding {0:X} is not implemented yet", EncodingRecord));
+
+                       // Encoding type 8:
+                       // the returned buffer consists of a serialized reader 
+                       // session and the binary xml body. 
+
+                       var session = new XmlBinaryReaderSession ();
+                       byte [] rsbuf = new TcpBinaryFrameManager (0, ms).ReadSizedChunk ();
+                       int count = 0;
+                       using (var rms = new MemoryStream (rsbuf, 0, rsbuf.Length)) {
+                               var rbr = new BinaryReader (rms, Encoding.UTF8);
+                               while (rms.Position < rms.Length)
+                                       session.Add (count++, rbr.ReadString ());
+                       }
+                       var benc = Encoder as BinaryMessageEncoder;
+                       if (benc != null)
+                               benc.CurrentReaderSession = session;
+                       // FIXME: supply maxSizeOfHeaders.
+                       Message msg = Encoder.ReadMessage (ms, 0x10000);
+                       if (benc != null)
+                               benc.CurrentReaderSession = null;
+                       s.Flush ();
+
+                       return msg;
+               }
+
+               public void WriteSizedMessage (Message message)
+               {
+                       // FIXME: implement full [MC-NMF] protocol.
+
+                       if (EncodingRecord != 8)
+                               throw new NotImplementedException (String.Format ("Message encoding {0:X} is not implemented yet", EncodingRecord));
+
+                       s.WriteByte (SizedEnvelopeRecord);
+
+                       MemoryStream ms = new MemoryStream ();
+                       var session = new MyXmlBinaryWriterSession ();
+                       var benc = Encoder as BinaryMessageEncoder;
+                       try {
+                               if (benc != null)
+                                       benc.CurrentWriterSession = session;
+                               Encoder.WriteMessage (message, ms);
+                       } finally {
+                               benc.CurrentWriterSession = null;
+                       }
+
+                       // dictionary
+                       MemoryStream msd = new MemoryStream ();
+                       BinaryWriter dw = new BinaryWriter (msd);
+                       foreach (var ds in session.List)
+                               dw.Write (ds.Value);
+                       dw.Flush ();
+                       writer.WriteVariableInt ((int) (msd.Position + ms.Position));
+                       WriteSizedChunk (msd.ToArray ());
+                       // message body
+                       var arr = ms.GetBuffer ();
+                       writer.Write (arr, 0, (int) ms.Position);
+
+                       writer.Write (EndRecord);
+                       writer.Flush ();
+               }
+
+               public void ProcessEndRecordRecipient ()
+               {
+                       int b;
+                       if ((b = s.ReadByte ()) != EndRecord)
+                               throw new ProtocolException (String.Format ("EndRequest message was expected, got {0:X}", b));
+               }
        }
 }