class documentation

class RequestManager:

RequestManager class.

Method addResponse Add a response and topic to the response dictionary. The key is the rqi (requestIdentifier) of the response.
Method configUpdate Callback for the configUpdate event.
Method determineTargetDetails Resolve the real URL and further message parameters for a request and a target.
Method dissectRequestFromBytes Dissect a request in a byte string and build up a CSERequest instance.
Method fillAndValidateCSERequest Fill a cseRequest object according to its request structure in the Result.request attribute.
Method getSerializationFromOriginator Get the content serializations of a registered originator.
Method handleCreateRequest Handle an incoming CREATE request.
Method handleDeleteRequest Handle an incoming DELETE request.
Method handleNotifyRequest Handle an incoming NOTIFY request.
Method handleReceivedNotifyRequest Handle a NOTIFY request to a resource.
Method handleRequest Call the fitting request handler for an operation and let it handle the request.
Method handleRetrieveRequest Handle an incoming RETRIEVE request.
Method handleSendRequest Handle a send request. This method is used to send a request to a remote CSE or to a local resource.
Method handleTransitCreateRequest Forward a CREATE request to a remote CSE.
Method handleTransitDeleteRequest Forward a DELETE request to a remote CSE.
Method handleTransitNotifyRequest Forward a NOTIFY request to a remote CSE.
Method handleTransitRetrieveRequest Forward a RETRIEVE request to a remote CSE.
Method handleTransitUpdateRequest Forward an UPDATE request to a remote CSE.
Method handleUpdateRequest Handle an incoming UPDATE request.
Method hasPollingRequest Check whether a request or response is pending for the tuple (originator, requestID). This method is also used as a callback for the periodic check of whether a request or response is queued. If ...
Method initialize Initialize the RequestManager.
Method prepareResultForSending Prepare a new request for MQTT or WebSocket sending.
Method processRequest Look up the fitting request process handler for an operation and call it.
Method queuePollingRequest Add a new request to the polling request queue.
Method queueRequestForPCH Queue an (incoming) request or content for a <PCH>. It can be retrieved via the target's <PCU> child resource.
Method recordRequest Record a request and its response in the database.
Method requestFromResult Convert a response request to a new Result object and create a new dictionary in Result.data with the full Response structure. Recursively do this if the embeddedRequest is also a full Request or Response.
Method responseFromResult Shortcut for requestFromResult to create a response object.
Method restart Restart the RequestManager service.
Method shutdown Shutdown the RequestManager.
Method unqueuePollingRequest Remove a request for the originator and with the requestID from the polling request queue.
Method waitForPollingRequest Busy waiting for a polling request. The function returns when there is a new or pending matching request in the queue, or when the timeout (in seconds) is met.
Method waitForResponse Wait for a response with a specific requestIdentifier rqi.
Method waitForResponseToPCH Wait for a RESPONSE to a request.
Class Variable __slots__ Slots for RequestManager class.
Class Variable coapServer The CoAPServer plugin instance, injected by the PluginManager based on the declared dependency.
Class Variable dispatcher Injected Dispatcher instance.
Class Variable httpServer The HttpServer plugin instance, injected by the PluginManager based on the declared dependency.
Class Variable mqttClient The MQTTClient plugin instance, injected by the PluginManager based on the declared dependency.
Class Variable notificationManager Injected NotificationManager instance.
Class Variable registration Injected RegistrationManager instance.
Class Variable remoteCSEManager The RemoteCSEManager plugin instance, injected by the PluginManager based on the declared dependency.
Class Variable securityManager Injected SecurityManager instance.
Class Variable storage Injected Storage instance.
Class Variable validator Injected Validator instance.
Class Variable websocketServer The WebSocketServer plugin instance, injected by the PluginManager based on the declared dependency.
Instance Variable enableRequestRecording Whether request recording is enabled.
Instance Variable flexBlockingBlocking Whether flexBlocking requests are handled as blocking or non-blocking.
Instance Variable maxExpirationDelta The maximum request expiration delta value.
Instance Variable requestExpirationDelta The request expiration delta value.
Instance Variable requestHandlers Dictionary to map operations to their corresponding request handling functions, dispatcher processing functions, send functions, and events for different protocols.
Instance Variable requestRingBuffer RingBuffer to store requests for later retrieval.
Instance Variable sendToFromInResponses Whether to include the 'to' and 'from' fields in responses.
Method _assignConfig Store relevant configuration values in the manager.
Method _cleanupPollingRequests Remove expired requests from the polling request queue. This method is called periodically by an actor.
Method _createRequestResource Create a <request> resource for the given request.
Method _executeOperation Execute a request operation and fill the respective request resource accordingly.
Method _handleNonBlockingRequest This method creates a <request> resource, initiates the execution of the desired operation in the background, but immediately returns with the reference of the <request> resource that will contain the result of the operation.
Method _originatorAdaptToScope Convert the from parameter (the originator) to CSE-relative, SP-relative or Absolute format in the request. The from parameter is converted in request.originator and request.originalRequest, but NOT in request.originalData.
Method _runNonBlockingRequestAsync Execute the actual request and store the result in the respective <request> resource. In addition notify the notification targets.
Method _runNonBlockingRequestSync Execute the actual request and store the result in the respective <request> resource.
Method _sendRequest Send a request via the appropriate channel or transport protocol.
Instance Variable _pcWorker Worker to clean up expired polling requests.
Instance Variable _receivedResponses Dictionary to store received responses for non-blocking requests.
Instance Variable _receivedResponsesLock Lock to access the received responses dictionary.
Instance Variable _requestLock Lock to access the _requests and _rqiOriginator dictionaries.
Instance Variable _requests Dictionary to map request originators to a list of requests. Used for handling polling requests.
Instance Variable _rqiOriginator Dictionary to map requestIdentifiers to an originator of a request. Used for handling of polling requests.
def addResponse(self, response: Result, info: str | None = None):

Add a response and topic to the response dictionary. The key is the rqi (requestIdentifier) of the response.

@onEvent(eventManager.configUpdate)
def configUpdate(self, eventData: EventData):

Callback for the configUpdate event.

Parameters
eventData (EventData): The event data, containing the name of the updated configuration setting and its new value.
def determineTargetDetails(self, request: CSERequest) -> TargetDetails | None:

Resolve the real URL and further message parameters for a request and a target.

Notes

A successful determination may include the type of the target resource. This is different from the request content's resource type.

Parameters
request (CSERequest): The request from which the target details are taken.
Returns
The result depends on the target.

In the general case, the result is a list of tuples of (real URL including the protocol, list of allowed contentSerializations, target supported release version, PollingChannel resource, originator with adapted scope, target URI, determined target resource type).

If the URI is already a URL, the list contains only one element, (url, None, None, None, originator, None, UNKNOWN). The preferred serializations cannot be determined in this case, and the target entity is not known.

If the target resource is not request reachable and has a PollingChannel as a child resource, the list contains only one element, (None, list of allowed contentSerializations, srv, PollingChannel resource, originator with adapted scope, target URI, determined target resource type).

Otherwise, a list of the tuples described above is returned.

In case of an error, an empty list is returned.
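
The seven-element tuple described above can be pictured as a named tuple. The following is only an illustrative sketch: the class name `TargetDetailsSketch`, its field names, and the helper `details_for_plain_url` are assumptions made for this example and are not taken from the CSE's source. It shows the single-element list for the case where the URI is already a URL.

```python
from typing import NamedTuple, Optional


class TargetDetailsSketch(NamedTuple):
    """Hypothetical stand-in for one target-details tuple (field names are assumed)."""
    url: Optional[str]                    # real URL including the protocol
    serializations: Optional[list[str]]   # allowed contentSerializations
    release_version: Optional[str]        # target supported release version
    polling_channel: Optional[object]     # PollingChannel resource, if any
    originator: str                       # originator with adapted scope
    target_uri: Optional[str]             # target URI
    target_type: str                      # determined target resource type


def details_for_plain_url(url: str, originator: str) -> list[TargetDetailsSketch]:
    """Build the single-element result for a URI that is already a URL."""
    # Serializations, release version and target are not known in this case.
    return [TargetDetailsSketch(url, None, None, None, originator, None, "UNKNOWN")]


details = details_for_plain_url("http://example.com/cse", "CAdmin")
print(details[0].url, details[0].target_type)
```

Named fields are used here only to make the positions readable; the documented result is described as plain tuples.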

def dissectRequestFromBytes(self, data: bytes, contenType: ContentSerializationType, isResponse: bool | None = False) -> Result:

Dissect a request in a byte string and build up a CSERequest instance.

Parameters
data (bytes): The data to dissect.
contenType (ContentSerializationType): The content type of the data.
isResponse (bool | None): If True, the data is a response; otherwise it is a request.
Returns
Result: A Result instance with the dissected request in Result.request. Result.data contains the pc (primitive content) of the request.
def fillAndValidateCSERequest(self, cseRequest: CSERequest | JSON, isResponse: bool | None = False) -> CSERequest:

Fill a cseRequest object according to its request structure in the Result.request attribute.

def getSerializationFromOriginator(self, originator: str) -> list[ContentSerializationType]:

Get the content serializations of a registered originator.

The originator is either an AE, a CSE or a CSR.

Parameters
originator (str): The originator to check.
Returns
list[ContentSerializationType]: List of ContentSerializationTypes.
def handleCreateRequest(self, request: CSERequest) -> Result:

Handle an incoming CREATE request.

Parameters
request (CSERequest): The incoming request.
Returns
Result: Result of the request handling.
Raises
BAD_REQUEST: If the request is invalid.
def handleDeleteRequest(self, request: CSERequest) -> Result:

Handle an incoming DELETE request.

Parameters
request (CSERequest): The incoming request.
Returns
Result: Result of the request handling.
Raises
OPERATION_NOT_ALLOWED: If the operation is not allowed.
BAD_REQUEST: If the request is invalid.
def handleNotifyRequest(self, request: CSERequest) -> Result:

Handle an incoming NOTIFY request.

Parameters
request (CSERequest): The incoming request.
Returns
Result: Result of the request handling.
Raises
BAD_REQUEST: If the request is invalid.
def handleReceivedNotifyRequest(self, id: str, request: CSERequest, originator: str) -> Result:

Handle a NOTIFY request to a resource.

def handleRequest(self, request: CSERequest | JSON) -> Result:

Call the fitting request handler for an operation and let it handle the request.

Before the request is processed, it is determined whether it is blocking, non-blocking, etc.

Parameters
request (CSERequest | JSON): The incoming request.
Returns
Result: Request result.
def handleRetrieveRequest(self, request: CSERequest) -> Result:

Handle an incoming RETRIEVE request.

Parameters
request (CSERequest): The incoming request.
Returns
Result: Result of the request handling.
Raises
BAD_REQUEST: If the request is invalid.
def handleSendRequest(self, request: CSERequest) -> RequestResponseList:

Handle a send request. This method is used to send a request to a remote CSE or to a local resource.

Parameters
request (CSERequest): The request to send.
Returns
RequestResponseList: A list of RequestResponse objects containing the request and the result of the send operation.
def handleTransitCreateRequest(self, request: CSERequest) -> Result:

Forward a CREATE request to a remote CSE.

def handleTransitDeleteRequest(self, request: CSERequest) -> Result:

Forward a DELETE request to a remote CSE.

def handleTransitNotifyRequest(self, request: CSERequest) -> Result:

Forward a NOTIFY request to a remote CSE.

def handleTransitRetrieveRequest(self, request: CSERequest) -> Result:

Forward a RETRIEVE request to a remote CSE.

def handleTransitUpdateRequest(self, request: CSERequest) -> Result:

Forward an UPDATE request to a remote CSE.

def handleUpdateRequest(self, request: CSERequest) -> Result:

Handle an incoming UPDATE request.

Parameters
request (CSERequest): The incoming request.
Returns
Result: Result of the request handling.
def hasPollingRequest(self, originator: str, requestID: str = None, reqType: RequestType = RequestType.REQUEST) -> bool:

Check whether a request or response is pending for the tuple (originator, requestID). This method is also used as a callback for the periodic check of whether a request or response is queued. If requestID is not None, the check is for a request with that ID. Otherwise, True is returned if there is any request for the originator.
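
The check described above can be sketched against a dictionary that maps originators to lists of queued requests. This is a minimal illustration, not the CSE's implementation: `has_pending`, the `PendingQueue` alias, the use of plain dictionaries with an `rqi` key as requests, and the filtering by request type are all assumptions made for the example.

```python
from __future__ import annotations

from enum import Enum, auto


class RequestType(Enum):
    """Hypothetical stand-in for the CSE's RequestType."""
    REQUEST = auto()
    RESPONSE = auto()


# originator -> list of (request, request type); requests are plain dicts here
PendingQueue = dict[str, list[tuple[dict, RequestType]]]


def has_pending(queue: PendingQueue, originator: str,
                request_id: str | None = None,
                req_type: RequestType = RequestType.REQUEST) -> bool:
    """Return True if a matching entry is queued for the originator."""
    for request, queued_type in queue.get(originator, []):
        if queued_type is not req_type:
            continue
        # Without a request ID, any entry of the right type matches.
        if request_id is None or request.get("rqi") == request_id:
            return True
    return False


queue: PendingQueue = {"CAdmin": [({"rqi": "123"}, RequestType.REQUEST)]}
print(has_pending(queue, "CAdmin"), has_pending(queue, "CAdmin", "456"))
```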

def initialize(self):

Initialize the RequestManager.

def prepareResultForSending(self, inResult: Result, isResponse: bool | None = False, originalRequest: CSERequest | None = None) -> tuple[Result, bytes]:

Prepare a new request for MQTT or WebSocket sending.

Attention

Remember, a response is actually just a new request. This takes care of the fact that in MQTT or WebSockets a response is very similar to a request.

Parameters
inResult (Result): A Result object that contains a request in its request attribute.
isResponse (bool | None): Indicator whether the Result object is actually a response or a request.
originalRequest (CSERequest | None): The original request that was received.
Returns
tuple[Result, bytes]: A tuple with an updated Result object and the serialized content.
def processRequest(self, request: CSERequest, originator: str, id: str) -> Result:

Look up the fitting request process handler for an operation and call it.

This will directly handle the request.

Parameters
request (CSERequest): Undocumented
originator (str): The request originator.
id (str): The structured or unstructured resource id.
Returns
Result: Request result.
def queuePollingRequest(self, request: CSERequest, reqType: RequestType = RequestType.REQUEST):

Add a new request to the polling request queue.

The reqType specifies whether this request is a oneM2M Request or Response.

def queueRequestForPCH(self, operation: Operation, pchOriginator: str, content: JSON = None, ty: ResourceTypes = None, rvi: str = None, request: CSERequest = None, reqType: RequestType = RequestType.REQUEST, ec: EventCategory = None, originator: str = None) -> CSERequest | None:

Queue an (incoming) request or content for a <PCH>. It can be retrieved via the target's <PCU> child resource.

If a request is passed, this object is queued. If no request but content is given, a new request object is created for the content.

def recordRequest(self, request: CSERequest | None, result: Result):

Record a request and its response in the database.

Parameters
request (CSERequest | None): The request to record. If None, then no recording is done.
result (Result): The result of the request, containing the response to record.
def requestFromResult(self, inResult: Result, originator: str | None = None, ty: ResourceTypes | None = None, op: Operation | None = None, isResponse: bool | None = False, originalRequest: CSERequest | None = None) -> Result:

Convert a response request to a new Result object and create a new dictionary in Result.data with the full Response structure. Recursively do this if the embeddedRequest is also a full Request or Response.

Parameters
inResult (Result): The input Result object.
originator (str | None): The request originator.
ty (ResourceTypes | None): Optional resource type.
op (Operation | None): Optional request operation type.
isResponse (bool | None): Whether the result is actually a response, and not a request.
originalRequest (CSERequest | None): Undocumented
Returns
Result: Result object with the response. The request or response is in data.
def responseFromResult(self, inResult: Result, originator: str | None = None) -> Result:

Shortcut for requestFromResult to create a response object.

Parameters
inResult (Result): Result that contains the response.
originator (str | None): Originator for the response.
Returns
Result: Result object with the response.
@onEvent(eventManager.cseReset)
def restart(self, eventData: EventData):

Restart the RequestManager service.

def shutdown(self) -> bool:

Shutdown the RequestManager.

Returns
bool: Always returns True.
def unqueuePollingRequest(self, originator: str, requestID: str, reqType: RequestType) -> CSERequest:

Remove a request for the originator and with the requestID from the polling request queue.

def waitForPollingRequest(self, originator: str, requestID: str, timeout: float, reqType: RequestType | None = RequestType.REQUEST, aggregate: bool | None = False) -> Result:

Busy waiting for a polling request. The function returns when there is a new or pending matching request in the queue, or when the timeout (in seconds) is met.

Parameters
originator (str): Request originator to match.
requestID (str): Request Identifier to match. Might be None to match all request IDs.
timeout (float): Timeout in seconds for the polling request to wait.
reqType (RequestType | None): Match request or response.
aggregate (bool | None): Boolean indicating whether all the available requests shall be returned in one aggregation, or separately.
Returns
Result: A Result object with the request or aggregated requests in the request attribute.
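
The waiting behaviour described above (return as soon as a matching entry exists, or when the timeout expires) can be sketched as a small polling loop. This is only an illustration under assumptions: `wait_until`, its `interval` parameter, and the plain list used as a queue are hypothetical names for this example and do not reflect how the CSE implements the wait.

```python
import time
from typing import Callable


def wait_until(predicate: Callable[[], bool], timeout: float,
               interval: float = 0.01) -> bool:
    """Poll predicate until it returns True or timeout seconds have elapsed."""
    deadline = time.monotonic() + timeout
    while True:
        if predicate():
            return True           # a matching entry is available
        if time.monotonic() >= deadline:
            return False          # timeout reached without a match
        time.sleep(interval)      # yield briefly instead of spinning


pending: list[dict] = []
timed_out = not wait_until(lambda: bool(pending), timeout=0.05)

pending.append({"rqi": "123"})
found = wait_until(lambda: bool(pending), timeout=0.05)
print(timed_out, found)
```

The predicate is checked once before the deadline test, so an entry that is already pending is found even with a timeout of 0.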
def waitForResponse(self, rqi: str, timeOut: float) -> tuple[Result | None, str | None]:

Wait for a response with a specific requestIdentifier rqi.
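
The pairing of addResponse and waitForResponse can be pictured as a lock-protected dictionary keyed by the request identifier. The sketch below is an assumption-laden illustration: the class `ResponseStore`, its method names, the polling interval, and the use of plain dictionaries as responses are invented for this example. It only mirrors the documented return shape of a (response, info) tuple, with (None, None) standing in for a timeout.

```python
from __future__ import annotations

import threading
import time


class ResponseStore:
    """Hypothetical store for responses, keyed by request identifier (rqi)."""

    def __init__(self) -> None:
        self._responses: dict[str, tuple[dict, str | None]] = {}
        self._lock = threading.Lock()

    def add(self, rqi: str, response: dict, info: str | None = None) -> None:
        """Store a response and optional info (for example a topic) under its rqi."""
        with self._lock:
            self._responses[rqi] = (response, info)

    def wait_for(self, rqi: str, timeout: float,
                 interval: float = 0.01) -> tuple[dict | None, str | None]:
        """Return and remove the response for rqi, or (None, None) on timeout."""
        deadline = time.monotonic() + timeout
        while True:
            with self._lock:
                if rqi in self._responses:
                    return self._responses.pop(rqi)
            if time.monotonic() >= deadline:
                return None, None
            time.sleep(interval)


store = ResponseStore()
store.add("rqi-1", {"rsc": 2000}, "topic")
print(store.wait_for("rqi-1", timeout=0.1))
```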

def waitForResponseToPCH(self, request: CSERequest) -> Result:

Wait for a RESPONSE to a request.

__slots__: tuple[str, ...] =

Slots for RequestManager class.

coapServer: Any =

The CoAPServer plugin instance, injected by the PluginManager based on the declared dependency.

dispatcher: Dispatcher =

Injected Dispatcher instance.

httpServer: Any =

The HttpServer plugin instance, injected by the PluginManager based on the declared dependency.

mqttClient: Any =

The MQTTClient plugin instance, injected by the PluginManager based on the declared dependency.

notificationManager: NotificationManager =

Injected NotificationManager instance.

registration: RegistrationManager =

Injected RegistrationManager instance.

remoteCSEManager: Any =

The RemoteCSEManager plugin instance, injected by the PluginManager based on the declared dependency.

securityManager: SecurityManager =

Injected SecurityManager instance.

storage: Storage =

Injected Storage instance.

validator: Validator =

Injected Validator instance.

websocketServer: Any =

The WebSocketServer plugin instance, injected by the PluginManager based on the declared dependency.

enableRequestRecording: bool =

Whether request recording is enabled.

flexBlockingBlocking: bool =

Whether flexBlocking requests are handled as blocking or non-blocking.

maxExpirationDelta: float =

The maximum request expiration delta value.

requestExpirationDelta: float =

The request expiration delta value.

requestHandlers: RequestHandler =

Dictionary to map operations to their corresponding request handling functions, dispatcher processing functions, send functions, and events for different protocols.
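
A dictionary that maps operations to handler functions is a common dispatch-table pattern. The following sketch shows the idea only: the `Handlers` tuple, `make_handlers`, `dispatch`, and the string results are hypothetical, and the real RequestHandler structure also carries dispatcher, send, and event entries that are not modelled here.

```python
from enum import Enum
from typing import Callable, NamedTuple


class Operation(Enum):
    """Stand-in for the oneM2M operations."""
    CREATE = 1
    RETRIEVE = 2
    UPDATE = 3
    DELETE = 4
    NOTIFY = 5


class Handlers(NamedTuple):
    """Hypothetical per-operation entry: a local handler and a transit handler."""
    handle: Callable[[dict], str]
    forward: Callable[[dict], str]


def make_handlers(name: str) -> Handlers:
    """Create placeholder handlers that only report what they would do."""
    return Handlers(handle=lambda request: f"handled {name}",
                    forward=lambda request: f"forwarded {name}")


# One entry per operation, looked up when a request arrives.
HANDLERS: dict[Operation, Handlers] = {op: make_handlers(op.name) for op in Operation}


def dispatch(request: dict, transit: bool = False) -> str:
    """Pick the entry for the request's operation and call the fitting function."""
    entry = HANDLERS[request["op"]]
    return (entry.forward if transit else entry.handle)(request)


print(dispatch({"op": Operation.CREATE}))
print(dispatch({"op": Operation.DELETE}, transit=True))
```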

requestRingBuffer: RequestRingBuffer =

RingBuffer to store requests for later retrieval.
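
A ring buffer keeps only the most recent entries and silently drops the oldest once it is full. As a rough sketch of that behaviour, assuming a fixed capacity and using Python's `collections.deque` (the class `RingBufferSketch` and its methods are hypothetical and not the CSE's RequestRingBuffer):

```python
from collections import deque


class RingBufferSketch:
    """Hypothetical fixed-size buffer that keeps only the newest items."""

    def __init__(self, capacity: int) -> None:
        # A deque with maxlen discards the oldest item when a new one is appended.
        self._items: deque = deque(maxlen=capacity)

    def record(self, item: object) -> None:
        """Append an item, evicting the oldest one if the buffer is full."""
        self._items.append(item)

    def latest(self) -> list:
        """Return the stored items, oldest first."""
        return list(self._items)


buffer = RingBufferSketch(capacity=2)
for rqi in ("rqi-1", "rqi-2", "rqi-3"):
    buffer.record(rqi)
print(buffer.latest())
```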

sendToFromInResponses: bool =

Whether to include the 'to' and 'from' fields in responses.

def _assignConfig(self):

Store relevant configuration values in the manager.

def _cleanupPollingRequests(self) -> bool:

Remove expired requests from the polling request queue. This method is called periodically by an actor.

def _createRequestResource(self, request: CSERequest) -> Resource:

Create a <request> resource for the given request.

Parameters
request (CSERequest): The request for which to create the <request> resource.
Returns
Resource: The created <request> resource.
def _executeOperation(self, request: CSERequest, reqRi: str) -> REQ:

Execute a request operation and fill the respective request resource accordingly.

Parameters
request (CSERequest): The request to execute.
reqRi (str): The <request> resource id.
Returns
REQ: The <request> resource.
def _handleNonBlockingRequest(self, request: CSERequest) -> Result:

This method creates a <request> resource, initiates the execution of the desired operation in the background, but immediately returns with the reference of the <request> resource that will contain the result of the operation.
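
The non-blocking pattern described above (create a placeholder, run the operation in the background, return the reference immediately) can be sketched with a thread and an in-memory registry. Everything named here is an assumption for illustration: `RequestRegistry`, `handle_non_blocking`, and the status strings are hypothetical, and the registry merely stands in for <request> resources.

```python
import threading
import uuid
from typing import Callable


class RequestRegistry:
    """Hypothetical in-memory stand-in for <request> resources."""

    def __init__(self) -> None:
        self._entries: dict = {}
        self._lock = threading.Lock()

    def create(self) -> str:
        """Create a pending entry and return its reference."""
        ref = uuid.uuid4().hex
        with self._lock:
            self._entries[ref] = {"status": "PENDING", "result": None}
        return ref

    def complete(self, ref: str, result: object) -> None:
        """Store the result of the finished operation."""
        with self._lock:
            self._entries[ref] = {"status": "COMPLETED", "result": result}

    def get(self, ref: str) -> dict:
        """Return a copy of the entry for the reference."""
        with self._lock:
            return dict(self._entries[ref])


def handle_non_blocking(registry: RequestRegistry,
                        operation: Callable[[], object]) -> tuple:
    """Start the operation in the background and return the reference at once."""
    ref = registry.create()
    worker = threading.Thread(
        target=lambda: registry.complete(ref, operation()), daemon=True)
    worker.start()
    return ref, worker


registry = RequestRegistry()
ref, worker = handle_non_blocking(registry, lambda: 42)
worker.join()  # only for the demonstration; a caller would normally poll later
print(registry.get(ref))
```

The worker thread is returned here only so the example can wait for completion; a caller following the documented behaviour would keep just the reference and look up the result later.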

def _originatorAdaptToScope(self, request: CSERequest, simplify: bool = False):

Convert the from parameter (the originator) to CSE-relative, SP-relative or Absolute format in the request. The from parameter is converted in request.originator and request.originalRequest, but NOT in request.originalData.

See TS-0004, 7.3.2.6, Forwarding

def _runNonBlockingRequestAsync(self, request: CSERequest, reqRi: str) -> bool:

Execute the actual request and store the result in the respective <request> resource. In addition notify the notification targets.

def _runNonBlockingRequestSync(self, request: CSERequest, reqRi: str) -> bool:

Execute the actual request and store the result in the respective <request> resource.

def _sendRequest(self, request: CSERequest) -> RequestResponseList:

Send a request via the appropriate channel or transport protocol.

_pcWorker =

Worker to clean up expired polling requests.

_receivedResponses: dict[str, tuple[Result, str]] =

Dictionary to store received responses for non-blocking requests.

_receivedResponsesLock =

Lock to access the received responses dictionary.

_requestLock =

Lock to access the _requests and _rqiOriginator dictionaries.

_requests: dict[str, list[tuple[CSERequest, RequestType]]] =

Dictionary to map request originators to a list of requests. Used for handling polling requests.

_rqiOriginator: dict[str, str] =

Dictionary to map requestIdentifiers to an originator of a request. Used for handling of polling requests.