* TcpReplyChannel.cs : new reply channel implementation.
* TcpChannelListener.cs : use above for streamed reply channel.
* TcpDuplexSessionChannel.cs : more streaming mode support.
* TcpRequestChannel.cs : a couple of updates to get it working
with the reply channel above. Still some issues on .NET interop.
* System.ServiceModel.dll.sources: add TcpReplyChannel.
svn path=/trunk/mcs/; revision=138299
+2009-07-21 Astushi Enomoto <atsushi@ximian.com>
+
+ * System.ServiceModel.dll.sources: add TcpReplyChannel.
+
2009-07-20 Jb Evain <jbevain@novell.com>
* Makefile: filter the valid profile on the framework version,
+2009-07-21 Atsushi Enomoto <atsushi@ximian.com>
+
+ * TcpReplyChannel.cs : new reply channel implementation.
+ * TcpChannelListener.cs : use above for streamed reply channel.
+ * TcpDuplexSessionChannel.cs : more streaming mode support.
+ * TcpRequestChannel.cs : a couple of updates to get it working
+ with the reply channel above. Still some issues on .NET interop.
+
2009-07-21 Atsushi Enomoto <atsushi@ximian.com>
* RequestContext.cs : added internal derived class that implements
if (typeof (TChannel) == typeof (IDuplexSessionChannel))
return (TChannel) (object) new TcpDuplexSessionChannel (this, info, client);
- // FIXME: To implement more.
- throw new NotImplementedException ();
+ if (typeof (TChannel) == typeof (IReplyChannel))
+ return (TChannel) (object) new TcpReplyChannel (this, info, client);
+
+ throw new InvalidOperationException (String.Format ("Channel type {0} is not supported.", typeof (TChannel).Name));
}
[MonoTODO]
public const byte DuplexMode = 2;
public const byte SimplexMode = 3;
public const byte SingletonSizedMode = 4;
+
+ public const byte EncodingUtf8 = 3;
+ public const byte EncodingUtf16 = 4;
+ public const byte EncodingUtf16LE = 5;
+ public const byte EncodingMtom = 6;
+ public const byte EncodingBinary = 7;
+ public const byte EncodingBinaryWithDictionary = 8;
+
MyBinaryReader reader;
MyBinaryWriter writer;
reader = new MyBinaryReader (s);
ResetWriteBuffer ();
- EncodingRecord = 8; // FIXME: it should depend on mode.
+ EncodingRecord = EncodingBinaryWithDictionary; // FIXME: it should depend on mode.
}
Stream s;
return buffer;
}
- public void WriteSizedChunk (byte [] data)
+ public void WriteSizedChunk (byte [] data, int index, int length)
{
- writer.WriteVariableInt (data.Length);
- writer.Write (data, 0, data.Length);
+ writer.WriteVariableInt (length);
+ writer.Write (data, index, length);
}
public void ProcessPreambleInitiator ()
var ms = new MemoryStream (buffer, 0, buffer.Length);
// FIXME: turned out that it could be either in-band dictionary ([MC-NBFSE]), or a mere xml body ([MC-NBFS]).
- if (EncodingRecord != 8)
+ if (EncodingRecord != EncodingBinaryWithDictionary)
throw new NotImplementedException (String.Format ("Message encoding {0:X} is not implemented yet", EncodingRecord));
// Encoding type 8:
public Message ReadUnsizedMessage (TimeSpan timeout)
{
var packetType = s.ReadByte ();
+
if (packetType == EndRecord)
return null;
if (packetType != UnsizedEnvelopeRecord)
throw new NotImplementedException (String.Format ("Packet type {0:X} is not implemented", packetType));
- // FIXME: turned out that it could be either in-band dictionary ([MC-NBFSE]), or a mere xml body ([MC-NBFS]).
- if (EncodingRecord != 8)
+ // Encoding type 7 is expected
+ if (EncodingRecord != EncodingBinary)
throw new NotImplementedException (String.Format ("Message encoding {0:X} is not implemented yet", EncodingRecord));
- // Encoding type 8:
- // the returned buffer consists of a serialized reader
- // session and the binary xml body.
-
- var session = reader_session ?? new XmlBinaryReaderSession ();
- reader_session = session;
- byte [] rsbuf = new TcpBinaryFrameManager (0, s, is_service_side).ReadSizedChunk ();
-
- using (var rms = new MemoryStream (rsbuf, 0, rsbuf.Length)) {
- var rbr = new BinaryReader (rms, Encoding.UTF8);
- while (rms.Position < rms.Length)
- session.Add (reader_session_items++, rbr.ReadString ());
- }
- var benc = Encoder as BinaryMessageEncoder;
- if (benc != null)
- benc.CurrentReaderSession = session;
+ byte [] buffer = ReadSizedChunk ();
+ var ms = new MemoryStream (buffer, 0, buffer.Length);
// FIXME: supply maxSizeOfHeaders.
- Message msg = Encoder.ReadMessage (s, 0x10000);
- if (benc != null)
- benc.CurrentReaderSession = null;
- if (s.ReadByte () != UnsizedMessageTerminator)
- throw new InvalidOperationException ("Unsized message terminator is expected");
+ Message msg = Encoder.ReadMessage (ms, 0x10000);
return msg;
}
+ public void ReadUnsizedMessageTerminator (TimeSpan timeout)
+ {
+ var terminator = s.ReadByte ();
+ if (terminator != UnsizedMessageTerminator)
+ throw new InvalidOperationException (String.Format ("Unsized message terminator is expected. Got '{0}' (&#x{1:X};).", (char) terminator, terminator));
+ }
+
byte [] eof_buffer = new byte [1];
MyXmlBinaryWriterSession writer_session;
int sizeOfLength = writer.GetSizeOfLength (msda.Length);
writer.WriteVariableInt (length + sizeOfLength); // dictionary array also involves the size of itself.
- WriteSizedChunk (msda);
+ WriteSizedChunk (msda, 0, msda.Length);
// message body
var arr = ms.GetBuffer ();
writer.Write (arr, 0, (int) ms.Position);
{
ResetWriteBuffer ();
- if (EncodingRecord != 8)
+ if (EncodingRecord != EncodingBinary)
throw new NotImplementedException (String.Format ("Message encoding {0:X} is not implemented yet", EncodingRecord));
- buffer.WriteByte (UnsizedEnvelopeRecord);
-
- MemoryStream ms = new MemoryStream ();
- var session = writer_session ?? new MyXmlBinaryWriterSession ();
- writer_session = session;
- int writer_session_count = session.List.Count;
- var benc = Encoder as BinaryMessageEncoder;
- try {
- if (benc != null)
- benc.CurrentWriterSession = session;
- Encoder.WriteMessage (message, ms);
- } finally {
- benc.CurrentWriterSession = null;
- }
-
- // dictionary
- MemoryStream msd = new MemoryStream ();
- BinaryWriter dw = new BinaryWriter (msd);
- for (int i = writer_session_count; i < session.List.Count; i++)
- dw.Write (session.List [i].Value);
- dw.Flush ();
-
- int length = (int) (msd.Position + ms.Position);
- var msda = msd.ToArray ();
-
- writer.Write (msda, 0, msda.Length);
- // message body
- var arr = ms.GetBuffer ();
- writer.Write (arr, 0, (int) ms.Position);
-
- writer.Write (UnsizedMessageTerminator); // terminator
- writer.Flush ();
+ s.WriteByte (UnsizedEnvelopeRecord);
+ s.Flush ();
- // FIXME: it should be rewritten to directly write to the stream.
+ Encoder.WriteMessage (message, buffer);
+ new MyBinaryWriter (s).WriteVariableInt ((int) buffer.Position);
s.Write (buffer.GetBuffer (), 0, (int) buffer.Position);
s.Flush ();
}
+ // FIXME: handle timeout
+ public void WriteUnsizedMessageTerminator (TimeSpan timeout)
+ {
+ s.WriteByte (UnsizedMessageTerminator); // terminator
+ s.Flush ();
+ }
+
public void ProcessEndRecordInitiator ()
{
s.WriteByte (EndRecord); // it is required
--- /dev/null
+//
+// HttpReplyChannel.cs
+//
+// Author:
+// Atsushi Enomoto <atsushi@ximian.com>
+//
+// Copyright (C) 2006 Novell, Inc. http://www.novell.com
+//
+// Permission is hereby granted, free of charge, to any person obtaining
+// a copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to
+// permit persons to whom the Software is furnished to do so, subject to
+// the following conditions:
+//
+// The above copyright notice and this permission notice shall be
+// included in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+// LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+//
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Net;
+using System.Net.Sockets;
+using System.ServiceModel;
+using System.Text;
+using System.Threading;
+
+namespace System.ServiceModel.Channels
+{
+ internal class TcpReplyChannel : ReplyChannelBase
+ {
+ TcpClient client;
+ TcpChannelInfo info;
+ TcpBinaryFrameManager frame;
+
+ public TcpReplyChannel (ChannelListenerBase listener, TcpChannelInfo info, TcpClient client)
+ : base (listener)
+ {
+ this.client = client;
+ this.info = info;
+ }
+
+ public MessageEncoder Encoder {
+ get { return info.MessageEncoder; }
+ }
+
+ public override RequestContext ReceiveRequest (TimeSpan timeout)
+ {
+ if (timeout <= TimeSpan.Zero)
+ throw new ArgumentException (String.Format ("Timeout value must be positive value. It was {0}", timeout));
+ var msg = frame.ReadUnsizedMessage (timeout);
+ return new TcpRequestContext (this, msg);
+ }
+
+ class TcpRequestContext : InternalRequestContext
+ {
+ public TcpRequestContext (TcpReplyChannel owner, Message request)
+ : base (owner.Manager)
+ {
+ this.owner = owner;
+ this.request = request;
+ }
+
+ TcpReplyChannel owner;
+ Message request;
+
+ public override Message RequestMessage {
+ get { return request; }
+ }
+
+ public override void Abort ()
+ {
+ Close (TimeSpan.Zero);
+ }
+
+ public override void Close (TimeSpan timeout)
+ {
+ }
+
+ public override void Reply (Message message, TimeSpan timeout)
+ {
+ DateTime start = DateTime.Now;
+ owner.frame.WriteUnsizedMessage (message, timeout);
+ owner.frame.ReadUnsizedMessageTerminator (timeout - (DateTime.Now - start));
+ owner.frame.ProcessEndRecordRecipient ();
+ }
+ }
+
+ public override bool TryReceiveRequest (TimeSpan timeout, out RequestContext context)
+ {
+ try {
+ DateTime start = DateTime.Now;
+ context = ReceiveRequest (timeout);
+ if (context != null)
+ return true;
+ // received EndRecord, so close the session and return false instead.
+ // (Closing channel here might not be a good idea, but right now I have no better way.)
+ Close (timeout - (DateTime.Now - start));
+ return false;
+ } catch (TimeoutException) {
+ context = null;
+ return false;
+ }
+ }
+
+ public override bool WaitForRequest (TimeSpan timeout)
+ {
+ throw new NotImplementedException ();
+ }
+
+ public override EndpointAddress LocalAddress {
+ get { throw new NotImplementedException (); }
+ }
+
+ protected override void OnClose (TimeSpan timeout)
+ {
+ if (client != null)
+ client.Close ();
+ }
+
+ protected override void OnOpen (TimeSpan timeout)
+ {
+ NetworkStream ns = client.GetStream ();
+ frame = new TcpBinaryFrameManager (TcpBinaryFrameManager.SingletonUnsizedMode, ns, true) { Encoder = this.Encoder, EncodingRecord = TcpBinaryFrameManager.EncodingBinary };
+ frame.ProcessPreambleRecipient ();
+ frame.ProcessPreambleAckRecipient ();
+ }
+ }
+}
NetworkStream ns = client.GetStream ();
frame = new TcpBinaryFrameManager (TcpBinaryFrameManager.SingletonUnsizedMode, ns, false) {
Encoder = this.Encoder,
- Via = RemoteAddress.Uri };
+ Via = RemoteAddress.Uri,
+ EncodingRecord = TcpBinaryFrameManager.EncodingBinary };
frame.ProcessPreambleInitiator ();
frame.ProcessPreambleAckInitiator ();
}
public override Message Request (Message input, TimeSpan timeout)
{
DateTime start = DateTime.Now;
+
+ if (input.Headers.To == null)
+ input.Headers.To = RemoteAddress.Uri;
+
frame.WriteUnsizedMessage (input, timeout);
var ret = frame.ReadUnsizedMessage (timeout - (DateTime.Now - start));
+ frame.WriteUnsizedMessageTerminator (timeout - (DateTime.Now - start));
frame.ProcessEndRecordInitiator ();
- frame.ProcessEndRecordRecipient (); // process *both*.
return ret;
}
}
System.ServiceModel.Channels/TcpChannelListener.cs
System.ServiceModel.Channels/TcpConnectionPoolSettings.cs
System.ServiceModel.Channels/TcpDuplexSessionChannel.cs
+System.ServiceModel.Channels/TcpReplyChannel.cs
System.ServiceModel.Channels/TcpRequestChannel.cs
System.ServiceModel.Channels/TcpTransportBindingElement.cs
System.ServiceModel.Channels/TextMessageEncoder.cs