PipelineTaskConnections

class lsst.pipe.base.PipelineTaskConnections(*, config: 'PipelineTaskConfig' | None = None)

Bases: object

PipelineTaskConnections is a class used to declare the desired IO when a PipelineTask is run by an activator.

Parameters:
config : PipelineTaskConfig

A PipelineTaskConfig class instance whose class has been configured to use this PipelineTaskConnections class.

See also

iterConnections

Notes

PipelineTaskConnections classes are created by declaring class attributes of types defined in lsst.pipe.base.connectionTypes, which are listed as follows:

  • InitInput - Defines connections in a quantum graph which are used as inputs to the __init__ function of the PipelineTask corresponding to this class.
  • InitOutput - Defines connections in a quantum graph which are to be persisted using a butler at the end of the __init__ function of the PipelineTask corresponding to this class. The variable name used to define this connection should be the same as an attribute name on the PipelineTask instance. E.g. if an InitOutput is declared with the name outputSchema in a PipelineTaskConnections class, then a PipelineTask instance should have an attribute self.outputSchema defined. Its value is what will be saved by the activator framework.
  • PrerequisiteInput - An input connection type that defines a lsst.daf.butler.DatasetType that must be present at execution time, but that will not be used during the course of creating the quantum graph to be executed. These most often are things produced outside the processing pipeline, such as reference catalogs.
  • Input - Input lsst.daf.butler.DatasetType objects that will be used in the run method of a PipelineTask. The name used to declare the class attribute must match a function argument name in the run method of a PipelineTask. E.g. if the PipelineTaskConnections class defines an Input with the name calexp, then the corresponding signature should be PipelineTask.run(calexp, ...).
  • Output - A lsst.daf.butler.DatasetType that will be produced by an execution of a PipelineTask. The name used to declare the connection must correspond to an attribute of a Struct that is returned by a PipelineTask run method. E.g. if an output connection is defined with the name measCat, then the corresponding PipelineTask.run method must return Struct(measCat=X, ...) where X matches the storageClass type defined on the output connection (see the sketch after this list).
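
The correspondence described in the Input and Output bullets can be sketched with a minimal, hypothetical task; the task name, connection names, and catalog-building step are placeholders, not part of this API:

from lsst.pipe.base import PipelineTask, Struct

class ExampleMeasureTask(PipelineTask):
    # Assumes a connections class declaring an Input named "calexp" and an
    # Output named "measCat"; configuration details are omitted.

    def run(self, calexp):
        # The argument name matches the Input connection name.
        measCat = ...  # build a catalog compatible with the Output storageClass
        # The Struct attribute name matches the Output connection name.
        return Struct(measCat=measCat)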

Declaring a PipelineTaskConnections class also involves parameters passed in the class declaration statement.

The first parameter is dimensions which is an iterable of strings which defines the unit of processing the run method of a corresponding PipelineTask will operate on. These dimensions must match dimensions that exist in the butler registry which will be used in executing the corresponding PipelineTask.

The second parameter is defaultTemplates, which is conditionally optional. The name attributes of connections can be specified as Python format strings with named format arguments. If any of the name parameters on connections defined in a PipelineTaskConnections class contain a template, then a default template value must be specified in the defaultTemplates argument. This is done by passing a dictionary whose keys correspond to a template identifier and whose values correspond to the value to use as a default when formatting the string. For example, if ConnectionClass.calexp.name = '{input}Coadd_calexp', then defaultTemplates = {'input': 'deep'}.

Once a PipelineTaskConnections class is created, it is used in the creation of a PipelineTaskConfig. This is documented further in PipelineTaskConfig. For the purposes of this documentation, the relevant information is that the config class allows users to configure connection names when running a pipeline.

Instances of a PipelineTaskConnections class are used by the pipeline task execution framework to introspect what a corresponding PipelineTask will require, and what it will produce.

Examples

>>> from lsst.pipe.base import connectionTypes as cT
>>> from lsst.pipe.base import PipelineTaskConnections
>>> from lsst.pipe.base import PipelineTaskConfig
>>> class ExampleConnections(PipelineTaskConnections,
...                          dimensions=("A", "B"),
...                          defaultTemplates={"foo": "Example"}):
...     inputConnection = cT.Input(doc="Example input",
...                                dimensions=("A", "B"),
...                                storageClass="Exposure",
...                                name="{foo}Dataset")
...     outputConnection = cT.Output(doc="Example output",
...                                  dimensions=("A", "B"),
...                                  storageClass="Exposure",
...                                  name="{foo}output")
>>> class ExampleConfig(PipelineTaskConfig,
...                     pipelineConnections=ExampleConnections):
...     pass
>>> config = ExampleConfig()
>>> config.connections.foo = "Modified"
>>> config.connections.outputConnection = "TotallyDifferent"
>>> connections = ExampleConnections(config=config)
>>> assert(connections.inputConnection.name == "ModifiedDataset")
>>> assert(connections.outputConnection.name == "TotallyDifferent")

Attributes Summary

allConnections
initInputs
initOutputs
inputs
outputs
prerequisiteInputs

Methods Summary

adjustQuantum(inputs, …) Override to make adjustments to lsst.daf.butler.DatasetRef objects in the lsst.daf.butler.core.Quantum during the graph generation stage of the activator.
buildDatasetRefs(quantum) Builds QuantizedConnections corresponding to input Quantum

Attributes Documentation

allConnections = {}
initInputs = frozenset()
initOutputs = frozenset()
inputs = frozenset()
outputs = frozenset()
prerequisiteInputs = frozenset()
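
These attributes are populated when the connections class is declared and can be used to introspect an instance. A minimal sketch, reusing the ExampleConnections and ExampleConfig classes from the Examples above and assuming that allConnections maps attribute names to connection instances while the frozensets hold the attribute names in each category:

config = ExampleConfig()
connections = ExampleConnections(config=config)
for attr_name in connections.inputs:
    connection = connections.allConnections[attr_name]
    print(attr_name, connection.name, connection.storageClass)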

Methods Documentation

adjustQuantum(inputs: Dict[str, Tuple[lsst.pipe.base.connectionTypes.BaseInput, Collection[lsst.daf.butler.core.datasets.ref.DatasetRef]]], outputs: Dict[str, Tuple[lsst.pipe.base.connectionTypes.Output, Collection[lsst.daf.butler.core.datasets.ref.DatasetRef]]], label: str, data_id: lsst.daf.butler.core.dimensions._coordinate.DataCoordinate) → Tuple[Mapping[str, Tuple[lsst.pipe.base.connectionTypes.BaseInput, Collection[lsst.daf.butler.core.datasets.ref.DatasetRef]]], Mapping[str, Tuple[lsst.pipe.base.connectionTypes.Output, Collection[lsst.daf.butler.core.datasets.ref.DatasetRef]]]]

Override to make adjustments to lsst.daf.butler.DatasetRef objects in the lsst.daf.butler.core.Quantum during the graph generation stage of the activator.

Parameters:
inputs : dict

Dictionary whose keys are an input (regular or prerequisite) connection name and whose values are a tuple of the connection instance and a collection of associated DatasetRef objects. The exact type of the nested collections is unspecified; it can be assumed to be multi-pass iterable and support len and in, but it should not be mutated in place. In contrast, the outer dictionaries are guaranteed to be temporary copies that are true dict instances, and hence may be modified and even returned; this is especially useful for delegating to super (see notes below).

outputs : Mapping

Mapping of output datasets, with the same structure as inputs.

label : str

Label for this task in the pipeline (should be used in all diagnostic messages).

data_id : lsst.daf.butler.DataCoordinate

Data ID for this quantum in the pipeline (should be used in all diagnostic messages).

Returns:
adjusted_inputs : Mapping

Mapping of the same form as inputs with updated containers of input DatasetRef objects. Connections that are not changed should not be returned at all. Datasets may only be removed, not added. Nested collections may be of any multi-pass iterable type, and the order of iteration will set the order of iteration within PipelineTask.runQuantum.

adjusted_outputs : Mapping

Mapping of updated output datasets, with the same structure and interpretation as adjusted_inputs.

Raises:
ScalarError

Raised if any Input or PrerequisiteInput connection has multiple set to False, but multiple datasets are present.

NoWorkFound

Raised to indicate that this quantum should not be run; not enough datasets were found for a regular Input connection, and the quantum should be pruned or skipped.

FileNotFoundError

Raised to cause QuantumGraph generation to fail (with the message included in this exception); not enough datasets were found for a PrerequisiteInput connection.

Notes

The base class implementation performs important checks. It always returns an empty mapping (i.e. it makes no adjustments). It should always be called via super by custom implementations, ideally at the end of the custom implementation with already-adjusted mappings when any datasets are actually dropped, e.g.:

def adjustQuantum(self, inputs, outputs, label, data_id):
    # Filter out some dataset refs for one connection.
    connection, old_refs = inputs["my_input"]
    new_refs = [ref for ref in old_refs if ...]
    adjusted_inputs = {"my_input": (connection, new_refs)}
    # Update the original inputs so we can pass them to super.
    inputs.update(adjusted_inputs)
    # Can ignore outputs from super because they are guaranteed
    # to be empty.
    super().adjustQuantum(inputs, outputs, label, data_id)
    # Return only the connections we modified.
    return adjusted_inputs, {}

Removing outputs here is guaranteed to affect what is actually passed to PipelineTask.runQuantum, but its effect on the larger graph may be deferred to execution, depending on the context in which adjustQuantum is being run: if one quantum removes an output that is needed by a second quantum as input, the second quantum may not be adjusted (and hence pruned or skipped) until that output is actually found to be missing at execution time.

Tasks that desire zip-iteration consistency between any combinations of connections that have the same data ID should generally implement adjustQuantum to achieve this, even if they could also run that logic during execution; this allows the system to see outputs that will not be produced because the corresponding input is missing as early as possible.
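
A further sketch, using a hypothetical connection name "my_input", of signalling that a quantum should be skipped when a regular Input connection has no datasets, per the Raises section above:

from lsst.pipe.base import NoWorkFound

def adjustQuantum(self, inputs, outputs, label, data_id):
    # Hypothetical connection name; skip this quantum if nothing was found.
    connection, refs = inputs["my_input"]
    if not refs:
        raise NoWorkFound(f"No 'my_input' datasets for task {label} ({data_id})")
    # No adjustments otherwise; delegate to the base class checks.
    return super().adjustQuantum(inputs, outputs, label, data_id)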

buildDatasetRefs(quantum: lsst.daf.butler.core.quantum.Quantum) → Tuple[lsst.pipe.base.connections.InputQuantizedConnection, lsst.pipe.base.connections.OutputQuantizedConnection]

Builds QuantizedConnections corresponding to the input Quantum.

Parameters:
quantum : lsst.daf.butler.Quantum

Quantum object which defines the inputs and outputs for a given unit of processing.

Returns:
retVal : tuple of (InputQuantizedConnection, OutputQuantizedConnection)

Namespaces mapping attribute names (identifiers of connections) to butler references defined in the input lsst.daf.butler.Quantum.
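
A minimal usage sketch, assuming the ExampleConnections instance from the Examples above and a quantum object supplied by the execution framework:

inputRefs, outputRefs = connections.buildDatasetRefs(quantum)
# Attribute names on the returned namespaces match the declared connections;
# each holds the DatasetRef (or list of DatasetRefs, for a multiple=True
# connection) associated with this quantum.
inRefs = inputRefs.inputConnection
outRef = outputRefs.outputConnection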