SingleQuantumExecutor
class lsst.ctrl.mpexec.SingleQuantumExecutor(taskFactory: lsst.pipe.base.taskFactory.TaskFactory, skipExistingIn: Optional[list, None] = None, clobberOutputs: bool = False, enableLsstDebug: bool = False, exitOnKnownError: bool = False, mock: bool = False, mock_configs: Optional[list, None] = None)
Bases: lsst.ctrl.mpexec.QuantumExecutor
Executor class which runs one Quantum at a time.
Parameters:
- butler : Butler
  Data butler.
- taskFactory : TaskFactory
  Instance of a task factory.
- skipExistingIn : list [str], optional
  List of collections. If all outputs of a Quantum already exist in the specified collections, that Quantum is not rerun.
- clobberOutputs : bool, optional
  If True, existing outputs in the output run collection are overwritten. If skipExistingIn is defined, only outputs from failed quanta are overwritten.
- enableLsstDebug : bool, optional
  Enable debugging with the lsstDebug facility for a task.
- exitOnKnownError : bool, optional
  If True, call sys.exit with the appropriate exit code for special known exceptions, after printing a traceback, instead of letting the exception propagate up to the caller. This is always the behavior for InvalidQuantumError.
- mock : bool, optional
  If True, mock task execution.
- mock_configs : list [_PipelineAction], optional
  Optional config overrides for mock tasks.
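The exitOnKnownError policy described above can be pictured with a small stand-alone sketch. It does not use the real executor: `KnownError`, its `EXIT_CODE` value, and `run_with_exit_policy` are hypothetical names invented for illustration, and the actual exit codes used by lsst.ctrl.mpexec are not shown here.

```python
import sys
import traceback


class KnownError(Exception):
    """Hypothetical stand-in for a 'special known exception'."""

    EXIT_CODE = 20  # assumed value, not taken from lsst.ctrl.mpexec


def run_with_exit_policy(func, exit_on_known_error=False):
    """Call ``func``; on KnownError either exit the process or re-raise."""
    try:
        return func()
    except KnownError as exc:
        if exit_on_known_error:
            # Print the traceback first, then terminate with the
            # exception's exit code instead of propagating it.
            traceback.print_exc()
            sys.exit(exc.EXIT_CODE)
        raise
```

With `exit_on_known_error=False` the exception reaches the caller unchanged, which mirrors the default behavior described for the parameter.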
Attributes Summary
- stream_json_logs: If True each log record is written to a temporary file and ingested when quantum completes.
Methods Summary
- captureLogging(taskDef, quantum, butler): Configure logging system to capture logs for execution of this task.
- checkExistingOutputs(quantum, butler, taskDef): Decide whether this quantum needs to be executed.
- execute(taskDef, quantum, butler): Execute single quantum.
- getReport(): Return execution report from last call to execute.
- initGlobals(quantum, butler): Initialize global state needed for task execution.
- makeTask(taskClass, name, config, butler): Make new task instance.
- runQuantum(task, quantum, taskDef, butler): Execute task on a single quantum.
- updatedQuantumInputs(quantum, butler, taskDef): Update quantum with extra information, returns a new updated Quantum.
- writeLogRecords(quantum, taskDef, butler, store)
- writeMetadata(quantum, metadata, taskDef, butler)
Attributes Documentation
stream_json_logs = True
If True each log record is written to a temporary file and ingested when quantum completes. If False the records are accumulated in memory and stored in butler on quantum completion.
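The difference between the two modes can be sketched with the standard logging module alone. This is an illustration under stated assumptions, not the executor's implementation: `RecordCollector` and its `finish` method are hypothetical, and a returned list stands in for ingestion into the butler.

```python
import json
import logging
import tempfile


class RecordCollector(logging.Handler):
    """Hypothetical handler showing the two storage modes."""

    def __init__(self, stream_json_logs=True):
        super().__init__()
        self.records = []  # used only when not streaming
        self.tmp = (
            tempfile.NamedTemporaryFile("w+", suffix=".json")
            if stream_json_logs
            else None
        )

    def emit(self, record):
        entry = {"level": record.levelname, "message": record.getMessage()}
        if self.tmp is not None:
            # Streaming mode: one JSON document per line, flushed at once.
            self.tmp.write(json.dumps(entry) + "\n")
            self.tmp.flush()
        else:
            # In-memory mode: keep everything until the quantum completes.
            self.records.append(entry)

    def finish(self):
        """Return all collected entries (stand-in for ingestion)."""
        if self.tmp is None:
            return list(self.records)
        self.tmp.seek(0)
        entries = [json.loads(line) for line in self.tmp]
        self.tmp.close()
        return entries
```

Streaming keeps memory use flat for tasks that log heavily, at the cost of temporary-file I/O for every record.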
Methods Documentation
captureLogging(taskDef: lsst.pipe.base.pipeline.TaskDef, quantum: lsst.daf.butler.core.quantum.Quantum, butler: lsst.daf.butler._butler.Butler) → Iterator
Configure logging system to capture logs for execution of this task.
Parameters:
- taskDef : lsst.pipe.base.TaskDef
  The task definition.
- quantum : Quantum
  Single Quantum instance.
- butler : Butler
  Butler to write logs to.
Notes
Expected to be used as a context manager to ensure that logging records are inserted into the butler once the quantum has been executed:
    with self.captureLogging(taskDef, quantum, butler):
        # Run quantum and capture logs.
        ...
This method can also set up logging to attach task- or quantum-specific information to log messages. Potentially this can take into account some information from the task configuration as well.
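The context-manager pattern in the Notes for captureLogging can be sketched in plain Python. The names here (`capture_logging`, `_ListHandler`, the `store` dictionary standing in for the butler) are hypothetical and only illustrate why a context manager guarantees that records are saved after the body runs.

```python
import logging
from contextlib import contextmanager


class _ListHandler(logging.Handler):
    """Collect log messages in a list."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


@contextmanager
def capture_logging(store, label):
    """Attach a handler on entry; save what it collected on exit."""
    handler = _ListHandler()
    root = logging.getLogger()
    root.addHandler(handler)
    try:
        yield handler
    finally:
        # Runs even if the body raises, so records are not lost.
        root.removeHandler(handler)
        store[label] = handler.messages


store = {}  # hypothetical stand-in for the butler
with capture_logging(store, "task/quantum"):
    logging.getLogger("sketch").warning("running")
```

Because the save happens in the `finally` clause, records emitted before a failure are still stored when the body raises.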
checkExistingOutputs(quantum: lsst.daf.butler.core.quantum.Quantum, butler: lsst.daf.butler._butler.Butler, taskDef: lsst.pipe.base.pipeline.TaskDef) → bool
Decide whether this quantum needs to be executed.
If only partial outputs exist then they are removed if clobberOutputs is True, otherwise an exception is raised.
Raises:
- RuntimeError
  Raised if some outputs exist and some do not.
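The partial-output rule for checkExistingOutputs can be illustrated with plain sets. This is a minimal sketch, not the method's implementation: `check_existing_outputs` is a hypothetical function, dataset names replace real DatasetRef objects, and the return convention (True when every expected output already exists) is an assumption, since the return value is not described above.

```python
def check_existing_outputs(expected, existing, clobber_outputs=False):
    """Return True if all expected outputs exist, False if the quantum must run.

    ``expected`` and ``existing`` are sets of hypothetical output names.
    ``existing`` is modified in place when partial outputs are clobbered.
    """
    found = expected & existing
    if expected and found == expected:
        return True  # every output is already present
    if found:
        # Partial outputs: either remove them or refuse to continue.
        if not clobber_outputs:
            raise RuntimeError(f"Partial outputs exist: {sorted(found)}")
        existing -= found
    return False
```

Raising on partial outputs by default protects against silently mixing products from two different runs.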
execute(taskDef: lsst.pipe.base.pipeline.TaskDef, quantum: lsst.daf.butler.core.quantum.Quantum, butler: lsst.daf.butler._butler.Butler) → lsst.daf.butler.core.quantum.Quantum
Execute single quantum.
Returns:
- quantum : Quantum
  The quantum actually executed. At present this quantum will contain only unresolved DatasetRef instances for output datasets, reflecting the state of the quantum just before it was run (but after any adjustments for predicted but now missing inputs). This may change in the future to include resolved output DatasetRef objects.
Notes
Any exception raised by the task or code that wraps task execution is propagated to the caller of this method.
getReport() → Optional[lsst.ctrl.mpexec.reports.QuantumReport, None]
Return execution report from last call to execute.
Returns:
- report : QuantumReport
  Structure describing the status of the execution of a quantum. None is returned if implementation does not support this feature.
Raises:
- RuntimeError
  Raised if this method is called before execute.
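The call-order contract between execute and getReport can be sketched as follows. `ReportingExecutor` and its dictionary report are hypothetical stand-ins; the real QuantumReport structure is not reproduced here.

```python
class ReportingExecutor:
    """Hypothetical executor that remembers the last execution status."""

    def __init__(self):
        self._executed = False
        self._report = None

    def execute(self, work):
        """Run ``work`` and record whether it succeeded."""
        self._executed = True
        try:
            result = work()
        except Exception as exc:
            self._report = {"status": "FAILED", "error": type(exc).__name__}
            raise  # the exception still propagates to the caller
        self._report = {"status": "SUCCESS"}
        return result

    def get_report(self):
        """Return the report from the last call to execute()."""
        if not self._executed:
            raise RuntimeError("get_report() called before execute()")
        return self._report
```

Recording the report before re-raising means a caller that catches the exception can still ask what happened.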
initGlobals(quantum: lsst.daf.butler.core.quantum.Quantum, butler: lsst.daf.butler._butler.Butler) → None
Initialize global state needed for task execution.
Notes
There is an issue with initializing the filters singleton, which is done by the instrument. To avoid requiring tasks to do it in runQuantum(), we do it here when any dataId has an instrument dimension. Also, for now we only allow a single instrument, so we verify that all instrument names in all dataIds are identical.
This will need revision when the filter singleton disappears.
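The single-instrument check mentioned in the Notes for initGlobals can be sketched with plain dictionaries standing in for data IDs. `single_instrument` is a hypothetical helper, and raising ValueError on a mismatch is an assumption; how the real method reports the problem is not stated above.

```python
def single_instrument(data_ids):
    """Return the one instrument name found in ``data_ids``, or None.

    Each data ID is a plain dict here; only those that carry an
    "instrument" key take part in the check.
    """
    names = {d["instrument"] for d in data_ids if "instrument" in d}
    if len(names) > 1:
        # Assumed error type, chosen for illustration only.
        raise ValueError(f"Multiple instruments in data IDs: {sorted(names)}")
    return names.pop() if names else None
```

Data IDs without an instrument dimension are ignored, so a quantum that mixes instrument-specific and instrument-free datasets still passes.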
makeTask(taskClass: type, name: str, config: lsst.pipe.base.config.PipelineTaskConfig, butler: lsst.daf.butler._butler.Butler) → lsst.pipe.base.pipelineTask.PipelineTask
Make new task instance.
Parameters:
- taskClass : type
  Sub-class of PipelineTask.
- name : str
  Name for this task.
- config : PipelineTaskConfig
  Configuration object for this task.
- butler : Butler
  Data butler.
Returns:
- task : PipelineTask
  Instance of taskClass type.
runQuantum(task: lsst.pipe.base.pipelineTask.PipelineTask, quantum: lsst.daf.butler.core.quantum.Quantum, taskDef: lsst.pipe.base.pipeline.TaskDef, butler: lsst.daf.butler._butler.Butler) → None
Execute task on a single quantum.
Parameters:
- task : PipelineTask
  Task object.
- quantum : Quantum
  Single Quantum instance.
- taskDef : TaskDef
  Task definition structure.
- butler : Butler
  Data butler.
updatedQuantumInputs(quantum: lsst.daf.butler.core.quantum.Quantum, butler: lsst.daf.butler._butler.Butler, taskDef: lsst.pipe.base.pipeline.TaskDef) → lsst.daf.butler.core.quantum.Quantum
Update quantum with extra information, returns a new updated Quantum.
Some methods may require input DatasetRefs to have a non-None dataset_id, but for an intermediate dataset it may not be filled in during QuantumGraph construction. This method retrieves the missing information from the registry.
Returns:
- update : Quantum
  Updated Quantum instance.
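The idea of returning a new, updated object rather than modifying the input can be sketched with a frozen dataclass. `Ref`, `resolve_inputs`, and the dictionary used as a registry are hypothetical stand-ins for DatasetRef, this method, and the butler registry.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class Ref:
    """Hypothetical stand-in for a dataset reference."""

    name: str
    dataset_id: Optional[int] = None


def resolve_inputs(inputs, registry):
    """Return a new list in which every resolvable ref has a dataset_id.

    ``registry`` is a plain dict mapping name to id (an assumption).
    The refs in ``inputs`` are never modified.
    """
    updated = []
    for ref in inputs:
        if ref.dataset_id is None and ref.name in registry:
            # Build a copy with the missing id filled in.
            ref = replace(ref, dataset_id=registry[ref.name])
        updated.append(ref)
    return updated
```

Refs that already carry an id are passed through untouched, so only intermediate datasets left unresolved at graph construction need a registry lookup.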
writeLogRecords(quantum: lsst.daf.butler.core.quantum.Quantum, taskDef: lsst.pipe.base.pipeline.TaskDef, butler: lsst.daf.butler._butler.Butler, store: bool) → None
writeMetadata(quantum: lsst.daf.butler.core.quantum.Quantum, metadata: Any, taskDef: lsst.pipe.base.pipeline.TaskDef, butler: lsst.daf.butler._butler.Butler) → None