GenericWorkflow
- class lsst.ctrl.bps.GenericWorkflow(name: str, incoming_graph_data: Any | None = None, **attr: Any)
Bases: DiGraph
A generic representation of a workflow used to submit to specific workflow management systems.
Parameters
- name
str Name of generic workflow.
- incoming_graph_data
Any, optional Data used to initialize the graph; it is passed through to the DiGraph constructor. Can be any type supported by networkx.DiGraph.
- **attr
dict Keyword arguments passed through to DiGraph constructor.
Attributes Summary
job_counts Count of jobs per job label (collections.Counter).
labels Job labels (list[str], read-only).
name Retrieve name of generic workflow.
quanta_counts Count of quanta per task label (collections.Counter).
Methods Summary
add_edge(u_of_edge, v_of_edge, **attr) Add edge connecting jobs in workflow.
add_edges_from(ebunch_to_add, **attr) Add several edges between jobs in the generic workflow.
add_executable(executable) Add executable to workflow's list of executables.
add_file(gwfile) Add file object.
add_final(final) Add special final job/workflow to the generic workflow.
add_job(job[, parent_names, child_names]) Add job to generic workflow.
add_job_inputs(job_name, files) Add files as inputs to specified job.
add_job_outputs(job_name, files) Add output files to a job.
add_job_relationships(parents, children) Add dependencies between parent and child jobs.
add_node(node_for_adding, **attr) Override networkx function to call more specific add_job function.
add_special_job_ordering(ordering) Add special nodes and dependencies to enforce given ordering.
add_workflow_source(workflow) Add given workflow as new source to this workflow.
del_job(job_name) Delete job from generic workflow leaving connected graph.
draw(stream[, format_]) Output generic workflow in a visualization format.
get_executables(data[, transfer_only]) Retrieve executables from generic workflow.
get_file(name) Retrieve a file object by name.
get_files(data[, transfer_only]) Retrieve files from generic workflow.
get_final() Return job/workflow to be executed after all jobs that can be executed have been executed regardless of exit status of any of the jobs.
get_job(job_name) Retrieve job by name from workflow.
get_job_inputs(job_name, data[, transfer_only]) Return the input files for the given job.
get_job_outputs(job_name, data[, transfer_only]) Return the output files for the given job.
get_jobs_by_label(label) Retrieve jobs by label from workflow.
load(stream[, format_]) Load a GenericWorkflow from the given stream.
regenerate_labels() Regenerate the list of job labels.
save(stream[, format_]) Save the generic workflow in a format that is loadable.
validate() Run checks to ensure that the generic workflow graph is valid.
Attributes Documentation
- job_counts
Count of jobs per job label (collections.Counter).
- labels
Job labels (list[str], read-only).
- quanta_counts
Count of quanta per task label (collections.Counter).
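As a rough illustration of the shape of these attributes, the sketch below builds a per-label Counter and a label list from plain tuples. The job names, the labels, and the first-seen ordering of the label list are assumptions made for the example; nothing here uses the real GenericWorkflow class.

```python
from collections import Counter

# Hypothetical (job name, job label) pairs; names and labels are made up.
jobs = [
    ("isr_1", "isr"),
    ("isr_2", "isr"),
    ("calibrate_1", "calibrate"),
]

# One count per label, similar in shape to job_counts.
job_counts = Counter(label for _, label in jobs)

# Unique labels in first-seen order, similar in shape to labels.
# The ordering is an assumption of this sketch.
labels = list(dict.fromkeys(label for _, label in jobs))

print(job_counts, labels)
```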
Methods Documentation
- add_edge(u_of_edge: str, v_of_edge: str, **attr: Any) -> None
Add edge connecting jobs in workflow.
Parameters
- u_of_edge
str Name of parent job.
- v_of_edge
str Name of child job.
- **attr
keyword arguments, optional Attributes to save with edge.
- add_edges_from(ebunch_to_add: Iterable[tuple[str, str]], **attr: Any) -> None
Add several edges between jobs in the generic workflow.
Parameters
- ebunch_to_add
Iterable[tuple[str, str]] Iterable of job name pairs between which a dependency should be saved.
- **attr
keyword arguments, optional Data can be assigned using keyword arguments (not currently used).
- add_executable(executable: GenericWorkflowExec | None) -> None
Add executable to workflow’s list of executables.
Parameters
- executable
lsst.ctrl.bps.GenericWorkflowExec Executable object to be added to workflow.
- add_file(gwfile: GenericWorkflowFile) -> None
Add file object.
Parameters
- gwfile
lsst.ctrl.bps.GenericWorkflowFile File object to add to workflow.
- add_final(final: GenericWorkflowJob | GenericWorkflow) -> None
Add special final job/workflow to the generic workflow.
Parameters
- final
lsst.ctrl.bps.GenericWorkflowJob or lsst.ctrl.bps.GenericWorkflow Information needed to execute the special final job(s), that is, the job(s) run after every job that can be executed has been executed, regardless of the exit status of any of those jobs.
- add_job(job: GenericWorkflowNode, parent_names: str | list[str] | None = None, child_names: str | list[str] | None = None) -> None
Add job to generic workflow.
Parameters
- job
lsst.ctrl.bps.GenericWorkflowNode Job to add to the generic workflow.
- parent_names
str | list[str], optional Names of jobs that are parents of given job.
- child_names
str | list[str], optional Names of jobs that are children of given job.
- add_job_inputs(job_name: str, files: GenericWorkflowFile | list[GenericWorkflowFile]) -> None
Add files as inputs to specified job.
Parameters
- job_name
str Name of job to which inputs should be added.
- files
lsst.ctrl.bps.GenericWorkflowFile or list[lsst.ctrl.bps.GenericWorkflowFile] File object(s) to be added as inputs to the specified job.
- add_job_outputs(job_name: str, files: list[GenericWorkflowFile]) -> None
Add output files to a job.
Parameters
- job_name
str Name of job to which the files should be added as outputs.
- files
list[lsst.ctrl.bps.GenericWorkflowFile] File objects to be added as outputs for specified job.
- add_job_relationships(parents: str | list[str] | None, children: str | list[str] | None) -> None
Add dependencies between parent and child jobs. All parents will be connected to all children.
Parameters
- parents
str or list[str], optional Parent job names.
- children
str or list[str], optional Children job names.
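The rule that all parents are connected to all children amounts to a Cartesian product of the two name lists. The sketch below shows that rule with the standard library only; relationship_edges and its handling of a bare string or None are hypothetical and are not taken from the lsst.ctrl.bps source.

```python
from itertools import product


def relationship_edges(parents, children):
    """Return (parent, child) pairs linking every parent to every child.

    Hypothetical helper: accepts a single name, a list of names, or None,
    mirroring the parameter types documented for add_job_relationships.
    """
    def as_list(names):
        if names is None:
            return []
        if isinstance(names, str):
            return [names]
        return list(names)

    return list(product(as_list(parents), as_list(children)))


# Two parents and one child give two edges.
edges = relationship_edges(["a", "b"], "c")
print(edges)
```

With two parents and two children the same call yields four edges, so the edge count grows as the product of the list lengths.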
- add_node(node_for_adding: GenericWorkflowNode, **attr: Any) -> None
Override networkx function to call more specific add_job function.
Parameters
- node_for_adding
lsst.ctrl.bps.GenericWorkflowJob Job to be added to generic workflow.
- **attr
Needed to match original networkx function, but not used.
- add_special_job_ordering(ordering: dict[str, Any]) -> None
Add special nodes and dependencies to enforce given ordering.
Parameters
- ordering
dict[str, Any] Description of the job ordering to enforce.
- add_workflow_source(workflow: GenericWorkflow) -> None
Add given workflow as new source to this workflow.
Parameters
- workflow
lsst.ctrl.bps.GenericWorkflow The given workflow.
- del_job(job_name: str) -> None
Delete job from generic workflow leaving connected graph.
Parameters
- job_name
str Name of job to delete from workflow.
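One plausible reading of "leaving connected graph" is that the parents of the deleted job are linked directly to its children before the job is removed. The sketch below shows that idea on a plain set of edges; delete_node is a hypothetical helper, and whether del_job behaves exactly this way is an assumption.

```python
def delete_node(edges, name):
    """Remove ``name`` and link each of its parents to each of its children.

    ``edges`` is a set of (parent, child) name pairs describing a DAG.
    """
    parents = [u for u, v in edges if v == name and u != name]
    children = [v for u, v in edges if u == name and v != name]

    # Drop every edge that touches the deleted node.
    kept = {(u, v) for u, v in edges if name not in (u, v)}

    # Bridge the gap so dependencies that ran through the node survive.
    kept.update((p, c) for p in parents for c in children)
    return kept


edges = {("a", "b"), ("b", "c"), ("b", "d")}
remaining = delete_node(edges, "b")
print(sorted(remaining))
```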
- draw(stream: str | IO[str], format_: str = 'dot') -> None
Output generic workflow in a visualization format.
Parameters
- stream
str or io.BufferedIOBase Stream to which the visualization should be written.
- format_
str, optional Which visualization format to use. It defaults to the format for the dot program.
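For readers unfamiliar with the dot program's input, the sketch below writes a small directed graph to a text stream in Graphviz DOT syntax. write_dot and the graph name "workflow" are hypothetical, and the exact text that draw emits is not shown here; this only illustrates the kind of output a 'dot' format implies.

```python
import io


def write_dot(edges, stream):
    """Write (parent, child) pairs to ``stream`` in Graphviz DOT syntax."""
    stream.write("digraph workflow {\n")
    for parent, child in edges:
        # Quote node names so characters such as '-' or ':' stay valid.
        stream.write(f'    "{parent}" -> "{child}";\n')
    stream.write("}\n")


buffer = io.StringIO()
write_dot([("a", "b"), ("b", "c")], buffer)
dot_text = buffer.getvalue()
print(dot_text)
```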
- get_executables(data: Literal[False], transfer_only: bool = False) -> list[str]
- get_executables(data: Literal[True], transfer_only: bool = False) -> list[GenericWorkflowExec]
Retrieve executables from generic workflow.
Parameters
- data
bool, optional Whether to return the executable data as well as the exec object name (the default is False).
- transfer_only
bool, optional Whether to only return executables for which transfer_executable is True.
Returns
- execs
list[lsst.ctrl.bps.GenericWorkflowExec] or list[str] Filtered executable names or objects from generic workflow.
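The two signatures listed for this method follow the typing.overload pattern: a literal boolean flag selects the return type for static type checkers. The sketch below reproduces the pattern on stand-in classes. Exec and Registry are hypothetical, and the defaults written in the overloads are an assumption of this sketch rather than the library's declarations.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Literal, overload


@dataclass
class Exec:
    """Hypothetical stand-in for an executable record."""

    name: str
    transfer_executable: bool = False


class Registry:
    """Hypothetical container; not the GenericWorkflow class."""

    def __init__(self, execs: list[Exec]) -> None:
        self._execs = {e.name: e for e in execs}

    @overload
    def get_executables(
        self, data: Literal[False] = ..., transfer_only: bool = ...
    ) -> list[str]: ...

    @overload
    def get_executables(
        self, data: Literal[True], transfer_only: bool = ...
    ) -> list[Exec]: ...

    def get_executables(self, data=False, transfer_only=False):
        # Keep everything unless the caller asked for transferable execs only.
        chosen = [
            e for e in self._execs.values()
            if e.transfer_executable or not transfer_only
        ]
        # data=True returns the objects, data=False returns only their names.
        return chosen if data else [e.name for e in chosen]


registry = Registry([Exec("a", transfer_executable=True), Exec("b")])
names = registry.get_executables(data=False)
objects = registry.get_executables(data=True, transfer_only=True)
print(names, objects)
```

A type checker then infers list[str] for the first call and list[Exec] for the second, without the caller needing a cast.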
- get_file(name: str) -> GenericWorkflowFile
Retrieve a file object by name.
Parameters
- name
str Name of file object.
Returns
- gwfile
lsst.ctrl.bps.GenericWorkflowFile File matching given name.
- get_files(data: Literal[False], transfer_only: bool = True) -> list[str]
- get_files(data: Literal[True], transfer_only: bool = True) -> list[GenericWorkflowFile]
Retrieve files from generic workflow.
This API exists in case the way files are stored changes (e.g., making the workflow a bipartite graph with job and file nodes).
Parameters
- data
bool, optional Whether to return the file data as well as the file object name (the default is False).
- transfer_only
bool, optional Whether to only return files for which a workflow management system would be responsible for transferring.
Returns
- files
list[lsst.ctrl.bps.GenericWorkflowFile] or list[str] File names or objects from generic workflow meeting specifications.
- get_final() -> GenericWorkflowJob | GenericWorkflow | None
Return job/workflow to be executed after all jobs that can be executed have been executed regardless of exit status of any of the jobs.
Returns
- final
lsst.ctrl.bps.GenericWorkflowJob or lsst.ctrl.bps.GenericWorkflow Information needed to execute final job(s).
- get_job(job_name: str) -> GenericWorkflowNode
Retrieve job by name from workflow.
Parameters
- job_name
str Name of job to retrieve.
Returns
- job
lsst.ctrl.bps.GenericWorkflowNode Job matching given job_name.
- get_job_inputs(job_name: str, data: Literal[False], transfer_only: bool = False) -> list[str]
- get_job_inputs(job_name: str, data: Literal[True], transfer_only: bool = False) -> list[GenericWorkflowFile]
Return the input files for the given job.
Parameters
- job_name
str Name of the job.
- data
bool, optional Whether to return the file data as well as the file object name.
- transfer_only
bool, optional Whether to only return files for which a workflow management system would be responsible for transferring.
Returns
- inputs
list[lsst.ctrl.bps.GenericWorkflowFile] or list[str] Input files for the given job. If the job has no input files, returns an empty list.
- get_job_outputs(job_name: str, data: Literal[False], transfer_only: bool = False) -> list[str]
- get_job_outputs(job_name: str, data: Literal[True], transfer_only: bool = False) -> list[GenericWorkflowFile]
Return the output files for the given job.
Parameters
- job_name
str Name of the job.
- data
bool Whether to return the file data as well as the file object name. It defaults to True, thus returning file data as well.
- transfer_only
bool Whether to only return files for which a workflow management system would be responsible for transferring. It defaults to False, thus returning all output files.
Returns
- outputs
list[lsst.ctrl.bps.GenericWorkflowFile] or list[str] Output files for the given job. If the job has no output files, returns an empty list.
- get_jobs_by_label(label: str) -> list[GenericWorkflowJob]
Retrieve jobs by label from workflow.
Parameters
- label
str Label of jobs to retrieve.
Returns
- jobs
list[lsst.ctrl.bps.GenericWorkflowNode] Jobs having given label.
- classmethod load(stream: str | IO[bytes], format_: str = 'pickle') -> GenericWorkflow
Load a GenericWorkflow from the given stream.
Parameters
- stream
str or io.BufferedIOBase Stream to pass to the format-specific loader. Accepts anything that the loader accepts.
- format_
str, optional Format of data to expect when loading from stream. It defaults to pickle format.
Returns
- generic_workflow
lsst.ctrl.bps.GenericWorkflow Generic workflow loaded from the given stream.
- regenerate_labels() -> None
Regenerate the list of job labels.
- save(stream: str | IO[bytes], format_: str = 'pickle') -> None
Save the generic workflow in a format that is loadable.
Parameters
- stream
str or io.BufferedIOBase Stream to pass to the format-specific writer. Accepts anything that the writer accepts.
- format_
str, optional Format in which to write the data. It defaults to pickle format.
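Since save and load both default to the pickle format and take a binary stream, a round trip might look like the sketch below. It assumes the 'pickle' format maps onto Python's standard pickle module, and it uses a plain dictionary as a hypothetical stand-in for a workflow object rather than a real GenericWorkflow.

```python
import io
import pickle

# Hypothetical stand-in payload; a real call would involve a GenericWorkflow.
workflow_like = {"name": "demo", "edges": [("a", "b")]}

stream = io.BytesIO()

# Roughly what a pickle-format save to a binary stream amounts to.
pickle.dump(workflow_like, stream)

# Rewind before reading the same stream back.
stream.seek(0)

# Roughly what a pickle-format load amounts to.
restored = pickle.load(stream)
print(restored == workflow_like)
```

As with any pickle data, only streams from a trusted source should be loaded.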
- validate() -> None
Run checks to ensure that the generic workflow graph is valid.
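The description above does not list which checks validate runs. One check that is natural for a workflow graph is that it contains no cycles; the sketch below shows such a check with the standard library. Both the choice of check and the helper name is_acyclic are assumptions, not the method's documented behavior.

```python
from graphlib import CycleError, TopologicalSorter


def is_acyclic(edges):
    """Return True if the directed (parent, child) edges contain no cycle."""
    sorter = TopologicalSorter()
    for parent, child in edges:
        # Register the child as depending on its parent.
        sorter.add(child, parent)
    try:
        # prepare() raises CycleError when the graph has a cycle.
        sorter.prepare()
    except CycleError:
        return False
    return True


valid = is_acyclic([("a", "b"), ("b", "c")])
invalid = is_acyclic([("a", "b"), ("b", "a")])
print(valid, invalid)
```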