TaskRunner¶
-
class
lsst.pipe.base.
TaskRunner
(TaskClass, parsedCmd, doReturnResults=False)[source]¶ Bases:
object
Run a command-line task, using
multiprocessing
if requested.Parameters: TaskClass :
lsst.pipe.base.Task
subclassThe class of the task to run.
parsedCmd :
argparse.Namespace
The parsed command-line arguments, as returned by the task’s argument parser’s
parse_args
method.Warning
Do not store
parsedCmd
, as this instance is pickled (if multiprocessing) and parsedCmd may contain non-picklable elements. It certainly contains more data than we need to send to each instance of the task.doReturnResults :
bool
, optionalShould run return the collected result from each invocation of the task? This is only intended for unit tests and similar use. It can easily exhaust memory (if the task returns enough data and you call it enough times) and it will fail when using multiprocessing if the returned data cannot be pickled.
Note that even if
doReturnResults
is False a struct with a single member “exitStatus” is returned, with value 0 or 1 to be returned to the unix shell.Raises: ImportError
If multiprocessing is requested (and the task supports it) but the multiprocessing library cannot be imported.
Notes
Each command-line task (subclass of
lsst.pipe.base.CmdLineTask
) has a task runner. By default it is this class, but some tasks require a subclass. See the manual Creating a command-line task for more information. SeeCmdLineTask.parseAndRun
to see how a task runner is used.You may use this task runner for your command-line task if your task has a run method that takes exactly one argument: a butler data reference. Otherwise you must provide a task-specific subclass of this runner for your task’s
RunnerClass
that overridesTaskRunner.getTargetList
and possiblyTaskRunner.__call__
. SeeTaskRunner.getTargetList
for details.This design matches the common pattern for command-line tasks: the run method takes a single data reference, of some suitable name. Additional arguments are rare, and if present, require a subclass of
TaskRunner
that calls these additional arguments by name.Instances of this class must be picklable in order to be compatible with multiprocessing. If multiprocessing is requested (
parsedCmd.numProcesses > 1
) thenrun
callsprepareForMultiProcessing
to jettison optional non-picklable elements. If your task runner is not compatible with multiprocessing then indicate this in your task by setting class variablecanMultiprocess=False
.Due to a python bug, handling a
KeyboardInterrupt
properly requires specifying a timeout. This timeout (in sec) can be specified as thetimeout
element in the output fromArgumentParser
(theparsedCmd
), if available, otherwise we useTaskRunner.TIMEOUT
.By default, we disable “implicit” threading – ie, as provided by underlying numerical libraries such as MKL or BLAS. This is designed to avoid thread contention both when a single command line task spawns multiple processes and when multiple users are running on a shared system. Users can override this behaviour by setting the
LSST_ALLOW_IMPLICIT_THREADS
environment variable.Attributes Summary
TIMEOUT
Default timeout (seconds) for multiprocessing. Methods Summary
__call__
(args)Run the Task on a single target. getTargetList
(parsedCmd, **kwargs)Get a list of (dataRef, kwargs) for TaskRunner.__call__
.makeTask
([parsedCmd, args])Create a Task instance. precall
(parsedCmd)Hook for code that should run exactly once, before multiprocessing. prepareForMultiProcessing
()Prepare this instance for multiprocessing run
(parsedCmd)Run the task on all targets. Attributes Documentation
-
TIMEOUT
= 2592000¶ Default timeout (seconds) for multiprocessing.
Methods Documentation
-
__call__
(args)[source]¶ Run the Task on a single target.
Parameters: args
Arguments for Task.run()
Returns: struct :
lsst.pipe.base.Struct
Contains these fields if
doReturnResults
isTrue
:dataRef
: the provided data reference.metadata
: task metadata after execution of run.result
: result returned by task run, orNone
if the task fails.- ``exitStatus`: 0 if the task completed successfully, 1 otherwise.
If
doReturnResults
isFalse
the struct contains:- ``exitStatus`: 0 if the task completed successfully, 1 otherwise.
Notes
This default implementation assumes that the
args
is a tuple containing a data reference and a dict of keyword arguments.Warning
If you override this method and wish to return something when
doReturnResults
isFalse
, then it must be picklable to support multiprocessing and it should be small enough that pickling and unpickling do not add excessive overhead.
-
static
getTargetList
(parsedCmd, **kwargs)[source]¶ Get a list of (dataRef, kwargs) for
TaskRunner.__call__
.Parameters: parsedCmd :
argparse.Namespace
The parsed command object returned by
lsst.pipe.base.ArgumentParser.parse_args
.kwargs
Any additional keyword arguments. In the default
TaskRunner
this is an empty dict, but having it simplifies overridingTaskRunner
for tasks whose run method takes additional arguments (see case (1) below).Notes
The default implementation of
TaskRunner.getTargetList
andTaskRunner.__call__
works for any command-line task whose run method takes exactly one argument: a data reference. Otherwise you must provide a variant of TaskRunner that overridesTaskRunner.getTargetList
and possiblyTaskRunner.__call__
. There are two cases.Case 1
If your command-line task has a
run
method that takes one data reference followed by additional arguments, then you need only overrideTaskRunner.getTargetList
to return the additional arguments as an argument dict. To make this easier, your overridden version ofgetTargetList
may callTaskRunner.getTargetList
with the extra arguments as keyword arguments. For example, the following adds an argument dict containing a single key: “calExpList”, whose value is the list of data IDs for the calexp ID argument:def getTargetList(parsedCmd): return TaskRunner.getTargetList( parsedCmd, calExpList=parsedCmd.calexp.idList )
It is equivalent to this slightly longer version:
@staticmethod def getTargetList(parsedCmd): argDict = dict(calExpList=parsedCmd.calexp.idList) return [(dataId, argDict) for dataId in parsedCmd.id.idList]
Case 2
If your task does not meet condition (1) then you must override both TaskRunner.getTargetList and
TaskRunner.__call__
. You may do this however you see fit, so long asTaskRunner.getTargetList
returns a list, each of whose elements is sent toTaskRunner.__call__
, which runs your task.
-
makeTask
(parsedCmd=None, args=None)[source]¶ Create a Task instance.
Parameters: parsedCmd
Parsed command-line options (used for extra task args by some task runners).
args
Args tuple passed to
TaskRunner.__call__
(used for extra task arguments by some task runners).Notes
makeTask
can be called with either theparsedCmd
argument orargs
argument set to None, but it must construct identical Task instances in either case.Subclasses may ignore this method entirely if they reimplement both
TaskRunner.precall
andTaskRunner.__call__
.
-
precall
(parsedCmd)[source]¶ Hook for code that should run exactly once, before multiprocessing.
Notes
Must return True if
TaskRunner.__call__
should subsequently be called.Warning
Implementations must take care to ensure that no unpicklable attributes are added to the TaskRunner itself, for compatibility with multiprocessing.
The default implementation writes package versions, schemas and configs, or compares them to existing files on disk if present.
-
prepareForMultiProcessing
()[source]¶ Prepare this instance for multiprocessing
Optional non-picklable elements are removed.
This is only called if the task is run under multiprocessing.
-
run
(parsedCmd)[source]¶ Run the task on all targets.
Parameters: parsedCmd :
argparse.Namespace
Parsed command
argparse.Namespace
.Returns: resultList :
list
A list of results returned by
TaskRunner.__call__
, or an empty list ifTaskRunner.__call__
is not called (e.g. ifTaskRunner.precall
returnsFalse
). SeeTaskRunner.__call__
for details.Notes
The task is run under multiprocessing if
TaskRunner.numProcesses
is more than 1; otherwise processing is serial.
-