#
# LSST Data Management System
# Copyright 2008-2016 AURA/LSST.
#
# This product includes software developed by the
# LSST Project (http://www.lsst.org/).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the LSST License Statement and
# the GNU General Public License along with this program. If not,
# see <http://www.lsstcorp.org/LegalNotices/>.
#
from __future__ import absolute_import, division
import contextlib
from builtins import object
import lsstDebug
from lsst.pex.config import ConfigurableField
from lsst.log import Log
import lsst.daf.base as dafBase
from .timer import logInfo
__all__ = ["Task", "TaskError"]
[docs]class TaskError(Exception):
"""Use to report errors for which a traceback is not useful.
Notes
-----
Examples of such errors:
- processCcd is asked to run detection, but not calibration, and no calexp is found.
- coadd finds no valid images in the specified patch.
"""
pass
[docs]class Task(object):
"""Base class for data processing tasks.
See :ref:`task-framework-overview` to learn what tasks are, and :ref:`creating-a-task` for more
information about writing tasks.
Parameters
----------
config : `Task.ConfigClass` instance, optional
Configuration for this task (an instance of Task.ConfigClass, which is a task-specific subclass of
`lsst.pex.config.Config`, or `None`. If `None`:
- If parentTask specified then defaults to parentTask.config.\<name>
- If parentTask is None then defaults to self.ConfigClass()
name : `str`, optional
Brief name of task, or `None`; if `None` then defaults to `Task._DefaultName`
parentTask : `Task`-type, optional
The parent task of this subtask, if any.
- If `None` (a top-level task) then you must specify config and name is ignored.
- If not `None` (a subtask) then you must specify name.
log : `lsst.log.Log`, optional
Log whose name is used as a log name prefix, or `None` for no prefix. Ignored if is parentTask
specified, in which case ``parentTask.log``\ 's name is used as a prefix. The task's log name is
``prefix + "." + name`` if a prefix exists, else ``name``. The task's log is then a child logger of
``parentTask.log`` (if ``parentTask`` specified), or a child logger of the log from the argument
(if ``log`` is not `None`).
Raises
------
RuntimeError
Raised under these circumstances:
- If ``parentTask`` is `None` and ``config`` is `None`.
- If ``parentTask`` is not `None` and ``name`` is `None`.
- If ``name`` is `None` and ``_DefaultName`` does not exist.
Notes
-----
Useful attributes include:
- ``log``: an lsst.log.Log
- ``config``: task-specific configuration; an instance of ``ConfigClass`` (see below).
- ``metadata``: an `lsst.daf.base.PropertyList` for collecting task-specific metadata,
e.g. data quality and performance metrics. This is data that is only meant to be
persisted, never to be used by the task.
Subclasses typically have a method named ``run`` to perform the main data processing. Details:
- ``run`` should process the minimum reasonable amount of data, typically a single CCD.
Iteration, if desired, is performed by a caller of the run method. This is good design and allows
multiprocessing without the run method having to support it directly.
- If ``run`` can persist or unpersist data:
- ``run`` should accept a butler data reference (or a collection of data references, if appropriate,
e.g. coaddition).
- There should be a way to run the task without persisting data. Typically the run method returns all
data, even if it is persisted, and the task's config method offers a flag to disable persistence.
**Deprecated:** Tasks other than cmdLineTask.CmdLineTask%s should *not* accept a blob such as a butler
data reference. How we will handle data references is still TBD, so don't make changes yet!
RHL 2014-06-27
Subclasses must also have an attribute ``ConfigClass`` that is a subclass of `lsst.pex.config.Config`
which configures the task. Subclasses should also have an attribute ``_DefaultName``:
the default name if there is no parent task. ``_DefaultName`` is required for subclasses of
`~lsst.pipe.base.CmdLineTask` and recommended for subclasses of Task because it simplifies construction
(e.g. for unit tests).
Tasks intended to be run from the command line should be subclasses of `~lsst.pipe.base.CmdLineTask`
not Task.
"""
def __init__(self, config=None, name=None, parentTask=None, log=None):
self.metadata = dafBase.PropertyList()
self._parentTask = parentTask
if parentTask is not None:
if name is None:
raise RuntimeError("name is required for a subtask")
self._name = name
self._fullName = parentTask._computeFullName(name)
if config is None:
config = getattr(parentTask.config, name)
self._taskDict = parentTask._taskDict
loggerName = parentTask.log.getName() + '.' + name
else:
if name is None:
name = getattr(self, "_DefaultName", None)
if name is None:
raise RuntimeError("name is required for a task unless it has attribute _DefaultName")
name = self._DefaultName
self._name = name
self._fullName = self._name
if config is None:
config = self.ConfigClass()
self._taskDict = dict()
loggerName = self._fullName
if log is not None and log.getName():
loggerName = log.getName() + '.' + loggerName
self.log = Log.getLogger(loggerName)
self.config = config
self._display = lsstDebug.Info(self.__module__).display
self._taskDict[self._fullName] = self
[docs] def getSchemaCatalogs(self):
"""Get the schemas generated by this task.
Returns
-------
schemaCatalogs : `dict`
Keys are butler dataset type, values are an empty catalog (an instance of the appropriate
`lsst.afw.table` Catalog type) for this task.
Notes
-----
.. warning::
Subclasses that use schemas must override this method. The default implemenation returns
an empty dict.
This method may be called at any time after the Task is constructed, which means that all task
schemas should be computed at construction time, *not* when data is actually processed. This
reflects the philosophy that the schema should not depend on the data.
Returning catalogs rather than just schemas allows us to save e.g. slots for SourceCatalog as well.
See also
--------
Task.getAllSchemaCatalogs
"""
return {}
[docs] def getAllSchemaCatalogs(self):
"""Get schema catalogs for all tasks in the hierarchy, combining the results into a single dict.
Returns
-------
schemacatalogs : `dict`
Keys are butler dataset type, values are a empty catalog (an instance of the appropriate
lsst.afw.table Catalog type) for all tasks in the hierarchy, from the top-level task down
through all subtasks.
Notes
-----
This method may be called on any task in the hierarchy; it will return the same answer, regardless.
The default implementation should always suffice. If your subtask uses schemas the override
`Task.getSchemaCatalogs`, not this method.
"""
schemaDict = self.getSchemaCatalogs()
for subtask in self._taskDict.values():
schemaDict.update(subtask.getSchemaCatalogs())
return schemaDict
[docs] def getFullName(self):
"""Get the task name as a hierarchical name including parent task names.
Returns
-------
fullName : `str`
The full name consists of the name of the parent task and each subtask separated by periods.
For example:
- The full name of top-level task "top" is simply "top".
- The full name of subtask "sub" of top-level task "top" is "top.sub".
- The full name of subtask "sub2" of subtask "sub" of top-level task "top" is "top.sub.sub2".
"""
return self._fullName
[docs] def getName(self):
"""Get the name of the task.
Returns
-------
taskName : `str`
Name of the task.
See also
--------
getFullName
"""
return self._name
[docs] def getTaskDict(self):
"""Get a dictionary of all tasks as a shallow copy.
Returns
-------
taskDict : `dict`
Dictionary containing full task name: task object for the top-level task and all subtasks,
sub-subtasks, etc..
"""
return self._taskDict.copy()
[docs] def makeSubtask(self, name, **keyArgs):
"""Create a subtask as a new instance as the ``name`` attribute of this task.
Parameters
----------
name : `str`
Brief name of the subtask.
keyArgs
Extra keyword arguments used to construct the task. The following arguments are automatically
provided and cannot be overridden:
- "config".
- "parentTask".
Notes
-----
The subtask must be defined by ``Task.config.name``, an instance of pex_config ConfigurableField
or RegistryField.
"""
taskField = getattr(self.config, name, None)
if taskField is None:
raise KeyError("%s's config does not have field %r" % (self.getFullName(), name))
subtask = taskField.apply(name=name, parentTask=self, **keyArgs)
setattr(self, name, subtask)
@contextlib.contextmanager
[docs] def timer(self, name, logLevel=Log.DEBUG):
"""Context manager to log performance data for an arbitrary block of code.
Parameters
----------
name : `str`
Name of code being timed; data will be logged using item name: ``Start`` and ``End``.
logLevel
A `lsst.log` level constant.
Examples
--------
Creating a timer context::
with self.timer("someCodeToTime"):
pass # code to time
See also
--------
timer.logInfo
"""
logInfo(obj=self, prefix=name + "Start", logLevel=logLevel)
try:
yield
finally:
logInfo(obj=self, prefix=name + "End", logLevel=logLevel)
@classmethod
[docs] def makeField(cls, doc):
"""Make a `lsst.pex.config.ConfigurableField` for this task.
Parameters
----------
doc : `str`
Help text for the field.
Returns
-------
configurableField : `lsst.pex.config.ConfigurableField`
A `~ConfigurableField` for this task.
Examples
--------
Provides a convenient way to specify this task is a subtask of another task.
Here is an example of use::
class OtherTaskConfig(lsst.pex.config.Config)
aSubtask = ATaskClass.makeField("a brief description of what this task does")
"""
return ConfigurableField(doc=doc, target=cls)
def _computeFullName(self, name):
"""Compute the full name of a subtask or metadata item, given its brief name.
Parameters
----------
name : `str`
Brief name of subtask or metadata item.
Returns
-------
fullName : `str`
The full name: the ``name`` argument prefixed by the full task name and a period.
Notes
-----
For example: if the full name of this task is "top.sub.sub2"
then ``_computeFullName("subname")`` returns ``"top.sub.sub2.subname"``.
"""
return "%s.%s" % (self._fullName, name)
def __reduce__(self):
"""Pickler.
"""
return self.__class__, (self.config, self._name, self._parentTask, None)