class documentation

class SecurityManager(object):

View In Hierarchy

This manager entity handles access to resources and requests.

Method checkAcpiUpdatePermission Check whether this is actually a correct update of the acpi attribute, and whether this is actually allowed.
Method checkACPSelfPermission Check whether an originator has the requested permissions to the ACP resource itself.
Method checkSingleACPPermission Check whether an originator has the requested permissions with this ACP.
Method configUpdate Handle configuration updates.
Method getPOACredentialsForCSEID Return the credentials for the Point of Access (POA) for the given CSE-ID. These credentials are used by the registrar CSE to access this hosting CSE, or by the hosting CSE to access the registrar CSE. ...
Method getSSLContextCoAP Depending on the configuration whether to use DTLS, this method creates a new SSLContext from the configured certificates and returns it. If TLS is disabled then None is returned.
Method getSSLContextHttp Depending on the configuration whether to use TLS, this method creates a new SSLContext from the configured certificates and returns it. If TLS is disabled then None is returned.
Method getSSLContextWs Depending on the configuration whether to use TLS, this method creates a new SSLContext from the configured certificates and returns it. If TLS is disabled then None is returned.
Method hasAccess Test whether an originator has access to a resource for the requested permission.
Method hasAccessToPollingChannel Check whether the originator has access to the PCU resource. This should be done to check the parent PCH, but the originator would be the same as the PCU, so we can optimize this a bit.
Method initialize Initialize the SecurityManager.
Method isAEOriginator Check whether the provided originator could be an AE-ID.
Method isAllowedOriginator Check whether an Originator is in the provided list of allowed originators. This list may contain regex.
Method restart Restart the Security manager service.
Method shutdown Shutdown the SecurityManager.
Method validateHttpBasicAuth Validate the provided username and password against the configured HTTP basic authentication file.
Method validateHttpTokenAuth Validate the provided token against the configured HTTP token authentication file.
Method validateWSBasicAuth Validate the provided username and password against the configured WebSocket basic authentication file.
Method validateWSTokenAuth Validate the provided token against the configured WebSocket token authentication file.
Class Variable __slots__ Slots for SecurityManager class.
Class Variable dispatcher Injected Dispatcher instance.
Class Variable httpServer The injected HttpServer plugin instance.
Class Variable storage Injected Storage instance.
Class Variable websocketServer The injected WebSocketServer plugin instance.
Instance Variable allowedCSIOriginators List of allowed CSE originators that are allowed to access the CSEBase resource.
Instance Variable httpBasicAuthData Dictionary to store the HTTP Basic Authentication data, mapping originators to their hashed passwords.
Instance Variable httpTokenAuthData List to store the HTTP Token Authentication data.
Instance Variable wsBasicAuthData Dictionary to store the WebSocket Basic Authentication data, mapping originators to their hashed passwords.
Instance Variable wsTokenAuthData List to store the WebSocket Token Authentication data.
Method _checkAcor Check whether an originator is in the list of acor entries.
Method _getContext Depending on the configuration whether to use TLS, this method creates a new SSLContext from the configured certificates and returns it. If TLS is disabled then None is returned.
Method _initAuthInformation Initialize the authentication information by reading the configured authentication files for HTTP and WebSocket.
Method _readHttpBasicAuthFile Read the HTTP basic authentication file and store the data in a dictionary. The authentication information is stored as username:password.
Method _readHttpTokenAuthFile Read the HTTP token authentication file and store the data in a dictionary. The authentication information is stored as a single token per line.
Method _readWSBasicAuthFile Read the WebSocket basic authentication file and store the data in a dictionary. The authentication information is stored as username:password.
Method _readWSTokenAuthFile Read the WebSocket token authentication file and store the data in a dictionary. The authentication information is stored as a single token per line.
def checkAcpiUpdatePermission(self, request: CSERequest, targetResource: Resource, originator: str) -> bool:

Check whether this is actually a correct update of the acpi attribute, and whether this is actually allowed.

Raises
BAD_REQUEST: If the acpi attribute is not the only attribute in an UPDATE request. ORIGINATOR_HAS_NO_PRIVILEGE: If the originator has no access.
Parameters
request:CSERequestThe original request.
targetResource:ResourceThe request target.
originator:strThe request originator.
Returns
boolBoolean value. True indicates that this is an ACPI update. False indicates that this NOT an ACPI update. if no access is provided then an exception is raised.
def checkACPSelfPermission(self, acp: ACP | ACPAnnc, originator: str, requestedPermission: Permission) -> bool:

Check whether an originator has the requested permissions to the ACP resource itself.

Parameters
acp:ACP | ACPAnncUndocumented
originator:strThe originator to test the permissions for.
requestedPermission:PermissionThe permissions to test.
Returns
boolIf any of the configured accessControlRules of the ACP resource matches, then the originatorhas access, and True is returned, or False otherwise.
def checkSingleACPPermission(self, acp: ACP, originator: str, requestedPermission: Permission, ty: ResourceTypes, context: str | None = 'pv', request: CSERequest | None = None) -> ACPResult:

Check whether an originator has the requested permissions with this ACP.

Parameters
acp:ACPThe ACP resource to check.
originator:strThe originator to test the permissions for.
requestedPermission:PermissionThe permissions to test.
ty:ResourceTypesIf the resource type is given then it is checked for CREATE (as an allowed child resource type), otherwise as an allowed resource type.
context:str | NoneThe context to check. Default is 'pv'.
request:CSERequest | NoneUndocumented
Returns
ACPResultIf any of the configured accessControlRules of the ACP resource matches, then the originatorhas access, and True is returned, or False otherwise. Additionally, a list of accessControlAttributes combined is returned.
@onEvent(eventManager.configUpdate)
def configUpdate(self, eventData: EventData):

Handle configuration updates.

Parameters
eventData:EventDataThe event data, containing the name of the updated configuration setting and its new value.
def getPOACredentialsForCSEID(self, registrarConfig: CSERegistrar, cseID: str | None = None, binding: BindingType | None = BindingType.HTTP) -> tuple[str, str] | None:

Return the credentials for the Point of Access (POA) for the given CSE-ID. These credentials are used by the registrar CSE to access this hosting CSE, or by the hosting CSE to access the registrar CSE. The credentials are used in the POA attribute. Currently, only HTTP and WS bindings are supported.

Parameters
registrarConfig:CSERegistrarThe CSERegistrar configuration to use.
cseID:str | NoneThe CSE-ID to get the credentials for.
binding:BindingType | NoneThe binding type to get the credentials for. Default is HTTP.
Returns
tuple[str, str] | NoneA tuple with the username and password or token for the given CSE-ID and binding.
def getSSLContextCoAP(self) -> ssl.SSLContext:

Depending on the configuration whether to use DTLS, this method creates a new SSLContext from the configured certificates and returns it. If TLS is disabled then None is returned.

This method is used for CoAP connections.

Returns
ssl.SSLContextSSL / TLD context.
def getSSLContextHttp(self) -> ssl.SSLContext:

Depending on the configuration whether to use TLS, this method creates a new SSLContext from the configured certificates and returns it. If TLS is disabled then None is returned.

This method is used for HTTP connections.

Returns
ssl.SSLContextSSL / TLD context.
def getSSLContextWs(self) -> ssl.SSLContext:

Depending on the configuration whether to use TLS, this method creates a new SSLContext from the configured certificates and returns it. If TLS is disabled then None is returned.

This method is used for WebSocket connections.

Returns
ssl.SSLContextSSL / TLD context.
def hasAccess(self, originator: str, resource: Resource, requestedPermission: Permission, ty: ResourceTypes | None = None, parentResource: Resource | None = None, request: CSERequest | None = None, resultResource: Resource | None = None) -> bool:

Test whether an originator has access to a resource for the requested permission.

Parameters
originator:strThe originator to check for.
resource:ResourceThe target resource of a request.
requestedPermission:PermissionThe persmission to test.
ty:ResourceTypes | NoneMandatory for CREATE, else optional. The type of the resoure that is about to be created.
parentResource:Resource | NoneOptional, the parent resource of a target resource.
request:CSERequest | NoneUndocumented
resultResource:Resource | NoneUndocumented
Returns
boolBoolean indicating access.
def hasAccessToPollingChannel(self, originator: str, resource: PCH | PCH_PCU) -> bool:

Check whether the originator has access to the PCU resource. This should be done to check the parent PCH, but the originator would be the same as the PCU, so we can optimize this a bit.

Parameters
originator:strThe request originator
resource:PCH | PCH_PCUEither a PCH or PCU resource
Returns
boolBoolean indicating the result.
def initialize(self):

Initialize the SecurityManager.

def isAEOriginator(self, originator: str) -> bool:

Check whether the provided originator could be an AE-ID.

Parameters
originator:strThe request originator.
Returns
boolBoolean indicating the result.
def isAllowedOriginator(self, originator: str, allowedOriginators: list[str]) -> bool:

Check whether an Originator is in the provided list of allowed originators. This list may contain regex.

The hosting CSE has always access.

Parameters
originator:strThe request originator.
allowedOriginators:list[str]A list of allowed originators, which may include regex.
Returns
boolBoolean value indicating the result.
@onEvent(eventManager.cseReset)
def restart(self, eventData: EventData):

Restart the Security manager service.

def shutdown(self) -> bool:

Shutdown the SecurityManager.

Returns
boolAlways True.
def validateHttpBasicAuth(self, username: str, password: str) -> bool:

Validate the provided username and password against the configured HTTP basic authentication file.

Parameters
username:strThe username to validate.
password:strThe password to validate.
Returns
boolBoolean indicating the result.
def validateHttpTokenAuth(self, token: str) -> bool:

Validate the provided token against the configured HTTP token authentication file.

Parameters
token:strThe token to validate.
Returns
boolBoolean indicating the result.
def validateWSBasicAuth(self, username: str, password: str) -> bool:

Validate the provided username and password against the configured WebSocket basic authentication file.

Parameters
username:strThe username to validate.
password:strThe password to validate.
Returns
boolBoolean indicating the result.
def validateWSTokenAuth(self, token: str) -> bool:

Validate the provided token against the configured WebSocket token authentication file.

Parameters
token:strThe token to validate.
Returns
boolBoolean indicating the result.
__slots__: tuple[str, ...] =

Slots for SecurityManager class.

dispatcher: Dispatcher =

Injected Dispatcher instance.

httpServer: HttpServer =

The injected HttpServer plugin instance.

storage: Storage =

Injected Storage instance.

websocketServer: WebSocketServer =

The injected WebSocketServer plugin instance.

allowedCSIOriginators: list[str] =

List of allowed CSE originators that are allowed to access the CSEBase resource.

httpBasicAuthData: dict[str, str] =

Dictionary to store the HTTP Basic Authentication data, mapping originators to their hashed passwords.

httpTokenAuthData: list[str] =

List to store the HTTP Token Authentication data.

wsBasicAuthData: dict[str, str] =

Dictionary to store the WebSocket Basic Authentication data, mapping originators to their hashed passwords.

wsTokenAuthData: list[str] =

List to store the WebSocket Token Authentication data.

def _checkAcor(self, acp: ACP | ACPAnnc, acor: list[str], originator: str) -> bool:

Check whether an originator is in the list of acor entries.

Parameters
acp:ACP | ACPAnncThe ACP resource to check.
acor:list[str]The list of acor entries.
originator:strThe originator to check.
Returns
boolTrue if the originator is in the list of acor entries, False otherwise.
def _getContext(self, useTLS: bool, verifyCertificate: bool, tlsVersion: str, caCertificateFile: str, caPrivateKeyFile: str) -> ssl.SSLContext:

Depending on the configuration whether to use TLS, this method creates a new SSLContext from the configured certificates and returns it. If TLS is disabled then None is returned.

Returns
ssl.SSLContextSSL / TLD context.
def _initAuthInformation(self):

Initialize the authentication information by reading the configured authentication files for HTTP and WebSocket.

def _readHttpBasicAuthFile(self):

Read the HTTP basic authentication file and store the data in a dictionary. The authentication information is stored as username:password.

The data is stored in the httpBasicAuthData dictionary.

def _readHttpTokenAuthFile(self):

Read the HTTP token authentication file and store the data in a dictionary. The authentication information is stored as a single token per line.

The data is stored in the httpTokenAuthData list.

def _readWSBasicAuthFile(self):

Read the WebSocket basic authentication file and store the data in a dictionary. The authentication information is stored as username:password.

The data is stored in the wsBasicAuthData dictionary.

def _readWSTokenAuthFile(self):

Read the WebSocket token authentication file and store the data in a dictionary. The authentication information is stored as a single token per line.

The data is stored in the wsTokenAuthData list.