DcrAssembleCoaddConnections¶
-
class lsst.pipe.tasks.dcrAssembleCoadd.DcrAssembleCoaddConnections(*, config=None)¶
Bases: lsst.pipe.tasks.assembleCoadd.AssembleCoaddConnections
Attributes Summary
allConnections
brightObjectMask
Class used for declaring PipelineTask prerequisite connections
coaddExposure
dcrCoadds
dcrNImages
defaultTemplates
dimensions
initInputs
initOutputs
inputMap
inputWarps
inputs
nImage
outputs
prerequisiteInputs
selectedVisits
skyMap
templateExposure
Methods Summary
adjustQuantum
(inputs, …) Override to make adjustments to lsst.daf.butler.DatasetRef objects in the lsst.daf.butler.core.Quantum during the graph generation stage of the activator.
buildDatasetRefs
(quantum) Builds QuantizedConnections corresponding to input Quantum
Attributes Documentation
-
allConnections
= {'brightObjectMask': PrerequisiteInput(name='brightObjectMask', storageClass='ObjectMaskCatalog', doc='Input Bright Object Mask mask produced with external catalogs to be applied to the mask plane BRIGHT_OBJECT.', multiple=False, dimensions=('tract', 'patch', 'skymap', 'band'), isCalibration=False, deferLoad=False, minimum=1, lookupFunction=None), 'coaddExposure': Output(name='{outputCoaddName}Coadd{warpTypeSuffix}', storageClass='ExposureF', doc='Output coadded exposure, produced by stacking input warps', multiple=False, dimensions=('tract', 'patch', 'skymap', 'band'), isCalibration=False), 'dcrCoadds': Output(name='{fakesType}{outputCoaddName}Coadd{warpTypeSuffix}', storageClass='ExposureF', doc='Output coadded exposure, produced by stacking input warps', multiple=True, dimensions=('tract', 'patch', 'skymap', 'band', 'subfilter'), isCalibration=False), 'dcrNImages': Output(name='{outputCoaddName}Coadd_nImage', storageClass='ImageU', doc='Output image of number of input images per pixel', multiple=True, dimensions=('tract', 'patch', 'skymap', 'band', 'subfilter'), isCalibration=False), 'inputMap': Output(name='{outputCoaddName}Coadd_inputMap', storageClass='HealSparseMap', doc='Output healsparse map of input images', multiple=False, dimensions=('tract', 'patch', 'skymap', 'band'), isCalibration=False), 'inputWarps': Input(name='{inputWarpName}Coadd_{warpType}Warp', storageClass='ExposureF', doc='Input list of warps to be assembled i.e. stacked.Note that this will often be different than the inputCoaddName.WarpType (e.g. 
direct, psfMatched) is controlled by the warpType config parameter', multiple=True, dimensions=('tract', 'patch', 'skymap', 'visit', 'instrument'), isCalibration=False, deferLoad=True, minimum=1), 'nImage': Output(name='{outputCoaddName}Coadd_nImage', storageClass='ImageU', doc='Output image of number of input images per pixel', multiple=False, dimensions=('tract', 'patch', 'skymap', 'band'), isCalibration=False), 'selectedVisits': Input(name='{outputCoaddName}Visits', storageClass='StructuredDataDict', doc='Selected visits to be coadded.', multiple=False, dimensions=('instrument', 'tract', 'patch', 'skymap', 'band'), isCalibration=False, deferLoad=False, minimum=1), 'skyMap': Input(name='skyMap', storageClass='SkyMap', doc='Input definition of geometry/bbox and projection/wcs for coadded exposures', multiple=False, dimensions=('skymap',), isCalibration=False, deferLoad=False, minimum=1), 'templateExposure': Input(name='{fakesType}{inputCoaddName}Coadd{warpTypeSuffix}', storageClass='ExposureF', doc='Input coadded exposure, produced by previous call to AssembleCoadd', multiple=False, dimensions=('tract', 'patch', 'skymap', 'band'), isCalibration=False, deferLoad=False, minimum=1)}¶
-
brightObjectMask
¶ Class used for declaring PipelineTask prerequisite connections
Parameters:
- name : str
  The default name used to identify the dataset type
- storageClass : str
  The storage class used when (un)/persisting the dataset type
- multiple : bool
  Indicates if this connection should expect to contain multiple objects of the given dataset type. Tasks with more than one connection with multiple=True with the same dimensions may want to implement PipelineTaskConnections.adjustQuantum to ensure those datasets are consistent (i.e. zip-iterable) in PipelineTask.runQuantum and notify the execution system as early as possible of outputs that will not be produced because the corresponding input is missing.
- dimensions : iterable of str
  The lsst.daf.butler.Butler lsst.daf.butler.Registry dimensions used to identify the dataset type identified by the specified name
- minimum : int
  Minimum number of datasets required for this connection, per quantum. This is checked in the base implementation of PipelineTaskConnections.adjustQuantum, which raises FileNotFoundError (causing QuantumGraph generation to fail). PipelineTask implementations may provide custom adjustQuantum implementations for more fine-grained or configuration-driven constraints, as long as they are compatible with this minimum.
- lookupFunction : typing.Callable, optional
  An optional callable function that will look up PrerequisiteInputs using the DatasetType, registry, quantum dataId, and input collections passed to it. If no function is specified, the default temporal spatial lookup will be used.
Raises:
- TypeError
  Raised if minimum is greater than one but multiple=False.
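The rule in the Raises entry above can be illustrated with a small stand-in class. This is only a sketch: SketchPrerequisiteInput and rejects are hypothetical names invented for this example, not part of lsst.pipe.base, and the real connection classes carry more fields than shown here.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SketchPrerequisiteInput:
    """Hypothetical stand-in for a prerequisite connection declaration."""

    name: str
    storageClass: str
    multiple: bool = False
    minimum: int = 1

    def __post_init__(self):
        # Mirror the documented rule: a scalar connection (multiple=False)
        # cannot require more than one dataset.
        if self.minimum > 1 and not self.multiple:
            raise TypeError(
                f"minimum={self.minimum} requires multiple=True for {self.name!r}"
            )


def rejects(**kwargs):
    """Return True if the declaration is rejected with TypeError."""
    try:
        SketchPrerequisiteInput(**kwargs)
    except TypeError:
        return True
    return False
```

With this sketch, a declaration using the values shown for brightObjectMask (multiple=False, minimum=1) is accepted, while minimum=2 combined with multiple=False is rejected.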
Notes
Prerequisite inputs are used for datasets that must exist in the data repository before a pipeline including this is run; they cannot be produced by another task in the same pipeline.
In exchange for this limitation, they have a number of advantages relative to regular Input connections:
- The query used to find them during QuantumGraph generation can be fully customized by providing a lookupFunction.
- Failed searches for prerequisites during QuantumGraph generation will usually generate more helpful diagnostics than those for regular Input connections.
- The default query for prerequisite inputs relates the quantum dimensions directly to the dimensions of its dataset type, without being constrained by any of the other dimensions in the pipeline. This allows them to be used for temporal calibration lookups (which regular Input connections cannot do at present) and to work around QuantumGraph generation limitations involving cases where naive spatial overlap relationships between dimensions are not desired (e.g. a task that wants all detectors in each visit for which the visit overlaps a tract, not just those where that detector+visit combination overlaps the tract).
- Prerequisite inputs may be optional (regular inputs are never optional).
-
coaddExposure
¶
-
dcrCoadds
¶
-
dcrNImages
¶
-
defaultTemplates
= {'fakesType': '', 'inputCoaddName': 'deep', 'inputWarpName': 'deep', 'outputCoaddName': 'dcr', 'warpType': 'direct', 'warpTypeSuffix': ''}¶
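The connection names listed under allConnections contain placeholders such as {outputCoaddName} that are filled in from these templates. As a minimal sketch, assuming the substitution behaves like Python's str.format (the variable names below are made up for this example, and only a subset of the connections is shown), the default dataset type names resolve as follows:

```python
# Default template values, copied from defaultTemplates above.
default_templates = {
    "fakesType": "",
    "inputCoaddName": "deep",
    "inputWarpName": "deep",
    "outputCoaddName": "dcr",
    "warpType": "direct",
    "warpTypeSuffix": "",
}

# Name templates copied from the allConnections listing (subset).
connection_name_templates = {
    "coaddExposure": "{outputCoaddName}Coadd{warpTypeSuffix}",
    "dcrCoadds": "{fakesType}{outputCoaddName}Coadd{warpTypeSuffix}",
    "inputWarps": "{inputWarpName}Coadd_{warpType}Warp",
    "selectedVisits": "{outputCoaddName}Visits",
    "templateExposure": "{fakesType}{inputCoaddName}Coadd{warpTypeSuffix}",
}

# Assumption: template substitution is equivalent to str.format.
resolved = {
    connection: template.format(**default_templates)
    for connection, template in connection_name_templates.items()
}

for connection, dataset_type_name in resolved.items():
    print(f"{connection}: {dataset_type_name}")
```

Under that assumption, inputWarps resolves to deepCoadd_directWarp, templateExposure to deepCoadd, and dcrCoadds to dcrCoadd.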
-
dimensions
= {'patch', 'band', 'skymap', 'tract'}¶
-
initInputs
= frozenset()¶
-
initOutputs
= frozenset()¶
-
inputMap
¶
-
inputWarps
¶
-
inputs
= frozenset({'inputWarps', 'templateExposure', 'selectedVisits', 'skyMap'})¶
-
nImage
¶
-
outputs
= frozenset({'dcrNImages', 'dcrCoadds', 'inputMap', 'coaddExposure', 'nImage'})¶
-
prerequisiteInputs
= frozenset({'brightObjectMask'})¶
-
selectedVisits
¶
-
skyMap
¶
-
templateExposure
¶
Methods Documentation
-
adjustQuantum
(inputs: typing.Dict[str, typing.Tuple[BaseInput, typing.Collection[DatasetRef]]], outputs: typing.Dict[str, typing.Tuple[Output, typing.Collection[DatasetRef]]], label: str, data_id: DataCoordinate) → tuple.Tuple[typing.Mapping[str, typing.Tuple[BaseInput, typing.Collection[DatasetRef]]], typing.Mapping[str, typing.Tuple[Output, typing.Collection[DatasetRef]]]]¶
Override to make adjustments to lsst.daf.butler.DatasetRef objects in the lsst.daf.butler.core.Quantum during the graph generation stage of the activator.
Parameters:
- inputs : dict
  Dictionary whose keys are an input (regular or prerequisite) connection name and whose values are a tuple of the connection instance and a collection of associated DatasetRef objects. The exact type of the nested collections is unspecified; it can be assumed to be multi-pass iterable and support len and in, but it should not be mutated in place. In contrast, the outer dictionaries are guaranteed to be temporary copies that are true dict instances, and hence may be modified and even returned; this is especially useful for delegating to super (see notes below).
- outputs : Mapping
  Mapping of output datasets, with the same structure as inputs.
- label : str
  Label for this task in the pipeline (should be used in all diagnostic messages).
- data_id : lsst.daf.butler.DataCoordinate
  Data ID for this quantum in the pipeline (should be used in all diagnostic messages).
Returns:
- adjusted_inputs : Mapping
  Mapping of the same form as inputs with updated containers of input DatasetRef objects. Connections that are not changed should not be returned at all. Datasets may only be removed, not added. Nested collections may be of any multi-pass iterable type, and the order of iteration will set the order of iteration within PipelineTask.runQuantum.
- adjusted_outputs : Mapping
  Mapping of updated output datasets, with the same structure and interpretation as adjusted_inputs.
Raises:
- ScalarError
  Raised if any Input or PrerequisiteInput connection has multiple set to False, but multiple datasets.
- NoWorkFound
  Raised to indicate that this quantum should not be run; not enough datasets were found for a regular Input connection, and the quantum should be pruned or skipped.
- FileNotFoundError
  Raised to cause QuantumGraph generation to fail (with the message included in this exception); not enough datasets were found for a PrerequisiteInput connection.
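The three failure modes listed above can be summarized in a short sketch. It is not the library's implementation: the ScalarError and NoWorkFound classes below are local stand-ins defined only so the example runs on its own, and check_connection and outcome are hypothetical helpers.

```python
class ScalarError(Exception):
    """Local stand-in: a scalar connection received several datasets."""


class NoWorkFound(Exception):
    """Local stand-in: the quantum should be pruned or skipped."""


def check_connection(name, refs, *, multiple, minimum, prerequisite):
    """Apply the documented checks to one input connection's datasets."""
    count = len(refs)
    if not multiple and count > 1:
        raise ScalarError(f"{name}: multiple=False but found {count} datasets")
    if count < minimum:
        message = f"{name}: found {count} datasets, need at least {minimum}"
        if prerequisite:
            # Missing prerequisites make graph generation fail outright.
            raise FileNotFoundError(message)
        # Missing regular inputs only mean this quantum has no work to do.
        raise NoWorkFound(message)


def outcome(name, refs, **kwargs):
    """Return the name of the exception raised, or 'ok'."""
    try:
        check_connection(name, refs, **kwargs)
    except (ScalarError, NoWorkFound, FileNotFoundError) as exc:
        return type(exc).__name__
    return "ok"
```

For example, an empty brightObjectMask (a prerequisite with minimum=1) yields FileNotFoundError in this sketch, whereas an empty inputWarps (a regular input) yields NoWorkFound.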
Notes
The base class implementation performs important checks. It always returns an empty mapping (i.e. makes no adjustments). It should always be called via super by custom implementations, ideally at the end of the custom implementation with already-adjusted mappings when any datasets are actually dropped, e.g.:

    def adjustQuantum(self, inputs, outputs, label, data_id):
        # Filter out some dataset refs for one connection.
        connection, old_refs = inputs["my_input"]
        new_refs = [ref for ref in old_refs if ...]
        adjusted_inputs = {"my_input": (connection, new_refs)}
        # Update the original inputs so we can pass them to super.
        inputs.update(adjusted_inputs)
        # Can ignore outputs from super because they are guaranteed
        # to be empty.
        super().adjustQuantum(inputs, outputs, label, data_id)
        # Return only the connections we modified.
        return adjusted_inputs, {}

Removing outputs here is guaranteed to affect what is actually passed to PipelineTask.runQuantum, but its effect on the larger graph may be deferred to execution, depending on the context in which adjustQuantum is being run: if one quantum removes an output that is needed by a second quantum as input, the second quantum may not be adjusted (and hence pruned or skipped) until that output is actually found to be missing at execution time.
Tasks that desire zip-iteration consistency between any combinations of connections that have the same data ID should generally implement adjustQuantum to achieve this, even if they could also run that logic during execution; this allows the system to see outputs that will not be produced because the corresponding input is missing as early as possible.
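The zip-iteration consistency described above amounts to keeping only the data IDs present in every connection and ordering each connection the same way. A minimal sketch of that filtering step follows, using plain (data_id, ref) tuples in place of DatasetRef objects; align_by_data_id and the "warps"/"masks" connection names are hypothetical, chosen only for illustration.

```python
def align_by_data_id(refs_by_connection):
    """Keep only entries whose data ID appears in every connection.

    ``refs_by_connection`` maps a connection name to a list of
    ``(data_id, ref)`` pairs. The result lists are sorted by data ID so
    that they can be iterated together with ``zip``.
    """
    if not refs_by_connection:
        return {}
    id_sets = [
        {data_id for data_id, _ in refs}
        for refs in refs_by_connection.values()
    ]
    common = set.intersection(*id_sets)
    return {
        name: sorted(
            (pair for pair in refs if pair[0] in common),
            key=lambda pair: pair[0],
        )
        for name, refs in refs_by_connection.items()
    }


# Visit 2 has a warp but no mask, so it is dropped from both lists.
aligned = align_by_data_id(
    {
        "warps": [(3, "warp3"), (1, "warp1"), (2, "warp2")],
        "masks": [(1, "mask1"), (3, "mask3")],
    }
)
```

In a real adjustQuantum override, the filtered collections would be placed back into the adjusted mappings and passed on to super, as in the example in the notes above.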
-
buildDatasetRefs
(quantum: lsst.daf.butler.core.quantum.Quantum) → Tuple[lsst.pipe.base.connections.InputQuantizedConnection, lsst.pipe.base.connections.OutputQuantizedConnection]¶
Builds QuantizedConnections corresponding to input Quantum
Parameters:
- quantum : lsst.daf.butler.Quantum
  Quantum object which defines the inputs and outputs for a given unit of processing
Returns:
- retVal : tuple of (InputQuantizedConnection, OutputQuantizedConnection)
  Namespaces mapping attribute names (identifiers of connections) to butler references defined in the input lsst.daf.butler.Quantum
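The shape of the return value can be pictured with types.SimpleNamespace standing in for the quantized connection classes. This is a rough sketch only: build_namespaces is a hypothetical helper, the string refs are placeholders, and the treatment of multiple=False connections as a single reference rather than a list is an assumption not stated in the text above.

```python
from types import SimpleNamespace


def build_namespaces(quantum_inputs, quantum_outputs, multiple):
    """Return (input_namespace, output_namespace) keyed by connection name.

    ``quantum_inputs`` and ``quantum_outputs`` map connection names to
    lists of refs; ``multiple`` maps connection names to their
    ``multiple`` flag.
    """

    def to_namespace(refs_by_name):
        namespace = SimpleNamespace()
        for name, refs in refs_by_name.items():
            # Assumption: scalar connections expose one ref, not a list.
            value = list(refs) if multiple[name] else refs[0]
            setattr(namespace, name, value)
        return namespace

    return to_namespace(quantum_inputs), to_namespace(quantum_outputs)


input_refs, output_refs = build_namespaces(
    {"inputWarps": ["warp_ref_1", "warp_ref_2"], "skyMap": ["skymap_ref"]},
    {"dcrCoadds": ["coadd_ref_0", "coadd_ref_1"]},
    {"inputWarps": True, "skyMap": False, "dcrCoadds": True},
)
```

Attribute access then mirrors the connection names, for example input_refs.inputWarps and output_refs.dcrCoadds.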
-