.. py:currentmodule:: lsst.pipe.base.testUtils

.. _testing-a-pipeline-task:

#######################
Testing a pipeline task
#######################

This document describes how to write unit tests for a pipeline task (i.e., a subclass of `lsst.pipe.base.PipelineTask`).
It covers testing of functionality that is specific to `~lsst.pipe.base.PipelineTask` and absent from ordinary tasks, including:

* `~lsst.pipe.base.PipelineTaskConnections` classes
* logic for optional inputs and outputs
* custom implementations of `~lsst.pipe.base.PipelineTask.runQuantum`

This guide does not cover writing a `~lsst.pipe.base.PipelineTask` from scratch, nor testing of a task's core functionality (e.g., whether it processes images correctly).

The examples in this guide make heavy use of the test Butler framework described in :ref:`using-butler-in-tests`.

.. _testing-a-pipeline-task-overview:

Overview
========

The `lsst.pipe.base.testUtils` module provides tools for testing pipeline tasks.
The tools are provided as stand-alone functions rather than as a special class like `lsst.utils.tests.TestCase` to make them easier for developers to mix and match as needed for their specific tests.
Many of the tools provide no testing functionality directly, instead providing the infrastructure to run `~lsst.pipe.base.PipelineTask`-related code inside test environments.

Most tools require a real data repository to read task inputs from (and possibly write outputs to).
See :ref:`using-butler-in-tests` for one way to create a repository in test cases.
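
The examples later in this guide refer to a scratch directory called ``tempDir`` without showing where it comes from.
One way to manage such a directory, sketched here with only the standard library, is to create it in ``setUp`` and register its removal as a cleanup.
The class name ``RepoTestCase`` is hypothetical, and the repository-creation step is left as a comment because it requires the LSST stack.

.. code-block:: py

   import os
   import tempfile
   import unittest


   class RepoTestCase(unittest.TestCase):
       """Hypothetical test case that owns a scratch directory."""

       def setUp(self):
           tempDirObj = tempfile.TemporaryDirectory()
           # Delete the directory even if the test fails.
           self.addCleanup(tempDirObj.cleanup)
           self.tempDir = tempDirObj.name
           # A test repository would be created in self.tempDir here,
           # as described in the daf_butler documentation.

       def testDirectoryExists(self):
           self.assertTrue(os.path.isdir(self.tempDir))

Using ``addCleanup`` rather than ``tearDown`` means the directory is removed even if a later step in ``setUp`` raises.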

.. _testing-a-pipeline-task-runQuantum:

Testing runQuantum
==================

Many pipeline tasks override `lsst.pipe.base.PipelineTask.runQuantum` to handle unusual inputs or data types.
`~lsst.pipe.base.PipelineTask.runQuantum` may contain complex logic such as data ID manipulation, extra arguments to `~lsst.pipe.base.PipelineTask.run`, or default values.
This logic cannot be tested without calling `~lsst.pipe.base.PipelineTask.runQuantum`, but the input arguments are difficult to set up without knowledge of the ``daf_butler`` package.

The `lsst.pipe.base.testUtils.runTestQuantum` function wraps a call to the `~lsst.pipe.base.PipelineTask.runQuantum` method so that the user need only provide the task object, a `~lsst.daf.butler.Butler`, and a `~lsst.daf.butler.Quantum` (which can be generated by calling `lsst.pipe.base.testUtils.makeQuantum`).
The `runTestQuantum` call can then be tested for particular behavior (e.g., raising exceptions, writing particular datasets, etc.).

Because the processing done by `~lsst.pipe.base.PipelineTask.run` is potentially very slow, by default `lsst.pipe.base.testUtils.runTestQuantum` replaces `~lsst.pipe.base.PipelineTask.runQuantum`'s internal call(s) to `~lsst.pipe.base.PipelineTask.run` with a `unittest.mock.Mock` object.
It returns the mock, which can be tested for what would have been run using methods like `~unittest.mock.Mock.assert_called_with`.
Such a test can be combined with separate unit tests of how `~lsst.pipe.base.PipelineTask.run` handles different inputs to get complete coverage of the task code.
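
The mocking pattern itself can be illustrated without a data repository.
The following is a minimal sketch that uses only `unittest.mock`; ``ToyTask`` and its methods are hypothetical stand-ins rather than ``pipe_base`` classes, and the explicit patching is only an analogy for what `runTestQuantum` does, not its implementation.

.. code-block:: py

   import unittest.mock


   class ToyTask:
       """Hypothetical stand-in for a pipeline task (not an LSST class)."""

       def run(self, exposure, threshold=5.0):
           raise RuntimeError("expensive processing; not wanted in this test")

       def runQuantum(self, inputs):
           # Logic under test: unpack the inputs and supply a default.
           return self.run(exposure=inputs["exposure"],
                           threshold=inputs.get("threshold", 10.0))


   task = ToyTask()
   # Replace run with a mock for the duration of the with block.
   with unittest.mock.patch.object(task, "run") as run:
       task.runQuantum({"exposure": "fake image"})

   # The mock recorded how runQuantum called run.
   run.assert_called_once_with(exposure="fake image", threshold=10.0)

Because the mock records its calls, the assertion checks the default-value logic in ``runQuantum`` without ever executing the real ``run``.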

If you do need `~lsst.pipe.base.PipelineTask.runQuantum` to call `~lsst.pipe.base.PipelineTask.run` (for example, because the test needs real outputs written to the repository), setting the ``mockRun=False`` argument will restore the normal behavior.

.. code-block:: py
   :emphasize-lines: 20-29

   import lsst.daf.butler.tests as butlerTests
   from lsst.pipe.base import testUtils

   # A minimal Butler repo, see daf_butler documentation
   dimensions = {
       "instrument": ["notACam"],
       "visit": [101, 102],
       "detector": [42],
   }
   repo = butlerTests.makeTestRepo(tempDir, dimensions)
   butlerTests.addDatasetType(
       repo, "InputType", {"instrument", "visit", "detector"},
       "ExposureF")
   butlerTests.addDatasetType(
       repo, "OutputType", {"instrument", "visit", "detector"},
       "ExposureF")

   ...

   # Set up what we need
   dataId = {"instrument": "notACam", "visit": 101, "detector": 42}
   butler = butlerTests.makeTestCollection(repo)
   task = AwesomeTask()
   quantum = testUtils.makeQuantum(
       task, butler, dataId,
       {key: dataId for key in {"input", "output"}})
   run = testUtils.runTestQuantum(task, butler, quantum)
   # Actual input dataset omitted for simplicity
   run.assert_called_once()

.. _testing-a-pipeline-task-run-output:

Testing run output
==================

A pipeline task must return a `~lsst.pipe.base.Struct` whose fields include any outputs reported by its `~lsst.pipe.base.PipelineTaskConnections` class.

The `lsst.pipe.base.testUtils.assertValidOutput` function takes a task object and a `~lsst.pipe.base.Struct` and confirms that the latter conforms to the former's connections.
Currently, it checks for missing fields and for vector values supplied where scalars are expected (or vice versa); more checks may be added in the future.

.. code-block:: py
   :emphasize-lines: 29-31

   import lsst.daf.butler.tests as butlerTests
   from lsst.pipe.base import connectionTypes, PipelineTask, \
       PipelineTaskConnections, Struct
   from lsst.pipe.base import testUtils


   class MyConnections(
           PipelineTaskConnections,
           dimensions=("instrument", "visit", "detector")):
       image = connectionTypes.Output(
           name="calexp",
           storageClass="ExposureF",
           dimensions=("instrument", "visit", "detector"))
       catalog = connectionTypes.Output(
           name="src",
           storageClass="SourceCatalog",
           dimensions=("instrument", "visit", "detector"))


   class MyTask(PipelineTask):
       def run(...):
           # do processing that produces calexp, srcCat
           ...
           # bug: wrong catalog name
           return Struct(image=calexp, srcCat=srcCat)


   task = MyTask()
   result = task.run(...)
   # raises because result.catalog does not exist
   testUtils.assertValidOutput(task, result)
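
To make the missing-field and vector/scalar checks concrete, here is a rough sketch of the kind of comparison involved, written against plain Python objects.
The helper ``checkOutput`` and the ``expected`` mapping are hypothetical; they do not reflect how `assertValidOutput` is actually implemented.

.. code-block:: py

   from types import SimpleNamespace


   def checkOutput(result, expected):
       """Hypothetical helper: compare a result object to expected outputs.

       ``expected`` maps each output name to True if the output should
       hold multiple values and False if it should hold a single value.
       """
       for name, multiple in expected.items():
           # Missing field: no attribute matches the expected output.
           if not hasattr(result, name):
               raise AssertionError(f"No such output: {name}")
           isVector = isinstance(getattr(result, name), (list, tuple))
           # Vector/scalar mix-up: the value's shape is not what was expected.
           if multiple and not isVector:
               raise AssertionError(f"Expected multiple values for {name}")
           if not multiple and isVector:
               raise AssertionError(f"Expected a single value for {name}")


   expected = {"image": False, "catalog": False}
   good = SimpleNamespace(image="calexp", catalog="src")
   bad = SimpleNamespace(image="calexp", srcCat="src")

   checkOutput(good, expected)  # passes silently
   try:
       checkOutput(bad, expected)
   except AssertionError as e:
       message = str(e)

As in the example above, the misnamed ``srcCat`` field is reported as a missing ``catalog`` output.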

.. _testing-a-pipeline-task-optional-connections:

Testing optional/alternative inputs/outputs
===========================================

Some tasks change their inputs depending on what processing is to be done (for example, `~lsst.ip.isr.IsrTask` loads dark frames if and only if it does dark subtraction).
The logic that activates or deactivates inputs is normally found in the `~lsst.pipe.base.PipelineTaskConnections` class's constructor.

Input-selecting logic can be tested by calling `lsst.pipe.base.testUtils.runTestQuantum` and checking which arguments were passed to `~lsst.pipe.base.PipelineTask.run`.
Output-selecting logic can be tested with `lsst.pipe.base.testUtils.assertValidOutput`, which compares the result of `~lsst.pipe.base.PipelineTask.run` against the task's connections.

.. code-block:: py
   :emphasize-lines: 43-44, 50-51

   import lsst.daf.butler.tests as butlerTests
   from lsst.pex.config import Field
   from lsst.pipe.base import connectionTypes, PipelineTask, \
       PipelineTaskConnections, PipelineTaskConfig
   from lsst.pipe.base import testUtils

   # A task that can take an Exposure xor a Catalog
   # Don't try this at home!

   class OrConnections(PipelineTaskConnections,
                       dimensions=("instrument", "visit", "detector")):
       exp = connectionTypes.Input(
           name="calexp",
           storageClass="ExposureF",
           dimensions=("instrument", "visit", "detector"))
       cat = connectionTypes.Input(
           name="src",
           storageClass="SourceCatalog",
           dimensions=("instrument", "visit", "detector"))

       def __init__(self, *, config=None):
           super().__init__(config=config)
           if config.doCatalog:
               self.inputs.remove("exp")
           else:
               self.inputs.remove("cat")


   class OrConfig(PipelineTaskConfig,
                  pipelineConnections=OrConnections):
       doCatalog = Field(doc="Use catalog input?", dtype=bool, default=False)


   class OrTask(PipelineTask):
       ConfigClass = OrConfig

       def run(self, exp=None, cat=None):
           ...


   # doCatalog == False
   task = OrTask()
   run = testUtils.runTestQuantum(task, butler, quantum)
   run.assert_called_once_with(exp=testExposure)

   # doCatalog == True
   config = OrConfig()
   config.doCatalog = True
   task = OrTask(config=config)
   run = testUtils.runTestQuantum(task, butler, quantum)
   run.assert_called_once_with(cat=testCatalog)