CatalogMeasurementBaseConnections

class lsst.faro.base.CatalogMeasurementBaseConnections(*, config: PipelineTaskConfig | None = None)

Bases: MetricConnections

Attributes Summary

allConnections

Mapping holding all connection attributes.

defaultTemplates

dimensions

Set of dimension names that define the unit of work for this task.

initInputs

Set with the names of all InitInput connection attributes.

initOutputs

Set with the names of all InitOutput connection attributes.

inputs

Set with the names of all connectionTypes.Input connection attributes.

measurement

Connection for output dataset.

outputs

Set with the names of all Output connection attributes.

prerequisiteInputs

Set with the names of all PrerequisiteInput connection attributes.

refCat

Class used for declaring PipelineTask prerequisite connections.

Methods Summary

adjustQuantum(inputs, outputs, label, data_id)

Override to make adjustments to lsst.daf.butler.DatasetRef objects in the lsst.daf.butler.core.Quantum during the graph generation stage of the activator.

buildDatasetRefs(quantum)

Build QuantizedConnection corresponding to input Quantum.

Attributes Documentation

allConnections: Mapping[str, BaseConnection] = {'measurement': Output(name='metricvalue_{package}_{metric}', storageClass='MetricValue', doc='The metric value computed by this task.', multiple=False, dimensions={'detector', 'instrument', 'visit'}, isCalibration=False), 'refCat': PrerequisiteInput(name='{refDataset}', storageClass='SimpleCatalog', doc='Reference catalog', multiple=True, dimensions=('skypix',), isCalibration=False, deferLoad=True, minimum=1, lookupFunction=None)}

Mapping holding all connection attributes.

This is a read-only view that is automatically updated when connection attributes are added, removed, or replaced in __init__. It is also updated after __init__ completes to reflect changes in inputs, prerequisiteInputs, outputs, initInputs, and initOutputs.

defaultTemplates = {'metric': None, 'package': None, 'refDataset': ''}
dimensions: set[str] = {'detector', 'instrument', 'visit'}

Set of dimension names that define the unit of work for this task.

Required and implied dependencies will automatically be expanded later and need not be provided.

This may be replaced or modified in __init__ to change the dimensions of the task. After __init__ it will be a frozenset and may not be replaced.

initInputs: set[str] = frozenset({})

Set with the names of all InitInput connection attributes.

See inputs for additional information.

initOutputs: set[str] = frozenset({})

Set with the names of all InitOutput connection attributes.

See inputs for additional information.

inputs: set[str] = frozenset({})

Set with the names of all connectionTypes.Input connection attributes.

This is updated automatically as class attributes are added, removed, or replaced in __init__. Removing entries from this set will cause those connections to be removed after __init__ completes, but this is supported only for backwards compatibility; new code should instead just delete the collection attributed directly. After __init__ this will be a frozenset and may not be replaced.

measurement

Connection for output dataset.

outputs: set[str] = frozenset({'measurement'})

Set with the names of all Output connection attributes.

See inputs for additional information.

prerequisiteInputs: set[str] = frozenset({'refCat'})

Set with the names of all PrerequisiteInput connection attributes.

See inputs for additional information.

refCat

Class used for declaring PipelineTask prerequisite connections.

Parameters:
namestr

The default name used to identify the dataset type

storageClassstr

The storage class used when (un)/persisting the dataset type

multiplebool

Indicates if this connection should expect to contain multiple objects of the given dataset type. Tasks with more than one connection with multiple=True with the same dimensions may want to implement PipelineTaskConnections.adjustQuantum to ensure those datasets are consistent (i.e. zip-iterable) in PipelineTask.runQuantum and notify the execution system as early as possible of outputs that will not be produced because the corresponding input is missing.

dimensionsiterable of str

The lsst.daf.butler.Butler lsst.daf.butler.Registry dimensions used to identify the dataset type identified by the specified name

minimumbool

Minimum number of datasets required for this connection, per quantum. This is checked in the base implementation of PipelineTaskConnections.adjustQuantum, which raises FileNotFoundError (causing QuantumGraph generation to fail). PipelineTask implementations may provide custom adjustQuantum implementations for more fine-grained or configuration-driven constraints, as long as they are compatible with this minium.

lookupFunction: `typing.Callable`, optional

An optional callable function that will look up PrerequisiteInputs using the DatasetType, registry, quantum dataId, and input collections passed to it. If no function is specified, the default temporal spatial lookup will be used.

Raises:
TypeError

Raised if minimum is greater than one but multiple=False.

Notes

Prerequisite inputs are used for datasets that must exist in the data repository before a pipeline including this is run; they cannot be produced by another task in the same pipeline.

In exchange for this limitation, they have a number of advantages relative to regular Input connections:

  • The query used to find them then during QuantumGraph generation can be fully customized by providing a lookupFunction.

  • Failed searches for prerequisites during QuantumGraph generation will usually generate more helpful diagnostics than those for regular Input connections.

  • The default query for prerequisite inputs relates the quantum dimensions directly to the dimensions of its dataset type, without being constrained by any of the other dimensions in the pipeline. This allows them to be used for temporal calibration lookups (which regular Input connections cannot do at present) and to work around QuantumGraph generation limitations involving cases where naive spatial overlap relationships between dimensions are not desired (e.g. a task that wants all detectors in each visit for which the visit overlaps a tract, not just those where that detector+visit combination overlaps the tract).

  • Prerequisite inputs may be optional (regular inputs are never optional).

Methods Documentation

adjustQuantum(inputs: dict[str, tuple[lsst.pipe.base.connectionTypes.BaseInput, collections.abc.Collection[lsst.daf.butler.core.datasets.ref.DatasetRef]]], outputs: dict[str, tuple[lsst.pipe.base.connectionTypes.Output, collections.abc.Collection[lsst.daf.butler.core.datasets.ref.DatasetRef]]], label: str, data_id: DataCoordinate) tuple[collections.abc.Mapping[str, tuple[lsst.pipe.base.connectionTypes.BaseInput, collections.abc.Collection[lsst.daf.butler.core.datasets.ref.DatasetRef]]], collections.abc.Mapping[str, tuple[lsst.pipe.base.connectionTypes.Output, collections.abc.Collection[lsst.daf.butler.core.datasets.ref.DatasetRef]]]]

Override to make adjustments to lsst.daf.butler.DatasetRef objects in the lsst.daf.butler.core.Quantum during the graph generation stage of the activator.

Parameters:
inputsdict

Dictionary whose keys are an input (regular or prerequisite) connection name and whose values are a tuple of the connection instance and a collection of associated DatasetRef objects. The exact type of the nested collections is unspecified; it can be assumed to be multi-pass iterable and support len and in, but it should not be mutated in place. In contrast, the outer dictionaries are guaranteed to be temporary copies that are true dict instances, and hence may be modified and even returned; this is especially useful for delegating to super (see notes below).

outputsMapping

Mapping of output datasets, with the same structure as inputs.

labelstr

Label for this task in the pipeline (should be used in all diagnostic messages).

data_idlsst.daf.butler.DataCoordinate

Data ID for this quantum in the pipeline (should be used in all diagnostic messages).

Returns:
adjusted_inputsMapping

Mapping of the same form as inputs with updated containers of input DatasetRef objects. Connections that are not changed should not be returned at all. Datasets may only be removed, not added. Nested collections may be of any multi-pass iterable type, and the order of iteration will set the order of iteration within PipelineTask.runQuantum.

adjusted_outputsMapping

Mapping of updated output datasets, with the same structure and interpretation as adjusted_inputs.

Raises:
ScalarError

Raised if any Input or PrerequisiteInput connection has multiple set to False, but multiple datasets.

NoWorkFound

Raised to indicate that this quantum should not be run; not enough datasets were found for a regular Input connection, and the quantum should be pruned or skipped.

FileNotFoundError

Raised to cause QuantumGraph generation to fail (with the message included in this exception); not enough datasets were found for a PrerequisiteInput connection.

Notes

The base class implementation performs important checks. It always returns an empty mapping (i.e. makes no adjustments). It should always called be via super by custom implementations, ideally at the end of the custom implementation with already-adjusted mappings when any datasets are actually dropped, e.g.:

def adjustQuantum(self, inputs, outputs, label, data_id):
    # Filter out some dataset refs for one connection.
    connection, old_refs = inputs["my_input"]
    new_refs = [ref for ref in old_refs if ...]
    adjusted_inputs = {"my_input", (connection, new_refs)}
    # Update the original inputs so we can pass them to super.
    inputs.update(adjusted_inputs)
    # Can ignore outputs from super because they are guaranteed
    # to be empty.
    super().adjustQuantum(inputs, outputs, label_data_id)
    # Return only the connections we modified.
    return adjusted_inputs, {}

Removing outputs here is guaranteed to affect what is actually passed to PipelineTask.runQuantum, but its effect on the larger graph may be deferred to execution, depending on the context in which adjustQuantum is being run: if one quantum removes an output that is needed by a second quantum as input, the second quantum may not be adjusted (and hence pruned or skipped) until that output is actually found to be missing at execution time.

Tasks that desire zip-iteration consistency between any combinations of connections that have the same data ID should generally implement adjustQuantum to achieve this, even if they could also run that logic during execution; this allows the system to see outputs that will not be produced because the corresponding input is missing as early as possible.

buildDatasetRefs(quantum: Quantum) tuple[lsst.pipe.base.connections.InputQuantizedConnection, lsst.pipe.base.connections.OutputQuantizedConnection]

Build QuantizedConnection corresponding to input Quantum.

Parameters:
quantumlsst.daf.butler.Quantum

Quantum object which defines the inputs and outputs for a given unit of processing.

Returns:
retValtuple of (InputQuantizedConnection,

OutputQuantizedConnection) Namespaces mapping attribute names (identifiers of connections) to butler references defined in the input lsst.daf.butler.Quantum.