GenericWorkflow

class lsst.ctrl.bps.GenericWorkflow(name, incoming_graph_data=None, **attr)

Bases: networkx.classes.digraph.DiGraph

A generic representation of a workflow used to submit to specific workflow management systems.

Parameters:
name : str

Name of generic workflow.

incoming_graph_data : Any, optional

Data used to initialize the graph; it is passed through to the DiGraph constructor. Can be any type supported by networkx.DiGraph.

attr : dict

Keyword arguments passed through to DiGraph constructor.

Attributes Summary

job_counts Counts of jobs per job label in workflow (collections.Counter).
name Retrieve name of generic workflow.
quanta_counts Counts of quanta per task label in workflow (collections.Counter).

Methods Summary

add_edge(u_of_edge, v_of_edge, **attr) Add edge connecting jobs in workflow.
add_edges_from(ebunch_to_add, **attr) Add several edges between jobs in the generic workflow.
add_executable(executable) Add executable to workflow’s list of executables.
add_file(gwfile) Add file object.
add_final(final) Add special final job/workflow to the generic workflow.
add_job(job[, parent_names, child_names]) Add job to generic workflow.
add_job_inputs(job_name, files) Add files as inputs to specified job.
add_job_outputs(job_name, files) Add output files to a job.
add_job_relationships(parents, children) Add dependencies between parent and child jobs.
add_node(node_for_adding, **attr) Override networkx function to call more specific add_job function.
add_workflow_source(workflow) Add given workflow as new source to this workflow.
del_job(job_name) Delete job from generic workflow leaving connected graph.
draw(stream[, format_]) Output generic workflow in a visualization format.
get_executables([data, transfer_only]) Retrieve executables from generic workflow.
get_file(name) Retrieve a file object by name.
get_files([data, transfer_only]) Retrieve files from generic workflow.
get_final() Return the job/workflow to be executed after all jobs that can be executed have been executed, regardless of the exit status of any of those jobs.
get_job(job_name) Retrieve job by name from workflow.
get_job_inputs(job_name[, data, transfer_only]) Return the input files for the given job.
get_job_outputs(job_name[, data, transfer_only]) Return the output files for the given job.
load(stream[, format_]) Load a GenericWorkflow from the given stream.
save(stream[, format_]) Save the generic workflow in a format that is loadable.
validate() Run checks to ensure this is still a valid generic workflow graph.

Attributes Documentation

job_counts

Counts of jobs per job label in workflow (collections.Counter).
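
As a rough illustration of what a per-label tally looks like, the sketch below counts hypothetical job labels with collections.Counter. The (job name, job label) pairs are invented for the example and are not read from a real GenericWorkflow; this is not the library's implementation.

```python
from collections import Counter

# Hypothetical (job name, job label) pairs; not taken from a real workflow.
jobs = [
    ("isr_1", "isr"),
    ("isr_2", "isr"),
    ("calibrate_1", "calibrate"),
]

# Tally how many jobs carry each label.
job_counts = Counter(label for _, label in jobs)
print(job_counts["isr"])
```

A Counter returns 0 for labels it has never seen, so looking up an absent label does not raise KeyError.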

name

Retrieve name of generic workflow.

Returns:
name : str

Name of generic workflow.

quanta_counts

Counts of quanta per task label in workflow (collections.Counter).

Methods Documentation

add_edge(u_of_edge: str, v_of_edge: str, **attr)

Add edge connecting jobs in workflow.

Parameters:
u_of_edge : str

Name of parent job.

v_of_edge : str

Name of child job.

attr : keyword arguments, optional

Attributes to save with edge.

add_edges_from(ebunch_to_add, **attr)

Add several edges between jobs in the generic workflow.

Parameters:
ebunch_to_add : Iterable [tuple]

Iterable of job name pairs between which a dependency should be saved.

attr : keyword arguments, optional

Data can be assigned using keyword arguments (not currently used).

add_executable(executable)

Add executable to workflow’s list of executables.

Parameters:
executable : lsst.ctrl.bps.GenericWorkflowExec

Executable object to be added to workflow.

add_file(gwfile)

Add file object.

Parameters:
gwfile : lsst.ctrl.bps.GenericWorkflowFile

File object to add to workflow.

add_final(final)

Add special final job/workflow to the generic workflow.

Parameters:
final : lsst.ctrl.bps.GenericWorkflowJob or lsst.ctrl.bps.GenericWorkflow

Information needed to execute the special final job(s): the job(s) to be executed after all jobs that can be executed have been executed, regardless of the exit status of any of those jobs.

add_job(job, parent_names=None, child_names=None)

Add job to generic workflow.

Parameters:
job : lsst.ctrl.bps.GenericWorkflowJob

Job to add to the generic workflow.

parent_names : list [str], optional

Names of jobs that are parents of the given job.

child_names : list [str], optional

Names of jobs that are children of the given job.

add_job_inputs(job_name, files)

Add files as inputs to specified job.

Parameters:
job_name : str

Name of job to which inputs should be added.

files : lsst.ctrl.bps.GenericWorkflowFile or list [lsst.ctrl.bps.GenericWorkflowFile]

File object(s) to be added as inputs to the specified job.

add_job_outputs(job_name, files)

Add output files to a job.

Parameters:
job_name : str

Name of job to which the files should be added as outputs.

files : list [lsst.ctrl.bps.GenericWorkflowFile]

File objects to be added as outputs for specified job.

add_job_relationships(parents, children)

Add dependencies between parent and child jobs. All parents will be connected to all children.

Parameters:
parents : list [str]

Parent job names.

children : list [str]

Children job names.
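
The "all parents connected to all children" rule amounts to a Cartesian product of the two name lists. The sketch below shows which edges that implies, using only the standard library; the helper name all_pairs and the job names are hypothetical and this is not the library's code.

```python
from itertools import product


def all_pairs(parents, children):
    """Return every (parent, child) edge implied by the two name lists."""
    return list(product(parents, children))


# Two parents and two children give four dependency edges.
edges = all_pairs(["a", "b"], ["c", "d"])
print(edges)
```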

add_node(node_for_adding, **attr)

Override networkx function to call more specific add_job function.

Parameters:
node_for_adding : lsst.ctrl.bps.GenericWorkflowJob

Job to be added to generic workflow.

attr : keyword arguments, optional

Needed to match original networkx function, but not used.

add_workflow_source(workflow)

Add given workflow as new source to this workflow.

Parameters:
workflow : lsst.ctrl.bps.GenericWorkflow

Workflow to add as a new source to this workflow.

del_job(job_name: str)

Delete job from generic workflow leaving connected graph.

Parameters:
job_name : str

Name of job to delete from workflow.
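
One plausible reading of "leaving connected graph" is that each parent of the deleted job is linked directly to each of its children, so the remaining jobs keep their ordering. The sketch below illustrates that idea on a plain set of edges. It is an assumption about the behavior, not the library's implementation, and delete_and_reconnect is a hypothetical helper.

```python
def delete_and_reconnect(edges, job_name):
    """Drop job_name from an edge set and link its parents to its children."""
    # Jobs with an edge into, or out of, the job being removed.
    parents = {u for u, v in edges if v == job_name and u != job_name}
    children = {v for u, v in edges if u == job_name and v != job_name}
    # Keep every edge that does not touch the removed job.
    kept = {(u, v) for u, v in edges if job_name not in (u, v)}
    # Bridge the gap so former parents still precede former children.
    kept |= {(p, c) for p in parents for c in children}
    return kept


edges = {("a", "b"), ("b", "c"), ("b", "d")}
remaining = delete_and_reconnect(edges, "b")
```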

draw(stream, format_='dot')

Output generic workflow in a visualization format.

Parameters:
stream : str or io.BufferedIOBase

Stream to which the visualization should be written.

format_ : str, optional

Which visualization format to use. It defaults to the format for the dot program.
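
For readers unfamiliar with the dot format, the sketch below writes a small directed graph as DOT text. It is a standalone illustration using a text stream and a hypothetical write_dot helper; it does not show how draw itself produces its output.

```python
import io


def write_dot(stream, name, edges):
    """Write a directed graph as DOT text to a text stream."""
    stream.write(f'digraph "{name}" {{\n')
    for parent, child in edges:
        # One DOT edge statement per dependency.
        stream.write(f'    "{parent}" -> "{child}";\n')
    stream.write("}\n")


buffer = io.StringIO()
write_dot(buffer, "demo", [("a", "b"), ("b", "c")])
dot_text = buffer.getvalue()
```

The resulting text can be rendered by the dot program from Graphviz.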

get_executables(data=False, transfer_only=True)

Retrieve executables from generic workflow.

Parameters:
data : bool, optional

Whether to return the executable data as well as the exec object name. (The default is False.)

transfer_only : bool, optional

Whether to only return executables for which transfer_executable is True.

Returns:
execs : list [lsst.ctrl.bps.GenericWorkflowExec] or list [str]

Filtered executable names or objects from generic workflow.
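
The interaction of the two flags can be sketched with a stand-in record type: transfer_only narrows the selection and data switches between names and objects. The Exec dataclass and select_execs helper below are hypothetical and only mirror the parameter descriptions above; they are not the library's classes.

```python
from dataclasses import dataclass


@dataclass
class Exec:
    """Hypothetical stand-in for an executable record."""

    name: str
    transfer_executable: bool = False


def select_execs(execs, data=False, transfer_only=True):
    """Filter by the transfer flag, then return objects or just names."""
    chosen = [e for e in execs if e.transfer_executable or not transfer_only]
    return chosen if data else [e.name for e in chosen]


execs = [Exec("run_quantum", True), Exec("bash", False)]
names = select_execs(execs)
everything = select_execs(execs, data=True, transfer_only=False)
```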

get_file(name)

Retrieve a file object by name.

Parameters:
name : str

Name of file object.

Returns:
gwfile : lsst.ctrl.bps.GenericWorkflowFile

File matching given name.

get_files(data=False, transfer_only=True)

Retrieve files from generic workflow.

This accessor exists so that callers are not affected if the way files are stored changes (e.g., if the workflow becomes a bipartite graph with job and file nodes).

Parameters:
data : bool, optional

Whether to return the file data as well as the file object name. (The default is False.)

transfer_only : bool, optional

Whether to only return files for which a workflow management system would be responsible for transferring.

Returns:
files : list [lsst.ctrl.bps.GenericWorkflowFile] or list [str]

File names or objects from generic workflow meeting specifications.

get_final()

Return the job/workflow to be executed after all jobs that can be executed have been executed, regardless of the exit status of any of those jobs.

Returns:
final : lsst.ctrl.bps.GenericWorkflowJob or lsst.ctrl.bps.GenericWorkflow

Information needed to execute final job(s).

get_job(job_name: str)

Retrieve job by name from workflow.

Parameters:
job_name : str

Name of job to retrieve.

Returns:
job : lsst.ctrl.bps.GenericWorkflowJob

Job matching given job_name.

get_job_inputs(job_name, data=True, transfer_only=False)

Return the input files for the given job.

Parameters:
job_name : str

Name of the job.

data : bool, optional

Whether to return the file data as well as the file object name.

transfer_only : bool, optional

Whether to only return files for which a workflow management system would be responsible for transferring.

Returns:
inputs : list [lsst.ctrl.bps.GenericWorkflowFile]

Input files for the given job. If no input files for the job, returns an empty list.

get_job_outputs(job_name, data=True, transfer_only=False)

Return the output files for the given job.

Parameters:
job_name : str

Name of the job.

data : bool, optional

Whether to return the file data as well as the file object name. It defaults to True thus returning file data as well.

transfer_only : bool, optional

Whether to only return files for which a workflow management system would be responsible for transferring. It defaults to False thus returning all output files.

Returns:
outputs : list [lsst.ctrl.bps.GenericWorkflowFile]

Output files for the given job. If no output files for the job, returns an empty list.

classmethod load(stream, format_='pickle')

Load a GenericWorkflow from the given stream.

Parameters:
stream : str or io.BufferedIOBase

Stream to pass to the format-specific loader. Accepts anything that the loader accepts.

format_ : str, optional

Format of data to expect when loading from stream. It defaults to pickle format.

Returns:
generic_workflow : lsst.ctrl.bps.GenericWorkflow

Generic workflow loaded from the given stream.

save(stream, format_='pickle')

Save the generic workflow in a format that is loadable.

Parameters:
stream : str or io.BufferedIOBase

Stream to pass to the format-specific writer. Accepts anything that the writer accepts.

format_ : str, optional

Format in which to write the data. It defaults to pickle format.
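
Since pickle is the default format for both save and load, a round trip through a binary stream looks roughly like the sketch below. It pickles a plain dictionary standing in for a workflow (a hypothetical placeholder) and uses the standard pickle module directly rather than the GenericWorkflow methods.

```python
import io
import pickle

# Hypothetical stand-in for a workflow object.
workflow_like = {"name": "demo", "edges": [("a", "b")]}

# Write to an in-memory binary stream, as a file opened with "wb" would behave.
buffer = io.BytesIO()
pickle.dump(workflow_like, buffer)

# Rewind and read the object back.
buffer.seek(0)
restored = pickle.load(buffer)
```

Unpickling can execute arbitrary code, so pickle data should only be loaded from trusted sources.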

validate()

Run checks to ensure this is still a valid generic workflow graph.