GenericWorkflow

class lsst.ctrl.bps.GenericWorkflow(name, incoming_graph_data=None, **attr)

Bases: networkx.classes.digraph.DiGraph

A generic representation of a workflow, used as the basis for submission to specific workflow management systems.

Parameters:
name : str

Name of generic workflow.

incoming_graph_data : Any, optional

Data used to initialize the graph; passed through to the nx.DiGraph constructor. Can be any type supported by networkx.DiGraph.

attr : dict

Keyword arguments passed through to nx.DiGraph constructor.

Attributes Summary

name Retrieve name of generic workflow.

Methods Summary

add_edge(u_of_edge, v_of_edge, **attr) Add edge connecting jobs in workflow.
add_edges_from(ebunch_to_add, **attr) Add several edges between jobs in the generic workflow.
add_job(job[, parent_names, child_names]) Add job to generic workflow.
add_job_inputs(job_name, files) Add files as inputs to specified job.
add_job_outputs(job_name, files) Add output files to a job.
add_job_relationships(parents, children) Add dependencies between parent and child jobs.
add_node(node_for_adding, **attr) Override networkx function to call more specific add_job function.
del_job(job_name) Delete job from generic workflow leaving connected graph.
draw(stream[, format_]) Output generic workflow in a visualization format.
get_file(name) Retrieve a file object by name.
get_files([data, transfer_only]) Retrieve files from generic workflow.
get_job(job_name) Retrieve job by name from workflow.
get_job_inputs(job_name[, data, transfer_only]) Return the input files for the given job.
get_job_outputs(job_name[, data, transfer_only]) Return the output files for the given job.
load(stream[, format_]) Load a GenericWorkflow from the given stream.
save(stream[, format_]) Save the generic workflow in a format that is loadable.
validate() Run checks to ensure this is still a valid generic workflow graph.

Attributes Documentation

name

Retrieve name of generic workflow.

Returns:
name : str

Name of generic workflow.

Methods Documentation

add_edge(u_of_edge: str, v_of_edge: str, **attr)

Add edge connecting jobs in workflow.

Parameters:
u_of_edge : str

Name of parent job.

v_of_edge : str

Name of child job.

attr : keyword arguments, optional

Attributes to save with edge.

add_edges_from(ebunch_to_add, **attr)

Add several edges between jobs in the generic workflow.

Parameters:
ebunch_to_add : Iterable of tuple of str

Iterable of job name pairs between which a dependency should be saved.

attr : keyword arguments, optional

Data can be assigned using keyword arguments (not currently used).

add_job(job, parent_names=None, child_names=None)

Add job to generic workflow.

Parameters:
job : GenericWorkflowJob

Job to add to the generic workflow.

parent_names : list of str, optional

Names of jobs that are parents of the given job.

child_names : list of str, optional

Names of jobs that are children of the given job.

add_job_inputs(job_name: str, files)

Add files as inputs to specified job.

Parameters:
job_name : str

Name of job to which inputs should be added.

files : GenericWorkflowFile or list

File object(s) to be added as inputs to the specified job.

add_job_outputs(job_name, files)

Add output files to a job.

Parameters:
job_name : str

Name of job to which the files should be added as outputs.

files : list of GenericWorkflowFile

File objects to be added as outputs for specified job.

add_job_relationships(parents, children)

Add dependencies between parent and child jobs. All parents will be connected to all children.

Parameters:
parents : list of str

Parent job names.

children : list of str

Child job names.
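The all-to-all rule can be illustrated without the library: the resulting dependencies are the Cartesian product of the two name lists. The following is a minimal sketch in plain Python; `relationship_edges` is a hypothetical helper used only for illustration and is not part of lsst.ctrl.bps.

```python
from itertools import product


def relationship_edges(parents, children):
    """Return every (parent, child) pair, mirroring the all-to-all rule.

    Hypothetical helper: it only computes the pairs and does not touch
    a real GenericWorkflow.
    """
    return list(product(parents, children))


# Two parents and two children yield four dependencies.
edges = relationship_edges(["a", "b"], ["c", "d"])
print(edges)
```

With two parents and two children, four edges are produced, so the edge count grows as the product of the two list lengths.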

add_node(node_for_adding, **attr)

Override networkx function to call more specific add_job function.

Parameters:
node_for_adding : GenericWorkflowJob

Job to be added to generic workflow.

attr : keyword arguments, optional

Needed to match original networkx function, but not used.

del_job(job_name: str)

Delete job from generic workflow leaving connected graph.

Parameters:
job_name : str

Name of job to delete from workflow.
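One plausible reading of "leaving connected graph" is that the parents of the deleted job become direct parents of its children, so no dependency chain is broken. That reading is an assumption, and the sketch below models it on a plain dict of sets rather than on a real GenericWorkflow; `delete_and_reconnect` is a hypothetical name.

```python
def delete_and_reconnect(successors, job_name):
    """Remove job_name and link each of its parents to each of its children.

    `successors` maps a job name to the set of its child job names.
    This is a stand-in data structure (an assumption), not the
    networkx graph that GenericWorkflow uses.
    """
    children = successors.pop(job_name)
    parents = [name for name, kids in successors.items() if job_name in kids]
    for parent in parents:
        successors[parent].discard(job_name)
        successors[parent] |= children
    return successors


# a -> b -> {c, d}; deleting b should leave a -> {c, d}.
graph = {"a": {"b"}, "b": {"c", "d"}, "c": set(), "d": set()}
delete_and_reconnect(graph, "b")
print(graph)
```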

draw(stream, format_='dot')

Output generic workflow in a visualization format.

Parameters:
stream : str or io.BufferedIOBase

Stream to which the visualization should be written.

format_ : str, optional

Which visualization format to use. It defaults to the format for the dot program.
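For readers unfamiliar with the dot format, the sketch below hand-writes a small directed graph in DOT syntax to a text stream. It is an illustration of the format only: `write_dot` is a hypothetical helper, the use of a text stream is an assumption, and the output produced by `draw` itself may differ in layout and attributes.

```python
import io


def write_dot(stream, name, edges):
    """Write a directed graph to a text stream in DOT syntax.

    Hypothetical helper for illustration; not the library's writer.
    """
    stream.write(f'digraph "{name}" {{\n')
    for parent, child in edges:
        stream.write(f'    "{parent}" -> "{child}";\n')
    stream.write("}\n")


buffer = io.StringIO()
write_dot(buffer, "demo", [("a", "b"), ("b", "c")])
dot_text = buffer.getvalue()
print(dot_text)
```

Text of this shape can be rendered by the Graphviz `dot` program.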

get_file(name)

Retrieve a file object by name.

Parameters:
name : str

Name of file object.

Returns:
file_ : GenericWorkflowFile

File matching given name.

get_files(data=False, transfer_only=True)

Retrieve files from generic workflow. This accessor exists so that callers are insulated from any change to how files are stored (e.g., making the workflow a bipartite graph with job and file nodes).

Parameters:
data : bool, optional

Whether to return the file data as well as the file object name.

transfer_only : bool, optional

Whether to only return files for which a workflow management system would be responsible for transferring.

Returns:
files : list of GenericWorkflowFile

Files from generic workflow meeting specifications.
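The interaction of the two flags can be sketched with stand-in objects. Everything here is an assumption made for illustration: `FileStandIn`, its `wms_transfer` flag, and `select_files` are hypothetical, and the reading that `data=False` yields names while `data=True` yields file objects is inferred from the parameter description rather than confirmed by it.

```python
from dataclasses import dataclass


@dataclass
class FileStandIn:
    """Hypothetical stand-in for a workflow file object."""

    name: str
    wms_transfer: bool  # assumed flag: the WMS is responsible for transfer


def select_files(files, data=False, transfer_only=True):
    """Filter files by the transfer flag, then return objects or names."""
    selected = [f for f in files if f.wms_transfer or not transfer_only]
    return selected if data else [f.name for f in selected]


files = [FileStandIn("a.txt", True), FileStandIn("b.txt", False)]
print(select_files(files))
print(select_files(files, transfer_only=False))
```

Note that the defaults shown for `get_files` (`data=False`, `transfer_only=True`) differ from those of `get_job_inputs` and `get_job_outputs` (`data=True`, `transfer_only=False`), so passing the flags explicitly avoids surprises.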

get_job(job_name: str)

Retrieve job by name from workflow.

Parameters:
job_name : str

Name of job to retrieve.

Returns:
job : GenericWorkflowJob

Job matching given job_name.

get_job_inputs(job_name, data=True, transfer_only=False)

Return the input files for the given job.

Parameters:
job_name : str

Name of the job.

data : bool, optional

Whether to return the file data as well as the file object name.

transfer_only : bool, optional

Whether to only return files for which a workflow management system would be responsible for transferring.

Returns:
inputs : list of GenericWorkflowFile

Input files for the given job.

get_job_outputs(job_name, data=True, transfer_only=False)

Return the output files for the given job.

Parameters:
job_name : str

Name of the job.

data : bool, optional

Whether to return the file data as well as the file object name. Defaults to True, so file data is returned as well.

transfer_only : bool, optional

Whether to only return files for which a workflow management system would be responsible for transferring. Defaults to False, so all output files are returned.

Returns:
outputs : list of GenericWorkflowFile

Output files for the given job.

classmethod load(stream, format_='pickle')

Load a GenericWorkflow from the given stream.

Parameters:
stream : str or io.BufferedIOBase

Stream to pass to the format-specific loader. Accepts anything that the loader accepts.

format_ : str, optional

Format of data to expect when loading from stream. It defaults to pickle format.

Returns:
generic_workflow : GenericWorkflow

Generic workflow loaded from the given stream.

save(stream, format_='pickle')

Save the generic workflow in a format that is loadable.

Parameters:
stream : str or io.BufferedIOBase

Stream to pass to the format-specific writer. Accepts anything that the writer accepts.

format_ : str, optional

Format in which to write the data. It defaults to pickle format.
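Since `save` and `load` both default to the pickle format, a round trip through a binary stream is the natural usage pattern. The sketch below shows that pattern with the standard `pickle` module and a plain dict standing in for the workflow; the stand-in is an assumption, and the calls shown are `pickle.dump` and `pickle.load`, not the GenericWorkflow methods.

```python
import io
import pickle

# Stand-in for a workflow (assumption): a name plus dependency edges.
workflow_stand_in = {"name": "demo", "edges": [("a", "b"), ("b", "c")]}

# Write to an in-memory binary stream, as save(stream) would to a file.
stream = io.BytesIO()
pickle.dump(workflow_stand_in, stream)

# Rewind before reading, then restore, as load(stream) would.
stream.seek(0)
restored = pickle.load(stream)
print(restored == workflow_stand_in)
```

The stream must be opened in binary mode for pickle data, and pickle files should only be loaded from trusted sources because unpickling can execute arbitrary code.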

validate()

Run checks to ensure this is still a valid generic workflow graph.