1 //------------------------------------------------------------
2 // Copyright (c) Microsoft Corporation. All rights reserved.
3 //------------------------------------------------------------
4 namespace System.ServiceModel.Channels
6 using System.Collections.Generic;
7 using System.Collections.ObjectModel;
8 using System.Diagnostics;
11 using System.ServiceModel;
12 using System.ServiceModel.Description;
13 using System.ServiceModel.Diagnostics;
14 using System.ServiceModel.Dispatcher;
15 using System.Threading;
17 // Neighbor Manager is responsible for managing a set of neighbors for a node.
18 class PeerNeighborManager
20 public event EventHandler<PeerNeighborCloseEventArgs> NeighborClosed;
21 public event EventHandler<PeerNeighborCloseEventArgs> NeighborClosing;
22 public event EventHandler NeighborConnected;
23 public event EventHandler NeighborOpened;
24 public event EventHandler Offline;
25 public event EventHandler Online;
27 // Delegate to determine if neighbor manager is closing or closed
28 delegate bool ClosedCallback();
39 PeerNodeConfig config;
42 // Contains the neighbors in connected state
43 // We maintain a connectedNeighborList in addition to neighborList for two reasons:
44 // (a) Several operations are on connected neighbors
45 // (b) To correctly handle online/offline conditions
47 List<IPeerNeighbor> connectedNeighborList;
50 PeerIPHelper ipHelper;
51 List<PeerNeighbor> neighborList; // contains all the neighbors known to neighbor manager
52 ManualResetEvent shutdownEvent;
55 PeerNodeTraceRecord traceRecord;
57 Binding serviceBinding;
58 IPeerNodeMessageHandling messageHandler;
60 public PeerNeighborManager(PeerIPHelper ipHelper, PeerNodeConfig config)
62 this(ipHelper, config, null) { }
63 public PeerNeighborManager(PeerIPHelper ipHelper, PeerNodeConfig config, IPeerNodeMessageHandling messageHandler)
65 Fx.Assert(ipHelper != null, "Non-null ipHelper is expected");
66 Fx.Assert(config != null, "Non-null Config is expected");
68 this.neighborList = new List<PeerNeighbor>();
69 this.connectedNeighborList = new List<IPeerNeighbor>();
70 this.ipHelper = ipHelper;
71 this.messageHandler = messageHandler;
73 this.thisLock = new object();
74 this.traceRecord = new PeerNodeTraceRecord(config.NodeId);
75 this.state = State.Created;
78 // Returns the count of connected neighbors
79 public int ConnectedNeighborCount
83 return this.connectedNeighborList.Count;
87 public int NonClosingNeighborCount
92 foreach (PeerNeighbor neighbor in this.connectedNeighborList)
94 if (!neighbor.IsClosing) count++;
100 // Returns true if Neighbor Manager is online
101 // (i.e., has one or more connected neighbors)
106 return this.isOnline;
110 // Returns the count of connected neighbors
111 public int NeighborCount
115 return this.neighborList.Count;
123 return this.thisLock;
127 // Ungracefully shutdown the neighbor manager
128 void Abort(PeerNeighbor[] neighbors)
130 foreach (PeerNeighbor neighbor in neighbors)
131 neighbor.Abort(PeerCloseReason.LeavingMesh, PeerCloseInitiator.LocalNode);
134 public IAsyncResult BeginOpenNeighbor(PeerNodeAddress remoteAddress, TimeSpan timeout, AsyncCallback callback, object asyncState)
138 // It's okay if neighbor manager is shutdown and closed after the above check
139 // because the new neighbor is only added to neighborList in NeighborOpened
140 // handler if the neighbor manager is still open.
142 // Sort the IP addresses
143 ReadOnlyCollection<IPAddress> sortedAddresses = this.ipHelper.SortAddresses(remoteAddress.IPAddresses);
144 PeerNodeAddress address = new PeerNodeAddress(remoteAddress.EndpointAddress, sortedAddresses);
145 return BeginOpenNeighborInternal(address, timeout, callback, asyncState);
148 internal IAsyncResult BeginOpenNeighborInternal(PeerNodeAddress remoteAddress, TimeSpan timeout, AsyncCallback callback, object asyncState)
150 PeerNeighbor neighbor = new PeerNeighbor(this.config, this.messageHandler);
151 RegisterForNeighborEvents(neighbor);
153 return new NeighborOpenAsyncResult(neighbor, remoteAddress, this.serviceBinding, this.service,
154 new ClosedCallback(Closed), timeout, callback, asyncState);
157 // Cleanup after shutdown
158 void Cleanup(bool graceful)
162 // In case of g----ful shutdown, we wait for neighbor list to become empty. connectedNeighborList should become
166 Fx.Assert(this.neighborList.Count == 0, "neighbor count should be 0");
167 Fx.Assert(this.connectedNeighborList.Count == 0, "Connected neighbor count should be 0");
169 // shutdownEvent is only relevant for a g----ful close. And should be closed by the thread
170 // performing g----ful close
171 if (this.shutdownEvent != null)
172 this.shutdownEvent.Close();
174 this.state = State.Shutdown;
178 // To clear the neighbor lists in case of unexpected exceptions during shutdown
179 void ClearNeighborList()
183 this.neighborList.Clear();
184 this.connectedNeighborList.Clear();
188 // Close the neighbor manager. It should be called after Shutdown().
189 // Can also be called before Open.
194 this.state = State.Closed;
198 // Returns true if neighbor manager is closing or closed
201 return this.state != State.Opened;
205 // Close the specified neighbor. Ok to call multiple times, but NeighborClosing
206 // and NeighborClosed events are fired just once.
207 // If the closeReason specified is InvalidNeighbor, it will be closed ungracefully
209 public void CloseNeighbor(IPeerNeighbor neighbor, PeerCloseReason closeReason,
210 PeerCloseInitiator closeInitiator)
212 CloseNeighbor(neighbor, closeReason, closeInitiator, null);
215 public void CloseNeighbor(IPeerNeighbor neighbor, PeerCloseReason closeReason,
216 PeerCloseInitiator closeInitiator, Exception closeException)
218 PeerNeighbor nbr = (PeerNeighbor)neighbor;
222 if (!(this.state != State.Created))
224 throw Fx.AssertAndThrow("Neighbor Manager is not expected to be in Created state");
227 // Check that the neighbor is known to neighbor manager
228 if (!this.neighborList.Contains(nbr))
232 // initiate closing of the neighbor
233 if (closeReason != PeerCloseReason.InvalidNeighbor)
236 InvokeAsyncNeighborClose(nbr, closeReason, closeInitiator, closeException, null);
238 else // Call abort even if neighbor is already closing
240 nbr.Abort(closeReason, closeInitiator);
244 public IPeerNeighbor EndOpenNeighbor(IAsyncResult result)
246 return NeighborOpenAsyncResult.End(result);
249 static void FireEvent(EventHandler handler, PeerNeighborManager manager)
252 handler(manager, EventArgs.Empty);
255 static void FireEvent(EventHandler handler, PeerNeighbor neighbor)
258 handler(neighbor, EventArgs.Empty);
261 static void FireEvent(EventHandler<PeerNeighborCloseEventArgs> handler,
262 PeerNeighbor neighbor, PeerCloseReason closeReason,
263 PeerCloseInitiator closeInitiator, Exception closeException)
267 PeerNeighborCloseEventArgs args = new PeerNeighborCloseEventArgs(
268 closeReason, closeInitiator, closeException);
269 handler(neighbor, args);
273 // Find a duplicate neighbor matching the nodeId
274 public IPeerNeighbor FindDuplicateNeighbor(ulong nodeId)
276 return FindDuplicateNeighbor(nodeId, null);
279 // Find a duplicate neighbor (excluding skipNeighbor) matching the nodeID.
280 public IPeerNeighbor FindDuplicateNeighbor(ulong nodeId, IPeerNeighbor skipNeighbor)
282 PeerNeighbor duplicateNeighbor = null;
284 if (nodeId != PeerTransportConstants.InvalidNodeId)
288 foreach (PeerNeighbor neighbor in this.neighborList)
290 // We restrict search to neighbors that are not yet closing.
291 if (neighbor != (PeerNeighbor)skipNeighbor && neighbor.NodeId == nodeId &&
292 !neighbor.IsClosing &&
293 neighbor.State < PeerNeighborState.Disconnecting)
295 duplicateNeighbor = neighbor;
301 return duplicateNeighbor;
304 public bool PingNeighbor(IPeerNeighbor peer)
307 Message message = Message.CreateMessage(MessageVersion.Soap12WSAddressing10, PeerStrings.PingAction);
314 if (Fx.IsFatal(e)) throw;
315 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
316 peer.Abort(PeerCloseReason.InternalFailure, PeerCloseInitiator.LocalNode);
323 public void PingNeighbors()
325 List<IPeerNeighbor> neighbors = GetConnectedNeighbors();
326 foreach (IPeerNeighbor neighbor in neighbors)
328 PingNeighbor(neighbor);
332 // Find a duplicate neighbor matching the address
333 public IPeerNeighbor FindDuplicateNeighbor(PeerNodeAddress address)
335 return FindDuplicateNeighbor(address, null);
338 // Find a duplicate neighbor (excluding skipNeighbor) matching the address.
339 public IPeerNeighbor FindDuplicateNeighbor(PeerNodeAddress address, IPeerNeighbor skipNeighbor)
341 PeerNeighbor duplicateNeighbor = null;
345 foreach (PeerNeighbor neighbor in this.neighborList)
347 // We restrict search to neighbors that are not yet closing.
348 if (neighbor != (PeerNeighbor)skipNeighbor &&
349 neighbor.ListenAddress != null &&
350 neighbor.ListenAddress.ServicePath == address.ServicePath &&
351 !neighbor.IsClosing &&
352 neighbor.State < PeerNeighborState.Disconnecting)
354 duplicateNeighbor = neighbor;
359 return duplicateNeighbor;
362 // Returns a copy of the list of connected neighbors.
363 public List<IPeerNeighbor> GetConnectedNeighbors()
367 return new List<IPeerNeighbor>(this.connectedNeighborList);
371 // Used to retrieve a neighbor given the proxy.
372 // Maps the proxy from the incoming message's service context to a neighbor instance.
373 public IPeerNeighbor GetNeighborFromProxy(IPeerProxy proxy)
375 PeerNeighbor neighbor = null;
379 if (state == State.Opened)
381 // Find the neighbor containing the specified proxy.
382 foreach (PeerNeighbor nbr in this.neighborList)
384 if (nbr.Proxy == proxy)
396 // Calls neighbor.BeginClose or EndClose and catches appropriate exceptions for any cleanup.
397 // We use a single method for both BeginClose and EndClose processing since exception handling
398 // is very similar in both cases.
399 void InvokeAsyncNeighborClose(PeerNeighbor neighbor, PeerCloseReason closeReason,
400 PeerCloseInitiator closeInitiator, Exception closeException, IAsyncResult endResult)
402 // initiate invoking BeginClose or EndClose
405 if (endResult == null)
407 IAsyncResult beginResult = neighbor.BeginClose(closeReason, closeInitiator,
408 closeException, Fx.ThunkCallback(new AsyncCallback(OnNeighborClosedCallback)), neighbor);
409 if (beginResult.CompletedSynchronously)
410 neighbor.EndClose(beginResult);
414 neighbor.EndClose(endResult);
419 if (Fx.IsFatal(e)) throw;
420 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
422 neighbor.TraceEventHelper(TraceEventType.Warning, TraceCode.PeerNeighborCloseFailed, SR.GetString(SR.TraceCodePeerNeighborCloseFailed), e);
423 // May get InvalidOperationException or ObjectDisposedException due to simultaneous close from both sides (and autoclose is enabled)
424 if (e is InvalidOperationException || e is CommunicationException || e is TimeoutException)
436 // Handler for processing neighbor closed event.
438 // We should allow this event to be processed even if the neighbor manager is shutting
439 // down because neighbor manager will be waiting on the shutdown event which is set in
440 // this handler once the last neighbor's close event is processed.
442 void OnNeighborClosed(object source, EventArgs args)
444 RemoveNeighbor((PeerNeighbor)source);
447 // Callback that is invoked when BeginClose completes.
448 void OnNeighborClosedCallback(IAsyncResult result)
450 if (!result.CompletedSynchronously)
452 // Call neighbor.EndClose -- PeerCloseReason and PeerCloseInitiator are dummy values
453 InvokeAsyncNeighborClose((PeerNeighbor)result.AsyncState, PeerCloseReason.None,
454 PeerCloseInitiator.LocalNode, null, result);
458 // Handles neighbor disconnecting or disconnected events
459 void OnNeighborClosing(object source, EventArgs args)
462 // Remove the neighbor from connected list. But closed and offline events are
463 // fired upon processing closed event. If, due to thread scheduling issues,
464 // closed handler executes before this handler, it will have already done the
465 // work and Remove() below is a NOP.
469 this.connectedNeighborList.Remove((IPeerNeighbor)source);
473 // handler to process neighbor connected event
474 void OnNeighborConnected(object source, EventArgs args)
476 PeerNeighbor neighbor = (PeerNeighbor)source;
477 bool fireConnected = false;
478 bool fireOnline = false;
480 // we may get this event after the neighbor has been closed. So, we check to see if
481 // the neighbor exists in the neighbor list before processing the event.
484 if (this.neighborList.Contains(neighbor))
486 fireConnected = true;
488 // Add the neighbor to connected list and determine if online should be fired
489 this.connectedNeighborList.Add(neighbor);
493 this.isOnline = true;
501 FireEvent(NeighborConnected, neighbor);
504 neighbor.TraceEventHelper(TraceEventType.Warning, TraceCode.PeerNeighborNotFound, SR.GetString(SR.TraceCodePeerNeighborNotFound));
508 if (DiagnosticUtility.ShouldTraceInformation)
510 TraceUtility.TraceEvent(TraceEventType.Information, TraceCode.PeerNeighborManagerOnline,
511 SR.GetString(SR.TraceCodePeerNeighborManagerOnline), this.traceRecord, this, null);
513 FireEvent(Online, this);
517 // handler to process neighbor opened event
518 void OnNeighborOpened(object source, EventArgs args)
520 PeerNeighbor neighbor = (PeerNeighbor)source;
525 // Add the neighbor to neighborList if neighbor manager is still open
526 if (this.state == State.Opened)
528 // StateManager assures that neighbor Opened and Closed events are
529 // serialized. So, we should never have to process a closed event handler
530 // before opened is complete.
531 if (!(neighbor.State == PeerNeighborState.Opened))
533 throw Fx.AssertAndThrow("Neighbor expected to be in Opened state");
535 this.neighborList.Add(neighbor);
541 FireEvent(NeighborOpened, neighbor);
543 else // close the neighbor ungracefully
546 neighbor.TraceEventHelper(TraceEventType.Warning, TraceCode.PeerNeighborNotAccepted, SR.GetString(SR.TraceCodePeerNeighborNotAccepted));
550 // Opens the neighbor manager. When this method returns the neighbor manager is ready
551 // to accept incoming neighbor requests and to establish outgoing neighbors.
552 public void Open(Binding serviceBinding, PeerService service)
554 Fx.Assert(serviceBinding != null, "serviceBinding must not be null");
555 Fx.Assert(service != null, "service must not be null");
559 this.service = service;
560 this.serviceBinding = serviceBinding;
561 if (!(this.state == State.Created))
563 throw Fx.AssertAndThrow("Neighbor Manager is expected to be in Created state");
565 this.state = State.Opened;
569 // Process an inbound channel
570 public bool ProcessIncomingChannel(IClientChannel channel)
572 bool accepted = false;
573 IPeerProxy proxy = (IPeerProxy)channel;
575 Fx.Assert(GetNeighborFromProxy(proxy) == null, "Channel should not map to an existing neighbor");
576 if (this.state == State.Opened)
578 // It is okay if neighbor manager is closed after the above check because the
579 // new neighbor is only added to neighborList in neighbor Opened handler if the
580 // neighbor manager is still open.
581 PeerNeighbor neighbor = new PeerNeighbor(this.config, this.messageHandler);
582 RegisterForNeighborEvents(neighbor);
583 neighbor.Open(proxy);
588 if (DiagnosticUtility.ShouldTraceWarning)
590 TraceUtility.TraceEvent(TraceEventType.Warning, TraceCode.PeerNeighborNotAccepted,
591 SR.GetString(SR.TraceCodePeerNeighborNotAccepted), this.traceRecord, this, null);
598 void RegisterForNeighborEvents(PeerNeighbor neighbor)
600 neighbor.Opened += OnNeighborOpened;
601 neighbor.Connected += OnNeighborConnected;
602 neighbor.Closed += OnNeighborClosed;
604 // We want the neighbor to call Closing handlers directly, so we delegate
605 neighbor.Closing += this.NeighborClosing;
607 // Disconnecting and Disconnected are treated the same way
608 neighbor.Disconnecting += OnNeighborClosing;
609 neighbor.Disconnected += OnNeighborClosing;
612 // Remove neighbor from the list and fire relevant events
613 void RemoveNeighbor(PeerNeighbor neighbor)
615 bool fireClosed = false;
616 bool fireOffline = false;
620 if (this.neighborList.Contains(neighbor))
624 // Remove neighbor from our lists and determine if offline should be fired.
625 this.neighborList.Remove(neighbor);
626 this.connectedNeighborList.Remove(neighbor);
627 if (this.isOnline && this.connectedNeighborList.Count == 0)
629 this.isOnline = false;
632 // If in the process of shutting down neighbor manager, signal completion
633 // upon closing of the last remaining neighbor
634 if (this.neighborList.Count == 0 && this.shutdownEvent != null)
636 this.shutdownEvent.Set();
644 FireEvent(NeighborClosed, neighbor, neighbor.CloseReason,
645 neighbor.CloseInitiator, neighbor.CloseException);
649 if (DiagnosticUtility.ShouldTraceWarning)
651 neighbor.TraceEventHelper(TraceEventType.Warning, TraceCode.PeerNeighborNotFound, SR.GetString(SR.TraceCodePeerNeighborNotFound));
656 if (DiagnosticUtility.ShouldTraceInformation)
658 TraceUtility.TraceEvent(TraceEventType.Information, TraceCode.PeerNeighborManagerOffline,
659 SR.GetString(SR.TraceCodePeerNeighborManagerOffline), this.traceRecord, this, null);
661 FireEvent(Offline, this);
666 // Shutdown the neighbor manager. Shutdown should be called prior to Close(). It stops
667 // processing inbound neighbor sessions and closes all the neighbors. Outbound neighbor
668 // sessions are also disabled as a result of setting the state to ShuttingDown
669 // (and then Shutdown).
671 public void Shutdown(bool graceful, TimeSpan timeout)
673 PeerNeighbor[] neighbors = null;
674 TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
680 if (this.state == State.Shutdown || this.state == State.Closed)
682 this.state = State.ShuttingDown;
684 // Create a copy of neighbor list in order to close neighbors outside lock.
685 neighbors = this.neighborList.ToArray();
687 // In case of g----ful shutdown, if there are any neighbors to close, create an event
688 // to wait until they are closed
689 if (graceful && neighbors.Length > 0)
690 this.shutdownEvent = new ManualResetEvent(false);
693 // Close each neighbor. Do this outside the lock due to Closing and Closed event handlers being invoked
695 Shutdown(neighbors, timeoutHelper.RemainingTime());
701 // Purge neighbor list in case of unexpected exceptions
702 if (Fx.IsFatal(e)) throw;
709 if (Fx.IsFatal(ee)) throw;
710 DiagnosticUtility.TraceHandledException(ee, TraceEventType.Information);
721 void Shutdown(PeerNeighbor[] neighbors, TimeSpan timeout)
723 TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
724 foreach (PeerNeighbor neighbor in neighbors)
725 CloseNeighbor(neighbor, PeerCloseReason.LeavingMesh, PeerCloseInitiator.LocalNode, null);
727 // Wait for all the neighbors to close (the event object is set when the last
728 // neighbor is closed). Specify a timeout for wait event handle in case event.Set
729 // fails for some reason (it doesn't throw exception). Bail out of the loop when
730 // the neighbor count reaches 0. This ensures that Shutdown() doesn't hang.
731 if (neighbors.Length > 0)
733 if (!TimeoutHelper.WaitOne(this.shutdownEvent, timeoutHelper.RemainingTime()))
735 Abort(neighbors); // abort neighbors that haven't been closed yet
736 throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new TimeoutException());
741 void ThrowIfNotOpen()
743 if (!(this.state != State.Created))
745 throw Fx.AssertAndThrow("Neighbor manager not expected to be in Created state");
749 throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ObjectDisposedException(this.ToString()));
753 class PeerNeighbor : IPeerNeighbor, IInputSessionShutdown
755 public event EventHandler Closed;
756 public event EventHandler<PeerNeighborCloseEventArgs> Closing;
757 public event EventHandler Connected;
758 public event EventHandler Disconnected;
759 public event EventHandler Disconnecting;
760 public event EventHandler Opened;
762 ChannelFactory<IPeerProxy> channelFactory;
764 // Used after closing the neighbor to find details of the close reason and who
765 // initiated closing.
766 Exception closeException;
767 PeerCloseInitiator closeInitiator;
768 PeerCloseReason closeReason;
770 PeerNodeConfig config;
771 IPAddress connectIPAddress; // relevant for initiator neighbor. Indicates the IP address used for connection
772 ExtensionCollection<IPeerNeighbor> extensions;
774 bool isClosing; // If true, the neighbor is being closed or already closed
775 PeerNodeAddress listenAddress; // The address that the remote endpoint is listening on
776 ulong nodeId; // The nodeID of the remote endpoint
777 IPeerProxy proxy; // Proxy channel to talk to the remote endpoint
778 IClientChannel proxyChannel; // To access inner Channel from proxy w/o casting
779 PeerNeighborState state; // Current state of the neighbor
780 object thisLock = new object();
781 IPeerNodeMessageHandling messageHandler;
782 UtilityExtension utility;
784 // Dictates if attempt to set neighbor state should throw exception on failure.
785 enum SetStateBehavior
791 public PeerNeighbor(PeerNodeConfig config,
792 IPeerNodeMessageHandling messageHandler)
794 this.closeReason = PeerCloseReason.None;
795 this.closeInitiator = PeerCloseInitiator.LocalNode;
796 this.config = config;
797 this.state = PeerNeighborState.Created;
798 this.extensions = new ExtensionCollection<IPeerNeighbor>(this, thisLock);
799 this.messageHandler = messageHandler;
802 public IPAddress ConnectIPAddress
806 return this.connectIPAddress;
810 this.connectIPAddress = value;
814 // To retrieve reason for closing the neighbor
815 public PeerCloseReason CloseReason
819 return this.closeReason;
823 // Indicates if close was initiated by local or remote node
824 public PeerCloseInitiator CloseInitiator
828 return this.closeInitiator;
832 // If an exception during processing caused the neighbor to be closed
833 public Exception CloseException
837 return this.closeException;
841 public IExtensionCollection<IPeerNeighbor> Extensions
849 // Returns true if the neighbor is currently closing or already closed
850 public bool IsClosing
858 // Returns true if neighbor is in connected, synchronizing, or synchronized states
859 public bool IsConnected
863 return PeerNeighborStateHelper.IsConnected(this.state);
867 // NOTE: If the property is accessed before the neighbor transitions to connected
868 // state, the returned listen address may be null for the accepting neighbor.
869 public PeerNodeAddress ListenAddress
873 // Return a copy since the scope ID is settable
874 PeerNodeAddress address = this.listenAddress;
876 return new PeerNodeAddress(address.EndpointAddress, PeerIPHelper.CloneAddresses(address.IPAddresses, true));
885 if (!(!this.initiator))
887 throw Fx.AssertAndThrow("Cannot be set for initiator neighbors");
893 this.listenAddress = value;
899 // Returns true if the neighbor is an initiator
900 public bool IsInitiator
904 return this.initiator;
908 // Returns the node ID of the neighbor. If this property is accessed before the
909 // neighbor transitions to connected state, the returned node ID may be 0.
927 // Returns the proxy for the neighbor (i.e., the channel that SFx maintains to the
928 // remote node associated with this neighbor instance).
929 public IPeerProxy Proxy
938 this.proxyChannel = (IClientChannel)this.proxy;
939 RegisterForChannelEvents();
943 // The only states that are settable are connecting, connected, synchronizing,
944 // synchronized, disconnecting, and disconnected.
945 public PeerNeighborState State
954 if (!(PeerNeighborStateHelper.IsSettable(value)))
956 throw Fx.AssertAndThrow("A valid settable state is expected");
958 SetState(value, SetStateBehavior.ThrowException);
966 return this.thisLock;
970 // NOTE: Closing handlers not invoked when a neighbor is aborted; but Closed handlers are.
971 public void Abort(PeerCloseReason reason, PeerCloseInitiator closeInit)
975 // Set close reason etc. if they are not already set.
978 this.isClosing = true;
979 this.closeReason = reason;
980 this.closeInitiator = closeInit;
988 if (this.channelFactory != null)
989 this.channelFactory.Abort();
991 this.proxyChannel.Abort();
994 // Close a neighbor gracefully
995 public IAsyncResult BeginClose(PeerCloseReason reason,
996 PeerCloseInitiator closeInit, Exception exception,
997 AsyncCallback callback, object asyncState)
999 bool callClosing = false;
1003 // Set close reason etc. if they are not already set.
1004 if (!this.isClosing)
1007 this.isClosing = true;
1008 this.closeReason = reason;
1009 this.closeInitiator = closeInit;
1010 this.closeException = exception;
1014 // Initiate close, if another thread has not already done so....
1015 // NOTE: NeighborClosing handlers should not throw any catchable exceptions.
1018 EventHandler<PeerNeighborCloseEventArgs> handler = this.Closing;
1019 if (handler != null)
1023 PeerNeighborCloseEventArgs args = new PeerNeighborCloseEventArgs(
1024 reason, closeInitiator, exception);
1025 handler(this, args);
1029 if (Fx.IsFatal(e)) throw;
1036 if (this.channelFactory != null)
1037 return this.channelFactory.BeginClose(callback, asyncState);
1039 return this.proxyChannel.BeginClose(callback, asyncState);
1042 // Begin opening of a neighbor channel to 'to'. instanceContext is where the remote
1043 // endpoint should send messages to (it will be a reference to PeerNeighborManager).
1044 public IAsyncResult BeginOpen(PeerNodeAddress remoteAddress, Binding binding,
1045 PeerService service, ClosedCallback closedCallback, TimeSpan timeout,
1046 AsyncCallback callback, object asyncState)
1048 this.initiator = true;
1049 this.listenAddress = remoteAddress;
1050 OpenAsyncResult result = new OpenAsyncResult(this, remoteAddress, binding, service,
1051 closedCallback, timeout, callback, state);
1055 // Called by OpenAsyncResult
1056 public IAsyncResult BeginOpenProxy(EndpointAddress remoteAddress, Binding binding,
1057 InstanceContext instanceContext, TimeSpan timeout, AsyncCallback callback, object state)
1059 TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
1060 if (this.channelFactory != null)
1061 Abort(); // to close previously created factory, if any
1063 EndpointAddressBuilder meshEprBuilder = new EndpointAddressBuilder(remoteAddress);
1064 meshEprBuilder.Uri = config.GetMeshUri();
1065 this.channelFactory = new DuplexChannelFactory<IPeerProxy>(instanceContext, binding, meshEprBuilder.ToEndpointAddress());
1066 this.channelFactory.Endpoint.Behaviors.Add(new ClientViaBehavior(remoteAddress.Uri));
1067 this.channelFactory.Endpoint.Behaviors.Add(new PeerNeighborBehavior(this));
1068 this.channelFactory.Endpoint.Contract.Behaviors.Add(new PeerOperationSelectorBehavior(this.messageHandler));
1069 this.config.SecurityManager.ApplyClientSecurity(channelFactory);
1070 this.channelFactory.Open(timeoutHelper.RemainingTime());
1071 this.Proxy = this.channelFactory.CreateChannel();
1073 IAsyncResult result = this.proxyChannel.BeginOpen(timeoutHelper.RemainingTime(), callback, state);
1074 if (result.CompletedSynchronously)
1075 this.proxyChannel.EndOpen(result);
1080 public IAsyncResult BeginSend(Message message,
1081 AsyncCallback callback, object asyncState)
1083 return this.proxy.BeginSend(message, callback, asyncState);
1086 public IAsyncResult BeginSend(Message message,
1087 TimeSpan timeout, AsyncCallback callback, object asyncState)
1089 return this.proxy.BeginSend(message, timeout, callback, asyncState);
1092 public void Send(Message message)
1094 this.proxy.Send(message);
1097 // Called to Abort channelFactory in case BeginOpenProxy or EndOpenProxy throw
1098 public void CleanupProxy()
1100 this.channelFactory.Abort();
1103 public void EndClose(IAsyncResult result)
1105 if (this.channelFactory != null)
1106 this.channelFactory.EndClose(result);
1108 this.proxyChannel.EndClose(result);
1111 public void EndOpen(IAsyncResult result)
1113 OpenAsyncResult.End(result);
1116 // Called by OpenAsyncResult
1117 public void EndOpenProxy(IAsyncResult result)
1119 if (!result.CompletedSynchronously)
1120 this.proxyChannel.EndOpen(result);
1123 public void EndSend(IAsyncResult result)
1125 this.proxy.EndSend(result);
1129 public Message RequestSecurityToken(Message request)
1131 return this.proxy.ProcessRequestSecurityToken(request);
1134 public void Ping(Message request)
1136 this.proxy.Ping(request);
1139 // Service channel closed event handler.
1140 void OnChannelClosed(object source, EventArgs args)
1142 if (this.state < PeerNeighborState.Closed)
1143 OnChannelClosedOrFaulted(PeerCloseReason.Closed);
1145 // If the other side closed the channel, abort the factory (if one exists)
1146 if (this.closeInitiator != PeerCloseInitiator.LocalNode && this.channelFactory != null)
1147 this.channelFactory.Abort();
1150 // Does heavy-lifting of processing closed/faulted events
1151 void OnChannelClosedOrFaulted(PeerCloseReason reason)
1153 PeerNeighborState oldState;
1157 // We don't call SetState here because it should not be called inside lock,
1158 // and to avoid race conditions, we need to set the state before the lock
1160 oldState = this.state;
1161 this.state = PeerNeighborState.Closed;
1163 // Set close reason etc. if they are not already set (as a result of local
1164 // node initiating Close)
1165 if (!this.isClosing)
1167 this.isClosing = true;
1168 this.closeReason = reason;
1169 this.closeInitiator = PeerCloseInitiator.RemoteNode;
1171 TraceClosedEvent(oldState);
1174 // Update traces and counters and notify interested parties
1175 OnStateChanged(PeerNeighborState.Closed);
1178 // Service channel faulted event handler.
1179 void OnChannelFaulted(object source, EventArgs args)
1183 OnChannelClosedOrFaulted(PeerCloseReason.Faulted);
1191 // Service channel opened event handler.
1192 void OnChannelOpened(object source, EventArgs args)
1194 // TrySetState is not used because it asserts for a settable state
1195 // and is meant for use by upper layers. Only PeerNeighbor can set
1196 // the state to Opened. So, it calls SetState directly.
1197 SetState(PeerNeighborState.Opened, SetStateBehavior.TrySet);
1201 // Invokes the appropriate state changed event handler.
1202 // WARNING: This method should not be called within lock.
1204 void OnStateChanged(PeerNeighborState newState)
1206 EventHandler handler = null;
1209 case PeerNeighborState.Opened:
1210 handler = this.Opened;
1212 case PeerNeighborState.Closed:
1213 handler = this.Closed;
1215 case PeerNeighborState.Connected:
1216 handler = this.Connected;
1218 case PeerNeighborState.Disconnecting:
1219 handler = this.Disconnecting;
1221 case PeerNeighborState.Disconnected:
1222 handler = this.Disconnected;
1225 if (handler != null)
1226 handler(this, EventArgs.Empty);
1229 // Open an accepting (incoming) neighbor. callbackInstance is where msgs meant for
1230 // remote endpoint should be sent.
1231 public void Open(IPeerProxy callbackInstance)
1233 this.initiator = false;
1234 this.Proxy = callbackInstance;
1237 // Register for channel events
1238 void RegisterForChannelEvents()
1240 this.state = PeerNeighborState.Created; // reset state if the previous proxy failed
1241 this.proxyChannel.Opened += OnChannelOpened;
1242 this.proxyChannel.Closed += OnChannelClosed;
1243 this.proxyChannel.Faulted += OnChannelFaulted;
1246 // WARNING: This method should not be called within the lock -- it may invoke state
1247 // changed event handlers
1248 bool SetState(PeerNeighborState newState, SetStateBehavior behavior)
1250 bool stateChanged = false;
1251 PeerNeighborState oldState;
1253 // Attempt to set the state
1256 oldState = this.State;
1257 if (behavior == SetStateBehavior.ThrowException)
1258 ThrowIfInvalidState(newState);
1259 if (newState > this.state)
1261 this.state = newState;
1262 stateChanged = true;
1263 if (DiagnosticUtility.ShouldTraceInformation)
1265 TraceEventHelper(TraceEventType.Information, TraceCode.PeerNeighborStateChanged, SR.GetString(SR.TraceCodePeerNeighborStateChanged), null, null, newState, oldState);
1270 if (DiagnosticUtility.ShouldTraceInformation)
1272 TraceEventHelper(TraceEventType.Information, TraceCode.PeerNeighborStateChangeFailed, SR.GetString(SR.TraceCodePeerNeighborStateChangeFailed), null, null, oldState, newState);
1279 // Pass state change notification on to interested subscribers.
1280 OnStateChanged(newState);
1283 return stateChanged;
1286 // Attempts to set to specified state.
1287 // Returns true if succeed and false otherwise.
1288 public bool TrySetState(PeerNeighborState newState)
1290 if (!(PeerNeighborStateHelper.IsSettable(newState)))
1292 throw Fx.AssertAndThrow("A valid settable state is expected");
1294 return SetState(newState, SetStateBehavior.TrySet);
1297 public void ThrowIfClosed()
1299 if (this.state == PeerNeighborState.Closed)
1301 throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ObjectDisposedException(
1306 // Throws if the new state being set on the neighbor is invalid compared to the
1307 // current state (such as setting state to connecting when it is already in
1308 // disconnected state). Also throws if neighbor is already closed.
1309 // NOTE: This method should be called within the lock.
1310 void ThrowIfInvalidState(PeerNeighborState newState)
1312 if (this.state == PeerNeighborState.Closed)
1314 throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ObjectDisposedException(
1317 if (this.state >= newState)
1319 throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(
1320 SR.GetString(SR.PeerNeighborInvalidState, this.state.ToString(),
1321 newState.ToString())));
1325 public void TraceClosedEvent(PeerNeighborState previousState)
1327 if (DiagnosticUtility.ShouldTraceInformation)
1329 TraceEventType severity = TraceEventType.Information;
1331 // Override tracing severity based on close reason
1332 switch (this.closeReason)
1334 case PeerCloseReason.InvalidNeighbor:
1335 case PeerCloseReason.DuplicateNodeId:
1336 severity = TraceEventType.Error;
1339 case PeerCloseReason.ConnectTimedOut:
1340 case PeerCloseReason.InternalFailure:
1341 case PeerCloseReason.Faulted:
1342 severity = TraceEventType.Warning;
1346 PeerNeighborCloseTraceRecord record = new PeerNeighborCloseTraceRecord(
1347 this.nodeId, this.config.NodeId, null, null,
1348 this.GetHashCode(), this.initiator,
1349 PeerNeighborState.Closed.ToString(), previousState.ToString(), null,
1350 this.closeInitiator.ToString(), this.closeReason.ToString()
1353 TraceUtility.TraceEvent(severity, TraceCode.PeerNeighborStateChanged,
1354 SR.GetString(SR.TraceCodePeerNeighborStateChanged), record, this, this.closeException);
1358 public void TraceEventHelper(TraceEventType severity, int traceCode, string traceDescription)
1360 PeerNeighborState nbrState = this.state;
1361 this.TraceEventHelper(severity, traceCode, traceDescription, null, null, nbrState, nbrState);
1364 public void TraceEventHelper(TraceEventType severity, int traceCode, string traceDescription, Exception e)
1366 PeerNeighborState nbrState = this.state;
1367 this.TraceEventHelper(severity, traceCode, traceDescription, e, null, nbrState, nbrState);
1370 public void TraceEventHelper(TraceEventType severity, int traceCode, string traceDescription, Exception e,
1371 string action, PeerNeighborState nbrState, PeerNeighborState previousOrAttemptedState)
1373 if (DiagnosticUtility.ShouldTrace(severity))
1375 string attemptedState = null;
1376 string previousState = null;
1377 PeerNodeAddress listenAddr = null;
1378 IPAddress connectIPAddr = null;
1380 if (nbrState >= PeerNeighborState.Opened && nbrState <= PeerNeighborState.Connected)
1382 listenAddr = this.ListenAddress;
1383 connectIPAddr = this.ConnectIPAddress;
1386 if (traceCode == TraceCode.PeerNeighborStateChangeFailed)
1387 attemptedState = previousOrAttemptedState.ToString();
1388 else if (traceCode == TraceCode.PeerNeighborStateChanged)
1389 previousState = previousOrAttemptedState.ToString();
1391 PeerNeighborTraceRecord record = new PeerNeighborTraceRecord(this.nodeId,
1392 this.config.NodeId, listenAddr, connectIPAddr, this.GetHashCode(),
1393 this.initiator, nbrState.ToString(), previousState, attemptedState, action);
1395 if (severity == TraceEventType.Verbose && e != null)
1396 severity = TraceEventType.Information; // need to be >= info for exceptions
1398 TraceUtility.TraceEvent(severity, traceCode, traceDescription, record, this, e);
1402 // Helper class to implement PeerNeighbor's AsyncOpen by iterating over the IPAddress array
1403 class OpenAsyncResult : AsyncResult
1405 bool completedSynchronously;
1406 ClosedCallback closed;
1407 int currentIndex; // index into the ipAddress array
1408 PeerNeighbor neighbor;
1409 PeerNodeAddress remoteAddress;
1411 PeerService service;
1412 AsyncCallback onOpen;
1413 Exception lastException;
1414 TimeoutHelper timeoutHelper;
1416 public OpenAsyncResult(PeerNeighbor neighbor, PeerNodeAddress remoteAddress, Binding binding,
1417 PeerService service, ClosedCallback closedCallback, TimeSpan timeout,
1418 AsyncCallback callback, object state)
1419 : base(callback, state)
1421 Fx.Assert(remoteAddress != null && remoteAddress.IPAddresses.Count > 0, "Non-empty IPAddress collection expected");
1423 this.timeoutHelper = new TimeoutHelper(timeout);
1424 this.neighbor = neighbor;
1425 this.currentIndex = 0;
1426 this.completedSynchronously = true; // initially
1427 this.remoteAddress = remoteAddress;
1428 this.service = service;
1429 this.binding = binding;
1430 this.onOpen = Fx.ThunkCallback(new AsyncCallback(OnOpen));
1431 this.closed = closedCallback;
1437 bool success = false;
1441 while (this.currentIndex < this.remoteAddress.IPAddresses.Count)
1443 EndpointAddress remoteAddress = PeerIPHelper.GetIPEndpointAddress(
1444 this.remoteAddress.EndpointAddress, this.remoteAddress.IPAddresses[this.currentIndex]);
1447 throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ObjectDisposedException(this.GetType().ToString()));
1451 this.neighbor.ConnectIPAddress = this.remoteAddress.IPAddresses[this.currentIndex];
1452 IAsyncResult result = this.neighbor.BeginOpenProxy(remoteAddress, binding, new InstanceContext(null, service, false), this.timeoutHelper.RemainingTime(), onOpen, null);
1453 if (!result.CompletedSynchronously)
1458 this.neighbor.EndOpenProxy(result);
1459 this.lastException = null;
1461 neighbor.isClosing = false;
1464 #pragma warning suppress 56500 // covered by FxCOP
1467 if (Fx.IsFatal(e)) throw;
1470 this.neighbor.CleanupProxy();
1472 catch (Exception ee)
1474 if (Fx.IsFatal(ee)) throw;
1475 DiagnosticUtility.TraceHandledException(ee, TraceEventType.Information);
1477 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
1478 if (!ContinuableException(e)) throw;
1484 if (Fx.IsFatal(e)) throw;
1485 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
1486 this.lastException = e;
1489 // Indicate completion to the caller
1492 Fx.Assert(this.lastException == null, "lastException expected to be null");
1496 Fx.Assert(this.lastException != null, "lastException expected to be non-null");
1498 base.Complete(this.completedSynchronously, this.lastException);
1501 public static void End(IAsyncResult result)
1503 AsyncResult.End<OpenAsyncResult>(result);
1506 // Checks if the exception can be handled
1507 bool ContinuableException(Exception exception)
1511 exception is EndpointNotFoundException
1512 || exception is TimeoutException
1514 && timeoutHelper.RemainingTime() > TimeSpan.Zero
1517 this.lastException = exception;
1518 this.currentIndex++;
1524 // Open completion callback. If open failed, reattempts with the next IP address in the list
1525 void OnOpen(IAsyncResult result)
1527 Exception exception = null;
1528 bool completed = false;
1530 if (!result.CompletedSynchronously)
1532 this.completedSynchronously = false;
1535 this.neighbor.EndOpenProxy(result);
1537 neighbor.isClosing = false;
1539 #pragma warning suppress 56500 // covered by FxCOP
1542 if (Fx.IsFatal(e)) throw;
1545 this.neighbor.CleanupProxy();
1547 catch (Exception ee)
1549 if (Fx.IsFatal(ee)) throw;
1550 DiagnosticUtility.TraceHandledException(ee, TraceEventType.Information);
1553 if (ContinuableException(exception))
1555 // attempt connection with the next IP address
1560 catch (Exception ee)
1562 if (Fx.IsFatal(ee)) throw;
1563 DiagnosticUtility.TraceHandledException(ee, TraceEventType.Information);
1574 base.Complete(this.completedSynchronously, exception);
1578 #region IInputSessionShutdown Members
1580 void IInputSessionShutdown.ChannelFaulted(IDuplexContextChannel channel)
1585 void IInputSessionShutdown.DoneReceiving(IDuplexContextChannel channel)
1587 //Close it if the neighbor it was connected to has disconnected
1588 if (channel.State == CommunicationState.Opened)
1596 public UtilityExtension Utility
1600 if (this.utility == null)
1602 this.utility = this.Extensions.Find<UtilityExtension>();
1604 return this.utility;
1609 // Helper class to implement PeerNeighborManager's async neighbor open
1610 class NeighborOpenAsyncResult : AsyncResult
1612 PeerNeighbor neighbor;
1614 // ClosedCallback is a delegate to determine if caller has closed. If so, we bail out of open operation
1615 public NeighborOpenAsyncResult(PeerNeighbor neighbor, PeerNodeAddress remoteAddress, Binding binding,
1616 PeerService service, ClosedCallback closedCallback, TimeSpan timeout, AsyncCallback callback, object state)
1617 : base(callback, state)
1619 this.neighbor = neighbor;
1621 IAsyncResult result = null;
1624 result = neighbor.BeginOpen(remoteAddress, binding, service, closedCallback, timeout,
1625 Fx.ThunkCallback(new AsyncCallback(OnOpen)), null);
1626 if (result.CompletedSynchronously)
1628 neighbor.EndOpen(result);
1633 if (Fx.IsFatal(e)) throw;
1634 neighbor.TraceEventHelper(TraceEventType.Warning, TraceCode.PeerNeighborOpenFailed, SR.GetString(SR.TraceCodePeerNeighborOpenFailed));
1638 // Indicate sync completion to the caller
1639 if (result.CompletedSynchronously)
1640 base.Complete(true);
1643 void OnOpen(IAsyncResult result)
1645 if (!result.CompletedSynchronously)
1647 Exception exception = null;
1651 this.neighbor.EndOpen(result);
1653 #pragma warning suppress 56500 // covered by FxCOP
1656 if (Fx.IsFatal(e)) throw;
1657 DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
1658 neighbor.TraceEventHelper(TraceEventType.Warning, TraceCode.PeerNeighborOpenFailed, SR.GetString(SR.TraceCodePeerNeighborOpenFailed));
1662 base.Complete(result.CompletedSynchronously, exception);
1666 public static IPeerNeighbor End(IAsyncResult result)
1668 NeighborOpenAsyncResult asyncResult = AsyncResult.End<NeighborOpenAsyncResult>(result);
1669 return asyncResult.neighbor;
1672 class PeerNeighborBehavior : IEndpointBehavior
1674 PeerNeighbor neighbor;
1676 public PeerNeighborBehavior(PeerNeighbor neighbor)
1678 this.neighbor = neighbor;
1681 #region IEndpointBehavior Members
1683 public void Validate(ServiceEndpoint serviceEndpoint)
1687 public void AddBindingParameters(ServiceEndpoint serviceEndpoint, BindingParameterCollection bindingParameters)
1691 public void ApplyDispatchBehavior(ServiceEndpoint serviceEndpoint, EndpointDispatcher endpointDispatcher)
1695 public void ApplyClientBehavior(ServiceEndpoint serviceEndpoint, ClientRuntime behavior)
1697 behavior.DispatchRuntime.InputSessionShutdownHandlers.Add(this.neighbor);
1703 public IPeerNeighbor SlowestNeighbor()
1705 List<IPeerNeighbor> neighbors = this.GetConnectedNeighbors();
1706 IPeerNeighbor slowNeighbor = null;
1707 UtilityExtension utility = null;
1708 //if the neighbor has below this number, we wont consider for pruning
1709 int pending = PeerTransportConstants.MessageThreshold;
1710 foreach (IPeerNeighbor peer in neighbors)
1712 utility = peer.Utility;
1713 if (utility == null || !peer.IsConnected)
1715 if (utility.PendingMessages > pending)
1717 slowNeighbor = peer;
1718 pending = utility.PendingMessages;
1721 return slowNeighbor;