Dataflow graph builder

Class GraphBuilder (available in pipeline_manager.dataflow_builder.dataflow_builder) provides a convenient Python API for creating and editing a dataflow graph.

Examples of GraphBuilder usage

Creating GraphBuilder

In order to create an instance of GraphBuilder:

from pipeline_manager.dataflow_builder.dataflow_builder import GraphBuilder

builder = GraphBuilder(
    specification="examples/sample-specification.json",
    specification_version="20240723.13",
)

The code creates an instance of GraphBuilder with the sample-specification.json loaded.

Obtaining a dataflow graph

A dataflow graph may be either loaded from a dataflow file or created.

To create a new empty dataflow graph:

from pipeline_manager.dataflow_builder.dataflow_graph import DataflowGraph

graph = builder.create_graph()

To load an existing dataflow graph from a file:

builder.load_graphs(dataflow_path='examples/sample-dataflow.json')
graph = builder.graphs[0]

In both cases, the dataflow graph:

  • has to be compliant with the provided specification.

  • is retrieved and may be edited with its public methods.

Creating a node

Having obtained the DataflowGraph object (in the previous examples, stored in the graph variable), a new node may be added to the graph in the following way:

from pipeline_manager.dataflow_builder.entities import Node, Vector2

node = graph.create_node(
    name="LoadVideo", # Required.
    position=Vector2(0, 0), # Optional.
)

Only keyword parameters are allowed. The available parameters of the create_node method are attributes of the Node dataclass (which is located under pipeline_manager.dataflow_builder.entities.Node).

Getting a node

A dataflow graph stores nodes. Thus, nodes are retrieved from the dataflow graph in the following way:

from pipeline_manager.dataflow_builder.dataflow_graph import AttributeType

nodes = graph.get(AttributeType.NODE, name="LoadVideo")
if len(nodes) > 0:
    node = nodes[0]

Only nodes with name equal to LoadVideo will be retrieved.

The DataflowGraph.get method retrieves any kind of object listed in the AttributeType enum: connections, interfaces, and nodes. In this case, nodes are retrieved. Parameters other than type are filters that specify which values the obtained objects (here: nodes) must have. The next example shows such usage.

Getting a node matching multiple criteria

To get a node that satisfies all the matching criteria, use the get method once again:

from pipeline_manager.dataflow_builder.entities import Vector2

[node] = graph.get(AttributeType.NODE, position=Vector2(0, 0), name="LoadVideo")

The code above assigns to the node variable the node whose name is LoadVideo and whose position is [0, 0].
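The [node] = ... form relies on Python sequence unpacking, which succeeds only when the list holds exactly one element. The sketch below illustrates that behaviour with plain lists standing in for the result of graph.get, so it runs without Pipeline Manager installed; only and first_or_none are hypothetical helpers, not part of the library.

```python
def only(items):
    """Return the single element of `items`, or raise ValueError otherwise."""
    [item] = items  # Raises ValueError for zero or for more than one element.
    return item


def first_or_none(items):
    """Return the first element of `items`, or None when the list is empty."""
    return items[0] if items else None
```

Unpacking is a reasonable choice when exactly one match is expected; the len(nodes) > 0 check from the earlier example is the more forgiving alternative.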

Manipulating a node

Once a node has been retrieved from a graph, its attributes can be modified. For example, to change the instance name of the node, run:

node.instance_name = 'Instance name of a node'

To change a property of the node, use set_property:

try:
    # Use of `set_property` is recommended to set values of properties of a node.
    node.set_property('compression_rate', 1)
except KeyError:
    print('Cannot set compression_rate to 1 as the property does not exist.')

To move the node from its current position, use move:

# Move a node to [1000, 1000].
node.move(Vector2(1000, 1000))

# Move the node by 500 pixels to the right, relative to its previous position.
node.move(new_position=Vector2(500, 0), relative=True)

Specification of GraphBuilder

class pipeline_manager.dataflow_builder.dataflow_builder.GraphBuilder(specification: Path, specification_version: str)

Class for building dataflow graphs.

Each instance of the GraphBuilder must be associated with a single specification to ensure proper validation.

create_graph(based_on: Path | str | DataflowGraph | None = None) -> DataflowGraph

Create a dataflow graph and return its instance.

Create an instance of a dataflow graph, add it to the internal list and return it. The dataflow graph is based on the graph provided in the based_on parameter.

Parameters:
based_on : Union[Path, str, DataflowGraph, None], optional

Dataflow graph on which the new graph should be based. When Path or str, it should be a path to a dataflow graph in JSON format. When DataflowGraph, it should be a valid representation (as its deep copy will be added). When None, the new dataflow graph will not be based on anything. By default None.

Returns:

Instance of a dataflow graph, preserved in the GraphBuilder.

Return type:

DataflowGraph
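The note about the deep copy can be illustrated with a small sketch. It assumes graphs are plain dictionaries and uses a hypothetical TinyBuilder class as a stand-in for GraphBuilder; it is not the library's implementation, only a demonstration of why a deep copy keeps the new graph independent of the one it is based on.

```python
import copy


class TinyBuilder:
    """Hypothetical stand-in for GraphBuilder; graphs are plain dicts here."""

    def __init__(self):
        self.graphs = []

    def create_graph(self, based_on=None):
        # A deep copy keeps the new graph independent of the one it is based on.
        graph = {"nodes": []} if based_on is None else copy.deepcopy(based_on)
        self.graphs.append(graph)
        return graph


tiny_builder = TinyBuilder()
original = {"nodes": [{"name": "LoadVideo"}]}
derived = tiny_builder.create_graph(based_on=original)

# Editing the derived graph leaves the original untouched.
derived["nodes"][0]["name"] = "SaveVideo"
```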

load_graphs(dataflow_path: Path)

Load all dataflow graphs from a file.

Parameters:
dataflow_path : Path

Path to a dataflow graph.

Raises:

ValueError – Raised if a dataflow graph could not be loaded.

load_specification(specification_path: Path, purge_old_graphs: bool = True)

Load a specification from a file to use in GraphBuilder.

The default behaviour is to remove loaded dataflow graphs when loading a new specification. That is due to the fact that a dataflow graph is closely linked with its specification. Notice that loading a specification overrides the old one.

Parameters:
specification_path : Path

Path to a specification file.

purge_old_graphs : bool, optional

Determines whether dataflow graphs loaded into memory should be purged. Purging makes sense because, after a specification changes, existing dataflow graphs may no longer be valid. By default True.

Raises:

ValueError – Raised if specification file cannot be loaded or is invalid.
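The effect of purge_old_graphs can be sketched as follows. BuilderSketch is a hypothetical stand-in that stores the specification and graphs as plain Python objects; it skips file loading and validation and only mirrors the behaviour described above.

```python
class BuilderSketch:
    """Hypothetical stand-in for GraphBuilder; not the library class."""

    def __init__(self, specification):
        self.specification = specification
        self.graphs = []

    def load_specification(self, specification, purge_old_graphs=True):
        # The new specification always replaces the old one.
        self.specification = specification
        if purge_old_graphs:
            # Graphs built against the old specification are dropped.
            self.graphs.clear()
```

Passing purge_old_graphs=False keeps the loaded graphs, which is only safe when they remain valid under the new specification.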

save(json_file: Path)

Save graphs to a JSON file.

Parameters:
json_file : Path

Path where the output JSON file will be created.

validate()

Validate the entire dataflow file, including every graph it contains.

Raises:

RuntimeError – Raised if an external validator failed to validate either a dataflow or specification file. An error message is provided by the external validator.

Specification of DataflowGraph

class pipeline_manager.dataflow_builder.dataflow_graph.DataflowGraph(builder_with_spec: SpecificationBuilder, dataflow: dict[str, Any] | None = None)

Representation of a dataflow graph.

create_connection(from_interface: Interface | str, to_interface: Interface | str, connection_id: str | None = None) -> InterfaceConnection

Create a connection between two existing interfaces.

The function performs numerous checks to verify the validity of the desired connection, including whether the connection already exists and whether the interfaces’ types match. The connection is added only if it passes these checks.

Parameters:
from_interface : Union[Interface, str]

Source interface, where data will flow from.

to_interface : Union[Interface, str]

Destination interface, where data will flow to.

connection_id : Optional[str]

Identifier of a connection. If not supplied, one will be generated.

Returns:

Created connection added to the graph.

Return type:

InterfaceConnection

Raises:

ValueError – Raised if:

  • a source interface does not belong to the graph.

  • a destination interface does not belong to the graph.

  • a source interface direction is input.

  • a destination interface direction is output.

  • a mismatch between source and destination interfaces’ types occurs.
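The conditions that raise ValueError can be sketched as a sequence of guard checks. Everything below is illustrative: Iface and check_connection are hypothetical names, directions are modelled as plain strings, and the rule that two interfaces match when they share at least one type is an assumption, since the exact matching rule is not stated here.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Iface:
    """Hypothetical stand-in for Interface, reduced to the checked fields."""

    id: str
    direction: str  # "input" or "output" in this sketch
    types: tuple = ()


def check_connection(source, destination, graph_interface_ids):
    """Raise ValueError when the connection breaks one of the listed rules."""
    if source.id not in graph_interface_ids:
        raise ValueError("source interface does not belong to the graph")
    if destination.id not in graph_interface_ids:
        raise ValueError("destination interface does not belong to the graph")
    if source.direction == "input":
        raise ValueError("source interface direction is input")
    if destination.direction == "output":
        raise ValueError("destination interface direction is output")
    # Assumption: types match when the interfaces share at least one type.
    if not set(source.types) & set(destination.types):
        raise ValueError("source and destination interface types do not match")


video_out = Iface("a", "output", ("video",))
video_in = Iface("b", "input", ("video",))
check_connection(video_out, video_in, {"a", "b"})  # Passes silently.
```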

create_node(name: str, **kwargs) -> Node

Create the node initialized with the supplied arguments.

Use this method instead of manually adding a node. Default values are taken from the specification, based on the provided name parameter. The default values may be overridden by the values supplied in kwargs. id is already initialized.

Parameters:
name : str

Name of a node, based on which default values will be derived.

**kwargs

Keyword arguments to initialise a newly created node. See the attributes of the Node dataclass for all available keys.

Returns:

The initialized node that belongs to the dataflow graph.

Return type:

Node

Raises:

ValueError – Raised if the name key is missing in the kwargs dictionary or the provided name of the node does not exist in the specification.
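The way defaults from the specification combine with explicit keyword arguments can be sketched with a dictionary merge. The function build_node_attributes, the dictionary layout of the specification, and the width attribute are all hypothetical; the sketch only shows the override order described above.

```python
def build_node_attributes(specification_nodes, name, **kwargs):
    """Merge specification defaults for `name` with explicit overrides."""
    if name not in specification_nodes:
        raise ValueError(f"Node '{name}' does not exist in the specification.")
    # Later entries win, so explicit keyword arguments override the defaults.
    return {"name": name, **specification_nodes[name], **kwargs}


# Hypothetical specification fragment with defaults for one node type.
spec_nodes = {"LoadVideo": {"position": (0, 0), "width": 300}}
attributes = build_node_attributes(spec_nodes, "LoadVideo", position=(10, 20))
```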

get(type: AttributeType, **kwargs) -> list[Node] | list[InterfaceConnection] | list[Interface]

Get items of a given type, which satisfy all the desired criteria.

Items are understood as nodes, connections, or interfaces. The function finds objects by eliminating those that do not match the criteria. Thus, all the criteria are combined with a logical AND.

Parameters:
type : AttributeType

Type of the output objects.

**kwargs

Contains search criteria:

  • Keys: the attributes of the object of the chosen type.

  • Values: values to be matched.

Returns:

List of items satisfying the criteria.

Return type:

Union[List[Node], List[InterfaceConnection], List[Interface]]
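The AND semantics of the criteria can be sketched as a filter over attribute values. Item and filter_items are hypothetical names used only for illustration, and positions are plain tuples here rather than Vector2 objects.

```python
from dataclasses import dataclass


@dataclass
class Item:
    """Hypothetical stand-in for a node, reduced to two attributes."""

    name: str
    position: tuple


def filter_items(items, **criteria):
    """Keep only the items whose attributes equal every given criterion."""
    return [
        item
        for item in items
        if all(getattr(item, key) == value for key, value in criteria.items())
    ]


items = [
    Item("LoadVideo", (0, 0)),
    Item("LoadVideo", (5, 5)),
    Item("SaveVideo", (0, 0)),
]
by_name = filter_items(items, name="LoadVideo")
by_name_and_position = filter_items(items, name="LoadVideo", position=(0, 0))
```

Each additional criterion can only narrow the result, so calling the filter without criteria returns every item.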

get_by_id(type: AttributeType, id: str) -> InterfaceConnection | Node | Interface | None

Fast getter, which finds an item of a supplied type and with the provided id.

It has a complexity of O(1) (except for interfaces), in contrast to the get method, which iterates over all items.

Parameters:
type : AttributeType

Type of the output objects.

id : str

ID of the sought object.

Returns:

A single instance of InterfaceConnection, Node, or Interface, depending on the provided type. If no such object exists, None is returned.

Return type:

Optional[Union[InterfaceConnection, Node, Interface]]
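Constant-time lookup by identifier is typically achieved with a dictionary keyed by id. The sketch below assumes such an index; IdIndex is a hypothetical class, and the document does not confirm how the library stores its items internally.

```python
class IdIndex:
    """Hypothetical id-keyed index illustrating an O(1) average-case lookup."""

    def __init__(self):
        self._by_id = {}

    def add(self, item_id, item):
        self._by_id[item_id] = item

    def get_by_id(self, item_id):
        # dict.get returns None when the id is unknown, mirroring the
        # "None is returned" behaviour described above.
        return self._by_id.get(item_id)


index = IdIndex()
index.add("node-1", {"name": "LoadVideo"})
```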

to_json(as_str: bool = True) -> str | dict

Convert a dataflow graph to the JSON format.

Parameters:
as_str : bool, optional

Determine return type. By default, True.

Returns:

Representation of the dataflow graph in JSON. If as_str is True, JSON in str is returned. Otherwise, a Python dictionary is returned.

Return type:

Union[Dict, str]
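The two return modes controlled by as_str can be sketched with the standard json module. The free function to_json below is a hypothetical illustration operating on a plain dictionary, not the method of DataflowGraph.

```python
import json


def to_json(data, as_str=True):
    """Return `data` serialized to a JSON string, or unchanged as a dict."""
    return json.dumps(data) if as_str else data


graph_data = {"nodes": [{"name": "LoadVideo"}]}
as_text = to_json(graph_data)
as_dict = to_json(graph_data, as_str=False)
```

The dictionary form is convenient for further processing in Python, while the string form is ready to be written to a file.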

Specification of Node

class pipeline_manager.dataflow_builder.entities.Node(specification_builder: SpecificationBuilder, **kwargs)

Representation of a node in a dataflow graph.

get(type: NodeAttributeType, **kwargs) -> list[Property] | list[Interface]

Get either properties or interfaces matching criteria.

Parameters:
type : NodeAttributeType

Type of an item to retrieve.

kwargs : Any

Criteria, which items have to satisfy.

Returns:

List of either Property or Interface instances.

Return type:

Union[List[Property], List[Interface]]

move(new_position: Vector2, relative: bool = False)

Change a position of the node.

Change the position of the node in either a relative or an absolute manner. Values are clamped to stay within the allowed range.

Parameters:
new_position : Vector2

If relative is False, this is the new position. Otherwise, it is a displacement (movement) vector.

relative : bool, optional

Whether position should be calculated based on the previous one (True) or not (False), by default False.
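The difference between absolute and relative moves, together with clamping, can be worked through in a small sketch. The function moved_position is hypothetical, positions are plain tuples, and the bounds 0 and 10_000 are placeholders chosen for illustration; the real limits are not stated in this document.

```python
def moved_position(current, new_position, relative=False, lower=0, upper=10_000):
    """Return the position after a move, clamped to [lower, upper] per axis.

    The bounds are placeholders for this sketch, not the library's limits.
    """

    def clamp(value):
        return max(lower, min(upper, value))

    x, y = new_position
    if relative:
        # Treat new_position as a displacement added to the current position.
        x, y = current[0] + x, current[1] + y
    return (clamp(x), clamp(y))


absolute = moved_position((0, 0), (1000, 1000))
shifted = moved_position(absolute, (500, 0), relative=True)
```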

set_property(property_name: str, property_value: Any) -> None

Convenient setter to change value of a property of the node.

Parameters:
property_name : str

Name of the property.

property_value : Any

New value of a property.

Return type:

None

Raises:

KeyError – Raised if a property with the supplied property_name was not found.
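The KeyError behaviour can be sketched with a stand-in class. TinyNode is hypothetical and stores properties in a plain dictionary, which is an assumption about representation made only for this example.

```python
class TinyNode:
    """Hypothetical stand-in for Node that keeps properties in a dict."""

    def __init__(self, properties):
        self.properties = dict(properties)

    def set_property(self, property_name, property_value):
        # Reject unknown names instead of silently creating a new property.
        if property_name not in self.properties:
            raise KeyError(f"Property '{property_name}' was not found.")
        self.properties[property_name] = property_value


tiny_node = TinyNode({"compression_rate": 0})
tiny_node.set_property("compression_rate", 1)
```

One plausible benefit of a checked setter over direct assignment is that a misspelled property name fails immediately rather than adding an unintended entry.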

to_json(as_str=True) -> str | dict

Convert the node to the JSON format.

Parameters:
as_str : bool, optional

Determine return type. By default, True.

Returns:

Representation of the node in JSON. If as_str is True, JSON in str is returned. Otherwise, a Python dictionary is returned.

Return type:

Union[Dict, str]

Specification of Interface

class pipeline_manager.dataflow_builder.entities.Interface(name: str, direction: Direction, side: Side | None = None, side_position: int | None = None, external_name: str | None = None, id: str = '74d47adb-8d56-4283-8ea5-61838f02ece7', type: List[str] = <factory>)

Representation of a node’s interface.

to_json(as_str: bool) -> str | dict

Convert the interface to the JSON format.

Parameters:
as_str : bool, optional

Determine return type. By default, True.

Returns:

Representation of the interface in JSON. If as_str is True, JSON in str is returned. Otherwise, a Python dictionary is returned.

Return type:

Union[Dict, str]

Specification of Connection

Note that the name of the class representing a connection is InterfaceConnection, not Connection.

class pipeline_manager.dataflow_builder.entities.InterfaceConnection(from_interface: Interface, to_interface: Interface, anchors: list[Vector2] | None = None, id: str = '9028b6e4-9c1d-4745-b785-1277142bba79')

Representation of a connection between two interfaces in a dataflow graph.

to_json(as_str: bool = True) -> dict | str

Convert the connection to the JSON format.

Parameters:
as_str : bool, optional

Determine return type. By default, True.

Returns:

Representation of the connection in JSON. If as_str is True, JSON in str is returned. Otherwise, a Python dictionary is returned.

Return type:

Union[Dict, str]

Specification of Property

class pipeline_manager.dataflow_builder.entities.Property(name: str, value: Any, id: str = '3e3cba6f-c997-4457-84f5-efaa9494fc5b')

A property of a node.

to_json(as_str: bool = True) -> dict | str

Convert the property to the JSON format.

Parameters:
as_str : bool, optional

Determine return type. By default, True.

Returns:

Representation of the property in JSON. If as_str is True, JSON in str is returned. Otherwise, a Python dictionary is returned.

Return type:

Union[Dict, str]


Last update: 2025-01-09