dflow.python package¶
Submodules¶
dflow.python.op module¶
- class dflow.python.op.OP(*args, **kwargs)¶
Bases:
ABC
Python class OP
- Parameters:
progress_total – an int representing total progress
progress_current – an int representing current progress
- classmethod convert_to_graph()¶
- create_slice_dir = False¶
- static exec_sign_check(func)¶
- classmethod from_graph(graph)¶
- classmethod function(func=None, **kwargs)¶
- classmethod get_info()¶
- get_input_artifact_link(name: str) str ¶
- get_input_artifact_storage_key(name: str) str ¶
- classmethod get_opio_info(opio_sign)¶
- get_output_artifact_link(name: str) str ¶
- get_output_artifact_storage_key(name: str) str ¶
- handle_outputs(outputs, symlink=False)¶
- key = None¶
- outputs = {}¶
- pool_size = None¶
- progress_current = 0¶
- progress_total = 1¶
- register_output_artifact(name, namespace, dataset_name, **kwargs)¶
- set_output(name, value)¶
- slices = {}¶
- classmethod superfunction(func=None, **kwargs)¶
- tmp_root = '/tmp'¶
- workflow_name = None¶
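A typical OP subclass defines input and output signatures and an execute method. The sketch below assumes dflow is installed; the OP name Duplicate and its file-writing logic are illustrative, not part of the library.

```python
from pathlib import Path
from dflow.python import OP, OPIO, OPIOSign, Artifact

class Duplicate(OP):
    """Hypothetical OP: write a message to a file a given number of times."""

    @classmethod
    def get_input_sign(cls):
        return OPIOSign({
            "msg": str,
            "num": int,
        })

    @classmethod
    def get_output_sign(cls):
        return OPIOSign({
            "out_file": Artifact(Path),
        })

    @OP.exec_sign_check  # validates op_in/op_out against the signatures
    def execute(self, op_in: OPIO) -> OPIO:
        out = Path("result.txt")
        out.write_text(op_in["msg"] * op_in["num"])
        return OPIO({"out_file": out})
```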
- dflow.python.op.get_source_code(o)¶
- dflow.python.op.type2opiosign(t)¶
dflow.python.opio module¶
- class dflow.python.opio.Artifact(type: Any, archive: str = 'default', save: List[PVC | S3Artifact] | None = None, optional: bool = False, global_name: str | None = None, sub_path: bool = True, **kwargs)¶
Bases:
object
OPIO signature of artifact
- Parameters:
type – str, Path, Set[str], Set[Path], List[str], List[Path], Dict[str, str], Dict[str, Path], NestedDict[str] or NestedDict[Path]
archive – compress format of the artifact, None for no compression
save – place(s) to store the output artifact instead of the default storage; can be a list
optional – whether the input artifact is optional
global_name – global name of the artifact within the workflow
- classmethod from_dict(d)¶
- to_dict()¶
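Artifact signatures declare typed files or file collections in an OPIOSign. A minimal sketch, assuming dflow is installed; the key names are illustrative:

```python
from pathlib import Path
from typing import List
from dflow.python import Artifact, OPIOSign

sign = OPIOSign({
    # a single input file, compressed with the default archive format
    "data": Artifact(Path),
    # an optional list of files, stored without compression
    "configs": Artifact(List[Path], archive=None, optional=True),
})
```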
- class dflow.python.opio.BigParameter(type: Any, **kwargs)¶
Bases:
object
OPIO signature of big parameter
- Parameters:
type – parameter type
- classmethod from_dict(d)¶
- to_dict()¶
- class dflow.python.opio.HDF5Dataset(file, key)¶
Bases:
object
- property dataset¶
- get_data()¶
- is_none()¶
- recover()¶
- class dflow.python.opio.HDF5Datasets¶
Bases:
object
- class dflow.python.opio.NestedDictBase¶
Bases:
object
- class dflow.python.opio.NestedDictPath¶
Bases:
NestedDictBase
- class dflow.python.opio.NestedDictStr¶
Bases:
NestedDictBase
- class dflow.python.opio.OPIO(*args, **kwargs)¶
Bases:
MutableMapping
- class dflow.python.opio.OPIOSign(*args, **kwargs)¶
Bases:
MutableMapping
The signature of an OPIO. A signature maps each key to its type
- class dflow.python.opio.Parameter(type: Any, global_name: str | None = None, **kwargs)¶
Bases:
object
OPIO signature of parameter
- Parameters:
type – parameter type
global_name – global name of the parameter within the workflow
default – default value of the parameter
- classmethod from_dict(d)¶
- to_dict()¶
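Parameter and BigParameter both declare non-artifact inputs/outputs in an OPIOSign; BigParameter is intended for values too large to pass conveniently as a plain parameter. A sketch assuming dflow is installed; key names and the default value are illustrative:

```python
from dflow.python import BigParameter, OPIOSign, Parameter

sign = OPIOSign({
    # small parameter with a default value
    "name": Parameter(str, default="run-0"),
    # large object passed as a big parameter
    "settings": BigParameter(dict),
})
```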
- dflow.python.opio.t¶
alias of
NestedDictPath
dflow.python.python_op_template module¶
- exception dflow.python.python_op_template.FatalError¶
Bases:
Exception
- class dflow.python.python_op_template.PicklableFunctionOP(op_class)¶
Bases:
object
- class dflow.python.python_op_template.PythonOPTemplate(op_class: Type[OP] | OP, image: str | None = None, command: str | List[str] | None = None, annotations: Dict[str, str] | None = None, labels: Dict[str, str] | None = None, node_selector: Dict[str, str] | None = None, tolerations: List[object] | None = None, affinity: object | None = None, output_artifact_save: Dict[str, List[PVC | S3Artifact]] | None = None, output_artifact_archive: Dict[str, str | None] | None = None, output_parameter_default: Dict[str, Any] | None = None, input_artifact_prefix: Dict[str, str] | None = None, input_artifact_slices: Dict[str, str] | None = None, input_parameter_slices: Dict[str, str] | None = None, output_artifact_slices: Dict[str, str] | None = None, output_parameter_slices: Dict[str, str] | None = None, output_artifact_global_name: Dict[str, str] | None = None, output_parameter_global_name: Dict[str, str] | None = None, slices: Slices | None = None, python_packages: List[PathLike] | None = None, timeout: int | None = None, retry_on_transient_error: int | None = None, retry_on_failure: int | None = None, retry_on_error: int | None = None, retry_on_failure_and_error: int | None = None, timeout_as_transient_error: bool = False, memoize_key: str | None = None, volumes: List[object] | None = None, mounts: List[object] | None = None, image_pull_policy: str | None = None, requests: Dict[str, str] | None = None, limits: Dict[str, str] | None = None, upload_dflow: bool = True, envs: Dict[str, str] | None = None, init_containers: List[object] | None = None, sidecars: List[object] | None = None, tmp_root: str = '/tmp', pre_script: str = '', post_script: str = '', success_tag: bool = False, create_slice_dir: bool = False, skip_slice_input: bool = False)¶
Bases:
PythonScriptOPTemplate
Convert from Python class OP to OP template
- Parameters:
op_class – Python class OP
image – image of the OP template
command – python executable
annotations – annotations for the OP template
labels – labels for the OP template
node_selector – node selector when scheduling the pod
tolerations – tolerations of taints when scheduling the pod
affinity – affinity when scheduling the pod
input_artifact_slices – a dict specifying input artifacts to use slices
output_artifact_save – a dict specifying storage of output artifacts overriding default storage
output_artifact_archive – a dict specifying compress format of output artifacts, None for no compression
input_parameter_slices – a dict specifying input parameters to use slices
output_artifact_slices – a dict specifying output artifacts to use slices
output_parameter_slices – a dict specifying output parameters to use slices
output_artifact_global_name – a dict specifying global names of output artifacts within the workflow
slices – use slices to generate parallel steps
python_packages – local python packages to be uploaded to the OP
timeout – timeout of the OP template
retry_on_transient_error – maximum number of retries on TransientError
output_parameter_default – a dict specifying default values for output parameters
output_parameter_global_name – a dict specifying global names of output parameters within the workflow
timeout_as_transient_error – whether to regard a timeout as a transient error rather than a fatal one
memoize_key – memoized key of the OP template
volumes – volumes to use in the OP template
mounts – volumes to mount in the OP template
image_pull_policy – Always, IfNotPresent, Never
requests – a dict of resource requests
limits – a dict of resource limits
envs – environment variables
init_containers – init containers before the template runs
sidecars – sidecar containers
- convert_to_argo(memoize_prefix=None, memoize_configmap='dflow')¶
- convert_to_graph()¶
- classmethod from_graph(graph)¶
- get_slices(slices_dict, name)¶
- render_script()¶
- render_slices(slices=None)¶
- set_skip_slice_input()¶
- set_slices(slices)¶
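Wrapping an OP class in a PythonOPTemplate produces an OP template that can be used in workflow steps. A minimal sketch, assuming dflow is installed; Duplicate stands in for any OP subclass, and the image and resource values are illustrative:

```python
from dflow.python import PythonOPTemplate

template = PythonOPTemplate(
    Duplicate,                       # hypothetical OP class
    image="python:3.10",
    requests={"cpu": "1", "memory": "2Gi"},
    retry_on_transient_error=3,      # retry up to 3 times on TransientError
    timeout=3600,
    timeout_as_transient_error=True, # timeouts are retried, not fatal
)
```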
- class dflow.python.python_op_template.Slices(slices: str | None = None, input_parameter: List[str] | None = None, input_artifact: List[str] | None = None, output_parameter: List[str] | None = None, output_artifact: List[str] | None = None, sub_path: bool = False, group_size: int | None = None, shuffle: bool = False, random_seed: int = 0, pool_size: int | None = None, pool_timeout: int | None = None, register_first_only: bool = False, create_dir: bool = False, raise_for_group: bool = False)¶
Bases:
object
Slices specified in PythonOPTemplate
- Parameters:
slices – slice pattern
input_parameter – list of input parameters to be sliced
input_artifact – list of input artifacts to be sliced
output_parameter – list of output parameters to be stacked
output_artifact – list of output artifacts to be stacked
group_size – number of slices per task/step
pool_size – when there are multiple slices per task/step, use a multiprocessing pool to handle the slices: 1 for serial execution, -1 for a pool as large as the number of slices
register_first_only – only register the first slice when lineage is used
create_dir – create a separate directory for each slice for saving output artifacts
raise_for_group – raise an exception at the end if any task in the group fails
- evalable_repr(imports)¶
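Slices turns one templated step into parallel sliced steps. A sketch assuming dflow is installed; Duplicate and the sliced key names are illustrative:

```python
from dflow.python import PythonOPTemplate, Slices

template = PythonOPTemplate(
    Duplicate,                        # hypothetical OP class
    image="python:3.10",
    slices=Slices(
        "{{item}}",
        input_parameter=["msg"],      # each step gets one slice of "msg"
        output_artifact=["out_file"], # outputs are stacked across slices
        group_size=4,                 # 4 slices handled per task/step
        pool_size=2,                  # within a step, process 2 slices at a time
    ),
)
```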
- exception dflow.python.python_op_template.TransientError¶
Bases:
Exception
- dflow.python.python_op_template.handle_packages_script(package_root)¶
dflow.python.utils module¶
- dflow.python.utils.absolutize(path)¶
- dflow.python.utils.absolutize_hdf5(obj)¶
- dflow.python.utils.copy_results(source, name, data_root='/tmp', slice_dir=None, symlink=False)¶
- dflow.python.utils.copy_results_and_return_path_item(path, name, order, data_root='/tmp', slice_dir=None, symlink=False)¶
- dflow.python.utils.get_input_slices(name, data_root='/tmp')¶
- dflow.python.utils.get_slices(path_object, slices)¶
- dflow.python.utils.handle_empty_dir(path)¶
- dflow.python.utils.handle_input_artifact(name, sign, slices=None, data_root='/tmp', sub_path=None, n_parts=None, keys_of_parts=None, path=None, prefix=None)¶
- dflow.python.utils.handle_input_parameter(name, value, sign, slices=None, data_root='/tmp')¶
- dflow.python.utils.handle_lineage(wf_name, pod_name, op_obj, input_urns, workflow_urn, data_root='/tmp')¶
- dflow.python.utils.handle_output_artifact(name, value, sign, slices=None, data_root='/tmp', create_dir=False, symlink=False)¶
- dflow.python.utils.handle_output_parameter(name, value, sign, slices=None, data_root='/tmp')¶
- dflow.python.utils.path_or_none(p)¶
- dflow.python.utils.sigalrm_handler(signum, frame)¶
- dflow.python.utils.slice_to_dir(slice)¶
- dflow.python.utils.try_to_execute(input, slice_dir, op_obj, output_sign, cwd, timeout=None)¶
Module contents¶
- class dflow.python.Artifact(type: Any, archive: str = 'default', save: List[PVC | S3Artifact] | None = None, optional: bool = False, global_name: str | None = None, sub_path: bool = True, **kwargs)¶
Bases:
object
OPIO signature of artifact
- Parameters:
type – str, Path, Set[str], Set[Path], List[str], List[Path], Dict[str, str], Dict[str, Path], NestedDict[str] or NestedDict[Path]
archive – compress format of the artifact, None for no compression
save – place(s) to store the output artifact instead of the default storage; can be a list
optional – whether the input artifact is optional
global_name – global name of the artifact within the workflow
- classmethod from_dict(d)¶
- to_dict()¶
- class dflow.python.BigParameter(type: Any, **kwargs)¶
Bases:
object
OPIO signature of big parameter
- Parameters:
type – parameter type
- classmethod from_dict(d)¶
- to_dict()¶
- exception dflow.python.FatalError¶
Bases:
Exception
- class dflow.python.HDF5Datasets¶
Bases:
object
- class dflow.python.OP(*args, **kwargs)¶
Bases:
ABC
Python class OP
- Parameters:
progress_total – an int representing total progress
progress_current – an int representing current progress
- classmethod convert_to_graph()¶
- create_slice_dir = False¶
- static exec_sign_check(func)¶
- classmethod from_graph(graph)¶
- classmethod function(func=None, **kwargs)¶
- classmethod get_info()¶
- get_input_artifact_link(name: str) str ¶
- get_input_artifact_storage_key(name: str) str ¶
- classmethod get_opio_info(opio_sign)¶
- get_output_artifact_link(name: str) str ¶
- get_output_artifact_storage_key(name: str) str ¶
- handle_outputs(outputs, symlink=False)¶
- key = None¶
- outputs = {}¶
- pool_size = None¶
- progress_current = 0¶
- progress_total = 1¶
- register_output_artifact(name, namespace, dataset_name, **kwargs)¶
- set_output(name, value)¶
- slices = {}¶
- classmethod superfunction(func=None, **kwargs)¶
- tmp_root = '/tmp'¶
- workflow_name = None¶
- class dflow.python.OPIO(*args, **kwargs)¶
Bases:
MutableMapping
- class dflow.python.OPIOSign(*args, **kwargs)¶
Bases:
MutableMapping
The signature of an OPIO. A signature maps each key to its type
- class dflow.python.Parameter(type: Any, global_name: str | None = None, **kwargs)¶
Bases:
object
OPIO signature of parameter
- Parameters:
type – parameter type
global_name – global name of the parameter within the workflow
default – default value of the parameter
- classmethod from_dict(d)¶
- to_dict()¶
- class dflow.python.PythonOPTemplate(op_class: Type[OP] | OP, image: str | None = None, command: str | List[str] | None = None, annotations: Dict[str, str] | None = None, labels: Dict[str, str] | None = None, node_selector: Dict[str, str] | None = None, tolerations: List[object] | None = None, affinity: object | None = None, output_artifact_save: Dict[str, List[PVC | S3Artifact]] | None = None, output_artifact_archive: Dict[str, str | None] | None = None, output_parameter_default: Dict[str, Any] | None = None, input_artifact_prefix: Dict[str, str] | None = None, input_artifact_slices: Dict[str, str] | None = None, input_parameter_slices: Dict[str, str] | None = None, output_artifact_slices: Dict[str, str] | None = None, output_parameter_slices: Dict[str, str] | None = None, output_artifact_global_name: Dict[str, str] | None = None, output_parameter_global_name: Dict[str, str] | None = None, slices: Slices | None = None, python_packages: List[PathLike] | None = None, timeout: int | None = None, retry_on_transient_error: int | None = None, retry_on_failure: int | None = None, retry_on_error: int | None = None, retry_on_failure_and_error: int | None = None, timeout_as_transient_error: bool = False, memoize_key: str | None = None, volumes: List[object] | None = None, mounts: List[object] | None = None, image_pull_policy: str | None = None, requests: Dict[str, str] | None = None, limits: Dict[str, str] | None = None, upload_dflow: bool = True, envs: Dict[str, str] | None = None, init_containers: List[object] | None = None, sidecars: List[object] | None = None, tmp_root: str = '/tmp', pre_script: str = '', post_script: str = '', success_tag: bool = False, create_slice_dir: bool = False, skip_slice_input: bool = False)¶
Bases:
PythonScriptOPTemplate
Convert from Python class OP to OP template
- Parameters:
op_class – Python class OP
image – image of the OP template
command – python executable
annotations – annotations for the OP template
labels – labels for the OP template
node_selector – node selector when scheduling the pod
tolerations – tolerations of taints when scheduling the pod
affinity – affinity when scheduling the pod
input_artifact_slices – a dict specifying input artifacts to use slices
output_artifact_save – a dict specifying storage of output artifacts overriding default storage
output_artifact_archive – a dict specifying compress format of output artifacts, None for no compression
input_parameter_slices – a dict specifying input parameters to use slices
output_artifact_slices – a dict specifying output artifacts to use slices
output_parameter_slices – a dict specifying output parameters to use slices
output_artifact_global_name – a dict specifying global names of output artifacts within the workflow
slices – use slices to generate parallel steps
python_packages – local python packages to be uploaded to the OP
timeout – timeout of the OP template
retry_on_transient_error – maximum number of retries on TransientError
output_parameter_default – a dict specifying default values for output parameters
output_parameter_global_name – a dict specifying global names of output parameters within the workflow
timeout_as_transient_error – whether to regard a timeout as a transient error rather than a fatal one
memoize_key – memoized key of the OP template
volumes – volumes to use in the OP template
mounts – volumes to mount in the OP template
image_pull_policy – Always, IfNotPresent, Never
requests – a dict of resource requests
limits – a dict of resource limits
envs – environment variables
init_containers – init containers before the template runs
sidecars – sidecar containers
- convert_to_argo(memoize_prefix=None, memoize_configmap='dflow')¶
- convert_to_graph()¶
- classmethod from_graph(graph)¶
- get_slices(slices_dict, name)¶
- render_script()¶
- render_slices(slices=None)¶
- set_skip_slice_input()¶
- set_slices(slices)¶
- class dflow.python.Slices(slices: str | None = None, input_parameter: List[str] | None = None, input_artifact: List[str] | None = None, output_parameter: List[str] | None = None, output_artifact: List[str] | None = None, sub_path: bool = False, group_size: int | None = None, shuffle: bool = False, random_seed: int = 0, pool_size: int | None = None, pool_timeout: int | None = None, register_first_only: bool = False, create_dir: bool = False, raise_for_group: bool = False)¶
Bases:
object
Slices specified in PythonOPTemplate
- Parameters:
slices – slice pattern
input_parameter – list of input parameters to be sliced
input_artifact – list of input artifacts to be sliced
output_parameter – list of output parameters to be stacked
output_artifact – list of output artifacts to be stacked
group_size – number of slices per task/step
pool_size – when there are multiple slices per task/step, use a multiprocessing pool to handle the slices: 1 for serial execution, -1 for a pool as large as the number of slices
register_first_only – only register the first slice when lineage is used
create_dir – create a separate directory for each slice for saving output artifacts
raise_for_group – raise an exception at the end if any task in the group fails
- evalable_repr(imports)¶
- exception dflow.python.TransientError¶
Bases:
Exception
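TransientError and FatalError let an OP distinguish retryable failures from permanent ones: a TransientError is retried when retry_on_transient_error is set on the template, while a FatalError fails the step immediately. A sketch of an execute method using both; run_simulation is a hypothetical helper, not a dflow API:

```python
from dflow.python import OPIO, FatalError, TransientError

# inside a hypothetical OP subclass:
def execute(self, op_in: OPIO) -> OPIO:
    try:
        result = run_simulation(op_in["settings"])  # hypothetical helper
    except TimeoutError:
        # retried if retry_on_transient_error is set on the template
        raise TransientError("simulation timed out")
    except ValueError:
        # fails the step immediately, no retry
        raise FatalError("invalid input settings")
    return OPIO({"result": result})
```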