Kenning protocols¶
Kenning enables workflows that span multiple machines, usually in a client-server architecture. The server (also referred to as the target platform or the target device) runs tasks, such as inference or model optimization, at the request of the client (host), which, for example, delivers models to run or input samples from the dataset.
Protocol was created as a uniform interface for communication between two Kenning instances, serving as an abstraction for the underlying communication logic and technology stack.
There are multiple Protocol implementations. Their inheritance hierarchy is outlined below.
- BytesBasedProtocol (abstract - requires further implementations) - implements kenning.core.protocol.Protocol abstract methods using a simple protocol with a custom data format. Cannot be used on its own (requires implementing the underlying logic of requests and transmissions).
  - KenningProtocol (abstract - requires further implementations) - implements communication logic and flow control for requests and transmissions relying on BytesBasedProtocol. Introduces send_data and receive_data abstract methods (for sending and receiving raw bytes) that the user needs to implement to send prepared packets over the selected means of communication. There are:
    - NetworkProtocol - implements methods send_data and receive_data using Python sockets (TCP/IP protocol).
    - UARTProtocol - implements methods send_data and receive_data using the UART standard.
- ROS2Protocol - implementation based on the ROS2 framework.
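The layering described above can be pictured with a minimal, self-contained sketch. The classes below are hypothetical stand-ins that only mirror the documented hierarchy; they are not the Kenning classes, and their method signatures are assumptions. The in-memory LoopbackProtocol plays the role that NetworkProtocol or UARTProtocol plays for a real link.

```python
from abc import ABC, abstractmethod
from collections import deque
from typing import Optional


class SketchBytesProtocol(ABC):
    """Hypothetical stand-in for the request/transmission layer."""

    @abstractmethod
    def transmit_blocking(self, payload: bytes) -> None:
        ...


class SketchKenningProtocol(SketchBytesProtocol):
    """Hypothetical stand-in that builds on raw send/receive primitives."""

    @abstractmethod
    def send_data(self, data: bytes) -> bool:
        ...

    @abstractmethod
    def receive_data(self) -> Optional[bytes]:
        ...

    def transmit_blocking(self, payload: bytes) -> None:
        # The middle layer implements the upper-layer method
        # using nothing but the raw-bytes primitives.
        if not self.send_data(payload):
            raise ConnectionError("sending failed")


class LoopbackProtocol(SketchKenningProtocol):
    """Concrete leaf: an in-memory queue instead of a socket or UART."""

    def __init__(self) -> None:
        self._queue = deque()

    def send_data(self, data: bytes) -> bool:
        self._queue.append(bytes(data))
        return True

    def receive_data(self) -> Optional[bytes]:
        return self._queue.popleft() if self._queue else None
```

Only the leaf class knows about the transport, which is why swapping sockets for UART does not touch the layers above it.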
Note
Things to take into account:
- Some of the implementations are not complete communication solutions and require implementations of their own.
- Not all implementations are compatible with all platforms (for example, Kenning Zephyr Runtime can currently only communicate over UART), and some implementations do not support all methods (for example, UARTProtocol only supports the client-side API).
BytesBasedProtocol¶
BytesBasedProtocol is an abstract class implementing all kenning.core.protocol.Protocol abstract methods and introducing a custom protocol message format.
In addition to initialization and disconnecting, BytesBasedProtocol requires implementing the following methods (the ones marked as abstract in the method reference below):
- transmit
- transmit_blocking
- request
- request_blocking
- listen
- listen_blocking
- event_active
- kill_event
These methods allow for requesting, sending and receiving a payload (bytes), along with a set of flags (defined by the enum kenning.protocols.bytes_based_protocol.TransmissionFlag).
Each request and transmission is identified by a MessageType, which serves to differentiate between concurrent communication streams (for example: the client is requesting inference output and at the same time the server is sending logs), as well as to identify the correct server callback.
This class does not work as a standalone communication solution: the aforementioned abstract methods, as well as the kenning.core.protocol.Protocol methods initialize_client, initialize_server and disconnect, have to be provided by its implementations.
BytesBasedProtocol implementations¶
Available MessageType values¶
- PING - Message for checking the connection.
- STATUS - Message for reporting server status.
- DATA - Message contains inference input.
- MODEL - Message contains model to load.
- PROCESS - Message means the data should be processed.
- OUTPUT - Host requests the output from the target.
- STATS - Host requests the inference statistics from the target.
- IO_SPEC - Message contains io specification to load.
- OPTIMIZERS - Message contains optimizers config.
- OPTIMIZE_MODEL - Host requests model optimization and receives optimized model.
- RUNTIME - Message contains runtime that should be used for inference (i.e. LLEXT binary).
- UNOPTIMIZED_MODEL - Message contains an unoptimized model.
- LOGS - Log messages sent from the target device (server).
Available TransmissionFlags¶
- SUCCESS - Transmission is informing about a success (for example successful inference).
- FAIL - Transmission is informing about a failure.
- IS_HOST_MESSAGE - Not set if the transmission was sent by the target device. Set otherwise.
- IS_KENNING - Messages sent by Kenning.
- IS_ZEPHYR - Messages sent by Kenning Zephyr Runtime.
Protocol specification¶
Sending requests from the client side:
- upload_runtime - Sends request MessageType.RUNTIME, binary data from the file as payload. Expects a transmission with TransmissionFlag.SUCCESS and either TransmissionFlag.IS_ZEPHYR or TransmissionFlag.IS_KENNING, and ServerAction.UPLOADING_RUNTIME as payload.
- upload_io_specification - Sends request MessageType.IO_SPEC, binary data from the file as payload. Expects a transmission with TransmissionFlag.SUCCESS and either TransmissionFlag.IS_ZEPHYR or TransmissionFlag.IS_KENNING, and ServerAction.UPLOADING_IOSPEC as payload.
- upload_model - Sends request MessageType.MODEL, binary data from the file as payload. Expects a transmission with TransmissionFlag.SUCCESS and either TransmissionFlag.IS_ZEPHYR or TransmissionFlag.IS_KENNING, and ServerAction.UPLOADING_MODEL as payload.
- upload_input - Sends request MessageType.DATA, raw bytes as payload. Expects a transmission with TransmissionFlag.SUCCESS and either TransmissionFlag.IS_ZEPHYR or TransmissionFlag.IS_KENNING, and ServerAction.UPLOADING_INPUT as payload.
- request_processing - Sends request MessageType.PROCESS, no payload. Expects a transmission with TransmissionFlag.SUCCESS and either TransmissionFlag.IS_ZEPHYR or TransmissionFlag.IS_KENNING, and ServerAction.PROCESSING_INPUT as payload.
- download_output - Sends request MessageType.OUTPUT, no payload. Expects a transmission with TransmissionFlag.SUCCESS and non-empty payload.
- download_statistics - Sends request MessageType.STATS, no payload. Expects a transmission with TransmissionFlag.SUCCESS and non-empty payload.
- upload_optimizers - Sends request MessageType.OPTIMIZERS, JSON serialized to string and encoded as payload. Expects a transmission with TransmissionFlag.SUCCESS and either TransmissionFlag.IS_ZEPHYR or TransmissionFlag.IS_KENNING, and ServerAction.UPLOADING_OPTIMIZERS as payload.
- request_optimization - Sends request MessageType.UNOPTIMIZED_MODEL with binary data from the model file as payload. After receiving a response, the client sends a request MessageType.OPTIMIZE_MODEL with no payload. Expects a transmission with the compiled model as payload and TransmissionFlag.SUCCESS set.
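The client-side expectations listed above boil down to two kinds of checks, which the following sketch illustrates. It is self-contained and does not import Kenning: the TransmissionFlag enum is a hypothetical mirror of the documented flag names, the helper names are invented for illustration, and the ServerAction is treated as opaque bytes because its wire encoding is not given here.

```python
from enum import Enum, auto
from typing import List, Optional


class TransmissionFlag(Enum):
    """Hypothetical mirror of the documented flags (real values may differ)."""
    SUCCESS = auto()
    FAIL = auto()
    IS_HOST_MESSAGE = auto()
    IS_KENNING = auto()
    IS_ZEPHYR = auto()


def upload_acknowledged(payload: Optional[bytes],
                        flags: Optional[List[TransmissionFlag]],
                        expected_action: bytes) -> bool:
    """Check used for upload_* and request_processing style responses."""
    if payload is None or flags is None:
        return False
    if TransmissionFlag.SUCCESS not in flags:
        return False
    # The response has to identify its sender as Kenning or Zephyr.
    sender_known = (TransmissionFlag.IS_KENNING in flags
                    or TransmissionFlag.IS_ZEPHYR in flags)
    # expected_action stands for the encoded ServerAction (encoding assumed).
    return sender_known and payload == expected_action


def download_succeeded(payload: Optional[bytes],
                       flags: Optional[List[TransmissionFlag]]) -> bool:
    """Check used for download_output and download_statistics responses."""
    return (bool(payload) and flags is not None
            and TransmissionFlag.SUCCESS in flags)
```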
Serving requests on the server side, by MessageType (after the serve method has been called to provide callbacks):
- Request with message type IO_SPEC, MODEL, DATA, PROCESS, OPTIMIZERS or UNOPTIMIZED_MODEL - Calls the appropriate server callback. Sends a transmission with the ServerAction (from the ServerStatus returned by the callback) as payload, with TransmissionFlag.IS_KENNING set. If the success field of the ServerStatus is set to True, TransmissionFlag.SUCCESS is set. Otherwise TransmissionFlag.FAIL is set.
- Request with message type OPTIMIZE_MODEL, OUTPUT or STATS - Calls the appropriate callback. If the callback returned a ServerStatus with the success field set to True, sends a transmission with the bytes returned by the callback as payload, with TransmissionFlag.IS_KENNING and TransmissionFlag.SUCCESS set. Otherwise sends a transmission with the ServerAction as payload, with TransmissionFlag.IS_KENNING and TransmissionFlag.FAIL set.
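The two server-side response rules above can be sketched as a pair of small functions. Everything here is a hypothetical stand-in written for illustration: SketchStatus only models the two ServerStatus fields the text mentions, the action is kept as opaque bytes, and Flag mirrors three of the documented flag names.

```python
from dataclasses import dataclass
from enum import Enum, auto
from typing import List, Optional, Tuple


class Flag(Enum):
    """Hypothetical mirror of the documented TransmissionFlag names."""
    SUCCESS = auto()
    FAIL = auto()
    IS_KENNING = auto()


@dataclass
class SketchStatus:
    """Hypothetical stand-in for ServerStatus."""
    action: bytes   # encoded ServerAction (encoding assumed)
    success: bool


def respond_to_upload(status: SketchStatus) -> Tuple[bytes, List[Flag]]:
    # IO_SPEC, MODEL, DATA, PROCESS, OPTIMIZERS, UNOPTIMIZED_MODEL:
    # the payload is always the action, only the flag differs.
    outcome = Flag.SUCCESS if status.success else Flag.FAIL
    return status.action, [Flag.IS_KENNING, outcome]


def respond_to_download(status: SketchStatus,
                        data: Optional[bytes]) -> Tuple[Optional[bytes], List[Flag]]:
    # OPTIMIZE_MODEL, OUTPUT, STATS:
    # the callback's bytes are sent on success, the action on failure.
    if status.success:
        return data, [Flag.IS_KENNING, Flag.SUCCESS]
    return status.action, [Flag.IS_KENNING, Flag.FAIL]
```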
Logs are sent by the server as unprompted transmissions (MessageType.LOGS).
Sending multiple log messages per transmission is allowed.
Payload format:
<message 1 size (1 byte)><message 1 ASCII string><message 2 size (1 byte)><message 2 ASCII string>...<message n size (1 byte)><message n ASCII string>.
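The log payload format above is a sequence of length-prefixed ASCII strings. A minimal sketch of packing and unpacking such a payload follows; the function names are hypothetical, and treating the size byte as an unsigned value (so that a message holds at most 255 bytes) is an assumption.

```python
from typing import List


def encode_logs(messages: List[str]) -> bytes:
    """Pack log messages as <size (1 byte)><ASCII string> pairs."""
    out = bytearray()
    for message in messages:
        raw = message.encode("ascii")
        if len(raw) > 255:
            raise ValueError("a log message must fit in 255 bytes")
        out.append(len(raw))
        out += raw
    return bytes(out)


def decode_logs(payload: bytes) -> List[str]:
    """Unpack a payload produced by encode_logs."""
    messages = []
    position = 0
    while position < len(payload):
        size = payload[position]
        chunk = payload[position + 1:position + 1 + size]
        if len(chunk) != size:
            raise ValueError("truncated log message")
        messages.append(chunk.decode("ascii"))
        position += 1 + size
    return messages
```

For example, the two messages "hi" and "ok!" become the 7-byte payload 02 68 69 03 6f 6b 21 (hexadecimal).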
Methods:
- class kenning.protocols.bytes_based_protocol.BytesBasedProtocol(timeout: int = -1)¶
Bases: Protocol, ABC
Implements abstract methods from the Protocol class, using an underlying mechanism of transmissions and requests (that mechanism needs to be provided by this class’s extensions).
- check_status(status: ServerStatus, artifacts: tuple[bytes, list[TransmissionFlag]]) bool ¶
Parses ServerStatus from the contents of a transmission (payload and flags) and compares it against given status.
- Parameters:¶
- status : ServerStatus¶
Status to compare against.
- artifacts : Tuple[bytes, List[TransmissionFlag]]¶
Payload and flags from a transmission. Flags should contain either the ‘IS_ZEPHYR’ flag or the ‘IS_KENNING’ flag.
- Returns:¶
True if status is equal, False if status is not as expected, or if some of the ‘artifacts’ are None.
- Return type:¶
bool
- Raises:¶
ValueError – Flags contain neither the ‘IS_ZEPHYR’ flag nor the ‘IS_KENNING’ flag.
- download_output() tuple[bool, bytes | None] ¶
Downloads the outputs from the target device.
Requests and downloads the latest inference output from the target device for quality measurements.
- download_statistics(final: bool = False) Measurements ¶
Downloads inference statistics from the target device.
By default no statistics are gathered.
- abstractmethod event_active(message_type: MessageType | None = None) bool ¶
Checks if an active protocol event (Transmission, Request etc.) of a given message type exists.
- abstractmethod kill_event(message_type: MessageType | None = None)¶
Forcibly stops an active event (Transmission, Request etc.).
- abstractmethod listen(message_type: MessageType | None = None, transmission_callback: Callable[[tuple[MessageType, bytes, list[TransmissionFlag]]], None] | None = None, request_callback: Callable[[tuple[MessageType, bytes, list[TransmissionFlag]]], None] | None = None, limit: int = -1, failure_callback: Callable[[MessageType], None] | None = None)¶
Waits for transmissions and requests from the other side, without blocking the current thread.
- Parameters:¶
- message_type : Optional[MessageType]¶
Message type of the requests/transmissions to listen for, or None (to listen to requests/transmissions of any message type).
- transmission_callback : Optional[ProtocolSuccessCallback]¶
Function, that will be called when a transmission is successfully received. Message type, payload and flags from the transmission will be passed to the function. Note: This will be executed on a separate thread, so make sure the callback function is thread-safe.
- request_callback : Optional[ProtocolSuccessCallback]¶
Function, that will be called when a request is successfully received. Message type, payload and flags from the request will be passed to the function. Note: This will be executed on a separate thread, so make sure the callback function is thread-safe.
- limit : int¶
Maximum number of requests/transmissions (including failures) that will be received before listening stops.
- failure_callback : Optional[ProtocolFailureCallback]¶
Function, that will be called if a request/transmission is attempted by the other side, but fails. Note: This will be executed on a separate thread, so make sure the callback function is thread-safe.
- abstractmethod listen_blocking(message_type: MessageType | None = None, transmission_callback: Callable[[tuple[MessageType, bytes, list[TransmissionFlag]]], None] | None = None, request_callback: Callable[[tuple[MessageType, bytes, list[TransmissionFlag]]], None] | None = None, timeout: float | None = None, failure_callback: Callable[[MessageType], None] | None = None) tuple[IncomingEventType | None, MessageType | None, bytes | None, list[TransmissionFlag] | None] ¶
Blocks the current thread until a transmission or a request is received.
- Parameters:¶
- message_type : Optional[MessageType]¶
Message type of the requests/transmissions to listen for, or None (to listen to requests/transmissions of any message type).
- transmission_callback : Optional[ProtocolSuccessCallback]¶
Function, that will be called when a transmission is successfully received. Message type, payload and flags from the transmission will be passed to the function.
- request_callback : Optional[ProtocolSuccessCallback]¶
Function, that will be called when a request is successfully received. Message type, payload and flags from the request will be passed to the function.
- timeout : Optional[float]¶
Maximum blocking time in seconds, or None to block indefinitely. If that time passes the ‘failure_callback’ will be called.
- failure_callback : Optional[ProtocolFailureCallback]¶
Function, that will be called if a request/transmission is attempted by the other side, but fails. Note: This will be executed on a separate thread, so make sure the callback function is thread-safe.
- Returns:¶
Enum denoting whether a request or a transmission was received, message type of the received transmission/request, received payload, received flags. Alternatively: (None, None, None, None) if no request/transmission was received in the specified timeout or if a request/transmission was attempted but failed.
- Return type:¶
tuple[Optional[IncomingEventType], Optional[MessageType], Optional[bytes], Optional[List[TransmissionFlag]]]
- listen_to_server_logs()¶
Starts continuously receiving and printing logs sent by the server.
- abstractmethod request(message_type: MessageType, callback: Callable[[tuple[MessageType, bytes, list[TransmissionFlag]]], None], payload: bytes | None = None, flags: list[TransmissionFlag] = [], retry: int = 1, deny_callback: Callable[[MessageType], None] | None = None)¶
Prompts the other side for a transmission and waits for a response, without blocking the current thread. Bytes and flags can also be sent along with the request.
- Parameters:¶
- message_type : MessageType¶
Value denoting what type of transmission is being requested.
- callback : ProtocolSuccessCallback¶
Function, that will be called when the transmission is received. Message type, payload and flags from the transmission will be passed to the function. Note: This will be executed on a separate thread, so make sure the callback function is thread-safe.
- payload : Optional[bytes]¶
Bytes to send along with the request (or None if the payload is to be empty).
- flags : List[TransmissionFlag]¶
A list of flags to be sent (available flags in the TransmissionFlag enum above - please note that some flags are only allowed for a specific message type).
- retry : int¶
Denotes how many times the request will be re-sent after failing, before calling ‘deny_callback’. Negative number denotes infinite retries.
- deny_callback : Optional[ProtocolFailureCallback]¶
Function, that will be called if the request is denied or otherwise fails. Note: This will be executed on a separate thread, so make sure the callback function is thread-safe.
- abstractmethod request_blocking(message_type: MessageType, callback: Callable[[tuple[MessageType, bytes, list[TransmissionFlag]]], None] | None = None, payload: bytes | None = None, flags: list[TransmissionFlag] = [], timeout: float | None = None, retry: int = 1, deny_callback: Callable[[MessageType], None] | None = None) tuple[bytes | None, list[TransmissionFlag] | None] ¶
Prompts the other side for a transmission and blocks the current thread until a response is received. Bytes and flags can also be sent along with the request.
- Parameters:¶
- message_type : MessageType¶
Value denoting what type of transmission is being requested.
- callback : Optional[ProtocolSuccessCallback]¶
Function, that will be called when the transmission is received. Message type, payload and flags from the transmission will be passed to the function.
- payload : Optional[bytes]¶
Bytes to send along with the request (or None if the payload is to be empty).
- flags : List[TransmissionFlag]¶
A list of flags to be sent (available flags in the TransmissionFlag enum above - please note that some flags are only allowed for a specific message type).
- timeout : Optional[float]¶
Maximum blocking time in seconds, or None to block indefinitely. If that time passes the ‘deny_callback’ will be called.
- retry : int¶
Denotes how many times the request will be re-sent after failing, before calling ‘deny_callback’. Negative number denotes infinite retries.
- deny_callback : Optional[ProtocolFailureCallback]¶
Function, that will be called if the request is denied or otherwise fails.
- Returns:¶
Payload and flags received as response to the request, or (None, None) if the request was denied or otherwise failed.
- Return type:¶
Tuple[Optional[bytes], Optional[List[TransmissionFlag]]]
- request_optimization(model_path: Path, get_time_func: Callable[[], float] = time.perf_counter) tuple[bool, bytes | None] ¶
Requests optimization of the model.
- Parameters:¶
- model_path : Path
Path to the model for optimization.
- get_time_func : Callable[[], float]
Function that returns current timestamp.
- Returns:¶
First element is equal to True if optimization finished successfully and the second element contains compiled model.
- Return type:¶
Tuple[bool, Optional[bytes]]
- request_processing(get_time_func: Callable[[], float] = time.perf_counter) bool ¶
Requests processing of input data and waits for acknowledgement.
This method triggers inference on target device and waits until the end of inference on target device is reached.
This method measures processing time on the target device from the level of the host.
Target may send its own measurements in the statistics.
- serve(upload_input_callback: Callable[[bytes], ServerStatus] | None = None, upload_model_callback: Callable[[bytes], ServerStatus] | None = None, process_input_callback: Callable[[bytes], ServerStatus] | None = None, download_output_callback: Callable[[None], tuple[ServerStatus, bytes | None]] | None = None, download_stats_callback: Callable[[None], tuple[ServerStatus, bytes | None]] | None = None, upload_iospec_callback: Callable[[bytes], ServerStatus] | None = None, upload_optimizers_callback: Callable[[bytes], ServerStatus] | None = None, upload_unoptimized_model_callback: Callable[[bytes], ServerStatus] | None = None, download_optimized_model_callback: Callable[[None], tuple[ServerStatus, bytes | None]] | None = None, upload_runtime_callback: Callable[[bytes], ServerStatus] | None = None)¶
Waits for requests from the other device (the client) and calls appropriate callbacks. Some callbacks take bytes as arguments, all return ServerStatus and some also return bytes. These responses are then sent back to the client.
Callbacks are guaranteed to be executed in the order requests were sent in.
This method is non-blocking.
- Parameters:¶
- upload_input_callback : Optional[ServerUploadCallback]¶
Called, when the client uploads input (‘upload_input’ method below). Should upload model input into the runtime.
- upload_model_callback : Optional[ServerUploadCallback]¶
Called, when the client uploads an optimized model (‘upload_model’ method below). It should load the model and start an inference session.
- process_input_callback : Optional[ServerUploadCallback]¶
Called, when the client requests inference. Should return after inference is completed.
- download_output_callback : Optional[ServerDownloadCallback]¶
Called, when the client requests inference output, should return status of the server and the output.
- download_stats_callback : Optional[ServerDownloadCallback]¶
Called, when the client requests inference stats, should end the inference session and return status of the server and stats.
- upload_iospec_callback : Optional[ServerUploadCallback]¶
Called to upload model input/output specifications (iospec).
- upload_optimizers_callback : Optional[ServerUploadCallback]¶
Called to upload optimizer config (serialized JSON) - ‘upload_optimizers’ method call by the client.
- upload_unoptimized_model_callback : Optional[ServerUploadCallback]¶
Called to upload an unoptimized ML model, should save it.
- download_optimized_model_callback : Optional[ServerDownloadCallback]¶
Called, when client requests optimization of the model (uploaded with ‘upload_unoptimized_model_callback’). Should optimize the model and return it.
- upload_runtime_callback : Optional[ServerUploadCallback]¶
Called, when the client uploads runtime.
- start_sending_logs()¶
Starts sending over the protocol all logs, except for the logs generated by the operation of sending logs itself (otherwise we would have an infinite logging recursion).
- stop_sending_logs()¶
Stops sending logs over the protocol.
- abstractmethod transmit(message_type: MessageType, payload: bytes | None = None, flags: list[TransmissionFlag] = [], failure_callback: Callable[[MessageType], None] | None = None)¶
Sends bytes and a set of flags to the other side, without blocking the current thread.
- Parameters:¶
- message_type : MessageType¶
Value denoting what is being sent. It serves to differentiate one transmission from another (analogous to port number in a TCP protocol).
- payload : Optional[bytes]¶
Bytes to send (or None if the payload is to be empty).
- flags : List[TransmissionFlag]¶
A list of flags to be sent (available flags in the TransmissionFlag enum above - please note that some flags are only allowed for a specific message type).
- failure_callback : Optional[ProtocolFailureCallback]¶
Function, that will be called if the transmission fails to send. Note: This will be executed on a separate thread, so make sure the function is thread-safe.
- abstractmethod transmit_blocking(message_type: MessageType, payload: bytes | None = None, flags: list[TransmissionFlag] = [], timeout: float | None = None, failure_callback: Callable[[MessageType], None] | None = None)¶
Sends bytes and a set of flags to the other side, blocks the current thread until the transmission is completed.
- Parameters:¶
- message_type : MessageType¶
Value denoting what is being sent. It serves to differentiate one transmission from another (analogous to port number in a TCP protocol).
- payload : Optional[bytes]¶
Bytes to send (or None if the payload is to be empty).
- flags : List[TransmissionFlag]¶
A list of flags to be sent (available flags in the TransmissionFlag enum above - please note that some flags are only allowed for a specific message type).
- timeout : Optional[float]¶
Maximum blocking time in seconds, or None to block indefinitely. If that time passes the ‘failure_callback’ will be called.
- failure_callback : Optional[ProtocolFailureCallback]¶
Function, that will be called if the transmission fails to send.
- upload_input(data: bytes) bool ¶
Uploads input to the target device and waits for acknowledgement.
This method should wait until the target device confirms the data is delivered and preprocessed for inference.
- upload_io_specification(path: Path) bool ¶
Uploads input/output specification to the target device.
This method takes the specification in a json format from the given Path and sends it to the target device.
This method should receive the status of uploading the data to the target.
- upload_model(path: Path) bool ¶
Uploads the model to the target device.
This method takes the model from given Path and sends it to the target device.
This method should receive the status of uploading the model from the target.
- upload_optimizers(optimizers_cfg: dict[str, Any]) bool ¶
Upload optimizers config to the target device.
KenningProtocol¶
KenningProtocol is an abstract class implementing the abstract methods of BytesBasedProtocol (methods such as request and transmit_blocking) using the abstract methods receive_data and send_data (which have to be provided by implementations of this class).
It therefore takes a data link that allows for sending and receiving raw bytes and builds on top of it a protocol that allows for asynchronous, simultaneous communication through multiple channels (each MessageType is essentially a separate channel).
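The idea of one channel per MessageType can be pictured as a demultiplexer that sorts incoming payloads into per-type queues. The sketch below is only an illustration of that idea: ChannelDemux is a hypothetical class, the channel names are plain strings, and nothing here reflects how KenningProtocol is actually implemented.

```python
from collections import defaultdict, deque
from typing import Optional


class ChannelDemux:
    """Hypothetical per-message-type queues over a single byte link."""

    def __init__(self) -> None:
        self._channels = defaultdict(deque)

    def deliver(self, message_type: str, payload: bytes) -> None:
        # Called for every message read from the shared link.
        self._channels[message_type].append(payload)

    def take(self, message_type: str) -> Optional[bytes]:
        # A consumer only ever sees messages of the type it asked for.
        queue = self._channels[message_type]
        return queue.popleft() if queue else None
```

With this arrangement a log message arriving in the middle of an output download does not disturb the consumer waiting for the output.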
KenningProtocol implementations¶
Message structure¶
Size | 6 bits | 2 bits | 8 bits | 16 bits | 0 or 32 bits | 0-n bits |
---|---|---|---|---|---|---|
Offset | 0-5 | 6-7 | 8-15 | 16-31 | 32-63 | 64 |
Field | Msg Type | Flow control | Checksum | Flags | Payload size | Payload |
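The field layout in the table can be tried out with the standard struct module. The sketch below rests on assumptions that the table does not state: bit offset 0 is taken to be the least significant bit of the first byte, multi-byte fields are taken to be little-endian, and bit 3 of the Flags field is taken to signal that the payload size and payload are present. The function names are hypothetical, and the checksum is passed in rather than computed.

```python
import struct
from typing import Tuple

HAS_PAYLOAD_BIT = 1 << 3  # assumed position of the "message has payload" flag


def pack_message(msg_type: int, flow_control: int, checksum: int,
                 flags: int, payload: bytes = b"") -> bytes:
    if not 0 <= msg_type < 64:
        raise ValueError("message type must fit in 6 bits")
    if not 0 <= flow_control < 4:
        raise ValueError("flow control must fit in 2 bits")
    # Keep the payload flag consistent with the actual payload.
    if payload:
        flags |= HAS_PAYLOAD_BIT
    else:
        flags &= ~HAS_PAYLOAD_BIT
    first_byte = msg_type | (flow_control << 6)
    message = struct.pack("<BBH", first_byte, checksum, flags)
    if payload:
        # Payload size (32 bits) is only present when there is a payload.
        message += struct.pack("<I", len(payload)) + payload
    return message


def unpack_message(data: bytes) -> Tuple[int, int, int, int, bytes]:
    first_byte, checksum, flags = struct.unpack_from("<BBH", data, 0)
    payload = b""
    if flags & HAS_PAYLOAD_BIT:
        (size,) = struct.unpack_from("<I", data, 4)
        payload = data[8:8 + size]
    return first_byte & 0x3F, first_byte >> 6, checksum, flags, payload
```

A message without payload is 4 bytes long under these assumptions, and a message with payload carries an 8-byte header, which matches the payload starting at bit offset 64.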
Flow control values¶
- REQUEST - message is a request for transmission
- REQUEST_RETRANSMIT - message is a request to repeat the last message (because of invalid checksum)
- ACKNOWLEDGE - message is an acknowledgement of the last message
- TRANSMISSION - message is part of a transmission
Checksum¶
Bit-wise XOR of all bytes in the message (except for the checksum itself) with the byte 0x4B.
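A minimal sketch of the checksum rule follows. It reads the rule as: start from 0x4B and XOR in every byte of the message except the checksum byte. Placing the checksum at byte index 1 follows the message structure table only under the assumption that bit offsets map to bytes in order; the function names are hypothetical.

```python
CHECKSUM_SEED = 0x4B
CHECKSUM_INDEX = 1  # assumed: bits 8-15 of the message


def compute_checksum(message: bytes) -> int:
    """XOR of the seed with every byte except the checksum byte."""
    value = CHECKSUM_SEED
    for index, byte in enumerate(message):
        if index != CHECKSUM_INDEX:
            value ^= byte
    return value


def verify_checksum(message: bytes) -> bool:
    """True if the stored checksum matches the recomputed one."""
    return message[CHECKSUM_INDEX] == compute_checksum(message)
```

Because XOR is its own inverse, flipping any bit outside the checksum byte changes the result, which is what lets the receiver ask for a retransmission with REQUEST_RETRANSMIT.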
Flags¶
Flag | Offset in the | Meaning for | Meaning for |
---|---|---|---|
| 0 | can be set/read as a | last message was received and accepted |
| 1 | can be set/read as a | last message was received, but it was rejected ( |
| 2 | can be set/read as a | - |
| 3 | message has payload | - |
| 4 | first message in this transmission/request | - |
| 5 | last message in this transmission/request (it is possible for a message to be both | - |
| 6 | can be set/read as a | - |
| 7 | can be set/read as a | - |
| 12 | can be set/read as a | - |
As the table shows, certain flags are used by KenningProtocol for flow control and protocol logic.
Other flags are available to the protocol user (such as the BytesBasedProtocol class) as TransmissionFlag values.
For some flags, which of the two applies depends on the message.
Some flags are only present for a specific message type (these are carried in the last 4 bits of the Flags field).
Payload size¶
Size of the payload in bytes.
Example communication scenarios¶
Note
Transmission flags are omitted.
Request (no payload) denied:
- A -> B MessageType.OUTPUT, REQUEST, (FIRST, LAST)
- A <- B MessageType.OUTPUT, ACKNOWLEDGE, (FAIL)
Unprompted single-message transmission:
- A -> B MessageType.LOGS, TRANSMISSION, (HAS_PAYLOAD, FIRST, LAST)
Request (no payload) with a multi-message transmission as a response:
- A -> B MessageType.STATS, REQUEST, (FIRST, LAST)
- A <- B MessageType.STATS, TRANSMISSION, (HAS_PAYLOAD, FIRST)
- A <- B MessageType.STATS, TRANSMISSION, (HAS_PAYLOAD)
- A <- B MessageType.STATS, TRANSMISSION, (HAS_PAYLOAD)
- A <- B MessageType.STATS, TRANSMISSION, (HAS_PAYLOAD, LAST)
Multi-message request with payload with a single-message transmission as a response:
- A -> B MessageType.IO_SPEC, REQUEST, (HAS_PAYLOAD, FIRST)
- A -> B MessageType.IO_SPEC, REQUEST, (HAS_PAYLOAD)
- A -> B MessageType.IO_SPEC, REQUEST, (HAS_PAYLOAD, LAST)
- A <- B MessageType.STATUS, TRANSMISSION, (HAS_PAYLOAD, FIRST, LAST)
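The FIRST and LAST markers in the scenarios above follow from cutting a payload into chunks. The sketch below shows one way to derive them; split_payload and max_chunk are hypothetical names, flags are represented as plain strings, and whether the real size limit counts header bytes is not something this sketch tries to settle.

```python
from typing import List, Set, Tuple


def split_payload(payload: bytes, max_chunk: int) -> List[Tuple[Set[str], bytes]]:
    """Cut a payload into messages and mark the first and the last one."""
    if max_chunk <= 0:
        raise ValueError("max_chunk must be positive")
    if not payload:
        # A message without payload is both the first and the last one.
        return [({"FIRST", "LAST"}, b"")]
    chunks = [payload[start:start + max_chunk]
              for start in range(0, len(payload), max_chunk)]
    messages = []
    for index, chunk in enumerate(chunks):
        flags = {"HAS_PAYLOAD"}
        if index == 0:
            flags.add("FIRST")
        if index == len(chunks) - 1:
            flags.add("LAST")
        messages.append((flags, chunk))
    return messages
```

A 10-byte payload with max_chunk set to 3 yields four messages flagged exactly like the STATS response in the third scenario.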
Methods¶
- class kenning.protocols.kenning_protocol.KenningProtocol(timeout: int = -1, error_recovery: bool = False, max_message_size: int = 1024)¶
Bases: BytesBasedProtocol, ABC
Class for managing the flow of Kenning Protocol.
- event_active(message_type: MessageType | None = None) bool ¶
Checks if an active protocol event (Transmission, Request etc.) of a given message type exists.
- finish_event(event: ProtocolEvent, log_failure: bool = True)¶
Removes an event from the ‘current_protocol_events’ dict and logs its success or failure.
- gather_data(timeout: float | None = None) bytes | None ¶
Gathers data from the client.
This method should be called by receive_message in order to get data from the client.
- Parameters:¶
- timeout : Optional[float]¶
Receive timeout in seconds. If timeout > 0, this specifies the maximum wait time, in seconds. If timeout <= 0, the call won’t block, and will report the currently ready file objects. If timeout is None, the call will block until a monitored file object becomes ready.
- Returns:¶
Received data.
- Return type:¶
Optional[bytes]
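The three timeout cases described for gather_data use the same wording as the select method of Python's standard selectors module. Assuming that this is the mechanism being described (the text here does not say so outright), the cases can be tried on a local socket pair. The poll_once helper is hypothetical and is not part of Kenning.

```python
import selectors
import socket
from typing import Optional


def poll_once(sock: socket.socket, timeout: Optional[float]) -> Optional[bytes]:
    """Wait for readable data according to the selectors timeout rules."""
    with selectors.DefaultSelector() as selector:
        selector.register(sock, selectors.EVENT_READ)
        # timeout > 0: wait at most that long; timeout <= 0: do not block;
        # timeout None: block until the socket becomes ready.
        events = selector.select(timeout=timeout)
        if not events:
            return None
        return sock.recv(4096)


if __name__ == "__main__":
    left, right = socket.socketpair()
    print(poll_once(right, 0))      # nothing sent yet, returns immediately
    left.sendall(b"ping")
    print(poll_once(right, 1.0))    # data is ready, returns before the timeout
    left.close()
    right.close()
```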
- kill_event(message_type: MessageType | None = None)¶
Forcibly stops an active event (Transmission, Request etc.).
- listen(message_type: MessageType | None = None, transmission_callback: Callable[[tuple[MessageType, bytes, list[TransmissionFlag]]], None] | None = None, request_callback: Callable[[tuple[MessageType, bytes, list[TransmissionFlag]]], None] | None = None, limit: int = -1, failure_callback: Callable[[MessageType], None] | None = None)¶
Waits for transmissions and requests from the other side, without blocking the current thread.
- Parameters:¶
- message_type : Optional[MessageType]¶
Message type of the requests/transmissions to listen for, or None (to listen to requests/transmissions of any message type).
- transmission_callback : Optional[ProtocolSuccessCallback]¶
Function, that will be called when a transmission is successfully received. Message type, payload and flags from the transmission will be passed to the function. Note: This will be executed on a separate thread, so make sure the callback function is thread-safe.
- request_callback : Optional[ProtocolSuccessCallback]¶
Function, that will be called when a request is successfully received. Message type, payload and flags from the request will be passed to the function. Note: This will be executed on a separate thread, so make sure the callback function is thread-safe.
- limit : int¶
Maximum number of requests/transmissions (including failures) that will be received before listening stops.
- failure_callback : Optional[ProtocolFailureCallback]¶
Function, that will be called if a request/transmission is attempted by the other side, but fails. Note: This will be executed on a separate thread, so make sure the callback function is thread-safe.
- listen_blocking(message_type: MessageType | None = None, transmission_callback: Callable[[tuple[MessageType, bytes, list[TransmissionFlag]]], None] | None = None, request_callback: Callable[[tuple[MessageType, bytes, list[TransmissionFlag]]], None] | None = None, timeout: float | None = None, failure_callback: Callable[[MessageType], None] | None = None) tuple[IncomingEventType | None, MessageType | None, bytes | None, list[TransmissionFlag] | None] ¶
Blocks the current thread until a transmission or a request is received.
- Parameters:¶
- message_type : Optional[MessageType]¶
Message type of the requests/transmissions to listen for, or None (to listen to requests/transmissions of any message type).
- transmission_callback : Optional[ProtocolSuccessCallback]¶
Function, that will be called when a transmission is successfully received. Message type, payload and flags from the transmission will be passed to the function.
- request_callback : Optional[ProtocolSuccessCallback]¶
Function, that will be called when a request is successfully received. Message type, payload and flags from the request will be passed to the function.
- timeout : Optional[float]¶
Maximum blocking time in seconds, or None to block indefinitely. If that time passes the ‘failure_callback’ will be called.
- failure_callback : Optional[ProtocolFailureCallback]¶
Function, that will be called if a request/transmission is attempted by the other side, but fails. Note: This will be executed on a separate thread, so make sure the callback function is thread-safe.
- Returns:¶
Enum denoting whether a request or a transmission was received, message type of the received transmission/request, received payload, received flags. Alternatively: (None, None, None, None) if no request/transmission was received in the specified timeout or if a request/transmission was attempted but failed.
- Return type:¶
tuple[Optional[IncomingEventType], Optional[MessageType], Optional[bytes], Optional[List[TransmissionFlag]]]
- abstractmethod receive_data(connection: Any, mask: int) Any | None ¶
Receives data from the target device.
- receive_message(timeout: float | None = None) Message | None ¶
Waits for incoming data from the other side of connection.
This method should wait for the input data to arrive and return the appropriate status code along with received data.
- Parameters:¶
- timeout : Optional[float]¶
Receive timeout in seconds. If timeout > 0, this specifies the maximum wait time, in seconds. If timeout <= 0, the call won’t block, and will report the currently ready file objects. If timeout is None, the call will block until a monitored file object becomes ready.
- Returns:¶
Received message, or None if message was not received.
- Return type:¶
Optional[Message]
- receiver()¶
Method sitting in a loop, receiving messages and passing them to the relevant ProtocolEvent class object in the ‘current_protocol_events’ dict, based on message type.
This method is meant to be running constantly on a separate thread, if the protocol is active.
- request(message_type: MessageType, callback: Callable[[tuple[MessageType, bytes, list[TransmissionFlag]]], None], payload: bytes | None = None, flags: list[TransmissionFlag] = [], retry: int = 1, deny_callback: Callable[[MessageType], None] | None = None)¶
Prompts the other side for a transmission and waits for a response, without blocking the current thread. Bytes and flags can also be sent along with the request.
- Parameters:¶
- message_type : MessageType¶
Value denoting what type of transmission is being requested.
- callback : ProtocolSuccessCallback¶
Function that will be called when the transmission is received. Message type, payload and flags from the transmission will be passed to the function. Note: This will be executed on a separate thread, so make sure the callback function is thread-safe.
- payload : Optional[bytes]¶
Bytes to send along with the request (or None if the payload is to be empty).
- flags : List[TransmissionFlag]¶
A list of flags to be sent (available flags in the TransmissionFlag enum above - please note that some flags are only allowed for a specific message type).
- retry : int¶
Denotes how many times the request will be re-sent after failing, before calling ‘deny_callback’. Negative number denotes infinite retries.
- deny_callback : Optional[ProtocolFailureCallback]¶
Function that will be called if the request is denied or otherwise fails. Note: This will be executed on a separate thread, so make sure the callback function is thread-safe.
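Because both callbacks run on a separate thread, one simple way to keep them thread-safe is to have them only push results into a `queue.Queue` that the calling thread reads later. The sketch below illustrates that pattern without Kenning: `fake_request` is a hypothetical stand-in for request, the string "DATA" stands in for a MessageType value, and passing message type, payload and flags as three separate arguments is an assumption.

```python
import queue
import threading

# Thread-safe hand-off between the protocol's thread and the calling thread.
results = queue.Queue()


def on_success(message_type, payload, flags):
    # Runs on another thread: only touch thread-safe objects here.
    results.put(("ok", message_type, payload, flags))


def on_deny(message_type):
    results.put(("denied", message_type, None, None))


def fake_request(message_type, callback, deny_callback=None):
    """Hypothetical stand-in for request(): replies from a worker thread."""
    worker = threading.Thread(
        target=callback, args=(message_type, b"response", [])
    )
    worker.start()
    return worker


worker = fake_request("DATA", on_success, on_deny)
# The caller is free to do other work here, then collects the outcome.
outcome = results.get(timeout=5)
worker.join()
```

The calling thread never shares mutable state with the callbacks directly, so no explicit locking is needed.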
-
request_blocking(message_type: MessageType, callback: Callable[[tuple[MessageType, bytes, list[TransmissionFlag]]], None] | None =
None
, payload: bytes | None =None
, flags: list[TransmissionFlag] =[]
, timeout: float | None =None
, retry: int =1
, deny_callback: Callable[[MessageType], None] | None =None
) tuple[bytes | None, list[TransmissionFlag] | None] ¶ Prompts the other side for a transmission and blocks the current thread until a response is received. Bytes and flags can also be sent along with the request.
- Parameters:¶
- message_type : MessageType¶
Value denoting what type of transmission is being requested.
- callback : Optional[ProtocolSuccessCallback]¶
Function that will be called when the transmission is received. Message type, payload and flags from the transmission will be passed to the function.
- payload : Optional[bytes]¶
Bytes to send along with the request (or None if the payload is to be empty).
- flags : List[TransmissionFlag]¶
A list of flags to be sent (available flags in the TransmissionFlag enum above - please note that some flags are only allowed for a specific message type).
- timeout : Optional[float]¶
Maximum blocking time in seconds, or None to block indefinitely. If that time elapses, the ‘deny_callback’ will be called.
- retry : int¶
Denotes how many times the request will be re-sent after failing, before calling ‘deny_callback’. Negative number denotes infinite retries.
- deny_callback : Optional[ProtocolFailureCallback]¶
Function that will be called if the request is denied or otherwise fails.
- Returns:¶
Payload and flags received as response to the request, or (None, None) if the request was denied or otherwise failed.
- Return type:¶
Tuple[Optional[bytes], Optional[List[TransmissionFlag]]]
- run_event(event: ProtocolEvent, success_callback: Callable[[ProtocolEvent], None], deny_callback: Callable[[ProtocolEvent], None])¶
Adds a protocol event to the ‘current_protocol_events’ dict, so that it will be serviced by the receiver thread, starts it in non-blocking mode and passes callbacks.
- run_event_blocking(event: ProtocolEvent, timeout: float | None) tuple[bool, ProtocolEvent] ¶
Adds a protocol event to the ‘current_protocol_events’ dict, so that it will be serviced by the receiver thread, starts it in blocking mode and blocks the current thread until it completes.
- Parameters:¶
- Returns:¶
True if the event succeeded, False otherwise, together with the ProtocolEvent object returned by the event (see the docstring of the relevant ProtocolEvent to see what object will be passed here).
- Return type:¶
Tuple[bool, ProtocolEvent]
- Raises:¶
ValueError – Attempted to start the event, while another event of the same type was already in progress.
ProtocolNotStartedError – Protocol is not active, call ‘start’ first.
- abstractmethod send_data(data: Any) bool ¶
Sends data to the target device.
Data can be a model to use, input to process, or additional configuration.
- send_messages(message_type: MessageType, messages: list[Message])¶
Gives a job to the ‘transmitter’ thread pool, which will send the messages and call the ‘messages_sent’ method on the appropriate ProtocolEvent object in the ‘self.current_protocol_events’ dict (based on message type).
This is meant to be called by the ProtocolEvent objects to send messages and update their state.
- Parameters:¶
- message_type : MessageType¶
Message type of the ProtocolEvent that sends the messages. This is needed so that the object can later be found in the ‘self.current_protocol_events’ dict and its ‘messages_sent’ method called. A reference cannot be used here: for example, if a Listen object in the dict has an IncomingTransmission object inside, and that inner object calls this method, ‘messages_sent’ has to be called on the whole Listen object, not just the inner one.
- messages : List[Message]¶
List of messages to send.
- start()¶
Starts the protocol, creates and runs threads for receiving and transmitting messages.
- stop()¶
Stops the protocol and joins all threads.
-
transmit(message_type: MessageType, payload: bytes | None =
None
, flags: list[TransmissionFlag] =[]
, failure_callback: Callable[[MessageType], None] | None =None
)¶ Sends bytes and a set of flags to the other side, without blocking the current thread.
- Parameters:¶
- message_type : MessageType¶
Value denoting what is being sent. It serves to differentiate one transmission from another (analogous to a port number in TCP).
- payload : Optional[bytes]¶
Bytes to send (or None if the payload is to be empty).
- flags : List[TransmissionFlag]¶
A list of flags to be sent (available flags in the TransmissionFlag enum above - please note that some flags are only allowed for a specific message type).
- failure_callback : Optional[ProtocolFailureCallback]¶
Function that will be called if the transmission fails to send. Note: This will be executed on a separate thread, so make sure the function is thread-safe.
-
transmit_blocking(message_type: MessageType, payload: bytes | None =
None
, flags: list[TransmissionFlag] =[]
, timeout: float | None =None
, failure_callback: Callable[[MessageType], None] | None =None
)¶ Sends bytes and a set of flags to the other side and blocks the current thread until the transmission is completed.
- Parameters:¶
- message_type : MessageType¶
Value denoting what is being sent. It serves to differentiate one transmission from another (analogous to a port number in TCP).
- payload : Optional[bytes]¶
Bytes to send (or None if the payload is to be empty).
- flags : List[TransmissionFlag]¶
A list of flags to be sent (available flags in the TransmissionFlag enum above - please note that some flags are only allowed for a specific message type).
- timeout : Optional[float]¶
Maximum blocking time in seconds, or None to block indefinitely. If that time elapses, the ‘failure_callback’ will be called.
- failure_callback : Optional[ProtocolFailureCallback]¶
Function that will be called if the transmission fails to send.
-
event_active(message_type: MessageType | None =
NetworkProtocol¶
NetworkProtocol implements the abstract methods from KenningProtocol (send_data and receive_data), as well as the abstract methods from Protocol that neither KenningProtocol nor BytesBasedProtocol implements (namely disconnect, initialize_client, and initialize_server).
It uses Python sockets.
After calling initialize_server, it waits for a client to connect and creates a socket. Only one client at a time can be connected.
When a client connects, the server sends a single byte (0x00) to the client, as a confirmation and test of the connection.
The client does not respond.
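The connection confirmation described above can be illustrated with plain Python sockets. This is a minimal sketch of the exchange only, not the NetworkProtocol implementation: `serve_once` is a hypothetical helper, and the server binds to port 0 so that the operating system picks a free port.

```python
import socket
import threading


def serve_once(server_sock):
    """Accept a single client and send the one-byte confirmation."""
    conn, _addr = server_sock.accept()
    with conn:
        conn.sendall(b"\x00")


server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
server.listen(1)
port = server.getsockname()[1]

thread = threading.Thread(target=serve_once, args=(server,))
thread.start()

with socket.create_connection(("127.0.0.1", port), timeout=5) as client:
    # The client reads the confirmation byte and sends nothing back.
    confirmation = client.recv(1)

thread.join()
server.close()
```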
Methods¶
-
class kenning.protocols.network.NetworkProtocol(host: str =
'localhost'
, port: int =12345
, timeout: int =-1
, error_recovery: bool =False
, packet_size: int =4096
, max_message_size: int =1048576
)¶ Bases:
KenningProtocol
A TCP-based protocol.
Protocol is implemented using BSD sockets and selectors-based polling.
- disconnect()¶
Ends connection with the other side.
- initialize_client() bool ¶
Initializes client side of the protocol.
The client side is supposed to run on host testing the target hardware.
The parameters for the client should be provided in the constructor.
-
initialize_server(client_connected_callback: Callable[[Any], None] | None =
None
, client_disconnected_callback: Callable[[None], None] | None =None
) bool ¶ Initializes server side of the protocol.
The server side is supposed to run on target hardware.
The parameters for the server should be provided in the constructor.
- Parameters:¶
- client_connected_callback : Optional[Callable[Any, None]]¶
Called when a client connects to the server. Either IP address or another distinguishing characteristic of the client will be passed to the callback (depending on the underlying protocol).
- client_disconnected_callback : Optional[Callable[None, None]]¶
Called, when the current client disconnects from the server.
- Returns:¶
True if succeeded.
- Return type:¶
bool
- send_data(data: bytes)¶
Sends data to the target device.
Data can be a model to use, input to process, or additional configuration.
UARTProtocol¶
UARTProtocol implements the abstract methods from KenningProtocol (send_data and receive_data), as well as the abstract methods from Protocol that neither KenningProtocol nor BytesBasedProtocol implements (namely disconnect and initialize_client).
The initialize_server method raises kenning.core.exceptions.NotSupportedError, since Kenning Server working on UART is not supported.
When initializing, the client sends a MessageType.PING request with TransmissionFlag.SUCCESS, as a test of the connection and a signal starting a session.
The server responds with a MessageType.PING transmission with TransmissionFlag.SUCCESS if it accepts the connection and TransmissionFlag.FAIL if it does not accept the connection (for example because a client is already connected).
When disconnecting, the client sends a MessageType.PING request with TransmissionFlag.FAIL.
The server responds with a MessageType.PING, TransmissionFlag.SUCCESS transmission.
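The session handshake above can be modelled as a small state machine on the server side. The sketch below is a toy model under stated assumptions: `Flag` is a hypothetical stand-in for TransmissionFlag, `FakeServer` is not part of Kenning, and no UART traffic is involved.

```python
from enum import Enum, auto


class Flag(Enum):
    """Hypothetical stand-in for TransmissionFlag."""

    SUCCESS = auto()
    FAIL = auto()


class FakeServer:
    """Toy model of the server side of the PING exchange."""

    def __init__(self):
        self.client_connected = False

    def handle_ping(self, flag):
        if flag is Flag.SUCCESS:  # the client asks to start a session
            if self.client_connected:
                return Flag.FAIL  # a client is already connected
            self.client_connected = True
            return Flag.SUCCESS
        # PING request with FAIL: the client ends the session.
        self.client_connected = False
        return Flag.SUCCESS


server = FakeServer()
first = server.handle_ping(Flag.SUCCESS)   # session accepted
second = server.handle_ping(Flag.SUCCESS)  # rejected, session in progress
goodbye = server.handle_ping(Flag.FAIL)    # disconnect acknowledged
again = server.handle_ping(Flag.SUCCESS)   # a new session can start
```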
The UARTProtocol class overrides some of the kenning.core.protocol.Protocol abstract methods that are already implemented by BytesBasedProtocol, in order to change their behaviour:
download_statistics - It expects the statistics to arrive in a custom format, instead of a serialized JSON. Statistics are parsed by the kenning.protocols.uart._parse_stats function.
upload_io_specification - Instead of sending the input/output specification as a JSON serialized to string, it sends it serialized to a custom format that enables easy mapping of the binary data to a packed C struct. The format is defined in the form of a dict (kenning.interfaces.io_spec_serializer.IOSPEC_STRUCT_FIELDS).
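To show why a dict-defined binary layout maps easily onto a packed C struct, the sketch below uses Python's `struct` module. The field names and types are hypothetical and are not the contents of IOSPEC_STRUCT_FIELDS. The helpers `serialize` and `deserialize` are likewise only illustrative.

```python
import struct

# Hypothetical field layout; NOT the contents of IOSPEC_STRUCT_FIELDS.
# Maps field name -> struct format character (B = uint8, I = uint32).
FIELDS = {"num_inputs": "B", "input_length": "I", "num_outputs": "B"}

# "<" selects little-endian byte order with no padding, which matches
# a packed C struct such as:
#   struct __attribute__((packed)) {
#       uint8_t num_inputs; uint32_t input_length; uint8_t num_outputs;
#   };
FORMAT = "<" + "".join(FIELDS.values())


def serialize(spec):
    """Pack the values in the order given by FIELDS."""
    return struct.pack(FORMAT, *(spec[name] for name in FIELDS))


def deserialize(data):
    """Unpack bytes back into a dict keyed by field name."""
    return dict(zip(FIELDS, struct.unpack(FORMAT, data)))


spec = {"num_inputs": 1, "input_length": 784, "num_outputs": 1}
blob = serialize(spec)
```

On the receiving side, the bytes in `blob` could be copied directly into such a struct without a JSON parser, which is convenient on embedded targets.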
Methods¶
ROS2Protocol¶
ROS2Protocol is an implementation based on the ROS 2 framework.
It does not support server-side API methods (initialize_server, serve), target-side model optimization (upload_optimizers, request_optimization), dynamic runtime changes (upload_runtime) or sending/receiving logs (listen_to_server_logs, start_sending_logs, stop_sending_logs).