SingleQuantumExecutor

class lsst.ctrl.mpexec.SingleQuantumExecutor(taskFactory: lsst.pipe.base.taskFactory.TaskFactory, skipExistingIn: Optional[list, None] = None, clobberOutputs: bool = False, enableLsstDebug: bool = False, exitOnKnownError: bool = False, mock: bool = False, mock_configs: Optional[list, None] = None)

Bases: lsst.ctrl.mpexec.QuantumExecutor

Executor class which runs one Quantum at a time.

Parameters:
butler : Butler

Data butler.

taskFactory : TaskFactory

Instance of a task factory.

skipExistingIn : list [ str ], optional

Accepts list of collections, if all Quantum outputs already exist in the specified list of collections then that Quantum will not be rerun.

clobberOutputs : bool, optional

If True, then existing outputs in output run collection will be overwritten. If skipExistingIn is defined, only outputs from failed quanta will be overwritten.

enableLsstDebug : bool, optional

Enable debugging with lsstDebug facility for a task.

exitOnKnownError : bool, optional

If True, call sys.exit with the appropriate exit code for special known exceptions, after printing a traceback, instead of letting the exception propagate up to calling. This is always the behavior for InvalidQuantumError.

mock : bool, optional

If True then mock task execution.

mock_configs : list [ _PipelineAction ], optional

Optional config overrides for mock tasks.

Attributes Summary

stream_json_logs If True each log record is written to a temporary file and ingested when quantum completes.

Methods Summary

captureLogging(taskDef, quantum, butler) Configure logging system to capture logs for execution of this task.
checkExistingOutputs(quantum, butler, taskDef) Decide whether this quantum needs to be executed.
execute(taskDef, quantum, butler) Execute single quantum.
getReport() Return execution report from last call to execute.
initGlobals(quantum, butler) Initialize global state needed for task execution.
makeTask(taskClass, name, config, butler) Make new task instance.
runQuantum(task, quantum, taskDef, butler) Execute task on a single quantum.
updatedQuantumInputs(quantum, butler, taskDef) Update quantum with extra information, returns a new updated Quantum.
writeLogRecords(quantum, taskDef, butler, store)
writeMetadata(quantum, metadata, taskDef, butler)

Attributes Documentation

stream_json_logs = True

If True each log record is written to a temporary file and ingested when quantum completes. If False the records are accumulated in memory and stored in butler on quantum completion.

Methods Documentation

captureLogging(taskDef: lsst.pipe.base.pipeline.TaskDef, quantum: lsst.daf.butler.core.quantum.Quantum, butler: lsst.daf.butler._butler.Butler) → Iterator

Configure logging system to capture logs for execution of this task.

Parameters:
taskDef : lsst.pipe.base.TaskDef

The task definition.

quantum : Quantum

Single Quantum instance.

butler : Butler

Butler to write logs to.

Notes

Expected to be used as a context manager to ensure that logging records are inserted into the butler once the quantum has been executed:

with self.captureLogging(taskDef, quantum, butler):
    # Run quantum and capture logs.

Ths method can also setup logging to attach task- or quantum-specific information to log messages. Potentially this can take into account some info from task configuration as well.

checkExistingOutputs(quantum: lsst.daf.butler.core.quantum.Quantum, butler: lsst.daf.butler._butler.Butler, taskDef: lsst.pipe.base.pipeline.TaskDef) → bool

Decide whether this quantum needs to be executed.

If only partial outputs exist then they are removed if clobberOutputs is True, otherwise an exception is raised.

Parameters:
quantum : Quantum

Quantum to check for existing outputs

butler : Butler

Data butler.

taskDef : TaskDef

Task definition structure.

Returns:
exist : bool

True if self.skipExistingIn is defined, and a previous execution of this quanta appears to have completed successfully (either because metadata was written or all datasets were written). False otherwise.

Raises:
RuntimeError

Raised if some outputs exist and some not.

execute(taskDef: lsst.pipe.base.pipeline.TaskDef, quantum: lsst.daf.butler.core.quantum.Quantum, butler: lsst.daf.butler._butler.Butler) → lsst.daf.butler.core.quantum.Quantum

Execute single quantum.

Parameters:
taskDef : TaskDef

Task definition structure.

quantum : Quantum

Quantum for this execution.

butler : Butler

Data butler instance

Returns:
quantum : Quantum

The quantum actually executed. At present this quantum will contain only unresolved DatasetRef instances for output datasets, reflecting the state of the quantum just before it was run (but after any adjustments for predicted but now missing inputs). This may change in the future to include resolved output DatasetRef objects.

Notes

Any exception raised by the task or code that wraps task execution is propagated to the caller of this method.

getReport() → Optional[lsst.ctrl.mpexec.reports.QuantumReport, None]

Return execution report from last call to execute.

Returns:
report : QuantumReport

Structure describing the status of the execution of a quantum. None is returned if implementation does not support this feature.

Raises:
RuntimeError

Raised if this method is called before execute.

initGlobals(quantum: lsst.daf.butler.core.quantum.Quantum, butler: lsst.daf.butler._butler.Butler) → None

Initialize global state needed for task execution.

Parameters:
quantum : Quantum

Single Quantum instance.

butler : Butler

Data butler.

Notes

There is an issue with initializing filters singleton which is done by instrument, to avoid requiring tasks to do it in runQuantum() we do it here when any dataId has an instrument dimension. Also for now we only allow single instrument, verify that all instrument names in all dataIds are identical.

This will need revision when filter singleton disappears.

makeTask(taskClass: type, name: str, config: lsst.pipe.base.config.PipelineTaskConfig, butler: lsst.daf.butler._butler.Butler) → lsst.pipe.base.pipelineTask.PipelineTask

Make new task instance.

Parameters:
taskClass : type

Sub-class of PipelineTask.

name : str

Name for this task.

config : PipelineTaskConfig

Configuration object for this task

Returns:
task : PipelineTask

Instance of taskClass type.

butler : Butler

Data butler.

runQuantum(task: lsst.pipe.base.pipelineTask.PipelineTask, quantum: lsst.daf.butler.core.quantum.Quantum, taskDef: lsst.pipe.base.pipeline.TaskDef, butler: lsst.daf.butler._butler.Butler) → None

Execute task on a single quantum.

Parameters:
task : PipelineTask

Task object.

quantum : Quantum

Single Quantum instance.

taskDef : TaskDef

Task definition structure.

butler : Butler

Data butler.

updatedQuantumInputs(quantum: lsst.daf.butler.core.quantum.Quantum, butler: lsst.daf.butler._butler.Butler, taskDef: lsst.pipe.base.pipeline.TaskDef) → lsst.daf.butler.core.quantum.Quantum

Update quantum with extra information, returns a new updated Quantum.

Some methods may require input DatasetRefs to have non-None dataset_id, but in case of intermediate dataset it may not be filled during QuantumGraph construction. This method will retrieve missing info from registry.

Parameters:
quantum : Quantum

Single Quantum instance.

butler : Butler

Data butler.

taskDef : TaskDef

Task definition structure.

Returns:
update : Quantum

Updated Quantum instance

writeLogRecords(quantum: lsst.daf.butler.core.quantum.Quantum, taskDef: lsst.pipe.base.pipeline.TaskDef, butler: lsst.daf.butler._butler.Butler, store: bool) → None
writeMetadata(quantum: lsst.daf.butler.core.quantum.Quantum, metadata: Any, taskDef: lsst.pipe.base.pipeline.TaskDef, butler: lsst.daf.butler._butler.Butler) → None