/* ======================================================================== * Copyright (c) 2005-2013 The OPC Foundation, Inc. All rights reserved. * * OPC Foundation MIT License 1.00 * * Permission is hereby granted, free of charge, to any person * obtaining a copy of this software and associated documentation * files (the "Software"), to deal in the Software without * restriction, including without limitation the rights to use, * copy, modify, merge, publish, distribute, sublicense, and/or sell * copies of the Software, and to permit persons to whom the * Software is furnished to do so, subject to the following * conditions: * * The above copyright notice and this permission notice shall be * included in all copies or substantial portions of the Software. * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR * OTHER DEALINGS IN THE SOFTWARE. * * The complete license agreement can be found here: * http://opcfoundation.org/License/MIT/1.00/ * ======================================================================*/ using System; using System.Collections; using System.Collections.Generic; using System.Text; using System.Xml; using System.ServiceModel; using System.Runtime.Serialization; using System.Threading; namespace Opc.Ua.Client { /// <summary> /// A subscription /// </summary> [DataContract(Namespace = Namespaces.OpcUaXsd)] public class Subscription : IDisposable { #region Constructors /// <summary> /// Creates a empty object. /// </summary> public Subscription() { Initialize(); } /// <summary> /// Initializes the subscription from a template. /// </summary> public Subscription(Subscription template) : this(template, false) { } /// <summary> /// Initializes the subscription from a template. /// </summary> /// <param name="template">The template.</param> /// <param name="copyEventHandlers">if set to <c>true</c> the event handlers are copied.</param> public Subscription(Subscription template, bool copyEventHandlers) { Initialize(); if (template != null) { string displayName = template.DisplayName; if (String.IsNullOrEmpty(displayName)) { displayName = m_displayName; } // remove any existing numeric suffix. int index = displayName.LastIndexOf(' '); if (index != -1) { try { displayName = displayName.Substring(0, index); } catch { // not a numeric suffix. } } m_displayName = Utils.Format("{0} {1}", displayName, Utils.IncrementIdentifier(ref s_globalSubscriptionCounter)); m_publishingInterval = template.m_publishingInterval; m_keepAliveCount = template.m_keepAliveCount; m_lifetimeCount = template.m_lifetimeCount; m_minLifetimeInterval = template.m_minLifetimeInterval; m_maxNotificationsPerPublish = template.m_maxNotificationsPerPublish; m_publishingEnabled = template.m_publishingEnabled; m_priority = template.m_priority; m_timestampsToReturn = template.m_timestampsToReturn; m_maxMessageCount = template.m_maxMessageCount; m_defaultItem = (MonitoredItem)template.m_defaultItem.Clone(); m_defaultItem = template.m_defaultItem; m_handle = template.m_handle; m_maxMessageCount = template.m_maxMessageCount; m_disableMonitoredItemCache = template.m_disableMonitoredItemCache; if (copyEventHandlers) { m_StateChanged = template.m_StateChanged; m_PublishStatusChanged = template.m_PublishStatusChanged; m_fastDataChangeCallback = template.m_fastDataChangeCallback; m_fastEventCallback = template.m_fastEventCallback; } // copy the list of monitored items. foreach (MonitoredItem monitoredItem in template.MonitoredItems) { MonitoredItem clone = new MonitoredItem(monitoredItem, copyEventHandlers); clone.Subscription = this; m_monitoredItems.Add(clone.ClientHandle, clone); } } } /// <summary> /// Called by the .NET framework during deserialization. /// </summary> [OnDeserializing] private void Initialize(StreamingContext context) { m_cache = new object(); Initialize(); } /// <summary> /// Sets the private members to default values. /// </summary> private void Initialize() { m_id = 0; m_displayName = "Subscription"; m_publishingInterval = 0; m_keepAliveCount = 0; m_lifetimeCount = 0; m_maxNotificationsPerPublish = 0; m_publishingEnabled = false; m_timestampsToReturn = TimestampsToReturn.Both; m_maxMessageCount = 10; m_messageCache = new LinkedList<NotificationMessage>(); m_monitoredItems = new SortedDictionary<uint,MonitoredItem>(); m_deletedItems = new List<MonitoredItem>(); m_defaultItem = new MonitoredItem(); m_defaultItem.DisplayName = "MonitoredItem"; m_defaultItem.SamplingInterval = -1; m_defaultItem.MonitoringMode = MonitoringMode.Reporting; m_defaultItem.QueueSize = 0; m_defaultItem.DiscardOldest = true; } #endregion #region IDisposable Members /// <summary> /// Frees any unmanaged resources. /// </summary> public void Dispose() { Dispose(true); } /// <summary> /// An overrideable version of the Dispose. /// </summary> [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Usage", "CA2213:DisposableFieldsShouldBeDisposed", MessageId = "m_publishTimer")] protected virtual void Dispose(bool disposing) { if (disposing) { Utils.SilentDispose(m_publishTimer); m_publishTimer = null; } } #endregion #region Events /// <summary> /// Raised to indicate that the state of the subscription has changed. /// </summary> [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1009:DeclareEventHandlersCorrectly")] public event SubscriptionStateChangedEventHandler StateChanged { add { m_StateChanged += value; } remove { m_StateChanged -= value; } } /// <summary> /// Raised to indicate the publishing state for the subscription has stopped or resumed (see PublishingStopped property). /// </summary> public event EventHandler PublishStatusChanged { add { lock (m_cache) { m_PublishStatusChanged += value; } } remove { lock (m_cache) { m_PublishStatusChanged -= value; } } } #endregion #region Persistent Properties /// <summary> /// A display name for the subscription. /// </summary> [DataMember(Order = 1)] public string DisplayName { get { return m_displayName; } set { m_displayName = value; } } /// <summary> /// The publishing interval. /// </summary> [DataMember(Order = 2)] public int PublishingInterval { get { return m_publishingInterval; } set { m_publishingInterval = value; } } /// <summary> /// The keep alive count. /// </summary> [DataMember(Order = 3)] public uint KeepAliveCount { get { return m_keepAliveCount; } set { m_keepAliveCount = value; } } /// <summary> /// The maximum number of notifications per publish request. /// </summary> [DataMember(Order = 4)] public uint LifetimeCount { get { return m_lifetimeCount; } set { m_lifetimeCount = value; } } /// <summary> /// The maximum number of notifications per publish request. /// </summary> [DataMember(Order = 5)] public uint MaxNotificationsPerPublish { get { return m_maxNotificationsPerPublish; } set { m_maxNotificationsPerPublish = value; } } /// <summary> /// Whether publishing is enabled. /// </summary> [DataMember(Order = 6)] public bool PublishingEnabled { get { return m_publishingEnabled; } set { m_publishingEnabled = value; } } /// <summary> /// The priority assigned to subscription. /// </summary> [DataMember(Order = 7)] public byte Priority { get { return m_priority; } set { m_priority = value; } } /// <summary> /// The timestamps to return with the notification messages. /// </summary> [DataMember(Order = 8)] public TimestampsToReturn TimestampsToReturn { get { return m_timestampsToReturn; } set { m_timestampsToReturn = value; } } /// <summary> /// The maximum number of messages to keep in the internal cache. /// </summary> [DataMember(Order = 9)] public int MaxMessageCount { get { lock (m_cache) { return m_maxMessageCount; } } set { lock (m_cache) { m_maxMessageCount = value; } } } /// <summary> /// The default monitored item. /// </summary> [DataMember(Order = 10)] public MonitoredItem DefaultItem { get { return m_defaultItem; } set { m_defaultItem = value; } } /// <summary> /// The minimum lifetime for subscriptions in milliseconds. /// </summary> [DataMember(Order = 11)] public uint MinLifetimeInterval { get { return m_minLifetimeInterval; } set { m_minLifetimeInterval = value; } } /// <summary> /// Gets or sets a value indicating whether the notifications are cached within the monitored items. /// </summary> /// <value> /// <c>true</c> if monitored item cache is disabled; otherwise, <c>false</c>. /// </value> /// <remarks> /// Applications must process the Session.Notication event if this is set to true. /// This flag improves performance by eliminating the processing involved in updating the cache. /// </remarks> [DataMember(Order = 12)] public bool DisableMonitoredItemCache { get { return m_disableMonitoredItemCache; } set { m_disableMonitoredItemCache = value; } } /// <summary> /// Gets or sets the fast data change callback. /// </summary> /// <value>The fast data change callback.</value> /// <remarks> /// Only one callback is allowed at a time but it is more efficient to call than an event. /// </remarks> public FastDataChangeNotificationEventHandler FastDataChangeCallback { get { return m_fastDataChangeCallback; } set { m_fastDataChangeCallback = value; } } /// <summary> /// Gets or sets the fast event callback. /// </summary> /// <value>The fast event callback.</value> /// <remarks> /// Only one callback is allowed at a time but it is more efficient to call than an event. /// </remarks> public FastEventNotificationEventHandler FastEventCallback { get { return m_fastEventCallback; } set { m_fastEventCallback = value; } } /// <summary> /// The items to monitor. /// </summary> public IEnumerable<MonitoredItem> MonitoredItems { get { lock (m_cache) { return new List<MonitoredItem>(m_monitoredItems.Values); } } } /// <summary> /// Allows the list of monitored items to be saved/restored when the object is serialized. /// </summary> [DataMember(Name = "MonitoredItems", Order = 11)] [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Performance", "CA1811:AvoidUncalledPrivateCode")] private List<MonitoredItem> SavedMonitoredItems { get { lock (m_cache) { return new List<MonitoredItem>(m_monitoredItems.Values); } } set { if (this.Created) { throw new InvalidOperationException("Cannot update a subscription that has been created on the server."); } lock (m_cache) { m_monitoredItems.Clear(); foreach (MonitoredItem monitoredItem in value) { AddItem(monitoredItem); } } } } #endregion #region Dynamic Properties /// <summary> /// Returns true if the subscription has changes that need to be applied. /// </summary> public bool ChangesPending { get { if (m_deletedItems.Count > 0) { return true; } foreach (MonitoredItem monitoredItem in m_monitoredItems.Values) { if (Created && !monitoredItem.Status.Created) { return true; } if (monitoredItem.AttributesModified) { return true; } } return false; } } /// <summary> /// Returns the number of monitored items. /// </summary> public uint MonitoredItemCount { get { lock (m_cache) { return (uint)m_monitoredItems.Count; } } } /// <summary> /// The session that owns the subscription item. /// </summary> public Session Session { get { return m_session; } internal set { m_session = value; } } /// <summary> /// A local handle assigned to the subscription /// </summary> public object Handle { get { return m_handle; } set { m_handle = value; } } /// <summary> /// The unique identifier assigned by the server. /// </summary> public uint Id { get { return m_id; } } /// <summary> /// Whether the subscription has been created on the server. /// </summary> public bool Created { get { return m_id != 0; } } /// <summary> /// The current publishing interval. /// </summary> public double CurrentPublishingInterval { get { return m_currentPublishingInterval; } } /// <summary> /// The current keep alive count. /// </summary> public uint CurrentKeepAliveCount { get { return m_currentKeepAliveCount; } } /// <summary> /// The current lifetime count. /// </summary> public uint CurrentLifetimeCount { get { return m_currentLifetimeCount; } } /// <summary> /// Whether publishing is currently enabled. /// </summary> public bool CurrentPublishingEnabled { get { return m_currentPublishingEnabled; } } /// <summary> /// The priority assigned to subscription when it was created. /// </summary> public byte CurrentPriority { get { return m_currentPriority; } } /// <summary> /// The when that the last notification received was published. /// </summary> public DateTime PublishTime { get { lock (m_cache) { if (m_messageCache.Count > 0) { return m_messageCache.Last.Value.PublishTime; } } return DateTime.MinValue; } } /// <summary> /// The when that the last notification was received. /// </summary> public DateTime LastNotificationTime { get { lock (m_cache) { return m_lastNotificationTime; } } } /// <summary> /// The sequence number assigned to the last notification message. /// </summary> public uint SequenceNumber { get { lock (m_cache) { if (m_messageCache.Count > 0) { return m_messageCache.Last.Value.SequenceNumber; } } return 0; } } /// <summary> /// The number of notifications contained in the last notification message. /// </summary> public uint NotificationCount { get { lock (m_cache) { if (m_messageCache.Count > 0) { return (uint)m_messageCache.Last.Value.NotificationData.Count; } } return 0; } } /// <summary> /// The last notification received from the server. /// </summary> public NotificationMessage LastNotification { get { lock (m_cache) { if (m_messageCache.Count > 0) { return m_messageCache.Last.Value; } return null; } } } /// <summary> /// The cached notifications. /// </summary> public IEnumerable<NotificationMessage> Notifications { get { lock (m_cache) { // make a copy to ensure the state of the last cannot change during enumeration. return new List<NotificationMessage>(m_messageCache); } } } /// <summary> /// The sequence numbers that are available for republish requests. /// </summary> public IEnumerable<uint> AvailableSequenceNumbers { get { lock (m_cache) { return m_availableSequenceNumbers; } } } /// <summary> /// Sends a notification that the state of the subscription has changed. /// </summary> public void ChangesCompleted() { if (m_StateChanged != null) { m_StateChanged(this, new SubscriptionStateChangedEventArgs(m_changeMask)); } m_changeMask = SubscriptionChangeMask.None; } /// <summary> /// Returns true if the subscription is not receiving publishes. /// </summary> public bool PublishingStopped { get { lock (m_cache) { int keepAliveInterval = (int)(m_currentPublishingInterval*m_currentKeepAliveCount); if (m_lastNotificationTime.AddMilliseconds(keepAliveInterval+500) < DateTime.UtcNow) { return true; } return false; } } } #endregion #region Public Methods /// <summary> /// Ensures sensible values for the counts. /// </summary> private void AdjustCounts(ref uint keepAliveCount, ref uint lifetimeCount) { // keep alive count must be at least 1. if (keepAliveCount == 0) { keepAliveCount = 1; } // ensure the lifetime is sensible given the sampling interval. if (m_publishingInterval > 0) { uint minLifetimeCount = (uint)(m_minLifetimeInterval/m_publishingInterval); if (lifetimeCount < minLifetimeCount) { lifetimeCount = minLifetimeCount; if (m_minLifetimeInterval%m_publishingInterval != 0) { lifetimeCount++; } Utils.Trace("Adjusted LifetimeCount to value={0}, for subscription {1}. ", lifetimeCount, Id); } } // don't know what the sampling interval will be - use something large enough // to ensure the user does not experience unexpected drop outs. else { Utils.Trace("Adjusted LifetimeCount from value={0}, to value={1}, for subscription {2}. ", lifetimeCount, 1000, Id); lifetimeCount = 1000; } // lifetime must be greater than the keep alive count. if (lifetimeCount < keepAliveCount) { Utils.Trace("Adjusted LifetimeCount from value={0}, to value={1}, for subscription {2}. ", lifetimeCount, keepAliveCount, Id); lifetimeCount = keepAliveCount; } } /// <summary> /// Creates a subscription on the server. /// </summary> public void Create() { VerifySubscriptionState(false); // create the subscription. uint subscriptionId; double revisedPublishingInterval; uint revisedKeepAliveCount = m_keepAliveCount; uint revisedLifetimeCounter = m_lifetimeCount; AdjustCounts(ref revisedKeepAliveCount, ref revisedLifetimeCounter); m_session.CreateSubscription( null, m_publishingInterval, revisedLifetimeCounter, revisedKeepAliveCount, m_maxNotificationsPerPublish, m_publishingEnabled, m_priority, out subscriptionId, out revisedPublishingInterval, out revisedLifetimeCounter, out revisedKeepAliveCount); // update current state. m_id = subscriptionId; m_currentPublishingInterval = revisedPublishingInterval; m_currentKeepAliveCount = revisedKeepAliveCount; m_currentLifetimeCount = revisedLifetimeCounter; m_currentPublishingEnabled = m_publishingEnabled; m_currentPriority = m_priority; StartKeepAliveTimer(); m_changeMask |= SubscriptionChangeMask.Created; if (m_keepAliveCount != revisedKeepAliveCount) { Utils.Trace("For subscription {0}, Keep alive count was revised from {1} to {2}", Id, m_keepAliveCount, revisedKeepAliveCount); } if (m_lifetimeCount != revisedLifetimeCounter) { Utils.Trace("For subscription {0}, Lifetime count was revised from {1} to {2}", Id, m_lifetimeCount, revisedLifetimeCounter); } if (m_publishingInterval != revisedPublishingInterval) { Utils.Trace("For subscription {0}, Publishing interval was revised from {1} to {2}", Id, m_publishingInterval, revisedPublishingInterval); } if (revisedLifetimeCounter < revisedKeepAliveCount * 3) { Utils.Trace("For subscription {0}, Revised lifetime counter (value={1}) is less than three times the keep alive count (value={2})", Id, revisedLifetimeCounter, revisedKeepAliveCount); } if (m_currentPriority == 0) { Utils.Trace("For subscription {0}, the priority was set to 0.", Id); } CreateItems(); ChangesCompleted(); } /// <summary> /// Starts a timer to ensure publish requests are sent frequently enough to detect network interruptions. /// </summary> private void StartKeepAliveTimer() { // stop the publish timer. if (m_publishTimer != null) { m_publishTimer.Dispose(); m_publishTimer = null; } lock (m_cache) { m_lastNotificationTime = DateTime.MinValue; } int keepAliveInterval = (int)(m_currentPublishingInterval*m_currentKeepAliveCount); m_lastNotificationTime = DateTime.UtcNow; m_publishTimer = new Timer(OnKeepAlive, keepAliveInterval, keepAliveInterval, keepAliveInterval); // send initial publish. m_session.BeginPublish(keepAliveInterval*3); } /// <summary> /// Checks if a notification has arrived. Sends a publish if it has not. /// </summary> private void OnKeepAlive(object state) { // check if a publish has arrived. EventHandler callback = null; lock (m_cache) { if (!PublishingStopped) { return; } callback = m_PublishStatusChanged; m_publishLateCount++; } TraceState("PUBLISHING STOPPED"); if (callback != null) { try { callback(this, null); } catch (Exception e) { Utils.Trace(e, "Error while raising PublishStateChanged event."); } } } /// <summary> /// Dumps the current state of the session queue. /// </summary> internal void TraceState(string context) { if ((Utils.TraceMask & Utils.TraceMasks.Information) == 0) { return; } StringBuilder buffer = new StringBuilder(); buffer.AppendFormat("Subscription {0}", context); buffer.AppendFormat(", Id={0}", m_id); buffer.AppendFormat(", LastNotificationTime={0:HH:mm:ss}", m_lastNotificationTime); if (m_session != null) { buffer.AppendFormat(", GoodPublishRequestCount={0}", m_session.GoodPublishRequestCount); } buffer.AppendFormat(", PublishingInterval={0}", m_currentPublishingInterval); buffer.AppendFormat(", KeepAliveCount={0}", m_currentKeepAliveCount); buffer.AppendFormat(", PublishingEnabled={0}", m_currentPublishingEnabled); buffer.AppendFormat(", MonitoredItemCount={0}", MonitoredItemCount); Utils.Trace("{0}", buffer.ToString()); } /// <summary> /// Deletes a subscription on the server. /// </summary> public void Delete(bool silent) { if (!silent) { VerifySubscriptionState(true); } // nothing to do if not created. if (!this.Created) { return; } try { // stop the publish timer. if (m_publishTimer != null) { m_publishTimer.Dispose(); m_publishTimer = null; } // delete the subscription. UInt32Collection subscriptionIds = new uint[] { m_id }; StatusCodeCollection results; DiagnosticInfoCollection diagnosticInfos; ResponseHeader responseHeader = m_session.DeleteSubscriptions( null, subscriptionIds, out results, out diagnosticInfos); // validate response. ClientBase.ValidateResponse(results, subscriptionIds); ClientBase.ValidateDiagnosticInfos(diagnosticInfos, subscriptionIds); if (StatusCode.IsBad(results[0])) { throw new ServiceResultException(ClientBase.GetResult(results[0], 0, diagnosticInfos, responseHeader)); } } // supress exception if silent flag is set. catch (Exception e) { if (!silent) { throw new ServiceResultException(e, StatusCodes.BadUnexpectedError); } } // always put object in disconnected state even if an error occurs. finally { m_id = 0; m_currentPublishingInterval = 0; m_currentKeepAliveCount = 0; m_currentPublishingEnabled = false; m_currentPriority = 0; // update items. lock (m_cache) { foreach (MonitoredItem monitoredItem in m_monitoredItems.Values) { monitoredItem.SetDeleteResult(StatusCodes.Good, -1, null, null); } } m_deletedItems.Clear(); m_changeMask |= SubscriptionChangeMask.Deleted; } ChangesCompleted(); } /// <summary> /// Modifies a subscription on the server. /// </summary> public void Modify() { VerifySubscriptionState(true); // modify the subscription. double revisedPublishingInterval; uint revisedKeepAliveCount = m_keepAliveCount; uint revisedLifetimeCounter = m_lifetimeCount; AdjustCounts(ref revisedKeepAliveCount, ref revisedLifetimeCounter); m_session.ModifySubscription( null, m_id, m_publishingInterval, revisedLifetimeCounter, revisedKeepAliveCount, m_maxNotificationsPerPublish, m_priority, out revisedPublishingInterval, out revisedLifetimeCounter, out revisedKeepAliveCount); // update current state. m_currentPublishingInterval = revisedPublishingInterval; m_currentKeepAliveCount = revisedKeepAliveCount; m_currentLifetimeCount = revisedLifetimeCounter; m_currentPriority = m_priority; m_changeMask |= SubscriptionChangeMask.Modified; ChangesCompleted(); } /// <summary> /// Changes the publishing enabled state for the subscription. /// </summary> public void SetPublishingMode(bool enabled) { VerifySubscriptionState(true); // modify the subscription. UInt32Collection subscriptionIds = new uint[] { m_id }; StatusCodeCollection results; DiagnosticInfoCollection diagnosticInfos; ResponseHeader responseHeader = m_session.SetPublishingMode( null, enabled, new uint[] { m_id }, out results, out diagnosticInfos); // validate response. ClientBase.ValidateResponse(results, subscriptionIds); ClientBase.ValidateDiagnosticInfos(diagnosticInfos, subscriptionIds); if (StatusCode.IsBad(results[0])) { throw new ServiceResultException(ClientBase.GetResult(results[0], 0, diagnosticInfos, responseHeader)); } // update current state. m_currentPublishingEnabled = m_publishingEnabled = enabled; m_changeMask |= SubscriptionChangeMask.Modified; ChangesCompleted(); } /// <summary> /// Republishes the specified notification message. /// </summary> public NotificationMessage Republish(uint sequenceNumber) { VerifySubscriptionState(true); NotificationMessage message; m_session.Republish( null, m_id, sequenceNumber, out message); return message; } /// <summary> /// Applies any changes to the subscription items. /// </summary> public void ApplyChanges() { DeleteItems(); ModifyItems(); CreateItems(); } /// <summary> /// Resolves all relative paths to nodes on the server. /// </summary> public void ResolveItemNodeIds() { VerifySubscriptionState(true); // collect list of browse paths. BrowsePathCollection browsePaths = new BrowsePathCollection(); List<MonitoredItem> itemsToBrowse = new List<MonitoredItem>(); lock (m_cache) { foreach (MonitoredItem monitoredItem in m_monitoredItems.Values) { if (!String.IsNullOrEmpty(monitoredItem.RelativePath) && NodeId.IsNull(monitoredItem.ResolvedNodeId)) { // cannot change the relative path after an item is created. if (monitoredItem.Created) { throw new ServiceResultException(StatusCodes.BadInvalidState, "Cannot modify item path after it is created."); } BrowsePath browsePath = new BrowsePath(); browsePath.StartingNode = monitoredItem.StartNodeId; // parse the relative path. try { browsePath.RelativePath = RelativePath.Parse(monitoredItem.RelativePath, m_session.TypeTree); } catch (Exception e) { monitoredItem.SetError(new ServiceResult(e)); continue; } browsePaths.Add(browsePath); itemsToBrowse.Add(monitoredItem); } } } // nothing to do. if (browsePaths.Count == 0) { return; } // translate browse paths. BrowsePathResultCollection results; DiagnosticInfoCollection diagnosticInfos; ResponseHeader responseHeader = m_session.TranslateBrowsePathsToNodeIds( null, browsePaths, out results, out diagnosticInfos); ClientBase.ValidateResponse(results, browsePaths); ClientBase.ValidateDiagnosticInfos(diagnosticInfos, browsePaths); // update results. for (int ii = 0; ii < results.Count; ii++) { itemsToBrowse[ii].SetResolvePathResult(results[ii], ii, diagnosticInfos, responseHeader); } m_changeMask |= SubscriptionChangeMask.ItemsModified; } /// <summary> /// Creates all items that have not already been created. /// </summary> public IList<MonitoredItem> CreateItems() { VerifySubscriptionState(true); ResolveItemNodeIds(); MonitoredItemCreateRequestCollection requestItems = new MonitoredItemCreateRequestCollection(); List<MonitoredItem> itemsToCreate = new List<MonitoredItem>(); lock (m_cache) { foreach (MonitoredItem monitoredItem in m_monitoredItems.Values) { // ignore items that have been created. if (monitoredItem.Status.Created) { continue; } // build item request. MonitoredItemCreateRequest request = new MonitoredItemCreateRequest(); request.ItemToMonitor.NodeId = monitoredItem.ResolvedNodeId; request.ItemToMonitor.AttributeId = monitoredItem.AttributeId; request.ItemToMonitor.IndexRange = monitoredItem.IndexRange; request.ItemToMonitor.DataEncoding = monitoredItem.Encoding; request.MonitoringMode = monitoredItem.MonitoringMode; request.RequestedParameters.ClientHandle = monitoredItem.ClientHandle; request.RequestedParameters.SamplingInterval = monitoredItem.SamplingInterval; request.RequestedParameters.QueueSize = monitoredItem.QueueSize; request.RequestedParameters.DiscardOldest = monitoredItem.DiscardOldest; if (monitoredItem.Filter != null) { request.RequestedParameters.Filter = new ExtensionObject(monitoredItem.Filter); } requestItems.Add(request); itemsToCreate.Add(monitoredItem); } } if (requestItems.Count == 0) { return itemsToCreate; } // modify the subscription. MonitoredItemCreateResultCollection results; DiagnosticInfoCollection diagnosticInfos; ResponseHeader responseHeader = m_session.CreateMonitoredItems( null, m_id, m_timestampsToReturn, requestItems, out results, out diagnosticInfos); ClientBase.ValidateResponse(results, itemsToCreate); ClientBase.ValidateDiagnosticInfos(diagnosticInfos, itemsToCreate); // update results. for (int ii = 0; ii < results.Count; ii++) { itemsToCreate[ii].SetCreateResult(requestItems[ii], results[ii], ii, diagnosticInfos, responseHeader); } m_changeMask |= SubscriptionChangeMask.ItemsCreated; ChangesCompleted(); // return the list of items affected by the change. return itemsToCreate; } /// <summary> /// Modies all items that have been changed. /// </summary> public IList<MonitoredItem> ModifyItems() { VerifySubscriptionState(true); MonitoredItemModifyRequestCollection requestItems = new MonitoredItemModifyRequestCollection(); List<MonitoredItem> itemsToModify = new List<MonitoredItem>(); lock (m_cache) { foreach (MonitoredItem monitoredItem in m_monitoredItems.Values) { // ignore items that have been created or modified. if (!monitoredItem.Status.Created || !monitoredItem.AttributesModified) { continue; } // build item request. MonitoredItemModifyRequest request = new MonitoredItemModifyRequest(); request.MonitoredItemId = monitoredItem.Status.Id; request.RequestedParameters.ClientHandle = monitoredItem.ClientHandle; request.RequestedParameters.SamplingInterval = monitoredItem.SamplingInterval; request.RequestedParameters.QueueSize = monitoredItem.QueueSize; request.RequestedParameters.DiscardOldest = monitoredItem.DiscardOldest; if (monitoredItem.Filter != null) { request.RequestedParameters.Filter = new ExtensionObject(monitoredItem.Filter); } requestItems.Add(request); itemsToModify.Add(monitoredItem); } } if (requestItems.Count == 0) { return itemsToModify; } // modify the subscription. MonitoredItemModifyResultCollection results; DiagnosticInfoCollection diagnosticInfos; ResponseHeader responseHeader = m_session.ModifyMonitoredItems( null, m_id, m_timestampsToReturn, requestItems, out results, out diagnosticInfos); ClientBase.ValidateResponse(results, itemsToModify); ClientBase.ValidateDiagnosticInfos(diagnosticInfos, itemsToModify); // update results. for (int ii = 0; ii < results.Count; ii++) { itemsToModify[ii].SetModifyResult(requestItems[ii], results[ii], ii, diagnosticInfos, responseHeader); } m_changeMask |= SubscriptionChangeMask.ItemsCreated; ChangesCompleted(); // return the list of items affected by the change. return itemsToModify; } /// <summary> /// Deletes all items that have been marked for deletion. /// </summary> public IList<MonitoredItem> DeleteItems() { VerifySubscriptionState(true); if (m_deletedItems.Count == 0) { return new List<MonitoredItem>(); } List<MonitoredItem> itemsToDelete = m_deletedItems; m_deletedItems = new List<MonitoredItem>(); UInt32Collection monitoredItemIds = new UInt32Collection(); foreach (MonitoredItem monitoredItem in itemsToDelete) { monitoredItemIds.Add(monitoredItem.Status.Id); } StatusCodeCollection results; DiagnosticInfoCollection diagnosticInfos; ResponseHeader responseHeader = m_session.DeleteMonitoredItems( null, m_id, monitoredItemIds, out results, out diagnosticInfos); ClientBase.ValidateResponse(results, monitoredItemIds); ClientBase.ValidateDiagnosticInfos(diagnosticInfos, monitoredItemIds); // update results. for (int ii = 0; ii < results.Count; ii++) { itemsToDelete[ii].SetDeleteResult(results[ii], ii, diagnosticInfos, responseHeader); } m_changeMask |= SubscriptionChangeMask.ItemsDeleted; ChangesCompleted(); // return the list of items affected by the change. return itemsToDelete; } /// <summary> /// Deletes all items that have been marked for deletion. /// </summary> public List<ServiceResult> SetMonitoringMode( MonitoringMode monitoringMode, IList<MonitoredItem> monitoredItems) { if (monitoredItems == null) throw new ArgumentNullException("monitoredItems"); VerifySubscriptionState(true); if (monitoredItems.Count == 0) { return null; } // get list of items to update. UInt32Collection monitoredItemIds = new UInt32Collection(); foreach (MonitoredItem monitoredItem in monitoredItems) { monitoredItemIds.Add(monitoredItem.Status.Id); } StatusCodeCollection results; DiagnosticInfoCollection diagnosticInfos; ResponseHeader responseHeader = m_session.SetMonitoringMode( null, m_id, monitoringMode, monitoredItemIds, out results, out diagnosticInfos); ClientBase.ValidateResponse(results, monitoredItemIds); ClientBase.ValidateDiagnosticInfos(diagnosticInfos, monitoredItemIds); // update results. bool noErrors = true; List<ServiceResult> errors = new List<ServiceResult>(); for (int ii = 0; ii < results.Count; ii++) { ServiceResult error = null; if (StatusCode.IsBad(results[ii])) { error = ClientBase.GetResult(results[ii], ii, diagnosticInfos, responseHeader); noErrors = false; } else { monitoredItems[ii].MonitoringMode = monitoringMode; monitoredItems[ii].Status.SetMonitoringMode(monitoringMode); } errors.Add(error); } // raise state changed event. m_changeMask |= SubscriptionChangeMask.ItemsModified; ChangesCompleted(); // return null list if no errors occurred. if (noErrors) { return null; } return errors; } /// <summary> /// Adds the notification message to internal cache. /// </summary> public void SaveMessageInCache( IList<uint> availableSequenceNumbers, NotificationMessage message, IList<string> stringTable) { EventHandler callback = null; lock (m_cache) { if (availableSequenceNumbers != null) { m_availableSequenceNumbers = availableSequenceNumbers; } if (message == null) { return; } // check if a publish error was previously reported. if (PublishingStopped) { callback = m_PublishStatusChanged; TraceState("PUBLISHING RECOVERED"); } m_lastNotificationTime = DateTime.UtcNow; // save the string table that came with notification. message.StringTable = new List<string>(stringTable); // create queue for the first time. if (m_incomingMessages == null) { m_incomingMessages = new LinkedList<IncomingMessage>(); } // find or create an entry for the incoming sequence number. IncomingMessage entry = null; LinkedListNode<IncomingMessage> node = m_incomingMessages.Last; while (node != null) { entry = node.Value; LinkedListNode<IncomingMessage> previous = node.Previous; if (entry.SequenceNumber == message.SequenceNumber) { entry.Timestamp = DateTime.UtcNow; break; } if (entry.SequenceNumber < message.SequenceNumber) { entry = new IncomingMessage(); entry.SequenceNumber = message.SequenceNumber; entry.Timestamp = DateTime.UtcNow; m_incomingMessages.AddAfter(node, entry); break; } node = previous; entry = null; } if (entry == null) { entry = new IncomingMessage(); entry.SequenceNumber = message.SequenceNumber; entry.Timestamp = DateTime.UtcNow; m_incomingMessages.AddLast(entry); } // check for keep alive. if (message.NotificationData.Count > 0) { entry.Message = message; entry.Processed = false; } // fill in any gaps in the queue node = m_incomingMessages.First; while (node != null) { entry = node.Value; LinkedListNode<IncomingMessage> next = node.Next; if (next != null && next.Value.SequenceNumber > entry.SequenceNumber+1) { IncomingMessage placeholder = new IncomingMessage(); placeholder.SequenceNumber = entry.SequenceNumber+1; placeholder.Timestamp = DateTime.UtcNow; node = m_incomingMessages.AddAfter(node, placeholder); continue; } node = next; } // clean out processed values. node = m_incomingMessages.First; while (node != null) { entry = node.Value; LinkedListNode<IncomingMessage> next = node.Next; // can only pull off processed or expired messages. if (!entry.Processed && !(entry.Republished && entry.Timestamp.AddSeconds(10) < DateTime.UtcNow)) { break; } if (next != null) { m_incomingMessages.Remove(node); } node = next; } // process messages. ThreadPool.QueueUserWorkItem(OnMessageRecieved, null); } // send notification that publishing has recovered. if (callback != null) { try { callback(this, null); } catch (Exception e) { Utils.Trace(e, "Error while raising PublishStateChanged event."); } } } /// <summary> /// Processes the incoming messages. /// </summary> private void OnMessageRecieved(object state) { try { Session session = null; uint subscriptionId = 0; EventHandler callback = null; // get list of new messages to process. List<NotificationMessage> messagesToProcess = null; // get list of new messages to republish. List<IncomingMessage> messagesToRepublish = null; lock (m_cache) { for (LinkedListNode<IncomingMessage> ii = m_incomingMessages.First; ii != null; ii = ii.Next) { // update monitored items with unprocessed messages. if (ii.Value.Message != null && !ii.Value.Processed) { if (messagesToProcess == null) { messagesToProcess = new List<NotificationMessage>(); } messagesToProcess.Add(ii.Value.Message); // remove the oldest items. while (m_messageCache.Count > m_maxMessageCount) { m_messageCache.RemoveFirst(); } m_messageCache.AddLast(ii.Value.Message); ii.Value.Processed = true; } // check for missing messages. if (ii.Next != null && ii.Value.Message == null && !ii.Value.Processed && !ii.Value.Republished) { if (ii.Value.Timestamp.AddSeconds(2) < DateTime.UtcNow) { if (messagesToRepublish == null) { messagesToRepublish = new List<IncomingMessage>(); } messagesToRepublish.Add(ii.Value); ii.Value.Republished = true; } } } session = m_session; subscriptionId = m_id; callback = m_PublishStatusChanged; } if (callback != null) { try { callback(this, null); } catch (Exception e) { Utils.Trace(e, "Error while raising PublishStateChanged event."); } } // process new messages. if (messagesToProcess != null) { FastDataChangeNotificationEventHandler datachangeCallback = m_fastDataChangeCallback; FastEventNotificationEventHandler eventCallback = m_fastEventCallback; int noNotificationsReceived = 0; for (int ii = 0; ii < messagesToProcess.Count; ii++) { NotificationMessage message = messagesToProcess[ii]; noNotificationsReceived = 0; try { for (int jj = 0; jj < message.NotificationData.Count; jj++) { DataChangeNotification datachange = message.NotificationData[jj].Body as DataChangeNotification; if (datachange != null) { noNotificationsReceived += datachange.MonitoredItems.Count; if (!m_disableMonitoredItemCache) { SaveDataChange(message, datachange, message.StringTable); } if (datachangeCallback != null) { datachangeCallback(this, datachange, message.StringTable); } } EventNotificationList events = message.NotificationData[jj].Body as EventNotificationList; if (events != null) { noNotificationsReceived += events.Events.Count; if (!m_disableMonitoredItemCache) { SaveEvents(message, events, message.StringTable); } if (eventCallback != null) { eventCallback(this, events, message.StringTable); } } StatusChangeNotification statusChanged = message.NotificationData[jj].Body as StatusChangeNotification; if (statusChanged != null) { Utils.Trace("StatusChangeNotification received with Status = {0} for SubscriptionId={1}.", statusChanged.Status.ToString(), Id); } } } catch (Exception e) { Utils.Trace(e, "Error while processing incoming message #{0}.", message.SequenceNumber); } if (MaxNotificationsPerPublish != 0 && noNotificationsReceived > MaxNotificationsPerPublish) { Utils.Trace("For subscription {0}, more notifications were received={1} than the max notifications per publish value={2}", Id, noNotificationsReceived, MaxNotificationsPerPublish); } } } // do any re-publishes. if (messagesToRepublish != null && session != null && subscriptionId != 0) { for (int ii = 0; ii < messagesToRepublish.Count; ii++) { if (!session.Republish(subscriptionId, messagesToRepublish[ii].SequenceNumber)) { messagesToRepublish[ii].Republished = false; } } } } catch (Exception e) { Utils.Trace(e, "Error while processing incoming messages."); } } /// <summary> /// Adds an item to the subscription. /// </summary> public void AddItem(MonitoredItem monitoredItem) { if (monitoredItem == null) throw new ArgumentNullException("monitoredItem"); lock (m_cache) { if (m_monitoredItems.ContainsKey(monitoredItem.ClientHandle)) { return; } m_monitoredItems.Add(monitoredItem.ClientHandle, monitoredItem); monitoredItem.Subscription = this; } m_changeMask |= SubscriptionChangeMask.ItemsAdded; ChangesCompleted(); } /// <summary> /// Adds an item to the subscription. /// </summary> public void AddItems(IEnumerable<MonitoredItem> monitoredItems) { if (monitoredItems == null) throw new ArgumentNullException("monitoredItems"); bool added = false; lock (m_cache) { foreach (MonitoredItem monitoredItem in monitoredItems) { if (!m_monitoredItems.ContainsKey(monitoredItem.ClientHandle)) { m_monitoredItems.Add(monitoredItem.ClientHandle, monitoredItem); monitoredItem.Subscription = this; added = true; } } } if (added) { m_changeMask |= SubscriptionChangeMask.ItemsAdded; ChangesCompleted(); } } /// <summary> /// Removes an item from the subscription. /// </summary> public void RemoveItem(MonitoredItem monitoredItem) { if (monitoredItem == null) throw new ArgumentNullException("monitoredItem"); lock (m_cache) { if (!m_monitoredItems.Remove(monitoredItem.ClientHandle)) { return; } monitoredItem.Subscription = null; } if (monitoredItem.Status.Created) { m_deletedItems.Add(monitoredItem); } m_changeMask |= SubscriptionChangeMask.ItemsRemoved; ChangesCompleted(); } /// <summary> /// Removes an item from the subscription. /// </summary> public void RemoveItems(IEnumerable<MonitoredItem> monitoredItems) { if (monitoredItems == null) throw new ArgumentNullException("monitoredItems"); bool changed = false; lock (m_cache) { foreach (MonitoredItem monitoredItem in monitoredItems) { if (m_monitoredItems.Remove(monitoredItem.ClientHandle)) { monitoredItem.Subscription = null; if (monitoredItem.Status.Created) { m_deletedItems.Add(monitoredItem); } changed = true; } } } if (changed) { m_changeMask |= SubscriptionChangeMask.ItemsRemoved; ChangesCompleted(); } } /// <summary> /// Returns the monitored item identified by the client handle. /// </summary> public MonitoredItem FindItemByClientHandle(uint clientHandle) { lock (m_cache) { MonitoredItem monitoredItem = null; if (m_monitoredItems.TryGetValue(clientHandle, out monitoredItem)) { return monitoredItem; } return null; } } /// <summary> /// Tells the server to refresh all conditions being monitored by the subscription. /// </summary> public void ConditionRefresh() { VerifySubscriptionState(true); m_session.Call( ObjectTypeIds.ConditionType, MethodIds.ConditionType_ConditionRefresh, m_id); } #endregion #region Private Methods /// <summary> /// Throws an exception if the subscription is not in the correct state. /// </summary> private void VerifySubscriptionState(bool created) { if (created && m_id == 0) { throw new ServiceResultException(StatusCodes.BadInvalidState, "Subscription has not been created."); } if (!created && m_id != 0) { throw new ServiceResultException(StatusCodes.BadInvalidState, "Subscription has alredy been created."); } } /// <summary> /// Saves a data change in the monitored item cache. /// </summary> private void SaveDataChange(NotificationMessage message, DataChangeNotification notifications, IList<string> stringTable) { // check for empty monitored items list. if (notifications.MonitoredItems == null || notifications.MonitoredItems.Count == 0) { Utils.Trace("Publish response contains empty MonitoredItems list for SubscritpionId = {0}.", m_id); } for (int ii = 0; ii < notifications.MonitoredItems.Count; ii++) { MonitoredItemNotification notification = notifications.MonitoredItems[ii]; // lookup monitored item, MonitoredItem monitoredItem = null; lock (m_cache) { if (!m_monitoredItems.TryGetValue(notification.ClientHandle, out monitoredItem)) { Utils.Trace("Publish response contains invalid MonitoredItem.SubscritpionId = {0}, ClientHandle = {1}", m_id, notification.ClientHandle); continue; } } // save the message. notification.Message = message; // get diagnostic info. if (notifications.DiagnosticInfos.Count > ii) { notification.DiagnosticInfo = notifications.DiagnosticInfos[ii]; } // save in cache. monitoredItem.SaveValueInCache(notification); } } /// <summary> /// Saves events in the monitored item cache. /// </summary> private void SaveEvents(NotificationMessage message, EventNotificationList notifications, IList<string> stringTable) { for (int ii = 0; ii < notifications.Events.Count; ii++) { EventFieldList eventFields = notifications.Events[ii]; MonitoredItem monitoredItem = null; lock (m_cache) { if (!m_monitoredItems.TryGetValue(eventFields.ClientHandle, out monitoredItem)) { Utils.Trace("Publish response contains invalid MonitoredItem.SubscritpionId = {0}, ClientHandle = {1}", m_id, eventFields.ClientHandle); continue; } } // save the message. eventFields.Message = message; // save in cache. monitoredItem.SaveValueInCache(eventFields); } } #endregion #region Private Fields private string m_displayName; private int m_publishingInterval; private uint m_keepAliveCount; private uint m_lifetimeCount; private uint m_minLifetimeInterval; private uint m_maxNotificationsPerPublish; private bool m_publishingEnabled; private byte m_priority; private TimestampsToReturn m_timestampsToReturn; private List<MonitoredItem> m_deletedItems; private event SubscriptionStateChangedEventHandler m_StateChanged; private MonitoredItem m_defaultItem; private SubscriptionChangeMask m_changeMask; private Session m_session; private object m_handle; private uint m_id; private double m_currentPublishingInterval; private uint m_currentKeepAliveCount; private uint m_currentLifetimeCount; private bool m_currentPublishingEnabled; private byte m_currentPriority; private Timer m_publishTimer; private DateTime m_lastNotificationTime; private int m_publishLateCount; private event EventHandler m_PublishStatusChanged; private object m_cache = new object(); private LinkedList<NotificationMessage> m_messageCache; private IList<uint> m_availableSequenceNumbers; private int m_maxMessageCount; private SortedDictionary<uint,MonitoredItem> m_monitoredItems; private bool m_disableMonitoredItemCache; private FastDataChangeNotificationEventHandler m_fastDataChangeCallback; private FastEventNotificationEventHandler m_fastEventCallback; /// <summary> /// A message received from the server cached until is processed or discarded. /// </summary> private class IncomingMessage { public uint SequenceNumber; public DateTime Timestamp; public NotificationMessage Message; public bool Processed; public bool Republished; } private LinkedList<IncomingMessage> m_incomingMessages; private static long s_globalSubscriptionCounter; #endregion } #region SubscriptionChangeMask Enumeration /// <summary> /// Flags indicating what has changed in a subscription. /// </summary> [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Naming", "CA1714:FlagsEnumsShouldHavePluralNames"), Flags] public enum SubscriptionChangeMask { /// <summary> /// The subscription has not changed. /// </summary> None = 0x00, /// <summary> /// The subscription was created on the server. /// </summary> Created = 0x01, /// <summary> /// The subscription was deleted on the server. /// </summary> Deleted = 0x02, /// <summary> /// The subscription was modified on the server. /// </summary> Modified = 0x04, /// <summary> /// Monitored items were added to the subscription (but not created on the server) /// </summary> ItemsAdded = 0x08, /// <summary> /// Monitored items were removed to the subscription (but not deleted on the server) /// </summary> ItemsRemoved = 0x10, /// <summary> /// Monitored items were created on the server. /// </summary> ItemsCreated = 0x20, /// <summary> /// Monitored items were deleted on the server. /// </summary> ItemsDeleted = 0x40, /// <summary> /// Monitored items were modified on the server. /// </summary> ItemsModified = 0x80 } #endregion /// <summary> /// The delegate used to receive data change notifications via a direct function call instead of a .NET Event. /// </summary> public delegate void FastDataChangeNotificationEventHandler(Subscription subscription, DataChangeNotification notification, IList<string> stringTable); /// <summary> /// The delegate used to receive event notifications via a direct function call instead of a .NET Event. /// </summary> public delegate void FastEventNotificationEventHandler(Subscription subscription, EventNotificationList notification, IList<string> stringTable); #region SubscriptionStateChangedEventArgs Class /// <summary> /// The event arguments provided when the state of a subscription changes. /// </summary> public class SubscriptionStateChangedEventArgs : EventArgs { #region Constructors /// <summary> /// Creates a new instance. /// </summary> internal SubscriptionStateChangedEventArgs(SubscriptionChangeMask changeMask) { m_changeMask = changeMask; } #endregion #region Public Properties /// <summary> /// The changes that have affected the subscription. /// </summary> public SubscriptionChangeMask Status { get { return m_changeMask; } } #endregion #region Private Fields private SubscriptionChangeMask m_changeMask; #endregion } /// <summary> /// The delegate used to receive subscription state change notifications. /// </summary> public delegate void SubscriptionStateChangedEventHandler(Subscription subscription, SubscriptionStateChangedEventArgs e); #endregion /// <summary> /// A collection of subscriptions. /// </summary> [CollectionDataContract(Name = "ListOfSubscription", Namespace = Namespaces.OpcUaXsd, ItemName = "Subscription")] public partial class SubscriptionCollection : List<Subscription> { #region Constructors /// <summary> /// Initializes an empty collection. /// </summary> public SubscriptionCollection() {} /// <summary> /// Initializes the collection from another collection. /// </summary> /// <param name="collection">The existing collection to use as the basis of creating this collection</param> public SubscriptionCollection(IEnumerable<Subscription> collection) : base(collection) {} /// <summary> /// Initializes the collection with the specified capacity. /// </summary> /// <param name="capacity">The max. capacity of the collection</param> public SubscriptionCollection(int capacity) : base(capacity) {} #endregion } }