Testing a pipeline task

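This document describes how to write unit tests for a pipeline task (i.e., a subclass of lsst.pipe.base.PipelineTask). It covers testing of functionality specific to PipelineTask but not ordinary tasks, including overridden runQuantum methods, the structure of run output, and optional or alternative inputs and outputs.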

This guide does not cover writing a PipelineTask from scratch, nor testing of a task’s core functionality (e.g., whether it processes images correctly).

The examples in this guide make heavy use of the test Butler framework described in Using the Butler in unit tests.

Overview

The lsst.pipe.base.testUtils module provides tools for testing pipeline tasks. The tools are provided as stand-alone functions rather than as a special class like lsst.utils.tests.TestCase to make them easier for developers to mix and match as needed for their specific tests. Many of the tools provide no testing functionality directly, instead providing the infrastructure to run PipelineTask-related code inside test environments.

Most tools require a real data repository to read task inputs from (and possibly write outputs to). See Using the Butler in unit tests for one way to create a repository in test cases.

Testing runQuantum

Many pipeline tasks override lsst.pipe.base.PipelineTask.runQuantum to handle unusual inputs or data types. runQuantum may contain complex logic such as data ID manipulation, extra arguments to run, or default values. This logic cannot be tested without calling runQuantum, but the input arguments are difficult to set up without knowledge of the daf_butler package.

The lsst.pipe.base.testUtils.runTestQuantum function wraps a call to the runQuantum method so that the user need only provide the task object, a Butler, and a Quantum (which can be generated by calling lsst.pipe.base.testUtils.makeQuantum). The runTestQuantum call can then be tested for particular behavior (e.g., raising exceptions or writing particular datasets).

Because the processing done by run is potentially very slow, by default lsst.pipe.base.testUtils.runTestQuantum replaces runQuantum’s internal call(s) to run with a unittest.mock.Mock object. It returns the mock, which can be tested for what would have been run using methods like assert_called_with. Such a test can be combined with separate unit tests of how run handles different inputs to get complete coverage of the task code.

If you do need runQuantum to call run (for example, because the test needs real outputs written to the repository), setting the mockRun=False argument will restore the normal behavior.

import tempfile

import lsst.daf.butler.tests as butlerTests
from lsst.pipe.base import testUtils

# A minimal Butler repo, see daf_butler documentation
dimensions = {
    "instrument": ["notACam"],
    "visit": [101, 102],
    "detector": [42],
}
# Any writable scratch directory will do for the test repository
tempDir = tempfile.mkdtemp()
repo = butlerTests.makeTestRepo(tempDir, dimensions)
butlerTests.addDatasetType(
    repo, "InputType", {"instrument", "visit", "detector"},
    "ExposureF")
butlerTests.addDatasetType(
    repo, "OutputType", {"instrument", "visit", "detector"},
    "ExposureF")

...

# Set up what we need
dataId = {"instrument": "notACam", "visit": 101, "detector": 42}
butler = butlerTests.makeTestCollection(repo)
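# AwesomeTask stands for the PipelineTask under test, with
# connections named "input" and "output"
task = AwesomeTask()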
quantum = testUtils.makeQuantum(
    task, butler, dataId,
    {key: dataId for key in {"input", "output"}})
run = testUtils.runTestQuantum(task, butler, quantum)
# Actual input dataset omitted for simplicity
run.assert_called_once()
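
To see what the returned mock offers without setting up a repository, the sketch below imitates the same pattern using only the standard library. ToyTask, run_quantum, and the threshold argument are hypothetical stand-ins and are not part of lsst.pipe.base; the sketch only shows how replacing run with a unittest.mock object lets a test inspect the arguments that runQuantum-like code would have passed.

```python
from unittest import mock


class ToyTask:
    """Hypothetical stand-in for a pipeline task (not an LSST class)."""

    def run(self, image, threshold=5.0):
        # Pretend this is slow; the test should never reach it.
        raise RuntimeError("run should have been mocked")

    def run_quantum(self, inputs):
        # Logic under test: supply a default value and forward to run.
        threshold = inputs.get("threshold", 5.0)
        return self.run(image=inputs["image"], threshold=threshold)


task = ToyTask()
# Replace run on this instance only, for the duration of the block.
with mock.patch.object(task, "run") as run:
    task.run_quantum({"image": "fake image"})

# The mock recorded the call, so the forwarding logic can be checked.
run.assert_called_once_with(image="fake image", threshold=5.0)
```

The assertion fails with a descriptive message if run_quantum forwards the wrong arguments, which is the kind of check the mock returned by runTestQuantum makes possible.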

Testing run output

A pipeline task must return a Struct whose fields include any outputs reported by its PipelineTaskConnections class.

The lsst.pipe.base.testUtils.assertValidOutput function takes a task object and a Struct and confirms that the latter conforms to the former’s connections. Currently, it tests for missing fields and mixing up vector and scalar values; more tests may be added in the future.

import lsst.daf.butler.tests as butlerTests
from lsst.pipe.base import connectionTypes, PipelineTask, \
    PipelineTaskConnections, Struct
from lsst.pipe.base import testUtils


class MyConnections(
        PipelineTaskConnections,
        dimensions=("instrument", "visit", "detector")):
    image = connectionTypes.Output(
        name="calexp",
        storageClass="ExposureF",
        dimensions=("instrument", "visit", "detector"))
    catalog = connectionTypes.Output(
        name="src",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector"))


class MyTask(PipelineTask):
    def run(...):
        # do processing that produces calexp, srcCat
        ...
        # bug: wrong catalog name
        return Struct(image=calexp, srcCat=srcCat)


task = MyTask()
result = task.run(...)
# raises because result.catalog does not exist
testUtils.assertValidOutput(task, result)
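
As a rough illustration of the two kinds of checks described above (missing fields, and scalar versus vector values), the following sketch performs them on plain Python objects. The check_output function and the expected mapping are hypothetical and are not how assertValidOutput is implemented; they only show the shape of the test.

```python
from types import SimpleNamespace


def check_output(result, expected):
    """Check a result object against the expected output fields.

    ``expected`` maps each field name to True if the field should hold
    multiple values (a sequence) and False if it should hold one value.
    This is an illustrative stand-in, not the LSST implementation.
    """
    for name, multiple in expected.items():
        if not hasattr(result, name):
            raise AssertionError(f"No such output: {name}")
        value = getattr(result, name)
        is_sequence = isinstance(value, (list, tuple))
        if multiple and not is_sequence:
            raise AssertionError(f"Expected multiple values for {name}")
        if not multiple and is_sequence:
            raise AssertionError(f"Expected a single value for {name}")


# Two scalar outputs, mirroring the connections in the example above
expected = {"image": False, "catalog": False}

good = SimpleNamespace(image="calexp", catalog="src")
check_output(good, expected)  # passes silently

bad = SimpleNamespace(image="calexp", srcCat="src")  # misnamed field
try:
    check_output(bad, expected)
except AssertionError as error:
    message = str(error)
```

Here the misnamed srcCat field is reported as a missing catalog output, the same class of bug as in the MyTask example.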

Testing optional/alternative inputs/outputs

Some tasks change their inputs depending on what processing is to be done (for example, IsrTask loads dark frames if and only if it does dark subtraction). The logic that activates or deactivates inputs is normally found in the PipelineTaskConnections class’s constructor.

Input-selecting logic can be tested by calling lsst.pipe.base.testUtils.runTestQuantum and checking which arguments were passed to run. Output-selecting logic can be tested with lsst.pipe.base.testUtils.assertValidOutput.

import lsst.daf.butler.tests as butlerTests
from lsst.pex.config import Field
from lsst.pipe.base import connectionTypes, PipelineTask, \
    PipelineTaskConnections, PipelineTaskConfig
from lsst.pipe.base import testUtils

# A task that can take an Exposure xor a Catalog
# Don't try this at home!

class OrConnections(PipelineTaskConnections,
                    dimensions=("instrument", "visit", "detector")):
    exp = connectionTypes.Input(
        name="calexp",
        storageClass="ExposureF",
        dimensions=("instrument", "visit", "detector"))
    cat = connectionTypes.Input(
        name="src",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector"))

    def __init__(self, *, config=None):
        super().__init__(config=config)
        if config.doCatalog:
            self.inputs.remove("exp")
        else:
            self.inputs.remove("cat")


class OrConfig(PipelineTaskConfig,
               pipelineConnections=OrConnections):
    doCatalog = Field(
        dtype=bool, default=False,
        doc="Read the catalog input instead of the exposure?")


class OrTask(PipelineTask):
    ConfigClass = OrConfig

    def run(self, exp=None, cat=None):
        ...


# doCatalog == False
task = OrTask()
run = testUtils.runTestQuantum(task, butler, quantum)
run.assert_called_once_with(exp=testExposure)

# doCatalog == True
config = OrConfig()
config.doCatalog = True
task = OrTask(config=config)
run = testUtils.runTestQuantum(task, butler, quantum)
run.assert_called_once_with(cat=testCatalog)
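
The constructor logic itself is small enough to reason about in isolation. The sketch below reproduces the same remove-one-input pattern with plain Python objects; ToyConnections and the SimpleNamespace configs are hypothetical stand-ins, and whether a real PipelineTaskConnections object can be inspected the same way is an assumption rather than something this guide states.

```python
from types import SimpleNamespace


class ToyConnections:
    """Hypothetical stand-in for a connections class (not an LSST class)."""

    def __init__(self, *, config):
        # Start with every possible input, then drop the unused one.
        self.inputs = {"exp", "cat"}
        if config.doCatalog:
            self.inputs.remove("exp")
        else:
            self.inputs.remove("cat")


# One connections object per configuration under test
default = ToyConnections(config=SimpleNamespace(doCatalog=False))
catalog = ToyConnections(config=SimpleNamespace(doCatalog=True))

assert default.inputs == {"exp"}
assert catalog.inputs == {"cat"}
```

A check like this complements the runTestQuantum tests above: it pins down which inputs each configuration selects, while the mock-based test confirms that the selected input is what actually reaches run.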