Communication with an external application¶
Communication with an external application is based on JSON-RPC, BSD sockets and SocketIO. Pipeline Manager implements a TCP server that listens on a specified port and waits for a client to connect. The Pipeline Manager frontend sends JSON-RPC requests to this server through SocketIO, and the server redirects the messages to the connected client for the specific actions described in this chapter. The external application can also request actions from Pipeline Manager in a similar manner.
Communication protocol¶
The application layer protocol specifies two blocks:
size - the first four bytes form an unsigned integer that gives the size of the message content in bytes.
content - the rest of the message, holding additional data in the JSON-RPC format described in API Specification.
The size and content are stored in big-endian.
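The framing described above can be sketched in a few lines of Python. This is only an illustration, not the library's implementation: the helper names `encode_message` and `decode_message` are hypothetical, and the sketch assumes that the content is JSON text encoded as UTF-8.

```python
import json
import struct
from typing import Any, Dict, Tuple

# 4-byte unsigned integer in big-endian order, as described for `size`
HEADER = struct.Struct(">I")


def encode_message(payload: Dict[str, Any]) -> bytes:
    """Prefix the JSON content with its size in bytes (hypothetical helper)."""
    content = json.dumps(payload).encode("utf-8")  # assumption: UTF-8 JSON text
    return HEADER.pack(len(content)) + content


def decode_message(data: bytes) -> Tuple[Dict[str, Any], bytes]:
    """Decode one message from the front of `data`.

    Returns the parsed content and the bytes that follow the message.
    """
    if len(data) < HEADER.size:
        raise ValueError("incomplete size block")
    (size,) = HEADER.unpack_from(data)
    end = HEADER.size + size
    if len(data) < end:
        raise ValueError("incomplete content block")
    return json.loads(data[HEADER.size:end].decode("utf-8")), data[end:]


frame = encode_message({"jsonrpc": "2.0", "id": 1, "method": "specification_get"})
message, remaining = decode_message(frame)
```

Returning the leftover bytes lets a caller keep a receive buffer and decode messages one by one as data arrives on the socket.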
Communication structure¶
By default, Pipeline Manager in server-app mode will wait for an external application to connect and then request the specification.
If the connection is established successfully, the Pipeline Manager frontend will check every 0.5 seconds whether the external application is still connected.
Apart from that, both Pipeline Manager frontend and external application can send requests which pass through Pipeline Manager backend.
Because the Pipeline Manager side of the communication uses SocketIO, it is based on events, which are precisely defined for both sides and can trigger different actions.
Frontend listens to:
api – received messages are JSON-RPC requests; they are validated against the specification and executed, and the generated responses are sent back,
api-response – received messages are JSON-RPC responses; they are also validated and returned as the result of the previous request.
Backend implements the following events:
backend-api – receives all JSON-RPC requests, runs methods and responds,
external-api – redirects messages to external application through BSD socket.
On the other hand, communication between the backend and the external application is done through a BSD socket. To manage this, both sides run a socket listener as a separate coroutine task, which waits for messages and either responds to them or redirects them.
In the communication structure diagram below, we have:
blue lines describing Backend API request from frontend,
red lines describing External App API request from frontend,
purple lines describing Frontend API request from external application.
Response message types - sent by the external application¶
OK¶
Message of type OK (0) is used to indicate a success and optionally includes an answer to a previous request.
Its content may vary depending on the request type.
ERROR¶
Message of type ERROR (1) is used to indicate a failure and optionally includes an answer to a previous request.
Its content may vary depending on the answered request.
PROGRESS¶
Message of optional type PROGRESS (2) is used to inform Pipeline Manager about the status of a running dataflow.
The PROGRESS message type can only be used once a message of type RUN is received, and it can be sent multiple times before the final response message of type either ERROR or OK that indicates the end of the run.
The progress information is conveyed in content using a number in the range 0 - 100, encoded in UTF-8, that signals the percentage of completion of the run.
See RUN for more information.
WARNING¶
Message of type WARNING (3) is used to indicate a success but also alerts of a condition that might cause a problem in the future.
It optionally includes an answer to a previous request.
Its content may vary depending on the answered request.
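The four message types and their numeric codes can be mirrored in a small enumeration. The sketch below is a local stand-in written for illustration (the `ResponseType` name and both helpers are hypothetical); in a real client the `MessageType` enum from the pipeline-manager-backend-communication library would normally be used instead. It also assumes that the progress value is sent as a decimal string.

```python
from enum import IntEnum
from typing import Any, Dict, Optional


class ResponseType(IntEnum):
    """Local stand-in using the numeric codes listed in this section."""

    OK = 0
    ERROR = 1
    PROGRESS = 2
    WARNING = 3


def make_response(kind: ResponseType, content: Optional[Any] = None) -> Dict[str, Any]:
    """Build a response; `content` is optional for every message type."""
    response: Dict[str, Any] = {"type": int(kind)}
    if content is not None:
        response["content"] = content
    return response


def make_progress(percent: int) -> Dict[str, Any]:
    """Build a PROGRESS message; the value must stay within 0 - 100."""
    if not 0 <= percent <= 100:
        raise ValueError("progress has to be in the range 0 - 100")
    # Assumption: the number is conveyed as text
    return make_response(ResponseType.PROGRESS, str(percent))


halfway = make_progress(50)
finished = make_response(ResponseType.OK)
failed = make_response(ResponseType.ERROR, "Dataflow contains a cycle")
```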
API Specification¶
Frontend API¶
graph_get¶
Returns dataflow used by frontend
params: Refer to null_or_empty.
result: Refer to dataflow_object.
node_get¶
Procedure to read node state
params
All of
Refer to node_object.
result (object): Cannot contain additional properties.
node (object, required): State of the node.
properties_get¶
Procedure to read values of a node's properties
params (object): Cannot contain unevaluated properties.
All of
Refer to node_object.
properties ([array, null]): Properties whose values are to be read, specified by either a name or an id. If undefined, all properties are returned.
Items (object)
One of
object: Cannot contain additional properties.
id (string, required): Id of the property to read.
object: Cannot contain additional properties.
name (string, required): Name of the property to read.
result (array)
Items (object): Values of the searched properties. Cannot contain additional properties.
id (string, required): Id of the property.
name (string, required): Name of the property.
value: Value of the property. Its type depends on the property.
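As an illustration of the schema above, the following sketch builds a JSON-RPC 2.0 request for properties_get. The `make_request` helper and all identifiers in the example (`graph-1`, `node-1`, `threshold`, `prop-7`) are hypothetical and exist only to show the shape of the message.

```python
import itertools
import json
from typing import Any, Dict, Optional

# Simple source of unique request ids
_request_ids = itertools.count(1)


def make_request(method: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 request object (hypothetical helper)."""
    request: Dict[str, Any] = {
        "jsonrpc": "2.0",
        "id": next(_request_ids),
        "method": method,
    }
    if params is not None:
        request["params"] = params
    return request


request = make_request(
    "properties_get",
    {
        # node_object part: identifies the node
        "graph_id": "graph-1",
        "node_id": "node-1",
        # each item selects a property by exactly one of `name` or `id`
        "properties": [{"name": "threshold"}, {"id": "prop-7"}],
    },
)
serialized = json.dumps(request)
```

Omitting `properties` (or passing null) would, according to the description above, return all properties of the node.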
properties_change¶
Procedure to alter properties of a node
params: Refer to properties_diff.
result: Refer to null_or_empty.
position_change¶
Procedure to alter position of a node
params: Refer to position_diff.
result: Refer to null_or_empty.
nodes_change¶
Procedure to add and delete nodes
params: Refer to nodes_diff.
result: Refer to null_or_empty.
connections_change¶
Procedure to add and delete connections
params: Refer to connections_diff.
result: Refer to null_or_empty.
graph_change¶
Procedure to change the whole dataflow
params: Refer to dataflow_object.
result: Refer to null_or_empty.
progress_change¶
Notification with progress of job run by external application
params (object): Parameters for notification. Cannot contain additional properties.
method (string, required): Name of the method used to run the job.
progress (number, required): Progress of the job run by the external application. If between 0 and 100, the progress will be set; if -1, an animation will run.
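Because progress_change is a notification, the message carries no `id` member and no response is expected (this follows from JSON-RPC 2.0). A minimal sketch of building such a message, with a hypothetical helper name, could look like this:

```python
from typing import Any, Dict


def make_progress_notification(method: str, progress: float) -> Dict[str, Any]:
    """Build a progress_change notification (hypothetical helper).

    `progress` is either -1 (animation) or a value between 0 and 100.
    """
    if progress != -1 and not 0 <= progress <= 100:
        raise ValueError("progress has to be -1 or within 0 - 100")
    return {
        "jsonrpc": "2.0",
        "method": "progress_change",
        # no "id" member: JSON-RPC notifications are not answered
        "params": {"method": method, "progress": progress},
    }


notification = make_progress_notification("dataflow_run", 42)
indeterminate = make_progress_notification("dataflow_run", -1)
```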
metadata_change¶
Updates the editor’s metadata
params: Refer to metadata.
viewport_center¶
Center the editor
params: Refer to null_or_empty.
terminal_add¶
Create new terminal instance
params (object): Parameters for request.
name (string, required): Unique name for the new terminal instance. This name is used for any communication.
readonly (boolean): Specifies whether the terminal should be read-only, or editable by user. Default: true.
result: Refer to null_or_empty.
terminal_write¶
Writes a message line to a terminal instance. Allows sending both text and hterm’s control sequences. For more details on available control sequences, check hterm Control Sequences.
Warning
Since data is sent in JSON format, the hexadecimal values need to be escaped with \u, not \x, e.g. \u001b[38:2:238:178:17mexample.
params (object): Parameters for request.
name (string, required): Name of the terminal to which the message is written.
message (string, required): Message to be written to the terminal.
result: Refer to null_or_empty.
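The escaping rule from the warning above matters mostly when JSON is written by hand. When the message is built in Python and serialized with the standard `json` module, the escape character is converted to `\u001b` automatically, as this small sketch shows (the terminal name `my-terminal` is a hypothetical example):

```python
import json

# Inside Python source the escape character may be written as \x1b;
# it is a single character (code point 27) in the resulting string.
ESC = "\x1b"
message = f"{ESC}[38:2:238:178:17mexample"

params = {"name": "my-terminal", "message": message}

# json.dumps escapes control characters using the \uXXXX form
encoded = json.dumps(params)
decoded = json.loads(encoded)
```

The serialized text contains `\u001b` rather than `\x1b`, and decoding it gives back the original string.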
notification_send¶
Sends a notification to the frontend. It will display the message both in the notifications and in the default terminal
params (object): Parameters for notification.
type: Type of the notification. Must be one of: ["error", "warning", "info"].
title (string, required): Title of the notification, appearing both in the terminal and notification.
details (string, required): Details of the notification, displayed only in the terminal.
result: Refer to null_or_empty.
specification_change¶
Procedure to update specification
params: Refer to specification_object.
result: Refer to null_or_empty.
Backend API¶
status_get¶
Returns status of connection with external application
params: Refer to null_or_empty.
result (object): Description of backend status. Cannot contain additional properties.
status (object, required): Status. Cannot contain additional properties.
connected (boolean, required): Connection with external application.
external_app_connect¶
Request to wait till connection with external application is established
params: Refer to null_or_empty.
result: Refer to empty.
connected_frontends_get¶
Request the number of connected frontends
params: Refer to null_or_empty.
result (object): Cannot contain additional properties.
connections (number, required): Number of connections.
External App API¶
specification_get¶
Request specification used by external application
params: Refer to null_or_empty.
result: Refer to external_endpoint_return_type.
app_capabilities_get¶
Request external application capabilities
params: Refer to null_or_empty.
result (object): External application capabilities. Cannot contain additional properties.
stoppable_methods (array): List with methods that can be stopped with dataflow_stop request.
Items (string): Name of the stoppable method.
dataflow_import¶
Request to convert dataflow in external app to Pipeline Manager format and import it
params (object): Parameters for request. Cannot contain additional properties.
external_application_dataflow (string, required): Dataflow in external application format. If the loaded file cannot be represented as text, it will be sent as a base64 string. To make sure the received data is in a readable format, convert_message_to_string from pipeline_manager_backend_communication.utils can be used. Optionally, for conversion to bytes, the convert_message_to_bytes function is available.
mime (string, required): MIME type.
base64 (boolean, required): Specifies whether external_application_dataflow is in base64 format.
result: Refer to external_endpoint_return_type.
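To show what the `base64` flag means for the receiver, here is a standard-library sketch that turns the received string into raw bytes. It is not the library's `convert_message_to_bytes`; the function name is hypothetical, and the sketch assumes that the value is a plain base64 payload without any prefix and that textual files are UTF-8.

```python
import base64 as b64


def decode_external_dataflow(external_application_dataflow: str, is_base64: bool) -> bytes:
    """Return the raw bytes of the received file (hypothetical helper)."""
    if is_base64:
        # Assumption: plain base64 text, no "data:...;base64," prefix
        return b64.b64decode(external_application_dataflow)
    # Assumption: textual files are treated as UTF-8
    return external_application_dataflow.encode("utf-8")


binary_payload = b64.b64encode(b"\x00\x01binary-model").decode("ascii")
raw_from_base64 = decode_external_dataflow(binary_payload, True)
raw_from_text = decode_external_dataflow('{"nodes": []}', False)
```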
dataflow_validate¶
Request external application to validate dataflow
params: Refer to dataflow_object.
result: Refer to external_endpoint_return_type.
dataflow_run¶
Request external application to run dataflow
params: Refer to dataflow_object.
result: Refer to external_endpoint_return_type.
dataflow_stop¶
Request external application to stop a running dataflow
params (object): Parameters for stopping dataflow. Cannot contain additional properties.
method (string, required): Name of the method used to start run.
result: Refer to external_endpoint_return_type.
dataflow_export¶
Request external application to export dataflow
params: Refer to dataflow_object.
result (object): Common type returned by external app. Cannot contain additional properties.
type (number, required): MessageType specifying success or error.
content ([object, string]): Exported dataflow, which is then saved by the frontend user. Should be either a JSON object or a base64-encoded string. If any error occurred, it should contain a proper message.
filename (string): Suggested filename used to save the file.
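A result matching the dataflow_export description can be assembled as in the sketch below. The helper name and the example file names are hypothetical, and the numeric value 0 is used for the OK message type as listed earlier in this chapter.

```python
import base64
from typing import Any, Dict, Union

OK = 0  # numeric code of the OK message type


def make_export_result(exported: Union[Dict[str, Any], bytes], filename: str) -> Dict[str, Any]:
    """Build a dataflow_export result (hypothetical helper).

    A dict is returned as a JSON object; raw bytes are base64-encoded.
    """
    if isinstance(exported, bytes):
        content: Union[Dict[str, Any], str] = base64.b64encode(exported).decode("ascii")
    else:
        content = exported
    return {"type": OK, "content": content, "filename": filename}


json_result = make_export_result({"nodes": []}, "design.json")
binary_result = make_export_result(b"\x7fELF", "design.bin")
```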
frontend_on_connect¶
Request sent when Pipeline Manager frontend connects to backend
params: Refer to null_or_empty.
result: Refer to null_or_empty.
properties_on_change¶
Request sent when properties of any node change
params: Refer to properties_diff.
result: Refer to null_or_empty.
position_on_change¶
Request sent when position of any node changes
params: Refer to position_diff.
result: Refer to null_or_empty.
nodes_on_change¶
Request sent when a node was added or deleted
params: Refer to nodes_diff.
result: Refer to null_or_empty.
connections_on_change¶
Request sent when a connection was added or deleted
params: Refer to connections_diff.
result: Refer to null_or_empty.
graph_on_change¶
Request sent when the whole dataflow changed, e.g. when a dataflow is loaded
params: Refer to dataflow_object.
result: Refer to null_or_empty.
metadata_on_change¶
Request sent when metadata was changed
params: Refer to metadata.
result: Refer to null_or_empty.
viewport_on_center¶
Request sent when editor was centered
params: Refer to null_or_empty.
result: Refer to null_or_empty.
terminal_read¶
Request sent by the frontend when a terminal received an input
params (object): Parameters for request.
name (string, required): Name of the terminal to which the message was written. The terminal must not be read-only.
message (string, required): Terminal input.
result: Refer to null_or_empty.
Common Types¶
empty¶
object: Empty object definition. Cannot contain additional properties.
null_or_empty¶
[object, null]: Empty or missing object definition. Cannot contain additional properties.
node_object¶
object: Schema that identifies a node in a graph.
graph_id (string, required): Id of the graph.
node_id (string, required): Id of the node.
dataflow_object¶
object: Definition containing dataflow object. Cannot contain additional properties.
dataflow (object): JSON with graph definition in PM format.
specification_object¶
object: Definition containing specification object. Cannot contain additional properties.
specification (object): JSON with specification definition in PM format.
properties_diff¶
object: Schema that represents differences in properties of the node. Cannot contain unevaluated properties.
All of
Refer to node_object.
properties (array, required): Properties to change specified by either a name or id.
Items (object)
One of
object: Cannot contain additional properties.
id (string, required): Id of the property to alter.
new_value: New value of the property.
object: Cannot contain additional properties.
name (string, required): Name of the property to alter.
new_value: New value of the property.
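The "One of" rule in properties_diff means that each item identifies the property by an id or by a name, but not both. A small sketch that enforces this while building the object follows; both helper names and the identifiers in the example are hypothetical.

```python
from typing import Any, Dict, List, Optional


def property_change(new_value: Any, *, id: Optional[str] = None,
                    name: Optional[str] = None) -> Dict[str, Any]:
    """Build one item of `properties`, keyed by exactly one of id or name."""
    if (id is None) == (name is None):
        raise ValueError("pass exactly one of `id` or `name`")
    if id is not None:
        return {"id": id, "new_value": new_value}
    return {"name": name, "new_value": new_value}


def properties_diff(graph_id: str, node_id: str,
                    changes: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Combine the node_object fields with the list of changes."""
    return {"graph_id": graph_id, "node_id": node_id, "properties": changes}


diff = properties_diff(
    "graph-1",
    "node-1",
    [property_change(0.5, name="threshold"), property_change("fast", id="prop-7")],
)
```

An object of this shape is what properties_change expects as `params` and what properties_on_change delivers to the external application.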
position_diff¶
object: Schema that represents differences in position of the node. Cannot contain unevaluated properties.
All of
Refer to node_object.
position (object, required): New position of the node.
x (number): X coordinate.
y (number): Y coordinate.
nodes_diff¶
object: Schema that represents nodes’ differences in a graph.
graph_id (string, required): Id of the graph.
nodes (object, required)
added (array): List with created nodes.
Items (object): JSON with node definition.
deleted (array): List with removed nodes.
Items (string): ID of node to delete.
remove_with_connections (boolean): Should node be removed with connections.
connections_diff¶
object: Schema that represents connections’ differences in a graph.
graph_id (string, required): Id of the graph.
connections (object, required)
added (array): List with created connections.
Items (object): JSON with connection definition.
deleted (array): List with removed connections.
Items (object): Connection defined with its beginning and end.
from (string): ID of output interface.
to (string): ID of input interface.
metadata¶
object: Type with PM metadata. Cannot contain additional properties.
metadata
(object, required): JSON with metadata description in PM format.
external_endpoint_return_type¶
object: Common type returned by external app. Cannot contain additional properties.
type (number, required): MessageType specifying success, error or progress.
content ([object, string]): Additional information, either message or dataflow.
Custom procedures¶
An external application can define new remote procedures, which are called by custom Navbar buttons.
The name of such a procedure has to start with the custom_ prefix, e.g. custom_simulate_design.
These remote procedures can be called from the frontend using custom Navbar buttons defined in the metadata’s navbarItems field, e.g.:
{
    "name": "Simulate design",
    "stopName": "Stop simulation",
    "iconName": "Run",
    "procedureName": "custom_simulate_design"
}
A custom procedure has the same parameters and return type as the dataflow_run method.
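On the application side, a custom procedure is simply one more method on the object that holds the RPC methods. The sketch below pairs such a method with its navbarItems entry and checks that every button points at an implemented procedure. The `navbar_item` and `missing_procedures` helpers are hypothetical conveniences, not part of Pipeline Manager, and the literal 0 stands for the OK message type.

```python
from typing import Any, Dict, List

OK = 0  # numeric code of the OK message type


class RPCMethods:
    def custom_simulate_design(self, dataflow: Dict[str, Any]) -> Dict[str, Any]:
        # Same parameters and return type as dataflow_run
        return {"type": OK, "content": "Simulation finished"}


def navbar_item(name: str, stop_name: str, icon_name: str,
                procedure_name: str) -> Dict[str, str]:
    """Build one entry of the metadata's navbarItems field."""
    if not procedure_name.startswith("custom_"):
        raise ValueError("custom procedure names have to start with 'custom_'")
    return {
        "name": name,
        "stopName": stop_name,
        "iconName": icon_name,
        "procedureName": procedure_name,
    }


def missing_procedures(rpc_methods: object, items: List[Dict[str, str]]) -> List[str]:
    """List the procedures referenced by buttons but not implemented."""
    return [
        item["procedureName"]
        for item in items
        if not callable(getattr(rpc_methods, item["procedureName"], None))
    ]


items = [
    navbar_item("Simulate design", "Stop simulation", "Run", "custom_simulate_design"),
    navbar_item("Build", "Stop build", "Run", "custom_build"),
]
not_implemented = missing_procedures(RPCMethods(), items)
```

Running such a check at startup can catch a button whose procedure name was mistyped before a user clicks it.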
Implementing a Python-based client for Pipeline Manager¶
The communication described above is necessary to integrate an application with Pipeline Manager. The client needs to be able to read requests coming from Pipeline Manager and send proper responses.
For applications written in Python, you can use the pipeline-manager-backend-communication library. It implements an easy-to-use interface that is able to communicate with Pipeline Manager along with helper structures and enumerations.
The main structures provided by the pipeline-manager-backend-communication library are:
CommunicationBackend - class that implements the functionality for receiving and sending messages.
MessageType - enum used to easily distinguish message types.
Status - enum that describes the current state of the client.
The following code is an example of how to receive requests and send responses to Pipeline Manager.
As Pipeline Manager communication is based on JSON-RPC, the application should implement the methods that can be requested. They are described in External App API.
from typing import Dict

# MessageType is provided by pipeline-manager-backend-communication

# Class containing all implemented methods
class RPCMethods:
    def specification_get(self) -> Dict:
        # ...
        return {'type': MessageType.OK.value, 'content': specification}

    # ...
The name, input and output of each defined method have to match the specification.
    # Function name matches the dataflow_import endpoint from External App API
    def dataflow_import(
        self,
        external_application_dataflow: str,
        mime: str,
        base64: bool,
    ) -> Dict:
        # The parameter names have to be the same as the ones
        # listed in the API specification `params`.
        # Optionally, the received file can be converted to a string
        # with `convert_message_to_string`:
        # from pipeline_manager_backend_communication.utils import (
        #     convert_message_to_string
        # )
        data_as_string = convert_message_to_string(
            external_application_dataflow,
            base64,
            mime
        )
        # ...
        # pipeline_manager_dataflow here is the input file converted to
        # Pipeline Manager's graph representation
        return {
            'type': MessageType.OK.value,
            'content': pipeline_manager_dataflow
        }

    def dataflow_validate(self, dataflow: Dict) -> Dict:
        # ...
        # Returned object has to match API specification `result`
        return {'type': MessageType.OK.value}

    def dataflow_run(self, **kwargs: Dict) -> Dict:
        # All params can also be retrieved as one dictionary
        print(kwargs['dataflow'])
        # ...
        return {'type': MessageType.OK.value}

    # Custom procedure example
    def custom_build(self, dataflow: Dict) -> Dict:
        # ...
        return {'type': MessageType.OK.value}
Moreover, every uncaught exception will be classified as an error.
    def dataflow_export(self, dataflow: Dict) -> Dict:
        # ...
        raise Exception('Something went very, very bad...')
In that case, the following JSON-RPC error message will be returned to the frontend application.
{
    "id": 1,
    "jsonrpc": "2.0",
    "error": {
        "code": -3,
        "message": "Something went very, very bad...",
        "data": {}
    }
}
RPC methods can also be asynchronous. The server detects this automatically and awaits them.
    # `method` is the parameter listed in the dataflow_stop `params`
    async def dataflow_stop(self, method: str) -> Dict:
        # ...
        return {'type': MessageType.OK.value}
A TCP client that connects to Pipeline Manager using the provided host and port parameters has to be created.
It has to be initialized with an object containing the JSON-RPC methods.
host = '127.0.0.1'
port = 5000
# Creating a client instance with host and port specified
client = CommunicationBackend(host, port)
# Registering implemented methods and
# connecting to Pipeline Manager
await client.initialize_client(RPCMethods())
Once the connection is established, the application can start listening for the incoming requests.
await client.start_json_rpc_client()
These calls can be wrapped in an async function and run with the asyncio.run function.
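The wrapping mentioned above can be sketched as follows. To keep the example runnable without Pipeline Manager, it uses a `FakeClient` stand-in (hypothetical, defined here) that only records calls; in a real application the `CommunicationBackend` instance created earlier would be passed instead, and the second call would keep listening for requests.

```python
import asyncio
from typing import Any, List


class FakeClient:
    """Stand-in exposing the two coroutine methods used in this chapter."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    async def initialize_client(self, rpc_methods: Any) -> None:
        self.calls.append("initialize_client")

    async def start_json_rpc_client(self) -> None:
        self.calls.append("start_json_rpc_client")


async def run_client(client: Any, rpc_methods: Any) -> None:
    # Register the implemented methods and connect
    await client.initialize_client(rpc_methods)
    # Start handling incoming JSON-RPC requests
    await client.start_json_rpc_client()


client = FakeClient()
# asyncio.run creates the event loop and runs the coroutine to completion
asyncio.run(run_client(client, rpc_methods=object()))
```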
Sending JSON-RPC requests to Pipeline Manager¶
Sending requests is defined as coroutine which has to be awaited.
response = await client.request('graph_get')
This method sends a graph_get request to the frontend application and receives the following response:
{
    "id": 1,
    "jsonrpc": "2.0",
    "result": {
        "dataflow": {
            // ...
        }
    }
}
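A response like the one above carries either a `result` or an `error` member, so the caller usually needs to tell the two apart. The helper below is a hedged sketch: `unwrap_result` and `JSONRPCError` are hypothetical names, and it assumes that the value returned by `client.request` is a dictionary shaped like the JSON shown in this chapter.

```python
from typing import Any, Dict


class JSONRPCError(Exception):
    """Raised when a response carries an `error` member."""

    def __init__(self, code: Any, message: str) -> None:
        super().__init__(f"JSON-RPC error {code}: {message}")
        self.code = code
        self.message = message


def unwrap_result(response: Dict[str, Any]) -> Any:
    """Return `result`, or raise if the response describes an error."""
    if "error" in response:
        error = response["error"]
        raise JSONRPCError(error.get("code"), error.get("message", ""))
    return response["result"]


ok_response = {"id": 1, "jsonrpc": "2.0", "result": {"dataflow": {}}}
dataflow = unwrap_result(ok_response)["dataflow"]

error_response = {
    "id": 1,
    "jsonrpc": "2.0",
    "error": {"code": -3, "message": "Something went very, very bad...", "data": {}},
}
```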