ParslWorkflow

class lsst.ctrl.bps.parsl.ParslWorkflow(name: str, config: BpsConfig, path: str, jobs: dict[str, lsst.ctrl.bps.parsl.job.ParslJob], parents: Mapping[str, Iterable[str]], endpoints: Iterable[str], final: ParslJob | None = None)

Bases: BaseWmsWorkflow

Parsl-based workflow object to manage execution of workflow.

Parameters:
name : str

Unique name of workflow.

config : lsst.ctrl.bps.BpsConfig

Generic workflow config.

path : str

Path prefix for workflow output files.

jobs : dict mapping str to ParslJob

Jobs to be executed.

parents : dict mapping str to iterable of str

Dependency tree. Keys are job names, and each value lists the names of the jobs that must be executed before that job can be executed.

endpoints : iterable of str

Endpoints of the dependency tree. These jobs (specified by name) have no children.

final : ParslJob, optional

Final job to be done, e.g., to merge the execution butler. This is done locally.
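The relationship between parents and endpoints can be illustrated with a minimal sketch in plain Python. It is not the class's own code: the job names and the helpers find_endpoints and execution_order are hypothetical, and the only assumption is the mapping shape described above (job name to the names of its prerequisite jobs).

```python
from collections.abc import Iterable, Mapping


def find_endpoints(parents: Mapping[str, Iterable[str]]) -> list[str]:
    """Return the jobs that never appear as a parent, i.e. have no children."""
    has_children = {p for deps in parents.values() for p in deps}
    return sorted(name for name in parents if name not in has_children)


def execution_order(
    parents: Mapping[str, Iterable[str]], endpoints: Iterable[str]
) -> list[str]:
    """Walk back from the endpoints so every parent precedes its children."""
    order: list[str] = []
    seen: set[str] = set()

    def visit(name: str) -> None:
        if name in seen:
            return
        seen.add(name)
        for parent in parents.get(name, ()):
            visit(parent)
        order.append(name)

    for name in endpoints:
        visit(name)
    return order


# Hypothetical job names, used only for illustration.
parents = {
    "isr": [],
    "calibrate": ["isr"],
    "coadd": ["calibrate"],
    "report": ["calibrate"],
}
endpoints = find_endpoints(parents)
order = execution_order(parents, endpoints)
```

Starting from the endpoints is enough to reach every job, because each job is either an endpoint or an ancestor of one.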

Methods Summary

execute(name)

Execute a job.

finalize_jobs()

Run final jobs.

from_generic_workflow(config, ...)

Create a ParslWorkflow object from a BPS GenericWorkflow.

initialize_jobs()

Run initial jobs.

load_dfk()

Load the Parsl DataFlowKernel (DFK).

read(out_prefix)

Construct from the saved workflow state.

restart()

Restart the workflow after interruption.

run([block])

Run the workflow.

shutdown()

Shut down the workflow.

start()

Start the workflow.

write(out_prefix)

Write workflow state.

Methods Documentation

execute(name: str) -> Future | None

Execute a job.

Parameters:
name : str

Name of job to execute.

Returns:
future : Future or None

A Future object linked to the execution of the job, or None if the job is being reserved to run locally.

finalize_jobs()

Run final jobs.

These jobs are run locally after all other jobs are complete.

This is used to merge the execution butler.

classmethod from_generic_workflow(config: BpsConfig, generic_workflow: GenericWorkflow, out_prefix: str, service_class: str) -> BaseWmsWorkflow

Create a ParslWorkflow object from a BPS GenericWorkflow.

Parameters:
config : BpsConfig

Configuration of the workflow.

generic_workflow : lsst.ctrl.bps.generic_workflow.GenericWorkflow

Generic representation of a single workflow.

out_prefix : str

Prefix for workflow output files.

service_class : str

Full module name of WMS service class that created this workflow.

Returns:
self : ParslWorkflow

Constructed workflow.

initialize_jobs()

Run initial jobs.

These jobs are run locally before any other jobs are submitted to parsl.

This is used to set up the butler.

load_dfk()

Load the Parsl DataFlowKernel (DFK).

This starts parsl.

classmethod read(out_prefix: str) -> ParslWorkflow

Construct from the saved workflow state.

Parameters:
out_prefix : str

Root directory to be used for WMS workflow inputs and outputs as well as internal WMS files.

Returns:
self : ParslWorkflow

Constructed workflow.

restart()

Restart the workflow after interruption.

run(block: bool = True) -> list[Future | None]

Run the workflow.

Parameters:
block : bool, optional

Whether to block until the workflow is complete. If False, jobs may still be running when this method returns, and it is the user’s responsibility to call the finalize_jobs and shutdown methods once they are complete.

Returns:
futures : list of Future or None

Future objects linked to the execution of the endpoint jobs.

shutdown()

Shut down the workflow.

This stops parsl.

start()

Start the workflow.

write(out_prefix: str)

Write workflow state.

This, in combination with the parsl checkpoint files, can be used to restart a workflow that was interrupted.

Parameters:
out_prefix : str

Root directory to be used for WMS workflow inputs and outputs as well as internal WMS files.
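The write/read round trip can be illustrated with a small stand-in. The on-disk format used by ParslWorkflow is not described here, so the JSON encoding, the file name workflow_state.json, and the StandInState class are all assumptions made for this sketch. Only the pairing of an instance method write(out_prefix) with a classmethod read(out_prefix) follows the documentation above.

```python
import json
import tempfile
from dataclasses import asdict, dataclass, field
from pathlib import Path

STATE_FILE = "workflow_state.json"  # hypothetical file name


@dataclass
class StandInState:
    """Hypothetical stand-in holding a subset of the workflow state."""

    name: str
    parents: dict = field(default_factory=dict)
    endpoints: list = field(default_factory=list)

    def write(self, out_prefix: str) -> None:
        """Save the state under the given root directory."""
        path = Path(out_prefix) / STATE_FILE
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(json.dumps(asdict(self)))

    @classmethod
    def read(cls, out_prefix: str) -> "StandInState":
        """Rebuild the state from the given root directory."""
        data = json.loads((Path(out_prefix) / STATE_FILE).read_text())
        return cls(**data)


with tempfile.TemporaryDirectory() as tmp:
    saved = StandInState("demo", {"a": [], "b": ["a"]}, ["b"])
    saved.write(tmp)
    restored = StandInState.read(tmp)
```

As noted above, restarting a real workflow also relies on the parsl checkpoint files, which this sketch does not model.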