InMemoryRepo

class lsst.pipe.base.tests.mocks.InMemoryRepo(*args: str | ResourcePath, registry_config: RegistryConfig | None = None, input_run: str = 'input_run', input_chain: str = 'input_chain', output_run: str = 'output_run', use_import_collections_as_input: bool | str | Iterable[str] = True, data_root: str | ParseResult | ResourcePath | Path | None = 'resource://lsst.daf.butler/tests/registry_data')

Bases: object

A test helper that simulates a butler repository for task execution without any disk I/O.

Parameters:
*argsstr or lsst.resources.ResourcePath

Butler YAML import files to load into the test repository.

registry_configlsst.daf.butler.RegistryConfig, optional

Registry configuration for the repository.

input_runstr, optional

Name of a RUN collection that will be used as an input to quantum graph generation. Input datasets created by the helper are added to this collection.

input_chainstr, optional

Name of a CHAINED collection that will be the direct input to quantum graph generation. This always includes input_run.

output_runstr, optional

Name of a RUN collection for execution outputs.

use_import_collections_as_inputbool str, or Iterable [ str], optional

Additional collections from YAML import files to include in input_chain, or True to include all such collections (in chain-flattened lexicographical order).

data_rootconvertible to lsst.resources.ResourcePath, optional

Root directory to join to each element in *args. Defaults to the lsst.daf.butler.tests.registry_data package.

Notes

This helper maintains an pipeline_graph.PipelineGraph and a no-datastore butler backed by an in-memory SQLite database for use in quantum graph generation.

Methods Summary

add_task([label, task_class, config, ...])

Add a task to the helper's pipeline graph.

insert_datasets(dataset_type[, register])

Insert input datasets into the test repository.

make_limited_butler()

Make a test limited butler for execution.

make_quantum_graph_builder(*[, ...])

Make a quantum graph builder from the pipeline task and internal data repository.

make_single_quantum_executor()

Make a single-quantum executor backed by a new limited butler.

Methods Documentation

add_task(label: str | None = None, *, task_class: type[lsst.pipe.base.tests.mocks._pipeline_task.DynamicTestPipelineTask] = <class 'lsst.pipe.base.tests.mocks._pipeline_task.DynamicTestPipelineTask'>, config: ~lsst.pipe.base.tests.mocks._pipeline_task.DynamicTestPipelineTaskConfig | None = None, dimensions: ~collections.abc.Iterable[str] | None = None, inputs: ~collections.abc.Mapping[str, ~lsst.pipe.base.tests.mocks._pipeline_task.DynamicConnectionConfig] | None = None, outputs: ~collections.abc.Mapping[str, ~lsst.pipe.base.tests.mocks._pipeline_task.DynamicConnectionConfig] | None = None, prerequisite_inputs: ~collections.abc.Mapping[str, ~lsst.pipe.base.tests.mocks._pipeline_task.DynamicConnectionConfig] | None = None, init_inputs: ~collections.abc.Mapping[str, ~lsst.pipe.base.tests.mocks._pipeline_task.DynamicConnectionConfig] | None = None, init_outputs: ~collections.abc.Mapping[str, ~lsst.pipe.base.tests.mocks._pipeline_task.DynamicConnectionConfig] | None = None) None

Add a task to the helper’s pipeline graph.

Parameters:
labelstr, optional

Label for the task. If not provided, the task name will be task_auto{self.last_auto_task_index}, with that variable incremented.

task_classtype, optional

Subclass of DynamicTestPipelineTask to use.

configDynamicTestPipelineTaskConfig, optional

Task configuration to use. Note that the dimensions are always overridden by the dimensions argument and inputs and outputs are updated by those arguments unless they are explicitly set to empty dictionaries.

dimensionsIterable [ str ], optional

Dimensions of the task and any automatically-added input or output connection.

inputsMapping [ str, DynamicConnectionConfig ], optional

Input connections to add. If not provided, a single connection is added with the same dimensions as the task and dataset type name dataset_auto{self.last_auto_dataset_type_index}.

outputsMapping [ str, DynamicConnectionConfig ], optional

Output connections to add. If not provided, a single connection is added with the same dimensions as the task and dataset type name dataset_auto{self.last_auto_dataset_type_index}, with that variable incremented first.

prerequisite_inputsMapping [ str, DynamicConnectionConfig ], optional

Prerequisite input connections to add. Defaults to an empty mapping.

init_inputsMapping [ str, DynamicConnectionConfig ], optional

Init input connections to add. Defaults to an empty mapping.

init_outputsMapping [ str, DynamicConnectionConfig ], optional

Init output connections to add. Defaults to an empty mapping.

Notes

The defaults for this method’s arguments are designed to allow it to be called in succession to create a sequence of “one-to-one” tasks in which each consumes the output of the last.

insert_datasets(dataset_type: DatasetType | str, register: bool = True, *args: Any, **kwargs: Any) list[lsst.daf.butler._dataset_ref.DatasetRef]

Insert input datasets into the test repository.

Parameters:
dataset_typeDatasetType or str

Dataset type or name. If a name, it must be included in the pipeline graph.

registerbool, optional

Whether to register the dataset type. If False, the dataset type must already be registered.

*argsobject

Forwarded to query_data_ids.

**kwargsobject

Forwarded to query_data_ids.

Returns:
refslist [ lsst.daf.butler.DatasetRef ]

References to the inserted datasets.

Notes

For dataset types with dimensions that are queryable, this queries for all data IDs in the repository (forwarding *args and **kwargs for e.g. where strings). For skypix dimensions, this queries for both patches and visit-detector regions (forwarding *args` and **kwargs to both) and uses all overlapping sky pixels. Dataset types with a mix of skypix and queryable dimensions are not supported.

make_limited_butler() InMemoryLimitedButler

Make a test limited butler for execution.

Returns:
limited_butlerInMemoryLimitedButler

A limited butler that can be used for task execution.

Notes

This queries the database-only butler used for quantum-graph generation for all datasets in the input_chain collection, and populates the limited butler with those that have a mock storage class. Other datasets are ignored, so they will appear as though they were present during quantum graph generation but absent during execution.

make_quantum_graph_builder(*, insert_mocked_inputs: bool = True, register_output_dataset_types: bool = True) AllDimensionsQuantumGraphBuilder

Make a quantum graph builder from the pipeline task and internal data repository.

Parameters:
insert_mocked_inputsbool, optional

Whether to automatically insert datasets for all overall inputs to the pipeline graph whose dataset types have not already been registered. If set to False, inputs must be provided by imported YAML files or explicit calls to insert_datasets, which provides more fine-grained control over the data IDs of the datasets.

register_output_dataset_typesbool, optional

If True, register all output dataset types.

Returns:
builderall_dimensions_quantum_graph_builder.AllDimensionsQuantumGraphBuilder

Quantum graph builder. Note that attach_datastore_records=False must be passed to build, since the helper’s butler does not have a datastore.

make_single_quantum_executor() tuple[lsst.pipe.base.single_quantum_executor.SingleQuantumExecutor, lsst.pipe.base.tests.in_memory_limited_butler.InMemoryLimitedButler]

Make a single-quantum executor backed by a new limited butler.

Returns:
executorsingle_quantum_executor.SingleQuantumExecutor

An executor for a single quantum.

butlerInMemoryLimitedButler

The butler that the executor will write to.