PipelineTaskConnections

- class lsst.pipe.base.PipelineTaskConnections(*, config: PipelineTaskConfig | None = None)
Bases: object

PipelineTaskConnections is a class used to declare desired IO when a PipelineTask is run by an activator.
- Parameters:
  - config : PipelineTaskConfig
    A PipelineTaskConfig class instance whose class has been configured to use this PipelineTaskConnections class.
See also

iterConnections : Iterator over selected connections.
Notes
PipelineTaskConnection classes are created by declaring class attributes of types defined in lsst.pipe.base.connectionTypes, listed as follows:

- InitInput - Defines connections in a quantum graph which are used as inputs to the __init__ function of the PipelineTask corresponding to this class.
- InitOutput - Defines connections in a quantum graph which are to be persisted using a butler at the end of the __init__ function of the PipelineTask corresponding to this class. The variable name used to define this connection should be the same as an attribute name on the PipelineTask instance. E.g. if an InitOutput is declared with the name outputSchema in a PipelineTaskConnections class, then a PipelineTask instance should have an attribute self.outputSchema defined. Its value is what will be saved by the activator framework.
- PrerequisiteInput - An input connection type that defines a lsst.daf.butler.DatasetType that must be present at execution time, but that will not be used during the course of creating the quantum graph to be executed. These most often are things produced outside the processing pipeline, such as reference catalogs.
- Input - Input lsst.daf.butler.DatasetType objects that will be used in the run method of a PipelineTask. The name used to declare the class attribute must match a function argument name in the run method of the PipelineTask. E.g. if the PipelineTaskConnections defines an Input with the name calexp, then the corresponding signature should be PipelineTask.run(calexp, ...).
- Output - A lsst.daf.butler.DatasetType that will be produced by an execution of a PipelineTask. The name used to declare the connection must correspond to an attribute of a Struct that is returned by the PipelineTask run method. E.g. if an output connection is defined with the name measCat, then the corresponding PipelineTask.run method must return Struct(measCat=X, ...) where X matches the storageClass type defined on the output connection.
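A minimal sketch declaring one connection of each type in a single hypothetical connections class; the dataset type names, storage classes, and dimensions used here are illustrative assumptions, not part of any real pipeline:

from lsst.pipe.base import PipelineTaskConnections
from lsst.pipe.base import connectionTypes as cT

class SketchConnections(PipelineTaskConnections,
                        dimensions=("visit", "detector")):
    # InitInput: consumed by the task's __init__ (e.g. an upstream schema).
    inputSchema = cT.InitInput(doc="Schema of the upstream catalog",
                               name="upstream_schema",
                               storageClass="SourceCatalog")
    # InitOutput: persisted from ``self.outputSchema`` after __init__.
    outputSchema = cT.InitOutput(doc="Schema of the output catalog",
                                 name="sketch_schema",
                                 storageClass="SourceCatalog")
    # PrerequisiteInput: looked up at execution time, not during graph
    # generation (e.g. a reference catalog).
    refCat = cT.PrerequisiteInput(doc="Reference catalog",
                                  name="sketch_refcat",
                                  storageClass="SimpleCatalog",
                                  dimensions=("htm7",),
                                  multiple=True)
    # Input: passed to run() as the ``calexp`` argument.
    calexp = cT.Input(doc="Calibrated exposure to process",
                      name="calexp",
                      storageClass="ExposureF",
                      dimensions=("visit", "detector"))
    # Output: returned by run() as ``Struct(measCat=...)``.
    measCat = cT.Output(doc="Measurement catalog",
                        name="sketch_measCat",
                        storageClass="SourceCatalog",
                        dimensions=("visit", "detector"))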
Attributes of these types can also be created, replaced, or deleted on the PipelineTaskConnections instance in the __init__ method, if more than just the name depends on the configuration. It is preferred to define them in the class when possible (even if configuration may cause the connection to be removed from the instance).
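For example, a connection declared on the class can be dropped in __init__ when configuration disables it. A minimal sketch, assuming a hypothetical doWriteExtra field on the corresponding PipelineTaskConfig:

from lsst.pipe.base import PipelineTaskConnections
from lsst.pipe.base import connectionTypes as cT

class OptionalOutputConnections(PipelineTaskConnections,
                                dimensions=("visit", "detector")):
    extraOutput = cT.Output(doc="Optional diagnostic output",
                            name="sketch_extra",
                            storageClass="ExposureF",
                            dimensions=("visit", "detector"))

    def __init__(self, *, config=None):
        super().__init__(config=config)
        if not config.doWriteExtra:  # hypothetical config field
            # Deleting the attribute removes the connection from this
            # instance (and from the ``outputs`` set).
            del self.extraOutput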
The process of declaring a PipelineTaskConnections class involves parameters passed in the declaration statement.

The first parameter is dimensions, an iterable of strings that defines the unit of processing the run method of a corresponding PipelineTask will operate on. These dimensions must match dimensions that exist in the butler registry which will be used in executing the corresponding PipelineTask. The dimensions may also be modified in subclass __init__ methods if they need to depend on configuration.
The second parameter is labeled defaultTemplates and is conditionally optional. The name attributes of connections can be specified as python format strings, with named format arguments. If any of the name parameters on connections defined in a PipelineTaskConnections class contain a template, then a default template value must be specified in the defaultTemplates argument. This is done by passing a dictionary with keys corresponding to a template identifier, and values corresponding to the value to use as a default when formatting the string. For example, if ConnectionsClass.calexp.name = '{input}Coadd_calexp', then defaultTemplates = {'input': 'deep'}.
Once a PipelineTaskConnections class is created, it is used in the creation of a PipelineTaskConfig. This is further documented in the documentation of PipelineTaskConfig. For the purposes of this documentation, the relevant information is that the config class allows configuration of connection names by users when running a pipeline.

Instances of a PipelineTaskConnections class are used by the pipeline task execution framework to introspect what a corresponding PipelineTask will require, and what it will produce.

Examples
>>> from lsst.pipe.base import connectionTypes as cT
>>> from lsst.pipe.base import PipelineTaskConnections
>>> from lsst.pipe.base import PipelineTaskConfig
>>> class ExampleConnections(PipelineTaskConnections,
...                          dimensions=("A", "B"),
...                          defaultTemplates={"foo": "Example"}):
...     inputConnection = cT.Input(doc="Example input",
...                                dimensions=("A", "B"),
...                                storageClass="Exposure",
...                                name="{foo}Dataset")
...     outputConnection = cT.Output(doc="Example output",
...                                  dimensions=("A", "B"),
...                                  storageClass="Exposure",
...                                  name="{foo}output")
>>> class ExampleConfig(PipelineTaskConfig,
...                     pipelineConnections=ExampleConnections):
...     pass
>>> config = ExampleConfig()
>>> config.connections.foo = "Modified"
>>> config.connections.outputConnection = "TotallyDifferent"
>>> connections = ExampleConnections(config=config)
>>> assert connections.inputConnection.name == "ModifiedDataset"
>>> assert connections.outputConnection.name == "TotallyDifferent"
Attributes Summary
allConnections - Mapping holding all connection attributes.
initInputs - Set with the names of all InitInput connection attributes.
initOutputs - Set with the names of all InitOutput connection attributes.
inputs - Set with the names of all connectionTypes.Input connection attributes.
outputs - Set with the names of all Output connection attributes.
prerequisiteInputs - Set with the names of all PrerequisiteInput connection attributes.

Methods Summary

adjustQuantum(inputs, outputs, label, data_id) - Override to make adjustments to lsst.daf.butler.DatasetRef objects in the lsst.daf.butler.Quantum during the graph generation stage of the activator.
buildDatasetRefs(quantum) - Build QuantizedConnection corresponding to the input Quantum.
getSpatialBoundsConnections() - Return the names of regular input and output connections whose data IDs should be used to compute the spatial bounds of this task's quanta.
getTemporalBoundsConnections() - Return the names of regular input and output connections whose data IDs should be used to compute the temporal bounds of this task's quanta.
Attributes Documentation
- allConnections: Mapping[str, BaseConnection] = {}

Mapping holding all connection attributes.

This is a read-only view that is automatically updated when connection attributes are added, removed, or replaced in __init__. It is also updated after __init__ completes to reflect changes in inputs, prerequisiteInputs, outputs, initInputs, and initOutputs.
- initInputs: set[str] = frozenset({})

Set with the names of all InitInput connection attributes.

See inputs for additional information.
- initOutputs: set[str] = frozenset({})

Set with the names of all InitOutput connection attributes.

See inputs for additional information.
- inputs: set[str] = frozenset({})

Set with the names of all connectionTypes.Input connection attributes.

This is updated automatically as class attributes are added, removed, or replaced in __init__. Removing entries from this set will cause those connections to be removed after __init__ completes, but this is supported only for backwards compatibility; new code should instead just delete the connection attribute directly. After __init__ this will be a frozenset and may not be replaced.
- outputs: set[str] = frozenset({})

Set with the names of all Output connection attributes.

See inputs for additional information.
- prerequisiteInputs: set[str] = frozenset({})

Set with the names of all PrerequisiteInput connection attributes.

See inputs for additional information.
Methods Documentation
- adjustQuantum(inputs: dict[str, tuple[lsst.pipe.base.connectionTypes.BaseInput, collections.abc.Collection[lsst.daf.butler._dataset_ref.DatasetRef]]], outputs: dict[str, tuple[lsst.pipe.base.connectionTypes.Output, collections.abc.Collection[lsst.daf.butler._dataset_ref.DatasetRef]]], label: str, data_id: DataCoordinate) → tuple[collections.abc.Mapping[str, tuple[lsst.pipe.base.connectionTypes.BaseInput, collections.abc.Collection[lsst.daf.butler._dataset_ref.DatasetRef]]], collections.abc.Mapping[str, tuple[lsst.pipe.base.connectionTypes.Output, collections.abc.Collection[lsst.daf.butler._dataset_ref.DatasetRef]]]]
Override to make adjustments to lsst.daf.butler.DatasetRef objects in the lsst.daf.butler.Quantum during the graph generation stage of the activator.

- Parameters:
  - inputs : dict
    Dictionary whose keys are an input (regular or prerequisite) connection name and whose values are a tuple of the connection instance and a collection of associated DatasetRef objects. The exact type of the nested collections is unspecified; it can be assumed to be multi-pass iterable and support len and in, but it should not be mutated in place. In contrast, the outer dictionaries are guaranteed to be temporary copies that are true dict instances, and hence may be modified and even returned; this is especially useful for delegating to super (see notes below).
  - outputs : Mapping
    Mapping of output datasets, with the same structure as inputs.
  - label : str
    Label for this task in the pipeline (should be used in all diagnostic messages).
  - data_id : lsst.daf.butler.DataCoordinate
    Data ID for this quantum in the pipeline (should be used in all diagnostic messages).
- Returns:
  - adjusted_inputs : Mapping
    Mapping of the same form as inputs with updated containers of input DatasetRef objects. Connections that are not changed should not be returned at all. Datasets may only be removed, not added. Nested collections may be of any multi-pass iterable type, and the order of iteration will set the order of iteration within PipelineTask.runQuantum.
  - adjusted_outputs : Mapping
    Mapping of updated output datasets, with the same structure and interpretation as adjusted_inputs.
- Raises:
  - ScalarError
    Raised if any Input or PrerequisiteInput connection has multiple set to False, but multiple datasets are present.
  - NoWorkFound
    Raised to indicate that this quantum should not be run; not enough datasets were found for a regular Input connection, and the quantum should be pruned or skipped.
  - FileNotFoundError
    Raised to cause QuantumGraph generation to fail (with the message included in this exception); not enough datasets were found for a PrerequisiteInput connection.
Notes
The base class implementation performs important checks. It always returns an empty mapping (i.e. it makes no adjustments). It should always be called via super by custom implementations, ideally at the end of the custom implementation, with already-adjusted mappings when any datasets are actually dropped, e.g.:

def adjustQuantum(self, inputs, outputs, label, data_id):
    # Filter out some dataset refs for one connection.
    connection, old_refs = inputs["my_input"]
    new_refs = [ref for ref in old_refs if ...]
    adjusted_inputs = {"my_input": (connection, new_refs)}
    # Update the original inputs so we can pass them to super.
    inputs.update(adjusted_inputs)
    # Can ignore outputs from super because they are guaranteed
    # to be empty.
    super().adjustQuantum(inputs, outputs, label, data_id)
    # Return only the connections we modified.
    return adjusted_inputs, {}
Removing outputs here is guaranteed to affect what is actually passed to PipelineTask.runQuantum, but its effect on the larger graph may be deferred to execution, depending on the context in which adjustQuantum is being run: if one quantum removes an output that is needed by a second quantum as input, the second quantum may not be adjusted (and hence pruned or skipped) until that output is actually found to be missing at execution time.

Tasks that desire zip-iteration consistency between any combinations of connections that have the same data ID should generally implement adjustQuantum to achieve this, even if they could also run that logic during execution; this allows the system to see, as early as possible, outputs that will not be produced because the corresponding input is missing.
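As an illustration of the zip-iteration case, a hedged sketch of an adjustQuantum override that keeps only the data IDs present in two hypothetical input connections ("calexps" and "backgrounds"), so that runQuantum can iterate them in lockstep:

def adjustQuantum(self, inputs, outputs, label, data_id):
    calexp_conn, calexp_refs = inputs["calexps"]
    bkg_conn, bkg_refs = inputs["backgrounds"]
    # Keep only refs whose data IDs appear in both connections.
    common = {ref.dataId for ref in calexp_refs} & {ref.dataId for ref in bkg_refs}
    adjusted_inputs = {
        "calexps": (calexp_conn, [r for r in calexp_refs if r.dataId in common]),
        "backgrounds": (bkg_conn, [r for r in bkg_refs if r.dataId in common]),
    }
    # Let the base class run its checks on the adjusted mappings.
    inputs.update(adjusted_inputs)
    super().adjustQuantum(inputs, outputs, label, data_id)
    return adjusted_inputs, {}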
- buildDatasetRefs(quantum: Quantum) → tuple[lsst.pipe.base.connections.InputQuantizedConnection, lsst.pipe.base.connections.OutputQuantizedConnection]

Build QuantizedConnections corresponding to the input Quantum.

- Parameters:
  - quantum : lsst.daf.butler.Quantum
    Quantum object which defines the inputs and outputs for a given unit of processing.

- Returns:
  - retVal : tuple of (InputQuantizedConnection, OutputQuantizedConnection)
    Namespaces mapping attribute names (identifiers of connections) to butler references defined in the input lsst.daf.butler.Quantum.
- getSpatialBoundsConnections() → Iterable[str]

Return the names of regular input and output connections whose data IDs should be used to compute the spatial bounds of this task's quanta.

The spatial bound for a quantum is defined as the union of the regions of all data IDs of all connections returned here, along with the region of the quantum data ID (if the task has spatial dimensions).

- Returns:
  - connection_names : collections.abc.Iterable[str]
    Names of connections with spatial dimensions. These are the task-internal connection names, not butler dataset type names.

Notes

The spatial bound is used to search for prerequisite inputs that have skypix dimensions. The default implementation returns an empty iterable, which is usually sufficient for tasks with spatial dimensions, but if a task's inputs or outputs are associated with spatial regions that extend beyond the quantum data ID's region, this method may need to be overridden to expand the set of prerequisite inputs found.

Tasks that do not have spatial dimensions but that do have skypix prerequisite inputs should always override this method, as the default spatial bounds otherwise cover the full sky.
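A minimal sketch of such an override, assuming a hypothetical "visitSummaries" input connection whose per-visit regions should widen the bounds used for skypix prerequisite lookups:

def getSpatialBoundsConnections(self):
    # Include the regions of this connection's data IDs in the spatial
    # bounds, in addition to the quantum data ID's own region.
    return ("visitSummaries",)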
- getTemporalBoundsConnections() → Iterable[str]

Return the names of regular input and output connections whose data IDs should be used to compute the temporal bounds of this task's quanta.

The temporal bound for a quantum is defined as the union of the timespans of all data IDs of all connections returned here, along with the timespan of the quantum data ID (if the task has temporal dimensions).

- Returns:
  - connection_names : collections.abc.Iterable[str]
    Names of connections with temporal dimensions. These are the task-internal connection names, not butler dataset type names.

Notes

The temporal bound is used to search for prerequisite inputs that are calibration datasets. The default implementation returns an empty iterable, which is usually sufficient for tasks with temporal dimensions, but if a task's inputs or outputs are associated with timespans that extend beyond the quantum data ID's timespan, this method may need to be overridden to expand the set of prerequisite inputs found.

Tasks that do not have temporal dimensions and do not override this method will use an infinite timespan for any calibration lookups.
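A minimal sketch of such an override, assuming a hypothetical "inputExposures" input connection whose timespans should bound calibration lookups:

def getTemporalBoundsConnections(self):
    # Use this connection's data ID timespans (plus the quantum data
    # ID's timespan, if any) when searching for calibrations.
    return ("inputExposures",)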