dflow.python package

Submodules

dflow.python.op module

class dflow.python.op.OP(*args, **kwargs)

Bases: ABC

Python class OP

Parameters:
  • progress_total – an int representing total progress

  • progress_current – an int representing current progress

classmethod convert_to_graph()
create_slice_dir = False
static exec_sign_check(func)
abstract execute(op_in: OPIO) → OPIO

Run the OP

classmethod from_graph(graph)
classmethod function(func=None, **kwargs)
classmethod get_info()
get_input_artifact_storage_key(name: str) → str
abstract classmethod get_input_sign() → OPIOSign

Get the signature of the inputs

classmethod get_opio_info(opio_sign)
get_output_artifact_storage_key(name: str) → str
abstract classmethod get_output_sign() → OPIOSign

Get the signature of the outputs

handle_outputs(outputs, symlink=False)
key = None
outputs = {}
pool_size = None
progress_current = 0
progress_total = 1
register_output_artifact(name, namespace, dataset_name, **kwargs)
set_output(name, value)
slices = {}
classmethod superfunction(func=None, **kwargs)
tmp_root = '/tmp'
workflow_name = None
dflow.python.op.get_source_code(o)
dflow.python.op.type2opiosign(t)

dflow.python.opio module

class dflow.python.opio.Artifact(type: Any, archive: str = 'default', save: List[PVC | S3Artifact] | None = None, optional: bool = False, global_name: str | None = None, sub_path: bool = True, **kwargs)

Bases: object

OPIO signature of artifact

Parameters:
  • type – str, Path, Set[str], Set[Path], List[str], List[Path], Dict[str, str], Dict[str, Path], NestedDict[str] or NestedDict[Path]

  • archive – compress format of the artifact, None for no compression

  • save – place(s) to store the output artifact instead of the default storage; can be a list

  • optional – optional input artifact or not

  • global_name – global name of the artifact within the workflow

classmethod from_dict(d)
to_dict()
class dflow.python.opio.BigParameter(type: Any, **kwargs)

Bases: object

OPIO signature of big parameter

Parameters:
  • type – parameter type

classmethod from_dict(d)
to_dict()
class dflow.python.opio.HDF5Dataset(file, key)

Bases: object

property dataset
get_data()
is_none()
recover()
class dflow.python.opio.HDF5Datasets

Bases: object

class dflow.python.opio.NestedDictBase

Bases: object

class dflow.python.opio.NestedDictPath

Bases: NestedDictBase

class dflow.python.opio.NestedDictStr

Bases: NestedDictBase

class dflow.python.opio.OPIO(*args, **kwargs)

Bases: MutableMapping

class dflow.python.opio.OPIOSign(*args, **kwargs)

Bases: MutableMapping

The signature of an OPIO: a mapping from each key to its type

class dflow.python.opio.Parameter(type: Any, global_name: str | None = None, **kwargs)

Bases: object

OPIO signature of parameter

Parameters:
  • type – parameter type

  • global_name – global name of the parameter within the workflow

  • default – default value of the parameter

classmethod from_dict(d)
to_dict()
dflow.python.opio.t

alias of NestedDictPath

dflow.python.python_op_template module

exception dflow.python.python_op_template.FatalError

Bases: Exception

class dflow.python.python_op_template.PicklableFunctionOP(op_class)

Bases: object

class dflow.python.python_op_template.PythonOPTemplate(op_class: Type[OP] | OP, image: str | None = None, command: str | List[str] | None = None, annotations: Dict[str, str] | None = None, labels: Dict[str, str] | None = None, node_selector: Dict[str, str] | None = None, tolerations: List[object] | None = None, affinity: object | None = None, output_artifact_save: Dict[str, List[PVC | S3Artifact]] | None = None, output_artifact_archive: Dict[str, str | None] | None = None, output_parameter_default: Dict[str, Any] | None = None, input_artifact_prefix: Dict[str, str] | None = None, input_artifact_slices: Dict[str, str] | None = None, input_parameter_slices: Dict[str, str] | None = None, output_artifact_slices: Dict[str, str] | None = None, output_parameter_slices: Dict[str, str] | None = None, output_artifact_global_name: Dict[str, str] | None = None, output_parameter_global_name: Dict[str, str] | None = None, slices: Slices | None = None, python_packages: List[PathLike] | None = None, timeout: int | None = None, retry_on_transient_error: int | None = None, retry_on_failure: int | None = None, retry_on_error: int | None = None, retry_on_failure_and_error: int | None = None, timeout_as_transient_error: bool = False, memoize_key: str | None = None, volumes: List[object] | None = None, mounts: List[object] | None = None, image_pull_policy: str | None = None, requests: Dict[str, str] | None = None, limits: Dict[str, str] | None = None, upload_dflow: bool = True, envs: Dict[str, str] | None = None, init_containers: List[object] | None = None, sidecars: List[object] | None = None, tmp_root: str = '/tmp', pre_script: str = '', post_script: str = '', success_tag: bool = False, create_slice_dir: bool = False, skip_slice_input: bool = False)

Bases: PythonScriptOPTemplate

Convert from Python class OP to OP template

Parameters:
  • op_class – Python class OP

  • image – image of the OP template

  • command – python executable

  • annotations – annotations for the OP template

  • labels – labels for the OP template

  • node_selector – node selector when scheduling the pod

  • tolerations – tolerations of taints when scheduling the pod

  • affinity – affinity when scheduling the pod

  • input_artifact_slices – a dict specifying input artifacts to use slices

  • output_artifact_save – a dict specifying storage of output artifacts overriding default storage

  • output_artifact_archive – a dict specifying compress format of output artifacts, None for no compression

  • input_parameter_slices – a dict specifying input parameters to use slices

  • output_artifact_slices – a dict specifying output artifacts to use slices

  • output_parameter_slices – a dict specifying output parameters to use slices

  • output_artifact_global_name – a dict specifying global names of output artifacts within the workflow

  • slices – use slices to generate parallel steps

  • python_packages – local python packages to be uploaded to the OP

  • timeout – timeout of the OP template

  • retry_on_transient_error – maximum retries on TransientError

  • output_parameter_default – a dict specifying default values for output parameters

  • output_parameter_global_name – a dict specifying global names of output parameters within the workflow

  • timeout_as_transient_error – whether to regard a timeout as a transient error rather than a fatal one

  • memoize_key – memoized key of the OP template

  • volumes – volumes to use in the OP template

  • mounts – volumes to mount in the OP template

  • image_pull_policy – Always, IfNotPresent, Never

  • requests – a dict of resource requests

  • limits – a dict of resource limits

  • envs – environment variables

  • init_containers – init containers before the template runs

  • sidecars – sidecar containers

add_slices(slices: Slices, layer=0)
convert_to_argo(memoize_prefix=None, memoize_configmap='dflow')
convert_to_graph()
classmethod from_graph(graph)
get_slices(slices_dict, name)
render_script()
render_slices(slices=None)
set_skip_slice_input()
set_slices(slices)
class dflow.python.python_op_template.Slices(slices: str | None = None, input_parameter: List[str] | None = None, input_artifact: List[str] | None = None, output_parameter: List[str] | None = None, output_artifact: List[str] | None = None, sub_path: bool = False, group_size: int | None = None, shuffle: bool = False, random_seed: int = 0, pool_size: int | None = None, pool_timeout: int | None = None, register_first_only: bool = False, create_dir: bool = False, raise_for_group: bool = False)

Bases: object

Slices specified in PythonOPTemplate

Parameters:
  • slices – slice pattern

  • input_parameter – list of input parameters to be sliced

  • input_artifact – list of input artifacts to be sliced

  • output_parameter – list of output parameters to be stacked

  • output_artifact – list of output artifacts to be stacked

  • group_size – number of slices per task/step

  • pool_size – when there are multiple slices per task/step, use a multiprocessing pool to handle the slices; 1 for serial execution, -1 for a pool as large as the number of slices

  • register_first_only – register only the first slice when lineage is used

  • create_dir – create a separate directory for each slice for saving output artifacts

  • raise_for_group – raise an exception at the end if any task in the group fails

evalable_repr(imports)
exception dflow.python.python_op_template.TransientError

Bases: Exception

dflow.python.python_op_template.handle_packages_script(package_root)

dflow.python.utils module

dflow.python.utils.absolutize(path)
dflow.python.utils.absolutize_hdf5(obj)
dflow.python.utils.copy_results(source, name, data_root='/tmp', slice_dir=None, symlink=False)
dflow.python.utils.copy_results_and_return_path_item(path, name, order, data_root='/tmp', slice_dir=None, symlink=False)
dflow.python.utils.get_input_slices(name, data_root='/tmp')
dflow.python.utils.get_slices(path_object, slices)
dflow.python.utils.handle_empty_dir(path)
dflow.python.utils.handle_input_artifact(name, sign, slices=None, data_root='/tmp', sub_path=None, n_parts=None, keys_of_parts=None, path=None, prefix=None)
dflow.python.utils.handle_input_parameter(name, value, sign, slices=None, data_root='/tmp')
dflow.python.utils.handle_lineage(wf_name, pod_name, op_obj, input_urns, workflow_urn, data_root='/tmp')
dflow.python.utils.handle_output_artifact(name, value, sign, slices=None, data_root='/tmp', create_dir=False, symlink=False)
dflow.python.utils.handle_output_parameter(name, value, sign, slices=None, data_root='/tmp')
dflow.python.utils.path_or_none(p)
dflow.python.utils.sigalrm_handler(signum, frame)
dflow.python.utils.slice_to_dir(slice)
dflow.python.utils.try_to_execute(input, slice_dir, op_obj, output_sign, cwd, timeout=None)

Module contents

class dflow.python.Artifact(type: Any, archive: str = 'default', save: List[PVC | S3Artifact] | None = None, optional: bool = False, global_name: str | None = None, sub_path: bool = True, **kwargs)

Bases: object

OPIO signature of artifact

Parameters:
  • type – str, Path, Set[str], Set[Path], List[str], List[Path], Dict[str, str], Dict[str, Path], NestedDict[str] or NestedDict[Path]

  • archive – compress format of the artifact, None for no compression

  • save – place(s) to store the output artifact instead of the default storage; can be a list

  • optional – optional input artifact or not

  • global_name – global name of the artifact within the workflow

classmethod from_dict(d)
to_dict()
class dflow.python.BigParameter(type: Any, **kwargs)

Bases: object

OPIO signature of big parameter

Parameters:
  • type – parameter type

classmethod from_dict(d)
to_dict()
exception dflow.python.FatalError

Bases: Exception

class dflow.python.HDF5Datasets

Bases: object

class dflow.python.OP(*args, **kwargs)

Bases: ABC

Python class OP

Parameters:
  • progress_total – an int representing total progress

  • progress_current – an int representing current progress

classmethod convert_to_graph()
create_slice_dir = False
static exec_sign_check(func)
abstract execute(op_in: OPIO) → OPIO

Run the OP

classmethod from_graph(graph)
classmethod function(func=None, **kwargs)
classmethod get_info()
get_input_artifact_storage_key(name: str) → str
abstract classmethod get_input_sign() → OPIOSign

Get the signature of the inputs

classmethod get_opio_info(opio_sign)
get_output_artifact_storage_key(name: str) → str
abstract classmethod get_output_sign() → OPIOSign

Get the signature of the outputs

handle_outputs(outputs, symlink=False)
key = None
outputs = {}
pool_size = None
progress_current = 0
progress_total = 1
register_output_artifact(name, namespace, dataset_name, **kwargs)
set_output(name, value)
slices = {}
classmethod superfunction(func=None, **kwargs)
tmp_root = '/tmp'
workflow_name = None
class dflow.python.OPIO(*args, **kwargs)

Bases: MutableMapping

class dflow.python.OPIOSign(*args, **kwargs)

Bases: MutableMapping

The signature of an OPIO: a mapping from each key to its type

class dflow.python.Parameter(type: Any, global_name: str | None = None, **kwargs)

Bases: object

OPIO signature of parameter

Parameters:
  • type – parameter type

  • global_name – global name of the parameter within the workflow

  • default – default value of the parameter

classmethod from_dict(d)
to_dict()
class dflow.python.PythonOPTemplate(op_class: Type[OP] | OP, image: str | None = None, command: str | List[str] | None = None, annotations: Dict[str, str] | None = None, labels: Dict[str, str] | None = None, node_selector: Dict[str, str] | None = None, tolerations: List[object] | None = None, affinity: object | None = None, output_artifact_save: Dict[str, List[PVC | S3Artifact]] | None = None, output_artifact_archive: Dict[str, str | None] | None = None, output_parameter_default: Dict[str, Any] | None = None, input_artifact_prefix: Dict[str, str] | None = None, input_artifact_slices: Dict[str, str] | None = None, input_parameter_slices: Dict[str, str] | None = None, output_artifact_slices: Dict[str, str] | None = None, output_parameter_slices: Dict[str, str] | None = None, output_artifact_global_name: Dict[str, str] | None = None, output_parameter_global_name: Dict[str, str] | None = None, slices: Slices | None = None, python_packages: List[PathLike] | None = None, timeout: int | None = None, retry_on_transient_error: int | None = None, retry_on_failure: int | None = None, retry_on_error: int | None = None, retry_on_failure_and_error: int | None = None, timeout_as_transient_error: bool = False, memoize_key: str | None = None, volumes: List[object] | None = None, mounts: List[object] | None = None, image_pull_policy: str | None = None, requests: Dict[str, str] | None = None, limits: Dict[str, str] | None = None, upload_dflow: bool = True, envs: Dict[str, str] | None = None, init_containers: List[object] | None = None, sidecars: List[object] | None = None, tmp_root: str = '/tmp', pre_script: str = '', post_script: str = '', success_tag: bool = False, create_slice_dir: bool = False, skip_slice_input: bool = False)

Bases: PythonScriptOPTemplate

Convert from Python class OP to OP template

Parameters:
  • op_class – Python class OP

  • image – image of the OP template

  • command – python executable

  • annotations – annotations for the OP template

  • labels – labels for the OP template

  • node_selector – node selector when scheduling the pod

  • tolerations – tolerations of taints when scheduling the pod

  • affinity – affinity when scheduling the pod

  • input_artifact_slices – a dict specifying input artifacts to use slices

  • output_artifact_save – a dict specifying storage of output artifacts overriding default storage

  • output_artifact_archive – a dict specifying compress format of output artifacts, None for no compression

  • input_parameter_slices – a dict specifying input parameters to use slices

  • output_artifact_slices – a dict specifying output artifacts to use slices

  • output_parameter_slices – a dict specifying output parameters to use slices

  • output_artifact_global_name – a dict specifying global names of output artifacts within the workflow

  • slices – use slices to generate parallel steps

  • python_packages – local python packages to be uploaded to the OP

  • timeout – timeout of the OP template

  • retry_on_transient_error – maximum retries on TransientError

  • output_parameter_default – a dict specifying default values for output parameters

  • output_parameter_global_name – a dict specifying global names of output parameters within the workflow

  • timeout_as_transient_error – whether to regard a timeout as a transient error rather than a fatal one

  • memoize_key – memoized key of the OP template

  • volumes – volumes to use in the OP template

  • mounts – volumes to mount in the OP template

  • image_pull_policy – Always, IfNotPresent, Never

  • requests – a dict of resource requests

  • limits – a dict of resource limits

  • envs – environment variables

  • init_containers – init containers before the template runs

  • sidecars – sidecar containers

add_slices(slices: Slices, layer=0)
convert_to_argo(memoize_prefix=None, memoize_configmap='dflow')
convert_to_graph()
classmethod from_graph(graph)
get_slices(slices_dict, name)
render_script()
render_slices(slices=None)
set_skip_slice_input()
set_slices(slices)
class dflow.python.Slices(slices: str | None = None, input_parameter: List[str] | None = None, input_artifact: List[str] | None = None, output_parameter: List[str] | None = None, output_artifact: List[str] | None = None, sub_path: bool = False, group_size: int | None = None, shuffle: bool = False, random_seed: int = 0, pool_size: int | None = None, pool_timeout: int | None = None, register_first_only: bool = False, create_dir: bool = False, raise_for_group: bool = False)

Bases: object

Slices specified in PythonOPTemplate

Parameters:
  • slices – slice pattern

  • input_parameter – list of input parameters to be sliced

  • input_artifact – list of input artifacts to be sliced

  • output_parameter – list of output parameters to be stacked

  • output_artifact – list of output artifacts to be stacked

  • group_size – number of slices per task/step

  • pool_size – when there are multiple slices per task/step, use a multiprocessing pool to handle the slices; 1 for serial execution, -1 for a pool as large as the number of slices

  • register_first_only – register only the first slice when lineage is used

  • create_dir – create a separate directory for each slice for saving output artifacts

  • raise_for_group – raise an exception at the end if any task in the group fails

evalable_repr(imports)
exception dflow.python.TransientError

Bases: Exception