using System; using System.Text; using System.Reflection; using System.ComponentModel; using System.Collections; using System.Collections.Generic; using System.Collections.Specialized; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.Xml; using System.Xml.XPath; using System.Xml.Schema; using System.IO; using System.Runtime.Serialization.Formatters.Binary; using System.Threading; using System.Timers; using System.Security.Permissions; using System.Security.Cryptography; using System.Workflow.Runtime; using System.Workflow.ComponentModel; using System.Workflow.Runtime.Hosting; using System.Workflow.Runtime.Tracking; namespace System.Workflow.Runtime { /// /// Creates TrackingListener instances /// internal class TrackingListenerFactory { private List _services = null; private bool _initialized = false; private Dictionary _listeners = new Dictionary(); private volatile object _listenerLock = new object(); private System.Timers.Timer _timer = null; private double _interval = 60000; private TrackingProfileManager _profileManager = new TrackingProfileManager(); internal TrackingListenerFactory() { } internal TrackingProfileManager TrackingProfileManager { get { return _profileManager; } } /// /// Must be called /// /// internal void Initialize(WorkflowRuntime runtime) { lock (this) { _services = runtime.TrackingServices; _profileManager.Initialize(runtime); runtime.WorkflowExecutorInitializing += WorkflowExecutorInitializing; _timer = new System.Timers.Timer(); _timer.Interval = _interval; _timer.AutoReset = false; // ensure that only one timer thread at a time _timer.Elapsed += new ElapsedEventHandler(Cleanup); _timer.Start(); _initialized = true; } } /// /// Clean up static state created in Initialize /// internal void Uninitialize(WorkflowRuntime runtime) { lock (this) { _profileManager.Uninitialize(); runtime.WorkflowExecutorInitializing -= WorkflowExecutorInitializing; _timer.Elapsed -= new ElapsedEventHandler(Cleanup); _timer.Stop(); _services = null; _initialized = false; _timer.Dispose(); _timer = null; } } /// /// Callback for associating tracking listeners to in memory instances. Fires for new and loading instances. /// /// WorkflowExecutor /// void WorkflowExecutorInitializing(object sender, WorkflowRuntime.WorkflowExecutorInitializingEventArgs e) { if (null == sender) throw new ArgumentNullException("sender"); if (null == e) throw new ArgumentNullException("e"); if (!typeof(WorkflowExecutor).IsInstanceOfType(sender)) throw new ArgumentException("sender"); WorkflowExecutor exec = (WorkflowExecutor)sender; // // Add an event to clean up the WeakRef entry exec.WorkflowExecutionEvent += new EventHandler(WorkflowExecutionEvent); TrackingCallingState trackingCallingState = exec.TrackingCallingState; TrackingListenerBroker listenerBroker = (TrackingListenerBroker)exec.RootActivity.GetValue(WorkflowExecutor.TrackingListenerBrokerProperty); if (listenerBroker != null) { listenerBroker.ReplaceServices(exec.WorkflowRuntime.TrackingServiceReplacement); } TrackingListener listener = null; // // Check if we still have a weakref to the listener for this instance WeakReference weakref = null; if (e.Loading) { bool found = false; lock (_listenerLock) { found = _listeners.TryGetValue(exec.InstanceId, out weakref); } if (found) { try { // // Instead of checking IsAlive take a ref to the Target // so that it isn't GC'd underneath us. listener = weakref.Target as TrackingListener; } catch (InvalidOperationException) { // // This seems weird but according to msdn // accessing Target can throw ??? // Ignore because it's the same as a null target. } } // // If listener is null because we didn't find the wr in the cache // or because the Target has been GC'd create a new listener if (null != listener) { listener.Broker = listenerBroker; } else { Debug.Assert(null != listenerBroker, "TrackingListenerBroker should not be null during loading"); listener = GetTrackingListener(exec.WorkflowDefinition, exec, listenerBroker); if (null != listener) { if (null != weakref) weakref.Target = listener; else { lock (_listenerLock) { _listeners.Add(exec.ID, new WeakReference(listener)); } } } } } else { // // New instance is being created listener = GetTrackingListener(exec.WorkflowDefinition, exec); if (null != listener) { exec.RootActivity.SetValue(WorkflowExecutor.TrackingListenerBrokerProperty, listener.Broker); lock (_listenerLock) { _listeners.Add(exec.ID, new WeakReference(listener)); } } else exec.RootActivity.SetValue(WorkflowExecutor.TrackingListenerBrokerProperty, new TrackingListenerBroker()); } if (null != listener) { exec.WorkflowExecutionEvent += new EventHandler(listener.WorkflowExecutionEvent); } } void WorkflowExecutionEvent(object sender, WorkflowExecutor.WorkflowExecutionEventArgs e) { switch (e.EventType) { case WorkflowEventInternal.Aborted: case WorkflowEventInternal.Completed: case WorkflowEventInternal.Terminated: // // The instance is done - remove // the WeakRef from our list WorkflowExecutor exec = (WorkflowExecutor)sender; lock (_listenerLock) { _listeners.Remove(exec.ID); } break; default: return; } } void Cleanup(object sender, ElapsedEventArgs e) { List _toRemove = new List(); if ((null != _listeners) || (_listeners.Count > 0)) { lock (_listenerLock) { foreach (KeyValuePair kvp in _listeners) { if (null == kvp.Value.Target) _toRemove.Add(kvp.Key); } if (_toRemove.Count > 0) { foreach (Guid g in _toRemove) _listeners.Remove(g); } } } lock (this) { if (_timer != null) { _timer.Start(); } } } /// /// Return a tracking listener for a new instance /// /// SequentialWorkflow for which the tracking listener will be associated /// ScheduleExecutor for the schedule instance /// New TrackingListener instance internal TrackingListener GetTrackingListener(Activity sked, WorkflowExecutor skedExec) { if (!_initialized) Initialize(skedExec.WorkflowRuntime); return GetListener(sked, skedExec, null); } /// /// Return a tracking listener for an existing instance (normally used during loading) /// /// SequentialWorkflow for which the tracking listener will be associated /// ScheduleExecutor for the schedule instance /// TrackingListenerBroker /// New TrackingListener instance internal TrackingListener GetTrackingListener(Activity sked, WorkflowExecutor skedExec, TrackingListenerBroker broker) { if (!_initialized) Initialize(skedExec.WorkflowRuntime); if (null == broker) { WorkflowTrace.Tracking.TraceEvent(TraceEventType.Error, 0, ExecutionStringManager.NullTrackingBroker); return null; } return GetListener(sked, skedExec, broker); } private TrackingListener GetListenerFromWRCache(Guid instanceId) { WeakReference wr = null; TrackingListener listener = null; lock (_listenerLock) { if (!_listeners.TryGetValue(instanceId, out wr)) throw new InvalidOperationException(string.Format(System.Globalization.CultureInfo.InvariantCulture, ExecutionStringManager.ListenerNotInCache, instanceId)); listener = wr.Target as TrackingListener; if (null == listener) throw new ObjectDisposedException(string.Format(System.Globalization.CultureInfo.InvariantCulture, ExecutionStringManager.ListenerNotInCacheDisposed, instanceId)); } return listener; } internal void ReloadProfiles(WorkflowExecutor exec) { // Keep control events from other threads out using (new ServiceEnvironment(exec.RootActivity)) { using (exec.ExecutorLock.Enter()) { // check if this is a valid in-memory instance if (!exec.IsInstanceValid) throw new InvalidOperationException(ExecutionStringManager.WorkflowNotValid); // suspend the instance bool localSuspend = exec.Suspend(ExecutionStringManager.TrackingProfileUpdate); try { // // Get new profiles TrackingListener listener = GetListenerFromWRCache(exec.InstanceId); listener.ReloadProfiles(exec, exec.InstanceId); } finally { if (localSuspend) { // @undone: for now this will not return till the instance is done // Once Kumar has fixed 4335, we can enable this. exec.Resume(); } } } } } internal void ReloadProfiles(WorkflowExecutor exec, Guid instanceId, ref TrackingListenerBroker broker, ref List channels) { Type workflowType = exec.WorkflowDefinition.GetType(); // // Ask every tracking service if they want to reload // even if they originally returned null for a profile foreach (TrackingService service in _services) { TrackingProfile profile = null; TrackingChannelWrapper w = null; // // Check if the service wants to reload a profile if (service.TryReloadProfile(workflowType, instanceId, out profile)) { bool found = false; int i; for (i = 0; i < channels.Count; i++) { if (service.GetType() == channels[i].TrackingServiceType) { w = channels[i]; found = true; break; } } // // If we don't have a profile, remove what we had for this service type (if anything) if (null == profile) { if (found) { broker.RemoveService(w.TrackingServiceType); channels.RemoveAt(i); } continue; } // // Parse the new profile - instance only, the cache is not involved RTTrackingProfile rtp = new RTTrackingProfile(profile, exec.WorkflowDefinition, workflowType); rtp.IsPrivate = true; if (!found) { // // This is a new profile, create new channel, channelwrapper and broker item List activityCallPath = null; Guid callerInstanceId = Guid.Empty; TrackingCallingState trackingCallingState = exec.TrackingCallingState; Debug.Assert((null != trackingCallingState), "WorkflowState is null"); IList path = null; Guid context = GetContext(exec.RootActivity), callerContext = Guid.Empty, callerParentContext = Guid.Empty; // // Use CallerActivityPathProxy to determine if this is an invoked instance if (trackingCallingState != null) { path = trackingCallingState.CallerActivityPathProxy; if ((null != path) && (path.Count > 0)) { activityCallPath = new List(path); Debug.Assert(Guid.Empty != trackingCallingState.CallerWorkflowInstanceId, "Instance has an ActivityCallPath but CallerInstanceId is empty"); callerInstanceId = trackingCallingState.CallerWorkflowInstanceId; callerContext = trackingCallingState.CallerContextGuid; callerParentContext = trackingCallingState.CallerParentContextGuid; } } TrackingParameters tp = new TrackingParameters(instanceId, workflowType, exec.WorkflowDefinition, activityCallPath, callerInstanceId, context, callerContext, callerParentContext); TrackingChannel channel = service.GetTrackingChannel(tp); TrackingChannelWrapper wrapper = new TrackingChannelWrapper(channel, service.GetType(), workflowType, rtp); channels.Add(wrapper); Type t = service.GetType(); broker.AddService(t, rtp.Version); broker.MakeProfileInstance(t); } else { // // Don't need to call MakeProfilePrivate on the wrapper // because we've already marked it as private and we already // have a private copy of it. //w.MakeProfilePrivate( exec ); w.SetTrackingProfile(rtp); broker.MakeProfileInstance(w.TrackingServiceType); } } } } internal Guid GetContext(Activity activity) { return ((ActivityExecutionContextInfo)ContextActivityUtils.ContextActivity(activity).GetValue(Activity.ActivityExecutionContextInfoProperty)).ContextGuid; } private TrackingListener GetListener(Activity sked, WorkflowExecutor skedExec, TrackingListenerBroker broker) { if ((null == sked) || (null == skedExec)) { WorkflowTrace.Tracking.TraceEvent(TraceEventType.Error, 0, ExecutionStringManager.NullParameters); return null; } if ((null == _services) || (_services.Count <= 0)) return null; bool load = (null != broker); List channels = GetChannels(sked, skedExec, skedExec.InstanceId, sked.GetType(), ref broker); if ((null == channels) || (0 == channels.Count)) return null; return new TrackingListener(this, sked, skedExec, channels, broker, load); } private List GetChannels(Activity schedule, WorkflowExecutor exec, Guid instanceID, Type workflowType, ref TrackingListenerBroker broker) { if (null == _services) return null; bool initBroker = false; if (null == broker) { broker = new TrackingListenerBroker(); initBroker = true; } List channels = new List(); List activityCallPath = null; Guid callerInstanceId = Guid.Empty; Guid context = GetContext(exec.RootActivity), callerContext = Guid.Empty, callerParentContext = Guid.Empty; Debug.Assert(exec is WorkflowExecutor, "Executor is not WorkflowExecutor"); TrackingCallingState trackingCallingState = exec.TrackingCallingState; TrackingListenerBroker trackingListenerBroker = (TrackingListenerBroker)exec.RootActivity.GetValue(WorkflowExecutor.TrackingListenerBrokerProperty); IList path = trackingCallingState != null ? trackingCallingState.CallerActivityPathProxy : null; // // Use CallerActivityPathProxy to determine if this is an invoked instance if ((null != path) && (path.Count > 0)) { activityCallPath = new List(path); Debug.Assert(Guid.Empty != trackingCallingState.CallerWorkflowInstanceId, "Instance has an ActivityCallPath but CallerInstanceId is empty"); callerInstanceId = trackingCallingState.CallerWorkflowInstanceId; callerContext = trackingCallingState.CallerContextGuid; callerParentContext = trackingCallingState.CallerParentContextGuid; } TrackingParameters parameters = new TrackingParameters(instanceID, workflowType, exec.WorkflowDefinition, activityCallPath, callerInstanceId, context, callerContext, callerParentContext); for (int i = 0; i < _services.Count; i++) { TrackingChannel channel = null; Type serviceType = _services[i].GetType(); // // See if the service has a profile for this schedule type // If not we don't do any tracking for the service // RTTrackingProfile profile = null; // // If we've created the broker get the current version of the profile if (initBroker) { profile = _profileManager.GetProfile(_services[i], schedule); if (null == profile) continue; broker.AddService(serviceType, profile.Version); } else { // // Only reload the services that are in the broker // If services that weren't originally associated to an instance // wish to join that instance they should call ReloadTrackingProfiles if (!broker.ContainsService(serviceType)) continue; if (broker.IsProfileInstance(serviceType)) { profile = _profileManager.GetProfile(_services[i], schedule, instanceID); if (null == profile) throw new InvalidOperationException(ExecutionStringManager.MissingProfileForService + serviceType.ToString()); profile.IsPrivate = true; } else { Version versionId; if (broker.TryGetProfileVersionId(serviceType, out versionId)) { profile = _profileManager.GetProfile(_services[i], schedule, versionId); if (null == profile) throw new InvalidOperationException(ExecutionStringManager.MissingProfileForService + serviceType.ToString() + ExecutionStringManager.MissingProfileForVersion + versionId.ToString()); // // If the profile is marked as private clone the instance we got from the cache // The cloned instance is marked as private during the cloning if (broker.IsProfilePrivate(serviceType)) { profile = profile.Clone(); profile.IsPrivate = true; } } else continue; } } // // If profile is not null get a channel channel = _services[i].GetTrackingChannel(parameters); if (null == channel) throw new InvalidOperationException(ExecutionStringManager.NullChannel); channels.Add(new TrackingChannelWrapper(channel, _services[i].GetType(), workflowType, profile)); } return channels; } } /// /// Handles subscribing to status change events and receiving event notifications. /// internal class TrackingListener { private List _channels = null; private TrackingListenerBroker _broker = null; private TrackingListenerFactory _factory = null; protected TrackingListener() { } internal TrackingListener(TrackingListenerFactory factory, Activity sked, WorkflowExecutor exec, List channels, TrackingListenerBroker broker, bool load) { if ((null == sked) || (null == broker)) { WorkflowTrace.Tracking.TraceEvent(TraceEventType.Error, 0, ExecutionStringManager.NullParameters); return; } _factory = factory; _channels = channels; // // Keep a reference to the broker so that we can hand it out when adding subscriptions _broker = broker; // // Give the broker our reference so that it can call us back on behalf of subscriptions _broker.TrackingListener = this; } internal TrackingListenerBroker Broker { get { return _broker; } set { _broker = value; } } internal void ReloadProfiles(WorkflowExecutor exec, Guid instanceId) { // // Ask the factory to redo the channels and broker _factory.ReloadProfiles(exec, instanceId, ref _broker, ref _channels); } #region Event Handlers internal void ActivityStatusChange(object sender, WorkflowExecutor.ActivityStatusChangeEventArgs e) { WorkflowTrace.Tracking.TraceInformation("TrackingListener::ActivityStatusChange - Received Activity Status Change Event for activity {0}", e.Activity.QualifiedName); if (null == sender) throw new ArgumentNullException("sender"); if (!typeof(WorkflowExecutor).IsInstanceOfType(sender)) throw new ArgumentException("sender"); if (null == e) throw new ArgumentNullException("e"); WorkflowExecutor exec = (WorkflowExecutor)sender; if ((null == _channels) || (_channels.Count <= 0)) { WorkflowTrace.Tracking.TraceEvent(TraceEventType.Error, 0, ExecutionStringManager.NoChannels); return; } Activity activity = e.Activity; if (!SubscriptionRequired(activity, exec)) return; // // Get the shared data that is the same for each tracking channel that gets a record Guid parentContextGuid = Guid.Empty, contextGuid = Guid.Empty; GetContext(activity, exec, out contextGuid, out parentContextGuid); DateTime dt = DateTime.UtcNow; int eventOrderId = _broker.GetNextEventOrderId(); foreach (TrackingChannelWrapper wrapper in _channels) { // // Create a record for each tracking channel // Each channel gets a distinct record because extract data will almost always be different. ActivityTrackingRecord record = new ActivityTrackingRecord(activity.GetType(), activity.QualifiedName, contextGuid, parentContextGuid, activity.ExecutionStatus, dt, eventOrderId, null); bool extracted = wrapper.GetTrackingProfile(exec).TryTrackActivityEvent(activity, activity.ExecutionStatus, exec, record); // // Only send the record to the channel if the profile indicates that it is interested // This doesn't mean that the Body will always have data in it, // it may be an empty extraction (just header info) if (extracted) wrapper.TrackingChannel.Send(record); } } internal void UserTrackPoint(object sender, WorkflowExecutor.UserTrackPointEventArgs e) { if (!typeof(WorkflowExecutor).IsInstanceOfType(sender)) throw new ArgumentException("sender is not WorkflowExecutor"); WorkflowExecutor exec = (WorkflowExecutor)sender; Activity activity = e.Activity; DateTime dt = DateTime.UtcNow; int eventOrderId = _broker.GetNextEventOrderId(); Guid parentContextGuid, contextGuid; GetContext(activity, exec, out contextGuid, out parentContextGuid); foreach (TrackingChannelWrapper wrapper in _channels) { UserTrackingRecord record = new UserTrackingRecord(activity.GetType(), activity.QualifiedName, contextGuid, parentContextGuid, dt, eventOrderId, e.Key, e.Args); if (wrapper.GetTrackingProfile(exec).TryTrackUserEvent(activity, e.Key, e.Args, exec, record)) wrapper.TrackingChannel.Send(record); } } internal void WorkflowExecutionEvent(object sender, WorkflowExecutor.WorkflowExecutionEventArgs e) { if (null == sender) throw new ArgumentNullException("sender"); WorkflowExecutor exec = sender as WorkflowExecutor; if (null == exec) throw new ArgumentException(ExecutionStringManager.InvalidSenderWorkflowExecutor); // // Many events are mapped "forward" and sent to tracking services // (Persisting->Persisted, SchedulerEmpty->Idle) // This is so that a batch is always available when a tracking service gets an event. // Without this tracking data could be inconsistent with the state of the instance. switch (e.EventType) { case WorkflowEventInternal.Creating: NotifyChannels(TrackingWorkflowEvent.Created, e, exec); return; case WorkflowEventInternal.Starting: NotifyChannels(TrackingWorkflowEvent.Started, e, exec); return; case WorkflowEventInternal.Suspending: NotifyChannels(TrackingWorkflowEvent.Suspended, e, exec); return; case WorkflowEventInternal.Resuming: NotifyChannels(TrackingWorkflowEvent.Resumed, e, exec); return; case WorkflowEventInternal.Persisting: NotifyChannels(TrackingWorkflowEvent.Persisted, e, exec); return; case WorkflowEventInternal.Unloading: NotifyChannels(TrackingWorkflowEvent.Unloaded, e, exec); return; case WorkflowEventInternal.Loading: NotifyChannels(TrackingWorkflowEvent.Loaded, e, exec); return; case WorkflowEventInternal.Completing: NotifyChannels(TrackingWorkflowEvent.Completed, e, exec); NotifyChannelsOfCompletionOrTermination(); return; case WorkflowEventInternal.Aborting: NotifyChannels(TrackingWorkflowEvent.Aborted, e, exec); return; case WorkflowEventInternal.Terminating: NotifyChannels(TrackingWorkflowEvent.Terminated, e, exec); NotifyChannelsOfCompletionOrTermination(); return; case WorkflowEventInternal.Exception: NotifyChannels(TrackingWorkflowEvent.Exception, e, exec); return; case WorkflowEventInternal.SchedulerEmpty: NotifyChannels(TrackingWorkflowEvent.Idle, e, exec); return; case WorkflowEventInternal.UserTrackPoint: UserTrackPoint(exec, (WorkflowExecutor.UserTrackPointEventArgs)e); return; case WorkflowEventInternal.ActivityStatusChange: ActivityStatusChange(exec, (WorkflowExecutor.ActivityStatusChangeEventArgs)e); return; case WorkflowEventInternal.DynamicChangeBegin: DynamicUpdateBegin(exec, (WorkflowExecutor.DynamicUpdateEventArgs)e); return; case WorkflowEventInternal.DynamicChangeRollback: DynamicUpdateRollback(exec, (WorkflowExecutor.DynamicUpdateEventArgs)e); return; case WorkflowEventInternal.DynamicChangeCommit: DynamicUpdateCommit(exec, (WorkflowExecutor.DynamicUpdateEventArgs)e); return; default: return; } } internal void DynamicUpdateBegin(object sender, WorkflowExecutor.DynamicUpdateEventArgs e) { if (null == sender) throw new ArgumentNullException("sender"); if (!typeof(WorkflowExecutor).IsInstanceOfType(sender)) throw new ArgumentException("sender"); WorkflowExecutor exec = (WorkflowExecutor)sender; // // WorkflowChangeEventArgs may be null or the WorkflowChanges may be null or empty // If so there's no work to do here if (null == e.ChangeActions) return; // // Clone the profiles to create instance specific copies (if they aren't already) MakeProfilesPrivate(exec); // // Give the profiles the changes. At this point we are in a volatile state. // Profiles must act as if the changes will succeed but roll back any internal changes if they do not. foreach (TrackingChannelWrapper channel in _channels) { channel.GetTrackingProfile(exec).WorkflowChangeBegin(e.ChangeActions); } } internal void DynamicUpdateRollback(object sender, WorkflowExecutor.DynamicUpdateEventArgs e) { if (null == sender) throw new ArgumentNullException("sender"); if (!typeof(WorkflowExecutor).IsInstanceOfType(sender)) throw new ArgumentException("sender"); WorkflowExecutor exec = (WorkflowExecutor)sender; foreach (TrackingChannelWrapper channel in _channels) { channel.GetTrackingProfile(exec).WorkflowChangeRollback(); } } internal void DynamicUpdateCommit(object sender, WorkflowExecutor.DynamicUpdateEventArgs e) { if (null == sender) throw new ArgumentNullException("sender"); if (!typeof(WorkflowExecutor).IsInstanceOfType(sender)) throw new ArgumentException("sender"); WorkflowExecutor exec = (WorkflowExecutor)sender; DateTime dt = DateTime.UtcNow; foreach (TrackingChannelWrapper channel in _channels) { channel.GetTrackingProfile(exec).WorkflowChangeCommit(); } // // Notify tracking channels of changes int eventOrderId = _broker.GetNextEventOrderId(); foreach (TrackingChannelWrapper wrapper in _channels) { WorkflowTrackingRecord rec = new WorkflowTrackingRecord(TrackingWorkflowEvent.Changed, dt, eventOrderId, new TrackingWorkflowChangedEventArgs(e.ChangeActions, exec.WorkflowDefinition)); if (wrapper.GetTrackingProfile(exec).TryTrackInstanceEvent(TrackingWorkflowEvent.Changed, rec)) wrapper.TrackingChannel.Send(rec); } } #endregion Event Handlers #region Private Methods private void NotifyChannels(TrackingWorkflowEvent evt, WorkflowExecutor.WorkflowExecutionEventArgs e, WorkflowExecutor exec) { DateTime dt = DateTime.UtcNow; int eventOrderId = _broker.GetNextEventOrderId(); foreach (TrackingChannelWrapper wrapper in _channels) { EventArgs args = null; switch (evt) { case TrackingWorkflowEvent.Suspended: args = new TrackingWorkflowSuspendedEventArgs(((WorkflowExecutor.WorkflowExecutionSuspendingEventArgs)e).Error); break; case TrackingWorkflowEvent.Terminated: WorkflowExecutor.WorkflowExecutionTerminatingEventArgs wtea = (WorkflowExecutor.WorkflowExecutionTerminatingEventArgs)e; if (null != wtea.Exception) args = new TrackingWorkflowTerminatedEventArgs(wtea.Exception); else args = new TrackingWorkflowTerminatedEventArgs(wtea.Error); break; case TrackingWorkflowEvent.Exception: WorkflowExecutor.WorkflowExecutionExceptionEventArgs weea = (WorkflowExecutor.WorkflowExecutionExceptionEventArgs)e; args = new TrackingWorkflowExceptionEventArgs(weea.Exception, weea.CurrentPath, weea.OriginalPath, weea.ContextGuid, weea.ParentContextGuid); break; } WorkflowTrackingRecord rec = new WorkflowTrackingRecord(evt, dt, eventOrderId, args); if (wrapper.GetTrackingProfile(exec).TryTrackInstanceEvent(evt, rec)) wrapper.TrackingChannel.Send(rec); } } private void NotifyChannelsOfCompletionOrTermination() { foreach (TrackingChannelWrapper wrapper in _channels) wrapper.TrackingChannel.InstanceCompletedOrTerminated(); } private void GetContext(Activity activity, WorkflowExecutor exec, out Guid contextGuid, out Guid parentContextGuid) { contextGuid = _factory.GetContext(activity); if (null != activity.Parent) parentContextGuid = _factory.GetContext(activity.Parent); else parentContextGuid = contextGuid; Debug.Assert(contextGuid != Guid.Empty, "TrackingContext is empty"); Debug.Assert(parentContextGuid != Guid.Empty, "Parent TrackingContext is empty"); } /// /// Clone all profiles to create private versions in order to hold subscriptions for dynamic changes /// private void MakeProfilesPrivate(WorkflowExecutor exec) { foreach (TrackingChannelWrapper channel in _channels) { channel.MakeProfilePrivate(exec); _broker.MakeProfilePrivate(channel.TrackingServiceType); } } /// /// Determine if subscriptions are needed /// /// Activity for which to check subscription needs /// private bool SubscriptionRequired(Activity activity, WorkflowExecutor exec) { // // Give each channel a chance to prep itself bool needed = false; foreach (TrackingChannelWrapper channel in _channels) { if ((channel.GetTrackingProfile(exec).ActivitySubscriptionNeeded(activity)) && (!needed)) needed = true; } return needed; } #endregion } /// /// This is a lightweight class that is serialized so that the TrackingListener doesn't have to be. /// Every subscription that the listener adds holds a reference to this class. /// When an instance is loaded the broker is given to the listener factory and the listener factory /// gives the broker the new listener. This saves us from having to persist the listener itself which /// means that while we do need to persist a list of service types and their profile version we don't /// have to persist the channels themselves (and we can't control how heavy channels get as they are host defined). /// [Serializable] internal class TrackingListenerBroker : System.Runtime.Serialization.ISerializable { [NonSerialized] private TrackingListener _listener = null; private int _eventOrderId = 0; private Dictionary _services = new Dictionary(); internal TrackingListenerBroker() { } internal TrackingListenerBroker(TrackingListener listener) { _listener = listener; } internal TrackingListener TrackingListener { // // FxCops minbar complains because this isn't used. // The Setter is required; seems weird not to have a getter. //get { return _listener; } set { _listener = value; } } internal bool ContainsService(Type trackingServiceType) { return _services.ContainsKey(HashHelper.HashServiceType(trackingServiceType)); } internal void AddService(Type trackingServiceType, Version profileVersionId) { _services.Add(HashHelper.HashServiceType(trackingServiceType), new ServiceProfileContainer(profileVersionId)); } internal void ReplaceServices(Dictionary replacements) { if (replacements != null && replacements.Count > 0) { ServiceProfileContainer item; foreach (KeyValuePair replacement in replacements) { Guid previous = HashHelper.HashServiceType(replacement.Key); if (_services.TryGetValue(previous, out item)) { _services.Remove(previous); Guid current = HashHelper.HashServiceType(replacement.Value); if (!_services.ContainsKey(current)) { _services.Add(current, item); } } } } } internal void RemoveService(Type trackingServiceType) { _services.Remove(HashHelper.HashServiceType(trackingServiceType)); } internal bool TryGetProfileVersionId(Type trackingServiceType, out Version profileVersionId) { profileVersionId = new Version(0, 0); ServiceProfileContainer service = null; if (_services.TryGetValue(HashHelper.HashServiceType(trackingServiceType), out service)) { profileVersionId = service.ProfileVersionId; return true; } return false; } internal void MakeProfilePrivate(Type trackingServiceType) { ServiceProfileContainer service = null; if (!_services.TryGetValue(HashHelper.HashServiceType(trackingServiceType), out service)) throw new ArgumentException(ExecutionStringManager.InvalidTrackingService); service.IsPrivate = true; } internal bool IsProfilePrivate(Type trackingServiceType) { ServiceProfileContainer service = null; if (!_services.TryGetValue(HashHelper.HashServiceType(trackingServiceType), out service)) throw new ArgumentException(ExecutionStringManager.InvalidTrackingService); return service.IsPrivate; } internal void MakeProfileInstance(Type trackingServiceType) { ServiceProfileContainer service = null; if (!_services.TryGetValue(HashHelper.HashServiceType(trackingServiceType), out service)) throw new ArgumentException(ExecutionStringManager.InvalidTrackingService); // // Can't be instance without being private service.IsPrivate = true; service.IsInstance = true; } internal bool IsProfileInstance(Type trackingServiceType) { ServiceProfileContainer service = null; if (!_services.TryGetValue(HashHelper.HashServiceType(trackingServiceType), out service)) throw new ArgumentException(ExecutionStringManager.InvalidTrackingService); return service.IsInstance; } internal int GetNextEventOrderId() { checked { return ++_eventOrderId; } } [Serializable] internal class ServiceProfileContainer { Version _profileVersionId = new Version(0, 0); bool _isPrivate = false; bool _isInstance = false; protected ServiceProfileContainer() { } internal ServiceProfileContainer(Version profileVersionId) { _profileVersionId = profileVersionId; } internal Version ProfileVersionId { get { return _profileVersionId; } } internal bool IsPrivate { get { return _isPrivate; } set { _isPrivate = value; } } internal bool IsInstance { get { return _isInstance; } set { _isInstance = value; } } } #region ISerializable Members [SecurityPermissionAttribute(SecurityAction.Demand, SerializationFormatter = true)] public void GetObjectData(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { info.AddValue("eventOrderId", this._eventOrderId); info.AddValue("services", this._services.Count == 0 ? null : this._services); } private TrackingListenerBroker(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { this._eventOrderId = info.GetInt32("eventOrderId"); this._services = (Dictionary)info.GetValue("services", typeof(Dictionary)); if (this._services == null) this._services = new Dictionary(); } #endregion } /// /// Manages profile requests, caching profiles and creating RTTrackingProfile instances. /// internal class TrackingProfileManager { // // This is a dictionary keyed by tracking service type // that returns a dictionary that is key by schedule type // that returns a Set of profile versions for that schedule type // The set is constrained by VersionId private Dictionary> _cacheLookup; // // Protects _cacheLookup private object _cacheLock = new object(); // // Values assigned in Initialize private bool _init = false; private List _services = null; private WorkflowRuntime _runtime = null; internal TrackingProfileManager() { } /// /// Clears all entries from the cache by reinitializing the member /// public static void ClearCache() { WorkflowRuntime.ClearTrackingProfileCache(); } internal void ClearCacheImpl() { lock (_cacheLock) { _cacheLookup = new Dictionary>(); } } /// /// Create static state /// /// internal void Initialize(WorkflowRuntime runtime) { lock (_cacheLock) { if (null == runtime) throw new ArgumentException(ExecutionStringManager.NullEngine); _runtime = runtime; // // Initialize the cache // Do this every time the runtime starts/stops to make life easier // for IProfileNotification tracking services that might have updated // profiles while we were stopped - we'll go get new versions since nothing is cached // without them having to fire updated events. _cacheLookup = new Dictionary>(); if (null != runtime.TrackingServices) { _services = runtime.TrackingServices; foreach (TrackingService service in _services) { if (service is IProfileNotification) { ((IProfileNotification)service).ProfileUpdated += new EventHandler(ProfileUpdated); ((IProfileNotification)service).ProfileRemoved += new EventHandler(ProfileRemoved); } } } _init = true; } } /// /// Clean up static state /// internal void Uninitialize() { lock (_cacheLock) { if (null != _runtime) { foreach (TrackingService service in _services) { if (service is IProfileNotification) { ((IProfileNotification)service).ProfileUpdated -= new EventHandler(ProfileUpdated); ((IProfileNotification)service).ProfileRemoved -= new EventHandler(ProfileRemoved); } } } _runtime = null; _services = null; _init = false; } } /// /// Retrieves the current version of a profile from the specified service /// internal RTTrackingProfile GetProfile(TrackingService service, Activity schedule) { if (!_init) throw new ApplicationException(ExecutionStringManager.TrackingProfileManagerNotInitialized); if ((null == service) || (null == schedule)) { WorkflowTrace.Tracking.TraceEvent(TraceEventType.Error, 0, ExecutionStringManager.NullParameters); return null; } Type workflowType = schedule.GetType(); RTTrackingProfile tp = null; if (service is IProfileNotification) { // // If we found the profile in the cache return it, it may be null, this is OK // (no profile for this service type/schedule type combination) if (TryGetFromCache(service.GetType(), workflowType, out tp)) return tp; } // // Either we don't have anything in the cache for this schedule/service combination // or this is a base TrackingService that doesn't notify of profile updates // Get the profile from the service TrackingProfile profile = null; if (!service.TryGetProfile(workflowType, out profile)) { // // No profile for this schedule from this service // RemoveProfile will just mark this service/schedule as not currently having a profile in the cache RemoveProfile(workflowType, service.GetType()); return null; } // // Check the cache to see if we already have this version // For TrackingService types this is necessary. // For IProfileNotification types this is a bit redundant // but another threadcould have inserted the profile into the cache // so check again before acquiring the writer lock if (TryGetFromCache(service.GetType(), workflowType, profile.Version, out tp)) return tp; // // No profile, create it string xaml = schedule.GetValue(Activity.WorkflowXamlMarkupProperty) as string; if (null != xaml && xaml.Length > 0) { // // Never add xaml only workflows to the cache // Each one must be handled distinctly return CreateProfile(profile, schedule, service.GetType()); } else { tp = CreateProfile(profile, workflowType, service.GetType()); } lock (_cacheLock) { // // Recheck the cache with exclusive access RTTrackingProfile tmp = null; if (TryGetFromCache(service.GetType(), workflowType, profile.Version, out tmp)) return tmp; // // Add it to the cache if (!AddToCache(tp, service.GetType())) throw new ApplicationException(ExecutionStringManager.ProfileCacheInsertFailure); return tp; } } /// /// Retrieves the specified version of a profile from the specified service /// internal RTTrackingProfile GetProfile(TrackingService service, Activity workflow, Version versionId) { if (null == service) throw new ArgumentNullException("service"); if (null == workflow) throw new ArgumentNullException("workflow"); if (!_init) throw new InvalidOperationException(ExecutionStringManager.TrackingProfileManagerNotInitialized); Type workflowType = workflow.GetType(); RTTrackingProfile tp = null; // // Looking for a specific version, see if it is in the cache if (TryGetFromCache(service.GetType(), workflowType, versionId, out tp)) return tp; TrackingProfile profile = service.GetProfile(workflowType, versionId); // // No profile, create it string xaml = workflow.GetValue(Activity.WorkflowXamlMarkupProperty) as string; if (null != xaml && xaml.Length > 0) { // // Never add xaml only workflows to the cache // Each one must be handled distinctly return CreateProfile(profile, workflow, service.GetType()); } else { tp = CreateProfile(profile, workflowType, service.GetType()); } lock (_cacheLock) { // // Recheck the cache with exclusive access RTTrackingProfile tmp = null; if (TryGetFromCache(service.GetType(), workflowType, versionId, out tmp)) return tmp; // // Add it to the cache if (!AddToCache(tp, service.GetType())) throw new ApplicationException(ExecutionStringManager.ProfileCacheInsertFailure); return tp; } } internal RTTrackingProfile GetProfile(TrackingService service, Activity workflow, Guid instanceId) { // // An instance based profile will never be in the cache TrackingProfile profile = service.GetProfile(instanceId); if (null == profile) return null; return new RTTrackingProfile(profile, workflow, service.GetType()); } #region Private Methods private RTTrackingProfile CreateProfile(TrackingProfile profile, Type workflowType, Type serviceType) { // // Can't use the activity definition that we have here, it may have been updated // Get the base definition and use it to create the profile. Activity tmpSchedule = _runtime.GetWorkflowDefinition(workflowType); return new RTTrackingProfile(profile, tmpSchedule, serviceType); } private RTTrackingProfile CreateProfile(TrackingProfile profile, Activity schedule, Type serviceType) { // // This is called for Xaml only workflows return new RTTrackingProfile(profile, schedule, serviceType); } /// /// Add a profile to the cache but do not reset the NoProfiles flag for the schedule type /// /// RTTrackingProfile to add /// TrackingService type /// True if the profile was successfully added; false if not private bool AddToCache(RTTrackingProfile profile, Type serviceType) { return AddToCache(profile, serviceType, false); } /// /// Adds a profile to the cache and optionally resets the NoProfiles flag for the schedule type /// /// RTTrackingProfile to add /// TrackingService type /// true will reset NoProfiles (to false); false will leave NoProfiles as is /// True if the profile was successfully added; false if not private bool AddToCache(RTTrackingProfile profile, Type serviceType, bool resetNoProfiles) { // // Profile may be null, serviceType may not if (null == serviceType) return false; lock (_cacheLock) { Dictionary schedules = null; // // Get the dictionary for the service type, // create it if it doesn't exist if (!_cacheLookup.TryGetValue(serviceType, out schedules)) { schedules = new Dictionary(); _cacheLookup.Add(serviceType, schedules); } // // The the ProfileList for the schedule type, // create it if it doesn't exist ProfileList profiles = null; if (!schedules.TryGetValue(profile.WorkflowType, out profiles)) { profiles = new ProfileList(); schedules.Add(profile.WorkflowType, profiles); } if (resetNoProfiles) profiles.NoProfile = false; return profiles.Profiles.TryAdd(new CacheItem(profile)); } } /// /// Gets a profile from the cache /// private bool TryGetFromCache(Type serviceType, Type workflowType, out RTTrackingProfile profile) { return TryGetFromCache(serviceType, workflowType, new Version(0, 0), out profile); // 0 is an internal signal to get the most current } /// /// Gets a profile from the cache /// private bool TryGetFromCache(Type serviceType, Type workflowType, Version versionId, out RTTrackingProfile profile) { profile = null; CacheItem item = null; lock (_cacheLock) { Dictionary schedules = null; if (!_cacheLookup.TryGetValue(serviceType, out schedules)) return false; ProfileList profiles = null; if (!schedules.TryGetValue(workflowType, out profiles)) return false; // // 0 means get the current version if (0 == versionId.Major) { // // Currently the schedule type doesn't have a profile associated to it if (profiles.NoProfile) return true; if ((null == profiles.Profiles) || (0 == profiles.Profiles.Count)) return false; // // Current version is highest versionId // which means it is at the end of the Set int endPos = profiles.Profiles.Count - 1; if (null == profiles.Profiles[endPos]) return false; profile = profiles.Profiles[endPos].TrackingProfile; return true; } else { if ((null == profiles.Profiles) || (0 == profiles.Profiles.Count)) return false; if (profiles.Profiles.TryGetValue(new CacheItem(workflowType, versionId), out item)) { profile = item.TrackingProfile; return true; } else return false; } } } #endregion #region Event Handlers /// /// Listens on ProfileUpdated events from IProfileNotification services /// /// Type of the tracking service sending the update /// ProfileUpdatedEventArgs containing the new profile and the schedule type private void ProfileUpdated(object sender, ProfileUpdatedEventArgs e) { if (null == sender) throw new ArgumentNullException("sender"); Type t = sender.GetType(); if (null == e.WorkflowType) throw new ArgumentNullException("e"); if (null == e.TrackingProfile) { RemoveProfile(e.WorkflowType, t); return; } RTTrackingProfile profile = CreateProfile(e.TrackingProfile, e.WorkflowType, t); // // If AddToCache fails this version is already in the cache and we don't care AddToCache(profile, t, true); } private void ProfileRemoved(object sender, ProfileRemovedEventArgs e) { if (null == sender) throw new ArgumentNullException("sender"); if (null == e.WorkflowType) throw new ArgumentNullException("e"); RemoveProfile(e.WorkflowType, sender.GetType()); } private void RemoveProfile(Type workflowType, Type serviceType) { lock (_cacheLock) { Dictionary schedules = null; if (!_cacheLookup.TryGetValue(serviceType, out schedules)) { schedules = new Dictionary(); _cacheLookup.Add(serviceType, schedules); } ProfileList profiles = null; if (!schedules.TryGetValue(workflowType, out profiles)) { profiles = new ProfileList(); schedules.Add(workflowType, profiles); } // // Finally indicate that there isn't a profile for this schedule type // Calling UpdateProfile for this type will result in resetting this field // regardless of whether the version of the profile passed is in the cache or not profiles.NoProfile = true; } } #endregion #region Private Classes private class ProfileList { internal bool NoProfile = false; internal Set Profiles = new Set(5); } private class CacheItem : IComparable { internal RTTrackingProfile TrackingProfile = null; internal DateTime LastAccess = DateTime.UtcNow; // // VersionId and ScheduleType are stored separately from the profile so that they // can be used to identify the profile if it has been pushed from the cache. internal Version VersionId = new Version(0, 0); internal Type ScheduleType = null; internal CacheItem() { } internal CacheItem(RTTrackingProfile profile) { if (null == profile) throw new ArgumentNullException("profile"); ScheduleType = profile.WorkflowType; this.TrackingProfile = profile; VersionId = profile.Version; } internal CacheItem(Type workflowType, Version versionId) { VersionId = versionId; ScheduleType = workflowType; } #region IComparable Members public int CompareTo(object obj) { if (!(obj is CacheItem)) throw new ArgumentException(ExecutionStringManager.InvalidCacheItem); CacheItem item = (CacheItem)obj; if ((VersionId == item.VersionId) && (ScheduleType == item.ScheduleType)) return 0; else return (VersionId > item.VersionId) ? 1 : -1; } #endregion } #endregion } /// /// Represents a wrapper around a channel and its artifacts, such as its tracking service type and profile /// internal class TrackingChannelWrapper { private Type _serviceType = null, _scheduleType = null; private TrackingChannel _channel = null; [NonSerialized] private RTTrackingProfile _profile = null; private Version _profileVersionId; private TrackingChannelWrapper() { } public TrackingChannelWrapper(TrackingChannel channel, Type serviceType, Type workflowType, RTTrackingProfile profile) { _serviceType = serviceType; _scheduleType = workflowType; _channel = channel; _profile = profile; _profileVersionId = profile.Version; } internal Type TrackingServiceType { get { return _serviceType; } } internal TrackingChannel TrackingChannel { get { return _channel; } } /// /// Get the tracking profile for the channel /// /// BaseExecutor /// RTTrackingProfile internal RTTrackingProfile GetTrackingProfile(WorkflowExecutor skedExec) { if (null != _profile) return _profile; else throw new InvalidOperationException(String.Format(System.Globalization.CultureInfo.CurrentCulture, ExecutionStringManager.NullProfileForChannel, this._scheduleType.AssemblyQualifiedName)); } internal void SetTrackingProfile(RTTrackingProfile profile) { _profile = profile; } /// /// Clone the tracking profile stored in the cache and /// /// internal void MakeProfilePrivate(WorkflowExecutor exec) { if (null != _profile) { // // If the profile is not already a private copy make it so if (!_profile.IsPrivate) { _profile = _profile.Clone(); _profile.IsPrivate = true; } } else { // // We're not holding a reference to a profile // so get it from the cache and clone it into a private copy RTTrackingProfile tmp = GetTrackingProfile(exec); _profile = tmp.Clone(); _profile.IsPrivate = true; } } } internal class Set : IEnumerable where T : IComparable { List list = null; public Set() { list = new List(); } public Set(int capacity) { list = new List(capacity); } public int Count { get { return list.Count; } } public void Add(T item) { int pos = -1; if (!Search(item, out pos)) list.Insert(pos, item); else throw new ArgumentException(ExecutionStringManager.ItemAlreadyExist); } public bool TryAdd(T item) { int pos = -1; if (!Search(item, out pos)) { list.Insert(pos, item); return true; } else return false; } public bool Contains(T item) { int pos = -1; return Search(item, out pos); } public IEnumerator GetEnumerator() { return list.GetEnumerator(); } System.Collections.IEnumerator IEnumerable.GetEnumerator() { return list.GetEnumerator(); } public bool TryGetValue(T item, out T value) { int pos = -1; if (Search(item, out pos)) { value = list[pos]; return true; } else { value = default(T); return false; } } public T this[int index] { get { return list[index]; } } private bool Search(T item, out int insertPos) { insertPos = -1; int pos = 0, high = list.Count, low = -1, diff = 0; while (high - low > 1) { pos = (high + low) / 2; diff = list[pos].CompareTo(item); if (0 == diff) { insertPos = pos; return true; } else if (diff > 0) high = pos; else low = pos; } if (low == -1) { insertPos = 0; return false; } if (0 != diff) { insertPos = (diff < 0) ? pos + 1 : pos; return false; } return true; } } /// /// Persisted tracking State pertaining to workflow invoking for an individual schedule. There could be multiple called schedules under /// an instance. /// [Serializable] internal class TrackingCallingState { #region data members private IList callerActivityPathProxy; private Guid callerInstanceId; private Guid callerContextGuid; private Guid callerParentContextGuid; #endregion data members #region Property accessors /// /// Activity proxy of the caller/execer activity, if any /// //@@Undone for Ashishmi: Hold on to ActivityPath Proxy in one of your class impl /// /// internal IList CallerActivityPathProxy { get { return callerActivityPathProxy; } set { callerActivityPathProxy = value; } } /// /// Instance ID of the caller/exec'er schedule, if any /// /// public Guid CallerWorkflowInstanceId { get { return callerInstanceId; } set { callerInstanceId = value; } } /// /// Context of the caller's invoke activity /// /// int public Guid CallerContextGuid { get { return callerContextGuid; } set { callerContextGuid = value; } } /// /// ParentContext of the caller's invoke activity /// /// int public Guid CallerParentContextGuid { get { return callerParentContextGuid; } set { callerParentContextGuid = value; } } #endregion Property accessors } internal static class HashHelper { internal static Guid HashServiceType(Type serviceType) { return HashServiceType(serviceType.AssemblyQualifiedName); } [SuppressMessage("Microsoft.Cryptographic.Standard", "CA5350:MD5CannotBeUsed", Justification = "Design has been approved. We are not using MD5 for any security or cryptography purposes but rather as a hash.")] internal static Guid HashServiceType(String serviceFullTypeName) { byte[] data; byte[] result; UnicodeEncoding ue = new UnicodeEncoding(); data = ue.GetBytes(serviceFullTypeName); if (AppSettings.FIPSRequired) { result = MD5PInvokeHelper.CalculateHash(data); } else { MD5 md5 = new MD5CryptoServiceProvider(); result = md5.ComputeHash(data); } return new Guid(result); } } }