GenericWorkflow

class lsst.ctrl.bps.GenericWorkflow(name, incoming_graph_data=None, **attr)

Bases: networkx.classes.digraph.DiGraph

A generic representation of a workflow used to submit to specific workflow management systems.

Parameters
name : str

Name of generic workflow.

incoming_graph_data : Any, optional

Data used to initialize the graph; it is passed through to the nx.DiGraph constructor. Can be any type supported by networkx.DiGraph.

attr : dict

Keyword arguments passed through to nx.DiGraph constructor.

Attributes Summary

name

Retrieve name of generic workflow.

Methods Summary

add_edge(u_of_edge, v_of_edge, **attr)

Add edge connecting jobs in workflow.

add_edges_from(ebunch_to_add, **attr)

Add several edges between jobs in the generic workflow.

add_job(job[, parent_names, child_names])

Add job to generic workflow.

add_job_inputs(job_name, files)

Add files as inputs to specified job.

add_job_outputs(job_name, files)

Add output files to a job.

add_job_relationships(parents, children)

Add dependencies between parent and child jobs.

add_node(node_for_adding, **attr)

Override networkx function to call more specific add_job function.

del_job(job_name)

Delete job from generic workflow leaving connected graph.

draw(stream[, format_])

Output generic workflow in a visualization format.

get_file(name)

Retrieve a file object by name.

get_files([data, transfer_only])

Retrieve files from generic workflow.

get_job(job_name)

Retrieve job by name from workflow.

get_job_inputs(job_name[, data, transfer_only])

Return the input files for the given job.

get_job_outputs(job_name[, data, transfer_only])

Return the output files for the given job.

load(stream[, format_])

Load a GenericWorkflow from the given stream.

save(stream[, format_])

Save the generic workflow in a format that is loadable.

validate()

Run checks to ensure this is still a valid generic workflow graph.

Attributes Documentation

name

Retrieve name of generic workflow.

Returns
name : str

Name of generic workflow.

Methods Documentation

add_edge(u_of_edge: str, v_of_edge: str, **attr)

Add edge connecting jobs in workflow.

Parameters
u_of_edge : str

Name of parent job.

v_of_edge : str

Name of child job.

attr : keyword arguments, optional

Attributes to save with edge.

add_edges_from(ebunch_to_add, **attr)

Add several edges between jobs in the generic workflow.

Parameters
ebunch_to_add : Iterable of tuple of str

Iterable of job name pairs between which a dependency should be saved.

attr : keyword arguments, optional

Data can be assigned using keyword arguments (not currently used).

add_job(job, parent_names=None, child_names=None)

Add job to generic workflow.

Parameters
job : GenericWorkflowJob

Job to add to the generic workflow.

parent_names : list of str, optional

Names of jobs that are parents of the given job.

child_names : list of str, optional

Names of jobs that are children of the given job.

add_job_inputs(job_name: str, files)

Add files as inputs to specified job.

Parameters
job_name : str

Name of job to which inputs should be added.

files : GenericWorkflowFile or list

File object(s) to be added as inputs to the specified job.

add_job_outputs(job_name, files)

Add output files to a job.

Parameters
job_name : str

Name of job to which the files should be added as outputs.

files : list of GenericWorkflowFile

File objects to be added as outputs for specified job.

add_job_relationships(parents, children)

Add dependencies between parent and child jobs. All parents will be connected to all children.

Parameters
parents : list of str

Parent job names.

children : list of str

Children job names.
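
The all-to-all behaviour described above can be illustrated without the library. The following is a minimal sketch, not the lsst.ctrl.bps implementation: `all_to_all_edges` is a hypothetical helper and the job names are made up. It only shows which edges result when every parent is paired with every child.

```python
from itertools import product


def all_to_all_edges(parents, children):
    """Return one (parent, child) edge for every parent/child pairing."""
    return list(product(parents, children))


# Hypothetical job names, used only for illustration.
edges = all_to_all_edges(["calibrate_a", "calibrate_b"], ["coadd", "report"])
for parent, child in edges:
    print(f"{parent} -> {child}")
```

Two parents and two children yield four edges, so the number of dependencies grows as the product of the two list lengths.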

add_node(node_for_adding, **attr)

Override networkx function to call more specific add_job function.

Parameters
node_for_adding : GenericWorkflowJob

Job to be added to generic workflow.

attr :

Needed to match original networkx function, but not used.

del_job(job_name: str)

Delete job from generic workflow leaving connected graph.

Parameters
job_name : str

Name of job to delete from workflow.
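
One plausible reading of "leaving connected graph" is that the parents of the deleted job become direct parents of its children. The sketch below works on a plain set of (parent, child) tuples under that assumption; `delete_and_reconnect` is a hypothetical helper, not the library's method.

```python
def delete_and_reconnect(edges, job_name):
    """Remove job_name from a set of (parent, child) edges, bridging the gap."""
    parents = {u for u, v in edges if v == job_name}
    children = {v for u, v in edges if u == job_name}
    # Keep every edge that does not touch the deleted job.
    kept = {(u, v) for u, v in edges if job_name not in (u, v)}
    # Connect each former parent directly to each former child.
    bridges = {(p, c) for p in parents for c in children}
    return kept | bridges


# Hypothetical workflow: a -> b, b -> c, b -> d
edges = {("a", "b"), ("b", "c"), ("b", "d")}
remaining = delete_and_reconnect(edges, "b")
print(sorted(remaining))
```

After removing `b`, the remaining edges are `a -> c` and `a -> d`, so the ordering constraints that passed through `b` are preserved.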

draw(stream, format_='dot')

Output generic workflow in a visualization format.

Parameters
stream : str or io.BufferedIOBase

Stream to which the visualization should be written.

format_ : str, optional

Which visualization format to use. It defaults to the format for the dot program.
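
For readers unfamiliar with the dot program's input, the sketch below writes a small dependency graph as a DOT digraph. It is an illustration only: `write_dot` is a hypothetical helper, it writes to a text stream for readability (the documented parameter is a str or io.BufferedIOBase), and the exact output produced by draw() may differ.

```python
import io


def write_dot(edges, stream):
    """Write (parent, child) edges to a text stream as a DOT digraph."""
    stream.write("digraph workflow {\n")
    for parent, child in edges:
        stream.write(f'    "{parent}" -> "{child}";\n')
    stream.write("}\n")


# Hypothetical job names.
buffer = io.StringIO()
write_dot([("a", "b"), ("b", "c")], buffer)
dot_text = buffer.getvalue()
print(dot_text)
```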

get_file(name)

Retrieve a file object by name.

Parameters
name : str

Name of file object.

Returns
file_ : GenericWorkflowFile

File matching given name.

get_files(data=False, transfer_only=True)

Retrieve files from generic workflow. This API is needed in case the way files are stored changes (e.g., making the workflow a bipartite graph with job and file nodes).

Parameters
data : bool, optional

Whether to return the file data as well as the file object name.

transfer_only : bool, optional

Whether to only return files for which a workflow management system would be responsible for transferring.

Returns
files : list of GenericWorkflowFile

Files from generic workflow meeting specifications.
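
The two flags described above act as a filter and a projection. The following is a minimal sketch under two assumptions: each file carries a boolean transfer flag (named `wms_transfer` here purely for illustration), and `data=False` yields names only, as the parameter description suggests. `ToyFile` and `select_files` are hypothetical and not part of lsst.ctrl.bps.

```python
from dataclasses import dataclass


@dataclass
class ToyFile:
    """Hypothetical stand-in for a workflow file."""

    name: str
    wms_transfer: bool  # assumed flag: should the WMS transfer this file?


def select_files(files, data=False, transfer_only=True):
    """Filter by the transfer flag, then return objects or just names."""
    chosen = [f for f in files if f.wms_transfer or not transfer_only]
    return chosen if data else [f.name for f in chosen]


files = [ToyFile("config.yaml", True), ToyFile("scratch.tmp", False)]
names = select_files(files)
print(names)
```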

get_job(job_name: str)

Retrieve job by name from workflow.

Parameters
job_name : str

Name of job to retrieve.

Returns
job : GenericWorkflowJob

Job matching given job_name.

get_job_inputs(job_name, data=True, transfer_only=False)

Return the input files for the given job.

Parameters
job_name : str

Name of the job.

data : bool, optional

Whether to return the file data as well as the file object name.

transfer_only : bool, optional

Whether to only return files for which a workflow management system would be responsible for transferring.

Returns
inputs : list of GenericWorkflowFile

Input files for the given job.

get_job_outputs(job_name, data=True, transfer_only=False)

Return the output files for the given job.

Parameters
job_name : str

Name of the job.

data : bool

Whether to return the file data as well as the file object name. It defaults to True, thus returning file data as well.

transfer_only : bool

Whether to only return files for which a workflow management system would be responsible for transferring. It defaults to False, thus returning all output files.

Returns
outputs : list of GenericWorkflowFile

Output files for the given job.

classmethod load(stream, format_='pickle')

Load a GenericWorkflow from the given stream.

Parameters
stream : str or io.BufferedIOBase

Stream to pass to the format-specific loader. Accepts anything that the loader accepts.

format_ : str, optional

Format of data to expect when loading from stream. It defaults to pickle format.

Returns
generic_workflow : GenericWorkflow

Generic workflow loaded from the given stream.

save(stream, format_='pickle')

Save the generic workflow in a format that is loadable.

Parameters
stream : str or io.BufferedIOBase

Stream to pass to the format-specific writer. Accepts anything that the writer accepts.

format_ : str, optional

Format in which to write the data. It defaults to pickle format.
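
Since pickle is the default format for both load and save, the round trip can be pictured with the standard library alone. This sketch uses a plain dictionary as a stand-in for a workflow and an in-memory binary stream. It does not call the library, and the dictionary layout is invented for illustration.

```python
import io
import pickle

# Hypothetical stand-in for a workflow object.
original = {"name": "demo", "edges": [("a", "b"), ("b", "c")]}

# "Save": write the object to a binary stream in pickle format.
stream = io.BytesIO()
pickle.dump(original, stream)

# "Load": rewind the stream and read the object back.
stream.seek(0)
restored = pickle.load(stream)
print(restored == original)
```

As with any pickle data, a stream should only be loaded if it comes from a trusted source.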

validate()

Run checks to ensure this is still a valid generic workflow graph.
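
The specific checks are not listed here. One check that any workflow of dependent jobs plausibly needs is that the graph has no cycles, and the sketch below shows that check alone using the standard library (Python 3.9 or later). `is_acyclic` is a hypothetical helper and is not claimed to match what validate() does.

```python
from graphlib import CycleError, TopologicalSorter


def is_acyclic(edges):
    """Return True if the (parent, child) edges contain no cycle."""
    sorter = TopologicalSorter()
    for parent, child in edges:
        # add(node, *predecessors): the child depends on the parent.
        sorter.add(child, parent)
    try:
        sorter.prepare()
    except CycleError:
        return False
    return True


print(is_acyclic([("a", "b"), ("b", "c")]))
print(is_acyclic([("a", "b"), ("b", "a")]))
```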