Testing a pipeline task
This document describes how to write unit tests for a pipeline task (i.e., a subclass of lsst.pipe.base.PipelineTask).
It covers testing of functionality specific to PipelineTask but not ordinary tasks, including:
- PipelineTaskConnections classes
- logic for optional inputs and outputs
- custom implementations of runQuantum
This guide does not cover writing a PipelineTask from scratch, nor testing of a task’s core functionality (e.g., whether it processes images correctly).
The examples in this guide make heavy use of the test Butler framework described in Using the Butler in unit tests.
Overview
The lsst.pipe.base.testUtils module provides tools for testing pipeline tasks.
The tools are provided as stand-alone functions rather than as a special class like lsst.utils.tests.TestCase to make them easier for developers to mix and match as needed for their specific tests.
Many of the tools provide no testing functionality directly, instead providing the infrastructure to run PipelineTask-related code inside test environments.
Most tools require a real data repository to read task inputs from (and possibly write outputs to). See Using the Butler in unit tests for one way to create a repository in test cases.
Testing runQuantum
Many pipeline tasks override lsst.pipe.base.PipelineTask.runQuantum to handle unusual inputs or data types.
runQuantum may contain complex logic such as data ID manipulation, extra arguments to run, or default values.
This logic cannot be tested without calling runQuantum, but the input arguments are difficult to set up without knowledge of the daf_butler package.
The lsst.pipe.base.testUtils.runTestQuantum function wraps a call to the runQuantum method so that the user need only provide the task object, a Butler, and a Quantum (which can be generated by calling lsst.pipe.base.testUtils.makeQuantum).
The runTestQuantum call can then be tested for particular behavior, such as raising exceptions or writing particular datasets.
Because the processing done by run is potentially very slow, by default lsst.pipe.base.testUtils.runTestQuantum replaces runQuantum’s internal call(s) to run with a unittest.mock.Mock object.
It returns the mock, whose recorded calls show what would have been passed to run and can be checked with methods like assert_called_with.
Such a test can be combined with separate unit tests of how run handles different inputs to get complete coverage of the task code.
If you do need runQuantum to call run (for example, because the test needs real outputs written to the repository), setting the mockRun=False argument will restore the normal behavior.
import lsst.daf.butler.tests as butlerTests
from lsst.pipe.base import testUtils

# A minimal Butler repo, see daf_butler documentation
repo = butlerTests.makeTestRepo(tempDir)
butlerTests.addDataIdValue(repo, "instrument", "notACam")
butlerTests.addDataIdValue(repo, "visit", 101)
butlerTests.addDataIdValue(repo, "visit", 102)
butlerTests.addDataIdValue(repo, "detector", 42)
butlerTests.addDatasetType(
    repo, "InputType", {"instrument", "visit", "detector"},
    "ExposureF")
butlerTests.addDatasetType(
    repo, "OutputType", {"instrument", "visit", "detector"},
    "ExposureF")

...

# Set up what we need
dataId = {"instrument": "notACam", "visit": 101, "detector": 42}
butler = butlerTests.makeTestCollection(repo)
task = AwesomeTask()
quantum = testUtils.makeQuantum(
    task, butler, dataId,
    {key: dataId for key in {"input", "output"}})
run = testUtils.runTestQuantum(task, butler, quantum)

# Actual input dataset omitted for simplicity
run.assert_called_once()
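The mocking idea described above can be tried without the LSST stack. The following is a minimal sketch using only unittest.mock; StandInTask and its arguments are hypothetical stand-ins invented for illustration, and the code does not claim to reproduce how runTestQuantum is implemented.

```python
from unittest import mock


class StandInTask:
    """Hypothetical stand-in for a pipeline task; not an LSST class."""

    def run(self, image, threshold=5.0):
        # Pretend this is slow, real processing.
        raise RuntimeError("run() should not be reached in this test")

    def runQuantum(self, inputs):
        # Logic worth testing: a default value and an extra argument to run.
        threshold = inputs.get("threshold", 3.0)
        return self.run(image=inputs["image"], threshold=threshold)


task = StandInTask()
# Swap run for a mock so that only the runQuantum logic executes.
with mock.patch.object(task, "run") as run:
    task.runQuantum({"image": "fake-image"})

# The mock records the call that run would have received.
run.assert_called_once_with(image="fake-image", threshold=3.0)
```

Because run is never executed, the test checks only the argument handling in runQuantum, which is the division of labor the text recommends.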
Testing run output
A pipeline task must return a Struct whose fields include any outputs reported by its PipelineTaskConnections class.
The lsst.pipe.base.testUtils.assertValidOutput function takes a task object and a Struct and confirms that the latter conforms to the former’s connections.
Currently, it tests for missing fields and for scalar values supplied where vectors are expected (or vice versa); more tests may be added in the future.
from lsst.pipe.base import (connectionTypes, PipelineTask,
                            PipelineTaskConfig, PipelineTaskConnections,
                            Struct)
from lsst.pipe.base import testUtils


class MyConnections(
        PipelineTaskConnections,
        dimensions=("instrument", "visit", "detector")):
    image = connectionTypes.Output(
        name="calexp",
        storageClass="ExposureF",
        dimensions=("instrument", "visit", "detector"))
    catalog = connectionTypes.Output(
        name="src",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector"))


# Tie the connections to the task through its config class
class MyConfig(PipelineTaskConfig, pipelineConnections=MyConnections):
    pass


class MyTask(PipelineTask):
    ConfigClass = MyConfig
    _DefaultName = "myTask"

    def run(...):
        # do processing that produces calexp, srcCat
        ...
        # bug: wrong catalog name
        return Struct(image=calexp, srcCat=srcCat)


task = MyTask()
result = task.run(...)
# raises because result.catalog does not exist
testUtils.assertValidOutput(task, result)
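To make the two kinds of failure concrete, here is a stack-free sketch of a checker in the same spirit. The names check_outputs and EXPECTED_OUTPUTS, and the "templates" output, are hypothetical; the real assertValidOutput reads this information from the task's connections and may differ in detail.

```python
from types import SimpleNamespace

# Hypothetical description of a task's outputs: name -> whether the
# connection holds several datasets (a vector) or one (a scalar).
EXPECTED_OUTPUTS = {"image": False, "catalog": False, "templates": True}


def check_outputs(result, expected):
    """Raise AssertionError if result lacks a field or mixes up
    scalar and vector values. Illustrative only."""
    for name, multiple in expected.items():
        if not hasattr(result, name):
            raise AssertionError(f"No such output: {name}")
        is_vector = isinstance(getattr(result, name), (list, tuple))
        if multiple and not is_vector:
            raise AssertionError(f"Expected a sequence for {name}")
        if not multiple and is_vector:
            raise AssertionError(f"Expected a single value for {name}")


# A conforming result passes silently.
good = SimpleNamespace(image="img", catalog="cat", templates=["t1", "t2"])
check_outputs(good, EXPECTED_OUTPUTS)

# The misnamed field from the example above is reported as missing.
message = None
bad = SimpleNamespace(image="img", srcCat="cat", templates=["t1"])
try:
    check_outputs(bad, EXPECTED_OUTPUTS)
except AssertionError as err:
    message = str(err)
```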
Testing task initOutputs
If a pipeline task has initOutputs, task objects must have one attribute for each such output.
The lsst.pipe.base.testUtils.assertValidInitOutput function takes a task object and confirms that it has an attribute for each initOutput in its connections.
The tests are analogous to those for assertValidOutput.
import lsst.afw.table as afwTable
from lsst.pipe.base import (connectionTypes, PipelineTask,
                            PipelineTaskConfig, PipelineTaskConnections)
from lsst.pipe.base import testUtils


class MyConnections(
        PipelineTaskConnections,
        dimensions=("instrument", "visit", "detector")):
    schema = connectionTypes.InitOutput(
        name="srcSchema",
        storageClass="SourceCatalog")
    catalog = connectionTypes.Output(
        name="src",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector"))


# Tie the connections to the task through its config class
class MyConfig(PipelineTaskConfig, pipelineConnections=MyConnections):
    pass


class MyTask(PipelineTask):
    ConfigClass = MyConfig
    _DefaultName = "myTask"

    def __init__(self, config=None, log=None, initInputs=None):
        super().__init__(config=config, log=log, initInputs=initInputs)
        # bug: should be SourceCatalog
        self.schema = afwTable.Schema()


task = MyTask()
# raises because task.schema has the wrong type
testUtils.assertValidInitOutput(task)
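The attribute-and-type check can be pictured with plain Python objects. This is only a sketch under stated assumptions: check_init_outputs, FakeCatalog, and FakeSchema are hypothetical names, and the real function derives the expected types from the connections' storage classes rather than from a hand-written dictionary.

```python
class FakeCatalog:
    """Hypothetical stand-in for the type a storage class maps to."""


class FakeSchema:
    """Hypothetical stand-in for an unrelated type."""


def check_init_outputs(task, expected_types):
    """Raise AssertionError unless task has an attribute of the
    expected type for every init-output name. Illustrative only."""
    for name, expected in expected_types.items():
        if not hasattr(task, name):
            raise AssertionError(f"No such attribute: {name}")
        if not isinstance(getattr(task, name), expected):
            raise AssertionError(f"{name} is not a {expected.__name__}")


class GoodTask:
    def __init__(self):
        self.schema = FakeCatalog()


class BuggyTask:
    def __init__(self):
        self.schema = FakeSchema()  # same kind of bug as above: wrong type


# A task with the right attribute type passes silently.
check_init_outputs(GoodTask(), {"schema": FakeCatalog})

failure = None
try:
    check_init_outputs(BuggyTask(), {"schema": FakeCatalog})
except AssertionError as err:
    failure = str(err)
```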
Testing optional/alternative inputs/outputs
Some tasks change their inputs depending on what processing is to be done (for example, IsrTask loads dark frames if and only if it does dark subtraction).
The logic that activates or deactivates inputs is normally found in the PipelineTaskConnections class’s constructor.
Input-selecting logic can be tested by calling lsst.pipe.base.testUtils.runTestQuantum and checking which arguments were passed to run.
Output-selecting logic can be tested with lsst.pipe.base.testUtils.assertValidOutput.
Optional init-inputs can be tested by calling lsst.pipe.base.testUtils.getInitInputs and checking which values are returned.
There is currently no test framework for the use of init-inputs in task constructors.
import lsst.daf.butler.tests as butlerTests
from lsst.pex.config import Field
from lsst.pipe.base import (connectionTypes, PipelineTask,
                            PipelineTaskConnections, PipelineTaskConfig)
from lsst.pipe.base import testUtils


# A task that can take an Exposure xor a Catalog
# Don't try this at home!
class OrConnections(PipelineTaskConnections,
                    dimensions=("instrument", "visit", "detector")):
    exp = connectionTypes.Input(
        name="calexp",
        storageClass="ExposureF",
        dimensions=("instrument", "visit", "detector"))
    cat = connectionTypes.Input(
        name="src",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector"))

    def __init__(self, *, config=None):
        super().__init__(config=config)

        if config.doCatalog:
            self.inputs.remove("exp")
        else:
            self.inputs.remove("cat")


class OrConfig(PipelineTaskConfig,
               pipelineConnections=OrConnections):
    doCatalog = Field(doc="Read the catalog instead of the exposure?",
                      dtype=bool, default=False)


class OrTask(PipelineTask):
    ConfigClass = OrConfig
    _DefaultName = "orTask"

    def run(self, exp=None, cat=None):
        ...


# butler, quantum, testExposure, and testCatalog come from test setup (not shown)
# doCatalog == False
task = OrTask()
run = testUtils.runTestQuantum(task, butler, quantum)
run.assert_called_once_with(exp=testExposure)

# doCatalog == True
config = OrConfig()
config.doCatalog = True
task = OrTask(config=config)
run = testUtils.runTestQuantum(task, butler, quantum)
run.assert_called_once_with(cat=testCatalog)
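When a task has several configurations to cover, the two checks above can be folded into one test method with unittest's subTest. The sketch below runs without the LSST stack; StandInOrTask is a hypothetical class that only imitates the input-selection rule of OrTask, and in a real test the mock would come from runTestQuantum instead of mock.patch.object.

```python
import unittest
from unittest import mock


class StandInOrTask:
    """Hypothetical stand-in mirroring OrTask; not an LSST class."""

    def __init__(self, doCatalog=False):
        # Same selection rule as OrConnections: exactly one input survives.
        self.inputs = {"cat"} if doCatalog else {"exp"}

    def run(self, exp=None, cat=None):
        raise RuntimeError("real processing is not needed here")

    def runQuantum(self, datasets):
        # Forward only the datasets whose connection is still active.
        self.run(**{k: v for k, v in datasets.items() if k in self.inputs})


class OptionalInputsTestCase(unittest.TestCase):
    def testInputSelection(self):
        datasets = {"exp": "fake-exposure", "cat": "fake-catalog"}
        cases = [(False, {"exp": "fake-exposure"}),
                 (True, {"cat": "fake-catalog"})]
        for doCatalog, expected in cases:
            # Each configuration is reported separately if it fails.
            with self.subTest(doCatalog=doCatalog):
                task = StandInOrTask(doCatalog=doCatalog)
                with mock.patch.object(task, "run") as run:
                    task.runQuantum(datasets)
                run.assert_called_once_with(**expected)


suite = unittest.defaultTestLoader.loadTestsFromTestCase(OptionalInputsTestCase)
outcome = unittest.TextTestRunner(verbosity=0).run(suite)
```

Using assert_called_once_with rather than assert_called_with also catches the case where the deactivated input is passed in a second, unexpected call.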