public IPeerConnectorClient Channel { get; set; }
}
+ class LocalPeerReceiver : IPeerReceiverContract
+ {
+ public LocalPeerReceiver (PeerDuplexChannel owner)
+ {
+ this.owner = owner;
+ }
+
+ PeerDuplexChannel owner;
+
+ public void Connect (ConnectInfo connect)
+ {
+ if (connect == null)
+ throw new ArgumentNullException ("connect");
+try {
+ var ch = OperationContext.Current.GetCallbackChannel<IPeerConnectorContract> ();
+ // FIXME: check and reject if inappropriate.
+ ch.Welcome (new WelcomeInfo () { NodeId = connect.NodeId });
+
+} catch (Exception ex) {
+Console.WriteLine ("Exception during Connect()");
+Console.WriteLine (ex);
+throw;
+}
+
+ }
+
+ public void Welcome (WelcomeInfo welcome)
+ {
+ }
+
+ public void Refuse (RefuseInfo refuse)
+ {
+ }
+
+ public void SendMessage (Message msg)
+ {
+ owner.EnqueueMessage (msg);
+ }
+ }
+
interface IPeerConnectorClient : IClientChannel, IPeerConnectorContract
{
}
PeerTransportBindingElement binding;
PeerResolver resolver;
PeerNode node;
- IChannelListener<IDuplexSessionChannel> channel_listener;
+ ServiceHost listener_host;
TcpChannelInfo info;
List<RemotePeerConnection> peers = new List<RemotePeerConnection> ();
}
// FIXME: EndpointAddress must be "net.p2p://{meshId}", eliminating the remaining path.
- return channel_factory.CreateChannel (RemoteAddress ?? LocalAddress, pna.EndpointAddress.Uri);
+ return channel_factory.CreateChannel (new EndpointAddress ("net.p2p://" + node.MeshId), pna.EndpointAddress.Uri);
}
public override void Send (Message message, TimeSpan timeout)
}
}
+ internal void EnqueueMessage (Message message)
+ {
+Console.WriteLine ("###########################");
+var mb = message.CreateBufferedCopy (0x10000);
+Console.WriteLine (mb.CreateMessage ());
+message = mb.CreateMessage ();
+ queue.Enqueue (message);
+ receive_handle.Set ();
+ }
+
+ Queue<Message> queue = new Queue<Message> ();
+ AutoResetEvent receive_handle = new AutoResetEvent (false);
+
public override Message Receive (TimeSpan timeout)
{
ThrowIfDisposedOrNotOpen ();
DateTime start = DateTime.Now;
- var ch = channel_listener.AcceptChannel (timeout);
- ch.Open (timeout - (DateTime.Now - start));
- try {
- return ch.Receive (timeout - (DateTime.Now - start));
- } finally {
- ch.Close ();
- }
+ if (queue.Count > 0)
+ return queue.Dequeue ();
+ receive_handle.WaitOne ();
+ return queue.Dequeue ();
}
public override bool WaitForMessage (TimeSpan timeout)
peers.Clear ();
resolver.Unregister (node.RegisteredId, timeout - (DateTime.Now - start));
node.SetOffline ();
- if (channel_listener != null)
- channel_listener.Close (timeout - (DateTime.Now - start));
+ if (listener_host != null)
+ listener_host.Close (timeout - (DateTime.Now - start));
node.RegisteredId = null;
}
string name = Dns.GetHostName ();
var uri = new Uri ("net.tcp://" + name + ":" + port + "/PeerChannelEndpoints/" + Guid.NewGuid ());
- channel_listener = binding.BuildChannelListener<IDuplexSessionChannel> (uri, new object [0]);
- channel_listener.Open (timeout - (DateTime.Now - start));
+ var peer_receiver = new LocalPeerReceiver (this);
+ listener_host = new ServiceHost (peer_receiver);
+ var sba = listener_host.Description.Behaviors.Find<ServiceBehaviorAttribute> ();
+ sba.InstanceContextMode = InstanceContextMode.Single;
+ sba.IncludeExceptionDetailInFaults = true;
+
+ var se = listener_host.AddServiceEndpoint (typeof (IPeerReceiverContract), binding, "net.p2p://" + node.MeshId);
+ se.ListenUri = uri;
+ listener_host.Open (timeout - (DateTime.Now - start));
var nid = new Random ().Next (0, int.MaxValue);
var ea = new EndpointAddress (uri);