Kenning protocols

Kenning enables workflows spanning multiple machines, usually in a client-server architecture. The server (also referred to as the target platform or the target device) runs tasks (such as inference or model optimization) at the request of the client (host), which, for example, delivers models to run or input samples from the dataset.

Protocol was created as a uniform interface for communication between two Kenning instances, serving as an abstraction for the underlying communication logic and technology stack.

There are multiple Protocol implementations. The inheritance hierarchy is outlined below.

  • BytesBasedProtocol (abstract - requires further implementations) - implements kenning.core.protocol.Protocol abstract methods using a simple protocol with custom data format. Cannot be used on its own (requires implementing the underlying logic of requests and transmissions).

    • KenningProtocol (abstract - requires further implementations) - implements communication logic and flow control for requests and transmissions relying on BytesBasedProtocol. Introduces send_data and receive_data abstract methods (for sending and receiving raw bytes) that the user needs to implement to send prepared packets over the selected means of communication. There are:

  • ROS2Protocol - Implementation based on the ROS2 framework.

Note

Things to take into account:

  • Some of the implementations are not complete communication solutions and require implementations of their own.

  • Not all implementations are compatible with all platforms (for example, Kenning Zephyr Runtime can currently only communicate over UART), and some implementations do not support all methods (for example, UARTProtocol only supports the client-side API).

BytesBasedProtocol

BytesBasedProtocol is an abstract class implementing all kenning.core.protocol.Protocol abstract methods and introducing a custom protocol message format. In addition to initialization and disconnecting, BytesBasedProtocol requires implementing the following methods:

These methods allow for requesting, sending and receiving payload (bytes), along with a set of flags (defined by enum kenning.protocols.bytes_based_protocol.TransmissionFlag).

Each request and transmission is identified by a MessageType, which serves to differentiate between concurrent communication streams (for example, the client requesting inference output while the server is sending logs), as well as to identify the correct server callback. This class does not work as a standalone communication solution: the aforementioned abstract methods, as well as the kenning.core.protocol.Protocol methods initialize_client, initialize_server and disconnect, have to be provided by its implementations.

BytesBasedProtocol implementations

Available MessageType values

  • PING - Message for checking the connection.

  • STATUS - Message for reporting server status.

  • DATA - Message contains inference input.

  • MODEL - Message contains model to load.

  • PROCESS - Message means the data should be processed.

  • OUTPUT - Host requests the output from the target.

  • STATS - Host requests the inference statistics from the target.

  • IO_SPEC - Message contains io specification to load.

  • OPTIMIZERS - Message contains optimizers config.

  • OPTIMIZE_MODEL - Host requests model optimization and receives optimized model.

  • RUNTIME - Message contains runtime that should be used for inference (i.e. LLEXT binary).

  • UNOPTIMIZED_MODEL - Message contains an unoptimized model.

  • LOGS - Log messages sent from the target device (server).

Available TransmissionFlags

  • SUCCESS - Transmission is informing about a success (for example successful inference).

  • FAIL - Transmission is informing about a failure.

  • IS_HOST_MESSAGE - Not set if the transmission was sent by the target device. Set otherwise.

  • IS_KENNING - Messages sent by Kenning.

  • IS_ZEPHYR - Messages sent by Kenning Zephyr Runtime.

Protocol specification

Sending requests from the client side:

  • upload_runtime - Sends request MessageType.RUNTIME, binary data from the file as payload. Expects a transmission with TransmissionFlag.SUCCESS, either TransmissionFlag.IS_ZEPHYR or TransmissionFlag.IS_KENNING, and ServerAction.UPLOADING_RUNTIME as payload.

  • upload_io_specification - Sends request MessageType.IO_SPEC, binary data from the file as payload. Expects a transmission with TransmissionFlag.SUCCESS, either TransmissionFlag.IS_ZEPHYR or TransmissionFlag.IS_KENNING, and ServerAction.UPLOADING_IOSPEC as payload.

  • upload_model - Sends request MessageType.MODEL, binary data from the file as payload. Expects a transmission with TransmissionFlag.SUCCESS, either TransmissionFlag.IS_ZEPHYR or TransmissionFlag.IS_KENNING, and ServerAction.UPLOADING_MODEL as payload.

  • upload_input - Sends request MessageType.DATA, raw bytes as payload. Expects a transmission with TransmissionFlag.SUCCESS, either TransmissionFlag.IS_ZEPHYR or TransmissionFlag.IS_KENNING, and ServerAction.UPLOADING_INPUT as payload.

  • request_processing - Sends request MessageType.PROCESS, no payload. Expects a transmission with TransmissionFlag.SUCCESS, either TransmissionFlag.IS_ZEPHYR or TransmissionFlag.IS_KENNING, and ServerAction.PROCESSING_INPUT as payload.

  • download_output - Sends request MessageType.OUTPUT, no payload. Expects a transmission with TransmissionFlag.SUCCESS and non-empty payload.

  • download_statistics - Sends request MessageType.STATS, no payload. Expects a transmission with TransmissionFlag.SUCCESS and non-empty payload.

  • upload_optimizers - Sends request MessageType.OPTIMIZERS, JSON serialized to string and encoded as payload. Expects a transmission with TransmissionFlag.SUCCESS, either TransmissionFlag.IS_ZEPHYR or TransmissionFlag.IS_KENNING, and ServerAction.UPLOADING_OPTIMIZERS as payload.

  • request_optimization - Sends request MessageType.UNOPTIMIZED_MODEL with binary data from the model file as payload. After receiving a response, the client sends a request MessageType.OPTIMIZE_MODEL with no payload. Expects a transmission with the compiled model as payload and TransmissionFlag.SUCCESS set.
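The client-side expectations listed above can be sketched as two small predicates. This is only an illustration, not Kenning's implementation: the TransmissionFlag enum below is a local stand-in, the function names are hypothetical, and the way a ServerAction is encoded in the payload is not specified here, so it is treated as opaque bytes supplied by the caller.

```python
from enum import Enum, auto
from typing import List, Optional


class TransmissionFlag(Enum):
    """Local stand-in for the flags listed above (not the Kenning enum)."""

    SUCCESS = auto()
    FAIL = auto()
    IS_HOST_MESSAGE = auto()
    IS_KENNING = auto()
    IS_ZEPHYR = auto()


def upload_acknowledged(
    payload: Optional[bytes],
    flags: Optional[List[TransmissionFlag]],
    expected_action: bytes,
) -> bool:
    """Check a response to an upload-style request (RUNTIME, MODEL, ...).

    `expected_action` is a hypothetical, already-encoded ServerAction.
    """
    if payload is None or flags is None:
        return False
    from_known_server = (
        TransmissionFlag.IS_KENNING in flags
        or TransmissionFlag.IS_ZEPHYR in flags
    )
    return (
        TransmissionFlag.SUCCESS in flags
        and from_known_server
        and payload == expected_action
    )


def download_succeeded(
    payload: Optional[bytes],
    flags: Optional[List[TransmissionFlag]],
) -> bool:
    """Check a response to OUTPUT or STATS: SUCCESS and non-empty payload."""
    if payload is None or flags is None:
        return False
    return TransmissionFlag.SUCCESS in flags and len(payload) > 0
```

The two functions differ because upload-style requests are acknowledged with a ServerAction, while download-style requests return data.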

Serving requests on the server side, by MessageType (after the serve method has been called to provide callbacks):

  • Request with message type: IO_SPEC, MODEL, DATA, PROCESS, OPTIMIZERS or UNOPTIMIZED_MODEL - Calls appropriate server callback. Sends a transmission with the ServerAction (from the ServerStatus returned by the callback) as payload, as well as the TransmissionFlag.IS_KENNING set. If the success field of the ServerStatus is set to True, TransmissionFlag.SUCCESS is set. Otherwise TransmissionFlag.FAIL is set.

  • Request with message type: OPTIMIZE_MODEL, OUTPUT, STATS - Calls appropriate callback. If the callback returned ServerStatus with the success field set to true, sends a transmission with the bytes returned by the callback as payload, as well as TransmissionFlag.IS_KENNING and TransmissionFlag.SUCCESS set. Otherwise sends a transmission with the ServerAction as payload, as well as TransmissionFlag.IS_KENNING and TransmissionFlag.FAIL set.
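The two server-side rules above could be expressed roughly as follows. This is a sketch under stated assumptions: ServerStatus here is a local stand-in carrying only the two fields the text mentions, its action is a hypothetical pre-encoded byte string, and upload_response and download_response are illustrative names rather than Kenning functions.

```python
from dataclasses import dataclass
from enum import Enum, auto
from typing import List, Optional, Tuple


class TransmissionFlag(Enum):
    """Local stand-in for the flags used in server responses."""

    SUCCESS = auto()
    FAIL = auto()
    IS_KENNING = auto()


@dataclass
class ServerStatus:
    """Stand-in with only the fields named in the text above."""

    action: bytes  # hypothetical: an already-encoded ServerAction
    success: bool


Response = Tuple[bytes, List[TransmissionFlag]]


def upload_response(status: ServerStatus) -> Response:
    """IO_SPEC, MODEL, DATA, PROCESS, OPTIMIZERS, UNOPTIMIZED_MODEL."""
    outcome = (
        TransmissionFlag.SUCCESS if status.success else TransmissionFlag.FAIL
    )
    # The ServerAction is always the payload; only the outcome flag varies.
    return status.action, [TransmissionFlag.IS_KENNING, outcome]


def download_response(status: ServerStatus, data: Optional[bytes]) -> Response:
    """OPTIMIZE_MODEL, OUTPUT, STATS."""
    if status.success:
        # On success the bytes returned by the callback become the payload.
        body = data if data is not None else b""
        return body, [TransmissionFlag.IS_KENNING, TransmissionFlag.SUCCESS]
    # On failure the ServerAction is sent instead of any data.
    return status.action, [TransmissionFlag.IS_KENNING, TransmissionFlag.FAIL]
```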

Logs are sent by the server as unprompted transmissions (MessageType.LOGS).

Sending multiple log messages per transmission is allowed.

Payload format:

<message 1 size (1 byte)><message 1 ASCII string><message 2 size (1 byte)><message 2 ASCII string>...<message n size (1 byte)><message n ASCII string>.
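As an illustration of the log payload format, the sketch below packs and unpacks a list of log messages. It assumes the size byte is unsigned and counts only the bytes of the string that follows it; encode_logs and decode_logs are hypothetical helper names, not part of Kenning.

```python
from typing import List


def encode_logs(messages: List[str]) -> bytes:
    """Pack messages as <size (1 byte)><ASCII string> pairs."""
    payload = bytearray()
    for message in messages:
        raw = message.encode("ascii")
        if len(raw) > 255:
            raise ValueError("a log message must fit a 1-byte size field")
        payload.append(len(raw))
        payload += raw
    return bytes(payload)


def decode_logs(payload: bytes) -> List[str]:
    """Unpack a payload produced by encode_logs."""
    messages = []
    offset = 0
    while offset < len(payload):
        size = payload[offset]
        offset += 1
        chunk = payload[offset:offset + size]
        if len(chunk) != size:
            raise ValueError("truncated log payload")
        messages.append(chunk.decode("ascii"))
        offset += size
    return messages
```

With a one-byte size field, a single log message in this sketch is limited to 255 bytes, so longer messages would have to be split or truncated by the sender.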

Methods:

class kenning.protocols.bytes_based_protocol.BytesBasedProtocol(timeout: int = -1)

Bases: Protocol, ABC

Implements abstract methods from the Protocol class, using an underlying mechanism of transmissions and requests (that mechanism needs to be provided by this class’s subclasses).

check_status(status: ServerStatus, artifacts: tuple[bytes, list[TransmissionFlag]]) bool

Parses ServerStatus from the contents of a transmission (payload and flags) and compares it against given status.

Parameters:
status : ServerStatus

Status to compare against.

artifacts : Tuple[bytes, List[TransmissionFlag]]

Payload and flags from a transmission. Flags should contain either the ‘IS_ZEPHYR’ flag or the ‘IS_KENNING’ flag.

Returns:

True if status is equal, False if status is not as expected, or if some of the ‘artifacts’ are None.

Return type:

bool

Raises:

ValueError – Flags contain neither the ‘IS_ZEPHYR’ flag nor the ‘IS_KENNING’ flag.

download_output() tuple[bool, bytes | None]

Downloads the outputs from the target device.

Requests and downloads the latest inference output from the target device for quality measurements.

Returns:

Tuple with download status (True if successful) and downloaded data.

Return type:

Tuple[bool, Optional[Any]]

download_statistics(final: bool = False) Measurements

Downloads inference statistics from the target device.

By default no statistics are gathered.

Parameters:
final : bool

Whether the inference is finished.

Returns:

Inference statistics on target device.

Return type:

Measurements

abstractmethod event_active(message_type: MessageType | None = None) bool

Checks if an active protocol event (Transmission, Request etc.) of a given message type exists.

Parameters:
message_type : Optional[MessageType]

Message type to check, or None (which will check for an event, that accepts all message types).

Returns:

True if event exists, False otherwise.

Return type:

bool

abstractmethod kill_event(message_type: MessageType | None = None)

Forcibly stops an active event (Transmission, Request etc.).

Parameters:
message_type : Optional[MessageType]

Message type to check, or None (which will stop an event, that accepts all message types).

Raises:

ValueError – There is no such event.

abstractmethod listen(message_type: MessageType | None = None, transmission_callback: Callable[[tuple[MessageType, bytes, list[TransmissionFlag]]], None] | None = None, request_callback: Callable[[tuple[MessageType, bytes, list[TransmissionFlag]]], None] | None = None, limit: int = -1, failure_callback: Callable[[MessageType], None] | None = None)

Waits for transmissions and requests from the other side, without blocking the current thread.

Parameters:
message_type : Optional[MessageType]

Message type of the requests/transmissions to listen for, or None (to listen to requests/transmissions of any message type).

transmission_callback : Optional[ProtocolSuccessCallback]

Function, that will be called when a transmission is successfully received. Message type, payload and flags from the transmission will be passed to the function. Note: This will be executed on a separate thread, so make sure the callback function is thread-safe.

request_callback : Optional[ProtocolSuccessCallback]

Function, that will be called when a request is successfully received. Message type, payload and flags from the request will be passed to the function. Note: This will be executed on a separate thread, so make sure the callback function is thread-safe.

limit : int

Maximum number of requests/transmissions (including failures) that will be received before listening stops.

failure_callback : Optional[ProtocolFailureCallback]

Function, that will be called if a request/transmission is attempted by the other side, but fails. Note: This will be executed on a separate thread, so make sure the callback function is thread-safe.

abstractmethod listen_blocking(message_type: MessageType | None = None, transmission_callback: Callable[[tuple[MessageType, bytes, list[TransmissionFlag]]], None] | None = None, request_callback: Callable[[tuple[MessageType, bytes, list[TransmissionFlag]]], None] | None = None, timeout: float | None = None, failure_callback: Callable[[MessageType], None] | None = None) tuple[IncomingEventType | None, MessageType | None, bytes | None, list[TransmissionFlag] | None]

Blocks the current thread until a transmission or a request is received.

Parameters:
message_type : Optional[MessageType]

Message type of the requests/transmissions to listen for, or None (to listen to requests/transmissions of any message type).

transmission_callback : Optional[ProtocolSuccessCallback]

Function, that will be called when a transmission is successfully received. Message type, payload and flags from the transmission will be passed to the function.

request_callback : Optional[ProtocolSuccessCallback]

Function, that will be called when a request is successfully received. Message type, payload and flags from the request will be passed to the function.

timeout : Optional[float]

Maximum blocking time in seconds, or None to block indefinitely. If that time passes, the ‘failure_callback’ will be called.

failure_callback : Optional[ProtocolFailureCallback]

Function, that will be called if a request/transmission is attempted by the other side, but fails. Note: This will be executed on a separate thread, so make sure the callback function is thread-safe.

Returns:

Enum denoting whether a request or a transmission was received, message type of the received transmission/request, received payload, received flags. Alternatively: (None, None, None, None) if no request/transmission was received in the specified timeout or if a request/transmission was attempted but failed.

Return type:

tuple[Optional[IncomingEventType], Optional[MessageType], Optional[bytes], Optional[List[TransmissionFlag]]]

listen_to_server_logs()

Starts continuously receiving and printing logs sent by the server.

abstractmethod request(message_type: MessageType, callback: Callable[[tuple[MessageType, bytes, list[TransmissionFlag]]], None], payload: bytes | None = None, flags: list[TransmissionFlag] = [], retry: int = 1, deny_callback: Callable[[MessageType], None] | None = None)

Prompts the other side for a transmission and waits for a response, without blocking the current thread. Bytes and flags can also be sent along with the request.

Parameters:
message_type : MessageType

Value denoting what type of transmission is being requested.

callback : ProtocolSuccessCallback

Function, that will be called when the transmission is received. Message type, payload and flags from the transmission will be passed to the function. Note: This will be executed on a separate thread, so make sure the callback function is thread-safe.

payload : Optional[bytes]

Bytes to send along with the request (or None if the payload is to be empty).

flags : List[TransmissionFlag]

A list of flags to be sent (available flags in the TransmissionFlag enum above - please note that some flags are only allowed for a specific message type).

retry : int

Denotes how many times the request will be re-sent after failing, before calling ‘deny_callback’. Negative number denotes infinite retries.

deny_callback : Optional[ProtocolFailureCallback]

Function, that will be called if the request is denied or otherwise fails. Note: This will be executed on a separate thread, so make sure the callback function is thread-safe.

abstractmethod request_blocking(message_type: MessageType, callback: Callable[[tuple[MessageType, bytes, list[TransmissionFlag]]], None] | None = None, payload: bytes | None = None, flags: list[TransmissionFlag] = [], timeout: float | None = None, retry: int = 1, deny_callback: Callable[[MessageType], None] | None = None) tuple[bytes | None, list[TransmissionFlag] | None]

Prompts the other side for a transmission and blocks the current thread until a response is received. Bytes and flags can also be sent along with the request.

Parameters:
message_type : MessageType

Value denoting what type of transmission is being requested.

callback : Optional[ProtocolSuccessCallback]

Function, that will be called when the transmission is received. Message type, payload and flags from the transmission will be passed to the function.

payload : Optional[bytes]

Bytes to send along with the request (or None if the payload is to be empty).

flags : List[TransmissionFlag]

A list of flags to be sent (available flags in the TransmissionFlag enum above - please note that some flags are only allowed for a specific message type).

timeout : Optional[float]

Maximum blocking time in seconds, or None to block indefinitely. If that time passes the ‘deny_callback’ will be called.

retry : int

Denotes how many times the request will be re-sent after failing, before calling ‘deny_callback’. Negative number denotes infinite retries.

deny_callback : Optional[ProtocolFailureCallback]

Function, that will be called if the request is denied or otherwise fails.

Returns:

Payload and flags received as response to the request, or (None, None) if the request was denied or otherwise failed.

Return type:

Tuple[Optional[bytes], Optional[List[TransmissionFlag]]]
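The interplay of retry and deny_callback described above can be illustrated with a standalone sketch. It is not the Kenning implementation: send_once is a hypothetical callable standing in for one request attempt, message types are plain strings, and the sketch assumes that retry counts re-sends after the first attempt (so retry=1 allows two attempts in total).

```python
from typing import Callable, List, Optional, Tuple

Response = Tuple[Optional[bytes], Optional[List[str]]]


def request_with_retry(
    message_type: str,
    send_once: Callable[[], Optional[Tuple[bytes, List[str]]]],
    retry: int = 1,
    deny_callback: Optional[Callable[[str], None]] = None,
) -> Response:
    """Re-send a failed request up to `retry` times, then give up.

    `send_once` returns (payload, flags) on success and None on failure.
    A negative `retry` means the request is re-sent indefinitely.
    """
    resends_left = retry
    while True:
        response = send_once()
        if response is not None:
            return response
        if resends_left == 0:
            # All allowed re-sends are used up: report the failure once.
            if deny_callback is not None:
                deny_callback(message_type)
            return None, None
        if resends_left > 0:
            resends_left -= 1
```

Returning (None, None) on failure mirrors the return value documented for request_blocking.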

request_optimization(model_path: ~pathlib.Path, get_time_func: ~typing.Callable[[], float] = <built-in function perf_counter>) tuple[bool, bytes | None]

Request optimization of model.

Parameters:
model_path : Path

Path to the model for optimization.

get_time_func : Callable[[], float]

Function that returns current timestamp.

Returns:

First element is equal to True if optimization finished successfully and the second element contains compiled model.

Return type:

Tuple[bool, Optional[bytes]]

request_processing(get_time_func: ~typing.Callable[[], float] = <built-in function perf_counter>) bool

Requests processing of input data and waits for acknowledgement.

This method triggers inference on target device and waits until the end of inference on target device is reached.

This method measures processing time on the target device from the level of the host.

Target may send its own measurements in the statistics.

Parameters:
get_time_func : Callable[[], float]

Function that returns current timestamp.

Returns:

True if inference finished successfully.

Return type:

bool

serve(upload_input_callback: Callable[[bytes], ServerStatus] | None = None, upload_model_callback: Callable[[bytes], ServerStatus] | None = None, process_input_callback: Callable[[bytes], ServerStatus] | None = None, download_output_callback: Callable[[None], tuple[ServerStatus, bytes | None]] | None = None, download_stats_callback: Callable[[None], tuple[ServerStatus, bytes | None]] | None = None, upload_iospec_callback: Callable[[bytes], ServerStatus] | None = None, upload_optimizers_callback: Callable[[bytes], ServerStatus] | None = None, upload_unoptimized_model_callback: Callable[[bytes], ServerStatus] | None = None, download_optimized_model_callback: Callable[[None], tuple[ServerStatus, bytes | None]] | None = None, upload_runtime_callback: Callable[[bytes], ServerStatus] | None = None)

Waits for requests from the other device (the client) and calls appropriate callbacks. Some callbacks take bytes as arguments, all return ServerStatus and some also return bytes. These responses are then sent back to the client.

Callbacks are guaranteed to be executed in the order requests were sent in.

This method is non-blocking.

Parameters:
upload_input_callback : Optional[ServerUploadCallback]

Called, when the client uploads input (‘upload_input’ method below). Should upload model input into the runtime.

upload_model_callback : Optional[ServerUploadCallback]

Called when the client uploads an optimized model (‘upload_model’ method below). It should load the model and start an inference session.

process_input_callback : Optional[ServerUploadCallback]

Called, when the client requests inference. Should return after inference is completed.

download_output_callback : Optional[ServerDownloadCallback]

Called, when the client requests inference output, should return status of the server and the output.

download_stats_callback : Optional[ServerDownloadCallback]

Called, when the client requests inference stats, should end the inference session and return status of the server and stats.

upload_iospec_callback : Optional[ServerUploadCallback]

Called to upload model input/output specifications (iospec).

upload_optimizers_callback : Optional[ServerUploadCallback]

Called to upload optimizer config (serialized JSON) - ‘upload_optimizers’ method call by the client.

upload_unoptimized_model_callback : Optional[ServerUploadCallback]

Called to upload an unoptimized ML model, should save it.

download_optimized_model_callback : Optional[ServerDownloadCallback]

Called, when client requests optimization of the model (uploaded with ‘upload_unoptimized_model_callback’). Should optimize the model and return it.

upload_runtime_callback : Optional[ServerUploadCallback]

Called, when the client uploads runtime.

start_sending_logs()

Starts sending over the protocol all logs, except for the logs generated by the operation of sending logs itself (otherwise we would have an infinite logging recursion).

stop_sending_logs()

Stops sending logs over the protocol.

abstractmethod transmit(message_type: MessageType, payload: bytes | None = None, flags: list[TransmissionFlag] = [], failure_callback: Callable[[MessageType], None] | None = None)

Sends bytes and a set of flags to the other side, without blocking the current thread.

Parameters:
message_type : MessageType

Value denoting what is being sent. It serves to differentiate one transmission from another (analogous to port number in a TCP protocol).

payload : Optional[bytes]

Bytes to send (or None if the payload is to be empty).

flags : List[TransmissionFlag]

A list of flags to be sent (available flags in the TransmissionFlag enum above - please note that some flags are only allowed for a specific message type).

failure_callback : Optional[ProtocolFailureCallback]

Function, that will be called if the transmission fails to send. Note: This will be executed on a separate thread, so make sure the function is thread-safe.

abstractmethod transmit_blocking(message_type: MessageType, payload: bytes | None = None, flags: list[TransmissionFlag] = [], timeout: float | None = None, failure_callback: Callable[[MessageType], None] | None = None)

Sends bytes and a set of flags to the other side, blocks the current thread until the transmission is completed.

Parameters:
message_type : MessageType

Value denoting what is being sent. It serves to differentiate one transmission from another (analogous to port number in a TCP protocol).

payload : Optional[bytes]

Bytes to send (or None if the payload is to be empty).

flags : List[TransmissionFlag]

A list of flags to be sent (available flags in the TransmissionFlag enum above - please note that some flags are only allowed for a specific message type).

timeout : Optional[float]

Maximum blocking time in seconds, or None to block indefinitely. If that time passes the ‘failure_callback’ will be called.

failure_callback : Optional[ProtocolFailureCallback]

Function, that will be called if the transmission fails to send.

upload_input(data: bytes) bool

Uploads input to the target device and waits for acknowledgement.

This method should wait until the target device confirms the data is delivered and preprocessed for inference.

Parameters:
data : Any

Input data for inference.

Returns:

True if ready for inference.

Return type:

bool

upload_io_specification(path: Path) bool

Uploads input/output specification to the target device.

This method takes the specification in a json format from the given Path and sends it to the target device.

This method should receive the status of uploading the data to the target.

Parameters:
path : Path

Path to the json file.

Returns:

True if data upload finished successfully.

Return type:

bool

upload_model(path: Path) bool

Uploads the model to the target device.

This method takes the model from given Path and sends it to the target device.

This method should receive the status of uploading the model from the target.

Parameters:
path : Path

Path to the model.

Returns:

True if model upload finished successfully.

Return type:

bool

upload_optimizers(optimizers_cfg: dict[str, Any]) bool

Upload optimizers config to the target device.

Parameters:
optimizers_cfg : Dict[str, Any]

Config JSON of optimizers.

Returns:

True if data upload finished successfully.

Return type:

bool

upload_runtime(path: Path) bool

Uploads the runtime to the target device.

This method takes the binary from given Path and sends it to the target device.

This method should receive the status of runtime loading from the target.

Parameters:
path : Path

Path to the runtime binary.

Returns:

True if runtime upload finished successfully.

Return type:

bool

KenningProtocol

KenningProtocol is an abstract class implementing the abstract methods of BytesBasedProtocol (so methods such as request, transmit_blocking) using abstract methods receive_data and send_data (which have to be provided by implementations of this class).

Therefore, it uses a data link that allows for sending/receiving raw bytes and builds a protocol over it that allows for asynchronous, simultaneous communication through multiple channels (each MessageType is essentially a separate channel).

KenningProtocol implementations

Message structure

Size   | 6 bits   | 2 bits       | 8 bits   | 16 bits | 0 or 32 bits | 0-n bits
Offset | 0-5      | 6-7          | 8-15     | 16-31   | 32-63        | 64
Field  | Msg Type | Flow control | Checksum | Flags   | Payload size | Payload

Flow control values

  • REQUEST - message is a request for transmission

  • REQUEST_RETRANSMIT - message is a request to repeat the last message (because of invalid checksum)

  • ACKNOWLEDGE - message is an acknowledgement of the last message

  • TRANSMISSION - message is part of a transmission

Checksum

Bitwise XOR of all bytes in the message (except for the checksum itself) and the byte 0x4B.
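A minimal sketch of such a checksum is shown below. It assumes that 0x4B acts as the starting value of the XOR and that the caller passes the message bytes with the checksum byte already removed; xor_checksum is a hypothetical helper name.

```python
def xor_checksum(message_without_checksum: bytes, seed: int = 0x4B) -> int:
    """XOR every byte of the message together with the 0x4B constant."""
    checksum = seed
    for byte in message_without_checksum:
        checksum ^= byte
    return checksum


def checksum_matches(message_without_checksum: bytes, received: int) -> bool:
    """Receiver-side check: recompute the checksum and compare."""
    return xor_checksum(message_without_checksum) == received
```

Because XOR is its own inverse, flipping any single bit of the message changes the result, which is what allows the receiver to answer with REQUEST_RETRANSMIT when the values differ.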

Flags

Flag | Offset in the Flags field | Meaning for REQUEST and TRANSMISSION messages | Meaning for ACKNOWLEDGE messages
SUCCESS | 0 | can be set/read as a TransmissionFlag | last message was received and accepted
FAIL | 1 | can be set/read as a TransmissionFlag | last message was received, but it was rejected (ACKNOWLEDGE with a FAIL flag is used to deny a request or reject a transmission)
IS_HOST_MESSAGE | 2 | can be set/read as a TransmissionFlag | -
HAS_PAYLOAD | 3 | message has payload | -
FIRST | 4 | first message in this transmission/request | -
LAST | 5 | last message in this transmission/request (it is possible for a message to be both FIRST and LAST, if the transmission/request only has a single message) | -
IS_KENNING | 6 | can be set/read as a TransmissionFlag | -
IS_ZEPHYR | 7 | can be set/read as a TransmissionFlag | -
SERIALIZED (only for MessageType.IO_SPEC) | 12 | can be set/read as a TransmissionFlag | -

Certain flags are used by KenningProtocol itself for flow control and protocol logic, while others are available to the protocol user (such as the BytesBasedProtocol class) as TransmissionFlag values. For some flags, the meaning depends on the kind of message (for example, SUCCESS and FAIL have a different meaning in ACKNOWLEDGE messages). Some flags are only present for a specific message type; these are carried in the last 4 bits of the Flags field.
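The flag offsets listed above map naturally onto a bit mask. The sketch below assumes that offset n means bit n counted from the least significant bit of the 16-bit Flags field; HeaderFlag and user_flags are hypothetical names used only for illustration.

```python
from enum import IntFlag


class HeaderFlag(IntFlag):
    """Bit positions taken from the flags listed above."""

    SUCCESS = 1 << 0
    FAIL = 1 << 1
    IS_HOST_MESSAGE = 1 << 2
    HAS_PAYLOAD = 1 << 3
    FIRST = 1 << 4
    LAST = 1 << 5
    IS_KENNING = 1 << 6
    IS_ZEPHYR = 1 << 7
    SERIALIZED = 1 << 12  # only meaningful for MessageType.IO_SPEC


# Flags consumed by the protocol logic itself, not exposed to the user.
PROTOCOL_INTERNAL = (
    HeaderFlag.HAS_PAYLOAD | HeaderFlag.FIRST | HeaderFlag.LAST
)


def user_flags(raw: int) -> HeaderFlag:
    """Strip protocol-internal bits from a raw 16-bit Flags value."""
    return HeaderFlag(raw & ~int(PROTOCOL_INTERNAL) & 0xFFFF)
```

For instance, a raw value with SUCCESS, HAS_PAYLOAD, FIRST, LAST and IS_KENNING set would be reduced to SUCCESS and IS_KENNING before being handed to the protocol user.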

Payload size

Size of the payload in bytes.
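To make the message layout more concrete, here is a hedged sketch of packing and unpacking a message with Python's struct module. Several details are assumptions, not facts from this page: little-endian byte order, the message type occupying the low 6 bits of the first byte, and the Payload size field being present only when HAS_PAYLOAD is set (one reading of "0 or 32 bits"). The checksum is passed in as a ready value, and pack_message and unpack_message are hypothetical names.

```python
import struct
from typing import Tuple

HAS_PAYLOAD = 1 << 3  # bit offset 3 in the Flags field


def pack_message(
    msg_type: int,
    flow_control: int,
    checksum: int,
    flags: int,
    payload: bytes = b"",
) -> bytes:
    """Build header + payload following the field sizes given above."""
    if not 0 <= msg_type < 64:
        raise ValueError("message type must fit in 6 bits")
    if not 0 <= flow_control < 4:
        raise ValueError("flow control must fit in 2 bits")
    if payload:
        flags |= HAS_PAYLOAD
    first_byte = msg_type | (flow_control << 6)
    header = struct.pack("<BBH", first_byte, checksum, flags)
    if flags & HAS_PAYLOAD:
        header += struct.pack("<I", len(payload))
    return header + payload


def unpack_message(data: bytes) -> Tuple[int, int, int, int, bytes]:
    """Inverse of pack_message, under the same layout assumptions."""
    first_byte, checksum, flags = struct.unpack_from("<BBH", data, 0)
    msg_type = first_byte & 0x3F
    flow_control = first_byte >> 6
    payload = b""
    if flags & HAS_PAYLOAD:
        (size,) = struct.unpack_from("<I", data, 4)
        payload = data[8:8 + size]
    return msg_type, flow_control, checksum, flags, payload
```

Under these assumptions a message without payload occupies 4 bytes and a message with payload occupies 8 bytes plus the payload itself.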

Example communication scenarios

Note

Transmission flags are omitted.

Request (no payload) denied:

  • A -> B MessageType.OUTPUT, REQUEST, (FIRST, LAST)

  • A <- B MessageType.OUTPUT, ACKNOWLEDGE, (FAIL)

Unprompted single-message transmission:

  • A -> B MessageType.LOGS, TRANSMISSION, (HAS_PAYLOAD, FIRST, LAST)

Request (no payload) with a multi-message transmission as a response:

  • A -> B MessageType.STATS, REQUEST, (FIRST, LAST)

  • A <- B MessageType.STATS, TRANSMISSION, (HAS_PAYLOAD, FIRST)

  • A <- B MessageType.STATS, TRANSMISSION, (HAS_PAYLOAD)

  • A <- B MessageType.STATS, TRANSMISSION, (HAS_PAYLOAD)

  • A <- B MessageType.STATS, TRANSMISSION, (HAS_PAYLOAD, LAST)

Multi-message request with payload with a single-message transmission as a response:

  • A -> B MessageType.IO_SPEC, REQUEST, (HAS_PAYLOAD, FIRST)

  • A -> B MessageType.IO_SPEC, REQUEST, (HAS_PAYLOAD)

  • A -> B MessageType.IO_SPEC, REQUEST, (HAS_PAYLOAD, LAST)

  • A <- B MessageType.STATUS, TRANSMISSION, (HAS_PAYLOAD, FIRST, LAST)
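The FIRST and LAST markers in the scenarios above follow from splitting a payload into several messages. The sketch below shows one way to derive them; it assumes the size limit applies to the payload bytes of each message (the actual max_message_size parameter may also count the header), and it uses plain strings instead of real flag values. split_into_messages is a hypothetical helper.

```python
from typing import List, Tuple


def split_into_messages(
    payload: bytes, max_payload_size: int
) -> List[Tuple[bytes, List[str]]]:
    """Split a payload into chunks and mark the FIRST and LAST messages."""
    if max_payload_size <= 0:
        raise ValueError("max_payload_size must be positive")
    if not payload:
        # A request or transmission without payload is a single message.
        return [(b"", ["FIRST", "LAST"])]
    chunks = [
        payload[start:start + max_payload_size]
        for start in range(0, len(payload), max_payload_size)
    ]
    messages = []
    for index, chunk in enumerate(chunks):
        flags = ["HAS_PAYLOAD"]
        if index == 0:
            flags.append("FIRST")
        if index == len(chunks) - 1:
            flags.append("LAST")
        messages.append((chunk, flags))
    return messages
```

A payload that fits in one message yields a single entry carrying both FIRST and LAST, which matches the single-message scenarios listed above.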

Methods

class kenning.protocols.kenning_protocol.KenningProtocol(timeout: int = -1, error_recovery: bool = False, max_message_size: int = 1024)

Bases: BytesBasedProtocol, ABC

Class for managing the flow of Kenning Protocol.

event_active(message_type: MessageType | None = None) bool

Checks if an active protocol event (Transmission, Request etc.) of a given message type exists.

Parameters:
message_type : Optional[MessageType]

Message type to check, or None (which will check for an event, that accepts all message types).

Returns:

True if event exists, False otherwise.

Return type:

bool

finish_event(event: ProtocolEvent, log_failure: bool = True)

Removes an event from the ‘current_protocol_events’ dict and logs its success or failure.

Parameters:
event : ProtocolEvent

Event to finish

log_failure : bool

If true, in case of event failure an error will be logged (used to suppress errors, when the event is being re-tried).

gather_data(timeout: float | None = None) bytes | None

Gathers data from the client.

This method should be called by receive_message in order to get data from the client.

Parameters:
timeout : Optional[float]

Receive timeout in seconds. If timeout > 0, this specifies the maximum wait time, in seconds. If timeout <= 0, the call won’t block, and will report the currently ready file objects. If timeout is None, the call will block until a monitored file object becomes ready.

Returns:

Received data.

Return type:

Optional[bytes]

kill_event(message_type: MessageType | None = None)

Forcibly stops an active event (Transmission, Request etc.).

Parameters:
message_type : Optional[MessageType]

Message type to check, or None (which will stop an event, that accepts all message types).

Raises:

ValueError – There is no such event.

listen(message_type: MessageType | None = None, transmission_callback: Callable[[tuple[MessageType, bytes, list[TransmissionFlag]]], None] | None = None, request_callback: Callable[[tuple[MessageType, bytes, list[TransmissionFlag]]], None] | None = None, limit: int = -1, failure_callback: Callable[[MessageType], None] | None = None)

Waits for transmissions and requests from the other side, without blocking the current thread.

Parameters:
message_type : Optional[MessageType]

Message type of the requests/transmissions to listen for, or None (to listen to requests/transmissions of any message type).

transmission_callback : Optional[ProtocolSuccessCallback]

Function, that will be called when a transmission is successfully received. Message type, payload and flags from the transmission will be passed to the function. Note: This will be executed on a separate thread, so make sure the callback function is thread-safe.

request_callback : Optional[ProtocolSuccessCallback]

Function, that will be called when a request is successfully received. Message type, payload and flags from the request will be passed to the function. Note: This will be executed on a separate thread, so make sure the callback function is thread-safe.

limit : int

Maximum number of requests/transmissions (including failures) that will be received before listening stops.

failure_callback : Optional[ProtocolFailureCallback]

Function, that will be called if a request/transmission is attempted by the other side, but fails. Note: This will be executed on a separate thread, so make sure the callback function is thread-safe.

listen_blocking(message_type: MessageType | None = None, transmission_callback: Callable[[tuple[MessageType, bytes, list[TransmissionFlag]]], None] | None = None, request_callback: Callable[[tuple[MessageType, bytes, list[TransmissionFlag]]], None] | None = None, timeout: float | None = None, failure_callback: Callable[[MessageType], None] | None = None) tuple[IncomingEventType | None, MessageType | None, bytes | None, list[TransmissionFlag] | None]

Blocks the current thread until a transmission or a request is received.

Parameters:
message_type : Optional[MessageType]

Message type of the requests/transmissions to listen for, or None (to listen to requests/transmissions of any message type).

transmission_callback : Optional[ProtocolSuccessCallback]

Function, that will be called when a transmission is successfully received. Message type, payload and flags from the transmission will be passed to the function.

request_callback : Optional[ProtocolSuccessCallback]

Function, that will be called when a request is successfully received. Message type, payload and flags from the request will be passed to the function.

timeout : Optional[float]

Maximum blocking time in seconds, or None to block indefinitely. If that time passes, the ‘failure_callback’ will be called.

failure_callback : Optional[ProtocolFailureCallback]

Function, that will be called if a request/transmission is attempted by the other side, but fails. Note: This will be executed on a separate thread, so make sure the callback function is thread-safe.

Returns:

Enum denoting whether a request or a transmission was received, message type of the received transmission/request, received payload, received flags. Alternatively: (None, None, None, None) if no request/transmission was received within the specified timeout or if a request/transmission was attempted but failed.

Return type:

tuple[Optional[IncomingEventType], Optional[MessageType], Optional[bytes], Optional[List[TransmissionFlag]]]
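A caller of listen_blocking has to distinguish the all-None result from a real event before touching the payload. The following is a minimal sketch of that check, not Kenning code: EventKind is a hypothetical stand-in for IncomingEventType (its real member names are not given here), and the message type is a plain string.

```python
from enum import Enum, auto

class EventKind(Enum):
    # Hypothetical stand-in for IncomingEventType.
    TRANSMISSION = auto()
    REQUEST = auto()

def describe(result):
    # result has the same shape as the tuple returned by listen_blocking.
    event_type, message_type, payload, flags = result
    if event_type is None:
        return "nothing received (timeout or failed attempt)"
    kind = event_type.name.lower()
    return f"{kind} of type {message_type} with {len(payload)} byte(s)"

print(describe((None, None, None, None)))
print(describe((EventKind.REQUEST, "DATA", b"ab", [])))
```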

abstractmethod receive_data(connection: Any, mask: int) Any | None

Receives data from the target device.

Parameters:
connection : Any

Connection used to read data.

mask : int

Selector mask from the event.

Returns:

Status of receive and optionally data that was received.

Return type:

Optional[Any]

receive_message(timeout: float | None = None) Message | None

Waits for incoming data from the other side of connection.

This method should wait for the input data to arrive and return the appropriate status code along with received data.

Parameters:
timeout : Optional[float]

Receive timeout in seconds. If timeout > 0, this specifies the maximum wait time, in seconds. If timeout <= 0, the call won’t block, and will report the currently ready file objects. If timeout is None, the call will block until a monitored file object becomes ready.

Returns:

Received message, or None if message was not received.

Return type:

Optional[Message]
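The timeout rules described for receive_message appear to follow those of select() in Python's standard selectors module. The sketch below shows the non-blocking and bounded-wait cases on a local socket pair; it uses only the standard library and assumes nothing about Kenning internals.

```python
import selectors
import socket

left, right = socket.socketpair()
selector = selectors.DefaultSelector()
selector.register(right, selectors.EVENT_READ)

# timeout <= 0: poll without blocking; nothing has been sent yet.
nothing_ready = selector.select(timeout=0)

left.sendall(b"ping")

# timeout > 0: wait at most one second for the socket to become readable.
ready = selector.select(timeout=1.0)
key, _events = ready[0]
data = key.fileobj.recv(4)

selector.unregister(right)
selector.close()
left.close()
right.close()
```

Passing timeout=None instead would block until the registered socket becomes readable.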

receiver()

Method sitting in a loop, receiving messages and passing them to the relevant ProtocolEvent class object in the ‘current_protocol_events’ dict, based on message type.

This method is meant to be running constantly on a separate thread, if the protocol is active.

request(message_type: MessageType, callback: Callable[[tuple[MessageType, bytes, list[TransmissionFlag]]], None], payload: bytes | None = None, flags: list[TransmissionFlag] = [], retry: int = 1, deny_callback: Callable[[MessageType], None] | None = None)

Prompts the other side for a transmission and waits for a response, without blocking the current thread. Bytes and flags can also be sent along with the request.

Parameters:
message_type : MessageType

Value denoting what type of transmission is being requested.

callback : ProtocolSuccessCallback

Function, that will be called when the transmission is received. Message type, payload and flags from the transmission will be passed to the function. Note: This will be executed on a separate thread, so make sure the callback function is thread-safe.

payload : Optional[bytes]

Bytes to send along with the request (or None if the payload is to be empty).

flags : List[TransmissionFlag]

A list of flags to be sent (available flags in the TransmissionFlag enum above - please note that some flags are only allowed for a specific message type).

retry : int

Denotes how many times the request will be re-sent after failing, before calling ‘deny_callback’. Negative number denotes infinite retries.

deny_callback : Optional[ProtocolFailureCallback]

Function, that will be called if the request is denied or otherwise fails. Note: This will be executed on a separate thread, so make sure the callback function is thread-safe.
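One reading of the retry parameter is that the request is attempted once and then re-sent up to retry more times, with a negative value meaning re-send until it succeeds. The sketch below illustrates that reading only; request_with_retry and send_once are hypothetical names, and a None response stands for a denied or failed request.

```python
def request_with_retry(send_once, retry=1, deny_callback=None):
    # send_once() returns the response, or None if the request failed.
    resends_left = retry
    while True:
        response = send_once()
        if response is not None:
            return response
        if resends_left == 0:
            # All re-sends used up: report the failure.
            if deny_callback is not None:
                deny_callback()
            return None
        if resends_left > 0:
            resends_left -= 1
        # A negative value is never decremented: retry indefinitely.

attempts = []

def flaky():
    # Fails on the first two calls, succeeds from the third one on.
    attempts.append(1)
    return b"ok" if len(attempts) >= 3 else None

denied = []
first = request_with_retry(flaky, retry=1, deny_callback=lambda: denied.append(True))
second = request_with_retry(flaky, retry=-1)
```

Here the first call gives up after two attempts and invokes the deny callback, while the second call keeps going until the response arrives.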

request_blocking(message_type: MessageType, callback: Callable[[tuple[MessageType, bytes, list[TransmissionFlag]]], None] | None = None, payload: bytes | None = None, flags: list[TransmissionFlag] = [], timeout: float | None = None, retry: int = 1, deny_callback: Callable[[MessageType], None] | None = None) tuple[bytes | None, list[TransmissionFlag] | None]

Prompts the other side for a transmission and blocks the current thread until a response is received. Bytes and flags can also be sent along with the request.

Parameters:
message_type : MessageType

Value denoting what type of transmission is being requested.

callback : Optional[ProtocolSuccessCallback]

Function, that will be called when the transmission is received. Message type, payload and flags from the transmission will be passed to the function.

payload : Optional[bytes]

Bytes to send along with the request (or None if the payload is to be empty).

flags : List[TransmissionFlag]

A list of flags to be sent (available flags in the TransmissionFlag enum above - please note that some flags are only allowed for a specific message type).

timeout : Optional[float]

Maximum blocking time in seconds, or None to block indefinitely. If that time passes the ‘deny_callback’ will be called.

retry : int

Denotes how many times the request will be re-sent after failing, before calling ‘deny_callback’. Negative number denotes infinite retries.

deny_callback : Optional[ProtocolFailureCallback]

Function, that will be called if the request is denied or otherwise fails.

Returns:

Payload and flags received as response to the request, or (None, None) if the request was denied or otherwise failed.

Return type:

Tuple[Optional[bytes], Optional[List[TransmissionFlag]]]

run_event(event: ProtocolEvent, success_callback: Callable[[ProtocolEvent], None], deny_callback: Callable[[ProtocolEvent], None])

Adds a protocol event to the ‘current_protocol_events’ dict, so that it will be serviced by the receiver thread, starts it in non-blocking mode and passes callbacks.

Parameters:
event : ProtocolEvent

Event to start.

success_callback : Callable[ProtocolEvent, None]

Function, that will be called if the event succeeds.

deny_callback : Callable[ProtocolEvent, None]

Function, that will be called if the event fails.

Raises:
  • ValueError – Attempted to start the event, while another event of the same type was already in progress.

  • ProtocolNotStartedError – Protocol is not active, call ‘start’ first.

run_event_blocking(event: ProtocolEvent, timeout: float | None) tuple[bool, ProtocolEvent]

Adds a protocol event to the ‘current_protocol_events’ dict, so that it will be serviced by the receiver thread, starts it in blocking mode and blocks the current thread until it completes.

Parameters:
event : ProtocolEvent

Event to start.

timeout : Optional[float]

Maximum blocking time in seconds (or None for infinite timeout).

Returns:

True if the event succeeded, False if not, and the ProtocolEvent object returned by the event (see the docstring of the relevant ProtocolEvent to see what object will be passed here).

Return type:

Tuple[bool, ProtocolEvent]

Raises:
  • ValueError – Attempted to start the event, while another event of the same type was already in progress.

  • ProtocolNotStartedError – Protocol is not active, call ‘start’ first.

abstractmethod send_data(data: Any) bool

Sends data to the target device.

Data can be a model to use, input to process, or additional configuration.

Parameters:
data : Any

Data to send.

Returns:

True if successful.

Return type:

bool

send_message(message: Message) bool

Sends message to the target device.

Parameters:
message : Message

Message to be sent.

Returns:

True if succeeded.

Return type:

bool

send_messages(message_type: MessageType, messages: list[Message])

Gives a job to the ‘transmitter’ thread pool, that will send messages and call ‘messages_sent’ method on the appropriate ProtocolEvent object in the ‘self.current_protocol_events’ dict (based on message type).

This is meant to be called by the ProtocolEvent objects to send messages and update their state.

Parameters:
message_type : MessageType

Message type of the ProtocolEvent that sends the messages. This is needed so that the object can later be found in the ‘self.current_protocol_events’ dict and its ‘messages_sent’ method called. A reference cannot be used here: if, for example, a Listen object in the dict has an IncomingTransmission object inside, and that inner object calls this method, ‘messages_sent’ has to be called on the whole Listen object, not just the inner one.

messages : List[Message]

List of messages to send.

start()

Starts the protocol, creates and runs threads for receiving and transmitting messages.

stop()

Stops the protocol and joins all threads.

transmit(message_type: MessageType, payload: bytes | None = None, flags: list[TransmissionFlag] = [], failure_callback: Callable[[MessageType], None] | None = None)

Sends bytes and a set of flags to the other side, without blocking the current thread.

Parameters:
message_type : MessageType

Value denoting what is being sent. It serves to differentiate one transmission from another (analogous to a port number in TCP).

payload : Optional[bytes]

Bytes to send (or None if the payload is to be empty).

flags : List[TransmissionFlag]

A list of flags to be sent (available flags in the TransmissionFlag enum above - please note that some flags are only allowed for a specific message type).

failure_callback : Optional[ProtocolFailureCallback]

Function, that will be called if the transmission fails to send. Note: This will be executed on a separate thread, so make sure the function is thread-safe.

transmit_blocking(message_type: MessageType, payload: bytes | None = None, flags: list[TransmissionFlag] = [], timeout: float | None = None, failure_callback: Callable[[MessageType], None] | None = None)

Sends bytes and a set of flags to the other side, blocks the current thread until the transmission is completed.

Parameters:
message_type : MessageType

Value denoting what is being sent. It serves to differentiate one transmission from another (analogous to a port number in TCP).

payload : Optional[bytes]

Bytes to send (or None if the payload is to be empty).

flags : List[TransmissionFlag]

A list of flags to be sent (available flags in the TransmissionFlag enum above - please note that some flags are only allowed for a specific message type).

timeout : Optional[float]

Maximum blocking time in seconds, or None to block indefinitely. If that time passes the ‘failure_callback’ will be called.

failure_callback : Optional[ProtocolFailureCallback]

Function, that will be called if the transmission fails to send.

NetworkProtocol

NetworkProtocol implements the abstract methods of KenningProtocol (send_data and receive_data), as well as the abstract methods of Protocol that neither KenningProtocol nor BytesBasedProtocol implements (disconnect, initialize_client and initialize_server).

It uses Python sockets.

After calling initialize_server it waits for a client to connect and creates a socket. Only one client at a time can be connected.

When a client connects, the server sends a single byte (0x00) to the client, as a confirmation and test of the connection.

The client does not respond.
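The connection confirmation described above can be reproduced with plain Python sockets. This is an illustrative sketch rather than the NetworkProtocol implementation: serve_once is a hypothetical helper, and the loopback address with an OS-assigned port is chosen only to keep the example self-contained.

```python
import socket
import threading

def serve_once(listener):
    # Accept a single client and send the one-byte confirmation.
    conn, _address = listener.accept()
    with conn:
        conn.sendall(b"\x00")

listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
listener.listen(1)
port = listener.getsockname()[1]

server = threading.Thread(target=serve_once, args=(listener,))
server.start()

with socket.create_connection(("127.0.0.1", port), timeout=2.0) as client:
    # The client reads the confirmation byte and sends nothing back.
    confirmation = client.recv(1)

server.join()
listener.close()
```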

Methods

class kenning.protocols.network.NetworkProtocol(host: str = 'localhost', port: int = 12345, timeout: int = -1, error_recovery: bool = False, packet_size: int = 4096, max_message_size: int = 1048576)

Bases: KenningProtocol

A TCP-based protocol.

The protocol is implemented using BSD sockets and selectors-based polling.

accept_client(socket: socket, mask: int) bool

Accepts the new client.

Parameters:
socket : socket.socket

New client’s socket.

mask : int

Selector mask. Not used.

Returns:

True if client connected successfully, False otherwise.

Return type:

bool

disconnect()

Ends connection with the other side.

initialize_client() bool

Initializes client side of the protocol.

The client side is supposed to run on host testing the target hardware.

The parameters for the client should be provided in the constructor.

Returns:

True if succeeded.

Return type:

bool

initialize_server(client_connected_callback: Callable[[Any], None] | None = None, client_disconnected_callback: Callable[[None], None] | None = None) bool

Initializes server side of the protocol.

The server side is supposed to run on target hardware.

The parameters for the server should be provided in the constructor.

Parameters:
client_connected_callback : Optional[Callable[Any, None]]

Called when a client connects to the server. Either IP address or another distinguishing characteristic of the client will be passed to the callback (depending on the underlying protocol).

client_disconnected_callback : Optional[Callable[None, None]]

Called, when the current client disconnects from the server.

Returns:

True if succeeded.

Return type:

bool

receive_data(socket: socket, mask: int) bytes | None

Receives data from the target device.

Parameters:
socket : socket.socket

Socket used to read data.

mask : int

Selector mask from the event.

Returns:

Status of receive and optionally data that was received.

Return type:

Optional[bytes]

send_data(data: bytes)

Sends data to the target device.

Data can be a model to use, input to process, or additional configuration.

Parameters:
data : bytes

Data to send.

Returns:

True if successful.

Return type:

bool

wait_send(data: bytes) int

Wrapper for sending method that waits until write buffer is ready for new data.

Parameters:
data : bytes

Data to send.

Returns:

The number of bytes sent.

Return type:

int

Raises:

ProtocolNotStartedError – The protocol has not been initialized, or it has been initialized as server, but no client has connected.
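Waiting for the write buffer before sending can be expressed with the standard selectors module. The function below is a sketch under that assumption and is not the body of NetworkProtocol.wait_send: it takes the socket as an explicit argument, has a timeout parameter that the documented method does not list, and returns 0 instead of raising when the socket does not become writable.

```python
import selectors
import socket

def wait_send(sock, data, timeout=1.0):
    selector = selectors.DefaultSelector()
    selector.register(sock, selectors.EVENT_WRITE)
    try:
        if not selector.select(timeout=timeout):
            return 0  # the socket did not become writable in time
        # send() may transmit only part of the data; report how much went out.
        return sock.send(data)
    finally:
        selector.unregister(sock)
        selector.close()

left, right = socket.socketpair()
sent = wait_send(left, b"hello")
echoed = right.recv(sent)
left.close()
right.close()
```

Because the return value is the number of bytes actually sent, a caller that must deliver a whole packet would call such a function in a loop on the remaining bytes.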

UARTProtocol

UARTProtocol implements the abstract methods of KenningProtocol (send_data and receive_data), as well as the abstract methods of Protocol that neither KenningProtocol nor BytesBasedProtocol implements (disconnect and initialize_client).

The initialize_server method raises kenning.core.exceptions.NotSupportedError, since Kenning Server working on UART is not supported.

When initializing, the client sends a MessageType.PING request with TransmissionFlag.SUCCESS, as a test of the connection and a signal starting a session.

The server responds with a MessageType.PING transmission with TransmissionFlag.SUCCESS if it accepts the connection and TransmissionFlag.FAIL if it does not accept the connection (for example because a client is already connected).

When disconnecting, the client sends a MessageType.PING request with TransmissionFlag.FAIL.

The server responds with a MessageType.PING, TransmissionFlag.SUCCESS transmission.
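The PING exchange above amounts to a small state machine on the server side. The sketch below models only the choice of response flag; Flag and PingServer are hypothetical names, not part of Kenning or of Kenning Zephyr Runtime, and a single-client server is assumed.

```python
from enum import Enum

class Flag(Enum):
    # Hypothetical stand-in for TransmissionFlag.
    SUCCESS = "SUCCESS"
    FAIL = "FAIL"

class PingServer:
    def __init__(self):
        self.client_connected = False

    def handle_ping(self, request_flag):
        if request_flag is Flag.SUCCESS:
            # The client asks to start a session.
            if self.client_connected:
                return Flag.FAIL  # a client is already connected
            self.client_connected = True
            return Flag.SUCCESS
        # A PING request with FAIL ends the session.
        self.client_connected = False
        return Flag.SUCCESS

server = PingServer()
replies = [
    server.handle_ping(Flag.SUCCESS),  # connect
    server.handle_ping(Flag.SUCCESS),  # second connect attempt is refused
    server.handle_ping(Flag.FAIL),     # disconnect
    server.handle_ping(Flag.SUCCESS),  # connecting works again
]
```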

The UARTProtocol class overrides some of the kenning.core.protocol.Protocol abstract methods, that are already implemented by BytesBasedProtocol, in order to change their behaviour:

  • download_statistics - It expects the statistics to arrive in a custom format, instead of a serialized JSON. Statistics are parsed by the kenning.protocols.uart._parse_stats function.

  • upload_io_specification - Instead of sending the input/output specification as a JSON serialized to string, it sends it serialized to a custom format that enables easy mapping of the binary data to a packed C struct. The format is defined in the form of a dict (kenning.interfaces.io_spec_serializer.IOSPEC_STRUCT_FIELDS).
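Mapping a dict-defined layout onto a packed C struct can be done with Python's struct module. The field names and format codes below are made up for illustration and are not the contents of IOSPEC_STRUCT_FIELDS; the sketch only shows the general idea of a dict driving the binary layout.

```python
import struct

# Hypothetical layout, NOT the real IOSPEC_STRUCT_FIELDS:
# field name -> struct format code.
FIELDS = {"num_inputs": "B", "input_length": "I", "output_length": "I"}

# "<" selects little-endian byte order with no padding between fields,
# which corresponds to a packed struct on the C side.
FORMAT = "<" + "".join(FIELDS.values())

def pack_spec(values):
    # Serialize the values in the order given by FIELDS.
    return struct.pack(FORMAT, *(values[name] for name in FIELDS))

packed = pack_spec({"num_inputs": 1, "input_length": 784, "output_length": 10})
unpacked = struct.unpack(FORMAT, packed)
```

A receiver written in C could then read these bytes directly into a struct with the same field order and widths, provided it is declared as packed and uses the same byte order.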

Methods

ROS2Protocol

ROS2Protocol is an implementation based on the ROS 2 framework.

It does not support server-side API methods (initialize_server, serve), target-side model optimization (upload_optimizers, request_optimization), dynamic runtime changes (upload_runtime) or sending/receiving logs (listen_to_server_logs, start_sending_logs, stop_sending_logs).

Methods


Last update: 2025-09-18