2009-08-05 Atsushi Enomoto <atsushi@ximian.com>
authorAtsushi Eno <atsushieno@gmail.com>
Wed, 5 Aug 2009 06:31:23 +0000 (06:31 -0000)
committerAtsushi Eno <atsushieno@gmail.com>
Wed, 5 Aug 2009 06:31:23 +0000 (06:31 -0000)
* PeerDuplexChannel.cs : ongoing listener refactoring. Now it uses
  ServiceHost to process neighbor connection (not sure if this is
  right way to go though ...).

svn path=/trunk/mcs/; revision=139391

mcs/class/System.ServiceModel/System.ServiceModel.Channels/ChangeLog
mcs/class/System.ServiceModel/System.ServiceModel.Channels/PeerDuplexChannel.cs

index bac4620f8a7e97b3731ce96e36cd524def2d19c1..3ffcc4eedc66540b55b4ff6d724f3ecdbfb626a0 100755 (executable)
@@ -1,3 +1,9 @@
+2009-08-05  Atsushi Enomoto  <atsushi@ximian.com>
+
+       * PeerDuplexChannel.cs : ongoing listener refactoring. Now it uses
+         ServiceHost to process neighbor connection (not sure if this is
+         right way to go though ...).
+
 2009-07-31  Atsushi Enomoto  <atsushi@ximian.com>
 
        * PeerDuplexChannel.cs : ongoing [MC-PRCH] implementation. Rewrote
index fb8ff097cdcd022fbd3f1ce0ad9e6247d12d735a..efdbb60c9025177d1c1ac38932b1a6b62ce216a7 100755 (executable)
@@ -69,6 +69,46 @@ namespace System.ServiceModel.Channels
                        public IPeerConnectorClient Channel { get; set; }
                }
 
+               class LocalPeerReceiver : IPeerReceiverContract
+               {
+                       public LocalPeerReceiver (PeerDuplexChannel owner)
+                       {
+                               this.owner = owner;
+                       }
+
+                       PeerDuplexChannel owner;
+
+                       public void Connect (ConnectInfo connect)
+                       {
+                               if (connect == null)
+                                       throw new ArgumentNullException ("connect");
+try {
+                               var ch = OperationContext.Current.GetCallbackChannel<IPeerConnectorContract> ();
+                               // FIXME: check and reject if inappropriate.
+                               ch.Welcome (new WelcomeInfo () { NodeId = connect.NodeId });
+
+} catch (Exception ex) {
+Console.WriteLine ("Exception during Connect()");
+Console.WriteLine (ex);
+throw;
+}
+
+                       }
+
+                       public void Welcome (WelcomeInfo welcome)
+                       {
+                       }
+
+                       public void Refuse (RefuseInfo refuse)
+                       {
+                       }
+
+                       public void SendMessage (Message msg)
+                       {
+                               owner.EnqueueMessage (msg);
+                       }
+               }
+
                interface IPeerConnectorClient : IClientChannel, IPeerConnectorContract
                {
                }
@@ -78,7 +118,7 @@ namespace System.ServiceModel.Channels
                PeerTransportBindingElement binding;
                PeerResolver resolver;
                PeerNode node;
-               IChannelListener<IDuplexSessionChannel> channel_listener;
+               ServiceHost listener_host;
                TcpChannelInfo info;
                List<RemotePeerConnection> peers = new List<RemotePeerConnection> ();
 
@@ -123,7 +163,7 @@ namespace System.ServiceModel.Channels
                        }
 
                        // FIXME: EndpointAddress must be "net.p2p://{meshId}", eliminating the remaining path.
-                       return channel_factory.CreateChannel (RemoteAddress ?? LocalAddress, pna.EndpointAddress.Uri);
+                       return channel_factory.CreateChannel (new EndpointAddress ("net.p2p://" + node.MeshId), pna.EndpointAddress.Uri);
                }
 
                public override void Send (Message message, TimeSpan timeout)
@@ -149,18 +189,28 @@ namespace System.ServiceModel.Channels
                        }
                }
 
+               internal void EnqueueMessage (Message message)
+               {
+Console.WriteLine ("###########################");
+var mb = message.CreateBufferedCopy (0x10000);
+Console.WriteLine (mb.CreateMessage ());
+message = mb.CreateMessage ();
+                       queue.Enqueue (message);
+                       receive_handle.Set ();
+               }
+
+               Queue<Message> queue = new Queue<Message> ();
+               AutoResetEvent receive_handle = new AutoResetEvent (false);
+
                public override Message Receive (TimeSpan timeout)
                {
                        ThrowIfDisposedOrNotOpen ();
                        DateTime start = DateTime.Now;
 
-                       var ch = channel_listener.AcceptChannel (timeout);
-                       ch.Open (timeout - (DateTime.Now - start));
-                       try {
-                               return ch.Receive (timeout - (DateTime.Now - start));
-                       } finally {
-                               ch.Close ();
-                       }
+                       if (queue.Count > 0)
+                               return queue.Dequeue ();
+                       receive_handle.WaitOne ();
+                       return queue.Dequeue ();
                }
 
                public override bool WaitForMessage (TimeSpan timeout)
@@ -189,8 +239,8 @@ namespace System.ServiceModel.Channels
                        peers.Clear ();
                        resolver.Unregister (node.RegisteredId, timeout - (DateTime.Now - start));
                        node.SetOffline ();
-                       if (channel_listener != null)
-                               channel_listener.Close (timeout - (DateTime.Now - start));
+                       if (listener_host != null)
+                               listener_host.Close (timeout - (DateTime.Now - start));
                        node.RegisteredId = null;
                }
 
@@ -226,8 +276,15 @@ namespace System.ServiceModel.Channels
                        string name = Dns.GetHostName ();
                        var uri = new Uri ("net.tcp://" + name + ":" + port + "/PeerChannelEndpoints/" + Guid.NewGuid ());
 
-                       channel_listener = binding.BuildChannelListener<IDuplexSessionChannel> (uri, new object [0]);
-                       channel_listener.Open (timeout - (DateTime.Now - start));
+                       var peer_receiver = new LocalPeerReceiver (this);
+                       listener_host = new ServiceHost (peer_receiver);
+                       var sba = listener_host.Description.Behaviors.Find<ServiceBehaviorAttribute> ();
+                       sba.InstanceContextMode = InstanceContextMode.Single;
+                       sba.IncludeExceptionDetailInFaults = true;
+
+                       var se = listener_host.AddServiceEndpoint (typeof (IPeerReceiverContract), binding, "net.p2p://" + node.MeshId);
+                       se.ListenUri = uri;
+                       listener_host.Open (timeout - (DateTime.Now - start));
 
                        var nid = new Random ().Next (0, int.MaxValue);
                        var ea = new EndpointAddress (uri);