SimplePipelineExecutor

class lsst.ctrl.mpexec.SimplePipelineExecutor(quantum_graph: lsst.pipe.base.graph.graph.QuantumGraph, butler: lsst.daf.butler._butler.Butler)

Bases: object

A simple, high-level executor for pipelines.

Parameters:
quantum_graph : QuantumGraph

Graph to be executed.

butler : Butler

Object that manages all I/O. Must be initialized with collections and run properties that correspond to the input and output collections, which must be consistent with those used to create quantum_graph.

Notes

Most callers should use one of the classmethod factory functions (from_pipeline_filename, from_task_class, from_pipeline) instead of invoking the constructor directly; these guarantee that the Butler and QuantumGraph are created consistently.

This class is intended primarily to support unit testing and small-scale integration testing of PipelineTask classes. It deliberately lacks many features present in the command-line-only pipetask tool in order to keep the implementation simple. Python callers that need more sophistication should call lower-level tools like GraphBuilder, PreExecInit, and SingleQuantumExecutor directly.
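The factory-based workflow recommended above can be sketched as follows. This is a minimal illustration, not the reference usage: the helper name `run_single_task` and its arguments are hypothetical, and the import sits inside the function so the module still loads where the LSST stack is not installed. Only calls documented on this page are used.

```python
def run_single_task(task_class, root, inputs, output):
    """Run one PipelineTask over an existing repository and return the executed quanta.

    Hypothetical helper: ``task_class`` is a concrete PipelineTask subclass,
    ``root`` an existing data repository, ``inputs`` the input collections in
    search order, and ``output`` the name of a new CHAINED collection.
    """
    # Imported lazily so this module can be loaded without the LSST stack.
    from lsst.ctrl.mpexec import SimplePipelineExecutor

    # Create a writeable Butler whose collections are consistent with the graph.
    butler = SimplePipelineExecutor.prep_butler(root, inputs=inputs, output=output)
    # Build the QuantumGraph for a single-task pipeline against that Butler.
    executor = SimplePipelineExecutor.from_task_class(task_class, butler=butler)
    # Register output dataset types first, then execute every quantum.
    return executor.run(register_dataset_types=True)
```

Because the same Butler is passed to the factory that builds the QuantumGraph, the consistency requirement on the constructor arguments is met without extra work by the caller.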

Methods Summary

as_generator(register_dataset_types, …) Yield quanta in the QuantumGraph in topological order.
from_pipeline(pipeline, …) Create an executor by building a QuantumGraph from an in-memory pipeline.
from_pipeline_filename(pipeline_filename, *, …) Create an executor by building a QuantumGraph from an on-disk pipeline YAML file.
from_task_class(task_class, config, …) Create an executor by building a QuantumGraph from a pipeline containing a single task.
prep_butler(root, inputs, output, …) Helper method for creating Butler instances with collections appropriate for processing.
run(register_dataset_types, save_versions) Run all the quanta in the QuantumGraph in topological order.

Methods Documentation

as_generator(register_dataset_types: bool = False, save_versions: bool = True) → collections.abc.Iterator[lsst.daf.butler.core.quantum.Quantum]

Yield quanta in the QuantumGraph in topological order.

These quanta will be run as the returned generator is iterated over. Use this method to run the quanta one at a time. Use run to run all quanta in the graph.

Parameters:
register_dataset_types : bool, optional

If True, register all output dataset types before executing any quanta.

save_versions : bool, optional

If True (default), save a package versions dataset.

Returns:
quanta : Iterator [ Quantum ]

Executed quanta. At present, these will contain only unresolved DatasetRef instances for output datasets, reflecting the state of the quantum just before it was run (but after any adjustments for predicted but now missing inputs). This may change in the future to include resolved output DatasetRef objects.
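One way to use the one-at-a-time behaviour is to execute only the first few quanta, for example in a quick smoke test. The sketch below assumes `executor` is an already constructed SimplePipelineExecutor; the helper name `run_first_n` is hypothetical.

```python
from itertools import islice


def run_first_n(executor, n):
    """Execute at most ``n`` quanta and return them; the rest are never run."""
    # Each quantum is executed only when the iterator is advanced, so
    # stopping early leaves the remaining quanta unexecuted.
    quanta = executor.as_generator(register_dataset_types=True)
    return list(islice(quanta, n))
```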

Notes

Global initialization steps (see PreExecInit) are performed immediately when this method is called, but individual quanta are not actually executed until the returned iterator is iterated over.

A topological ordering is not in general unique, and no guarantees beyond topological order are made about the order in which quanta are processed.
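The split between eager initialization and lazy execution can be illustrated with a standard-library analogy. This is not the actual implementation: `as_generator_sketch`, `init`, and `execute` are hypothetical stand-ins for the method, the global initialization steps, and the per-quantum execution.

```python
from typing import Callable, Iterable, Iterator, List


def as_generator_sketch(
    quanta: Iterable[str],
    init: Callable[[], None],
    execute: Callable[[str], None],
) -> Iterator[str]:
    """Run ``init`` now; run ``execute`` per quantum only as the result is iterated."""
    init()  # eager: happens at call time, before any iteration

    def _iterate() -> Iterator[str]:
        for quantum in quanta:
            execute(quantum)  # lazy: happens when the caller requests the next item
            yield quantum

    return _iterate()


log: List[str] = []
iterator = as_generator_sketch(
    ["a", "b"],
    init=lambda: log.append("init"),
    execute=lambda q: log.append(f"run {q}"),
)
log_after_call = list(log)   # only the initialization has happened so far
first = next(iterator)
log_after_first = list(log)  # exactly one quantum has been executed
```

A plain generator function would defer the initialization until the first `next` call as well, which is why the sketch wraps an inner generator in an ordinary function.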

classmethod from_pipeline(pipeline: Union[lsst.pipe.base.pipeline.Pipeline, collections.abc.Iterable[lsst.pipe.base.pipeline.TaskDef]], *, where: str = '', bind: Optional[collections.abc.Mapping[str, Any]] = None, butler: lsst.daf.butler._butler.Butler, **kwargs) → lsst.ctrl.mpexec.simple_pipeline_executor.SimplePipelineExecutor

Create an executor by building a QuantumGraph from an in-memory pipeline.

Parameters:
pipeline : Pipeline or Iterable [ TaskDef ]

A Python object describing the tasks to run, along with their labels and configuration.

where : str, optional

Data ID query expression that constrains the quanta generated.

bind : Mapping, optional

Mapping containing literal values that should be injected into the where expression, keyed by the identifiers they replace.

butler : Butler

Butler that manages all I/O. prep_butler can be used to create one.

Returns:
executor : SimplePipelineExecutor

An executor instance containing the constructed QuantumGraph and Butler, ready for run to be called.

classmethod from_pipeline_filename(pipeline_filename: str, *, where: str = '', bind: Optional[collections.abc.Mapping[str, Any]] = None, butler: lsst.daf.butler._butler.Butler) → lsst.ctrl.mpexec.simple_pipeline_executor.SimplePipelineExecutor

Create an executor by building a QuantumGraph from an on-disk pipeline YAML file.

Parameters:
pipeline_filename : str

Name of the YAML file to load the pipeline definition from.

where : str, optional

Data ID query expression that constrains the quanta generated.

bind : Mapping, optional

Mapping containing literal values that should be injected into the where expression, keyed by the identifiers they replace.

butler : Butler

Butler that manages all I/O. prep_butler can be used to create one.

Returns:
executor : SimplePipelineExecutor

An executor instance containing the constructed QuantumGraph and Butler, ready for run to be called.
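The where and bind parameters work together: identifiers in the expression are replaced by the literal values in the mapping. The sketch below rests on assumptions: that the repository defines `instrument` and `visit` dimensions, and that the installed Butler accepts bare identifiers in the expression. The helper names and the identifiers `my_instrument` and `my_visit` are hypothetical.

```python
def visit_constraint(instrument, visit):
    """Return a (where, bind) pair selecting a single visit of one instrument."""
    # The identifiers on the right-hand sides are placeholders, not literals;
    # their values come from the bind mapping.
    where = "instrument = my_instrument AND visit = my_visit"
    bind = {"my_instrument": instrument, "my_visit": visit}
    return where, bind


def executor_for_visit(pipeline_filename, butler, instrument, visit):
    """Build an executor restricted to one visit (requires the LSST stack)."""
    # Imported lazily so this module can be loaded without the LSST stack.
    from lsst.ctrl.mpexec import SimplePipelineExecutor

    where, bind = visit_constraint(instrument, visit)
    return SimplePipelineExecutor.from_pipeline_filename(
        pipeline_filename, where=where, bind=bind, butler=butler
    )
```

Passing values through bind avoids formatting them into the expression string by hand.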

classmethod from_task_class(task_class: Type[lsst.pipe.base.pipelineTask.PipelineTask], config: Optional[lsst.pex.config.config.Config] = None, label: Optional[str] = None, *, where: str = '', bind: Optional[collections.abc.Mapping[str, Any]] = None, butler: lsst.daf.butler._butler.Butler) → lsst.ctrl.mpexec.simple_pipeline_executor.SimplePipelineExecutor

Create an executor by building a QuantumGraph from a pipeline containing a single task.

Parameters:
task_class : type

A concrete PipelineTask subclass.

config : Config, optional

Configuration for the task. If not provided, task-level defaults will be used (no per-instrument overrides).

label : str, optional

Label for the task in its pipeline; defaults to task_class._DefaultName.

where : str, optional

Data ID query expression that constrains the quanta generated.

bind : Mapping, optional

Mapping containing literal values that should be injected into the where expression, keyed by the identifiers they replace.

butler : Butler

Butler that manages all I/O. prep_butler can be used to create one.

Returns:
executor : SimplePipelineExecutor

An executor instance containing the constructed QuantumGraph and Butler, ready for run to be called.

classmethod prep_butler(root: str, inputs: collections.abc.Iterable[str], output: str, output_run: Optional[str] = None) → lsst.daf.butler._butler.Butler

Helper method for creating Butler instances with collections appropriate for processing.

Parameters:
root : str

Root of the butler data repository; must already exist, with all necessary input data.

inputs : Iterable [ str ]

Collections to search for all input datasets, in search order.

output : str

Name of a new output CHAINED collection to create that will combine both inputs and outputs.

output_run : str, optional

Name of the output RUN that will directly hold all output datasets. If not provided, a name will be created from output and a timestamp.

Returns:
butler : Butler

Butler client instance compatible with all classmethod factories. Always writeable.

run(register_dataset_types: bool = False, save_versions: bool = True) → List[lsst.daf.butler.core.quantum.Quantum]

Run all the quanta in the QuantumGraph in topological order.

Use this method to run all quanta in the graph. Use as_generator to get a generator to run the quanta one at a time.

Parameters:
register_dataset_types : bool, optional

If True, register all output dataset types before executing any quanta.

save_versions : bool, optional

If True (default), save a package versions dataset.

Returns:
quanta : List [ Quantum ]

Executed quanta. At present, these will contain only unresolved DatasetRef instances for output datasets, reflecting the state of the quantum just before it was run (but after any adjustments for predicted but now missing inputs). This may change in the future to include resolved output DatasetRef objects.

Notes

A topological ordering is not in general unique, and no guarantees beyond topological order are made about the order in which quanta are processed.
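The non-uniqueness of a topological ordering can be seen with a small standard-library example. The quantum labels and their dependencies are hypothetical, and `graphlib` (Python 3.9+) stands in for the QuantumGraph; this says nothing about how the executor itself chooses an order.

```python
from graphlib import TopologicalSorter

# Hypothetical quantum labels mapped to the quanta they depend on.
dependencies = {
    "isr": set(),
    "calibrate_a": {"isr"},
    "calibrate_b": {"isr"},
    "coadd": {"calibrate_a", "calibrate_b"},
}


def is_topological(order, deps):
    """Return True if every node in ``order`` appears after all of its dependencies."""
    seen = set()
    for node in order:
        if not deps[node] <= seen:
            return False
        seen.add(node)
    return len(seen) == len(deps)


# One valid order, as chosen by the standard library.
one_order = list(TopologicalSorter(dependencies).static_order())

# Two different orders that are both valid: the calibrate steps can swap.
alt_1 = ["isr", "calibrate_a", "calibrate_b", "coadd"]
alt_2 = ["isr", "calibrate_b", "calibrate_a", "coadd"]
```

Code that consumes the returned quanta should therefore rely only on dependencies preceding dependents, not on any particular position in the list.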