PipelineGraph¶
- class lsst.pipe.base.pipeline_graph.PipelineGraph(*, description: str = '', universe: DimensionUniverse | None = None, data_id: DataCoordinate | Mapping[str, Any] | None = None)¶
Bases:
object
A graph representation of a fully configured pipeline.
PipelineGraph instances are typically constructed by calling Pipeline.to_graph, but in rare cases constructing and then populating an empty one may be preferable.
- Parameters:
- description (str, optional): String description for this pipeline.
- universe (lsst.daf.butler.DimensionUniverse, optional): Definitions for all butler dimensions. If not provided, some attributes will not be available until resolve is called.
- data_id (lsst.daf.butler.DataCoordinate or other data ID, optional): Data ID that represents a constraint on all quanta generated by this pipeline. This typically just holds the instrument constraint included in the pipeline definition, if there was one.
Attributes Summary
- data_id: Data ID that represents a constraint on all quanta generated from this pipeline.
- dataset_types: A mapping view of the dataset types in the graph.
- description: String description for this pipeline.
- has_been_sorted: Whether this graph's tasks and dataset types have been topologically sorted (with unspecified but deterministic tiebreakers) since the last modification to the graph.
- is_fully_resolved: Whether all of this graph's nodes are resolved.
- is_sorted: Whether this graph's tasks and dataset types are topologically sorted with the exact same deterministic tiebreakers that sort would apply.
- task_subsets: A mapping of all labeled subsets of tasks.
- tasks: A mapping view of the tasks in the graph.
- universe: Definitions for all butler dimensions.
Methods Summary
- add_task(label, task_class[, config, ...]): Add a new task to the graph.
- add_task_nodes(nodes[, parent]): Add one or more existing task nodes to the graph.
- add_task_subset(subset_label, task_labels[, ...]): Add a label for a set of tasks that are already in the pipeline.
- consumers_of(dataset_type_name): Return the TaskNode and/or TaskInitNode objects that read the given dataset type.
- consuming_edges_of(dataset_type_name): Return the ReadEdge objects that link the named dataset type to the tasks that consume it.
- copy(): Return a copy of this graph that copies all mutable state.
- diff_tasks(other): Compare two pipeline graphs.
- group_by_dimensions([prerequisites]): Group this graph's tasks and dataset types by their dimensions.
- inputs_of(task_label[, init]): Return the dataset types that are inputs to a task.
- iter_edges([init]): Iterate over edges in the graph.
- iter_nodes(): Iterate over nodes in the graph.
- iter_overall_inputs(): Iterate over all of the dataset types that are consumed but not produced by the graph.
- make_bipartite_xgraph([init]): Return a bipartite networkx representation of just the runtime or init-time pipeline graph.
- make_dataset_type_xgraph([init]): Return a networkx representation of just the dataset types in the pipeline.
- make_task_xgraph([init]): Return a networkx representation of just the tasks in the pipeline.
- make_xgraph(): Export a networkx representation of the full pipeline graph, including both init and runtime edges.
- outputs_of(task_label[, init, ...]): Return the dataset types that are outputs of a task.
- producer_of(dataset_type_name): Return the TaskNode or TaskInitNode that writes the given dataset type.
- producing_edge_of(dataset_type_name): Return the WriteEdge that links the producing task to the named dataset type.
- reconfigure_tasks(*args[, ...]): Update the configuration for one or more tasks.
- remove_task_subset(subset_label): Remove a labeled set of tasks.
- remove_tasks(labels[, drop_from_subsets]): Remove one or more tasks from the graph.
- resolve([registry, dimensions, dataset_types]): Resolve all dimensions and dataset types and check them for consistency.
- sort(): Sort this graph's nodes topologically with deterministic (but unspecified) tiebreakers.
- split_independent(): Iterate over independent subgraphs that together comprise this pipeline graph.
Attributes Documentation
- data_id¶
Data ID that represents a constraint on all quanta generated from this pipeline.
- dataset_types¶
A mapping view of the dataset types in the graph.
This mapping has str parent dataset type name keys, but only provides access to its DatasetTypeNode values if resolve has been called since the last modification involving a task that uses a dataset type. See DatasetTypeMappingView for details.
- description¶
String description for this pipeline.
- has_been_sorted¶
Whether this graph’s tasks and dataset types have been topologically sorted (with unspecified but deterministic tiebreakers) since the last modification to the graph.
This may return False if the graph happens to be sorted but sort was never called, but it is potentially much faster than is_sorted, which may attempt (and then discard) a full sort if has_been_sorted is False.
- is_fully_resolved¶
Whether all of this graph’s nodes are resolved.
- is_sorted¶
Whether this graph’s tasks and dataset types are topologically sorted with the exact same deterministic tiebreakers that sort would apply.
This may perform (and then discard) a full sort if has_been_sorted is False. If the goal is to obtain a sorted graph, it is better to just call sort without guarding that with an if not graph.is_sorted check.
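The relationship between has_been_sorted, is_sorted, and sort can be illustrated with a small standalone model. This is only a sketch: ToyGraph and its alphabetical tiebreaker are hypothetical stand-ins, not the PipelineGraph implementation, whose tiebreakers are documented as unspecified.

```python
import heapq


class ToyGraph:
    """Hypothetical stand-in for a graph with a cached 'sorted' flag."""

    def __init__(self):
        self._edges = {}  # node -> set of successors; dict order is the iteration order
        self.has_been_sorted = False

    def add_edge(self, src, dst):
        self._edges.setdefault(src, set()).add(dst)
        self._edges.setdefault(dst, set())
        self.has_been_sorted = False  # any modification invalidates the flag

    def _sorted_order(self):
        # Kahn's algorithm; the heap breaks ties alphabetically (an assumption).
        indegree = {node: 0 for node in self._edges}
        for successors in self._edges.values():
            for successor in successors:
                indegree[successor] += 1
        ready = [node for node, degree in indegree.items() if degree == 0]
        heapq.heapify(ready)
        order = []
        while ready:
            node = heapq.heappop(ready)
            order.append(node)
            for successor in self._edges[node]:
                indegree[successor] -= 1
                if indegree[successor] == 0:
                    heapq.heappush(ready, successor)
        return order

    @property
    def is_sorted(self):
        if self.has_been_sorted:
            return True
        # Expensive path: compute a full sort only to compare, then discard it.
        return list(self._edges) == self._sorted_order()

    def sort(self):
        if not self.has_been_sorted:
            order = self._sorted_order()
            self._edges = {node: self._edges[node] for node in order}
            self.has_been_sorted = True
```

In this model, calling sort directly costs one sort at most, while guarding it with is_sorted can cost two, which is the reason the guard is discouraged above.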
- task_subsets¶
A mapping of all labeled subsets of tasks.
Keys are subset labels, values are sets of task labels. See TaskSubset for more information.
Use add_task_subset to add a new subset. The subsets themselves may be modified in-place.
- tasks¶
A mapping view of the tasks in the graph.
This mapping has str task label keys and TaskNode values. Iteration is topologically and deterministically ordered if and only if sort has been called since the last modification to the graph.
- universe¶
Definitions for all butler dimensions.
Methods Documentation
- add_task(label: str | None, task_class: type[PipelineTask], config: PipelineTaskConfig | None = None, connections: PipelineTaskConnections | None = None) TaskNode ¶
Add a new task to the graph.
- Parameters:
- label (str or None): Label for the task in the pipeline. If None, Task._DefaultName is used.
- task_class (type[PipelineTask]): Class object for the task.
- config (PipelineTaskConfig, optional): Configuration for the task. If not provided, a default-constructed instance of task_class.ConfigClass is used.
- connections (PipelineTaskConnections, optional): Object that describes the dataset types used by the task. If not provided, one will be constructed from the given configuration. If provided, it is assumed that config has already been validated and frozen.
- Returns:
- node (TaskNode): The new task node added to the graph.
- Raises:
- ValueError: Raised if configuration validation failed when constructing connections.
- PipelineDataCycleError: Raised if the graph is cyclic after this addition.
- RuntimeError: Raised if an unexpected exception (which will be chained) occurred at a stage that may have left the graph in an inconsistent state. Other exceptions should leave the graph unchanged.
Notes
Checks for dataset type consistency and multiple producers do not occur until resolve is called, since the resolution depends on both the state of the data repository and all contributing tasks.
Adding new tasks removes any existing resolutions of all dataset types they reference and marks the graph as unsorted. It is most efficient to add all tasks up front and only then resolve and/or sort the graph.
- add_task_nodes(nodes: Iterable[TaskNode], parent: PipelineGraph | None = None) None ¶
Add one or more existing task nodes to the graph.
- Parameters:
- nodes (Iterable[TaskNode]): Iterable of task nodes to add. If any tasks have resolved dimensions, they must have the same dimension universe as the rest of the graph.
- parent (PipelineGraph, optional): If provided, another PipelineGraph from which these nodes were obtained. Any dataset type nodes already present in parent that are referenced by the given tasks will be used in this graph if they are not already present, preserving any dataset type resolutions present in the parent graph. Adding nodes from a parent graph after the graph has its own nodes (e.g. from add_task) or nodes from a third graph may result in invalid dataset type resolutions. It is safest to only use this argument when populating an empty graph for the first time.
- Raises:
- PipelineDataCycleError: Raised if the graph is cyclic after this addition.
Notes
Checks for dataset type consistency and multiple producers do not occur until resolve is called, since the resolution depends on both the state of the data repository and all contributing tasks.
Adding new tasks removes any existing resolutions of all dataset types they reference (unless parent is not None) and marks the graph as unsorted. It is most efficient to add all tasks up front and only then resolve and/or sort the graph.
- add_task_subset(subset_label: str, task_labels: Iterable[str], description: str = '') None ¶
Add a label for a set of tasks that are already in the pipeline.
- consumers_of(dataset_type_name: str) list[lsst.pipe.base.pipeline_graph._tasks.TaskNode | lsst.pipe.base.pipeline_graph._tasks.TaskInitNode] ¶
Return the TaskNode and/or TaskInitNode objects that read the given dataset type.
- Parameters:
- dataset_type_name (str): Dataset type name. Must not be a component.
- Returns:
Notes
On resolved graphs, it may be slightly more efficient to use:
graph.dataset_types[dataset_type_name].consuming_edges
but this method works on graphs with unresolved dataset types as well.
- consuming_edges_of(dataset_type_name: str) list[lsst.pipe.base.pipeline_graph._edges.ReadEdge] ¶
Return the ReadEdge objects that link the named dataset type to the tasks that consume it.
- Parameters:
- dataset_type_name (str): Dataset type name. Must not be a component.
- Returns:
Notes
On resolved graphs, it may be slightly more efficient to use:
graph.dataset_types[dataset_type_name].consuming_edges
but this method works on graphs with unresolved dataset types as well.
- copy() PipelineGraph ¶
Return a copy of this graph that copies all mutable state.
- diff_tasks(other: PipelineGraph) list[str] ¶
Compare two pipeline graphs.
This only compares graph structure and task classes (including their edges). It does not compare full configuration (which is subject to spurious differences due to import-cache state), dataset type resolutions, or sort state.
- Parameters:
- other
PipelineGraph
Graph to compare to.
- other
- Returns:
- group_by_dimensions(prerequisites: bool = False) dict[lsst.daf.butler.dimensions._group.DimensionGroup, tuple[dict[str, lsst.pipe.base.pipeline_graph._tasks.TaskNode], dict[str, lsst.pipe.base.pipeline_graph._dataset_types.DatasetTypeNode]]] ¶
Group this graph’s tasks and dataset types by their dimensions.
- Parameters:
- Returns:
Notes
Init inputs and outputs are always included, but always have empty dimensions and hence are all grouped together.
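The shape of the grouping can be sketched without any butler classes. In this toy version a frozenset of dimension names stands in for DimensionGroup, and the function and its arguments are hypothetical; it only loosely mirrors the documented return type.

```python
def group_by_dimensions(tasks, dataset_types):
    """Group toy tasks and dataset types by their dimensions.

    Both arguments map a name to an iterable of dimension names. The result
    maps each distinct dimension set to a ``(tasks, dataset_types)`` pair of
    dicts keyed by name.
    """
    groups = {}
    for name, dims in tasks.items():
        key = frozenset(dims)
        groups.setdefault(key, ({}, {}))[0][name] = key
    for name, dims in dataset_types.items():
        key = frozenset(dims)
        groups.setdefault(key, ({}, {}))[1][name] = key
    return groups
```

Anything with no dimensions, such as an init-output in this model, lands under the empty frozenset key, which matches the note above about init inputs and outputs being grouped together.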
- inputs_of(task_label: str, init: bool = False) dict[str, lsst.pipe.base.pipeline_graph._dataset_types.DatasetTypeNode | None] ¶
Return the dataset types that are inputs to a task.
- Parameters:
- Returns:
- inputs (dict[str, DatasetTypeNode or None]): Dictionary with parent dataset type name keys and either DatasetTypeNode values (if the dataset type has been resolved) or None values.
Notes
To get the input edges of a task or task init node (which provide information about storage class overrides and components) use:
graph.tasks[task_label].iter_all_inputs()
or
graph.tasks[task_label].init.iter_all_inputs()
or the various mapping attributes of the TaskNode and TaskInitNode classes.
- iter_edges(init: bool = False) Iterator[Edge] ¶
Iterate over edges in the graph.
- Parameters:
- Returns:
Notes
This method always returns either init edges or runtime edges, never both. The full (internal) graph that contains both also includes a special edge that connects each task init node to its runtime node; that is also never returned by this method, since it is never a part of the init-only or runtime-only subgraphs.
- iter_nodes() DatasetTypeNode | None]] ¶
Iterate over nodes in the graph.
- Returns:
- nodes (Iterator[tuple]): A lazy iterator over all of the nodes in the graph. Each yielded element is a tuple of:
- the node type enum value (NodeType);
- the string name for the node (task label or parent dataset type name);
- the node value (TaskNode, TaskInitNode, DatasetTypeNode, or None for dataset type nodes that have not been resolved).
- iter_overall_inputs() Iterator[tuple[str, lsst.pipe.base.pipeline_graph._dataset_types.DatasetTypeNode | None]] ¶
Iterate over all of the dataset types that are consumed but not produced by the graph.
- Returns:
- dataset_types (Iterator[tuple]): A lazy iterator over the overall-input dataset types (including overall init inputs and prerequisites). Each yielded element is a tuple of:
- the parent dataset type name;
- the resolved DatasetTypeNode, or None if the dataset type has not been resolved.
- make_bipartite_xgraph(init: bool = False) MultiDiGraph ¶
Return a bipartite networkx representation of just the runtime or init-time pipeline graph.
- Parameters:
- Returns:
- xgraph (networkx.MultiDiGraph): Directed acyclic graph with parallel edges.
Notes
The returned graph uses NodeKey instances for nodes. Parallel edges represent the same dataset type appearing in multiple connections for the same task, and are hence rare. The connection name is used as the edge key to disambiguate those parallel edges.
This graph is bipartite because each dataset type node only has edges that connect it to a task [init] node, and vice versa.
See TaskNode, TaskInitNode, DatasetTypeNode, ReadEdge, and WriteEdge for the descriptive node and edge attributes added.
- make_dataset_type_xgraph(init: bool = False) DiGraph ¶
Return a networkx representation of just the dataset types in the pipeline.
- Parameters:
- Returns:
- xgraph (networkx.DiGraph): Directed acyclic graph with no parallel edges.
Notes
The returned graph uses NodeKey instances for nodes. The tasks that link these dataset types are not represented at all; edges have no attributes, and there are no parallel edges.
See DatasetTypeNode for the descriptive node attributes added.
- make_task_xgraph(init: bool = False) DiGraph ¶
Return a networkx representation of just the tasks in the pipeline.
- Parameters:
- Returns:
- xgraph (networkx.DiGraph): Directed acyclic graph with no parallel edges.
Notes
The returned graph uses NodeKey instances for nodes. The dataset types that link these tasks are not represented at all; edges have no attributes, and there are no parallel edges.
See TaskNode and TaskInitNode for the descriptive node attributes added.
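Projecting a bipartite task/dataset-type graph onto its tasks can be sketched in plain Python. The project_onto_tasks helper and its input format are hypothetical and do not use networkx or NodeKey; the sketch only shows why the projected graph has no parallel edges.

```python
def project_onto_tasks(tasks):
    """Return task-to-task edges for a toy pipeline.

    ``tasks`` maps a task label to an ``(inputs, outputs)`` pair of dataset
    type names. Two tasks are linked when one writes a dataset type that the
    other reads. This toy model assumes at most one producer per dataset type.
    """
    producer = {}
    for label, (_, outputs) in tasks.items():
        for name in outputs:
            producer[name] = label
    edges = set()  # a set collapses multiple linking dataset types into one edge
    for label, (inputs, _) in tasks.items():
        for name in inputs:
            if name in producer:
                edges.add((producer[name], label))
    return edges
```

Even when two dataset types connect the same pair of tasks, the result holds a single edge for that pair, and dataset types with no producer in the pipeline contribute no edges at all.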
- make_xgraph() MultiDiGraph ¶
Export a networkx representation of the full pipeline graph, including both init and runtime edges.
- Returns:
- xgraph (networkx.MultiDiGraph): Directed acyclic graph with parallel edges.
Notes
The returned graph uses NodeKey instances for nodes. Parallel edges represent the same dataset type appearing in multiple connections for the same task, and are hence rare. The connection name is used as the edge key to disambiguate those parallel edges.
Almost all edges connect dataset type nodes to task or task init nodes or vice versa, but there is also a special edge that connects each task init node to its runtime node. The existence of these edges makes the graph not quite bipartite, though its init-only and runtime-only subgraphs are bipartite.
See TaskNode, TaskInitNode, DatasetTypeNode, ReadEdge, and WriteEdge for the descriptive node and edge attributes added.
- outputs_of(task_label: str, init: bool = False, include_automatic_connections: bool = True) dict[str, lsst.pipe.base.pipeline_graph._dataset_types.DatasetTypeNode | None] ¶
Return the dataset types that are outputs of a task.
- Parameters:
- Returns:
- outputs (dict[str, DatasetTypeNode or None]): Dictionary with parent dataset type name keys and either DatasetTypeNode values (if the dataset type has been resolved) or None values.
Notes
To get the output edges of a task or task init node (which provide information about storage class overrides and components) use:
graph.tasks[task_label].iter_all_outputs()
or
graph.tasks[task_label].init.iter_all_outputs()
or the various mapping attributes of the TaskNode and TaskInitNode classes.
- producer_of(dataset_type_name: str) TaskNode | TaskInitNode | None ¶
Return the TaskNode or TaskInitNode that writes the given dataset type.
- Parameters:
- dataset_type_name (str): Dataset type name. Must not be a component.
- Returns:
- node (TaskNode, TaskInitNode, or None): Producing node or None if there isn’t one in this graph.
- Raises:
- DuplicateOutputError: Raised if there are multiple tasks defined to produce this dataset type. This is only possible if the graph’s dataset types are not resolved.
- producing_edge_of(dataset_type_name: str) WriteEdge | None ¶
Return the WriteEdge that links the producing task to the named dataset type.
- Parameters:
- dataset_type_name (str): Dataset type name. Must not be a component.
- Returns:
- Raises:
- DuplicateOutputError: Raised if there are multiple tasks defined to produce this dataset type. This is only possible if the graph’s dataset types are not resolved.
Notes
On resolved graphs, it may be slightly more efficient to use:
graph.dataset_types[dataset_type_name].producing_edge
but this method works on graphs with unresolved dataset types as well.
- reconfigure_tasks(*args: tuple[str, PipelineTaskConfig], check_edges_unchanged: bool = False, assume_edges_unchanged: bool = False, **kwargs: PipelineTaskConfig) None ¶
Update the configuration for one or more tasks.
- Parameters:
- *args (tuple[str, PipelineTaskConfig]): Positional arguments are each a 2-tuple of task label and new config object. Note that the same arguments may also be passed as **kwargs, which is usually more readable, but task labels in *args are not required to be valid Python identifiers.
- check_edges_unchanged (bool, optional): If True, require the edges (connections) of the modified tasks to remain unchanged after the configuration updates, and verify that this is the case.
- assume_edges_unchanged (bool, optional): If True, the caller declares that the edges (connections) of the modified tasks will remain unchanged after the configuration updates, and that it is unnecessary to check this.
- **kwargs (PipelineTaskConfig): New config objects or overrides to apply to copies of the current config objects, with task labels as the keywords.
- Raises:
- ValueError: Raised if assume_edges_unchanged and check_edges_unchanged are both True, or if the same task appears twice.
- EdgesChangedError: Raised if check_edges_unchanged=True and the edges of a task do change.
Notes
If reconfiguring a task causes its edges to change, any dataset type nodes connected to that task (not just those whose edges have changed!) will be unresolved.
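The way positional 2-tuples and keyword arguments combine, and the two documented ValueError conditions, can be sketched as follows. The collect_updates function is a hypothetical illustration of the argument handling only; it does not touch configs or edges, and the error messages are invented.

```python
def collect_updates(*args, check_edges_unchanged=False, assume_edges_unchanged=False, **kwargs):
    """Merge ``(label, config)`` tuples and keyword configs into one dict."""
    if check_edges_unchanged and assume_edges_unchanged:
        raise ValueError("Cannot both check and assume that edges are unchanged.")
    updates = {}
    # Positional pairs allow labels that are not valid Python identifiers.
    for label, config in (*args, *kwargs.items()):
        if label in updates:
            raise ValueError(f"Task {label!r} appears more than once.")
        updates[label] = config
    return updates
```

A label such as "my-task" can only be passed positionally, as in collect_updates(("my-task", config_a), other=config_b), since it cannot be written as a keyword.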
- remove_task_subset(subset_label: str) None ¶
Remove a labeled set of tasks.
- Parameters:
- subset_label (str): Label for this set of tasks.
- remove_tasks(labels: Iterable[str], drop_from_subsets: bool = True) list[tuple[lsst.pipe.base.pipeline_graph._tasks.TaskNode, set[str]]] ¶
Remove one or more tasks from the graph.
- Parameters:
- Returns:
- Raises:
- PipelineGraphError: Raised if drop_from_subsets is False and the task is still part of one or more subsets.
Notes
Removing a task will cause dataset type nodes with no other referencing tasks to be removed. Any other dataset type nodes referenced by a removed task will be reset to an “unresolved” state.
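The effect of a removal on dataset type nodes can be modeled with two dictionaries. This is a sketch under stated assumptions: remove_task, the tasks mapping, and the resolved mapping (where None means "unresolved") are hypothetical and ignore subsets entirely.

```python
def remove_task(tasks, resolved, label):
    """Remove one task from a toy pipeline, updating dataset type state.

    ``tasks`` maps task labels to ``(inputs, outputs)`` pairs of dataset type
    names; ``resolved`` maps dataset type names to a definition or ``None``.
    Both are modified in place.
    """
    inputs, outputs = tasks.pop(label)
    still_used = {name for ins, outs in tasks.values() for name in (*ins, *outs)}
    for name in {*inputs, *outputs}:
        if name in still_used:
            resolved[name] = None  # still referenced: reset to unresolved
        else:
            resolved.pop(name, None)  # orphaned: drop the node entirely
```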
- resolve(registry: Registry | None = None, dimensions: DimensionUniverse | None = None, dataset_types: Mapping[str, DatasetType] | None = None) None ¶
Resolve all dimensions and dataset types and check them for consistency.
Resolving a graph also causes it to be sorted.
- Parameters:
- registry (lsst.daf.butler.Registry, optional): Client for the data repository to resolve against. If not provided, both dimensions and dataset_types must be.
- dimensions (lsst.daf.butler.DimensionUniverse, optional): Definitions for all dimensions.
- dataset_types (Mapping[str, DatasetType], optional): Mapping of dataset types to consider registered.
- Raises:
- ConnectionTypeConsistencyError
Raised if a prerequisite input for one task appears as a different kind of connection in any other task.
- DuplicateOutputError
Raised if multiple tasks have the same dataset type as an output.
- IncompatibleDatasetTypeError
Raised if different tasks have different definitions of a dataset type. Different but compatible storage classes are permitted.
- MissingDatasetTypeError
Raised if a dataset type definition is required to exist in the data repository but none was found. This should only occur for dataset types that are not produced by a task in the pipeline and are consumed with different storage classes or as components by tasks in the pipeline.
- EdgesChangedError
Raised if
check_edges_unchanged=True
and the edges of a task do change after import and reconfiguration.
Notes
The universe attribute is set to dimensions and used to set all TaskNode.dimensions attributes. Dataset type nodes are resolved by first looking for a registry definition, then using the producing task’s definition, then looking for consistency between all consuming task definitions.
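The three-step precedence described in the notes can be written down as a small function. This is a simplified sketch: resolve_definition and its arguments are hypothetical, definitions are arbitrary hashable values, and consumer consistency is reduced to exact equality, whereas the real method permits different but compatible storage classes and raises more specific exceptions.

```python
def resolve_definition(name, registry, producer_definition, consumer_definitions):
    """Pick a definition for one dataset type in a toy model."""
    # 1. A definition registered in the data repository wins.
    if name in registry:
        return registry[name]
    # 2. Otherwise use the definition from the task that produces it.
    if producer_definition is not None:
        return producer_definition
    # 3. Otherwise all consumers must agree (simplified to exact equality).
    distinct = set(consumer_definitions)
    if len(distinct) == 1:
        return distinct.pop()
    raise ValueError(f"Cannot resolve dataset type {name!r}: consumers disagree or are absent.")
```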
- sort() None ¶
Sort this graph’s nodes topologically with deterministic (but unspecified) tiebreakers.
This does nothing if the graph is already known to be sorted.
- split_independent() Iterable[PipelineGraph] ¶
Iterate over independent subgraphs that together comprise this pipeline graph.
- Returns:
- subgraphs (Iterable[PipelineGraph]): An iterable over component subgraphs that could be run independently (they have only overall inputs in common). May be a lazy iterator.
Notes
All resolved dataset type nodes will be preserved.
If there is only one component, self may be returned as the only element in the iterable.
If has_been_sorted is True, all subgraphs will be sorted as well.
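One way to picture "independent subgraphs that share only overall inputs" is a union-find pass over task labels. The split_independent function below is a hypothetical standalone sketch operating on plain dictionaries, not the method's implementation, and it returns lists of labels rather than graphs.

```python
def split_independent(tasks):
    """Partition toy task labels into independently runnable groups.

    ``tasks`` maps a task label to an ``(inputs, outputs)`` pair of dataset
    type names. Tasks end up in the same group when they are connected
    through a dataset type that some task in the pipeline produces.
    """
    produced = {name for _, outputs in tasks.values() for name in outputs}
    parent = {label: label for label in tasks}

    def find(label):
        # Find the group representative, compressing the path as we go.
        while parent[label] != label:
            parent[label] = parent[parent[label]]
            label = parent[label]
        return label

    owner = {}  # dataset type name -> first task seen that touches it
    for label, (inputs, outputs) in tasks.items():
        for name in (*inputs, *outputs):
            if name not in produced:
                continue  # overall inputs may be shared between groups
            if name in owner:
                parent[find(label)] = find(owner[name])
            else:
                owner[name] = label
    groups = {}
    for label in tasks:
        groups.setdefault(find(label), []).append(label)
    return list(groups.values())
```

With tasks a and c both reading an overall input and only a feeding b, the sketch yields two groups, one holding a and b and one holding c.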