QuantumGraph
- class lsst.pipe.base.QuantumGraph(quanta: Mapping[TaskDef, set[lsst.daf.butler._quantum.Quantum]], metadata: Mapping[str, Any] | None = None, universe: DimensionUniverse | None = None, initInputs: Mapping[TaskDef, Iterable[DatasetRef]] | None = None, initOutputs: Mapping[TaskDef, Iterable[DatasetRef]] | None = None, globalInitOutputs: Iterable[DatasetRef] | None = None, registryDatasetTypes: Iterable[DatasetType] | None = None)
Bases: object
QuantumGraph is a directed acyclic graph of QuantumNode objects.
This data structure represents a concrete workflow generated from a Pipeline.
- Parameters:
- quanta : Mapping[TaskDef, set[Quantum]]
Maps tasks (and their configs) to the sets of data they are to process.
- metadata : Mapping of str to primitives, optional
Extra data to carry with the graph. Entries in this mapping should be serializable to JSON.
- universe : DimensionUniverse, optional
The dimensions in which quanta can be defined. Need only be provided if no quanta have data IDs.
- initInputs : Mapping, optional
Maps tasks to their InitInput dataset refs. Dataset refs can be either resolved or non-resolved. Presently the same dataset refs are included in each Quantum for the same task.
- initOutputs : Mapping, optional
Maps tasks to their InitOutput dataset refs. Dataset refs can be either resolved or non-resolved. For intermediate resolved refs, the dataset ID must match the one in initInputs and in the Quantum's initInputs.
- globalInitOutputs : iterable [DatasetRef], optional
Dataset refs for global objects produced by the pipeline, such as task configurations and package versions. Typically they have an empty DataId, but there is no real restriction on what can appear here.
- registryDatasetTypes : iterable [DatasetType], optional
Dataset types used by this graph; their definitions must match the registry. If the registry does not define a dataset type yet, its definition should match the one that will be created later.
- Raises:
- ValueError
Raised if the graph is pruned such that some tasks no longer have nodes associated with them.
Attributes Summary
- allDatasetTypes: All the dataset type names that are present in the graph (tuple [str]).
- graph: A graph representing the relations between all the QuantumNode objects (networkx.DiGraph).
- The ID generated by the graph at construction time (str).
- inputQuanta: The nodes that are inputs to the graph (iterable [QuantumNode]).
- isConnected: Whether all of the nodes in the graph are connected, ignoring directionality of connections (bool).
- outputQuanta: The nodes that are outputs of the graph (iterable [QuantumNode]).
- pipeline_graph: A graph representation of the tasks and dataset types in the quantum graph.
- taskGraph: A graph representing the relations between the tasks inside the quantum graph (networkx.DiGraph).
- universe: Dimension universe associated with this graph (DimensionUniverse).
Methods Summary
- buildAndPrintHeader(): Create a header that would be used in a save of this object and print it to standard output.
- checkQuantumInGraph(quantum): Check if the specified quantum appears in the graph as part of a node.
- determineAncestorsOfQuantumNode(node): Return a graph of the specified node and all of its ancestor nodes.
- determineConnectionsOfQuantumNode(node): Return a graph of QuantumNode objects that are direct inputs and outputs of a specified node.
- determineInputsToQuantumNode(node): Return a set of QuantumNode objects that are direct inputs to a specified node.
- determineOutputsOfQuantumNode(node): Return a set of QuantumNode objects that are direct outputs of a specified node.
- findCycle(): Check a graph for the presence of cycles and return the edges of any cycles found, or an empty list if there is no cycle.
- findQuantaWithDSType(datasetTypeName): Return all the Quantum objects that contain a specified DatasetTypeName.
- findTaskDefByLabel(label): Determine which TaskDef object in this graph, if any, is associated with a str representing a task's label.
- findTaskDefByName(taskName): Determine which TaskDef objects in this graph are associated with a str representing a task name (looks at the taskName property of TaskDef objects).
- findTaskWithOutput(datasetTypeName): Find the task that has the specified dataset type name as an output.
- findTasksWithInput(datasetTypeName): Find all tasks that have the specified dataset type name as an input.
- getNodesForTask(taskDef): Return all the QuantumNode objects associated with a TaskDef.
- getNumberOfQuantaForTask(taskDef)
- getQuantaForTask(taskDef)
- getQuantumNodeByNodeId(nodeId): Look up a QuantumNode from an ID associated with the node.
- getSummary(): Create a summary of the graph.
- get_init_input_refs(task_label): Return the DatasetRefs for the given task's init inputs.
- get_init_output_refs(task_label): Return the DatasetRefs for the given task's init outputs.
- get_task_quanta(label): Return the quanta associated with the given task label.
- globalInitOutputRefs(): Return DatasetRefs for global InitOutputs.
- initInputRefs(taskDef): Return DatasetRefs for a given task's InitInputs.
- initOutputRefs(taskDef): Return DatasetRefs for a given task's InitOutputs.
- init_output_run(butler[, existing]): Initialize a new output RUN collection by writing init-output datasets (including configs and packages).
- iterTaskGraph(): Iterate over the taskGraph attribute in topological order.
- load(file[, universe, nodes, graphID, ...]): Read QuantumGraph from a file that was made by save.
- loadUri(uri[, universe, nodes, graphID, ...]): Read QuantumGraph from a URI.
- make_init_qbb(butler_config, *[, ...]): Construct a quantum-backed butler suitable for reading init-input datasets and writing init-output datasets.
- readHeader(uri[, minimumVersion]): Read the header of a QuantumGraph pointed to by the uri parameter and return it as a string.
- registryDatasetTypes(): Return dataset types used by this graph; their definitions match dataset types from the registry.
- save(file): Save QuantumGraph to a file.
- saveUri(uri): Save QuantumGraph to the specified URI.
- subset(nodes): Create a new graph object that contains the subset of the nodes specified as input.
- subsetToConnected(): Generate a list of subgraphs where each is connected.
- tasksWithDSType(datasetTypeName): Find all tasks that are associated with the specified dataset type name.
- updateRun(run, *[, metadata_key, ...]): Change the output run and dataset ID for each output dataset.
- writeDotGraph(output): Write out the graph as a dot graph.
- write_configs(butler[, compare_existing]): Write the config datasets for all tasks in the quantum graph.
- write_init_outputs(butler[, skip_existing]): Write the init-output datasets for all tasks in the quantum graph.
- write_packages(butler[, compare_existing]): Write the 'packages' dataset for the currently-active software versions.
Attributes Documentation
- allDatasetTypes
All the dataset type names that are present in the graph (tuple [str]). These types do not include global init-outputs.
- graph
A graph representing the relations between all the QuantumNode objects (networkx.DiGraph). The graph should usually be iterated over, or passed to methods of this class, but sometimes direct access to the networkx object may be helpful.
- inputQuanta
The nodes that are inputs to the graph (iterable [QuantumNode]). These are the nodes that do not depend on any other nodes in the graph.
- isConnected
Whether all of the nodes in the graph are connected, ignoring directionality of connections (bool).
- metadata
Extra data carried with the graph (mapping [str] or None). The mapping is a dynamic view of this object's metadata. Values should be serializable to JSON.
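Because metadata values are expected to survive a round trip through JSON, a quick pre-check before attaching a mapping to a graph can catch problems early. The helper below is a hypothetical, standard-library-only sketch; is_json_serializable is not part of lsst.pipe.base.

```python
from __future__ import annotations

import json
from typing import Any, Mapping


def is_json_serializable(metadata: Mapping[str, Any] | None) -> bool:
    """Return True if every metadata entry can be encoded as JSON (hypothetical helper)."""
    if metadata is None:
        return True  # the metadata attribute may be None; nothing to check
    try:
        json.dumps(dict(metadata))
    except (TypeError, ValueError):
        # TypeError: unsupported type (e.g. a set); ValueError: circular reference.
        return False
    return True
```

For example, a plain string or integer value passes, while a set value fails because json cannot encode sets.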
- outputQuanta
The nodes that are outputs of the graph (iterable [QuantumNode]). These are the nodes that have no other nodes depending on them in the graph.
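The definitions of inputQuanta and outputQuanta reduce to in-degree and out-degree checks. The sketch below assumes a toy model in which the graph is just a set of hashable node IDs plus (producer, consumer) edge pairs; it illustrates the definitions and is not how QuantumGraph computes these attributes internally.

```python
from __future__ import annotations

from collections.abc import Hashable, Iterable


def input_and_output_nodes(
    nodes: Iterable[Hashable], edges: Iterable[tuple[Hashable, Hashable]]
) -> tuple[set[Hashable], set[Hashable]]:
    """Return (inputs, outputs) for a DAG given as nodes plus (src, dst) edges."""
    all_nodes = set(nodes)
    has_predecessor: set[Hashable] = set()
    has_successor: set[Hashable] = set()
    for src, dst in edges:
        has_successor.add(src)    # src feeds at least one other node
        has_predecessor.add(dst)  # dst depends on at least one other node
    # Inputs depend on nothing; outputs have nothing depending on them.
    return all_nodes - has_predecessor, all_nodes - has_successor
```

Note that an isolated node counts as both an input and an output under these definitions.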
- pipeline_graph
A graph representation of the tasks and dataset types in the quantum graph.
- taskGraph
A graph representing the relations between the tasks inside the quantum graph (networkx.DiGraph).
- universe
Dimension universe associated with this graph (DimensionUniverse).
Methods Documentation
- buildAndPrintHeader() → None
Create a header that would be used in a save of this object and print it to standard output.
- checkQuantumInGraph(quantum: Quantum) → bool
Check if the specified quantum appears in the graph as part of a node.
- Parameters:
- quantum : lsst.daf.butler.Quantum
The quantum to search for.
- Returns:
- in_graph : bool
The result of searching for the quantum.
- determineAncestorsOfQuantumNode(node: QuantumNode) → _T
Return a graph of the specified node and all of its ancestors, that is, every node from which it can be reached by walking edges.
- Parameters:
- node : QuantumNode
The node for which all ancestors are to be determined.
- Returns:
- ancestors : graph of QuantumNode
Graph of the node and all of its ancestors.
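Ancestors are found by walking edges backwards, transitively, from the starting node. The following is a minimal breadth-first sketch, assuming the graph is modeled as (producer, consumer) edge pairs; it returns only the node set rather than a graph object, and ancestors_with_node is a hypothetical name.

```python
from __future__ import annotations

from collections import defaultdict, deque
from collections.abc import Hashable, Iterable


def ancestors_with_node(
    node: Hashable, edges: Iterable[tuple[Hashable, Hashable]]
) -> set[Hashable]:
    """Return ``node`` plus every node from which it can be reached."""
    predecessors: dict[Hashable, list[Hashable]] = defaultdict(list)
    for src, dst in edges:
        predecessors[dst].append(src)
    seen = {node}
    queue = deque([node])
    while queue:
        current = queue.popleft()
        for parent in predecessors[current]:  # walk each edge backwards
            if parent not in seen:
                seen.add(parent)
                queue.append(parent)
    return seen
```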
- determineConnectionsOfQuantumNode(node: QuantumNode) → _T
Return a graph of QuantumNode objects that are direct inputs and outputs of a specified node.
- Parameters:
- node : QuantumNode
The node of the graph for which connected nodes are to be determined.
- Returns:
- graph : graph of QuantumNode
All the nodes that are directly connected to the specified node.
- determineInputsToQuantumNode(node: QuantumNode) → set[lsst.pipe.base.graph.quantumNode.QuantumNode]
Return a set of QuantumNode objects that are direct inputs to a specified node.
- Parameters:
- node : QuantumNode
The node of the graph for which inputs are to be determined.
- Returns:
- inputs : set of QuantumNode
All the nodes that are direct inputs to the specified node.
- determineOutputsOfQuantumNode(node: QuantumNode) → set[lsst.pipe.base.graph.quantumNode.QuantumNode]
Return a set of QuantumNode objects that are direct outputs of a specified node.
- Parameters:
- node : QuantumNode
The node of the graph for which outputs are to be determined.
- Returns:
- outputs : set of QuantumNode
All the nodes that are direct outputs of the specified node.
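Unlike the ancestor search, the direct inputs and outputs of a node only look one edge away. This sketch assumes the same toy (producer, consumer) edge model and a hypothetical helper name; the union of the two returned sets corresponds to the nodes that determineConnectionsOfQuantumNode describes.

```python
from __future__ import annotations

from collections.abc import Hashable, Iterable


def direct_neighbours(
    node: Hashable, edges: Iterable[tuple[Hashable, Hashable]]
) -> tuple[set[Hashable], set[Hashable]]:
    """Return (direct inputs, direct outputs) of ``node``."""
    inputs: set[Hashable] = set()
    outputs: set[Hashable] = set()
    for src, dst in edges:
        if dst == node:
            inputs.add(src)   # src produces something that node consumes
        if src == node:
            outputs.add(dst)  # dst consumes something that node produces
    return inputs, outputs
```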
- findCycle() → list[tuple[lsst.pipe.base.graph.quantumNode.QuantumNode, lsst.pipe.base.graph.quantumNode.QuantumNode]]
Check a graph for the presence of cycles and return the edges of any cycles found, or an empty list if there is no cycle.
- Returns:
- result : list of tuple of [QuantumNode, QuantumNode]
A list of any graph edges that form a cycle, or an empty list if there is no cycle. An empty list (rather than None) is returned to support the "if graph.findCycle():" idiom, since an empty list is falsy.
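A standard way to detect a cycle is a depth-first search that tracks which nodes are on the current path: reaching such a node again means a back edge, and the path segment from that node closes a cycle. The recursive sketch below assumes the toy (producer, consumer) edge model and returns the edges of the first cycle it finds, which may be fewer edges than the library reports; find_cycle here is a hypothetical helper.

```python
from __future__ import annotations

from collections import defaultdict
from collections.abc import Hashable, Iterable


def find_cycle(edges: Iterable[tuple[Hashable, Hashable]]) -> list[tuple[Hashable, Hashable]]:
    """Return the edges of one directed cycle, or [] if the graph is acyclic."""
    successors: dict[Hashable, list[Hashable]] = defaultdict(list)
    for src, dst in edges:
        successors[src].append(dst)
    WHITE, GREY, BLACK = 0, 1, 2  # unvisited, on the current path, finished
    colour: dict[Hashable, int] = defaultdict(int)
    path: list[Hashable] = []

    def visit(node: Hashable) -> list[tuple[Hashable, Hashable]]:
        colour[node] = GREY
        path.append(node)
        for nxt in successors[node]:
            if colour[nxt] == GREY:
                # Back edge: the cycle runs from nxt along the path to node, then back to nxt.
                cycle_nodes = path[path.index(nxt):]
                return list(zip(cycle_nodes, cycle_nodes[1:] + [nxt]))
            if colour[nxt] == WHITE:
                found = visit(nxt)
                if found:
                    return found
        path.pop()
        colour[node] = BLACK
        return []

    for start in list(successors):
        if colour[start] == WHITE:
            found = visit(start)
            if found:
                return found
    return []
```

Returning an empty list keeps the same truthiness convention as findCycle, so "if find_cycle(edges):" reads naturally.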
- findQuantaWithDSType(datasetTypeName: DatasetTypeName) → set[lsst.daf.butler._quantum.Quantum]
Return all the Quantum objects that contain a specified DatasetTypeName.
- Parameters:
- datasetTypeName : DatasetTypeName
- Returns:
- result : set of Quantum objects
A set of Quantum objects that contain the specified DatasetTypeName.
- Raises:
- KeyError
Raised if the DatasetTypeName is not part of the QuantumGraph.
- findTaskDefByLabel(label: str) → TaskDef | None
Determine which TaskDef object in this graph, if any, is associated with a str representing a task's label.
- findTaskDefByName(taskName: str) → list[lsst.pipe.base.pipeline.TaskDef]
Determine which TaskDef objects in this graph are associated with a str representing a task name (looks at the taskName property of TaskDef objects). Returns a list of TaskDef objects because a PipelineTask may appear multiple times in a graph with different labels.
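The difference between the two lookups comes down to uniqueness: a label identifies at most one task, while one task class can be configured under several labels. The sketch below uses a hypothetical ToyTaskDef dataclass with only label and taskName fields, and assumes labels are unique within a graph; it is not the real TaskDef.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class ToyTaskDef:
    """Hypothetical stand-in for TaskDef with only the fields used here."""

    label: str
    taskName: str


def find_by_label(task_defs: list[ToyTaskDef], label: str) -> ToyTaskDef | None:
    # Assuming unique labels, at most one match exists.
    return next((t for t in task_defs if t.label == label), None)


def find_by_name(task_defs: list[ToyTaskDef], task_name: str) -> list[ToyTaskDef]:
    # The same task class may appear under several labels, so return all matches.
    return [t for t in task_defs if t.taskName == task_name]
```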
- findTaskWithOutput(datasetTypeName: DatasetTypeName) → TaskDef | None
Find the task that has the specified dataset type name as an output.
- Parameters:
- datasetTypeName : DatasetTypeName
- Returns:
- TaskDef or None
- Raises:
- KeyError
Raised if the DatasetTypeName is not part of the QuantumGraph.
- findTasksWithInput(datasetTypeName: DatasetTypeName) → Iterable[TaskDef]
Find all tasks that have the specified dataset type name as an input.
- Parameters:
- datasetTypeName : DatasetTypeName
- Returns:
- Iterable[TaskDef]
- Raises:
- KeyError
Raised if the DatasetTypeName is not part of the QuantumGraph.
- getNodesForTask(taskDef: TaskDef) → frozenset[lsst.pipe.base.graph.quantumNode.QuantumNode]
Return all the QuantumNode objects associated with a TaskDef.
- Parameters:
- taskDef : TaskDef
- Returns:
- nodes : frozenset [QuantumNode]
A frozenset of QuantumNode objects associated with the specified TaskDef.
- getQuantaForTask(taskDef: TaskDef) → frozenset[lsst.daf.butler._quantum.Quantum]
- getQuantumNodeByNodeId(nodeId: UUID) → QuantumNode
Look up a QuantumNode from an ID associated with the node.
- Parameters:
- nodeId : UUID
The ID associated with a node.
- Returns:
- node : QuantumNode
The node corresponding to the input ID.
- Raises:
- KeyError
Raised if the requested nodeId is not in the graph.
- getSummary() → QgraphSummary
Create a summary of the graph.
- Returns:
- summary : QgraphSummary
Summary of the QuantumGraph.
- get_init_input_refs(task_label: str) → list[lsst.daf.butler._dataset_ref.DatasetRef]
Return the DatasetRefs for the given task's init inputs.
- Parameters:
- task_label : str
Label of the task.
- Returns:
- refs : list [lsst.daf.butler.DatasetRef]
Dataset references. Guaranteed to be a new list, not internal state.
- get_init_output_refs(task_label: str) → list[lsst.daf.butler._dataset_ref.DatasetRef]
Return the DatasetRefs for the given task's init outputs.
- Parameters:
- task_label : str
Label of the task.
- Returns:
- refs : list [lsst.daf.butler.DatasetRef]
Dataset references. Guaranteed to be a new list, not internal state.
- get_task_quanta(label: str) → Mapping[UUID, Quantum]
Return the quanta associated with the given task label.
- globalInitOutputRefs() → list[lsst.daf.butler._dataset_ref.DatasetRef]
Return DatasetRefs for global InitOutputs.
- Returns:
- refs : list [DatasetRef]
DatasetRefs for global InitOutputs.
- initInputRefs(taskDef: TaskDef) → list[lsst.daf.butler._dataset_ref.DatasetRef] | None
Return DatasetRefs for a given task's InitInputs.
- Parameters:
- taskDef : TaskDef
Task definition structure.
- Returns:
- refs : list [DatasetRef] or None
DatasetRefs for the task's InitInputs, or None. The returned references may be either resolved or non-resolved.
- initOutputRefs(taskDef: TaskDef) → list[lsst.daf.butler._dataset_ref.DatasetRef] | None
Return DatasetRefs for a given task's InitOutputs.
- Parameters:
- taskDef : TaskDef
Task definition structure.
- Returns:
- refs : list [DatasetRef] or None
DatasetRefs for the task's InitOutputs, or None. The returned references may be either resolved or non-resolved. For an intermediate dataset type, a resolved reference will match the Quantum's initInputs.
- init_output_run(butler: LimitedButler, existing: bool = True) → None
Initialize a new output RUN collection by writing init-output datasets (including configs and packages).
- Parameters:
- butler : lsst.daf.butler.LimitedButler
A limited butler data repository client.
- existing : bool, optional
If True, check or ignore outputs that already exist. If False, always raise if an output dataset already exists.
- Raises:
- lsst.daf.butler.registry.ConflictingDefinitionError
Raised if there are existing init-output datasets, and either existing=False or their contents are not compatible with this graph.
- iterTaskGraph() → Generator[TaskDef, None, None]
Iterate over the taskGraph attribute in topological order.
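Topological order means every task is yielded after all tasks whose outputs it consumes. As an illustration only, the sketch below orders plain task labels with the standard library's graphlib.TopologicalSorter (Python 3.9+); the real method yields TaskDef objects, and its internal implementation may differ. The task_inputs mapping and the labels used are hypothetical.

```python
from __future__ import annotations

from graphlib import TopologicalSorter


def topological_task_order(task_inputs: dict[str, set[str]]) -> list[str]:
    """Order task labels so every task comes after the tasks it depends on.

    ``task_inputs`` maps each task label to the labels of the tasks whose
    outputs it consumes (its predecessors in the task graph).
    """
    # static_order() raises graphlib.CycleError if the dependencies form a cycle.
    return list(TopologicalSorter(task_inputs).static_order())
```

When several tasks are independent of one another, any relative order among them is valid, so code consuming the order should not rely on a particular tie-break.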
- classmethod load(file: BinaryIO, universe: DimensionUniverse | None = None, nodes: Iterable[UUID] | None = None, graphID: BuildId | None = None, minimumVersion: int = 3) → QuantumGraph
Read QuantumGraph from a file that was made by save.
- Parameters:
- file : io.IO of bytes
File with data, open in binary mode.
- universe : DimensionUniverse, optional
If None, it is loaded from the QuantumGraph saved structure. If supplied, the DimensionUniverse from the loaded QuantumGraph will be validated against the supplied argument for compatibility.
- nodes : iterable of uuid.UUID or None
UUIDs that correspond to nodes in the graph. If specified, only these nodes will be loaded. Defaults to None, in which case all nodes will be loaded.
- graphID : str or None
If specified, this ID is verified against the loaded graph prior to loading any nodes. Defaults to None, in which case no validation is done.
- minimumVersion : int
Minimum version of a save file to load. Set to -1 to load all versions. Older versions may need to be loaded and re-saved to upgrade them to the latest format before they can be used in production.
- Returns:
- graph : QuantumGraph
Resulting QuantumGraph instance.
- Raises:
- TypeError
Raised if data contains an instance of a type other than QuantumGraph.
- ValueError
Raised if one or more of the nodes requested is not in the QuantumGraph, if the graphID parameter does not match the graph being loaded, or if the supplied file is not a valid QuantumGraph save file.
- classmethod loadUri(uri: str | ParseResult | ResourcePath | Path, universe: DimensionUniverse | None = None, nodes: Iterable[UUID] | None = None, graphID: BuildId | None = None, minimumVersion: int = 3) → QuantumGraph
Read QuantumGraph from a URI.
- Parameters:
- uri : convertible to ResourcePath
URI from where to load the graph.
- universe : DimensionUniverse, optional
If None, it is loaded from the QuantumGraph saved structure. If supplied, the DimensionUniverse from the loaded QuantumGraph will be validated against the supplied argument for compatibility.
- nodes : iterable of uuid.UUID or None
UUIDs that correspond to nodes in the graph. If specified, only these nodes will be loaded. Defaults to None, in which case all nodes will be loaded.
- graphID : str or None
If specified, this ID is verified against the loaded graph prior to loading any nodes. Defaults to None, in which case no validation is done.
- minimumVersion : int
Minimum version of a save file to load. Set to -1 to load all versions. Older versions may need to be loaded and re-saved to upgrade them to the latest format before they can be used in production.
- Returns:
- graph : QuantumGraph
Resulting QuantumGraph instance.
- Raises:
- TypeError
Raised if the file contains an instance of a type other than QuantumGraph.
- ValueError
Raised if one or more of the nodes requested is not in the QuantumGraph, if the graphID parameter does not match the graph being loaded, or if the supplied uri does not point at a valid QuantumGraph save file.
- RuntimeError
Raised if the supplied DimensionUniverse is not compatible with the DimensionUniverse saved in the graph.
- make_init_qbb(butler_config: Config | str | ParseResult | ResourcePath | Path, *, config_search_paths: Iterable[str] | None = None) → QuantumBackedButler
Construct a quantum-backed butler suitable for reading init-input datasets and writing init-output datasets.
This requires the full graph to have been loaded.
- Parameters:
- butler_config : Config | str | ParseResult | ResourcePath | Path
- config_search_paths : Iterable[str] or None, optional
- Returns:
- qbb : QuantumBackedButler
A limited butler that can get init-input datasets and put init-output datasets.
- classmethod readHeader(uri: str | ParseResult | ResourcePath | Path, minimumVersion: int = 3) → str | None
Read the header of a QuantumGraph pointed to by the uri parameter and return it as a string.
- Parameters:
- uri : convertible to ResourcePath
The location of the QuantumGraph to load. If the argument is a string, it must correspond to a valid ResourcePath path.
- minimumVersion : int
Minimum version of a save file to load. Set to -1 to load all versions. Older versions may need to be loaded and re-saved to upgrade them to the latest format before they can be used in production.
- Returns:
- header : str or None
The header associated with the specified QuantumGraph if there is one, else None.
- Raises:
- ValueError
Raised if the extension of the file specified by uri is not a QuantumGraph extension.
- registryDatasetTypes() → list[lsst.daf.butler._dataset_type.DatasetType]
Return dataset types used by this graph; their definitions match dataset types from the registry.
- Returns:
- refs : list [DatasetType]
Dataset types for this graph.
- save(file: BinaryIO) → None
Save QuantumGraph to a file.
- Parameters:
- file : io.BufferedIOBase
File to write data to, open in binary mode.
- saveUri(uri: str | ParseResult | ResourcePath | Path) → None
Save QuantumGraph to the specified URI.
- Parameters:
- uri : convertible to ResourcePath
URI to where the graph should be saved.
- subset(nodes: QuantumNode | Iterable[QuantumNode]) → _T
Create a new graph object that contains the subset of the nodes specified as input. Node numbers are preserved.
- Parameters:
- nodes : QuantumNode or iterable of QuantumNode
Nodes from which to create the subset.
- Returns:
- graph : instance of graph type
An instance of the type from which the subset was created.
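One way to picture subset is as an induced subgraph: the chosen nodes keep their identifiers, and only edges whose two ends were both chosen survive. The sketch below assumes exactly that behaviour in the toy edge-pair model; induced_subgraph is a hypothetical name, and the real method builds a full QuantumGraph rather than returning sets and lists.

```python
from __future__ import annotations

from collections.abc import Hashable, Iterable


def induced_subgraph(
    edges: Iterable[tuple[Hashable, Hashable]], keep: Iterable[Hashable]
) -> tuple[set[Hashable], list[tuple[Hashable, Hashable]]]:
    """Return the kept nodes and only the edges whose ends are both kept.

    Node identifiers are reused unchanged, mirroring the note that node
    numbers are preserved by ``subset``.
    """
    kept = set(keep)
    return kept, [(src, dst) for src, dst in edges if src in kept and dst in kept]
```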
- subsetToConnected() → tuple[_T, ...]
Generate a list of subgraphs where each is connected.
- Returns:
- result : list of QuantumGraph
A list of graphs that are each connected.
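Splitting a graph into connected pieces "ignoring directionality", as isConnected describes, is the weakly connected components problem. The sketch below assumes the toy node-and-edge-pair model and uses an iterative depth-first search over an undirected view; it returns node sets, not QuantumGraph objects, and the helper name is hypothetical.

```python
from __future__ import annotations

from collections import defaultdict
from collections.abc import Hashable, Iterable


def weakly_connected_components(
    nodes: Iterable[Hashable], edges: Iterable[tuple[Hashable, Hashable]]
) -> list[set[Hashable]]:
    """Split a directed graph into components, treating edges as undirected."""
    neighbours: dict[Hashable, set[Hashable]] = defaultdict(set)
    for src, dst in edges:
        neighbours[src].add(dst)
        neighbours[dst].add(src)  # ignore direction, as isConnected does
    unvisited = set(nodes) | set(neighbours)
    components: list[set[Hashable]] = []
    while unvisited:
        stack = [unvisited.pop()]
        component = set(stack)
        while stack:
            for other in neighbours[stack.pop()]:
                if other not in component:
                    component.add(other)
                    stack.append(other)
        unvisited -= component
        components.append(component)
    return components
```

In this model, a graph is connected exactly when the function returns a single component.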
- tasksWithDSType(datasetTypeName: DatasetTypeName) → Iterable[TaskDef]
Find all tasks that are associated with the specified dataset type name.
- Parameters:
- datasetTypeName : DatasetTypeName
- Returns:
- Iterable[TaskDef]
- Raises:
- KeyError
Raised if the DatasetTypeName is not part of the QuantumGraph.
- updateRun(run: str, *, metadata_key: str | None = None, update_graph_id: bool = False) → None
Change the output run and dataset ID for each output dataset.
- Parameters:
- run : str
New output run name.
- metadata_key : str or None
Specifies the metadata key corresponding to the output run name, which is updated with the new run name. If None, or if metadata is missing, it is not updated. If metadata is present but the key is missing, it will be added.
- update_graph_id : bool, optional
If True, also update the graph ID with a new unique value.
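The metadata_key rules have three cases: no key given, no metadata present, and metadata present with or without the key. The toy function below applies only those documented rules to a plain dict, assuming "metadata is missing" means the graph's metadata is None; it does not touch dataset IDs or the graph ID, and update_run_metadata is a hypothetical name.

```python
from __future__ import annotations

from typing import Any


def update_run_metadata(
    metadata: dict[str, Any] | None, run: str, metadata_key: str | None
) -> None:
    """Apply the metadata rules described for ``updateRun`` (toy model).

    Nothing happens when ``metadata_key`` is None or there is no metadata;
    otherwise the key is set to ``run``, being added if it was missing.
    """
    if metadata_key is None or metadata is None:
        return
    metadata[metadata_key] = run  # overwrites an existing key or adds a new one
```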
- writeDotGraph(output: str | BufferedIOBase) → None
Write out the graph as a dot graph.
- Parameters:
- output : str or io.BufferedIOBase
Either a filesystem path to write to, or a file handle object.
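Accepting "either a filesystem path or a file handle" is a common dual-input pattern. The sketch below shows that pattern with a minimal Graphviz DOT writer over toy string edges; the graph name, node quoting, and UTF-8 encoding are assumptions of this example and are not claimed to match the library's actual DOT output.

```python
from __future__ import annotations

import io
from collections.abc import Iterable


def write_dot(edges: Iterable[tuple[str, str]], output: str | io.BufferedIOBase) -> None:
    """Write a directed graph in Graphviz DOT syntax to a path or binary handle."""
    lines = ["digraph QuantumGraph {"]
    for src, dst in edges:
        # Simplification: node names containing double quotes are not escaped.
        lines.append(f'    "{src}" -> "{dst}";')
    lines.append("}")
    data = ("\n".join(lines) + "\n").encode("utf-8")
    if isinstance(output, str):
        with open(output, "wb") as stream:  # treat a str as a filesystem path
            stream.write(data)
    else:
        output.write(data)  # an already-open binary file handle
```

Passing an io.BytesIO object is a convenient way to capture the output in memory, since BytesIO is a BufferedIOBase subclass.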
- write_configs(butler: LimitedButler, compare_existing: bool = True) → None
Write the config datasets for all tasks in the quantum graph.
- Parameters:
- butler : lsst.daf.butler.LimitedButler
A limited butler data repository client.
- compare_existing : bool, optional
If True, check configs that already exist for consistency. If False, always raise if configs already exist.
- Raises:
- lsst.daf.butler.registry.ConflictingDefinitionError
Raised if a config dataset already exists and compare_existing=False, or if the existing config is not consistent with the config in the quantum graph.
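The compare_existing flag gives three outcomes for each config dataset: write it if absent, accept it if present and equal, and raise otherwise. The toy model below reproduces that decision table against a dict acting as a hypothetical data store, with a local exception class standing in for lsst.daf.butler.registry.ConflictingDefinitionError; dataset names and config values are illustrative only.

```python
from __future__ import annotations

from typing import Any


class ConflictingDefinitionError(Exception):
    """Local stand-in for lsst.daf.butler.registry.ConflictingDefinitionError."""


def write_config(
    store: dict[str, Any], dataset_name: str, config: Any, compare_existing: bool = True
) -> None:
    """Toy model of the write_configs rules for a single config dataset."""
    if dataset_name not in store:
        store[dataset_name] = config  # nothing there yet: write it
        return
    if not compare_existing:
        raise ConflictingDefinitionError(f"{dataset_name} already exists")
    if store[dataset_name] != config:
        raise ConflictingDefinitionError(f"{dataset_name} differs from the graph's config")
    # The existing config matches, so there is nothing to write.
```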
- write_init_outputs(butler: LimitedButler, skip_existing: bool = True) → None
Write the init-output datasets for all tasks in the quantum graph.
- Parameters:
- butler : lsst.daf.butler.LimitedButler
A limited butler data repository client.
- skip_existing : bool, optional
If True (default), ignore init-outputs that already exist. If False, raise.
- Raises:
- lsst.daf.butler.registry.ConflictingDefinitionError
Raised if an init-output dataset already exists and skip_existing=False.
- write_packages(butler: LimitedButler, compare_existing: bool = True) → None
Write the 'packages' dataset for the currently-active software versions.
- Parameters:
- butler : lsst.daf.butler.LimitedButler
A limited butler data repository client.
- compare_existing : bool, optional
If True, check packages that already exist for consistency. If False, always raise if the packages dataset already exists.
- Raises:
- lsst.daf.butler.registry.ConflictingDefinitionError
Raised if the packages dataset already exists and is not consistent with the current packages.