QuantumGraph

class lsst.pipe.base.QuantumGraph(quanta: Mapping[lsst.pipe.base.pipeline.TaskDef, Set[lsst.daf.butler.core.quantum.Quantum]], metadata: Optional[Mapping[str, Any]] = None)

Bases: object

QuantumGraph is a directed acyclic graph of QuantumNode objects

This data structure represents a concrete workflow generated from a Pipeline.

Parameters:
quanta : Mapping of TaskDef to sets of Quantum

This maps tasks (and their configs) to the sets of data they are to process.

metadata : Optional Mapping of str to primitives

This is an optional parameter of extra data to carry with the graph. Entries in this mapping should be able to be serialized in JSON.

Attributes Summary

allDatasetTypes Return all the DatasetTypeName objects that are contained inside the graph.
graph Return a graph representing the relations between all the QuantumNode objects.
graphID Returns the ID generated by the graph at construction time
inputQuanta Make a list of all QuantumNode objects that are ‘input’ nodes to the graph, meaning those nodes to not depend on any other nodes in the graph.
isConnected Return True if all of the nodes in the graph are connected, ignores directionality of connections.
metadata
outputQuanta Make a list of all QuantumNode objects that are ‘output’ nodes to the graph, meaning those nodes have no nodes that depend them in the graph.
taskGraph Return a graph representing the relations between the tasks inside the quantum graph.

Methods Summary

checkQuantumInGraph(quantum) Check if specified quantum appears in the graph as part of a node.
determineAncestorsOfQuantumNode(node) Return a graph of the specified node and all the ancestor nodes directly reachable by walking edges.
determineConnectionsOfQuantumNode(node) Return a graph of QuantumNode that are direct inputs and outputs of a specified node.
determineInputsToQuantumNode(node) Return a set of QuantumNode that are direct inputs to a specified node.
determineOutputsOfQuantumNode(node) Return a set of QuantumNode that are direct outputs of a specified node.
findCycle() Check a graph for the presense of cycles and returns the edges of any cycles found, or an empty list if there is no cycle.
findQuantaWithDSType(datasetTypeName) Return all the Quantum that contain a specified DatasetTypeName.
findTaskDefByLabel(label) Determine which TaskDef objects in this graph are associated with a str representing a tasks label.
findTaskDefByName(taskName) Determine which TaskDef objects in this graph are associated with a str representing a task name (looks at the taskName property of TaskDef objects).
findTaskWithOutput(datasetTypeName) Find all tasks that have the specified dataset type name as an output.
findTasksWithInput(datasetTypeName) Find all tasks that have the specified dataset type name as an input.
getNodesForTask(taskDef) Return all the QuantumNodes associated with a TaskDef.
getQuantaForTask(taskDef) Return all the Quantum associated with a TaskDef.
getQuantumNodeByNodeId(nodeId) Lookup a QuantumNode from an id associated with the node.
iterTaskGraph() Iterate over the taskGraph attribute in topological order
load(file, universe, nodes, graphID) Read QuantumGraph from a file that was made by save.
loadUri(uri, str], universe, nodes, graphID) Read QuantumGraph from a URI.
save(file) Save QuantumGraph to a file.
saveUri(uri) Save QuantumGraph to the specified URI.
subset(nodes, …) Create a new graph object that contains the subset of the nodes specified as input.
subsetToConnected() Generate a list of subgraphs where each is connected.
tasksWithDSType(datasetTypeName) Find all tasks that are associated with the specified dataset type name.
writeDotGraph(output, io.BufferedIOBase]) Write out the graph as a dot graph.

Attributes Documentation

allDatasetTypes

Return all the DatasetTypeName objects that are contained inside the graph.

Returns:
tuple of `DatasetTypeName`

All the data set type names that are present in the graph

graph

Return a graph representing the relations between all the QuantumNode objects. Largely it should be preferred to iterate over, and use methods of this class, but sometimes direct access to the networkx object may be helpful

Returns:
graph : networkx.Digraph

Internal datastructure that holds relations of QuantumNode objects

graphID

Returns the ID generated by the graph at construction time

inputQuanta

Make a list of all QuantumNode objects that are ‘input’ nodes to the graph, meaning those nodes to not depend on any other nodes in the graph.

Returns:
inputNodes : iterable of QuantumNode

A list of nodes that are inputs to the graph

isConnected

Return True if all of the nodes in the graph are connected, ignores directionality of connections.

metadata
outputQuanta

Make a list of all QuantumNode objects that are ‘output’ nodes to the graph, meaning those nodes have no nodes that depend them in the graph.

Returns:
outputNodes : iterable of QuantumNode

A list of nodes that are outputs of the graph

taskGraph

Return a graph representing the relations between the tasks inside the quantum graph.

Returns:
taskGraph : networkx.Digraph

Internal datastructure that holds relations of TaskDef objects

Methods Documentation

checkQuantumInGraph(quantum: lsst.daf.butler.core.quantum.Quantum) → bool

Check if specified quantum appears in the graph as part of a node.

Parameters:
quantum : Quantum

The quantum to search for

Returns:
`bool`

The result of searching for the quantum

determineAncestorsOfQuantumNode(node: lsst.pipe.base.graph.quantumNode.QuantumNode) → _T

Return a graph of the specified node and all the ancestor nodes directly reachable by walking edges.

Parameters:
node : QuantumNode

The node for which all ansestors are to be determined

Returns:
graph of `QuantumNode`

Graph of node and all of its ansestors

determineConnectionsOfQuantumNode(node: lsst.pipe.base.graph.quantumNode.QuantumNode) → _T

Return a graph of QuantumNode that are direct inputs and outputs of a specified node.

Parameters:
node : QuantumNode

The node of the graph for which connected nodes are to be determined.

Returns:
graph : graph of QuantumNode

All the nodes that are directly connected to specified node

determineInputsToQuantumNode(node: lsst.pipe.base.graph.quantumNode.QuantumNode) → Set[lsst.pipe.base.graph.quantumNode.QuantumNode]

Return a set of QuantumNode that are direct inputs to a specified node.

Parameters:
node : QuantumNode

The node of the graph for which inputs are to be determined

Returns:
set of `QuantumNode`

All the nodes that are direct inputs to specified node

determineOutputsOfQuantumNode(node: lsst.pipe.base.graph.quantumNode.QuantumNode) → Set[lsst.pipe.base.graph.quantumNode.QuantumNode]

Return a set of QuantumNode that are direct outputs of a specified node.

Parameters:
node : QuantumNode

The node of the graph for which outputs are to be determined

Returns:
set of `QuantumNode`

All the nodes that are direct outputs to specified node

findCycle() → List[Tuple[lsst.pipe.base.graph.quantumNode.QuantumNode, lsst.pipe.base.graph.quantumNode.QuantumNode]]

Check a graph for the presense of cycles and returns the edges of any cycles found, or an empty list if there is no cycle.

Returns:
result : list of tuple of QuantumNode, QuantumNode

A list of any graph edges that form a cycle, or an empty list if there is no cycle. Empty list to so support if graph.find_cycle() syntax as an empty list is falsy.

findQuantaWithDSType(datasetTypeName: NewType.<locals>.new_type) → Set[lsst.daf.butler.core.quantum.Quantum]

Return all the Quantum that contain a specified DatasetTypeName.

Parameters:
datasetTypeName : str

The name of the dataset type to search for as a string, can also accept a DatasetTypeName which is a NewType of str for type safety in static type checking.

Returns:
result : set of QuantumNode objects

A set of QuantumNode`s that contain specified `DatasetTypeName

Raises:
KeyError

Raised if the DatasetTypeName is not part of the QuantumGraph

findTaskDefByLabel(label: str) → Optional[lsst.pipe.base.pipeline.TaskDef]

Determine which TaskDef objects in this graph are associated with a str representing a tasks label.

Parameters:
taskName : str

Name of a task to search for

Returns:
result : TaskDef

TaskDef objects that has the specified label.

findTaskDefByName(taskName: str) → List[lsst.pipe.base.pipeline.TaskDef]

Determine which TaskDef objects in this graph are associated with a str representing a task name (looks at the taskName property of TaskDef objects).

Returns a list of TaskDef objects as a PipelineTask may appear multiple times in a graph with different labels.

Parameters:
taskName : str

Name of a task to search for

Returns:
result : list of TaskDef

List of the TaskDef objects that have the name specified. Multiple values are returned in the case that a task is used multiple times with different labels.

findTaskWithOutput(datasetTypeName: NewType.<locals>.new_type) → Optional[lsst.pipe.base.pipeline.TaskDef]

Find all tasks that have the specified dataset type name as an output.

Parameters:
datasetTypeName : str

A string representing the name of a dataset type to be queried, can also accept a DatasetTypeName which is a NewType of str for type safety in static type checking.

Returns:
`TaskDef` or `None`

TaskDef that outputs DatasetTypeName as an output or None if none of the tasks produce this DatasetTypeName.

Raises:
KeyError

Raised if the DatasetTypeName is not part of the QuantumGraph

findTasksWithInput(datasetTypeName: NewType.<locals>.new_type) → Iterable[lsst.pipe.base.pipeline.TaskDef]

Find all tasks that have the specified dataset type name as an input.

Parameters:
datasetTypeName : str

A string representing the name of a dataset type to be queried, can also accept a DatasetTypeName which is a NewType of str for type safety in static type checking.

Returns:
tasks : iterable of TaskDef

TaskDef objects that have the specified DatasetTypeName as an input, list will be empty if no tasks use specified DatasetTypeName as an input.

Raises:
KeyError

Raised if the DatasetTypeName is not part of the QuantumGraph

getNodesForTask(taskDef: lsst.pipe.base.pipeline.TaskDef) → FrozenSet[lsst.pipe.base.graph.quantumNode.QuantumNode]

Return all the QuantumNodes associated with a TaskDef.

Parameters:
taskDef : TaskDef

The TaskDef for which Quantum are to be queried

Returns:
frozenset of `QuantumNodes`

The frozenset of QuantumNodes that is associated with the specified TaskDef.

getQuantaForTask(taskDef: lsst.pipe.base.pipeline.TaskDef) → FrozenSet[lsst.daf.butler.core.quantum.Quantum]

Return all the Quantum associated with a TaskDef.

Parameters:
taskDef : TaskDef

The TaskDef for which Quantum are to be queried

Returns:
frozenset of `Quantum`

The set of Quantum that is associated with the specified TaskDef.

getQuantumNodeByNodeId(nodeId: lsst.pipe.base.graph.quantumNode.NodeId) → lsst.pipe.base.graph.quantumNode.QuantumNode

Lookup a QuantumNode from an id associated with the node.

Parameters:
nodeId : NodeId

The number associated with a node

Returns:
node : QuantumNode

The node corresponding with input number

Raises:
IndexError

Raised if the requested nodeId is not in the graph.

IncompatibleGraphError

Raised if the nodeId was built with a different graph than is not this instance (or a graph instance that produced this instance through and operation such as subset)

iterTaskGraph() → Generator[lsst.pipe.base.pipeline.TaskDef, None, None]

Iterate over the taskGraph attribute in topological order

Yields:
taskDef : TaskDef

TaskDef objects in topological order

classmethod load(file: io.IO[bytes], universe: DimensionUniverse, nodes: Optional[Iterable[int]] = None, graphID: Optional[BuildId] = None) → QuantumGraph

Read QuantumGraph from a file that was made by save.

Parameters:
file : io.IO of bytes

File with pickle data open in binary mode.

universe: `~lsst.daf.butler.DimensionUniverse`

DimensionUniverse instance, not used by the method itself but needed to ensure that registry data structures are initialized.

nodes: iterable of `int` or None

Numbers that correspond to nodes in the graph. If specified, only these nodes will be loaded. Defaults to None, in which case all nodes will be loaded.

graphID : str or None

If specified this ID is verified against the loaded graph prior to loading any Nodes. This defaults to None in which case no validation is done.

Returns:
graph : QuantumGraph

Resulting QuantumGraph instance.

Raises:
TypeError

Raised if pickle contains instance of a type other than QuantumGraph.

ValueError

Raised if one or more of the nodes requested is not in the QuantumGraph or if graphID parameter does not match the graph being loaded or if the supplied uri does not point at a valid QuantumGraph save file.

Notes

Reading Quanta from pickle requires existence of singleton DimensionUniverse which is usually instantiated during Registry initialization. To make sure that DimensionUniverse exists this method accepts dummy DimensionUniverse argument.

classmethod loadUri(uri: Union[lsst.daf.butler.core._butlerUri._butlerUri.ButlerURI, str], universe: lsst.daf.butler.core.dimensions._universe.DimensionUniverse, nodes: Optional[Iterable[int]] = None, graphID: Optional[NewType.<locals>.new_type] = None) → lsst.pipe.base.graph.graph.QuantumGraph

Read QuantumGraph from a URI.

Parameters:
uri : ButlerURI or str

URI from where to load the graph.

universe: `~lsst.daf.butler.DimensionUniverse`

DimensionUniverse instance, not used by the method itself but needed to ensure that registry data structures are initialized.

nodes: iterable of `int` or None

Numbers that correspond to nodes in the graph. If specified, only these nodes will be loaded. Defaults to None, in which case all nodes will be loaded.

graphID : str or None

If specified this ID is verified against the loaded graph prior to loading any Nodes. This defaults to None in which case no validation is done.

Returns:
graph : QuantumGraph

Resulting QuantumGraph instance.

Raises:
TypeError

Raised if pickle contains instance of a type other than QuantumGraph.

ValueError

Raised if one or more of the nodes requested is not in the QuantumGraph or if graphID parameter does not match the graph being loaded or if the supplied uri does not point at a valid QuantumGraph save file.

Notes

Reading Quanta from pickle requires existence of singleton DimensionUniverse which is usually instantiated during Registry initialization. To make sure that DimensionUniverse exists this method accepts dummy DimensionUniverse argument.

save(file: io.IO[bytes])

Save QuantumGraph to a file.

Presently we store QuantumGraph in pickle format, this could potentially change in the future if better format is found.

Parameters:
file : io.BufferedIOBase

File to write pickle data open in binary mode.

saveUri(uri)

Save QuantumGraph to the specified URI.

Parameters:
uri : ButlerURI or str

URI to where the graph should be saved.

subset(nodes: Union[lsst.pipe.base.graph.quantumNode.QuantumNode, Iterable[lsst.pipe.base.graph.quantumNode.QuantumNode]]) → _T

Create a new graph object that contains the subset of the nodes specified as input. Node number is preserved.

Parameters:
nodes : QuantumNode or iterable of QuantumNode
Returns:
graph : instance of graph type

An instance of the type from which the subset was created

subsetToConnected() → Tuple[_T, ...]

Generate a list of subgraphs where each is connected.

Returns:
result : list of QuantumGraph

A list of graphs that are each connected

tasksWithDSType(datasetTypeName: NewType.<locals>.new_type) → Iterable[lsst.pipe.base.pipeline.TaskDef]

Find all tasks that are associated with the specified dataset type name.

Parameters:
datasetTypeName : str

A string representing the name of a dataset type to be queried, can also accept a DatasetTypeName which is a NewType of str for type safety in static type checking.

Returns:
result : iterable of TaskDef

TaskDef objects that are associated with the specified DatasetTypeName

Raises:
KeyError

Raised if the DatasetTypeName is not part of the QuantumGraph

writeDotGraph(output: Union[str, io.BufferedIOBase])

Write out the graph as a dot graph.

Parameters:
output : str or io.BufferedIOBase

Either a filesystem path to write to, or a file handle object