Creating a PipelineTask

So you want to build a PipelineTask, where should you begin? Right here in this guide! This guide will create an example PipelineTask to measure aperture photometry, using progressively more features of Gen3 middleware. So let’s begin.

From Task to PipelineTask

A PipelineTask is at its heart just a Task, so the best place to start in creating a task is to create a Config class.

import numpy as np

import lsst.pipe.base as pipeBase
import lsst.pex.config as pexConfig
import lsst.afw.table as afwTable
import lsst.afw.image as afwImage

from lsst.pipe.base import connectionTypes


class ApertureTaskConfig(pexConfig.Config):
    apRad = pexConfig.Field(doc="Radius of aperture", dtype=int, default=4)

Next, create a Task class that performs the measurements:

class ApertureTask(pipeBase.Task):
    ConfigClass = ApertureTaskConfig
    _DefaultName = "apertureDemoTask"

    def __init__(self, config: pexConfig.Config, *args, **kwargs):
        super().__init__(config, *args, **kwargs)
        self.apRad = self.config.apRad

        self.outputSchema = afwTable.SourceTable.makeMinimalSchema()
        self.apKey = self.outputSchema.addField(
            "apFlux", type=np.float64, doc="Ap flux measured"
        )

        self.outputCatalog = afwTable.SourceCatalog(self.outputSchema)

    def run(
        self, exposure: afwImage.Exposure, inputCatalog: afwTable.SourceCatalog
    ) -> pipeBase.Struct:
        # set dimension cutouts to 3 times the apRad times 2 (for diameter)
        dimensions = (3 * self.apRad * 2, 3 * self.apRad * 2)

        # Get indexes for each pixel
        indy, indx = np.indices(dimensions)

        # Loop over each record in the catalog
        for source in inputCatalog:
            # Create an aperture and measure the flux
            center = Point2I(source.getCentroid())
            center = (center.getY(), center.getX())

            stamp = exposure.image.array[
                center[0] - 3 * self.apRad : center[0] + 3 * self.apRad,
                center[1] - 3 * self.apRad : center[1] + 3 * self.apRad,
            ]
            mask = (
                (indy - center[0]) ** 2 + (indx - center[0]) ** 2
            ) ** 0.5 < self.apRad
            flux = np.sum(stamp * mask)

            # Add a record to the output catalog
            tmpRecord = self.outputCatalog.addNew()
            tmpRecord.set(self.apKey, flux)

        return self.outputCatalog

So now you have a task that takes an exposure and inputCatalog in its run method and returns a new output catalog with apertures measured. As with all Tasks, this will work well for in-memory data products, but you want to read and write datasets so you will need to write a PipelineTask. To do this we need to have our task inherit from PipelineTask primitives instead of base objects. Let’s start by converting our Config class.

class ApertureTaskConfig(pipeBase.PipelineTaskConfig):
    ...

Now however when we try and import the module containing this config, an exception is thrown complaining about a missing PipelineTaskConnections class. In generation 3 middleware Tasks that do IO need to declare what kind of data-products they need, what kind they will produce, and what identifiers are used to fetch that data. Additionally PipelineTaskConnections defined a unit of work over which the task will operate, such as measurements on coadds will work on an individual Tracts, Patches, and Filter. By declaring this information, the generation 3 middleware is able to orchestrate the loading, saving, and running of one or more tasks. With that in mind, let’s create a PipelineTaskConnections class for our new Task. You would expect that this task is going to work on individual ccds, looking at processed calexps and associated measurement catalogs (that were produced by some previous task).

class ApertureTaskConnections(
    pipeBase.PipelineTaskConnections, dimensions=("visit", "detector", "band")
):
    exposure = connectionTypes.Input(
        doc="Input exposure to make measurements on",
        dimensions=("visit", "detector"),
        storageClass="ExposureF",
        name="calexp",
    )
    inputCatalog = connectionTypes.Input(
        doc="Input catalog with existing measurements",
        dimensions=("visit", "detector", "band"),
        storageClass="SourceCatalog",
        name="src",
    )
    outputCatalog = connectionTypes.Output(
        doc="Aperture measurements",
        dimensions=("visit", "detector", "band"),
        storageClass="SourceCatalog",
        name="customAperture",
    )

So what is going on here? The first thing that happens is that ApertureTaskConnections inherits from PipelineTaskConnections, that is fairly standard. What might be new syntax for you is the dimensions=.... This is how we tell the connections class what unit of work a Task that uses this connection class will operate on. The unit of work for a PipelineTask is known as a quantum. In the case of our Task, it will work on a visit (a single image taken by the camera), detector (an individual ccd out of the camera’s mosaic), band (an abstract notion of a filter, say r band, that is not tied to the exact band passes of an individual telescope filter).

Next, take a look at the fields defined on your new connection class. These are defined in a similar way as defining a configuration class, but instead of using Field types from lsst.pex.config, connection classes make use of connection types defined in lsst.pipe.base.connectionTypes. These connections define the inputs and outputs that a PipelineTask will expect to make use of. Each of these connections documents what the connection is, what dimensions represent this data product (in this case they are the same as the task itself, in the Processing multiple datasets of the same dataset type section we will cover when they are different), what kind of storage class represents this data type on disk, and the name of the data type itself. In this connections class you have defined two inputs and an output. The inputs the Task will make use of are calexp calibrated exposures and src catalogs, both of which are produced by the CalibrateTask during single frame processing. Our Task will produce a new SourceCatalog of aperture measurements and save it out with a dataset type we have named customAperture.

So now you have a connections class, how do you use it? The good news is that you only need to make one small change to our ApertureTaskConfig, passing the connections class in as a parameter to the configuration class declaration.

class ApertureTaskConfig(
    pipeBase.PipelineTaskConfig, pipelineConnections=ApertureTaskConnections
):
    ...

That’s it. All the rest of the Config class stays the same. Below, examples demonstrate what this change does, and how to use it, but first take a look at what changes when you turn a Task into a PipelineTask.

class ApertureTask(pipeBase.PipelineTask):
    ...

    def run(
        self, exposure: afwImage.Exposure, inputCatalog: afwTable.SourceCatalog
    ) -> pipeBase.Struct:
        ...
        return pipeBase.Struct(outputCatalog=self.outputCatalog)

In a simple PipelineTask like this, these are all the changes that need to be made. Firstly the base class to is changed to PipelineTask. This inheritance provides all the base machinery that the middleware will need to run this task. The second change you need to make a task into a PipelineTask is to change the signature of the run method. A run method in a PipelineTask must return a lsst.pipe.base.Struct object whose field names correspond to the names of the outputs defined in the connection class. In our connection class we defined the output collection with the identifier outputCatalog, so in our returned Struct has a field with that name as well. Another thing worth highlighting, though it was not a change that was made, is the names of the arguments to the run method. These names also must (and do) correspond to the identifiers used for the input connections. The names of the variables of the inputs and outputs are how the PipelineTask activator maps connections into the in-memory data products that the algorithm requires.

The complete source integrating these changes can be used in Appendix A.

Arguments and returns for __init__

In this exercise you will make this task a little bit more advanced. Instead of creating a new catalog that only contains the aperture flux measurements, you will create a catalog that contains all the input data, and add an additional column for the aperture measurements. To do this you will need the schema for the input catalog in the init method so that you can correctly construct our output catalog to contain all the appropriate fields. This is accomplished by using the InitInput connection type in the connection class. Data products specified with this connection type will be provided to the __init__ method of a task in a dictionary named initInputs when the task is executed. Take a look what the connection class looks like.

class ApertureTaskConnections(
    pipeBase.PipelineTaskConnections, dimensions=("visit", "detector", "band")
):
    inputSchema = connectionTypes.InitInput(
        doc="Schema associated with a src catalog",
        storageClass="SourceCatalog",
        name="src_schema",
    )
    ...


class ApertureTask(pipeBase.PipelineTask):
    ...

    def __init__(self, config: pexConfig.Config, initInput: Mapping, *args, **kwargs):
        ...
        inputSchema = initInput["inputSchema"].schema

        # Create a camera mapper to create a copy of the input schema
        self.mapper = afwTable.SchemaMapper(inputSchema)
        self.mapper.addMinimalSchema(inputSchema, True)

        # Add the new field
        self.apKey = self.mapper.editOutputSchema().addField(
            "apFlux", type=np.float64, doc="Ap flux measured"
        )

        # Get the output schema
        self.schema = self.mapper.getOutputSchema()

        # create the catalog in which new measurements will be stored
        self.outputCatalog = afwTable.SourceCatalog(self.schema)

    def run(
        self, exposure: afwImage.Exposure, inputCatalog: afwTable.SourceCatalog
    ) -> pipeBase.Struct:
        # Add in all the records from the input catalog into what will be the
        # output catalog
        self.outputCatalog.extend(inputCatalog, mapper=self.mapper)

        # set dimension cutouts to 3 times the apRad times 2 (for diameter)
        dimensions = (3 * self.apRad * 2, 3 * self.apRad * 2)

        # Get indexes for each pixel
        indy, indx = np.indices(dimensions)

        # Loop over each record in the catalog
        for source in inputCatalog:
            # Create an aperture and measure the flux
            center = Point2I(source.getCentroid())
            center = (center.getY(), center.getX())
            # Create a cutout
            stamp = exposure.image.array[
                center[0] - 3 * self.apRad : center[0] + 3 * self.apRad,
                center[1] - 3 * self.apRad : center[1] + 3 * self.apRad,
            ]
            mask = (
                (indy - center[0]) ** 2 + (indx - center[0]) ** 2
            ) ** 0.5 < self.apRad
            flux = np.sum(stamp * mask)

            # Set the flux field of this source
            source.set(self.apKey, flux)

        return pipeBase.Struct(outputCatalog=self.outputCatalog)

These changes allow you to load in and use schemas to initialize your task before any actual data is loaded to be passed to the algorithm code located in the run method. Inside the run method, the output catalog copies all the records from the input into itself. The loop can the go over the output catalog, and insert the new measurements right into the output catalog.

One thing to note about InitInput connections is that they do not take any dimensions. This is because the sort of data loaded will correspond to a given dataset type produced by a task, and not by (possibly multiple) executions of a run method over datasets that have dimensions. In other words, these datasets are unique to the task itself and not tied to the unit of work that the task operates on.

In the same way the input schema from some previous stage of processing was added, the output schema for your task should also be persisted so some other PipelineTask can make use of it. To do this you should add an InitOutput connection to your connection class.

class ApertureTaskConnections(
    pipeBase.PipelineTaskConnections, dimensions=("visit", "detector", "band", "skymap")
):
    ...
    outputSchema = connectionTypes.InitOutput(
        doc="Schema created in Aperture PipelineTask",
        storageClass="SourceCatalog",
        name="customAperture_schema",
    )

    ...


class ApertureTask(pipeBase.PipelineTask):
    ...

    def __init__(self, config: pexConfig.Config, initInput: Mapping, *args, **kwargs):
        ...
        # Get the output schema
        self.schema = mapper.getOutputSchema()

        # Create the output catalog
        self.outputCatalog = afwTable.SourceCatalog(self.schema)

        # Put the outputSchema into a SourceCatalog container. This var name
        # matches an initOut so will be persisted
        self.outputSchema = afwTable.SourceCatalog(self.schema)

In the init method we associate the variable we would like to output with a name that matches the variable name used in the connection class. The activator uses this shared name to know that variable should be persisted.

The complete updated example can be found in Appendix B.

Optional and Dynamic Connections

Sometimes it is useful to have a task that optionally uses a dataset. In the case of the example task you have been building, this might be a background model that was previously removed. You may want your task to add back in the background so that it can do a new local background estimate. To start add the background dataset to our connection class like you did for your other datasets.

class ApertureTaskConnections(
    pipeBase.PipelineTaskConnections, dimensions=("visit", "detector", "band")
):
    ...
    background = connectionTypes.Input(
        doc="Background model for the exposure",
        storageClass="Background",
        name="calexpBackground",
        dimensions=("visit", "detector", "band"),
    )
    ...

Now your PipelineTask will load the background each time the task is run. How do you make this optional? The simplest approach is to add a configuration field in your config class to allow the user to specify if it is to be loaded:

class ApertureTaskConfig(
    pipeBase.PipelineTaskConfig, pipelineConnections=ApertureTaskConnections
):
    ...
    doLocalBackground = pexConfig.Field(
        doc="Should the background be added before doing photometry",
        dtype=bool,
        default=False,
    )

The __init__ method of the connection class is given an instance of the task’s configuration class after all overrides have been applied. This provides you an opportunity change the behavior of the connection class according to various configuration options.

class ApertureTaskConnections(
    pipeBase.PipelineTaskConnections, dimensions=("visit", "detector", "band", "skymap")
):
    ...
    background = ct.Input(
        doc="Background model for the exposure",
        storageClass="Background",
        name="calexpBackground",
        dimensions=("visit", "detector", "band", "skymap"),
    )
    ...

    def __init__(self, *, config=None):
        if not config.doLocalBackground:
            del self.background

Your connection class now looks at the value of doLocalBackground on the config object and if it is False, removes that attribute. Attributes can also be replaced entirely to change (e.g.) their dimensions, storage class, or other properties (connection objects are immutable, so they cannot be modified in place). New connection attributes can even be added programmatically in __init__, but these cannot have configurations for their dataset type names added automatically, and are recommended only when it is not possible to define a placeholder in the connections class to be modified later, such as when even the number of connections is dynamic. The dimensions of the task itself can also be modified in __init__, where self.dimensions is a mutable set that can be modified in place or replaced completely.

Note

In previous versions, connection attributes could not be deleted or modified directly, and the preferred pattern was to instead remove the name of the connection from a special set, after first delegating to super:

def __init__(self, *, config=None):
    super().__init__(config=config)
    if not config.doLocalBackground:
        self.inputs.remove("background")

This still works, but it is more verbose and less intuitive than the now-recommended attribute-manipulation approach. It is now no longer to delegate to super in either case, since the base class super does nothing (the first steps of initialization are instead handled by a metaclass).

The last step in modifying this task will be to update the run method to take into account that a background may or may not be supplied.

...
import lsst.afw.math as afwMath

...


class ApertureTask(pipeBase.PipelineTask):
    ...

    def run(
        self,
        exposure: afwImage.Exposure,
        inputCatalog: afwTable.SourceCatalog,
        background: afwMath.BackgroundList | None = None,
    ) -> pipeBase.Struct:
        # If a background is supplied, add it back to the image so local
        # background subtraction can be done.
        if background is not None:
            exposure.image.array += background.image

        ...

        # Loop over each record in the catalog
        for source in outputCatalog:
            ...
            distance = ((indy - center[0]) ** 2 + (indx - center[0]) ** 2) ** 0.5
            mask = distance < self.apRad
            flux = np.sum(stamp * mask)

            # Do local background subtraction
            if background is not None:
                outerAn = distance < 2.5 * self.apRad
                innerAn = distance < 1.5 * self.apRad
                annulus = outerAn - innerAn
                localBackground = np.mean(exposure.image.array * annulus)
                flux -= np.sum(mask) * localBackground

            ...

        return pipeBase.Struct(outputCatalog=self.outputCatalogs)

The run method now takes an argument named background, which defaults to a value of None. If the connection is removed from the connection class, there will be no argument passed to run with that name. Conversely, when the connection is present, the background is un-persisted by the butler, and is passed on to the run method. The body of the run method checks if the background has been passed, and if so adds it back in and does a local background subtraction.

To bring this all together, see Appendix C

Dataset name configuration and templates

Now that you have the option to control results of processing with a configuration option (turning on and off local background subtraction) it may be useful for a user who turns on local background subtraction to change the name of the dataset produced so as to tell what the configuration was without looking at the persisted configurations. The user may make a configuration override file that looks something like the following:

config.doLocalBackground = True
config.connections.outputSchema = "customAperture_localBg_schema"
config.connections.outputCatalog = "customAperture_localBg"

This config file introduces the special attribute connections that exists on every PipelineTaskConfig class. This attribute is dynamically built from the linked PipelineTaskConnections class. The connections attribute is a sub-config that has Fields corresponding to the variable names of the connections, with values of those fields corresponding to the name of a connection. So by default config.connections.outputCatalog would be customAperture and config.connections.exposure would be calexp etc. Assigning to these Fields has the effect of changing the name of the dataset type defined in the connection.

In this config file you are changing the name of the dataset type that will be persisted to include the information that local background subtraction was done. It is interesting to note that there are no hard coded dataset type names that must be adhered to, the user is free to pick any name. The only consequence of changing a dataset type name, is that any downstream code that is to use the output dataset must have its default name changed to match. As an aside, pipetask requires that the first time a dataset name is used the activator command is run with the --register-dataset-types switch. This is to prevent accidental typos becoming new dataset types.

Looking at the config file, there are two different Fields that are being set to the same value. In the case of other tasks this number may even be higher. This leads not only to the issue of config overrides needing to potentially be lengthy, but that there may be a typo, and the fields will be set inconsistently. PipelineTask tasks address this by providing a way to template dataset type names.

class ApertureTaskConnections(
    pipeBase.PipelineTaskConnections,
    defaultTemplates={"outputName": "customAperture"},
    dimensions=("visit", "detector", "band"),
):
    ...

    outputCatalog = connectionTypes.Output(
        doc="Aperture measurements",
        dimensions=("visit", "detector", "band"),
        storageClass="SourceCatalog",
        name="{outputName}",
    )
    ...
    outputSchema = connectionTypes.InitOutput(
        doc="Schema created in Aperture PipelineTask",
        storageClass="SourceCatalog",
        name="{outputName}_schema",
    )

In the modified connection class, the outputSchema and outputCatalog connections now have python format strings, which are referred to with the template name outputName. This template will be formatted with a specified string to become a dataset type name prior to any data being read and supplied to the task. You may have noticed the class declaration also has a new argument: defaultTemplates. The strings defined inside defaultTemplates are what will be used to format the name string if a user does not provide any overrides. The defaults are supplied as a Python dictionary of template identifiers as keys, and default strings as values. If there are any templates used in a connection class, you must supply a default template for each template identifier. A TypeError will be thrown if you attempt to import a module containing a PipelineTaskConnections class that does not have defaults for all the defined templates.

With these changes, have a look at how your configuration override file changes.

config.doLocalBackground = True
config.connections.outputName = "customAperture_localBg"

The connections sub-config now contains a Field called outputName, the same as your template identifier. Each template identifier will have a corresponding field on the connections sub-config. Setting the value on these configs has the effect of setting the templates wherever they are used.

Setting a template config field does not preclude you from also setting the name of a dataset type directly. This may be useful in PipelineTasks with templates used in lots of places. Though not needed in this example, such a config would look something like the following:

config.doLocalBackground = True
config.connections.outputName = "customAperture_localBg"
config.connections.outputSchema = "different_name_schema"

View the complete code in Appendix D

Prerequisite inputs

Some tasks make use of datasets that are created outside the processing environment of the Science Pipelines. These may be things like reference catalogs, bright star masks, and some calibrations. To account for this PipelineTasks have a special type of connection called, PrerequisiteInput. This type of input tells the execution system, that this is a special type of dataset, and not to expect it to be produced anywhere in a processing pipeline. If this dataset is not found, the system will raise a hard error and tell you the dataset type is missing instead of inferring that there is some processing step that is missing.

These connections are specified the same way as any input, but with a different connection type name. In the example task you are creating, you will add in a prerequisite type on a mask, that is assumed to be human-created. If a source center happens to fall in a masked area, no aperture photometry will be performed. Because this is almost the same as an Input connection type, the complete updated example is shown in Appendix E, in lieu of building it up a piece at a time.

Prerequisite inputs have an additional attribute named lookupFunction. This attribute is optional and can be used to manually lookup datasets associated with a prerequisite input. Using a lookupFunction is an advanced feature that should only be needed very rarely. As such, they are beyond the scope of this tutorial, and are only brought up for completeness sake.

Processing multiple datasets of the same dataset type

The dimensions you have used in your task up to this point have specified that the unit of processing is to be done on individual detectors on a per-visit basis. This makes sense, as this is a natural way to parallelize this data. However, sometimes it is useful to consider an entire focal plane at a time, or some other larger scale concept, like a tract. By changing the dimensions of the unit of processing for your task, you change what sort of inputs your task will expect. These changes are reflected in the connections class as follows.

class ApertureTaskConnections(
    pipeBase.PipelineTaskConnections,
    defaultTemplates={"outputName": "customAperture"},
    dimensions=("visit", "band"),
):
    exposures = connectionTypes.Input(
        doc="Input exposure to make measurements on",
        dimensions=("visit", "detector", "band"),
        storageClass="ExposureF",
        name="calexp",
        multiple=True,
    )
    backgrounds = connectionTypes.Input(
        doc="Background model for the exposure",
        storageClass="Background",
        name="calexpBackground",
        dimensions=("visit", "detector", "band"),
        multiple=True,
    )
    inputCatalogs = connectionTypes.Input(
        doc="Input catalog with existing measurements",
        dimensions=(
            "visit",
            "detector",
            "band",
        ),
        storageClass="SourceCatalog",
        name="src",
        multiple=True,
    )
    outputCatalogs = connectionTypes.Output(
        doc="Aperture measurements",
        dimensions=("visit", "detector", "band"),
        storageClass="SourceCatalog",
        name="{outputName}",
        multiple=True,
    )
    ...
    areaMasks = connectionTypes.PrerequisiteInput(
        doc="A mask of areas to be ignored",
        storageClass="Mask",
        dimensions=("visit", "detector", "band"),
        name="ApAreaMask",
        multiple=True,
    )
    ...

The dimensions of your ApertureTaskConnections class are now visit and band. However, all of your input datasets are themselves still defined over each of these dimensions, and also detector. That is to say you get one calexp for ever unique combination of exposure‘s dimensions. Because the tasks’s dimensions are a more inclusive set of dimensions (less specified) you should expect that for a given unit of processing, there will be multiple values for each of the input dataset types along the detector dimension. For example, in LSST there will be 189 detectors in each visit. You indicate to the execution framework that you expect there to be a list of datasets for each connection by adding multiple=True to its declaration. This ensures the values passed will be inside of a list container.

As a caveat, depending on the exact data that has been ingested/processed, there may only be one dataset that matches this combination of dimensions (i.e. only one raw was ingested for a visit) but the multiple flag will still ensure that the system passes this one dataset along inside contained inside a list. This ensures a uniform api to program against. Make note that the connection variable names change to add an s on connections marked with multi to reflect that they will potentially contain multiple values.

With this in mind, go ahead and make the changes needed accommodate multiple datasets in each inputs to the run method. Because this task is inherently parallel over detectors, these modifications are not the most natural way to code this behavior, but are done to demonstrate how to make use of the multi flag for situations that are not so trivial.

class ApertureTask(pipeBase.PipelineTask):
    ...

    def run(
        self,
        exposures: List[afwImage.Exposure],
        inputCatalogs: List[afwTable.SourceCatalog],
        areaMasks: List[afwImage.Mask],
        backgrounds: list[afwMath.BackgroundList] | None = None,
    ) -> pipeBase.Struct:
        # Track the length of each catalog as to know which exposure to use
        # in later processing
        cumulativeLength = 0
        lengths = []

        # Add in all the input catalogs into the output catalog
        for inCat in inputCatalogs:
            self.outputCatalog.extend(inCat, mapper=self.mapper)
            lengths.append(len(inCat) + cumulativeLength)
            cumulativeLength += len(inCat)

        ...

        # track which image is being used
        imageIndex = 0
        exposure = exposures[imageIndex]
        areaMask = areaMasks[imageIndex]
        background = areaMasks[imageIndex]

        # Loop over each record in the catalog
        for i, source in enumerate(self.outputCatalog):
            # get the associated exposure
            if i >= lengths[imageIndex]:
                # only update if this is not the last index
                if imageIndex < len(lengths) - 1:
                    imageIndex += 1
                exposure = exposures[imageIndex]
                areaMask = areaMasks[imageIndex]
                background = areaMasks[imageIndex]
            # If a background is supplied, add it back to the image so local
            # background subtraction can be done.
            if backgrounds is not None:
                exposure.image.array += background.image

            ...

            # Skip measuring flux if the center of a source is in a masked
            # pixel
            if areaMask.array[center[0], center[1]] != 0:
                ...

            ...

            # Do local background subtraction
            if backgrounds is not None:
                ...

            ...

        return pipeBase.Struct(outputCatalog=self.outputCatalog)

The signature of the run method now reflects the change in the names of the connections. Next, the output catalog must be extended to include all the supplied input catalogs. When the loop proceeds through the list of output sources, it checks if that source as come from the next inputCatalog in the list. If it has then a counter reflecting which image arrays to use is updated to match. The specifics of this code are not as important as the way in which they use the variables supplied to run, namely that the arguments are now lists of variables that each need handled.

The complete example can be found in Appendix F.

Deferred Loading

An astute eye will notice that this code will potentially consume a large amount of memory. Loading in every catalog, image, background, and mask for an entire LSST focal plain will put a lot of pressure on the memory of the computer running this code. Fortunately the middleware gives you a way to lighten this load. You can add an argument to a connection in our connection class that informs the middleware to not load the data at the time of running, but supply a variable that allows a task to load the data when the task needs it. This argument is called deferLoad, and takes a boolean value which is by default false. Take a look at the connection class with this in place.

class ApertureTaskConnections(
    pipeBase.PipelineTaskConnections,
    defaultTemplates={"outputName": "customAperture"},
    dimensions=("visit", "band"),
):
    exposures = connectionTypes.Input(
        doc="Input exposure to make measurements on",
        dimensions=("visit", "detector", "band"),
        storageClass="ExposureF",
        name="calexp",
        multiple=True,
        deferLoad=True,
    )
    backgrounds = connectionTypes.Input(
        doc="Background model for the exposure",
        storageClass="Background",
        name="calexpBackground",
        dimensions=("visit", "detector", "band"),
        multiple=True,
        deferLoad=True,
    )
    inputCatalogs = connectionTypes.Input(
        doc="Input catalog with existing measurements",
        dimensions=(
            "visit",
            "detector",
            "band",
        ),
        storageClass="SourceCatalog",
        name="src",
        multiple=True,
        deferLoad=True,
    )
    ...
    areaMasks = connectionTypes.PrerequisiteInput(
        doc="A mask of areas to be ignored",
        storageClass="Mask",
        dimensions=("visit", "detector", "band"),
        name="ApAreaMask",
        multiple=True,
        deferLoad=True,
    )
    ...

Take a look at how the run method changes to make use of this.

class ApertureTask(pipeBase.PipelineTask):
    ...

    def run(
        self,
        exposures: List[afwImage.Exposure],
        inputCatalogs: List[afwTable.SourceCatalog],
        areaMasks: List[afwImage.Mask],
        backgrounds: list[afwMath.BackgroundList] | None = None,
    ) -> pipeBase.Struct:
        ...

        # track which image is being used
        imageIndex = 0
        exposure = exposures[imageIndex].get()
        areaMask = areaMasks[imageIndex].get()
        background = areaMasks[imageIndex].get()

        # Loop over each record in the catalog
        for i, source in enumerate(self.outputCatalog):
            # get the associated exposure
            if i >= lengths[imageIndex]:
                # only update if this is not the last index
                if imageIndex < len(lengths) - 1:
                    imageIndex += 1
                exposure = exposures[imageIndex].get()
                areaMask = areaMasks[imageIndex].get()
                background = areaMasks[imageIndex].get()
            ...

        return pipeBase.Struct(outputCatalog=self.outputCatalog)

In this modified run method the only code addition is the use of the get method on input arguments. When a connection is marked with deferLoad, the middleware will supply an DeferredDatasetHandle. This handle has a get method which loads and returns the object specified by the handle. The get` method also optionally supports a parameters argument that can be used in the same manor as a normal Butler get call. This allows things like fetching only part of an image, loading only the wcs from an exposure, etc. See Butler documentation for more info on the parameters argument.

Appendix G is now the complete example for this code.

Checking and altering quanta before execution

When a QuantumGraph is first created, the tasks it will run are given an opportunity to make limited adjustments and raise exceptions if a quantum does not meet its needs in more subtle ways, via a call to PipelineTaskConnections.adjustQuantum. This method is also called just before each quantum is executed, since predecessor quanta may not have actually produced some of the outputs they were predicted to, and that will change the inputs that are available to the current quantum.

For the vast majority of PipelineTask subclasses, the default implementation provided by the base class should be adequate, and even classes that wish to override it should always delegate to super, as the base class implementation performs the checks implied by the multiple and minumum arguments to connection fields. Overriding adjustQuantum is most useful for tasks that have more than one connection with multiple=True, and the data IDs of these connections are closely related, such as when the presence of one input dataset implies the production of a corresponding output dataset. That’s exactly what should happens with ApertureTask now; it should produce an outputCatalog dataset for a data ID only when all input datasets for that data ID are present:

class ApertureTaskConnections(
    pipeBase.PipelineTaskConnections,
    defaultTemplates={"outputName": "customAperture"},
    dimensions=("visit", "detector", "band"),
):
    ...

    def adjustQuantum(self, inputs, outputs, label, data_id):
        # Find the data IDs common to all multiple=True inputs.
        input_names = ("exposures", "inputCatalogs", "backgrounds")
        inputs_by_data_id = []
        for name in input_names:
            inputs_by_data_id.append({ref.dataId: ref for ref in inputs[name][1]})
        # Intersection looks messy because dict_keys only supports |.
        # not an "intersection" method.
        data_ids_to_keep = functools.reduce(
            operator.__and__, (d.keys() for d in inputs_by_data_id)
        )
        # Pull out just the DatasetRefs that are in common in the inputs
        # and order them consistently (note that consistent ordering is not
        # automatic).
        adjusted_inputs = {}
        for name, refs in zip(input_names, inputs_by_data_id):
            adjusted_inputs[name] = (
                inputs[name][0],
                [refs[data_id] for data_id in data_ids_to_keep],
            )
            # Also update the full dict of inputs, so we can pass it to
            # super() later.
            inputs[name] = adjusted_inputs[name]
        # Do the same for the outputs.
        outputs_by_data_id = {ref.dataId: ref for ref in outputs["outputCatalogs"][1]}
        adjusted_outputs = {
            "outputCatalogs": (
                outputs["outputCatalogs"][0],
                [outputs_by_data_id[data_id] for data_id in data_ids_to_keep],
            )
        }
        outputs["outputCatalogs"] = adjusted_outputs["outputCatalogs"]
        # Delegate to super(); ignore results because they are guaranteed
        # to be empty.
        super().adjustQuantum(inputs, outputs, label, data_id)
        return adjusted_inputs, adjusted_outputs

See the documentation for adjustQuantum for additional details on overriding it.

The aggregated task can be seen in Appendix H

Overriding Task execution

Overriding the PipelineTask method runQuantum is another advanced tool. This method is used in task execution and is supplied identifiers for input dataset references to be used in processing, and output dataset references for datasets the middleware frame work expects to be written at the end of processing. The runQuantum method is responsible for fetching inputs, calling the run method, and writing outputs using the supplied data ids. After an introduction to the default implementation of runQuantum, this guide will talk about variations that will all accomplish the same thing, but in different ways. This will hopefully give some introduction to what is possible with runQuantum and some ideas as to why you many need to override it. Take a look at how runQuantum is defined in PipelineTask.

class PipelineTask(Task):
    ...

    def runQuantum(
        self,
        butlerQC: QuantumContext,
        inputRefs: InputQuantizedConnection,
        outputRefs: OutputQuantizedConnection,
    ):
        inputs = butlerQC.get(inputRefs)
        outputs = self.run(**inputs)
        butlerQC.put(outputs, outputRefs)

    ...

If this looks pretty straight forward to you, then great! This function is tightly packed, but short to make the barrier to overloading it as low as possible.

The first argument is named butlerQC which may cause you to wonder how it relates to a butler. If you are paying attention, you might know from the type annotation that the QC stands for QuantumContext, but that does not really give many clues does it? A QuantumContext object is simply a Butler that has special information about the unit of data your task will be processing, i.e. the quantum, as well as extra functionality attached to it.

DatasetRefs that can be used to interact with specific datasets managed by the butler. See the description of the InputQuantizedConnection type in the section Checking and altering quanta before execution for more information on QuantizedConnections, noting that OutputQuantizedConnection functions in the same manner but with output lsst.daf.butler.DatasetRefs.

The butlerQC object has a get method that knows how understand the structure of an InputQuantizedConnection and load in all of the inputs supplied in the inputRefs. However, the get method also understand lists or single instances of DatasetRefs. Likewise the butlerQC object also has a put method that mirrors all the capabilities of the get method, put for putting outputs into the butler.

For examples sake, add a runQuantum method to your photometry task that loads in all the input references one connection at a time. Your task only expects to write a single dataset out, so the runQuantum will also put with that single lsst.daf.butler.DatasetRef.

class ApertureTask(pipeBase.PipelineTask):
    ...

    def runQuantum(
        self,
        butlerQC: pipeBase.QuantumContext,
        inputRefs: pipeBase.InputQuantizedConnection,
        outputRefs: pipeBase.OutputQuantizedConnection,
    ):
        inputs = {}
        for name, refs in inputRefs:
            inputs[name] = butlerQC.get(refs)
        output = self.run(**inputs)
        butlerQC.put(output, outputRefs.OutputCatalog)

Overriding runQuantum also provides the opportunity to do a transformation on input data, or some other related calculation. This allows the run method to have a convenient interface for user interaction within a notebook or shell, but still match the types of input PipelineTasks will get when run by the middleware system.

To demonstrate this, modify the runQuantum and run methods in such a way that the output catalog of the task is already pre-populated with all of the input catalogs. The user then only needs to supply the lengths of each of the input catalogs that went in to creating the output catalog. This change is likely not worth doing in a production PipelineTask but is perfect for demoing the concepts here.

...


def runQuantum(
    self,
    butlerQC: pipeBase.ButlerQuantumContext,
    inputRefs: pipeBase.InputQuantizedConnection,
    outputRefs: pipeBase.OutputQuantizedConnection,
):
    inputs = {}
    for name, refs in inputRefs:
        inputs[name] = butlerQC.get(refs)

    # Record the lengths of each input catalog
    lengths = []

    # Remove the input catalogs from the list of inputs to the run method
    inputCatalogs = inputs.pop("inputCatalogs")

    # Add in all the input catalogs into the output catalog
    cumulativeLength = 0
    for inCatHandle in inputCatalogs:
        inCat = inCatHandle.get()
        lengths.append(len(inCat) + cumulativeLength)
        cumulativeLength += len(inCat)
        self.outputCatalog.extend(inCat, mapper=self.mapper)

    # Add the catalog lengths to the inputs to the run method
    inputs["lengths"] = lengths
    output = self.run(**inputs)
    butlerQC.put(output, outputRefs.OutputCatalog)


def run(
    self,
    exposures: List[afwImage.Exposure],
    lengths: List[int],
    areaMasks: List[afwImage.Mask],
    backgrounds: list[afwMath.BackgroundList] | None = None,
) -> pipeBase.Struct:
    ...

Adding additional logic to a PipelineTask in this way is very powerful, but should be using sparingly and with great thought. Putting logic in runQuantum not only makes it more difficult to follow the flow of the algorithm, but creates the need for duplication of that logic in contexts where the run method is called outside of PipelineTask execution.

See Appendix I for the complete example.

Appendix A

Appendix B

Appendix C

Appendix D

Appendix E

Appendix F

Appendix G

Appendix H

Appendix I