QuantumGraph

class lsst.pipe.base.QuantumGraph(quanta: Mapping[TaskDef, set[lsst.daf.butler._quantum.Quantum]], metadata: Mapping[str, Any] | None = None, universe: DimensionUniverse | None = None, initInputs: Mapping[TaskDef, Iterable[DatasetRef]] | None = None, initOutputs: Mapping[TaskDef, Iterable[DatasetRef]] | None = None, globalInitOutputs: Iterable[DatasetRef] | None = None, registryDatasetTypes: Iterable[DatasetType] | None = None)

Bases: object

QuantumGraph is a directed acyclic graph of QuantumNode objects.

This data structure represents a concrete workflow generated from a Pipeline.

Parameters:
quanta : Mapping [ TaskDef, set [ Quantum ] ]

This maps tasks (and their configs) to the sets of data they are to process.

metadata : Mapping of str to primitives, optional

Extra data to carry with the graph. Entries in this mapping should be serializable to JSON.

universe : DimensionUniverse, optional

The dimensions in which quanta can be defined. Need only be provided if no quanta have data IDs.

initInputs : Mapping, optional

Maps tasks to their InitInput dataset refs. Dataset refs can be either resolved or non-resolved. Presently the same dataset refs are included in each Quantum for the same task.

initOutputs : Mapping, optional

Maps tasks to their InitOutput dataset refs. Dataset refs can be either resolved or non-resolved. For resolved refs of intermediate datasets, the dataset ID must match the one in initInputs and in the Quantum initInputs.

globalInitOutputs : iterable [ DatasetRef ], optional

Dataset refs for some global objects produced by the pipeline. These objects include task configurations and package versions. Typically they have an empty DataId, but there is no real restriction on what can appear here.

registryDatasetTypes : iterable [ DatasetType ], optional

Dataset types used by this graph; their definitions must match the registry. If the registry does not define a dataset type yet, the definition should match the one that will be created later.

Raises:
ValueError

Raised if the graph is pruned such that some tasks no longer have nodes associated with them.

Attributes Summary

allDatasetTypes

All the dataset type names that are present in the graph (tuple [str]).

graph

A graph representing the relations between all the QuantumNode objects (networkx.DiGraph).

graphID

The ID generated by the graph at construction time (str).

inputQuanta

The nodes that are inputs to the graph (iterable [QuantumNode]).

isConnected

Whether all of the nodes in the graph are connected, ignoring directionality of connections (bool).

metadata

Extra data carried with the graph (mapping [str] or None).

outputQuanta

The nodes that are outputs of the graph (iterable [QuantumNode]).

pipeline_graph

A graph representation of the tasks and dataset types in the quantum graph.

taskGraph

A graph representing the relations between the tasks inside the quantum graph (networkx.DiGraph).

universe

Dimension universe associated with this graph (DimensionUniverse).

Methods Summary

buildAndPrintHeader()

Create the header that would be used when saving this object and print it to standard output.

checkQuantumInGraph(quantum)

Check if specified quantum appears in the graph as part of a node.

determineAncestorsOfQuantumNode(node)

Return a graph of the specified node and all the ancestor nodes directly reachable by walking edges.

determineConnectionsOfQuantumNode(node)

Return a graph of QuantumNode that are direct inputs and outputs of a specified node.

determineInputsToQuantumNode(node)

Return a set of QuantumNode that are direct inputs to a specified node.

determineOutputsOfQuantumNode(node)

Return a set of QuantumNode that are direct outputs of a specified node.

findCycle()

Check a graph for the presence of cycles and return the edges of any cycles found, or an empty list if there is no cycle.

findQuantaWithDSType(datasetTypeName)

Return all the Quantum that contain a specified DatasetTypeName.

findTaskDefByLabel(label)

Determine which TaskDef object in this graph is associated with a str representing a task's label.

findTaskDefByName(taskName)

Determine which TaskDef objects in this graph are associated with a str representing a task name (looks at the taskName property of TaskDef objects).

findTaskWithOutput(datasetTypeName)

Find the task that has the specified dataset type name as an output.

findTasksWithInput(datasetTypeName)

Find all tasks that have the specified dataset type name as an input.

getNodesForTask(taskDef)

Return all the QuantumNodes associated with a TaskDef.

getNumberOfQuantaForTask(taskDef)

Return the number of Quantum associated with a TaskDef.

getQuantaForTask(taskDef)

Return all the Quantum associated with a TaskDef.

getQuantumNodeByNodeId(nodeId)

Look up a QuantumNode from an ID associated with the node.

getSummary()

Create summary of graph.

get_task_quanta(label)

Return the quanta associated with the given task label.

globalInitOutputRefs()

Return DatasetRefs for global InitOutputs.

initInputRefs(taskDef)

Return DatasetRefs for a given task InitInputs.

initOutputRefs(taskDef)

Return DatasetRefs for a given task InitOutputs.

iterTaskGraph()

Iterate over the taskGraph attribute in topological order.

load(file[, universe, nodes, graphID, ...])

Read QuantumGraph from a file that was made by save.

loadUri(uri[, universe, nodes, graphID, ...])

Read QuantumGraph from a URI.

readHeader(uri[, minimumVersion])

Read the header of a QuantumGraph pointed to by the uri parameter and return it as a string.

registryDatasetTypes()

Return the dataset types used by this graph; their definitions match the dataset types from the registry.

save(file)

Save QuantumGraph to a file.

saveUri(uri)

Save QuantumGraph to the specified URI.

subset(nodes)

Create a new graph object that contains the subset of the nodes specified as input.

subsetToConnected()

Generate a list of subgraphs where each is connected.

tasksWithDSType(datasetTypeName)

Find all tasks that are associated with the specified dataset type name.

updateRun(run, *[, metadata_key, ...])

Change output run and dataset ID for each output dataset.

writeDotGraph(output)

Write out the graph as a dot graph.

Attributes Documentation

allDatasetTypes

All the dataset type names that are present in the graph (tuple [str]).

These types do not include global init-outputs.

graph

A graph representing the relations between all the QuantumNode objects (networkx.DiGraph).

The graph should usually be iterated over, or passed to methods of this class, but sometimes direct access to the networkx object may be helpful.

graphID

The ID generated by the graph at construction time (str).

inputQuanta

The nodes that are inputs to the graph (iterable [QuantumNode]).

These are the nodes that do not depend on any other nodes in the graph.

isConnected

Whether all of the nodes in the graph are connected, ignoring directionality of connections (bool).

metadata

Extra data carried with the graph (mapping [str] or None).

The mapping is a dynamic view of this object’s metadata. Values should be able to be serialized in JSON.
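Since metadata values should be JSON-serializable, a mapping can be checked before it is attached to a graph. The following is a minimal sketch that uses only the standard library; `check_metadata` is a hypothetical helper and not part of lsst.pipe.base.

```python
import json
from collections.abc import Mapping
from typing import Any


def check_metadata(metadata: Mapping[str, Any]) -> str:
    """Return the JSON encoding of ``metadata`` or raise ``TypeError``.

    Hypothetical helper: it only verifies that every key is a str and that
    the whole mapping can be serialized by the standard json module.
    """
    for key in metadata:
        if not isinstance(key, str):
            raise TypeError(f"metadata key {key!r} is not a str")
    # json.dumps raises TypeError for values it cannot serialize.
    return json.dumps(dict(metadata))
```

A mapping that passes this check can be expected to survive a JSON round trip; anything else is rejected before the graph is built or saved.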

outputQuanta

The nodes that are outputs of the graph (iterable [QuantumNode]).

These are the nodes that have no nodes that depend on them in the graph.

pipeline_graph

A graph representation of the tasks and dataset types in the quantum graph.

taskGraph

A graph representing the relations between the tasks inside the quantum graph (networkx.DiGraph).

universe

Dimension universe associated with this graph (DimensionUniverse).

Methods Documentation

buildAndPrintHeader() -> None

Create the header that would be used when saving this object and print it to standard output.

checkQuantumInGraph(quantum: Quantum) -> bool

Check if specified quantum appears in the graph as part of a node.

Parameters:
quantum : lsst.daf.butler.Quantum

The quantum to search for.

Returns:
in_graph : bool

The result of searching for the quantum.

determineAncestorsOfQuantumNode(node: QuantumNode) -> _T

Return a graph of the specified node and all the ancestor nodes directly reachable by walking edges.

Parameters:
node : QuantumNode

The node for which all ancestors are to be determined.

Returns:
ancestors : graph of QuantumNode

Graph of node and all of its ancestors.

determineConnectionsOfQuantumNode(node: QuantumNode) -> _T

Return a graph of QuantumNode that are direct inputs and outputs of a specified node.

Parameters:
node : QuantumNode

The node of the graph for which connected nodes are to be determined.

Returns:
graph : graph of QuantumNode

All the nodes that are directly connected to specified node.

determineInputsToQuantumNode(node: QuantumNode) -> set[lsst.pipe.base.graph.quantumNode.QuantumNode]

Return a set of QuantumNode that are direct inputs to a specified node.

Parameters:
node : QuantumNode

The node of the graph for which inputs are to be determined.

Returns:
inputs : set of QuantumNode

All the nodes that are direct inputs to specified node.

determineOutputsOfQuantumNode(node: QuantumNode) -> set[lsst.pipe.base.graph.quantumNode.QuantumNode]

Return a set of QuantumNode that are direct outputs of a specified node.

Parameters:
node : QuantumNode

The node of the graph for which outputs are to be determined.

Returns:
outputs : set of QuantumNode

All the nodes that are direct outputs to specified node.
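The method above returns only direct outputs. To collect every node downstream of a given node, the call can be repeated in a breadth-first walk. This is a sketch under the assumption that nodes are hashable (they are returned in sets); `downstream_nodes` is a hypothetical helper, and `qgraph` is any object that provides determineOutputsOfQuantumNode as documented here.

```python
from collections import deque


def downstream_nodes(qgraph, start):
    """Return the set of all nodes reachable from ``start`` via output edges.

    Hypothetical helper built only on determineOutputsOfQuantumNode; the
    starting node itself is not included in the result.
    """
    seen = set()
    queue = deque([start])
    while queue:
        node = queue.popleft()
        # Each call yields only the direct outputs of ``node``.
        for child in qgraph.determineOutputsOfQuantumNode(node):
            if child not in seen:
                seen.add(child)
                queue.append(child)
    return seen
```

The `seen` set ensures that a node reachable through several paths is visited once.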

findCycle() -> list[tuple[lsst.pipe.base.graph.quantumNode.QuantumNode, lsst.pipe.base.graph.quantumNode.QuantumNode]]

Check a graph for the presence of cycles and return the edges of any cycles found, or an empty list if there is no cycle.

Returns:
result : list of tuple of [ QuantumNode, QuantumNode ]

A list of any graph edges that form a cycle, or an empty list if there is no cycle. An empty list is returned to support the if graph.findCycle() syntax, as an empty list is falsy.
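The truthiness idiom mentioned above can be wrapped in a small guard. This is a sketch only; `require_acyclic` is a hypothetical helper, and `qgraph` is any object with a findCycle method that behaves as documented.

```python
def require_acyclic(qgraph) -> None:
    """Raise ValueError if ``qgraph.findCycle()`` reports any cycle edges."""
    cycle = qgraph.findCycle()
    if cycle:  # an empty list is falsy, so this runs only when a cycle exists
        raise ValueError(f"graph contains a cycle with {len(cycle)} edge(s)")
```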

findQuantaWithDSType(datasetTypeName: DatasetTypeName) -> set[lsst.daf.butler._quantum.Quantum]

Return all the Quantum that contain a specified DatasetTypeName.

Parameters:
datasetTypeName : str

The name of the dataset type to search for as a string. A DatasetTypeName, which is a NewType of str for type safety in static type checking, is also accepted.

Returns:
result : set of Quantum

A set of Quantum objects that contain the specified DatasetTypeName.

Raises:
KeyError

Raised if the DatasetTypeName is not part of the QuantumGraph.

findTaskDefByLabel(label: str) -> TaskDef | None

Determine which TaskDef object in this graph is associated with a str representing a task's label.

Parameters:
label : str

Name of a task to search for.

Returns:
result : TaskDef or None

The TaskDef object that has the specified label, or None if no task has it.

findTaskDefByName(taskName: str) -> list[lsst.pipe.base.pipeline.TaskDef]

Determine which TaskDef objects in this graph are associated with a str representing a task name (looks at the taskName property of TaskDef objects).

Returns a list of TaskDef objects as a PipelineTask may appear multiple times in a graph with different labels.

Parameters:
taskName : str

Name of a task to search for.

Returns:
result : list of TaskDef

List of the TaskDef objects that have the name specified. Multiple values are returned in the case that a task is used multiple times with different labels.

findTaskWithOutput(datasetTypeName: DatasetTypeName) -> TaskDef | None

Find the task that has the specified dataset type name as an output.

Parameters:
datasetTypeName : str

A string representing the name of a dataset type to be queried. A DatasetTypeName, which is a NewType of str for type safety in static type checking, is also accepted.

Returns:
result : TaskDef or None

The TaskDef that produces DatasetTypeName as an output, or None if none of the tasks produce this DatasetTypeName.

Raises:
KeyError

Raised if the DatasetTypeName is not part of the QuantumGraph.

findTasksWithInput(datasetTypeName: DatasetTypeName) -> Iterable[TaskDef]

Find all tasks that have the specified dataset type name as an input.

Parameters:
datasetTypeName : str

A string representing the name of a dataset type to be queried. A DatasetTypeName, which is a NewType of str for type safety in static type checking, is also accepted.

Returns:
tasks : iterable of TaskDef

TaskDef objects that have the specified DatasetTypeName as an input. The iterable will be empty if no tasks use the specified DatasetTypeName as an input.

Raises:
KeyError

Raised if the DatasetTypeName is not part of the QuantumGraph.
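There are two distinct "nothing found" outcomes here: an empty iterable when the dataset type is in the graph but no task consumes it, and a KeyError when the dataset type is not in the graph at all. A caller that wants to treat both as "no consumers" could use a sketch like the following; `tasks_consuming` is a hypothetical helper, and `qgraph` is any object with the documented findTasksWithInput method.

```python
def tasks_consuming(qgraph, dataset_type_name: str) -> list:
    """Return the tasks that read ``dataset_type_name`` as a list.

    Hypothetical helper: an unknown dataset type (KeyError from the graph)
    is reported as an empty list rather than as an exception.
    """
    try:
        return list(qgraph.findTasksWithInput(dataset_type_name))
    except KeyError:
        return []
```

Swallowing the KeyError hides typos in dataset type names, so this is only appropriate when a missing dataset type is an expected condition.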

getNodesForTask(taskDef: TaskDef) -> frozenset[lsst.pipe.base.graph.quantumNode.QuantumNode]

Return all the QuantumNodes associated with a TaskDef.

Parameters:
taskDef : TaskDef

The TaskDef for which Quantum are to be queried.

Returns:
nodes : frozenset [ QuantumNode ]

A frozenset of QuantumNode that is associated with the specified TaskDef.

getNumberOfQuantaForTask(taskDef: TaskDef) -> int

Return the number of Quantum associated with a TaskDef.

Parameters:
taskDef : TaskDef

The TaskDef for which Quantum are to be queried.

Returns:
count : int

The number of Quantum that are associated with the specified TaskDef.

getQuantaForTask(taskDef: TaskDef) -> frozenset[lsst.daf.butler._quantum.Quantum]

Return all the Quantum associated with a TaskDef.

Parameters:
taskDef : TaskDef

The TaskDef for which Quantum are to be queried.

Returns:
quanta : frozenset of Quantum

The set of Quantum that is associated with the specified TaskDef.

getQuantumNodeByNodeId(nodeId: UUID) -> QuantumNode

Look up a QuantumNode from an ID associated with the node.

Parameters:
nodeId : NodeId

The ID associated with a node.

Returns:
node : QuantumNode

The node corresponding to the input ID.

Raises:
KeyError

Raised if the requested nodeId is not in the graph.
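Node IDs often arrive as strings (for example from a log line), while the signature above takes a UUID. A minimal sketch of the conversion and of handling the documented KeyError follows; `find_node` is a hypothetical helper, and `qgraph` is any object with the documented getQuantumNodeByNodeId method.

```python
import uuid


def find_node(qgraph, node_id: str):
    """Return the node with the given ID string, or None if it is absent.

    Hypothetical helper. A malformed ID string still raises ValueError from
    uuid.UUID; only the "not in this graph" case is mapped to None.
    """
    try:
        return qgraph.getQuantumNodeByNodeId(uuid.UUID(node_id))
    except KeyError:
        return None
```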

getSummary() -> QgraphSummary

Create summary of graph.

Returns:
summary : QgraphSummary

Summary of QuantumGraph.

get_task_quanta(label: str) -> Mapping[UUID, Quantum]

Return the quanta associated with the given task label.

Parameters:
label : str

Task label.

Returns:
quanta : Mapping [ uuid.UUID, Quantum ]

Mapping from quantum ID to quantum. Empty if label does not correspond to a task in this graph.

globalInitOutputRefs() -> list[lsst.daf.butler._dataset_ref.DatasetRef]

Return DatasetRefs for global InitOutputs.

Returns:
refs : list [ DatasetRef ]

DatasetRefs for global InitOutputs.

initInputRefs(taskDef: TaskDef) -> list[lsst.daf.butler._dataset_ref.DatasetRef] | None

Return DatasetRefs for a given task InitInputs.

Parameters:
taskDef : TaskDef

Task definition structure.

Returns:
refs : list [ DatasetRef ] or None

DatasetRefs for the task InitInputs; can be None. The returned references can be either resolved or non-resolved.

initOutputRefs(taskDef: TaskDef) -> list[lsst.daf.butler._dataset_ref.DatasetRef] | None

Return DatasetRefs for a given task InitOutputs.

Parameters:
taskDef : TaskDef

Task definition structure.

Returns:
refs : list [ DatasetRef ] or None

DatasetRefs for the task InitOutputs; can be None. The returned references can be either resolved or non-resolved. A resolved reference will match the Quantum’s initInputs if this is an intermediate dataset type.

iterTaskGraph() -> Generator[TaskDef, None, None]

Iterate over the taskGraph attribute in topological order.

Yields:
taskDef : TaskDef

TaskDef objects in topological order.
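Topological iteration combines naturally with getNumberOfQuantaForTask to report how much work each task contributes, in execution order. The sketch below assumes that each TaskDef exposes its label as a `label` attribute; `quanta_per_task` is a hypothetical helper, and `qgraph` is any object with the two documented methods.

```python
def quanta_per_task(qgraph) -> list[tuple[str, int]]:
    """Return (task label, quantum count) pairs in topological task order.

    Hypothetical helper; assumes each yielded TaskDef has a ``label``
    attribute.
    """
    return [
        (taskDef.label, qgraph.getNumberOfQuantaForTask(taskDef))
        for taskDef in qgraph.iterTaskGraph()
    ]
```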

classmethod load(file: BinaryIO, universe: DimensionUniverse | None = None, nodes: Iterable[UUID] | None = None, graphID: BuildId | None = None, minimumVersion: int = 3) -> QuantumGraph

Read QuantumGraph from a file that was made by save.

Parameters:
file : io.IO of bytes

File with data open in binary mode.

universe : DimensionUniverse, optional

If None it is loaded from the QuantumGraph saved structure. If supplied, the DimensionUniverse from the loaded QuantumGraph will be validated against the supplied argument for compatibility.

nodes : iterable of uuid.UUID or None

UUIDs that correspond to nodes in the graph. If specified, only these nodes will be loaded. Defaults to None, in which case all nodes will be loaded.

graphID : str or None

If specified this ID is verified against the loaded graph prior to loading any Nodes. This defaults to None in which case no validation is done.

minimumVersion : int

Minimum version of a save file to load. Set to -1 to load all versions. Older versions may need to be loaded, and re-saved to upgrade them to the latest format before they can be used in production.

Returns:
graph : QuantumGraph

Resulting QuantumGraph instance.

Raises:
TypeError

Raised if data contains instance of a type other than QuantumGraph.

ValueError

Raised if one or more of the nodes requested is not in the QuantumGraph, if the graphID parameter does not match the graph being loaded, or if the supplied uri does not point at a valid QuantumGraph save file.

classmethod loadUri(uri: str | ParseResult | ResourcePath | Path, universe: DimensionUniverse | None = None, nodes: Iterable[UUID] | None = None, graphID: BuildId | None = None, minimumVersion: int = 3) -> QuantumGraph

Read QuantumGraph from a URI.

Parameters:
uri : convertible to ResourcePath

URI from where to load the graph.

universe : DimensionUniverse, optional

If None it is loaded from the QuantumGraph saved structure. If supplied, the DimensionUniverse from the loaded QuantumGraph will be validated against the supplied argument for compatibility.

nodes : iterable of uuid.UUID or None

UUIDs that correspond to nodes in the graph. If specified, only these nodes will be loaded. Defaults to None, in which case all nodes will be loaded.

graphID : str or None

If specified this ID is verified against the loaded graph prior to loading any Nodes. This defaults to None in which case no validation is done.

minimumVersion : int

Minimum version of a save file to load. Set to -1 to load all versions. Older versions may need to be loaded, and re-saved to upgrade them to the latest format before they can be used in production.

Returns:
graph : QuantumGraph

Resulting QuantumGraph instance.

Raises:
TypeError

Raised if file contains instance of a type other than QuantumGraph.

ValueError

Raised if one or more of the nodes requested is not in the QuantumGraph, if the graphID parameter does not match the graph being loaded, or if the supplied uri does not point at a valid QuantumGraph save file.

RuntimeError

Raised if the supplied DimensionUniverse is not compatible with the DimensionUniverse saved in the graph.

classmethod readHeader(uri: str | ParseResult | ResourcePath | Path, minimumVersion: int = 3) -> str | None

Read the header of a QuantumGraph pointed to by the uri parameter and return it as a string.

Parameters:
uri : convertible to ResourcePath

The location of the QuantumGraph to load. If the argument is a string, it must correspond to a valid ResourcePath path.

minimumVersion : int

Minimum version of a save file to load. Set to -1 to load all versions. Older versions may need to be loaded, and re-saved to upgrade them to the latest format before they can be used in production.

Returns:
header : str or None

The header associated with the specified QuantumGraph if there is one, else None.

Raises:
ValueError

Raised if the extension of the file specified by uri is not a QuantumGraph extension.

registryDatasetTypes() -> list[lsst.daf.butler._dataset_type.DatasetType]

Return the dataset types used by this graph; their definitions match the dataset types from the registry.

Returns:
refs : list [ DatasetType ]

Dataset types for this graph.

save(file: BinaryIO) -> None

Save QuantumGraph to a file.

Parameters:
file : io.BufferedIOBase

File to write data to, open in binary mode.
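Both save and load operate on streams opened in binary mode rather than on paths. A minimal sketch of a path-based round trip follows. `save_to_path` and `load_from_path` are hypothetical helpers. `graph_cls` stands in for lsst.pipe.base.QuantumGraph and is passed as a parameter only so that the sketch does not import the LSST stack.

```python
from pathlib import Path


def save_to_path(qgraph, path: Path) -> None:
    """Write ``qgraph`` to ``path`` through the stream-based save method."""
    # save() expects a file object opened in binary mode.
    with open(path, "wb") as stream:
        qgraph.save(stream)


def load_from_path(graph_cls, path: Path, node_ids=None):
    """Read a graph back through the stream-based load classmethod.

    ``graph_cls`` is expected to be the QuantumGraph class (an assumption of
    this sketch); ``node_ids`` is forwarded as the ``nodes`` argument.
    """
    with open(path, "rb") as stream:
        # nodes=None loads every node; an iterable of UUIDs loads a subset.
        return graph_cls.load(stream, nodes=node_ids)
```

When the destination is a URI rather than a local file, saveUri and loadUri avoid the need to open the stream by hand.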

saveUri(uri: str | ParseResult | ResourcePath | Path) -> None

Save QuantumGraph to the specified URI.

Parameters:
uri : convertible to ResourcePath

URI to where the graph should be saved.

subset(nodes: QuantumNode | Iterable[QuantumNode]) -> _T

Create a new graph object that contains the subset of the nodes specified as input. Node number is preserved.

Parameters:
nodes : QuantumNode or iterable of QuantumNode

Nodes from which to create subset.

Returns:
graph : instance of graph type

An instance of the type from which the subset was created.
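A common use of subset is to carve out the nodes that belong to a single task. The sketch below chains three methods documented on this page: findTaskDefByLabel, getNodesForTask, and subset. `subset_for_label` is a hypothetical helper, and `qgraph` is any object that provides those methods.

```python
def subset_for_label(qgraph, label: str):
    """Return a new graph holding only the nodes of the task named ``label``.

    Hypothetical helper; returns None when no task has the given label.
    """
    taskDef = qgraph.findTaskDefByLabel(label)
    if taskDef is None:
        return None
    # getNodesForTask returns a frozenset, which subset accepts as an iterable.
    return qgraph.subset(qgraph.getNodesForTask(taskDef))
```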

subsetToConnected() -> tuple[_T, ...]

Generate a list of subgraphs where each is connected.

Returns:
result : list of QuantumGraph

A list of graphs that are each connected.
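The isConnected attribute can be used to skip the split when the graph is already a single connected component. This is a sketch only; `connected_parts` is a hypothetical helper, and `qgraph` is any object with the documented isConnected attribute and subsetToConnected method.

```python
def connected_parts(qgraph) -> tuple:
    """Return the connected pieces of ``qgraph`` as a tuple of graphs.

    Hypothetical helper: an already connected graph is returned unchanged
    as a one-element tuple instead of being rebuilt.
    """
    if qgraph.isConnected:
        return (qgraph,)
    return tuple(qgraph.subsetToConnected())
```

Each returned piece could then be saved or scheduled independently, since no edges cross between pieces.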

tasksWithDSType(datasetTypeName: DatasetTypeName) -> Iterable[TaskDef]

Find all tasks that are associated with the specified dataset type name.

Parameters:
datasetTypeName : str

A string representing the name of a dataset type to be queried. A DatasetTypeName, which is a NewType of str for type safety in static type checking, is also accepted.

Returns:
result : iterable of TaskDef

TaskDef objects that are associated with the specified DatasetTypeName.

Raises:
KeyError

Raised if the DatasetTypeName is not part of the QuantumGraph.

updateRun(run: str, *, metadata_key: str | None = None, update_graph_id: bool = False) -> None

Change output run and dataset ID for each output dataset.

Parameters:
run : str

New output run name.

metadata_key : str or None

Specifies the metadata key corresponding to the output run name to update with the new run name. If None or if metadata is missing, it is not updated. If metadata is present but the key is missing, it will be added.

update_graph_id : bool, optional

If True then also update graph ID with a new unique value.

If True then also update graph ID with a new unique value.
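Note that metadata_key and update_graph_id are keyword-only. A minimal sketch of retargeting a graph to a new run follows. `retarget_run` is a hypothetical helper, and the default key name "output_run" is an assumption made for illustration, not a value taken from this page.

```python
def retarget_run(qgraph, run: str, metadata_key: str = "output_run") -> str:
    """Point ``qgraph`` at a new output run and return its new graph ID.

    Hypothetical helper; the "output_run" default is an assumed key name.
    """
    # Both options must be passed by keyword, as in the documented signature.
    qgraph.updateRun(run, metadata_key=metadata_key, update_graph_id=True)
    return qgraph.graphID
```

Requesting a fresh graph ID makes it possible to tell the retargeted graph apart from the original.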

writeDotGraph(output: str | BufferedIOBase) -> None

Write out the graph as a dot graph.

Parameters:
output : str or io.BufferedIOBase

Either a filesystem path to write to, or a file handle object.