.. py:currentmodule:: lsst.pipe.base.testUtils .. _testing-a-pipeline-task: ####################### Testing a pipeline task ####################### This document describes how to write unit tests for a pipeline task (i.e., a subtask of `lsst.pipe.base.PipelineTask`). It covers testing of functionality specific to `~lsst.pipe.base.PipelineTask` but not ordinary tasks, including: * `~lsst.pipe.base.PipelineTaskConnections` classes * logic for optional inputs and outputs * custom implementations of `~lsst.pipe.base.PipelineTask.runQuantum` This guide does not cover writing a `~lsst.pipe.base.PipelineTask` from scratch, nor testing of a task's core functionality (e.g., whether it processes images correctly). The examples in this guide make heavy use of the test Butler framework described in :ref:`using-butler-in-tests`. .. _testing-a-pipeline-task-overview: Overview ======== The `lsst.pipe.base.testUtils` module provides tools for testing pipeline tasks. The tools are provided as stand-alone functions rather than as a special class like `lsst.utils.tests.TestCase` to make them easier for developers to mix and match as needed for their specific tests. Many of the tools provide no testing functionality directly, instead providing the infrastructure to run `~lsst.pipe.base.PipelineTask`-related code inside test environments. Most tools require a real data repository to read task inputs from (and possibly write outputs to). See :ref:`using-butler-in-tests` for one way to create a repository in test cases. .. _testing-a-pipeline-task-runQuantum: Testing runQuantum ================== Many pipeline tasks override `lsst.pipe.base.PipelineTask.runQuantum` to handle unusual inputs or data types. `~lsst.pipe.base.PipelineTask.runQuantum` may contain complex logic such as data ID manipulation, extra arguments to `~lsst.pipe.base.PipelineTask.run`, or default values. This logic cannot be tested without calling `~lsst.pipe.base.PipelineTask.runQuantum`, but the input arguments are difficult to set up without knowledge of the ``daf_butler`` package. The `lsst.pipe.base.testUtils.runTestQuantum` function wraps a call to the `~lsst.pipe.base.PipelineTask.runQuantum` method so that the user need only provide the task object, a `~lsst.daf.butler.Butler`, and a `~lsst.daf.butler.Quantum` (which can be generated by calling `lsst.pipe.base.testUtils.makeQuantum`). The `runTestQuantum` call can then be tested for particular behavior (e.g., raising exceptions, writing particular datasets, etc.) Because the processing done by `~lsst.pipe.base.PipelineTask.run` is potentially very slow, by default `lsst.pipe.base.testUtils.runTestQuantum` replaces `~lsst.pipe.base.PipelineTask.runQuantum`'s internal call(s) to `~lsst.pipe.base.PipelineTask.run` with a `unittest.mock.Mock` object. It returns the mock, which can be tested for what would have been run using methods like `~unittest.mock.Mock.assert_called_with`. Such a test can be combined with separate unit tests of how `~lsst.pipe.base.PipelineTask.run` handles different inputs to get complete coverage of the task code. If you do need `~lsst.pipe.base.PipelineTask.runQuantum` to call `~lsst.pipe.base.PipelineTask.run` (for example, because the test needs real outputs written to the repository), setting the ``mockRun=False`` argument will restore the normal behavior. .. code-block:: py :emphasize-lines: 20-29 import lsst.daf.butler.tests as butlerTests from lsst.pipe.base import testUtils # A minimal Butler repo, see daf_butler documentation dimensions = { "instrument": ["notACam"], "visit": [101, 102], "detector": [42], } repo = butlerTests.makeTestRepo(tempDir, dimensions) butlerTests.addDatasetType( repo, "InputType", {"instrument", "visit", "detector"}, "ExposureF") butlerTests.addDatasetType( repo, "OutputType", {"instrument", "visit", "detector"}, "ExposureF") ... # Set up what we need dataId = {"instrument": "notACam", "visit": 101, "detector": 42} butler = butlerTests.makeTestCollection(repo) task = AwesomeTask() quantum = testUtils.makeQuantum( task, butler, dataId, {key: dataId for key in {"input", "output"}}) run = testUtils.runTestQuantum(task, butler, quantum) # Actual input dataset omitted for simplicity run.assert_called_once() .. _testing-a-pipeline-task-run-output: Testing run Output ================== A pipeline task must return a `~lsst.pipe.base.Struct` whose fields include any outputs reported by its `~lsst.pipe.base.PipelineTaskConnections` class. The `lsst.pipe.base.testUtils.assertValidOutput` function takes a task object and a `~lsst.pipe.base.Struct` and confirms that the latter conforms to the former's connections. Currently, it tests for missing fields and mixing up vector and scalar values; more tests may be added in the future. .. code-block:: py :emphasize-lines: 29-31 import lsst.daf.butler.tests as butlerTests from lsst.pipe.base import connectionTypes, PipelineTask, \ PipelineTaskConnections from lsst.pipe.base import testUtils class MyConnections( PipelineTaskConnections, dimensions=("instrument", "visit", "detector")): image = connectionTypes.Output( name="calexp", storageClass="ExposureF", dimensions=("instrument", "visit", "detector")) catalog = connectionTypes.Output( name="src", storageClass="SourceCatalog", dimensions=("instrument", "visit", "detector")) class MyTask(PipelineTask): def run(...): # do processing that produces calexp, srcCat ... # bug: wrong catalog name return Struct(image=calexp, srcCat=srcCat) task = MyTask() result = task.run(...) # raises because result.catalog does not exist testUtils.assertValidOutput(task, result) .. _testing-a-pipeline-task-optional-connections: Testing optional/alternative inputs/outputs =========================================== Some tasks change their inputs depending on what processing is to be done (for example, `~lsst.ip.diffim.IsrTask` loads dark frames if and only if it does dark subtraction). The logic that activates or deactivates inputs is normally found in the `~lsst.pipe.base.PipelineTaskConnections` class's constructor. Input-selecting logic can be tested by calling `lsst.pipe.base.testUtils.runTestQuantum` and checking which arguments were passed to `~lsst.pipe.base.PipelineTask.run`. Output-selecting logic can be tested with `lsst.pipe.base.testUtils.verifyOutputConnections`. .. code-block:: py :emphasize-lines: 42-43, 49-50 import lsst.daf.butler.tests as butlerTests from lsst.pipe.base import connectionTypes, PipelineTask, \ PipelineTaskConnections, PipelineTaskConfig from lsst.pipe.base import testUtils # A task that can take an Exposure xor a Catalog # Don't try this at home! class OrConnections(PipelineTaskConnections, dimensions=("instrument", "visit", "detector")): exp = connectionTypes.Input( name="calexp", storageClass="ExposureF", dimensions=("instrument", "visit", "detector")) cat = connectionTypes.Input( name="src", storageClass="SourceCatalog", dimensions=("instrument", "visit", "detector")) def __init__(self, *, config=None): super().__init__(config=config) if config.doCatalog: self.inputs.remove("exp") else: self.inputs.remove("cat") class OrConfig(PipelineTaskConfig, pipelineConnections=OrConnections): doCatalog = Field(dtype=bool, default=False) class OrTask(PipelineTask): ConfigClass = OrConfig def run(exp=None, cat=None): ... # doCatalog == False task = OrTask() run = testUtils.runTestQuantum(task, butler, quantum) run.assert_called_once_with(exp=testExposure) # doCatalog == True config = OrConfig() config.doCatalog = True task = OrTask(config=config) run = testUtils.runTestQuantum(task, butler, quantum) run.assert_called_once_with(cat=testCatalog)