Dataflow graph builder¶
Class GraphBuilder
(available in pipeline_manager.dataflow_builder.dataflow_builder
) provides a convenient Python API for creating and editing a dataflow graph.
Examples of GraphBuilder usage¶
Creating GraphBuilder¶
In order to create an instance of GraphBuilder
:
from pipeline_manager.dataflow_builder.dataflow_builder import GraphBuilder
builder = GraphBuilder(
specification="examples/sample-specification.json",
specification_version="20240723.13",
)
The code creates an instance of GraphBuilder
with the sample-specification.json
loaded.
Obtaining a dataflow graph¶
A dataflow graph may be either loaded from a dataflow file or created.
To create a new empty dataflow graph:
from pipeline_manager.dataflow_builder.dataflow_graph import DataflowGraph
graph = builder.create_graph()
To load an existing specification from a file:
builder.load_graphs(dataflow_path='examples/sample-dataflow.json')
graph = builder.graphs[0]
In both cases, the dataflow graph:
has to be compliant with the provided specification.
is retrieved and may be edited with its public methods.
Creating a node¶
Having obtained the DataflowGraph
object (in the previous examples, stored in the graph
variable), a new node may be added to the graph in the following way:
from pipeline_manager.dataflow_builder.entities import Node
node = graph.create_node(
name="LoadVideo", # Required.
position=Vector2(0, 0), # Optional.
)
Only keyword parameters are allowed.
The available parameters of the create_node
method are attributes of the Node
dataclass (which is located under pipeline_manager.dataflow_builder.entities.Node
).
Getting a node¶
A dataflow graphs stores nodes. Thus, nodes are retrieved from the dataflow graph in the following way:
from pipeline_manager.dataflow_builder.dataflow_graph import AttributeType
nodes = graph.get(AttributeType.NODE, name="LoadVideo")
if len(nodes) > 0:
node = nodes[0]
Only nodes with name
equal to LoadVideo
will be retrieved.
The DataflowGraph.get
method retrieves not only nodes but other objects present in the AttributeType
enum: connections, interfaces, and nodes.
In this case, nodes are retrieved.
Parameters other than type
are filters, which specify what values should the obtained objects (here: nodes) have.
Take a look at the next example to see such usage.
Getting a node matching multiple criteria¶
To get a node, which satisfies all the matching criteria, use the get
method once again:
from pipeline_manager.dataflow_builder.entities import Vector2
[node] = graph.get(AttributeType.NODE, position=Vector2(0, 0), name="LoadVideo")
The code above puts in the node
variable a node with name LoadVideo
and position with coordinates [0, 0]
.
Manipulating a node¶
After having a node retrieved from a graph, it is time to modify its attributes. For example, to change an instance name of the node, run:
node.instance_name = 'Instance name of a node'
To change the property of the node, set_property
can be used:
try:
# Use of `set_property` is recommended to set values of properties of a node.
node.set_property('compression_rate', 1)
except KeyError:
print('Cannot set compression_rate to 1 as the property does not exists.')
To move the node from its current position, use move
:
# Move a node to [1000, 1000].
node.move(Vector2(1000, 1000))
# Move the node by 500 pixels to the right, relative to its previous position.
node.move(new_position=Vector2(500, 0), relative=True)
Specification of GraphBuilder¶
- class pipeline_manager.dataflow_builder.dataflow_builder.GraphBuilder(specification: Path, specification_version: str)¶
Class for building dataflow graphs.
Each instance of the GraphBuilder must be associated with a single specification to ensure proper validation.
-
create_graph(based_on: Path | str | DataflowGraph | None =
None
) DataflowGraph ¶ Create a dataflow graph and return its instance.
Create an instance of a dataflow graph, add it to the internal list and return it. The dataflow graph is based on the graph provided in based_on parameter.
- Parameters:¶
- based_on : Union[Path, str, DataflowGraph, None], optional¶
Dataflow graph, on which the new graph should be based on. When Path or str, it should be a path to dataflow graph in a JSON format. When DataflowGraph, it should be a valid representation (as its deep copy will be added). When None, the new dataflow graph will not be based on anything, by default None.
- Returns:¶
Instance a of a dataflow graph, preserved in the GraphBuilder.
- Return type:¶
- load_graphs(dataflow_path: Path)¶
Load all dataflow graphs from a file.
-
load_specification(specification_path: Path, purge_old_graphs: bool =
True
)¶ Load a specification from a file to use in GraphBuilder.
The default behaviour is to remove loaded dataflow graphs when loading a new specification. That is due to the fact that a dataflow graph is closely linked with its specification. Notice that loading a specification overrides the old one.
-
create_graph(based_on: Path | str | DataflowGraph | None =
Specification of DataflowGraph¶
-
class pipeline_manager.dataflow_builder.dataflow_graph.DataflowGraph(builder_with_spec: SpecificationBuilder, dataflow: dict[str, Any] | None =
None
)¶ Representation of a dataflow graph.
-
create_connection(from_interface: Interface | str, to_interface: Interface | str, connection_id: str | None =
None
) InterfaceConnection ¶ Create a connection between two existing interfaces.
The function performs numerous checks to verify validity of the desired connection, including if the connection already exists, interfaces’ types are matching and so on. The connection is added, given it has passed these checks.
- Parameters:¶
- Returns:¶
Created connection added to the graph.
- Return type:¶
- Raises:¶
ValueError – Raised if: - a source interface does not belong to the graph. - a destination interface does not belong to the graph. - a source interface direction is input. - a destination interface direction is output. - a mismatch between source and destination interfaces’ types occurs.
- create_node(name: str, **kwargs) Node ¶
Create the node initialized with the supplied arguments.
Use this method instead of manually adding a node. Default values are taken from the specification, based on the provided name parameter. The default values may be overridden by the values supplied in kwargs. id is already initialized.
- get(type: AttributeType, **kwargs) list[Node] | list[InterfaceConnection] | list[Interface] ¶
Get items of a given type, which satisfy all the desired criteria.
Items are understood as either nodes or connections or interfaces. The function finds objects by eliminating these, which do not match the criteria. Thus, between all the criteria is AND logical operator.
- get_by_id(type: AttributeType, id: str) InterfaceConnection | Node | Interface | None ¶
Fast getter, which finds an item of a supplied type and with the provided id.
It has complexity of O(1) (except for interfaces) in juxtaposition to the get method, which iterates over all items.
-
create_connection(from_interface: Interface | str, to_interface: Interface | str, connection_id: str | None =
Specification of Node¶
- class pipeline_manager.dataflow_builder.entities.Node(specification_builder: SpecificationBuilder, **kwargs)¶
Representation of a node in a dataflow graph.
- get(type: NodeAttributeType, **kwargs) list[Property] | list[Interface] ¶
Get either properties or interfaces matching criteria.
-
move(new_position: Vector2, relative: bool =
False
)¶ Change a position of the node.
Change a position of the node either in a relative manner or an absolute manner. Values are clamped to stay in the range of values.
- set_property(property_name: str, property_value: Any) None ¶
Convenient setter to change value of a property of the node.
Specification of Interface¶
- class pipeline_manager.dataflow_builder.entities.Interface(name: str, direction: ~pipeline_manager.dataflow_builder.entities.Direction, side: ~pipeline_manager.dataflow_builder.entities.Side | None = None, side_position: int | None = None, external_name: str | None = None, id: str = '74d47adb-8d56-4283-8ea5-61838f02ece7', type: ~typing.List[str] = <factory>)¶
Representation of a node’s interface.
Specification of Connection¶
Notice that name of the class representing connection is InterfaceConnection
, not Connection
.
-
class pipeline_manager.dataflow_builder.entities.InterfaceConnection(from_interface: Interface, to_interface: Interface, anchors: list[Vector2] | None =
None
, id: str ='9028b6e4-9c1d-4745-b785-1277142bba79'
)¶ Representation of a connection between two interfaces in a dataflow graph.
Specification of Property¶
-
class pipeline_manager.dataflow_builder.entities.Property(name: str, value: Any, id: str =
'3e3cba6f-c997-4457-84f5-efaa9494fc5b'
)¶ A property of a node.