Pipeline

class lsst.pipe.base.Pipeline(description: str)

Bases: object

A Pipeline is a representation of a series of tasks to run, and the configuration for those tasks.

Parameters:
description: str

A description of what this pipeline does.

Attributes Summary

subsets

Returns a MappingProxyType where the keys are the labels of labeled subsets in the Pipeline and the values are the set of task labels contained within that subset.

task_labels

Labels of all tasks in the pipeline.

Methods Summary

addConfigFile(label, filename)

Add overrides from a specified file.

addConfigOverride(label, key, value)

Apply single config override.

addConfigPython(label, pythonString)

Add overrides by running a snippet of Python code against a config.

addInstrument(instrument)

Add an instrument to the pipeline, or replace an instrument that is already defined.

addLabelToSubset(subset, label)

Add a task label to the specified subset.

addLabeledSubset(label, description, taskLabels)

Add a new labeled subset to the Pipeline.

addTask(task, label)

Add a new task to the pipeline, or replace a task that is already associated with the supplied label.

findSubsetsWithLabel(label)

Find any subsets which may contain the specified label.

fromFile(filename)

Load a pipeline defined in a pipeline yaml file.

fromIR(deserialized_pipeline)

Create a pipeline from an already created PipelineIR object.

fromPipeline(pipeline)

Create a new pipeline by copying an already existing Pipeline.

fromString(pipeline_string)

Create a pipeline from string formatted as a pipeline document.

from_uri(uri)

Load a pipeline defined in a pipeline yaml file at a location specified by a URI.

getInstrument()

Get the instrument from the pipeline.

get_data_id(universe)

Return a data ID with all dimension constraints embedded in the pipeline.

mergePipeline(pipeline)

Merge another in-memory Pipeline object into this one.

removeLabelFromSubset(subset, label)

Remove a task label from the specified subset.

removeLabeledSubset(label)

Remove a labeled subset from the Pipeline.

removeTask(label)

Remove a task from the pipeline.

subsetFromLabels(labelSpecifier[, subsetCtrl])

Subset a pipeline to contain only labels specified in labelSpecifier.

toExpandedPipeline()

Return a generator of TaskDefs which can be used to create quantum graphs.

to_graph([registry])

Construct a pipeline graph from this pipeline.

write_to_uri(uri)

Write the pipeline to a file or directory.

Attributes Documentation

subsets

Returns a MappingProxyType where the keys are the labels of labeled subsets in the Pipeline and the values are the set of task labels contained within that subset.
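As a minimal sketch of what a MappingProxyType return value implies for callers, the following uses only the standard library. The subset and task labels are hypothetical, and the dictionary stands in for the pipeline's internal state; it is not how Pipeline itself is implemented.

```python
from types import MappingProxyType

# Hypothetical subset labels and task labels, for illustration only.
_subsets = {"step1": {"isr", "calibrate"}, "step2": {"coadd"}}

# A MappingProxyType is a read-only view over an underlying mapping.
subsets_view = MappingProxyType(_subsets)

# Reading works like any other mapping.
step1_labels = subsets_view["step1"]

# Assigning through the view is rejected with a TypeError.
try:
    subsets_view["step3"] = {"measure"}
    view_is_read_only = False
except TypeError:
    view_is_read_only = True
```

To change the labeled subsets of a real Pipeline, the methods addLabeledSubset, addLabelToSubset, removeLabelFromSubset, and removeLabeledSubset are the documented route.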

task_labels

Labels of all tasks in the pipeline.

For simple pipelines with no imports, iteration over this set will match the order in which tasks are defined in the pipeline file. In all other cases the order is unspecified but deterministic. It is not dependency-ordered (use to_graph().tasks.keys() for that).
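The difference between definition order and dependency order can be illustrated without Pipeline at all. The sketch below uses the standard library graphlib module with hypothetical task labels and hypothetical dependencies; it only shows why the two orderings can disagree.

```python
from graphlib import TopologicalSorter

# Hypothetical labels in the order they might be written in a pipeline file.
definition_order = ["calibrate", "isr", "coadd"]

# Hypothetical dependencies: each key depends on the labels in its value set.
depends_on = {
    "isr": set(),
    "calibrate": {"isr"},
    "coadd": {"calibrate"},
}

# A topological sort yields an order in which every task follows its inputs.
dependency_order = list(TopologicalSorter(depends_on).static_order())
```

Here the file lists calibrate first, but a dependency-ordered traversal must start with isr.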

Methods Documentation

addConfigFile(label: str, filename: str) -> None

Add overrides from a specified file.

Parameters:
label: str

The label used to identify the task associated with config to modify.

filename: str

Path to the override file.

addConfigOverride(label: str, key: str, value: object) -> None

Apply single config override.

Parameters:
label: str

Label of the task.

key: str

Fully-qualified field name.

value: object

Value to be given to a field.

addConfigPython(label: str, pythonString: str) -> None

Add overrides by running a snippet of Python code against a config.

Parameters:
label: str

The label used to identify the task associated with config to modify.

pythonString: str

A string of valid Python code to be executed. The code runs with config as the only locally accessible name.
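The idea of running a code string with config as its only local name can be sketched with the built-in exec. This is an illustration under stated assumptions, not the library's implementation: apply_python_override is a hypothetical helper, and the SimpleNamespace with fields doWrite and threshold is a stand-in for a real task config.

```python
from types import SimpleNamespace


def apply_python_override(config, python_string):
    """Run python_string with ``config`` as the only local name (hypothetical helper)."""
    # Empty globals (builtins are added automatically) and a single local name.
    exec(python_string, {}, {"config": config})
    return config


# Stand-in config object with hypothetical fields.
config = SimpleNamespace(doWrite=True, threshold=5.0)

# The override string mutates the config object in place.
apply_python_override(config, "config.doWrite = False\nconfig.threshold *= 2")
```

Because the string is executed as code, it should come only from trusted sources.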

addInstrument(instrument: Instrument | str) -> None

Add an instrument to the pipeline, or replace an instrument that is already defined.

Parameters:
instrument: Instrument or str

Either an Instrument subclass or a string giving the fully qualified name of one.

addLabelToSubset(subset: str, label: str) -> None

Add a task label to the specified subset.

Parameters:
subset: str

The labeled subset to modify.

label: str

The task label to add to the specified subset.

Raises:
ValueError

Raised if the specified subset does not exist within the pipeline. Raised if the specified label does not exist within the pipeline.

addLabeledSubset(label: str, description: str, taskLabels: set[str]) -> None

Add a new labeled subset to the Pipeline.

Parameters:
label: str

The label to assign to the subset.

description: str

A description of what the subset is for.

taskLabels: set[str]

The set of task labels to be associated with the labeled subset.

Raises:
ValueError

Raised if label already exists in the Pipeline. Raised if a task label is not found within the Pipeline.

addTask(task: type[lsst.pipe.base.pipelineTask.PipelineTask] | str, label: str) -> None

Add a new task to the pipeline, or replace a task that is already associated with the supplied label.

Parameters:
task: PipelineTask or str

Either a derived class object of a PipelineTask or a string corresponding to a fully qualified PipelineTask name.

label: str

A label that is used to identify the PipelineTask being added.

findSubsetsWithLabel(label: str) -> set[str]

Find any subsets which may contain the specified label.

This function returns the names of the subsets which contain the specified label. It may return an empty set if there are no subsets, or no subsets containing the specified label.

Parameters:
label: str

The task label to use in membership check.

Returns:
subsets: set of str

A set (possibly empty) of the names of subsets which contain the specified label.

Raises:
ValueError

Raised if the specified label does not exist within this pipeline.

classmethod fromFile(filename: str) -> Pipeline

Load a pipeline defined in a pipeline yaml file.

Parameters:
filename: str

A path that points to a pipeline defined in YAML format. This filename may also supply additional labels to be used in subsetting the loaded Pipeline. These labels are separated from the path by a #, and may be specified as a comma-separated list, or as a range denoted as beginning..end. Beginning or end may be empty, in which case the range will be a half-open interval. Unlike Python iteration bounds, end bounds are INCLUDED. Note that range-based selection is not well defined for pipelines that are not linear in nature; correct behavior is not guaranteed and may vary from run to run.

Returns:
pipeline: Pipeline

The pipeline loaded from specified location with appropriate (if any) subsetting.

Notes

This method attempts to prune any contracts that contain labels which are not in the declared subset of labels. This pruning is done using a string based matching due to the nature of contracts and may prune more than it should.
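The label specifier syntax described for filename above (a comma-separated list or an inclusive beginning..end range after a #) can be sketched in plain Python. Both functions below are hypothetical helpers written for illustration; they are not part of lsst.pipe.base and make no claim about how the library parses the string internally. The task labels are also made up.

```python
def parse_label_specifier(filename):
    """Split 'path#labels' into (path, labels, begin, end) (hypothetical helper)."""
    path, sep, fragment = filename.partition("#")
    if not sep or not fragment:
        # No specifier: the whole pipeline is requested.
        return path, None, None, None
    if ".." in fragment:
        # Range form; an empty side means an open bound on that side.
        begin, _, end = fragment.partition("..")
        return path, None, begin or None, end or None
    # List form.
    return path, set(fragment.split(",")), None, None


def select_range(ordered_labels, begin, end):
    """Select labels from a linear sequence; the end bound is included."""
    start = ordered_labels.index(begin) if begin is not None else 0
    stop = ordered_labels.index(end) + 1 if end is not None else len(ordered_labels)
    return ordered_labels[start:stop]


labels = ["isr", "calibrate", "coadd"]
_, _, begin, end = parse_label_specifier("pipeline.yaml#..calibrate")
selected = select_range(labels, begin, end)
```

The sketch assumes a strictly linear pipeline, which is the only case in which the range form is well defined according to the description above.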

classmethod fromIR(deserialized_pipeline: PipelineIR) -> Pipeline

Create a pipeline from an already created PipelineIR object.

Parameters:
deserialized_pipeline: PipelineIR

An already created pipeline intermediate representation object.

Returns:
pipeline: Pipeline

The new pipeline.

classmethod fromPipeline(pipeline: Pipeline) -> Pipeline

Create a new pipeline by copying an already existing Pipeline.

Parameters:
pipeline: Pipeline

An already created Pipeline object to copy.

Returns:
pipeline: Pipeline

The new pipeline.

classmethod fromString(pipeline_string: str) -> Pipeline

Create a pipeline from string formatted as a pipeline document.

Parameters:
pipeline_string: str

A string that is formatted like a pipeline document.

Returns:
pipeline: Pipeline

The new pipeline.

classmethod from_uri(uri: str | ParseResult | ResourcePath | Path) -> Pipeline

Load a pipeline defined in a pipeline yaml file at a location specified by a URI.

Parameters:
uri: convertible to ResourcePath

If a string is supplied, this should be a URI path that points to a pipeline defined in YAML format, either as a direct path to the YAML file or as a directory containing a pipeline.yaml file (the form used by write_to_uri with expand=True). This URI may also supply additional labels to be used in subsetting the loaded Pipeline. These labels are separated from the path by a #, and may be specified as a comma-separated list, or as a range denoted as beginning..end. Beginning or end may be empty, in which case the range will be a half-open interval. Unlike Python iteration bounds, end bounds are INCLUDED. Note that range-based selection is not well defined for pipelines that are not linear in nature; correct behavior is not guaranteed and may vary from run to run. The same specifiers can be used with a ResourcePath object, by being the sole contents in the fragments attribute.

Returns:
pipeline: Pipeline

The pipeline loaded from specified location with appropriate (if any) subsetting.

Notes

This method attempts to prune any contracts that contain labels which are not in the declared subset of labels. This pruning is done using a string based matching due to the nature of contracts and may prune more than it should.
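Since the label specifier is carried in the fragment part of the URI, a standard library ParseResult (one of the accepted argument types) shows where it ends up. The sketch below uses only urllib.parse; the file location and task labels are hypothetical.

```python
from urllib.parse import urlparse

# Hypothetical pipeline location with a comma-separated label specifier.
uri = "file:///data/pipelines/demo.yaml#isr,calibrate"

# urlparse returns a ParseResult whose fragment holds everything after the '#'.
parsed = urlparse(uri)
location_path = parsed.path
label_fragment = parsed.fragment

# The range form is carried in the fragment in the same way.
range_fragment = urlparse("file:///data/pipelines/demo.yaml#isr..calibrate").fragment
```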

getInstrument() -> str | None

Get the instrument from the pipeline.

Returns:
instrument: str or None

The fully qualified name of a lsst.obs.base.Instrument subclass, or None if the pipeline does not have an instrument.

get_data_id(universe: DimensionUniverse) -> DataCoordinate

Return a data ID with all dimension constraints embedded in the pipeline.

Parameters:
universe: lsst.daf.butler.DimensionUniverse

Object that defines all dimensions.

Returns:
data_id: lsst.daf.butler.DataCoordinate

Data ID with all dimension constraints embedded in the pipeline.

mergePipeline(pipeline: Pipeline) -> None

Merge another in-memory Pipeline object into this one.

This merges another pipeline into this object, as if it were declared in the import block of the yaml definition of this pipeline. This modifies this pipeline in place.

Parameters:
pipeline: Pipeline

The Pipeline object that is to be merged into this object.

removeLabelFromSubset(subset: str, label: str) -> None

Remove a task label from the specified subset.

Parameters:
subset: str

The labeled subset to modify.

label: str

The task label to remove from the specified subset.

Raises:
ValueError

Raised if the specified subset does not exist in the pipeline. Raised if the specified label does not exist within the specified subset.

removeLabeledSubset(label: str) -> None

Remove a labeled subset from the Pipeline.

Parameters:
label: str

The label of the subset to remove from the Pipeline.

Raises:
ValueError

Raised if the label is not found within the Pipeline.

removeTask(label: str) -> None

Remove a task from the pipeline.

Parameters:
label: str

The label used to identify the task that is to be removed.

Raises:
KeyError

If no task with that label exists in the pipeline.

subsetFromLabels(labelSpecifier: LabelSpecifier, subsetCtrl: PipelineSubsetCtrl = PipelineSubsetCtrl.DROP) -> Pipeline

Subset a pipeline to contain only labels specified in labelSpecifier.

Parameters:
labelSpecifier: LabelSpecifier

Object containing labels that describes how to subset a pipeline.

subsetCtrl: PipelineSubsetCtrl

Control object which decides how subsets with missing labels are handled. Setting to PipelineSubsetCtrl.DROP (the default) will cause any subsets that have labels which are not in the set of all task labels to be dropped. Setting to PipelineSubsetCtrl.EDIT will cause the subset to instead be edited to remove the nonexistent label.

Returns:
pipeline: Pipeline

A new pipeline object that is a subset of the old pipeline.

Raises:
ValueError

Raised if there is an issue with the specified labels.

Notes

This method attempts to prune any contracts that contain labels which are not in the declared subset of labels. This pruning is done using a string based matching due to the nature of contracts and may prune more than it should.
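The two subsetCtrl behaviors described above, dropping a labeled subset versus editing it, can be sketched with plain sets. SubsetCtrl and filter_subsets below are hypothetical stand-ins written for illustration; they mirror the documented DROP and EDIT semantics but are not the library's code, and the labels are made up.

```python
from enum import Enum


class SubsetCtrl(Enum):
    """Hypothetical stand-in for PipelineSubsetCtrl."""

    DROP = "drop"
    EDIT = "edit"


def filter_subsets(subsets, kept_task_labels, ctrl=SubsetCtrl.DROP):
    """Return labeled subsets adjusted to the task labels that remain."""
    result = {}
    for name, labels in subsets.items():
        missing = labels - kept_task_labels
        if not missing:
            # Every label survived, so the subset is kept unchanged.
            result[name] = set(labels)
        elif ctrl is SubsetCtrl.EDIT:
            # EDIT: keep the subset but remove the labels that no longer exist.
            result[name] = labels & kept_task_labels
        # DROP: a subset with any missing label is left out entirely.
    return result


subsets = {"step1": {"isr", "calibrate"}, "step2": {"coadd"}}
kept = {"isr", "coadd"}  # "calibrate" was not selected

dropped = filter_subsets(subsets, kept, SubsetCtrl.DROP)
edited = filter_subsets(subsets, kept, SubsetCtrl.EDIT)
```

With DROP only step2 remains, while EDIT keeps step1 with the surviving label isr.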

toExpandedPipeline() -> Generator[TaskDef, None, None]

Return a generator of TaskDefs which can be used to create quantum graphs.

Returns:
generator: generator of TaskDef

The generator returned will be the sorted iterator of tasks which are to be used in constructing a quantum graph.

Raises:
NotImplementedError

If a dataId is supplied in a config block. This is in place for future use.

Deprecated since version v27.0: Deprecated in favor of to_graph; will be removed after v27.

to_graph(registry: Registry | None = None) -> PipelineGraph

Construct a pipeline graph from this pipeline.

Constructing a graph applies all configuration overrides, freezes all configuration, checks all contracts, and checks for dataset type consistency between tasks (as much as possible without access to a data repository). It cannot be reversed.

Parameters:
registry: lsst.daf.butler.Registry, optional

Data repository client. If provided, the graph’s dataset types and dimensions will be resolved (see PipelineGraph.resolve).

Returns:
graph: pipeline_graph.PipelineGraph

Representation of the pipeline as a graph.

write_to_uri(uri: str | ParseResult | ResourcePath | Path) -> None

Write the pipeline to a file or directory.

Parameters:
uri: convertible to ResourcePath

URI to write to; may have any scheme with ResourcePath write support or no scheme for a local file/directory. Should have a .yaml extension.