//
using System;
using System.Collections.Generic;
+using System.Collections.ObjectModel;
using System.IO;
using System.Net;
using System.Net.Security;
+using System.Net.Sockets;
using System.ServiceModel;
using System.ServiceModel.Description;
using System.ServiceModel.Security;
EndpointAddress local_address;
PeerResolver resolver;
PeerNode node;
+ TcpListener listener;
+ TcpChannelInfo info;
+ List<PeerNodeAddress> peers = new List<PeerNodeAddress> ();
public PeerDuplexChannel (IPeerChannelManager factory, EndpointAddress address, Uri via, PeerResolver resolver)
: base ((ChannelFactoryBase) factory, address, via)
{
binding = factory.Source;
this.resolver = factory.Resolver;
+ info = new TcpChannelInfo (binding, factory.MessageEncoder, null); // FIXME: fill properties correctly.
// It could be opened even with empty list of PeerNodeAddresses.
// So, do not create PeerNode per PeerNodeAddress, but do it with PeerNodeAddress[].
- node = new PeerNodeImpl (resolver, RemoteAddress, factory.Source.ListenIPAddress, factory.Source.Port);
+ node = new PeerNodeImpl (RemoteAddress, factory.Source.ListenIPAddress, factory.Source.Port);
}
// FIXME: receive local_address too
{
binding = listener.Source;
this.resolver = listener.Resolver;
- // FIXME: set resolver and node.
+ info = new TcpChannelInfo (binding, listener.MessageEncoder, null); // FIXME: fill properties correctly.
- node = new PeerNodeImpl (resolver, null, listener.Source.ListenIPAddress, listener.Source.Port);
+ node = new PeerNodeImpl (null, listener.Source.ListenIPAddress, listener.Source.Port);
}
public override EndpointAddress LocalAddress {
// DuplexChannelBase
+ TcpDuplexSessionChannel CreateInnerChannel (PeerNodeAddress pna)
+ {
+ var cfb = Manager as ChannelFactoryBase;
+ if (cfb != null)
+ return new TcpDuplexSessionChannel (cfb, info, pna.EndpointAddress, Via);
+ else
+ return new TcpDuplexSessionChannel ((ChannelListenerBase) Manager, info, listener.AcceptTcpClient ());
+ }
+
public override void Send (Message message, TimeSpan timeout)
{
- throw new NotImplementedException ();
+ ThrowIfDisposedOrNotOpen ();
+
+ DateTime start = DateTime.Now;
+
+ foreach (var pna in peers) {
+ var inner = CreateInnerChannel (pna);
+ inner.Open (timeout - (DateTime.Now - start));
+ inner.Send (message, timeout);
+ }
}
public override Message Receive (TimeSpan timeout)
{
+ ThrowIfDisposedOrNotOpen ();
+
throw new NotImplementedException ();
}
public override bool WaitForMessage (TimeSpan timeout)
{
+ ThrowIfDisposedOrNotOpen ();
+
throw new NotImplementedException ();
}
// CommunicationObject
- [MonoTODO]
protected override void OnAbort ()
{
- throw new NotImplementedException ();
+ OnClose (TimeSpan.Zero);
}
protected override void OnClose (TimeSpan timeout)
{
- node.Close (timeout);
+ DateTime start = DateTime.Now;
+ peers.Clear ();
+ resolver.Unregister (node.RegisteredId, timeout - (DateTime.Now - start));
+ node.SetOffline ();
+ if (listener != null)
+ listener.Stop ();
+ node.RegisteredId = null;
}
- // At some stage I should unify this class with PeerOutputChannel (and probably PeerInputChannel). Too much duplicate.
protected override void OnOpen (TimeSpan timeout)
{
- node.Open (timeout);
+ DateTime start = DateTime.Now;
+
+ // FIXME: supply maxAddresses
+ peers.AddRange (resolver.Resolve (node.MeshId, 3, timeout));
+
+ listener = node.GetTcpListener ();
+ var ep = (IPEndPoint) listener.LocalEndpoint;
+ string name = Dns.GetHostName ();
+ var nid = new Random ().Next (0, int.MaxValue);
+ var ea = new EndpointAddress ("net.tcp://" + name + ":" + ep.Port + "/PeerChannelEndpoints/" + Guid.NewGuid ());
+ var pna = new PeerNodeAddress (ea, new ReadOnlyCollection<IPAddress> (Dns.GetHostEntry (ep.Address).AddressList));
+ node.RegisteredId = resolver.Register (node.MeshId, pna, timeout - (DateTime.Now - start));
+ node.NodeId = nid;
+
+ node.SetOnline ();
}
}
}
internal int NodeId { get; set; }
- internal abstract bool IsOpen { get; }
+ internal bool IsOpen {
+ get { return RegisteredId != null; }
+ }
+
+ internal object RegisteredId { get; set; }
public int Port { get; private set; }
public abstract PeerMessagePropagationFilter MessagePropagationFilter { get; set; }
- internal abstract void Open (TimeSpan timeout);
- internal abstract void Close (TimeSpan timeout);
+ internal abstract TcpListener GetTcpListener ();
public void RefreshConnection ()
{
public PeerNodeAddress Address { get; set; }
}
- internal PeerNodeImpl (PeerResolver resolver, EndpointAddress remoteAddress, IPAddress fixedListenAddress, int port)
+ internal PeerNodeImpl (EndpointAddress remoteAddress, IPAddress fixedListenAddress, int port)
: base (remoteAddress.Uri.Host, port)
{
- this.resolver = resolver;
this.listen_address = fixedListenAddress ?? IPAddress.Any;
this.remote_address = remoteAddress;
}
- PeerResolver resolver;
EndpointAddress remote_address;
IPAddress listen_address;
- object registered_id;
- TcpListener listener;
// FIXME: implement
public override PeerMessagePropagationFilter MessagePropagationFilter { get; set; }
- internal override bool IsOpen {
- get { return registered_id != null; }
- }
-
- internal override void Open (TimeSpan timeout)
+ internal override TcpListener GetTcpListener ()
{
- DateTime startTime = DateTime.Now;
-
int p = Port;
var rnd = Port != 0 ? null : new Random ();
+ TcpListener listener = null;
while (p < 51000) {
if (rnd != null)
p = rnd.Next (50000, 51000); // This range is picked up sorta randomly.
}
if (listener == null)
throw new InvalidOperationException (String.Format ("The specified endpoint is available for a peer node to listen. IP address: {0}, Port {1}", listen_address, Port));
- var ep = (IPEndPoint) listener.LocalEndpoint;
- string name = Dns.GetHostName ();
- var nid = new Random ().Next (0, int.MaxValue);
- var pna = new PeerNodeAddress (new EndpointAddress ("net.tcp://" + name + ":" + ep.Port + "/PeerChannelEndpoints/" + Guid.NewGuid ()), new ReadOnlyCollection<IPAddress> (Dns.GetHostEntry (listen_address).AddressList));
- registered_id = resolver.Register (MeshId, pna, timeout - (DateTime.Now - startTime));
- NodeId = nid;
- SetOnline ();
- }
-
- internal override void Close (TimeSpan timeout)
- {
- resolver.Unregister (registered_id, timeout);
- SetOffline ();
- if (listener != null)
- listener.Stop ();
- registered_id = null;
+ return listener;
}
}
}