SingleQuantumExecutor

class lsst.ctrl.mpexec.SingleQuantumExecutor(butler: Butler | None, taskFactory: TaskFactory, skipExistingIn: Any = None, clobberOutputs: bool = False, enableLsstDebug: bool = False, limited_butler_factory: Callable[[Quantum], LimitedButler] | None = None, resources: ExecutionResources | None = None, skipExisting: bool = False, assumeNoExistingOutputs: bool = False, raise_on_partial_outputs: bool = True)

Bases: QuantumExecutor

Executor class which runs one Quantum at a time.

Parameters:
butler : Butler or None

Data butler. None means that a Quantum-backed butler should be used instead.

taskFactory : TaskFactory

Instance of a task factory.

skipExistingIn : Any

Expressions representing the collections to search for existing output datasets. See Ordered collection searches for allowed types. This class only checks whether the butler's output run is present in the list of collections. If it is, quanta whose complete outputs exist in the output run are skipped. None or an empty string/sequence disables skipping.

clobberOutputs : bool, optional

If True, outputs from a quantum that exist in the output run collection are removed before the quantum is executed. If skipExistingIn contains the output run, only partial outputs from a quantum are removed. Only used when butler is not None.

enableLsstDebug : bool, optional

Enable debugging with the lsstDebug facility for a task.

limited_butler_factory : Callable, optional

A method that creates a LimitedButler instance for a given Quantum. This parameter must be defined if butler is None. If butler is not None then this parameter is ignored.

resources : ExecutionResources, optional

The resources available to this quantum when executing.

skipExisting : bool, optional

If True, skip quanta whose metadata datasets are already stored. Unlike skipExistingIn, this works with limited butlers as well as full butlers. Always set to True if skipExistingIn matches butler.run.

assumeNoExistingOutputs : bool, optional

If True, assume preexisting outputs are impossible (e.g. because this is known by higher-level code to be a new RUN collection), and do not look for them. This causes the skipExisting and clobberOutputs options to be ignored, but unlike just setting both of those to False, it also avoids all dataset existence checks.

raise_on_partial_outputs : bool, optional

If True, raise exceptions chained by lsst.pipe.base.AnnotatedPartialOutputError immediately, instead of considering the partial result a success and continuing to run downstream tasks.
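
The interaction between skipExistingIn, skipExisting, clobberOutputs and assumeNoExistingOutputs described above can be summarized with a small stand-alone model. This is only a sketch of the documented precedence rules, not the class's actual code: the names EffectiveOptions and resolve_options are hypothetical, and skipExistingIn is simplified to plain collection names rather than general collection expressions.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class EffectiveOptions:
    """Hypothetical container for resolved flags (not part of lsst.ctrl.mpexec)."""

    skip_existing: bool
    clobber_outputs: bool
    check_existence: bool


def resolve_options(skip_existing_in, output_run, skip_existing=False,
                    clobber_outputs=False, assume_no_existing_outputs=False):
    """Model the documented precedence between the constructor flags."""
    # assumeNoExistingOutputs wins: skipping and clobbering are ignored and
    # no dataset existence checks are made at all.
    if assume_no_existing_outputs:
        return EffectiveOptions(False, False, False)

    # None or an empty string/sequence disables skipping via skipExistingIn.
    if skip_existing_in is None or skip_existing_in == "":
        collections = []
    elif isinstance(skip_existing_in, str):
        collections = [skip_existing_in]
    else:
        collections = list(skip_existing_in)

    # skipExisting is forced to True when the output run is in the list.
    if output_run is not None and output_run in collections:
        skip_existing = True

    return EffectiveOptions(skip_existing, clobber_outputs, True)
```

For example, resolve_options(["u/me/run"], "u/me/run") yields skip_existing=True even though the skip_existing argument was left at its default.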

Methods Summary

checkExistingOutputs(quantum, task_node, /, ...)

Decide whether this quantum needs to be executed.

execute(task_node, /, quantum)

Execute single quantum.

initGlobals(quantum)

Initialize global state needed for task execution.

runQuantum(task, quantum, task_node, /, ...)

Execute task on a single quantum.

updatedQuantumInputs(quantum, task_node, /, ...)

Update quantum with extra information, returns a new updated Quantum.

writeMetadata(quantum, metadata, task_node, ...)

Methods Documentation

checkExistingOutputs(quantum: Quantum, task_node: TaskNode, /, limited_butler: LimitedButler) -> bool

Decide whether this quantum needs to be executed.

If only partial outputs exist then they are removed if clobberOutputs is True, otherwise an exception is raised.

The LimitedButler is used for everything, and should be set to self.butler if no separate LimitedButler is available.

Parameters:
quantum : Quantum

Quantum to check for existing outputs.

task_node : TaskNode

Task definition structure.

limited_butler : LimitedButler

Butler to use for querying and clobbering.

Returns:
exist : bool

True if self.skipExisting is set and a previous execution of this quantum appears to have completed successfully (either because metadata was written or all datasets were written). False otherwise.

Raises:
RuntimeError

Raised if some outputs exist and some do not, and clobberOutputs is False.
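
The decision described for checkExistingOutputs can be sketched as a plain function over sets of dataset names. This is an illustrative model only: the function name, its arguments and the remove callback are hypothetical and do not reflect the butler API. The text above does not say what happens when complete outputs exist while skipping is disabled; the sketch treats that case like partial outputs, which is an assumption.

```python
def check_existing_outputs(expected, existing, metadata_exists,
                           skip_existing, clobber_outputs, remove):
    """Return True if the quantum can be skipped, False if it must run.

    expected and existing are iterables of hypothetical dataset names;
    remove is a callback that deletes the given datasets.
    """
    expected = set(expected)
    present = expected & set(existing)

    # A previous execution looks complete if its metadata was written or
    # every expected output is already there.
    complete = metadata_exists or (bool(expected) and present == expected)
    if skip_existing and complete:
        return True

    # Nothing to clean up: the quantum simply needs to run.
    if not present:
        return False

    # Leftover outputs are either clobbered or reported as an error.
    if clobber_outputs:
        remove(sorted(present))
        return False
    raise RuntimeError(f"Outputs already exist and clobbering is off: {sorted(present)}")
```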

execute(task_node: TaskNode, /, quantum: Quantum) -> tuple[lsst.daf.butler._quantum.Quantum, lsst.ctrl.mpexec.reports.QuantumReport | None]

Execute single quantum.

Parameters:
task_node : TaskNode

Task definition structure.

quantum : Quantum

Quantum for this execution.

Returns:
quantum : Quantum

The quantum actually executed.

report : QuantumReport

Structure describing the status of the execution of a quantum. None is returned if the implementation does not support this feature.

Notes

Any exception raised by the task or code that wraps task execution is propagated to the caller of this method.
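
Because exceptions propagate to the caller and the report may be None, calling code has to handle both. The following sketch shows one way a caller might do that. FakeExecutor is a hypothetical stand-in that only mimics the execute signature (task_node positional-only, a (quantum, report) tuple returned); it is not the real executor, and plain strings stand in for TaskNode and Quantum objects.

```python
class FakeExecutor:
    """Hypothetical stand-in mimicking the execute() call signature."""

    def execute(self, task_node, /, quantum):
        if quantum == "bad":
            # Task failures surface as ordinary exceptions in the caller.
            raise ValueError("task failed")
        # This stand-in does not produce reports, so it returns None.
        return quantum, None


def run_all(executor, items):
    """Execute (task_node, quantum) pairs one at a time, recording outcomes."""
    results = []
    for task_node, quantum in items:
        try:
            executed, report = executor.execute(task_node, quantum=quantum)
        except Exception as exc:
            results.append((quantum, f"failed: {exc}"))
            continue
        results.append((executed, "no report" if report is None else report))
    return results
```

Whether to continue after a failure, as this loop does, or to stop at the first exception is a choice left to the calling code.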

initGlobals(quantum: Quantum) -> None

Initialize global state needed for task execution.

Parameters:
quantum : Quantum

Single Quantum instance.

Notes

There is an issue with initializing the filters singleton, which is done by the instrument. To avoid requiring tasks to do it in runQuantum(), we do it here when any dataId has an instrument dimension. Also, for now we only allow a single instrument, so we verify that all instrument names in all dataIds are identical.

This will need revision when the filter singleton disappears.
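
The single-instrument check described in the notes can be illustrated with data IDs modeled as plain dictionaries. This is a sketch of the idea only: the function name single_instrument is hypothetical, and the use of ValueError is an assumption, since the exception type is not stated above.

```python
def single_instrument(data_ids):
    """Return the one instrument name used by the data IDs, or None.

    data_ids is an iterable of mappings from dimension name to value.
    Data IDs without an instrument dimension are ignored.
    """
    instruments = {d["instrument"] for d in data_ids if "instrument" in d}
    if not instruments:
        return None
    if len(instruments) > 1:
        # Assumed error type; only one instrument is allowed for now.
        raise ValueError(f"Multiple instruments in data IDs: {sorted(instruments)}")
    return instruments.pop()
```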

runQuantum(task: PipelineTask, quantum: Quantum, task_node: TaskNode, /, limited_butler: LimitedButler) -> None

Execute task on a single quantum.

Parameters:
task : PipelineTask

Task object.

quantum : Quantum

Single Quantum instance.

task_node : TaskNode

Task definition structure.

limited_butler : LimitedButler

Butler to use for dataset I/O.

updatedQuantumInputs(quantum: Quantum, task_node: TaskNode, /, limited_butler: LimitedButler) -> Quantum

Update quantum with extra information, returns a new updated Quantum.

Some methods may require input DatasetRefs to have a non-None dataset_id, but for intermediate datasets it may not be filled in during QuantumGraph construction. This method retrieves the missing information from the registry.

Parameters:
quantum : Quantum

Single Quantum instance.

task_node : TaskNode

Task definition structure.

limited_butler : LimitedButler

Butler to use for querying.

Returns:
update : Quantum

Updated Quantum instance.
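
The idea of returning a new, updated object rather than modifying the original can be sketched with simple stand-in types. Everything here is hypothetical: Ref is a simplified placeholder for a dataset reference, lookup stands in for a registry query, and leaving unresolved references untouched is an assumption, since the handling of inputs that cannot be found is not described above.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class Ref:
    """Hypothetical, simplified stand-in for a dataset reference."""

    dataset_type: str
    data_id: str
    dataset_id: Optional[str] = None


def resolve_inputs(inputs, lookup):
    """Return a new tuple of refs with missing dataset_id values filled in.

    lookup(dataset_type, data_id) returns an ID, or None if nothing is found.
    The original refs are never modified.
    """
    resolved = []
    for ref in inputs:
        if ref.dataset_id is None:
            found = lookup(ref.dataset_type, ref.data_id)
            if found is not None:
                # Build a new ref; the frozen original stays as it was.
                ref = replace(ref, dataset_id=found)
        resolved.append(ref)
    return tuple(resolved)
```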

writeMetadata(quantum: Quantum, metadata: Any, task_node: TaskNode, /, limited_butler: LimitedButler) -> None