class documentation

class NotificationManager(object):

View In Hierarchy

This class defines functionalities to handle subscriptions and notifications.

Method addCrossResourceSubscription Add a new crossResourceSubscription.
Method addSubscription Add a new subscription.
Method checkOperationSubscription Check for and perform a notification for an operation subscription.
Method checkPerformBlockingRetrieve Perform a blocking RETRIEVE. If this notification event type is configured a RETRIEVE operation to a resource causes a notification to a target. It is expected that the target is updating the resource ...
Method checkPerformBlockingUpdate Check for and perform a blocking update request for resource updates that have this event type configured.
Method checkSubscriptions Check and handle resource events.
Method countNotificationEvents This method count and stores the number of notification events for a subscription. It increments the count for each of the notification targets.
Method countSentReceivedNotification If Notification Stats are enabled for a <sub> or <crs> resource, then increase the count for sent notifications or received responses.
Method evaluatePolicyDeletionRule Evaluate a policy deletion rule.
Method getSubscriptionsByNetChty Returns a (possible empty) list of subscriptions for a resource.
Method initialize Initialization of a NotificationManager instance.
Method receivedCrossResourceSubscriptionNotification Handle a received notification for a <crs> resource.
Method removeCrossResourceSubscription Remove a crossResourcesubscription.
Method removeNotificationTarget Remove a notification target from a subscription and perform the necessary actions based on the associated policies.
Method removeSubscription Remove a subscription.
Method restart Restart the NotificationManager service.
Method sendDeletionNotification Send a Deletion Notification to a single or a list of target.
Method sendNotificationWithDict Send a notification to a single URI or a list of URIs.
Method sendVerificationRequest Define the callback function for verification notifications and send the notification.
Method shutdown Shutdown the Notification Manager.
Method startCRSPeriodicWindow Start a periodic window for a <crs> resource.
Method startCRSSlidingWindow Start a sliding window for a <crs> resource.
Method stopCRSPeriodicWindow Stop a periodic window for a <crs> resource.
Method stopCRSSlidingWindow Stop a sliding window for a <crs> resource.
Method updateCrossResourceSubscription Update a crossResourcesubscription.
Method updateOfNSEAttribute Handle an update of the notificationStatsEnable attribute of a <sub> or <crs> resource.
Method updateSubscription Update a subscription.
Method validateAndConstructNotificationStatsInfo Update and fill the notificationStatsInfo attribute of a <sub> or <crs> resource.
Class Variable __slots__ Slots for NotificationManager instance attributes.
Class Variable dispatcher Injected Dispatcher instance.
Class Variable request Injected RequestManager instance.
Class Variable storage Injected Storage instance.
Instance Variable lockBatchNotification An internal lock instance for locking certain batch notification methods.
Instance Variable lockNotificationEventStats An internal lock instance for locking the notification event stats.
Method _crsCheckForNotification Test whether a notification must be sent for a a <crs> window.
Method _crsPeriodicWindowMonitor Check for a <crs> periodic window and send notifications if the window requirements are met.
Method _crsSlidingWindowMonitor Actor function to monitor a sliding window for a <crs> resource.
Method _flushBatchNotifications Send and remove any outstanding batch notifications for a subscription.
Method _getPeriodicWorkerName Return the name of a periodic window worker.
Method _getSlidingWorkerName Return the name of a sliding window worker.
Method _handleSubscriptionNotification Send a subscription notification.
Method _sendNotification Send a notification to a single or to multiple targets if necessary.
Method _sendSubscriptionAggregatedBatchNotification Send and remove(!) the available BatchNotifications for an ri & nu.
Method _startNewBatchNotificationWorker Start a new batch notification worker.
Method _stopNotificationBatchWorker Stop a batch notification worker for a given ri and nu.
Method _storeBatchNotification Store a subscription's notification for later sending. For a single nu.
Method _verifyNusInSubscription Check all the notification URI's in a subscription.
Method _workerID Return an ID for a batch notification background worker.
Instance Variable _eventNotification Cached reference to the notification event for optimized access.
def addCrossResourceSubscription(self, crs: CRS, originator: str):

Add a new crossResourceSubscription.

Check each receipient in the nu attribute with verification requests.

Parameters
crs:CRSThe new <crs> resource to check.
originator:strThe request originator.
Returns
Result object.
def addSubscription(self, subscription: SUB, originator: str):

Add a new subscription.

Check each receipient with verification requests.

Parameters
subscription:SUBThe new <sub> resource.
originator:strThe request originator.
Raises
INTERNAL_SERVER_ERRORIn case there is an internal DB error.
def checkOperationSubscription(self, resource: Resource, op: Operation, originator: str):

Check for and perform a notification for an operation subscription.

Parameters
resource:ResourceThe resource that received the event resp. request.
op:OperationThe operation to check.
originator:strThe originator of the request that caused the event.
Raises
INTERNAL_SERVER_ERRORIf there are issues retrieving the necessary resources or if multiple default NTP resources are found.
def checkPerformBlockingRetrieve(self, resource: Resource, request: CSERequest, finished: Callable | None = None):

Perform a blocking RETRIEVE. If this notification event type is configured a RETRIEVE operation to a resource causes a notification to a target. It is expected that the target is updating the resource before responding to the notification.

A NOTIFY permission check is done against the originator of the <subscription> resource, not the originator of the request.

Note

This functionality is experimental and not part of the oneM2M spec yet.

Parameters
resource:ResourceThe resource that is the target of the RETRIEVE request.
request:CSERequestThe original request.
finished:Callable | NoneCallable that is called when the notifications were successfully sent and received.
Returns
Result instance indicating success or failure.
def checkPerformBlockingUpdate(self, resource: Resource, originator: str, updatedAttributes: JSON, finished: Callable | None = None):

Check for and perform a blocking update request for resource updates that have this event type configured.

Parameters
resource:ResourceThe updated resource.
originator:strThe originator of the original request.
updatedAttributes:JSONA structure of all the updated attributes.
finished:Callable | NoneCallable that is called when the notifications were successfully sent and a response was received.
def checkSubscriptions(self, resource: Resource | None, reason: NotificationEventType, originator: str, childResource: Resource | None = None, modifiedAttributes: JSON | None = None, ri: str | None = None, missingData: dict[str, MissingData] | None = None, operation: Operation | None = None):

Check and handle resource events.

This method looks for subscriptions of a resource and tests, whether an event, like update etc, will lead to a notification. It then creates and sends the notification to all targets.

Parameters
resource:Resource | NoneThe resource that received the event resp. request.
reason:NotificationEventTypeThe NotificationEventType to check.
originator:strThe originator of the request that caused the event.
childResource:Resource | NoneAn optional child resource of resource that might be updated or created etc.
modifiedAttributes:JSON | NoneAn optional JSON structure that contains updated attributes.
ri:str | NoneOptionally provided resource ID of Resource. If it is provided, then resource might be None. It will then be used to retrieve the resource.
missingData:dict[str, MissingData] | NoneAn optional dictionary of missing data structures in case the TimeSeries missing data functionality is handled.
operation:Operation | NoneAn optional operation that is checked against the subscription's operationMonitor. This overrides the reason.
def countNotificationEvents(self, ri: str, sub: SUB | CRS | None = None):

This method count and stores the number of notification events for a subscription. It increments the count for each of the notification targets.

After handling the resource is updated in the database.

Parameters
ri:strResource ID of a <sub> or <csr> resource to handle.
sub:SUB | CRS | NoneUndocumented
def countSentReceivedNotification(self, sub: SUB | CRS, target: str, isResponse: bool | None = False, count: int | None = 1):

If Notification Stats are enabled for a <sub> or <crs> resource, then increase the count for sent notifications or received responses.

Parameters
sub:SUB | CRS<sub> or <crs> resource.
target:strURI of the notification target.
isResponse:bool | NoneIndicates whether a sent notification or a received response should be counted for.
count:int | NoneNumber of notifications to count.
def evaluatePolicyDeletionRule(self, pdr: PDR) -> bool:

Evaluate a policy deletion rule.

Parameters
pdr:PDRThe <policyDeletionRules> resource.
Returns
boolTrue if the rule is satisfied, False otherwise.
def getSubscriptionsByNetChty(self, ri: str, net: list[NotificationEventType] | None = None, chty: ResourceTypes | None = None) -> JSONLIST:

Returns a (possible empty) list of subscriptions for a resource.

Parameters
ri:strUndocumented
net:list[NotificationEventType] | Noneoptional filter for enc/net
chty:ResourceTypes | Noneoptional single child resource typ
Returns
JSONLISTList of storage subscription documents, NOT Subscription resources.
def initialize(self):

Initialization of a NotificationManager instance.

def receivedCrossResourceSubscriptionNotification(self, sur: str, crs: Resource):

Handle a received notification for a <crs> resource.

Parameters
sur:strThe notification source received in the notification.
crs:ResourceThe <crs> resource for which the notification was received.
def removeCrossResourceSubscription(self, crs: CRS):

Remove a crossResourcesubscription.

Send a deletion request to the subscriberURI target.

Parameters
crs:CRSThe new <crs> resource to remove.
Returns
Result object.
def removeNotificationTarget(self, ntsr: NTSR, originator: str):

Remove a notification target from a subscription and perform the necessary actions based on the associated policies.

Parameters
ntsr:NTSRThe notification target subscription resource.
originator:strThe originator to be removed.
Raises
ORIGINATOR_HAS_NO_PRIVILEGEIf the policies evaluated to false for the given originator and the notification target subscription resource.
INTERNAL_SERVER_ERRORIf there are issues retrieving the necessary resources or if multiple default NTP resources are found.
def removeSubscription(self, subscription: SUB | CRS, originator: str):

Remove a subscription.

Send the deletion notifications, if possible.

Parameters
subscription:SUB | CRSThe <sub> resource to remove.
originator:strUndocumented
Returns
Result object.
@onEvent(eventManager.cseReset)
def restart(self, eventData: EventData):

Restart the NotificationManager service.

Parameters
eventData:EventDataThe event data. Not used in this handler.
def sendDeletionNotification(self, uri: str | list[str], ri: str, creator: str = None) -> bool:

Send a Deletion Notification to a single or a list of target.

Parameters
uri:str | list[str]Single or a list of notification target URIs.
ri:strResourceID of the subscription.
creator:strUndocumented
Returns
boolBoolean indicat
def sendNotificationWithDict(self, dct: JSON, nus: list[str] | str, originator: str | None = None, background: bool | None = False, preFunc: Callable | None = None, postFunc: Callable | None = None) -> RequestResponseList:

Send a notification to a single URI or a list of URIs.

A URI may be a resource ID, then the poa of that resource is taken. Also, the serialization is determined when each of the notifications is actually sent.

Pre- and post-functions can be given that are called before and after sending each notification.

Parameters
dct:JSONDictionary to send as the notification. It already contains the full request.
nus:list[str] | strA single URI or a list of URIs.
originator:str | NoneThe originator on which behalf to send the notification.
background:bool | NoneSend the notifications in a background task.
preFunc:Callable | NoneFunction that is called before each notification sending, with the notification target as a single argument.
postFunc:Callable | NoneFunction that is called after each notification sending, with the notification target as a single argument.
Returns
RequestResponseListA list of results for each notification sent. An empty list is returned if background is set to True.
def sendVerificationRequest(self, uri: str | list[str], ri: str, originator: str | None = None) -> bool:

Define the callback function for verification notifications and send the notification.

Parameters
uri:str | list[str]The URI to send the verification request to. This may be a list of URI's. Each URI could be a direct URL, or an entity.
ri:strThe resource ID of the subscription.
originator:str | NoneThe originator on which behalf to send the notification.
Returns
boolUndocumented
def shutdown(self) -> bool:

Shutdown the Notification Manager.

Returns
boolBoolean that indicates the success of the operation
def startCRSPeriodicWindow(self, crsRi: str, tws: str, expectedCount: int, eem: EventEvaluationMode = EventEvaluationMode.ALL_EVENTS_PRESENT):

Start a periodic window for a <crs> resource.

Parameters
crsRi:strResource ID of the <crs> resource for which to start the periodic window.
tws:strTime window size as a duration string.
expectedCount:intThe expected number of notifications.
eem:EventEvaluationModeEvent evaluation mode.
def startCRSSlidingWindow(self, crsRi: str, tws: str, sur: str, subCount: int, eem: EventEvaluationMode = EventEvaluationMode.ALL_EVENTS_PRESENT) -> BackgroundWorker:

Start a sliding window for a <crs> resource.

Parameters
crsRi:strResource ID of the <crs> resource for which to start the sliding window.
tws:strTime window size as a duration string.
sur:strThe notification source.
subCount:intThe expected number of notifications.
eem:EventEvaluationModeEvent evaluation mode.
Returns
BackgroundWorkerThe background worker handling the sliding window.
def stopCRSPeriodicWindow(self, crsRi: str):

Stop a periodic window for a <crs> resource.

Parameters
crsRi:strResource ID of the <crs> resource for which to stop the periodic window.
def stopCRSSlidingWindow(self, crsRi: str):

Stop a sliding window for a <crs> resource.

Parameters
crsRi:strResource ID of the <crs> resource for which to stop the sliding window.
def updateCrossResourceSubscription(self, crs: CRS, previousNus: list[str], originator: str):

Update a crossResourcesubscription.

Check each new receipient in the nu attribute with verification requests.

This method indirectly updates or rebuild the notificationStatsInfo attribute. It should be called add the end when updating a subscription.

Parameters
crs:CRSThe new <crs> resource to check.
previousNus:list[str]A list of the resource's previous NUs.
originator:strThe request originator.
Returns
Result object.
def updateOfNSEAttribute(self, sub: CRS | SUB, newNse: bool):

Handle an update of the notificationStatsEnable attribute of a <sub> or <crs> resource.

Note

This removes the notificationStatsEnable attribute, which must be added and filled later again, e.g. when validating the notificationURIs attribute. For this the notificationURIs attribute must be fully validated first.

Parameters
sub:CRS | SUBEither a <sub> or <crs> resource.
newNse:boolThe new value for the nse attribute. This may be empty if not present in the update.
def updateSubscription(self, subscription: SUB, previousNus: list[str], originator: str):

Update a subscription.

This method indirectly updates or rebuild the notificationStatsInfo attribute. It should be called add the end when updating a subscription.

Parameters
subscription:SUBThe <sub> resource to update.
previousNus:list[str]List of previous NUs of the same <sub> resoure.
originator:strThe request originator.
Returns
Result object.
def validateAndConstructNotificationStatsInfo(self, sub: SUB | CRS, add: bool | None = True):

Update and fill the notificationStatsInfo attribute of a <sub> or <crs> resource.

This method adds, if necessary, the necessarry stat info structures for each notification URI. It also removes structures for notification URIs that are not present anymore.

Note

For this the notificationURIs attribute must be fully validated first.

Parameters
sub:SUB | CRSThe <sub> or <crs> resource for whoich to validate the attribute.
add:bool | NoneIf True, add the notificationStatsInfo attribute if not present.
__slots__: tuple[str, ...] =

Slots for NotificationManager instance attributes.

dispatcher: Dispatcher =

Injected Dispatcher instance.

request: RequestManager =

Injected RequestManager instance.

storage: Storage =

Injected Storage instance.

lockBatchNotification =

An internal lock instance for locking certain batch notification methods.

lockNotificationEventStats =

An internal lock instance for locking the notification event stats.

def _crsCheckForNotification(self, data: list[str], crsRi: str, subCount: int, eem: EventEvaluationMode = EventEvaluationMode.ALL_EVENTS_PRESENT):

Test whether a notification must be sent for a a <crs> window.

This method also sends the notification(s) if the window requirements are met.

Parameters
data:list[str]List of unique resource IDs.
crsRi:strThe resource ID of the <crs> resource for the window.
subCount:intMaximum number of expected resource IDs in data.
eem:EventEvaluationModeEventEvaluationMode.
def _crsPeriodicWindowMonitor(self, _data: list[str], _worker: BackgroundWorker, crsRi: str, expectedCount: int, eem: EventEvaluationMode = EventEvaluationMode.ALL_EVENTS_PRESENT) -> bool:

Check for a <crs> periodic window and send notifications if the window requirements are met.

Parameters
_data:list[str]List of notification sources.
_worker:BackgroundWorkerThe background worker handling this periodic window.
crsRi:strResource ID of the <crs>.
expectedCount:intThe expected number of notifications.
eem:EventEvaluationModeEvent evaluation mode.
Returns
boolAlways returns True.
def _crsSlidingWindowMonitor(self, _data: Any, _worker: BackgroundWorker, crsRi: str, subCount: int, eem: EventEvaluationMode = EventEvaluationMode.ALL_EVENTS_PRESENT) -> bool:

Actor function to monitor a sliding window for a <crs> resource.

Parameters
_data:AnyList of notification sources.
_worker:BackgroundWorkerThe background worker handling this sliding window.
crsRi:strResource ID of the <crs>.
subCount:intThe expected number of notifications.
eem:EventEvaluationModeEvent evaluation mode.
Returns
boolAlways returns True.
def _flushBatchNotifications(self, subscription: Resource):

Send and remove any outstanding batch notifications for a subscription.

def _getPeriodicWorkerName(self, ri: str) -> str:

Return the name of a periodic window worker.

Parameters
ri:strResource ID for which the worker is running.
Returns
strString with the worker name.
def _getSlidingWorkerName(self, ri: str) -> str:

Return the name of a sliding window worker.

Parameters
ri:strResource ID for which the worker is running.
Returns
strString with the worker name.
def _handleSubscriptionNotification(self, sub: JSON, notificationEventType: NotificationEventType, resource: Resource | None = None, modifiedAttributes: JSON | None = None, missingData: MissingData | None = None, asynchronous: bool = False, operationMonitor: OperationMonitor | None = None, originator: str = None) -> bool:

Send a subscription notification.

Parameters
sub:JSONThe <sub> resource.
notificationEventType:NotificationEventTypeThe notification event type.
resource:Resource | NoneThe resource that triggered the notification.
modifiedAttributes:JSON | NoneThe modified attributes of the resource.
missingData:MissingData | NoneThe missing data of the resource.
asynchronous:boolIf True, send the notification in the background.
operationMonitor:OperationMonitor | NoneThe operationMonitor information.
originator:strThe originator on which behalf to send the notification.
Returns
boolTrue if the notification was sent successfully, False otherwise.
def _sendNotification(self, uris: str | list[str], senderFunction: SenderFunction) -> bool:

Send a notification to a single or to multiple targets if necessary.

Call the infividual callback functions to do the resource preparation and the the actual sending.

Parameters
uris:str | list[str]Either a string or a list of strings of notification receivers.
senderFunction:SenderFunctionA function that is called to perform the actual notification sending.
Returns
boolReturns True, even when nothing was sent, and False when any senderFunction returned False.
def _sendSubscriptionAggregatedBatchNotification(self, ri: str, nu: str, ln: bool, sub: JSON) -> bool:

Send and remove(!) the available BatchNotifications for an ri & nu.

While the sent notifications and the respective received responses are counted here, the expiration counter is not. It depends on the events, not the notifications.

Parameters
ri:strResource ID of the <sub> or <crs> resource.
nu:strA single notification URI.
ln:boollatestNotify, if True then only send the latest notification.
sub:JSONThe internal sub structure.
Returns
boolIndication of the success of the sending.
def _startNewBatchNotificationWorker(self, ri: str, nu: str, ln: bool, sub: JSON, dur: float) -> bool:

Start a new batch notification worker.

Parameters
ri:strResource ID of the subscription.
nu:strNotification URI of the notification target.
ln:boollatestNotify, if True then only send the latest notification.
sub:JSONThe internal sub structure.
dur:floatDuration for the batch notification.
Returns
boolIndication of the success of the operation.
def _stopNotificationBatchWorker(self, ri: str, nu: str):

Stop a batch notification worker for a given ri and nu.

Parameters
ri:strResource ID of the subscription.
nu:strNotification URI of the notification target.
def _storeBatchNotification(self, nu: str, sub: JSON, notificationRequest: JSON) -> bool:

Store a subscription's notification for later sending. For a single nu.

def _verifyNusInSubscription(self, subscription: SUB | CRS, previousNus: list[str] | None = None, originator: str | None = None):

Check all the notification URI's in a subscription.

A verification request is sent to new URI's. Notifications to the originator are not sent.

If previousNus is given then only new nus are notified.

Parameters
subscription:SUB | CRS<sub> or <crs> resource.
previousNus:list[str] | NoneThe list of previous NUs.
originator:str | NoneThe originator on which behalf to send the notification.
Raises
SUBSCRIPTION_VERIFICATION_INITIATION_FAILEDIn case a subscription verification fails.
def _workerID(self, ri: str, nu: str) -> str:

Return an ID for a batch notification background worker.

Parameters
ri:strResourceID of a subscription.
nu:strNotification URI of a notification target.
Returns
strString with the ID.
_eventNotification =

Cached reference to the notification event for optimized access.