Communication with an external application

Communication with an external application is based on JSON-RPC, BSD sockets and SocketIO. Pipeline Manager implements a TCP server that listens on a specified port and waits for a client to connect. The Pipeline Manager frontend sends JSON-RPC requests to this server through SocketIO, and the server redirects the messages to the connected client, which performs the specific actions described in this chapter. The external application can also request actions from Pipeline Manager in a similar manner.

Communication protocol

The application layer protocol specifies two blocks:

  • size - the first four bytes form an unsigned integer that gives the size of the message content in bytes.

  • content - the rest of the message holding additional data in JSON-RPC format described in API Specification.

The size and content are stored in big-endian.
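The framing described above can be sketched with Python's standard `struct` module. The helper names `encode_message` and `decode_message` are illustrative and not part of Pipeline Manager, and the sketch assumes the content is UTF-8 encoded JSON.

```python
import json
import struct


def encode_message(payload: dict) -> bytes:
    """Prefix the JSON content with its size as a 4-byte big-endian unsigned integer."""
    content = json.dumps(payload).encode("utf-8")
    return struct.pack(">I", len(content)) + content


def decode_message(frame: bytes) -> dict:
    """Read the 4-byte size header, then parse exactly that many content bytes."""
    (size,) = struct.unpack(">I", frame[:4])
    content = frame[4:4 + size]
    if len(content) != size:
        raise ValueError("incomplete message")
    return json.loads(content.decode("utf-8"))


request = {"jsonrpc": "2.0", "id": 1, "method": "status_get"}
frame = encode_message(request)
```

The `>I` format string selects a big-endian unsigned 32-bit integer, which matches the four-byte size block.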

Communication structure

By default, Pipeline Manager in server-app mode waits for an external application to connect and then requests the specification. Once the connection is established, the Pipeline Manager frontend checks every 0.5 seconds whether the external application is still connected. Apart from that, both the Pipeline Manager frontend and the external application can send requests, which pass through the Pipeline Manager backend.

        sequenceDiagram
    box Pipeline Manager
        participant Frontend
        participant Backend
    end
    box Pipeline Manager Backend Communication
        participant External App
    end
    Note over Frontend,Backend: SocketIO
    Frontend->>Backend: status_get
    Backend->>Frontend: status
    Frontend->>+Backend: external_app_connect
    Note over Backend,External App: BSD socket
    External App->>Backend: connect_socket
    Backend->>-Frontend: MessageType.OK
    Frontend->>+Backend: specification_get
    Backend->>External App: specification_get
    External App->>Backend: specification
    Backend->>-Frontend: MessageType.OK + specification
    loop Every 0.5s
        Frontend->>Backend: status_get
        Backend->>Frontend: status
    end
    par Frontend request
        Frontend->>+Backend: request
        Backend->>+External App: redirected request
        External App->>-Backend: response
        Backend->>-Frontend: redirected response
    and External App request
        External App->>+Backend: request
        Backend->>+Frontend: redirected request
        Frontend->>-Backend: response
        Backend->>-External App: redirected response
    end
    

Figure 1 Communication sequence diagram

Because the Pipeline Manager side of the communication uses SocketIO, it is based on events, which are precisely defined for both sides and can trigger different actions.

Frontend listens to:

  • api – received messages are JSON-RPC requests; they are validated against the specification and executed, and the generated responses are sent back,

  • api-response – received messages are JSON-RPC responses; they are also validated and returned as the result of a previous request.

Backend implements the following events:

  • backend-api – receives all JSON-RPC requests, runs methods and responds,

  • external-api – redirects messages to external application through BSD socket.

Communication between the backend and the external application, on the other hand, is done through a BSD socket. To manage this, both sides run a socket listener as a separate coroutine task, which waits for messages and either responds to them or redirects them.
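A socket listener of this kind could look roughly like the sketch below, which reads size-prefixed messages (see Communication protocol) from an `asyncio` stream and passes each one to a handler. The names `read_message`, `listen` and `handle` are hypothetical; this is not the actual implementation used by Pipeline Manager.

```python
import asyncio
import json
import struct
from typing import Awaitable, Callable


async def read_message(reader: asyncio.StreamReader) -> dict:
    """Read one message: a 4-byte big-endian size followed by JSON content."""
    header = await reader.readexactly(4)
    (size,) = struct.unpack(">I", header)
    content = await reader.readexactly(size)
    return json.loads(content.decode("utf-8"))


async def listen(
    reader: asyncio.StreamReader,
    handle: Callable[[dict], Awaitable[None]],
) -> None:
    """Wait for messages and hand each one to `handle` until the peer disconnects."""
    while True:
        try:
            message = await read_message(reader)
        except asyncio.IncompleteReadError:
            break  # the connection was closed by the other side
        await handle(message)
```

Such a listener would typically be started with `asyncio.create_task(listen(reader, handle))`, so that it runs alongside the rest of the application.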

The diagram below shows this communication structure:

        C4Deployment
    Deployment_Node(pm, "Pipeline Manager", "") {
        Deployment_Node(front, "Frontend", "") {
            Deployment_Node(socketio, "SocketIO", "") {
                Container(front-socket, "SocketIO")
                Deployment_Node(front-event, "Events", "") {
                    Container(front-api, "api")
                    Container(front-response-api, "api-response")
                }
            }
        }
        Deployment_Node(back, "Backend", "") {
            Deployment_Node(flask, "Python-SocketIO events", "") {
                Container(flask-backend-api, "backend-api")
                Container(flask-external-api, "external-api")
            }
            Deployment_Node(flask-task, "Coroutine task", "") {
                Container(back-socket, "BSD socket listener")
            }
        }
    }
    Deployment_Node(pmbc, "Pipeline Manager Backend Communication", "") {
        Deployment_Node(pmbc-socket, "Socket", "") {
            Container(pmbc-socket, "BSD socket")
            Deployment_Node(pmbc-task, "Coroutine task", "") {
                Container(pmbc-listener, "BSD socket listener")
            }
        }
    }
    %% frontend to backend request
    Rel(front-socket, flask-backend-api, "JSON-RPC request")
    Rel(flask-backend-api, front-response-api, "JSON-RPC response")
    UpdateRelStyle(front-socket, flask-backend-api, $lineColor="var(--md-code-hl-keyword-color)", $textColor="var(--md-code-hl-keyword-color)", $offsetY="-15")
    UpdateRelStyle(flask-backend-api, front-response-api, $lineColor="var(--md-code-hl-keyword-color)", $textColor="var(--md-code-hl-keyword-color)", $offsetX="-50", $offsetY="55")
    %% frontend to external app request
    Rel(front-socket, flask-external-api, "JSON-RPC request")
    Rel(flask-external-api, pmbc-listener, "Redirected requests and responses")
    Rel(pmbc-listener, back-socket, "JSON-RPC response")
    Rel(back-socket, front-response-api, "Redirected response")
    UpdateRelStyle(front-socket, flask-external-api, $lineColor="var(--md-code-hl-number-color)", $textColor="var(--md-code-hl-number-color)")
    UpdateRelStyle(flask-external-api, pmbc-listener, $lineColor="var(--md-code-hl-name-color)", $textColor="var(--md-code-hl-name-color)", $offsetX="-10", $offsetY="-10")
    UpdateRelStyle(pmbc-listener, back-socket, $lineColor="var(--md-code-hl-number-color)", $textColor="var(--md-code-hl-number-color)", $offsetX="25", $offsetY="-25")
    UpdateRelStyle(back-socket, front-response-api, $lineColor="var(--md-code-hl-number-color)", $textColor="var(--md-code-hl-number-color)", $offsetX="-10", $offsetY="20")
    %% external app to frontend request
    Rel(pmbc-socket, back-socket, "JSON-RPC request")
    Rel(back-socket, front-api, "Redirected request")
    Rel(front-api, flask-external-api, "JSON-RPC response")
    Rel(pmbc-listener, pmbc-socket, "Received response")
    UpdateRelStyle(pmbc-socket, back-socket, $textColor="var(--md-code-hl-function-color)", $lineColor="var(--md-code-hl-function-color)", $offsetX="-10", $offsetY="15")
    UpdateRelStyle(back-socket, front-api, $textColor="var(--md-code-hl-function-color)", $lineColor="var(--md-code-hl-function-color)", $offsetX="45", $offsetY="-35")
    UpdateRelStyle(front-api, flask-external-api, $textColor="var(--md-code-hl-function-color)", $lineColor="var(--md-code-hl-function-color)")
    UpdateRelStyle(pmbc-listener, pmbc-socket, $textColor="var(--md-code-hl-function-color)", $lineColor="var(--md-code-hl-function-color)", $offsetX="5", $offsetY="-30")

    UpdateLayoutConfig($c4ShapeInRow="1", $c4BoundaryInRow="2")
    

Figure 2 Communication structure diagram

Response message types - sent by the external application

OK

Message of type OK (0) is used to indicate a success and optionally includes an answer to a previous request. Its content may vary depending on the request type.

ERROR

Message of type ERROR (1) is used to indicate a failure and optionally includes an answer to a previous request. Its content may vary depending on the answered request.

PROGRESS

Message of optional type PROGRESS (2) is used to inform Pipeline Manager about the status of a running dataflow. The PROGRESS message type can only be used once a message of type RUN is received. It can be sent multiple times before the final response message of type ERROR or OK, which indicates the end of the run. The progress information is conveyed in content as a number from 0 to 100, encoded in UTF-8, that gives the percentage of completion of the run. See RUN for more information.

WARNING

Message of type WARNING (3) is used to indicate a success but also alerts of a condition that might cause a problem in the future. It optionally includes an answer to a previous request. Its content may vary depending on the answered request.
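The numeric values listed above can be mirrored in a small enum. The sketch below is a local stand-in written for illustration (the pipeline-manager-backend-communication library ships its own `MessageType`), and the helpers `make_response` and `is_final` are hypothetical.

```python
from enum import IntEnum
from typing import Any, Dict, Optional


class MessageType(IntEnum):
    """Local stand-in using the numeric values given in this section."""
    OK = 0
    ERROR = 1
    PROGRESS = 2
    WARNING = 3


def make_response(message_type: MessageType, content: Optional[Any] = None) -> Dict[str, Any]:
    """Build a response object; `content` is optional for every message type."""
    response: Dict[str, Any] = {"type": message_type.value}
    if content is not None:
        response["content"] = content
    return response


def is_final(message_type: MessageType) -> bool:
    """PROGRESS may be sent repeatedly; any other type ends the exchange."""
    return message_type is not MessageType.PROGRESS
```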

API Specification

Frontend API

graph_get

Returns dataflow used by frontend

node_get

Procedure to read node state

  • params

  • result (object): Cannot contain additional properties.

    • node (object, required): State of the node.

properties_get

Procedure to read values of properties of a node

  • params (object): Cannot contain unevaluated properties.

    • All of

    • properties ([array, null]): Properties whose values are going to be read, specified by either a name or an id. If undefined, all properties are returned.

      • Items (object)

        • One of

          • object: Cannot contain additional properties.

            • id (string, required): Id of the property to read.

          • object: Cannot contain additional properties.

            • name (string, required): Name of the property to read.

  • result (array)

    • Items (object): Values of the searched properties. Cannot contain additional properties.

      • id (string, required): Id of the property.

      • name (string, required): Name of the property.

      • value: Value of the property. Its type depends on the property.

properties_change

Procedure to alter properties of a node

position_change

Procedure to alter position of a node

nodes_change

Procedure to add and delete nodes

connections_change

Procedure to add and delete connections

graph_change

Procedure to change the whole dataflow

progress_change

Notification with progress of a job run by the external application

  • params (object): Parameters for notification. Cannot contain additional properties.

    • method (string, required): Name of the method used to run the job.

    • progress (number, required): Progress of the job run by the external application. If the value is between 0 and 100, the progress is set to it; if the value is -1, an animation is shown instead.
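As an illustration, a progress_change notification could be assembled as in the sketch below. The helper name `progress_notification` is hypothetical; the message has no `id` member because JSON-RPC 2.0 notifications do not carry one.

```python
from typing import Any, Dict


def progress_notification(method: str, progress: float) -> Dict[str, Any]:
    """Build a progress_change notification for a job started with `method`."""
    # -1 requests the animation; otherwise the value must be a percentage.
    if progress != -1 and not 0 <= progress <= 100:
        raise ValueError("progress must be -1 or between 0 and 100")
    return {
        "jsonrpc": "2.0",
        "method": "progress_change",
        "params": {"method": method, "progress": progress},
    }


halfway = progress_notification("dataflow_run", 50)
```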

metadata_change

Updates the editor’s metadata

viewport_center

Center the editor

terminal_add

Create new terminal instance

  • params (object): Parameters for request.

    • name (string, required): Unique name for the new terminal instance. This name is used for any communication.

    • readonly (boolean): Specifies whether the terminal should be read-only, or editable by user. Default: true.

  • result: Refer to null_or_empty.

terminal_write

Writes a message line to a terminal instance. Both text and hterm’s control sequences can be sent. For more details on the available control sequences, check hterm Control Sequences.

Warning

Since data is sent in JSON format, the hexadecimal values need to be escaped with \u, not \x, e.g. \u001b[38:2:238:178:17mexample.

  • params (object): Parameters for request.

    • name (string, required): Name of the terminal to which the message is written.

    • message (string, required): Message to be written to the terminal.

  • result: Refer to null_or_empty.
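The escaping rule from the warning above can be checked with Python's standard `json` module, which serializes the escape character as `\u001b`. The terminal name `my_terminal` is made up for this sketch.

```python
import json

ESC = "\x1b"  # in Python source the escape character may be written with \x
message = f"{ESC}[38:2:238:178:17mexample"

# Parameters for a terminal_write request; the terminal name is hypothetical.
params = {"name": "my_terminal", "message": message}

# json.dumps emits the control character in the \u form required by JSON.
encoded = json.dumps(params)
print(encoded)
```

Building the payload with a JSON serializer rather than by hand avoids having to write the `\u` escapes manually.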

notification_send

Sends a notification to the frontend. It will display the message both in the notifications and in the default terminal

  • params (object): Parameters for notification.

    • type: Type of the notification. Must be one of: ["error", "warning", "info"].

    • title (string, required): Title of the notification, appearing both in the terminal and notification.

    • details (string, required): Details of the notification, displayed only in the terminal.

  • result: Refer to null_or_empty.

specification_change

Procedure to update specification

Backend API

status_get

Returns status of connection with external application

  • params: Refer to null_or_empty.

  • result (object): Description of backend status. Cannot contain additional properties.

    • status (object, required): Status. Cannot contain additional properties.

      • connected (boolean, required): Connection with external application.

external_app_connect

Request to wait till connection with external application is established

connected_frontends_get

Request the number of connected frontends

  • params: Refer to null_or_empty.

  • result (object): Cannot contain additional properties.

    • connections (number, required): Number of connections.

External App API

specification_get

Request specification used by external application

app_capabilities_get

Request external application capabilities

  • params: Refer to null_or_empty.

  • result (object): External application capabilities. Cannot contain additional properties.

    • stoppable_methods (array): List with methods that can be stopped with dataflow_stop request.

      • Items (string): Name of the stoppable method.

dataflow_import

Request to convert dataflow in external app to Pipeline Manager format and import it

  • params (object): Parameters for request. Cannot contain additional properties.

    • external_application_dataflow (string, required): Dataflow in external application format. If the loaded file cannot be represented as text, it is sent as a base64 string. To make sure the received data is in a readable format, convert_message_to_string from pipeline_manager_backend_communication.utils can be used. For conversion to bytes, the convert_message_to_bytes function is available.

    • mime (string, required): MIME type.

    • base64 (boolean, required): Specifies whether external_application_dataflow is in base64 format.

  • result: Refer to external_endpoint_return_type.
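To illustrate what the base64 flag implies, the sketch below turns the received string into raw bytes using only the standard library. It is a stand-in for illustration, not the library's own convert_message_to_bytes; the function name `dataflow_to_bytes` and the UTF-8 assumption for plain-text payloads are hypothetical.

```python
import base64 as b64


def dataflow_to_bytes(external_application_dataflow: str, is_base64: bool) -> bytes:
    """Return the raw file content carried by a dataflow_import request."""
    if is_base64:
        # Binary files arrive as a base64 string and have to be decoded.
        return b64.b64decode(external_application_dataflow)
    # Text files arrive as-is; UTF-8 is assumed here.
    return external_application_dataflow.encode("utf-8")


binary_payload = b64.b64encode(b"\x00\x01binary").decode("ascii")
raw = dataflow_to_bytes(binary_payload, is_base64=True)
```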

dataflow_validate

Request external application to validate dataflow

dataflow_run

Request external application to run dataflow

dataflow_stop

Request external application to stop dataflow

  • params (object): Parameters for stopping dataflow. Cannot contain additional properties.

    • method (string, required): Name of the method used to start run.

  • result: Refer to external_endpoint_return_type.

dataflow_export

Request external application to export dataflow

  • params: Refer to dataflow_object.

  • result (object): Common type returned by external app. Cannot contain additional properties.

    • type (number, required): MessageType specifying success or error.

    • content ([object, string]): Exported dataflow, which is then saved by the frontend user. Should be either a json object, or a base64 encoded string. If any error occurred then it should contain a proper message.

    • filename (string): Suggested filename used to save the file.
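A result for dataflow_export could be built as in the following sketch, which returns JSON-compatible data unchanged and base64-encodes binary data. The helper name `export_result` and the example filename are hypothetical; 0 is the numeric value of the OK message type.

```python
import base64
from typing import Any, Dict, Union

OK = 0  # numeric value of the OK message type


def export_result(data: Union[dict, bytes], filename: str) -> Dict[str, Any]:
    """Build a dataflow_export result with a suggested filename."""
    if isinstance(data, bytes):
        # Binary output is sent as a base64-encoded string.
        content: Union[dict, str] = base64.b64encode(data).decode("ascii")
    else:
        content = data
    return {"type": OK, "content": content, "filename": filename}


result = export_result(b"\x89PNG", "design.png")
```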

frontend_on_connect

Request sent when the Pipeline Manager frontend connects to the backend

properties_on_change

Request sent when properties of any node change

position_on_change

Request sent when the position of any node changes

nodes_on_change

Request sent when a node is added or deleted

connections_on_change

Request sent when a connection is added or deleted

graph_on_change

Request sent when the whole dataflow changes, e.g. when a dataflow is loaded

metadata_on_change

Request sent when the metadata is changed

viewport_on_center

Request sent when the editor is centered

terminal_read

Request sent by the frontend when terminal received an input

  • params (object): Parameters for request.

    • name (string, required): Name of the terminal to which the message was written. The terminal must not be read-only.

    • message (string, required): Terminal input.

  • result: Refer to null_or_empty.

Common Types

empty

  • object: Empty object definition. Cannot contain additional properties.

null_or_empty

  • [object, null]: Empty or missing object definition. Cannot contain additional properties.

node_object

  • object: Schema that identifies a node in a graph.

    • graph_id (string, required): Id of the graph.

    • node_id (string, required): Id of the node.

dataflow_object

  • object: Definition containing dataflow object. Cannot contain additional properties.

    • dataflow (object): JSON with graph definition in PM format.

specification_object

  • object: Definition containing specification object. Cannot contain additional properties.

    • specification (object): JSON with specification definition in PM format.

properties_diff

  • object: Schema that represents differences in properties of the node. Cannot contain unevaluated properties.

    • All of

    • properties (array, required): Properties to change specified by either a name or id.

      • Items (object)

        • One of

          • object: Cannot contain additional properties.

            • id (string, required): Id of the property to alter.

            • new_value: New value of the property.

          • object: Cannot contain additional properties.

            • name (string, required): Name of the property to alter.

            • new_value: New value of the property.

position_diff

  • object: Schema that represents differences in position of the node. Cannot contain unevaluated properties.

    • All of

    • position (object, required): Position to change specified by either a name or id.

      • x (number): X coordinate.

      • y (number): Y coordinate.

nodes_diff

  • object: Schema that represents nodes’ differences in a graph.

    • graph_id (string, required): Id of the graph.

    • nodes (object, required): .

      • added (array): List with created nodes.

        • Items (object): JSON with node definition.

      • deleted (array): List with removed nodes.

        • Items (string): ID of node to delete.

    • remove_with_connections (boolean): Should node be removed with connections.
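For illustration, an object following the nodes_diff schema could be assembled as below. The helper name `make_nodes_diff` and the graph and node IDs are made up for this sketch.

```python
from typing import Any, Dict, Iterable, Optional


def make_nodes_diff(
    graph_id: str,
    added: Iterable[Dict[str, Any]] = (),
    deleted: Iterable[str] = (),
    remove_with_connections: Optional[bool] = None,
) -> Dict[str, Any]:
    """Build a nodes_diff object: added node definitions and deleted node IDs."""
    diff: Dict[str, Any] = {
        "graph_id": graph_id,
        "nodes": {"added": list(added), "deleted": list(deleted)},
    }
    # The flag is optional, so it is only included when explicitly set.
    if remove_with_connections is not None:
        diff["remove_with_connections"] = remove_with_connections
    return diff


diff = make_nodes_diff("graph-1", deleted=["node-7"], remove_with_connections=True)
```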

connections_diff

  • object: Schema that represents connections’ differences in a graph.

    • graph_id (string, required): Id of the graph.

    • connections (object, required): .

      • added (array): List with created connections.

        • Items (object): JSON with connection definition.

      • deleted (array): List with removed connections.

        • Items (object): Connection defined with its beginning and end.

          • from (string): ID of output interface.

          • to (string): ID of input interface.

metadata

  • object: Type with PM metadata. Cannot contain additional properties.

    • metadata (object, required): JSON with metadata description in PM format.

external_endpoint_return_type

  • object: Common type returned by external app. Cannot contain additional properties.

    • type (number, required): MessageType specifying success, error or progress.

    • content ([object, string]): Additional information, either message or dataflow.

Custom procedures

The external application can define new remote procedures, which are called with custom Navbar buttons. For a procedure to be usable this way, its name has to start with the custom_ prefix, e.g. custom_simulate_design.

Such remote procedures can be called from the frontend using custom Navbar buttons defined in the metadata’s navbarItems field, e.g.:

{
    "name": "Simulate design",
    "stopName": "Stop simulation",
    "iconName": "Run",
    "procedureName": "custom_simulate_design"
}

A custom procedure has the same parameters and return type as the dataflow_run method.
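A quick consistency check between a Navbar item and the object holding the RPC methods might look like the sketch below. The function `check_navbar_item` and the class `ExampleMethods` are hypothetical and exist only to illustrate the naming rule.

```python
from typing import Any, Dict

CUSTOM_PREFIX = "custom_"


def check_navbar_item(item: Dict[str, Any], rpc_methods: object) -> None:
    """Raise ValueError if the item's procedure cannot be called as a custom procedure."""
    name = item["procedureName"]
    if not name.startswith(CUSTOM_PREFIX):
        raise ValueError(f"{name!r} does not start with {CUSTOM_PREFIX!r}")
    if not callable(getattr(rpc_methods, name, None)):
        raise ValueError(f"{name!r} is not implemented by the application")


class ExampleMethods:
    # Same parameters and return type as dataflow_run
    def custom_simulate_design(self, dataflow: Dict[str, Any]) -> Dict[str, Any]:
        return {"type": 0}  # 0 is the value of MessageType.OK


item = {
    "name": "Simulate design",
    "stopName": "Stop simulation",
    "iconName": "Run",
    "procedureName": "custom_simulate_design",
}
check_navbar_item(item, ExampleMethods())  # passes silently
```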

Implementing a Python-based client for Pipeline Manager

The communication described above is necessary to integrate an application with Pipeline Manager. The client needs to be able to read requests coming from Pipeline Manager and send proper responses.

For applications written in Python, you can use the pipeline-manager-backend-communication library. It implements an easy-to-use interface that is able to communicate with Pipeline Manager along with helper structures and enumerations.

The main structures provided by the pipeline-manager-backend-communication library are:

  • CommunicationBackend - class that implements the functionality for receiving and sending messages.

  • MessageType - enum used to easily distinguish message types.

  • Status - enum that describes the current state of the client.

The following code is an example of how to receive requests and send responses to Pipeline Manager. As Pipeline Manager communication is based on JSON-RPC, the application should implement the methods that can be requested. They are described in External App API.

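from typing import Dict

from pipeline_manager_backend_communication.communication_backend import CommunicationBackend
from pipeline_manager_backend_communication.misc_structures import MessageType

# Class containing all implemented methods
class RPCMethods:
    def specification_get(self) -> Dict:
        # ...
        return {'type': MessageType.OK.value, 'content': specification}

    # ...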

The defined methods must have a name, input and output that match the specification.

    # Function name matches with the dataflow_import endpoint from External App API
    def dataflow_import(self, external_application_dataflow: str, mime: str, base64: bool) -> Dict:
        # The function receives the parameters listed in `params` of the API
        # specification; each argument name has to match the parameter name.
        # Optional, but you can convert the received file to string format
        # with `convert_message_to_string`
        # from pipeline_manager_backend_communication.utils import (
        #     convert_message_to_string
        # )
        data_as_string = convert_message_to_string(
            external_application_dataflow,
            base64,
            mime
        )
        # ...
        # pipeline_manager_dataflow here is the converted input file to the
        # Pipeline Manager's graph representation
        return {
            'type': MessageType.OK.value,
            'content': pipeline_manager_dataflow
        }

    def dataflow_validate(self, dataflow: Dict) -> Dict:
        # ...
        # Returned object has to match API specification `result`
        return {'type': MessageType.OK.value}

    def dataflow_run(self, **kwargs: Dict) -> Dict:
        # All params can also be retrieved as one dictionary
        print(kwargs['dataflow'])
        # ...
        return {'type': MessageType.OK.value}

    # Custom procedure example
    def custom_build(self, dataflow: Dict) -> Dict:
        # ...
        return {'type': MessageType.OK.value}

Moreover, every uncaught exception will be classified as an error.

    def dataflow_export(self, dataflow: Dict) -> Dict:
        # ...
        raise Exception('Something went very, very bad...')

In this case, the following JSON-RPC error message will be returned to the frontend application.

{
    "id": 1,
    "jsonrpc": "2.0",
    "error": {
        "code": -3,
        "message": "Something went very, very bad...",
        "data": {}
    }
}

RPC methods can also be asynchronous. The server detects this automatically and awaits them.

    async def dataflow_stop(self, method: str) -> Dict:
        # `method` is the name of the method used to start the run
        # ...
        return {'type': MessageType.OK.value}

A TCP client that connects to Pipeline Manager using the provided host and port parameters has to be created. It has to be initialized with an object containing the JSON-RPC methods.

host = '127.0.0.1'
port = 5000

# Creating a client instance with host and port specified
client = CommunicationBackend(host, port)
# Registering implemented methods and
# connecting to Pipeline Manager
await client.initialize_client(RPCMethods())

Once the connection is established, the application can start listening for the incoming requests.

await client.start_json_rpc_client()

These calls can be wrapped in an async function and run with asyncio.run.
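One possible shape for such a wrapper is sketched below. It only relies on the two coroutines shown above, `initialize_client` and `start_json_rpc_client`; the function names `run_client` and `main` are hypothetical, and the client is passed in as an argument so the sketch does not depend on how it was constructed.

```python
import asyncio


async def run_client(client, rpc_methods) -> None:
    """Register the RPC methods and connect, then listen for incoming requests."""
    await client.initialize_client(rpc_methods)
    await client.start_json_rpc_client()


def main(client, rpc_methods) -> None:
    # asyncio.run creates the event loop, runs the coroutine and closes the loop.
    asyncio.run(run_client(client, rpc_methods))
```

With the objects from the earlier snippets, this would be invoked as `main(CommunicationBackend(host, port), RPCMethods())`.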

Sending JSON-RPC requests to Pipeline Manager

Sending a request is a coroutine, which has to be awaited.

response = await client.request('graph_get')

This call sends a graph_get request to the frontend application and receives the following response:

{
    "id": 1,
    "jsonrpc": "2.0",
    "result": {
        "dataflow": {
            // ...
        }
    }
}
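A response like this, or an error response of the shape shown earlier, could be unpacked with a small helper such as the one below. It assumes the response is available as a Python dict holding the parsed JSON-RPC message; the helper name `unwrap_result` is hypothetical.

```python
from typing import Any, Dict


def unwrap_result(response: Dict[str, Any]) -> Any:
    """Return the `result` member or raise if the response carries an `error`."""
    if "error" in response:
        error = response["error"]
        raise RuntimeError(
            f"JSON-RPC error {error.get('code')}: {error.get('message')}"
        )
    return response["result"]


ok_response = {"id": 1, "jsonrpc": "2.0", "result": {"dataflow": {}}}
dataflow = unwrap_result(ok_response)["dataflow"]
```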

Last update: 2025-01-20