PipelineGraph

class lsst.pipe.base.pipeline_graph.PipelineGraph(*, description: str = '', universe: DimensionUniverse | None = None, data_id: DataCoordinate | Mapping[str, Any] | None = None)

Bases: object

A graph representation of a fully-configured pipeline.

PipelineGraph instances are typically constructed by calling Pipeline.to_graph, but in rare cases constructing and then populating an empty one may be preferable.
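
For example, a graph is usually obtained from an existing pipeline definition, but can also be built up by hand. A minimal sketch, in which "my_pipeline.yaml" is a hypothetical pipeline definition path:

from lsst.pipe.base import Pipeline
from lsst.pipe.base.pipeline_graph import PipelineGraph

# Typical route: load a pipeline definition and convert it to a graph.
graph = Pipeline.from_uri("my_pipeline.yaml").to_graph()

# Rare alternative: construct an empty graph and populate it afterwards.
empty = PipelineGraph(description="built by hand")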

Parameters:
description : str, optional

String description for this pipeline.

universe : lsst.daf.butler.DimensionUniverse, optional

Definitions for all butler dimensions. If not provided, some attributes will not be available until resolve is called.

data_id : lsst.daf.butler.DataCoordinate or other data ID, optional

Data ID that represents a constraint on all quanta generated by this pipeline. This typically just holds the instrument constraint included in the pipeline definition, if there was one.

Attributes Summary

data_id

Data ID that represents a constraint on all quanta generated from this pipeline.

dataset_types

A mapping view of the dataset types in the graph.

description

String description for this pipeline.

has_been_sorted

Whether this graph's tasks and dataset types have been topologically sorted (with unspecified but deterministic tiebreakers) since the last modification to the graph.

is_fully_resolved

Whether all of this graph's nodes are resolved.

is_sorted

Whether this graph's tasks and dataset types are topologically sorted with the exact same deterministic tiebreakers that sort would apply.

task_subsets

A mapping of all labeled subsets of tasks.

tasks

A mapping view of the tasks in the graph.

universe

Definitions for all butler dimensions.

Methods Summary

add_task(label, task_class[, config, ...])

Add a new task to the graph.

add_task_nodes(nodes[, parent])

Add one or more existing task nodes to the graph.

add_task_subset(subset_label, task_labels[, ...])

Add a label for a set of tasks that are already in the pipeline.

consumers_of(dataset_type_name)

Return the TaskNode and/or TaskInitNode objects that read the given dataset type.

consuming_edges_of(dataset_type_name)

Return the ReadEdge objects that link the named dataset type to the tasks that consume it.

copy()

Return a copy of this graph that copies all mutable state.

diff_tasks(other)

Compare two pipeline graphs.

group_by_dimensions([prerequisites])

Group this graph's tasks and dataset types by their dimensions.

inputs_of(task_label[, init])

Return the dataset types that are inputs to a task.

iter_edges([init])

Iterate over edges in the graph.

iter_nodes()

Iterate over nodes in the graph.

iter_overall_inputs()

Iterate over all of the dataset types that are consumed but not produced by the graph.

make_bipartite_xgraph([init])

Return a bipartite networkx representation of just the runtime or init-time pipeline graph.

make_dataset_type_xgraph([init])

Return a networkx representation of just the dataset types in the pipeline.

make_task_xgraph([init])

Return a networkx representation of just the tasks in the pipeline.

make_xgraph()

Export a networkx representation of the full pipeline graph, including both init and runtime edges.

outputs_of(task_label[, init, ...])

Return the dataset types that are outputs of a task.

producer_of(dataset_type_name)

Return the TaskNode or TaskInitNode that writes the given dataset type.

producing_edge_of(dataset_type_name)

Return the WriteEdge that links the producing task to the named dataset type.

reconfigure_tasks(*args[, ...])

Update the configuration for one or more tasks.

remove_task_subset(subset_label)

Remove a labeled set of tasks.

remove_tasks(labels[, drop_from_subsets])

Remove one or more tasks from the graph.

resolve([registry, dimensions, dataset_types])

Resolve all dimensions and dataset types and check them for consistency.

sort()

Sort this graph's nodes topologically with deterministic (but unspecified) tiebreakers.

split_independent()

Iterate over independent subgraphs that together comprise this pipeline graph.

Attributes Documentation

data_id

Data ID that represents a constraint on all quanta generated from this pipeline.

This may not be available unless universe is not None.

dataset_types

A mapping view of the dataset types in the graph.

This mapping has str parent dataset type name keys, but only provides access to its DatasetTypeNode values if resolve has been called since the last modification involving a task that uses that dataset type. See DatasetTypeMappingView for details.

description

String description for this pipeline.

has_been_sorted

Whether this graph’s tasks and dataset types have been topologically sorted (with unspecified but deterministic tiebreakers) since the last modification to the graph.

This may return False if the graph happens to be sorted but sort was never called, but it is potentially much faster than is_sorted, which may attempt (and then discard) a full sort if has_been_sorted is False.

is_fully_resolved

Whether all of this graph’s nodes are resolved.

is_sorted

Whether this graph’s tasks and dataset types are topologically sorted with the exact same deterministic tiebreakers that sort would apply.

This may perform (and then discard) a full sort if has_been_sorted is False. If the goal is to obtain a sorted graph, it is better to just call sort without guarding that with an if not graph.is_sorted check.

task_subsets

A mapping of all labeled subsets of tasks.

Keys are subset labels, values are sets of task labels. See TaskSubset for more information.

Use add_task_subset to add a new subset. The subsets themselves may be modified in-place.

tasks

A mapping view of the tasks in the graph.

This mapping has str task label keys and TaskNode values. Iteration is topologically and deterministically ordered if and only if sort has been called since the last modification to the graph.

universe

Definitions for all butler dimensions.

Methods Documentation

add_task(label: str | None, task_class: type[PipelineTask], config: PipelineTaskConfig | None = None, connections: PipelineTaskConnections | None = None) TaskNode

Add a new task to the graph.

Parameters:
label : str or None

Label for the task in the pipeline. If None, Task._DefaultName is used.

task_class : type [ PipelineTask ]

Class object for the task.

config : PipelineTaskConfig, optional

Configuration for the task. If not provided, a default-constructed instance of task_class.ConfigClass is used.

connections : PipelineTaskConnections, optional

Object that describes the dataset types used by the task. If not provided, one will be constructed from the given configuration. If provided, it is assumed that config has already been validated and frozen.

Returns:
node : TaskNode

The new task node added to the graph.

Raises:
ValueError

Raised if configuration validation failed when constructing connections.

PipelineDataCycleError

Raised if the graph is cyclic after this addition.

RuntimeError

Raised if an unexpected exception (which will be chained) occurred at a stage that may have left the graph in an inconsistent state. Other exceptions should leave the graph unchanged.

Notes

Checks for dataset type consistency and multiple producers do not occur until resolve is called, since the resolution depends on both the state of the data repository and all contributing tasks.

Adding a new task removes any existing resolutions of the dataset types it references and marks the graph as unsorted. It is most efficient to add all tasks up front and only then resolve and/or sort the graph.
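
A minimal sketch of that recommended pattern; MyIsrTask and MyCalibrateTask are hypothetical PipelineTask subclasses, and butler is an assumed existing lsst.daf.butler.Butler client:

from lsst.pipe.base.pipeline_graph import PipelineGraph

graph = PipelineGraph()
# Add all tasks first; configs here default-construct from each
# task's ConfigClass.
graph.add_task("isr", MyIsrTask)
graph.add_task("calibrate", MyCalibrateTask)
# Only then resolve and sort, which is cheaper than interleaving.
graph.resolve(registry=butler.registry)
graph.sort()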

add_task_nodes(nodes: Iterable[TaskNode], parent: PipelineGraph | None = None) None

Add one or more existing task nodes to the graph.

Parameters:
nodes : Iterable [ TaskNode ]

Iterable of task nodes to add. If any tasks have resolved dimensions, they must have the same dimension universe as the rest of the graph.

parent : PipelineGraph, optional

If provided, another PipelineGraph from which these nodes were obtained. Any dataset type nodes already present in parent that are referenced by the given tasks will be used in this graph if they are not already present, preserving any dataset type resolutions present in the parent graph. Adding nodes from a parent graph after the graph has its own nodes (e.g. from add_task) or nodes from a third graph may result in invalid dataset type resolutions. It is safest to only use this argument when populating an empty graph for the first time.

Raises:
PipelineDataCycleError

Raised if the graph is cyclic after this addition.

Notes

Checks for dataset type consistency and multiple producers do not occur until resolve is called, since the resolution depends on both the state of the data repository and all contributing tasks.

Adding new tasks removes any existing resolutions of the dataset types they reference (unless parent is not None) and marks the graph as unsorted. It is most efficient to add all tasks up front and only then resolve and/or sort the graph.

add_task_subset(subset_label: str, task_labels: Iterable[str], description: str = '') None

Add a label for a set of tasks that are already in the pipeline.

Parameters:
subset_label : str

Label for this set of tasks.

task_labels : Iterable [ str ]

Labels of the tasks to include in the set. All must already be included in the graph.

description : str, optional

String description to associate with this label.
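
For instance, assuming a graph that already contains tasks labeled "isr" and "calibrate":

graph.add_task_subset(
    "step1", ["isr", "calibrate"], description="single-frame processing"
)
print(graph.task_subsets["step1"])  # a set-like view of the task labels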

consumers_of(dataset_type_name: str) list[lsst.pipe.base.pipeline_graph._tasks.TaskNode | lsst.pipe.base.pipeline_graph._tasks.TaskInitNode]

Return the TaskNode and/or TaskInitNode objects that read the given dataset type.

Parameters:
dataset_type_name : str

Dataset type name. Must not be a component.

Returns:
consumers : list [ TaskNode or TaskInitNode ]

Task nodes and/or task init nodes that read the given dataset type.

Notes

On resolved graphs, it may be slightly more efficient to use:

graph.dataset_types[dataset_type_name].consuming_edges

but this method works on graphs with unresolved dataset types as well.

consuming_edges_of(dataset_type_name: str) list[lsst.pipe.base.pipeline_graph._edges.ReadEdge]

Return the ReadEdge objects that link the named dataset type to the tasks that consume it.

Parameters:
dataset_type_name : str

Dataset type name. Must not be a component.

Returns:
edges : list [ ReadEdge ]

Edges that connect this dataset type to the tasks that consume it.

Notes

On resolved graphs, it may be slightly more efficient to use:

graph.dataset_types[dataset_type_name].consuming_edges

but this method works on graphs with unresolved dataset types as well.

copy() PipelineGraph

Return a copy of this graph that copies all mutable state.

diff_tasks(other: PipelineGraph) list[str]

Compare two pipeline graphs.

This only compares graph structure and task classes (including their edges). It does not compare full configuration (which is subject to spurious differences due to import-cache state), dataset type resolutions, or sort state.

Parameters:
other : PipelineGraph

Graph to compare to.

Returns:
differences : list [ str ]

List of string messages describing differences between the pipelines. If empty, the graphs have the same tasks and connections.
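
A sketch of typical use, where graph_a and graph_b are assumed existing PipelineGraph instances:

differences = graph_a.diff_tasks(graph_b)
if not differences:
    print("pipelines have the same tasks and connections")
else:
    for message in differences:
        print(message)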

group_by_dimensions(prerequisites: bool = False) dict[lsst.daf.butler.dimensions._group.DimensionGroup, tuple[dict[str, lsst.pipe.base.pipeline_graph._tasks.TaskNode], dict[str, lsst.pipe.base.pipeline_graph._dataset_types.DatasetTypeNode]]]

Group this graph’s tasks and dataset types by their dimensions.

Parameters:
prerequisites : bool, optional

If True, include prerequisite dataset types as well as regular input and output datasets (including intermediates).

Returns:
groups : dict [ DimensionGroup, tuple ]

A dictionary of groups keyed by DimensionGroup, in which each value is a tuple of:

  • a mapping of TaskNode instances, keyed by task label;

  • a mapping of DatasetTypeNode instances, keyed by parent dataset type name;

that have those dimensions.

Notes

Init inputs and outputs are always included, but always have empty dimensions and are hence all grouped together.
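
A sketch of iterating over the result, where graph is an assumed resolved PipelineGraph:

for dims, (tasks_by_label, types_by_name) in graph.group_by_dimensions().items():
    print(dims, sorted(tasks_by_label), sorted(types_by_name))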

inputs_of(task_label: str, init: bool = False) dict[str, lsst.pipe.base.pipeline_graph._dataset_types.DatasetTypeNode | None]

Return the dataset types that are inputs to a task.

Parameters:
task_label : str

Label for the task in the pipeline.

init : bool, optional

If True, return init-input dataset types instead of runtime (including prerequisite) inputs.

Returns:
inputs : dict [ str, DatasetTypeNode or None ]

Dictionary with parent dataset type name keys and either DatasetTypeNode values (if the dataset type has been resolved) or None values.

Notes

To get the input edges of a task or task init node (which provide information about storage class overrides and components) use:

graph.tasks[task_label].iter_all_inputs()

or

graph.tasks[task_label].init.iter_all_inputs()

or the various mapping attributes of the TaskNode and TaskInitNode class.

iter_edges(init: bool = False) Iterator[Edge]

Iterate over edges in the graph.

Parameters:
init : bool, optional

If True (False is default) iterate over the edges between task initialization nodes and init input/output dataset types, instead of the runtime task nodes and regular input/output/prerequisite dataset types.

Returns:
edges : Iterator [ Edge ]

A lazy iterator over Edge (WriteEdge or ReadEdge) instances.

Notes

This method always returns either init edges or runtime edges, never both. The full (internal) graph that contains both also includes a special edge that connects each task init node to its runtime node; that is also never returned by this method, since it is never a part of the init-only or runtime-only subgraphs.

iter_nodes() Iterator[tuple[NodeType, str, TaskNode | TaskInitNode | DatasetTypeNode | None]]

Iterate over nodes in the graph.

Returns:
nodes : Iterator [ tuple ]

A lazy iterator over all of the nodes in the graph. Each yielded element is a tuple of:

  • the node type enum value (NodeType);

  • the string name for the node (task label or parent dataset type name);

  • the node instance itself (TaskNode, TaskInitNode, or DatasetTypeNode), with None in place of dataset type nodes that have not been resolved.

iter_overall_inputs() Iterator[tuple[str, lsst.pipe.base.pipeline_graph._dataset_types.DatasetTypeNode | None]]

Iterate over all of the dataset types that are consumed but not produced by the graph.

Returns:
dataset_types : Iterator [ tuple ]

A lazy iterator over the overall-input dataset types (including overall init inputs and prerequisites). Each yielded element is a tuple of:

  • the parent dataset type name;

  • the resolved DatasetTypeNode, or None if the dataset type has not been resolved.
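
For example, to list the overall-input dataset type names (graph is an assumed PipelineGraph):

for name, node in graph.iter_overall_inputs():
    status = "resolved" if node is not None else "unresolved"
    print(f"{name}: {status}")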

make_bipartite_xgraph(init: bool = False) MultiDiGraph

Return a bipartite networkx representation of just the runtime or init-time pipeline graph.

Parameters:
init : bool, optional

If True (False is default) return the graph of task initialization nodes and init input/output dataset types, instead of the graph of runtime task nodes and regular input/output/prerequisite dataset types.

Returns:
xgraph : networkx.MultiDiGraph

Directed acyclic graph with parallel edges.

Notes

The returned graph uses NodeKey instances for nodes. Parallel edges represent the same dataset type appearing in multiple connections for the same task, and are hence rare. The connection name is used as the edge key to disambiguate those parallel edges.

This graph is bipartite because each dataset type node only has edges that connect it to a task [init] node, and vice versa.

See TaskNode, TaskInitNode, DatasetTypeNode, ReadEdge, and WriteEdge for the descriptive node and edge attributes added.

make_dataset_type_xgraph(init: bool = False) DiGraph

Return a networkx representation of just the dataset types in the pipeline.

Parameters:
init : bool, optional

If True (False is default) return the graph of init input and output dataset types, instead of the graph of runtime (input, output, prerequisite input) dataset types.

Returns:
xgraph : networkx.DiGraph

Directed acyclic graph with no parallel edges.

Notes

The returned graph uses NodeKey instances for nodes. The tasks that link these dataset types are not represented at all; edges have no attributes, and there are no parallel edges.

See DatasetTypeNode for the descriptive node attributes added.

make_task_xgraph(init: bool = False) DiGraph

Return a networkx representation of just the tasks in the pipeline.

Parameters:
init : bool, optional

If True (False is default) return the graph of task initialization nodes, instead of the graph of runtime task nodes.

Returns:
xgraph : networkx.DiGraph

Directed acyclic graph with no parallel edges.

Notes

The returned graph uses NodeKey instances for nodes. The dataset types that link these tasks are not represented at all; edges have no attributes, and there are no parallel edges.

See TaskNode and TaskInitNode for the descriptive node attributes added.

make_xgraph() MultiDiGraph

Export a networkx representation of the full pipeline graph, including both init and runtime edges.

Returns:
xgraph : networkx.MultiDiGraph

Directed acyclic graph with parallel edges.

Notes

The returned graph uses NodeKey instances for nodes. Parallel edges represent the same dataset type appearing in multiple connections for the same task, and are hence rare. The connection name is used as the edge key to disambiguate those parallel edges.

Almost all edges connect dataset type nodes to task or task init nodes or vice versa, but there is also a special edge that connects each task init node to its runtime node. The existence of these edges makes the graph not quite bipartite, though its init-only and runtime-only subgraphs are bipartite.

See TaskNode, TaskInitNode, DatasetTypeNode, ReadEdge, and WriteEdge for the descriptive node and edge attributes added.
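
A sketch of exporting the graph for use with standard networkx tooling (graph is an assumed PipelineGraph):

import networkx

xgraph = graph.make_xgraph()
# The exported graph is a DAG, so standard ordering tools apply.
assert networkx.is_directed_acyclic_graph(xgraph)
for node_key in networkx.topological_sort(xgraph):
    print(node_key)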

outputs_of(task_label: str, init: bool = False, include_automatic_connections: bool = True) dict[str, lsst.pipe.base.pipeline_graph._dataset_types.DatasetTypeNode | None]

Return the dataset types that are outputs of a task.

Parameters:
task_label : str

Label for the task in the pipeline.

init : bool, optional

If True, return init-output dataset types instead of runtime outputs.

include_automatic_connections : bool, optional

Whether to include automatic connections such as configs, metadata, and logs.

Returns:
outputs : dict [ str, DatasetTypeNode or None ]

Dictionary with parent dataset type name keys and either DatasetTypeNode values (if the dataset type has been resolved) or None values.

Notes

To get the output edges of a task or task init node (which provide information about storage class overrides and components) use:

graph.tasks[task_label].iter_all_outputs()

or

graph.tasks[task_label].init.iter_all_outputs()

or the various mapping attributes of the TaskNode and TaskInitNode class.

producer_of(dataset_type_name: str) TaskNode | TaskInitNode | None

Return the TaskNode or TaskInitNode that writes the given dataset type.

Parameters:
dataset_type_name : str

Dataset type name. Must not be a component.

Returns:
producer : TaskNode, TaskInitNode, or None

Producing node or None if there isn’t one in this graph.

Raises:
DuplicateOutputError

Raised if there are multiple tasks defined to produce this dataset type. This is only possible if the graph’s dataset types are not resolved.
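
For example, where "calexp" is a hypothetical dataset type name and the label attribute of the returned node is assumed:

producer = graph.producer_of("calexp")
if producer is None:
    print("'calexp' is not produced by any task in this graph")
else:
    print(f"'calexp' is produced by task {producer.label!r}")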

producing_edge_of(dataset_type_name: str) WriteEdge | None

Return the WriteEdge that links the producing task to the named dataset type.

Parameters:
dataset_type_name : str

Dataset type name. Must not be a component.

Returns:
edge : WriteEdge or None

Producing edge or None if there isn’t one in this graph.

Raises:
DuplicateOutputError

Raised if there are multiple tasks defined to produce this dataset type. This is only possible if the graph’s dataset types are not resolved.

Notes

On resolved graphs, it may be slightly more efficient to use:

graph.dataset_types[dataset_type_name].producing_edge

but this method works on graphs with unresolved dataset types as well.

reconfigure_tasks(*args: tuple[str, PipelineTaskConfig], check_edges_unchanged: bool = False, assume_edges_unchanged: bool = False, **kwargs: PipelineTaskConfig) None

Update the configuration for one or more tasks.

Parameters:
*args : tuple [ str, PipelineTaskConfig ]

Positional arguments are each a 2-tuple of task label and new config object. Note that the same arguments may also be passed as **kwargs, which is usually more readable, but task labels in *args are not required to be valid Python identifiers.

check_edges_unchanged : bool, optional

If True, require the edges (connections) of the modified tasks to remain unchanged after the configuration updates, and verify that this is the case.

assume_edges_unchanged : bool, optional

If True, the caller declares that the edges (connections) of the modified tasks will remain unchanged after the configuration updates, and that it is unnecessary to check this.

**kwargs : PipelineTaskConfig

New config objects or overrides to apply to copies of the current config objects, with task labels as the keywords.

Raises:
ValueError

Raised if assume_edges_unchanged and check_edges_unchanged are both True, or if the same task appears twice.

EdgesChangedError

Raised if check_edges_unchanged=True and the edges of a task do change.

Notes

If reconfiguring a task causes its edges to change, any dataset type nodes connected to that task (not just those whose edges have changed!) will be unresolved.
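
A minimal sketch using the keyword form; the "isr" label and the doSomething config field are hypothetical, and task_class.ConfigClass is used as documented for add_task:

config = graph.tasks["isr"].task_class.ConfigClass()
config.doSomething = False  # hypothetical config field
graph.reconfigure_tasks(isr=config, check_edges_unchanged=True)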

remove_task_subset(subset_label: str) None

Remove a labeled set of tasks.

Parameters:
subset_label : str

Label for this set of tasks.

remove_tasks(labels: Iterable[str], drop_from_subsets: bool = True) list[tuple[lsst.pipe.base.pipeline_graph._tasks.TaskNode, set[str]]]

Remove one or more tasks from the graph.

Parameters:
labels : Iterable [ str ]

Iterable of the labels of the tasks to remove.

drop_from_subsets : bool, optional

If True, drop each removed task from any subset in which it currently appears. If False, raise PipelineGraphError if any such subsets exist.

Returns:
nodes_and_subsets : list [ tuple [ TaskNode, set [ str ] ] ]

List of nodes removed and the labels of task subsets that referenced them.

Raises:
PipelineGraphError

Raised if drop_from_subsets is False and the task is still part of one or more subsets.

Notes

Removing a task will cause dataset nodes with no other referencing tasks to be removed. Any other dataset type nodes referenced by a removed task will be reset to an “unresolved” state.
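
For example, where "calibrate" is a hypothetical task label:

for node, subset_labels in graph.remove_tasks(["calibrate"], drop_from_subsets=True):
    print(f"removed {node.label}; it appeared in subsets {subset_labels}")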

resolve(registry: Registry | None = None, dimensions: DimensionUniverse | None = None, dataset_types: Mapping[str, DatasetType] | None = None) None

Resolve all dimensions and dataset types and check them for consistency.

Resolving a graph also causes it to be sorted.

Parameters:
registry : lsst.daf.butler.Registry, optional

Client for the data repository to resolve against. If not provided, both dimensions and dataset_types must be.

dimensions : lsst.daf.butler.DimensionUniverse, optional

Definitions for all dimensions.

dataset_types : Mapping [ str, DatasetType ], optional

Mapping of dataset types to consider registered.

Raises:
ConnectionTypeConsistencyError

Raised if a prerequisite input for one task appears as a different kind of connection in any other task.

DuplicateOutputError

Raised if multiple tasks have the same dataset type as an output.

IncompatibleDatasetTypeError

Raised if different tasks have different definitions of a dataset type. Different but compatible storage classes are permitted.

MissingDatasetTypeError

Raised if a dataset type definition is required to exist in the data repository but none was found. This should only occur for dataset types that are not produced by a task in the pipeline and are consumed with different storage classes or as components by tasks in the pipeline.

EdgesChangedError

Raised if check_edges_unchanged=True and the edges of a task do change after import and reconfiguration.

Notes

The universe attribute is set to dimensions and used to set all TaskNode.dimensions attributes. Dataset type nodes are resolved by first looking for a registry definition, then using the producing task’s definition, then looking for consistency between all consuming task definitions.
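
A sketch of the two supported calling patterns; "/repo/main" is a hypothetical repository path:

from lsst.daf.butler import Butler

butler = Butler("/repo/main")
# Resolve against a live data repository...
graph.resolve(registry=butler.registry)
# ...or against explicit definitions without a registry client.
graph.resolve(dimensions=butler.dimensions, dataset_types={})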

sort() None

Sort this graph’s nodes topologically with deterministic (but unspecified) tiebreakers.

This does nothing if the graph is already known to be sorted.

split_independent() Iterable[PipelineGraph]

Iterate over independent subgraphs that together comprise this pipeline graph.

Returns:
subgraphs : Iterable [ PipelineGraph ]

An iterable over component subgraphs that could be run independently (they have only overall inputs in common). May be a lazy iterator.

Notes

All resolved dataset type nodes will be preserved.

If there is only one component, self may be returned as the only element in the iterable.

If has_been_sorted, all subgraphs will be sorted as well.
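
For example, to see how a pipeline factors into independently runnable pieces (graph is an assumed PipelineGraph):

for subgraph in graph.split_independent():
    print(sorted(subgraph.tasks.keys()))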