dflow package¶
Subpackages¶
- dflow.client package
- Submodules
- dflow.client.v1alpha1_artifact module
- dflow.client.v1alpha1_dag_task module
- dflow.client.v1alpha1_lifecycle_hook module
- dflow.client.v1alpha1_parameter module
- dflow.client.v1alpha1_retry_strategy module
- dflow.client.v1alpha1_sequence module
- dflow.client.v1alpha1_value_from module
- dflow.client.v1alpha1_workflow_step module
- Module contents
- dflow.plugins package
- Submodules
- dflow.plugins.bohrium module
- dflow.plugins.datasets module
DatasetsArtifact
DatasetsArtifact.bohrium_download()
DatasetsArtifact.download()
DatasetsArtifact.from_rclone_config()
DatasetsArtifact.from_urn()
DatasetsArtifact.get_bohrium_urn()
DatasetsArtifact.get_mount_script()
DatasetsArtifact.get_urn()
DatasetsArtifact.modify_config()
DatasetsArtifact.remote_download()
DatasetsArtifact.render()
DatasetsArtifact.sub_path()
wait_for_mount()
- dflow.plugins.dispatcher module
- dflow.plugins.launching module
- dflow.plugins.lebesgue module
- dflow.plugins.metadata module
- dflow.plugins.oss module
- dflow.plugins.ray module
- Module contents
- dflow.python package
- Submodules
- dflow.python.op module
OP
OP.convert_to_graph()
OP.create_slice_dir
OP.exec_sign_check()
OP.execute()
OP.from_graph()
OP.function()
OP.get_info()
OP.get_input_artifact_link()
OP.get_input_artifact_storage_key()
OP.get_input_sign()
OP.get_opio_info()
OP.get_output_artifact_link()
OP.get_output_artifact_storage_key()
OP.get_output_sign()
OP.handle_outputs()
OP.key
OP.outputs
OP.pool_size
OP.progress_current
OP.progress_total
OP.register_output_artifact()
OP.set_output()
OP.slices
OP.superfunction()
OP.tmp_root
OP.workflow_name
get_source_code()
type2opiosign()
- dflow.python.opio module
- dflow.python.python_op_template module
- dflow.python.utils module
absolutize()
absolutize_hdf5()
copy_results()
copy_results_and_return_path_item()
get_input_slices()
get_slices()
handle_empty_dir()
handle_input_artifact()
handle_input_parameter()
handle_lineage()
handle_output_artifact()
handle_output_parameter()
path_or_none()
sigalrm_handler()
slice_to_dir()
try_to_execute()
- Module contents
Artifact
BigParameter
FatalError
HDF5Datasets
OP
OP.convert_to_graph()
OP.create_slice_dir
OP.exec_sign_check()
OP.execute()
OP.from_graph()
OP.function()
OP.get_info()
OP.get_input_artifact_link()
OP.get_input_artifact_storage_key()
OP.get_input_sign()
OP.get_opio_info()
OP.get_output_artifact_link()
OP.get_output_artifact_storage_key()
OP.get_output_sign()
OP.handle_outputs()
OP.key
OP.outputs
OP.pool_size
OP.progress_current
OP.progress_total
OP.register_output_artifact()
OP.set_output()
OP.slices
OP.superfunction()
OP.tmp_root
OP.workflow_name
OPIO
OPIOSign
Parameter
PythonOPTemplate
Slices
TransientError
Submodules¶
dflow.argo_objects module¶
- class dflow.argo_objects.ArgoObjectDict(d)¶
Bases:
UserDict
Generate ArgoObjectDict and ArgoObjectList on initialization rather than on __getattr__; otherwise, modifying a.b.c would not take effect
- recover()¶
- class dflow.argo_objects.ArgoParameter(par)¶
Bases:
ArgoObjectDict
- class dflow.argo_objects.ArgoStep(step, workflow)¶
Bases:
ArgoObjectDict
- delete_pod()¶
- get_duration() timedelta ¶
- get_pod()¶
- get_script()¶
- handle_big_parameters(io)¶
- handle_io(io)¶
- modify_output_artifact(name: str, s3: S3Artifact) None ¶
Modify output artifact of an Argo step
- Parameters:
name – artifact name
s3 – replace the artifact with an S3 object
- modify_output_parameter(name: str, value: Any) None ¶
Modify output parameter of an Argo step
- Parameters:
name – parameter name
value – new value
- replay()¶
- retry()¶
- set_script(script)¶
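A minimal sketch of inspecting and modifying the outputs of a finished step before reusing it; the workflow ID, step key, output names and storage key below are hypothetical:

```python
from dflow import S3Artifact, Workflow

# Track an existing workflow by its Argo ID (placeholder ID).
wf = Workflow(id="hello-dflow-abc12")
step = wf.query_step(key="prepare")[0]   # query_step returns a list of ArgoStep

print(step.get_duration())
# Overwrite outputs so that downstream reuse picks up the new values.
step.modify_output_parameter("msg", "hello again")
step.modify_output_artifact("data", S3Artifact(key="upload/new-data"))
```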
- class dflow.argo_objects.ArgoWorkflow(d)¶
Bases:
ArgoObjectDict
- get_duration() timedelta ¶
- get_step(name: str | List[str] | None = None, key: str | List[str] | None = None, phase: str | List[str] | None = None, id: str | List[str] | None = None, type: str | List[str] | None = None, parent_id: str | None = None, sort_by_generation: bool = False) List[ArgoStep] ¶
- get_sub_nodes(node_id)¶
- record_generation(node_id, generation)¶
- dflow.argo_objects.fnva(data, hval_init, fnv_prime, fnv_size)¶
Alternative FNV hash algorithm used in FNV-1a.
- dflow.argo_objects.get_duration(status) timedelta ¶
- dflow.argo_objects.get_hash(node_name)¶
- dflow.argo_objects.get_pod_name(wf_name, node_name, template_name, node_id)¶
- dflow.argo_objects.match(n, names)¶
dflow.code_gen module¶
- class dflow.code_gen.CodeGenerator(graph)¶
Bases:
object
- generate()¶
- get_kwargs(template, cls)¶
- get_var_name(name)¶
- render_dag(var_name, template)¶
- render_python_op_template(var_name, template)¶
- render_script_op_template(var_name, template)¶
- render_steps(var_name, template)¶
- dflow.code_gen.gen_code(graph)¶
dflow.common module¶
- class dflow.common.CustomArtifact¶
Bases:
ABC
- abstract download(name: str, path: str)¶
- static from_urn(urn)¶
- abstract get_urn() str ¶
- redirect = None¶
- render(template, name: str)¶
- class dflow.common.CustomHandler(context)¶
Bases:
BaseHandler
- flatten(obj, data)¶
Flatten obj into a json-friendly form and write result to data.
- Parameters:
obj (object) – The object to be serialized.
data (dict) – A partially filled dictionary which will contain the json-friendly representation of obj once this method has finished.
- restore(obj)¶
Restore an object of the registered type from the json-friendly representation obj and return it.
- class dflow.common.LineageClient¶
Bases:
ABC
- abstract get_artifact_metadata(urn: str) object ¶
- abstract register_artifact(namespace: str, name: str, uri: str, **kwargs) str ¶
- abstract register_task(task_name: str, input_urns: Dict[str, str | List[str]], output_uris: Dict[str, str], workflow_urn: str) Dict[str, str] ¶
- abstract register_workflow(workflow_name: str) str ¶
- class dflow.common.S3Artifact(key: str | None = None, path_list: str | list | None = None, urn: str = '', debug_s3: bool = False, *args, **kwargs)¶
Bases:
object
S3 artifact
- Parameters:
key – key of the s3 artifact
- download(**kwargs)¶
- evalable_repr(imports)¶
- classmethod from_dict(d)¶
- oss()¶
- sub_path(path: str) Any ¶
- to_dict()¶
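A minimal sketch of referencing an existing object in the artifact repository via S3Artifact; the storage key is a placeholder and a configured storage backend is assumed:

```python
from dflow import S3Artifact
from dflow.utils import download_artifact

art = S3Artifact(key="upload/2024/data")   # placeholder key
part = art.sub_path("inputs")              # narrow the artifact to a sub-directory
download_artifact(art, path="./data")      # fetch the whole artifact locally
```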
- dflow.common.import_func(s)¶
dflow.config module¶
- dflow.config.boolize(s)¶
- dflow.config.nullable(s)¶
- dflow.config.set_config(**kwargs) None ¶
Set global configurations
- Parameters:
host – host of Argo server
namespace – k8s namespace
token – token for authentication, necessary for reused workflows
k8s_config_file – location of kube config file if it is used for authentication
k8s_api_server – address of Kubernetes API server, necessary for reused workflows
private_key_host_path – path of private key on the Kubernetes nodes
save_path_as_parameter – save catalog of artifacts as parameters
catalog_dir_name – catalog directory name for artifacts
archive_mode – “tar” for archiving with tar, None for no archive
util_image – image for util step
util_image_pull_policy – image pull policy for util step
extender_image – image for dflow extender
extender_image_pull_policy – image pull policy for dflow extender
dispatcher_image – image for dpdispatcher
dispatcher_image_pull_policy – image pull policy for dpdispatcher
save_keys_in_global_outputs – save keys of steps in global outputs
mode – “default” for normal, “debug” for debugging locally
lineage – lineage client, None by default
http_headers – HTTP headers for requesting Argo server
workflow_annotations – default annotations for workflows
overwrite_reused_artifact – overwrite reused artifact
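A minimal sketch of setting the global configuration with set_config; the host, API server address, token and namespace below are placeholders for your own deployment:

```python
from dflow.config import set_config

set_config(
    host="https://127.0.0.1:2746",            # Argo server (placeholder)
    k8s_api_server="https://127.0.0.1:6443",  # Kubernetes API server (placeholder)
    token="<argo-token>",                     # placeholder token
    namespace="argo",
)
```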
- dflow.config.set_s3_config(**kwargs) None ¶
Set S3 configurations
- Parameters:
endpoint – endpoint for S3 storage
console – console address for S3 storage
access_key – access key for S3 storage
secret_key – secret key for S3 storage
secure – secure or not
bucket_name – name of S3 bucket
repo_key – key of artifact repository
repo – artifact repository, parsed from repo_key
repo_type – s3 or oss, parsed from repo_key
repo_prefix – prefix of artifact repository, parsed from repo_key
prefix – prefix of storage key
storage_client – client for plugin storage backend
extra_prefixes – extra prefixes ignored by auto-prefixing
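A minimal sketch of configuring the artifact storage with set_s3_config; the endpoint, credentials and bucket below are placeholders:

```python
from dflow.config import set_s3_config

set_s3_config(
    endpoint="127.0.0.1:9000",   # placeholder MinIO endpoint
    access_key="admin",
    secret_key="password",
    bucket_name="my-artifacts",
)
```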
- dflow.config.split_headers(s)¶
dflow.context module¶
- class dflow.context.Context¶
Bases:
ABC
- abstract render(template: OPTemplate) OPTemplate ¶
render original template and return a new template
dflow.context_syntax module¶
dflow.dag module¶
- class dflow.dag.DAG(name: str | None = None, inputs: Inputs | None = None, outputs: Outputs | None = None, tasks: List[Task] | None = None, memoize_key: str | None = None, annotations: Dict[str, str] | None = None, parallelism: int | None = None)¶
Bases:
OPTemplate
- Parameters:
name – the name of the dag
inputs – inputs in the template
outputs – outputs in the template
tasks – a list of tasks
memoize_key – memoized key of the dag
annotations – annotations for the OP template
parallelism – maximum number of running pods for the OP template
- add(task: Task | List[Task]) None ¶
Add a task or a list of tasks to the dag
- Parameters:
task – a task or a list of tasks to be added to the dag
- add_slices(slices, layer=0)¶
- convert_to_argo(memoize_prefix=None, memoize_configmap='dflow', context=None)¶
- convert_to_graph()¶
- copy()¶
- classmethod from_dict(d, templates)¶
- classmethod from_graph(graph, templates)¶
- resolve(pool, futures)¶
- run(workflow_id=None, context=None)¶
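A minimal sketch of building a DAG of two tasks and using it as the entrypoint of a workflow; the template, image and names are placeholders:

```python
from dflow import DAG, ShellOPTemplate, Task, Workflow

hello = ShellOPTemplate(name="hello", image="alpine:3", script="echo hello")

t1 = Task(name="prepare", template=hello)
t2 = Task(name="consume", template=hello, dependencies=[t1])

dag = DAG(name="example-dag")
dag.add([t1, t2])

wf = Workflow(name="dag-demo", dag=dag)
wf.submit()   # requires a configured Argo server (see dflow.config.set_config)
```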
dflow.executor module¶
- class dflow.executor.ContainerExecutor(docker: str | None = None, singularity: str | None = None, podman: str | None = None, image_pull_policy: str | None = None)¶
Bases:
Executor
- render(template)¶
Render the original template and return a new template. Do not modify self in this method, so that the executor remains reusable.
- class dflow.executor.Executor¶
Bases:
ABC
- abstract render(template: OPTemplate) OPTemplate ¶
Render the original template and return a new template. Do not modify self in this method, so that the executor remains reusable.
- class dflow.executor.RemoteExecutor(host: str, port: int = 22, username: str = 'root', password: str | None = None, private_key_file: PathLike | None = None, workdir: str = '~/dflow/workflows/{{workflow.name}}/{{pod.name}}', command: str | List[str] | None = None, remote_command: str | List[str] | None = None, image: str | None = None, image_pull_policy: str | None = None, map_tmp_dir: bool = True, docker_executable: str | None = None, singularity_executable: str | None = None, podman_executable: str | None = None, action_retries: int = -1)¶
Bases:
Executor
- download(src, dst)¶
- execute(cmd)¶
- get_script(template)¶
- mkdir_and_download(path)¶
- render(template)¶
Render the original template and return a new template. Do not modify self in this method, so that the executor remains reusable.
- run(image, remote_command)¶
- upload(src, dst)¶
- upload_if_exists(path)¶
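A minimal sketch of attaching a RemoteExecutor to a step so that its script runs on a remote host over SSH; the host, credentials and image are placeholders:

```python
from dflow import ShellOPTemplate, Step
from dflow.executor import RemoteExecutor

tmpl = ShellOPTemplate(name="run", image="alpine:3", script="echo running remotely")

executor = RemoteExecutor(
    host="login.example.org",            # placeholder SSH host
    username="user",
    private_key_file="/path/to/id_rsa",  # placeholder key file
)
step = Step(name="remote-run", template=tmpl, executor=executor)
```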
- dflow.executor.render_script_with_tmp_root(template, tmp_root)¶
- dflow.executor.run_script(image, cmd, docker=None, singularity=None, podman=None, image_pull_policy=None, host_mounts=None, cpu=None, memory=None, args='', envs=None)¶
dflow.io module¶
- class dflow.io.AutonamedDict(*args, **kwargs)¶
Bases:
UserDict
- convert_to_graph()¶
- set_step(step)¶
- set_template(template)¶
- class dflow.io.IfExpression(_if: str | ArgoVar, _then: str | ArgoVar, _else: str | ArgoVar)¶
Bases:
ArgoVar, Expression
- class dflow.io.InputArtifact(path: str | None = None, name: str | None = None, step=None, template=None, optional: bool = False, type: Any | None = None, source: str | InputArtifact | OutputArtifact | S3Artifact | None = None, mode: int | None = None, sub_path: str | None = None, archive: str = 'default', save_as_parameter: bool = False, **kwargs)¶
Bases:
ArgoVar
Input artifact for OP template
- Parameters:
path – path where the input artifact is placed in the container
name – name of the input artifact
optional – optional artifact or not
type – artifact type
source – default source
archive – regarded as archived file or not
- convert_to_argo()¶
- convert_to_graph()¶
- classmethod from_dict(d)¶
- classmethod from_graph(graph)¶
- get_path_list_parameter()¶
- get_urn_parameter()¶
- sub_path(path)¶
- class dflow.io.InputArtifacts(*args, **kwargs)¶
Bases:
AutonamedDict
- set_template(template)¶
- class dflow.io.InputParameter(name: str | None = None, step=None, template=None, type: Any | None = None, save_as_artifact: bool = False, path: str | None = None, source: S3Artifact | InputArtifact | OutputArtifact | None = None, **kwargs)¶
Bases:
ArgoVar
Input parameter for OP template
- Parameters:
name – name of the input parameter
type – parameter type
value – default value
- convert_to_argo()¶
- convert_to_graph()¶
- classmethod from_dict(d)¶
- classmethod from_graph(graph)¶
- class dflow.io.InputParameters(*args, **kwargs)¶
Bases:
AutonamedDict
- class dflow.io.Inputs(parameters: Dict[str, InputParameter] | None = None, artifacts: Dict[str, InputArtifact] | None = None, step=None, template=None)¶
Bases:
object
Inputs for OP template
- Parameters:
parameters – input parameters
artifacts – input artifacts
- convert_to_argo()¶
- convert_to_graph()¶
- classmethod from_dict(d)¶
- classmethod from_graph(graph)¶
- set_step(step)¶
- set_template(template)¶
- class dflow.io.ObjectDict(dict=None, /, **kwargs)¶
Bases:
UserDict
- class dflow.io.OutputArtifact(path: str | None = None, _from: InputArtifact | OutputArtifact | None = None, name: str | None = None, step=None, template=None, type: Any | None = None, save: List[PVC | S3Artifact] | None = None, archive: str = 'default', global_name: str | None = None, from_expression: IfExpression | str | None = None, optional: bool = False)¶
Bases:
ArgoVar
Output artifact for OP template
- Parameters:
path – path of the output artifact in the container
_from – the artifact is from another artifact
name – name of the output artifact
type – artifact type
save – place to store the output artifact instead of default storage, can be a list
archive – compress format of the artifact, None for no compression
global_name – global name of the artifact within the workflow
from_expression – the artifact is from an expression
- convert_to_argo()¶
- convert_to_graph()¶
- classmethod from_dict(d)¶
- classmethod from_graph(graph)¶
- get_path_list_parameter()¶
- get_urn_parameter()¶
- handle_path_list()¶
- handle_urn()¶
- pvc(size='1Gi', storage_class=None, access_modes=None)¶
- sub_path(path)¶
- class dflow.io.OutputArtifacts(*args, **kwargs)¶
Bases:
AutonamedDict
- set_template(template)¶
- class dflow.io.OutputParameter(value_from_path: str | None = None, value_from_parameter: InputParameter | OutputParameter | None = None, name: str | None = None, step=None, template=None, type: Any | None = None, global_name: str | None = None, value_from_expression: IfExpression | str | None = None, save_as_artifact: bool = False, save_both: bool = False, **kwargs)¶
Bases:
ArgoVar
Output parameter for OP template
- Parameters:
value_from_path – the value is read from file generated in the container
value_from_parameter – the value is from another parameter
name – name of the output parameter
type – parameter type
default – default value
global_name – global name of the parameter within the workflow
value_from_expression – the value is from an expression
value – specify value directly
- convert_to_argo()¶
- convert_to_argo_artifact()¶
- convert_to_argo_parameter()¶
- convert_to_graph()¶
- expr_as_artifact()¶
- expr_as_parameter()¶
- classmethod from_dict(d)¶
- classmethod from_graph(graph)¶
- repr_as_artifact()¶
- repr_as_parameter()¶
- class dflow.io.OutputParameters(*args, **kwargs)¶
Bases:
AutonamedDict
- class dflow.io.Outputs(parameters: Dict[str, OutputParameter] | None = None, artifacts: Dict[str, OutputArtifact] | None = None, step=None, template=None)¶
Bases:
object
Outputs for OP template
- Parameters:
parameters – output parameters
artifacts – output artifacts
- convert_to_argo()¶
- convert_to_graph()¶
- classmethod from_dict(d)¶
- classmethod from_graph(graph)¶
- set_step(step)¶
- set_template(template)¶
- class dflow.io.PVC(name: str, subpath: str, size: str = '1Gi', storage_class: str | None = None, access_modes: List[str] | None = None)¶
Bases:
object
- dflow.io.convert_value_to_str(value)¶
- dflow.io.if_expression(_if: str | ArgoVar, _then: str | ArgoVar, _else: str | ArgoVar) IfExpression ¶
Return an if expression in Argo
- Parameters:
_if – a bool expression, which may be a comparison of two Argo parameters
_then – value returned if the condition is satisfied
_else – value returned if the condition is not satisfied
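A minimal sketch of if_expression; the condition compares Argo parameter placeholders as raw strings, and the result can be assigned to an output parameter's value_from_expression or an output artifact's from_expression:

```python
from dflow.io import if_expression

expr = if_expression(
    _if="{{inputs.parameters.x}} > 2",   # placeholder condition
    _then="{{inputs.parameters.x}}",
    _else="2",
)
```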
- dflow.io.to_expr(var)¶
- dflow.io.type_to_str(type)¶
dflow.main module¶
- dflow.main.format_print_table(t: List[List[str]])¶
- dflow.main.format_time_delta(td: timedelta) str ¶
- dflow.main.main()¶
- dflow.main.main_parser()¶
- dflow.main.parse_args(args: List[str] | None = None)¶
Command-line argument parsing.
- Parameters:
args (List[str]) – list of command-line arguments, mainly intended for testing; the default None takes arguments from sys.argv
dflow.op_template module¶
- class dflow.op_template.ContainerOPTemplate(command: str | List[str] = '', args: List[str] | None = None, **kwargs)¶
Bases:
ScriptOPTemplate
- classmethod from_dict(d)¶
- class dflow.op_template.OPTemplate(name: str | None = None, inputs: Inputs | None = None, outputs: Outputs | None = None, memoize_key: str | None = None, pvcs: List[PVC] | None = None, annotations: Dict[str, str] | None = None, labels: Dict[str, str] | None = None)¶
Bases:
object
- copy()¶
- deepcopy()¶
- classmethod from_dict(d)¶
- handle_key(memoize_prefix=None, memoize_configmap='dflow')¶
- class dflow.op_template.PythonScriptOPTemplate(name: str | None = None, inputs: Inputs | None = None, outputs: Outputs | None = None, memoize_key: str | None = None, pvcs: List[PVC] | None = None, annotations: Dict[str, str] | None = None, labels: Dict[str, str] | None = None, node_selector: Dict[str, str] | None = None, tolerations: List[object] | None = None, affinity: object | None = None, image: str | None = None, command: str | List[str] | None = None, script: str | None = None, volumes: List[object] | None = None, mounts: List[object] | None = None, init_progress: str = '0/1', timeout: str | None = None, retry_strategy: object | None = None, image_pull_policy: str | None = None, requests: Dict[str, str] | None = None, limits: Dict[str, str] | None = None, envs: Dict[str, str] | None = None, init_containers: List[object] | None = None, sidecars: List[object] | None = None, script_rendered: bool = False)¶
Bases:
ScriptOPTemplate
Python script OP template
- Parameters:
name – the name of the OP template
inputs – input parameters and input artifacts
outputs – output parameters and output artifacts
image – image the template uses
command – command to run the script
script – python script
volumes – volumes the template uses
mounts – volumes the template mounts
init_progress – a str representing the initial progress
timeout – timeout of the OP template
retry_strategy – retry strategy of the OP template
memoize_key – memoized key of the OP template
pvcs – PVCs that need to be declared
image_pull_policy – Always, IfNotPresent, Never
annotations – annotations for the OP template
labels – labels for the OP template
node_selector – node selector when scheduling the pod
tolerations – tolerations of taints when scheduling the pod
affinity – affinity when scheduling the pod
requests – a dict of resource requests
limits – a dict of resource limits
envs – environment variables
init_containers – init containers before the template runs
sidecars – sidecar containers
- class dflow.op_template.ScriptOPTemplate(name: str | None = None, inputs: Inputs | None = None, outputs: Outputs | None = None, memoize_key: str | None = None, pvcs: List[PVC] | None = None, annotations: Dict[str, str] | None = None, labels: Dict[str, str] | None = None, node_selector: Dict[str, str] | None = None, tolerations: List[object] | None = None, affinity: object | None = None, image: str | None = None, command: str | List[str] | None = None, script: str | None = None, volumes: List[object] | None = None, mounts: List[object] | None = None, init_progress: str = '0/1', timeout: str | None = None, retry_strategy: object | None = None, resource: object | None = None, image_pull_policy: str | None = None, requests: Dict[str, str] | None = None, limits: Dict[str, str] | None = None, envs: Dict[str, str | Secret | object] | None = None, init_containers: List[object] | None = None, sidecars: List[object] | None = None, script_rendered: bool = False)¶
Bases:
OPTemplate
Script OP template
- Parameters:
name – the name of the OP template
inputs – input parameters and input artifacts
outputs – output parameters and output artifacts
image – image the template uses
command – command to run the script
script – script
volumes – volumes the template uses
mounts – volumes the template mounts
init_progress – a str representing the initial progress
timeout – timeout of the OP template
retry_strategy – retry strategy of the OP template
memoize_key – memoized key of the OP template
pvcs – PVCs that need to be declared
image_pull_policy – Always, IfNotPresent, Never
annotations – annotations for the OP template
labels – labels for the OP template
node_selector – node selector when scheduling the pod
tolerations – tolerations of taints when scheduling the pod
affinity – affinity when scheduling the pod
requests – a dict of resource requests
limits – a dict of resource limits
envs – environment variables
- convert_to_argo(memoize_prefix=None, memoize_configmap='dflow')¶
- convert_to_graph()¶
- classmethod from_dict(d)¶
- classmethod from_graph(graph)¶
- class dflow.op_template.Secret(value=None, name=None, key='secret')¶
Bases:
object
- class dflow.op_template.ShellOPTemplate(name: str | None = None, inputs: Inputs | None = None, outputs: Outputs | None = None, memoize_key: str | None = None, pvcs: List[PVC] | None = None, annotations: Dict[str, str] | None = None, labels: Dict[str, str] | None = None, node_selector: Dict[str, str] | None = None, tolerations: List[object] | None = None, affinity: object | None = None, image: str | None = None, command: str | List[str] | None = None, script: str | None = None, volumes: List[object] | None = None, mounts: List[object] | None = None, init_progress: str = '0/1', timeout: str | None = None, retry_strategy: object | None = None, image_pull_policy: str | None = None, requests: Dict[str, str] | None = None, limits: Dict[str, str] | None = None, envs: Dict[str, str] | None = None, init_containers: List[object] | None = None, sidecars: List[object] | None = None, script_rendered: bool = False)¶
Bases:
ScriptOPTemplate
Shell script OP template
- Parameters:
name – the name of the OP template
inputs – input parameters and input artifacts
outputs – output parameters and output artifacts
image – image the template uses
command – command to run the script
script – shell script
volumes – volumes the template uses
mounts – volumes the template mounts
init_progress – a str representing the initial progress
timeout – timeout of the OP template
retry_strategy – retry strategy of the OP template
memoize_key – memoized key of the OP template
pvcs – PVCs that need to be declared
image_pull_policy – Always, IfNotPresent, Never
annotations – annotations for the OP template
labels – labels for the OP template
node_selector – node selector when scheduling the pod
tolerations – tolerations of taints when scheduling the pod
affinity – affinity when scheduling the pod
requests – a dict of resource requests
limits – a dict of resource limits
envs – environment variables
init_containers – init containers before the template runs
sidecars – sidecar containers
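A minimal sketch of a ShellOPTemplate with one input parameter and one output artifact; the image and paths are placeholders:

```python
from dflow import InputParameter, OutputArtifact, ShellOPTemplate

tmpl = ShellOPTemplate(
    name="write-msg",
    image="alpine:3",   # placeholder image
    script="echo {{inputs.parameters.msg}} > /tmp/msg.txt",
)
tmpl.inputs.parameters = {"msg": InputParameter(value="hello")}
tmpl.outputs.artifacts = {"out": OutputArtifact(path="/tmp/msg.txt")}
```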
- dflow.op_template.get_k8s_client(k8s_api_server=None, token=None, k8s_config_file=None)¶
dflow.resource module¶
- class dflow.resource.Resource¶
Bases:
ABC
- Parameters:
action – action on the Kubernetes resource
success_condition – expression representing success
failure_condition – expression representing failure
- action: str | None = None¶
- failure_condition: str | None = None¶
- abstract get_manifest(template: OPTemplate) OPTemplate ¶
The method to get the manifest (str)
- success_condition: str | None = None¶
dflow.slurm module¶
- class dflow.slurm.SlurmJob(header='', node_selector=None, prepare=None, results=None, map_tmp_dir=True, workdir='.', remote_command=None, docker_executable=None, singularity_executable=None, podman_executable=None)¶
Bases:
Resource
- get_manifest(template)¶
The method to get the manifest (str)
- class dflow.slurm.SlurmJobTemplate(header: str = '', node_selector: Dict[str, str] | None = None, prepare_image: str | None = None, prepare_image_pull_policy: str | None = None, collect_image: str | None = None, collect_image_pull_policy: str | None = None, workdir: str = 'dflow/workflows/{{workflow.name}}/{{pod.name}}', remote_command: str | List[str] | None = None, docker_executable: str | None = None, singularity_executable: str | None = None, podman_executable: str | None = None)¶
Bases:
Executor
Slurm job template
- Parameters:
header – header for Slurm job
node_selector – node selector
prepare_image – image for preparing data
prepare_image_pull_policy – image pull policy for preparing data
collect_image – image for collecting results
collect_image_pull_policy – image pull policy for collecting results
workdir – remote working directory
remote_command – command for running the script remotely
docker_executable – docker executable to run remotely
singularity_executable – singularity executable to run remotely
podman_executable – podman executable to run remotely
- render(template)¶
Render the original template and return a new template. Do not modify self in this method, so that the executor remains reusable.
- class dflow.slurm.SlurmRemoteExecutor(host: str, port: int = 22, username: str = 'root', password: str | None = None, private_key_file: PathLike | None = None, workdir: str = '~/dflow/workflows/{{workflow.name}}/{{pod.name}}', command: str | List[str] | None = None, remote_command: str | List[str] | None = None, image: str | None = None, image_pull_policy: str | None = None, map_tmp_dir: bool = True, docker_executable: str | None = None, singularity_executable: str | None = None, podman_executable: str | None = None, action_retries: int = -1, header: str = '', interval: int = 3, pvc: PVC | None = None)¶
Bases:
RemoteExecutor
Slurm remote executor
- Parameters:
host – remote host
port – SSH port
username – username
password – password for SSH
private_key_file – private key file for SSH
workdir – remote working directory
command – command for the executor
remote_command – command for running the script remotely
image – image for the executor
image_pull_policy – image pull policy for the executor
map_tmp_dir – map /tmp to ./tmp
docker_executable – docker executable to run remotely
singularity_executable – singularity executable to run remotely
podman_executable – podman executable to run remotely
action_retries – retries for actions (upload, execute commands, download), -1 for infinity
header – header for Slurm job
interval – query interval for Slurm
- render(template)¶
Render the original template and return a new template. Do not modify self in this method, so that the executor remains reusable.
- run(image, remote_command)¶
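A minimal sketch of running a step on a Slurm cluster through SlurmRemoteExecutor; the login node, credentials and the Slurm header are placeholders:

```python
from dflow import ShellOPTemplate, Step
from dflow.slurm import SlurmRemoteExecutor

tmpl = ShellOPTemplate(name="hpc-run", image="alpine:3", script="echo on slurm")

slurm = SlurmRemoteExecutor(
    host="slurm.example.org",   # placeholder login node
    username="user",
    password="***",             # placeholder password
    header="#!/bin/bash\n#SBATCH -N 1\n#SBATCH -n 1",
)
step = Step(name="hpc-step", template=tmpl, executor=slurm)
```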
dflow.step module¶
- class dflow.step.ArgoConcat(param)¶
Bases:
object
- class dflow.step.ArgoSequence(count, start, end, format)¶
Bases:
object
- convert_to_argo()¶
- evalable_repr(imports)¶
- classmethod from_dict(d)¶
- to_dict()¶
- class dflow.step.ArgoSum(param)¶
Bases:
object
- class dflow.step.HookStep(template: OPTemplate, expression: str | None = None, **kwargs)¶
Bases:
Step
- convert_to_argo(context=None)¶
- class dflow.step.Step(name: str, template: OPTemplate, parameters: Dict[str, Any] | None = None, artifacts: Dict[str, S3Artifact | InputArtifact | OutputArtifact | None] | None = None, when: str | None = None, with_param: str | list | InputParameter | OutputParameter | None = None, continue_on_failed: bool = False, continue_on_error: bool = False, continue_on_num_success: int | None = None, continue_on_success_ratio: float | None = None, with_sequence: object | None = None, key: str | None = None, executor: Executor | None = None, use_resource: Resource | None = None, util_image: str | None = None, util_image_pull_policy: str | None = None, util_command: str | List[str] | None = None, parallelism: int | None = None, slices: Slices | None = None, success_hook: HookStep | None = None, running_hook: HookStep | None = None, failure_hook: HookStep | None = None)¶
Bases:
object
- Parameters:
name – the name of the step
template – OP template the step uses
parameters – input parameters passed to the step as arguments
artifacts – input artifacts passed to the step as arguments
when – the step is executed only if the condition is satisfied
with_param – generate parallel steps with respect to a list as a parameter
continue_on_failed – continue if the step fails
continue_on_error – continue if the step meets error
continue_on_num_success – continue if the number of successful parallel steps is greater than a certain number
continue_on_success_ratio – continue if the ratio of successful parallel steps is greater than a certain value
with_sequence – generate parallel steps with respect to a sequence
key – the key of the step
executor – define the executor to execute the script
use_resource – use k8s resource
util_image – image for utility step
util_image_pull_policy – image pull policy for utility step
util_command – command for utility step
parallelism – parallelism for sliced step
slices – override slices of OP template
- convert_to_argo(context=None)¶
- convert_to_graph()¶
- exec(scope, parameters, item=None, context=None)¶
- exec_steps(scope, parameters, item=None, context=None)¶
- exec_with_config(scope, parameters, item, conf, s3_conf, cwd, context=None)¶
- property expr¶
- classmethod from_dict(d, templates)¶
- classmethod from_graph(graph, templates)¶
- handle_sub_path_slices_of_artifact_list(slices, artifacts)¶
- load_output_artifacts(stepdir, artifacts)¶
- load_output_parameters(stepdir, parameters)¶
- prepare_argo_arguments(context=None)¶
- record_input_artifacts(stepdir, artifacts, item, scope, ignore_nonexist=False)¶
- record_input_parameters(stepdir, parameters)¶
- record_output_artifacts(stepdir, artifacts)¶
- record_output_parameters(stepdir, parameters)¶
- render_by_executor(context=None)¶
- run(scope, context=None)¶
- run_with_config(scope, context, conf, s3_conf, cwd)¶
- set_artifacts(artifacts)¶
- set_parameters(parameters)¶
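A minimal sketch of passing a parameter and an artifact to a Step; the image, file name and step names are placeholders, and upload_artifact is documented in dflow.utils below:

```python
from dflow import InputArtifact, InputParameter, ShellOPTemplate, Step, upload_artifact

tmpl = ShellOPTemplate(
    name="count-lines",
    image="alpine:3",   # placeholder image
    script="wc -l /tmp/data.txt && echo {{inputs.parameters.label}}",
)
tmpl.inputs.parameters = {"label": InputParameter()}
tmpl.inputs.artifacts = {"data": InputArtifact(path="/tmp/data.txt")}

art = upload_artifact("data.txt")   # placeholder local file
step = Step(
    name="count",
    template=tmpl,
    parameters={"label": "demo"},
    artifacts={"data": art},
)
```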
- dflow.step.add_slices(templ: OPTemplate, slices: Slices, layer=0)¶
- dflow.step.argo_concat(param: ArgoVar) ArgoConcat ¶
Return the concatenation of a list of lists which is an Argo parameter
- Parameters:
param – the Argo parameter which is a list of lists
- dflow.step.argo_enumerate(*args, **kwargs) ArgoVar ¶
Return the enumeration of a list which is an Argo parameter
- Parameters:
param – the Argo parameter which is a list
- dflow.step.argo_len(param: ArgoVar | S3Artifact) ArgoVar ¶
Return the length of a list which is an Argo parameter
- Parameters:
param – the Argo parameter which is a list
- dflow.step.argo_range(*args) ArgoVar ¶
Return a str representing a range of integers in Argo. It receives 1-3 arguments, similar to the built-in function range in Python. Each argument can be an Argo parameter.
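A minimal sketch of fanning out parallel steps with with_param and argo_range; the template, image and names are placeholders:

```python
from dflow import InputParameter, ShellOPTemplate, Step, argo_range

tmpl = ShellOPTemplate(name="echo-item", image="alpine:3",
                       script="echo {{inputs.parameters.i}}")
tmpl.inputs.parameters = {"i": InputParameter()}

# Expands into three parallel steps with item = 0, 1, 2.
step = Step(name="fan-out", template=tmpl,
            parameters={"i": "{{item}}"},
            with_param=argo_range(3))
```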
- dflow.step.argo_sequence(count: int | ArgoVar | None = None, start: int | ArgoVar | None = None, end: int | ArgoVar | None = None, format: str | None = None) object ¶
Return a numeric sequence in Argo
- Parameters:
count – number of elements in the sequence (default: 0), not to be used with end, can be an Argo parameter
start – number at which to start the sequence (default: 0), can be an Argo parameter
end – number at which to end the sequence (default: 0), not to be used with count, can be an Argo parameter
format – a printf format string to format the value in the sequence
- dflow.step.argo_sum(param: ArgoVar) ArgoSum ¶
Return the sum of a list of integers which is an Argo parameter
- Parameters:
param – the Argo parameter which is a list of integers
- dflow.step.backup(path)¶
- dflow.step.download_artifact_debug(artifact, path)¶
- dflow.step.download_with_lock(download, path)¶
- dflow.step.eval_expr(expr)¶
- dflow.step.get_var(expr, scope)¶
- dflow.step.render_expr(expr, scope)¶
- dflow.step.render_item(expr, item)¶
- dflow.step.render_script(script, parameters, workflow_id=None, step_id=None)¶
- dflow.step.replace_argo_func(expr)¶
- dflow.step.upload_python_packages(python_packages)¶
dflow.steps module¶
- class dflow.steps.Steps(name: str | None = None, inputs: Inputs | None = None, outputs: Outputs | None = None, steps: List[Step | List[Step]] | None = None, memoize_key: str | None = None, annotations: Dict[str, str] | None = None, parallelism: int | None = None)¶
Bases:
OPTemplate
- Parameters:
name – the name of the steps
inputs – inputs in the template
outputs – outputs in the template
steps – a sequential list of steps
memoize_key – memoized key of the steps
annotations – annotations for the OP template
parallelism – maximum number of running pods for the OP template
- add(step: Step | List[Step]) None ¶
Add a step or a list of parallel steps to the steps
- Parameters:
step – a step or a list of parallel steps to be added to the entrypoint of the workflow
- add_slices(slices, layer=0)¶
- convert_to_argo(memoize_prefix=None, memoize_configmap='dflow', context=None)¶
- convert_to_graph()¶
- copy()¶
- classmethod from_dict(d, templates)¶
- classmethod from_graph(graph, templates)¶
- run(workflow_id=None, context=None)¶
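A minimal sketch of a Steps template with a sequential step followed by two parallel steps, used as a workflow entrypoint; names and image are placeholders:

```python
from dflow import ShellOPTemplate, Step, Steps, Workflow

tmpl = ShellOPTemplate(name="hello", image="alpine:3", script="echo hello")

entry = Steps(name="entry")
entry.add(Step(name="first", template=tmpl))
entry.add([Step(name="second-a", template=tmpl),
           Step(name="second-b", template=tmpl)])   # run in parallel

wf = Workflow(name="steps-demo", steps=entry)
wf.submit()   # requires a configured Argo server
```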
dflow.task module¶
- class dflow.task.Task(name: str, template: OPTemplate, dependencies: List[Task | str] | None = None, **kwargs)¶
Bases:
Step
- Parameters:
name – the name of the task
template – OP template the task uses
parameters – input parameters passed to the task as arguments
artifacts – input artifacts passed to the task as arguments
when – the task is executed only if the condition is satisfied
with_param – generate parallel tasks with respect to a list as a parameter
continue_on_failed – continue if the task fails
continue_on_num_success – continue if the number of successful parallel tasks is greater than a certain number
continue_on_success_ratio – continue if the ratio of successful parallel tasks is greater than a certain value
with_sequence – generate parallel tasks with respect to a sequence
key – the key of the task
executor – define the executor to execute the script
use_resource – use k8s resource
util_image – image for utility step
util_image_pull_policy – image pull policy for utility step
util_command – command for utility step
dependencies – extra dependencies of the task
- convert_to_argo(context=None)¶
- convert_to_graph()¶
- property expr¶
- classmethod from_dict(d, templates)¶
- set_artifacts(artifacts)¶
- set_parameters(parameters)¶
dflow.util_ops module¶
- class dflow.util_ops.CheckNumSuccess(name='check-num-success', image=None, image_pull_policy=None)¶
Bases:
ShellOPTemplate
- class dflow.util_ops.CheckSuccessRatio(name='check-success-ratio', image=None, image_pull_policy=None)¶
Bases:
ShellOPTemplate
- class dflow.util_ops.InitArtifactForSlices(template, image, command, image_pull_policy, key, sliced_output_artifact=None, sliced_input_artifact=None, sum_var=None, concat_var=None, auto_loop_artifacts=None, group_size=None, format=None, post_script='', tmp_root='/tmp')¶
Bases:
PythonScriptOPTemplate
- render_script()¶
dflow.utils module¶
- class dflow.utils.ArtifactDict¶
Bases:
dict
- class dflow.utils.ArtifactList(iterable=(), /)¶
Bases:
list
- class dflow.utils.ArtifactPosixPath(*args, **kwargs)¶
Bases:
PosixPath
- class dflow.utils.ArtifactSet¶
Bases:
set
- class dflow.utils.ArtifactStr¶
Bases:
str
- class dflow.utils.ArtifactWindowsPath(*args, **kwargs)¶
Bases:
WindowsPath
- class dflow.utils.MinioClient(endpoint: str | None = None, access_key: str | None = None, secret_key: str | None = None, secure: bool | None = None, bucket_name: str | None = None, **kwargs)¶
Bases:
StorageClient
- copy(src: str, dst: str) None ¶
- download(key: str, path: str) None ¶
- get_md5(key: str) str ¶
- list(prefix: str, recursive: bool = False) List[str] ¶
- upload(key: str, path: str) None ¶
- class dflow.utils.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())¶
Bases:
ProcessPoolExecutor
- class dflow.utils.StorageClient¶
Bases:
ABC
- abstract copy(src: str, dst: str) None ¶
- abstract download(key: str, path: str) None ¶
- abstract get_md5(key: str) str ¶
- abstract list(prefix: str, recursive: bool = False) List[str] ¶
- abstract upload(key: str, path: str) None ¶
- dflow.utils.append_item(catalog, item)¶
- dflow.utils.assemble_path_object(art_path, remove=False)¶
- dflow.utils.assemble_path_object_from_catalog(catalog, art_path=None)¶
- dflow.utils.catalog_of_artifact(art, **kwargs) List[dict] ¶
- dflow.utils.catalog_of_local_artifact(art_path, remove=False)¶
- dflow.utils.convert_dflow_list(dflow_list)¶
- dflow.utils.copy_artifact(src, dst, sort=False) S3Artifact ¶
Copy an artifact to another on server side
- Parameters:
src – source artifact
dst – destination artifact
sort – append the path list of dst after that of src
- dflow.utils.copy_file(src, dst, func=<function try_link>)¶
- dflow.utils.copy_s3(src_key: str, dst_key: str, recursive: bool = True, ignore_catalog: bool = False, **kwargs) None ¶
- dflow.utils.dict2list(d: dict)¶
- dflow.utils.download_artifact(artifact, extract: bool = True, sub_path: str | None = None, slice: int | None = None, path: PathLike = '.', remove_catalog: bool = True, **kwargs) List[str] ¶
Download an artifact from Argo to local
- Parameters:
artifact – artifact to be downloaded
extract – extract files if the artifact is compressed
sub_path – download a subdir of an artifact
slice – download a slice of an artifact
path – local path
endpoint – endpoint for Minio
access_key – access key for Minio
secret_key – secret key for Minio
secure – secure or not for Minio
bucket_name – bucket name for Minio
skip_exists – skip files with the same MD5
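A minimal sketch of downloading an output artifact of a finished step; the workflow ID, step key and artifact name are placeholders:

```python
from dflow import Workflow
from dflow.utils import download_artifact

wf = Workflow(id="steps-demo-abc12")   # placeholder workflow ID
step = wf.query_step(key="count")[0]
if step.phase == "Succeeded":
    download_artifact(step.outputs.artifacts["out"], path="./results")
```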
- dflow.utils.download_s3(key: str, path: PathLike = '.', recursive: bool = True, skip_exists: bool = False, keep_dir: bool = False, **kwargs) str ¶
- dflow.utils.evalable_repr(obj, imports)¶
- dflow.utils.expand(d: dict) list | dict ¶
- dflow.utils.find_subclass(pkg, cls)¶
- dflow.utils.flatten(d: list | dict) dict ¶
- dflow.utils.force_link(src, dst, func=<built-in function symlink>)¶
- dflow.utils.force_move(src, dst)¶
- dflow.utils.get_key(artifact, raise_error=True)¶
- dflow.utils.get_md5(f)¶
- dflow.utils.link(src, dst)¶
- dflow.utils.linktree(src, dst, func=<built-in function symlink>)¶
- dflow.utils.merge_dir(src, dst, func=<function force_move>)¶
- dflow.utils.path_list_of_artifact(art, **kwargs) List[str] ¶
- dflow.utils.path_object_of_artifact(art, **kwargs) list | dict ¶
- dflow.utils.randstr(length: int = 5) str ¶
- dflow.utils.remove_empty_dir_tag(path)¶
- dflow.utils.run_command(cmd: List[str] | str, raise_error: bool = True, input: str | None = None, try_bash: bool = False, login: bool = True, interactive: bool = True, shell: bool = False, print_oe: bool = False, **kwargs) Tuple[int, str, str] ¶
Run shell command in subprocess
Parameters:¶
- cmd: list of str, or str
Command to execute
- raise_error: bool
Whether to raise an error if the command fails
- input: str, optional
Input string for the command
- try_bash: bool
Try to use bash if bash exists, otherwise use sh
- login: bool
Login mode of bash when try_bash=True
- interactive: bool
Alias of login
- shell: bool
Use shell for subprocess.Popen
- print_oe: bool
Print stdout and stderr at the same time
- **kwargs:
Arguments in subprocess.Popen
Raises:¶
- AssertionError:
Raised if the command fails to execute and raise_error is set to True
Return:¶
- return_code: int
The return code of the command
- out: str
stdout content of the executed command
- err: str
stderr content of the executed command
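A minimal sketch of run_command, which returns the exit code together with the captured stdout and stderr:

```python
from dflow.utils import run_command

code, out, err = run_command("echo hello", try_bash=True)
print(code, out.strip())
```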
- dflow.utils.set_directory(dirname: PathLike, mkdir: bool = False)¶
Set the current working directory within a context
- Parameters:
dirname (os.PathLike) – The directory path to change to
mkdir (bool) – Whether make directory if dirname does not exist
- Yields:
path (Path) – The absolute path of the changed working directory
Examples
>>> with set_directory("some_path"):
...     do_something()
- dflow.utils.set_key(artifact, key)¶
- dflow.utils.subclass_or_none(m, cls)¶
- dflow.utils.try_link(src, dst)¶
- dflow.utils.upload_artifact(path: PathLike | List[PathLike] | Set[PathLike] | Dict[str, PathLike] | list | dict, archive: str = 'default', namespace: str | None = None, dataset_name: str | None = None, **kwargs) S3Artifact ¶
Upload an artifact from local to Argo
- Parameters:
path – local path
archive – compress format of the artifact, None for no compression
endpoint – endpoint for Minio
access_key – access key for Minio
secret_key – secret key for Minio
secure – secure or not for Minio
bucket_name – bucket name for Minio
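A minimal sketch of uploading local files as a single artifact; the file names are placeholders:

```python
from dflow import upload_artifact

art = upload_artifact(["input_1.txt", "input_2.txt"])   # placeholder local files
print(art.key)   # storage key assigned in the artifact repository
```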
- dflow.utils.upload_s3(path: ~os.PathLike, key: str | None = None, prefix: str | None = None, debug_func=<built-in function symlink>, **kwargs) str ¶
dflow.workflow module¶
- class dflow.workflow.DockerSecret(registry, username, password, name=None)¶
Bases:
object
- class dflow.workflow.Workflow(name: str = 'workflow', steps: Steps | None = None, dag: DAG | None = None, namespace: str | None = None, id: str | None = None, uid: str | None = None, host: str | None = None, token: str | None = None, k8s_config_file: PathLike | None = None, k8s_api_server: str | None = None, context: Context | Executor | None = None, annotations: Dict[str, str] | None = None, labels: Dict[str, str] | None = None, parallelism: int | None = None, pod_gc_strategy: str | None = None, image_pull_secrets: str | DockerSecret | List[str | DockerSecret] | None = None, artifact_repo_key: str | None = None, parameters: Dict[str, Any] | None = None, on_exit: OPTemplate | None = None)¶
Bases:
object
- Parameters:
name – the name of the workflow
steps – steps used as the entrypoint of the workflow; if not provided, an empty Steps will be used
dag – dag used as the entrypoint of the workflow
namespace – k8s namespace
id – workflow ID in Argo; you can provide it to track an existing workflow
host – URL of the Argo server, will override global config
token – request the Argo server with the token, will override global config
k8s_config_file – Kubernetes configuration file for accessing API server, will override global config
k8s_api_server – URL of the Kubernetes API server, will override global config
context – context for the workflow
annotations – annotations for the workflow
parallelism – maximum number of running pods for the workflow
pod_gc_strategy – pod GC provides the ability to delete pods automatically without deleting the workflow; the strategy must be one of the following:
- OnPodCompletion - delete pods immediately when the pod is completed (including errors/failures)
- OnPodSuccess - delete pods immediately when the pod is successful
- OnWorkflowCompletion - delete pods when the workflow is completed
- OnWorkflowSuccess - delete pods when the workflow is successful
image_pull_secrets – secrets for image registries
artifact_repo_key – use artifact repository reference by key
parameters – global input parameters
- add(step: Step | List[Step] | Task | List[Task]) None ¶
Add a step or a list of parallel steps to the workflow
- Parameters:
step – a step or a list of parallel steps to be added to the entrypoint of the workflow
- convert_to_argo(reuse_step=None)¶
- deduplicate_templates()¶
- delete() None ¶
Delete the workflow
- classmethod from_dict(d)¶
- classmethod from_graph(graph)¶
- classmethod from_graph_json(j, **kwargs)¶
- classmethod from_graph_yaml(y, **kwargs)¶
- classmethod from_json(s)¶
- classmethod from_yaml(s)¶
- get_graph_templates(template, graph_templates=None)¶
- get_k8s_core_v1_api()¶
- handle_reused_artifact(step, name, art, group_key)¶
- handle_reused_artifact_with_copy(step, name, art, group_key)¶
- handle_reused_step(step, global_parameters, global_artifacts)¶
- handle_template(template, memoize_prefix=None, memoize_configmap='dflow')¶
- query(fields: List[str] | None = None, retry: int = 3) ArgoWorkflow ¶
Query the workflow from Argo. If fields is not provided, full information of all steps will be returned [O(n)]
- Parameters:
fields – fields of the workflow to be returned
- Returns:
an ArgoWorkflow object
- query_global_outputs() ArgoWorkflow ¶
Query the global outputs of the workflow from Argo. The function is O(1)
- Parameters:
key – filter by key of step
- Returns:
a list of steps
- query_keys_of_steps() List[str] ¶
Query the keys of existing steps of the workflow from Argo. This function will try to get keys from the global outputs, which is O(1). If that fails, it will fall back to querying full steps
- Returns:
a list of keys
- query_status() str ¶
Query the status of the workflow from Argo. The function is O(1)
- Returns:
Pending, Running, Succeeded, Failed, Error, etc
- query_step(name: str | List[str] | None = None, key: str | List[str] | None = None, phase: str | List[str] | None = None, id: str | List[str] | None = None, type: str | List[str] | None = None, parent_id: str | None = None, sort_by_generation: bool = False) List[ArgoStep] ¶
Query the existing steps of the workflow from Argo. This function queries full steps from the server [O(n)], then filters them with the conditions given in the arguments. If you want to call this function multiple times in succession, it is recommended to call query once and then call get_step repeatedly, e.g. info = wf.query(); step1 = info.get_step(key="step1"); step2 = info.get_step(key="step2")
- Parameters:
name – filter by name of step, support regex
key – filter by key of step
phase – filter by phase of step
id – filter by id of step
type – filter by type of step
parent_id – get sub steps of a specific step
sort_by_generation – sort results by the number of generation from the root node
- Returns:
a list of steps
- query_step_by_key(key: str | List[str], name: str | List[str] | None = None, phase: str | List[str] | None = None, id: str | List[str] | None = None, type: str | List[str] | None = None) List[ArgoStep] ¶
Query the existing steps of the workflow from Argo by key. This function will try to get the key-ID map from the global outputs, then query steps by ID, which is O(m) where m is the number of requested keys. If that fails, it will fall back to querying full steps
- Parameters:
key – filter by key of step
- Returns:
a list of steps
- resubmit() None ¶
Resubmit the workflow
- resume() None ¶
Resume the workflow
- retry() None ¶
Retry the workflow
- retry_steps(step_ids)¶
- stop() None ¶
Stop the workflow
- submit(reuse_step: List[ArgoStep] | None = None) ArgoWorkflow ¶
Submit the workflow
- Parameters:
reuse_step – a list of steps to be reused in the workflow
- suspend() None ¶
Suspend the workflow
- terminate() None ¶
Terminate the workflow
- to_dict()¶
- to_graph()¶
- to_graph_json(**kwargs)¶
- to_graph_yaml(**kwargs)¶
- to_json(**kwargs)¶
- to_yaml(**kwargs)¶
- wait(interval=1)¶
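A minimal end-to-end sketch: define a template, add a step, submit the workflow and poll its status; the image and names are placeholders and a configured Argo server is assumed:

```python
import time

from dflow import ShellOPTemplate, Step, Workflow

tmpl = ShellOPTemplate(name="hello", image="alpine:3", script="echo hello dflow")

wf = Workflow(name="hello-dflow")
wf.add(Step(name="hello", template=tmpl))
wf.submit()

while wf.query_status() in ["Pending", "Running"]:
    time.sleep(1)
print(wf.query_status())
```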
- dflow.workflow.get_argo_api_client(host=None, token=None)¶
- dflow.workflow.parse_repo(repo_key=None, namespace=None, **kwargs)¶
- dflow.workflow.query_archived_workflows(labels: Dict[str, str] | None = None, id: str | None = None) List[ArgoWorkflow] ¶
- dflow.workflow.query_workflows(labels: Dict[str, str] | None = None, fields: List[str] | None = None) List[ArgoWorkflow] ¶
Module contents¶
- class dflow.ArgoStep(step, workflow)¶
Bases:
ArgoObjectDict
- delete_pod()¶
- get_duration() timedelta ¶
- get_pod()¶
- get_script()¶
- handle_big_parameters(io)¶
- handle_io(io)¶
- modify_output_artifact(name: str, s3: S3Artifact) None ¶
Modify output artifact of an Argo step
- Parameters:
name – artifact name
s3 – replace the artifact with an S3 object
- modify_output_parameter(name: str, value: Any) None ¶
Modify output parameter of an Argo step
- Parameters:
name – parameter name
value – new value
- replay()¶
- retry()¶
- set_script(script)¶
- class dflow.ArgoWorkflow(d)¶
Bases:
ArgoObjectDict
- get_duration() timedelta ¶
- get_step(name: str | List[str] | None = None, key: str | List[str] | None = None, phase: str | List[str] | None = None, id: str | List[str] | None = None, type: str | List[str] | None = None, parent_id: str | None = None, sort_by_generation: bool = False) List[ArgoStep] ¶
- get_sub_nodes(node_id)¶
- record_generation(node_id, generation)¶
- class dflow.AutonamedDict(*args, **kwargs)¶
Bases:
UserDict
- convert_to_graph()¶
- set_step(step)¶
- set_template(template)¶
- class dflow.ContainerExecutor(docker: str | None = None, singularity: str | None = None, podman: str | None = None, image_pull_policy: str | None = None)¶
Bases:
Executor
- render(template)¶
Render the original template and return a new template. Do not modify self in this method, so that the executor remains reusable.
- class dflow.Context¶
Bases:
ABC
- abstract render(template: OPTemplate) OPTemplate ¶
render original template and return a new template
- class dflow.CustomArtifact¶
Bases:
ABC
- abstract download(name: str, path: str)¶
- static from_urn(urn)¶
- abstract get_urn() str ¶
- redirect = None¶
- render(template, name: str)¶
- class dflow.DAG(name: str | None = None, inputs: Inputs | None = None, outputs: Outputs | None = None, tasks: List[Task] | None = None, memoize_key: str | None = None, annotations: Dict[str, str] | None = None, parallelism: int | None = None)¶
Bases:
OPTemplate
- Parameters:
name – the name of the dag
inputs – inputs in the template
outputs – outputs in the template
tasks – a list of tasks
memoize_key – memoized key of the dag
annotations – annotations for the OP template
parallelism – maximum number of running pods for the OP template
- add(task: Task | List[Task]) None ¶
Add a task or a list of tasks to the dag
- Parameters:
task – a task or a list of tasks to be added to the dag
- add_slices(slices, layer=0)¶
- convert_to_argo(memoize_prefix=None, memoize_configmap='dflow', context=None)¶
- convert_to_graph()¶
- copy()¶
- classmethod from_dict(d, templates)¶
- classmethod from_graph(graph, templates)¶
- resolve(pool, futures)¶
- run(workflow_id=None, context=None)¶
- class dflow.DockerSecret(registry, username, password, name=None)¶
Bases:
object
- class dflow.Executor¶
Bases:
ABC
- abstract render(template: OPTemplate) OPTemplate ¶
Render the original template and return a new template. Do not modify self in this method, so that the executor remains reusable.
- class dflow.HookStep(template: OPTemplate, expression: str | None = None, **kwargs)¶
Bases:
Step
- convert_to_argo(context=None)¶
- class dflow.IfExpression(_if: str | ArgoVar, _then: str | ArgoVar, _else: str | ArgoVar)¶
Bases:
ArgoVar, Expression
- class dflow.InputArtifact(path: str | None = None, name: str | None = None, step=None, template=None, optional: bool = False, type: Any | None = None, source: str | InputArtifact | OutputArtifact | S3Artifact | None = None, mode: int | None = None, sub_path: str | None = None, archive: str = 'default', save_as_parameter: bool = False, **kwargs)¶
Bases:
ArgoVar
Input artifact for OP template
- Parameters:
path – path where the input artifact is placed in the container
name – name of the input artifact
optional – optional artifact or not
type – artifact type
source – default source
archive – regarded as archived file or not
- convert_to_argo()¶
- convert_to_graph()¶
- classmethod from_dict(d)¶
- classmethod from_graph(graph)¶
- get_path_list_parameter()¶
- get_urn_parameter()¶
- sub_path(path)¶
- class dflow.InputParameter(name: str | None = None, step=None, template=None, type: Any | None = None, save_as_artifact: bool = False, path: str | None = None, source: S3Artifact | InputArtifact | OutputArtifact | None = None, **kwargs)¶
Bases:
ArgoVar
Input parameter for OP template
- Parameters:
name – name of the input parameter
type – parameter type
value – default value
- convert_to_argo()¶
- convert_to_graph()¶
- classmethod from_dict(d)¶
- classmethod from_graph(graph)¶
- class dflow.Inputs(parameters: Dict[str, InputParameter] | None = None, artifacts: Dict[str, InputArtifact] | None = None, step=None, template=None)¶
Bases:
object
Inputs for OP template
- Parameters:
parameters – input parameters
artifacts – input artifacts
- convert_to_argo()¶
- convert_to_graph()¶
- classmethod from_dict(d)¶
- classmethod from_graph(graph)¶
- set_step(step)¶
- set_template(template)¶
- class dflow.LineageClient¶
Bases:
ABC
- abstract get_artifact_metadata(urn: str) object ¶
- abstract register_artifact(namespace: str, name: str, uri: str, **kwargs) str ¶
- abstract register_task(task_name: str, input_urns: Dict[str, str | List[str]], output_uris: Dict[str, str], workflow_urn: str) Dict[str, str] ¶
- abstract register_workflow(workflow_name: str) str ¶
- class dflow.OPTemplate(name: str | None = None, inputs: Inputs | None = None, outputs: Outputs | None = None, memoize_key: str | None = None, pvcs: List[PVC] | None = None, annotations: Dict[str, str] | None = None, labels: Dict[str, str] | None = None)¶
Bases:
object
- copy()¶
- deepcopy()¶
- classmethod from_dict(d)¶
- handle_key(memoize_prefix=None, memoize_configmap='dflow')¶
- class dflow.OutputArtifact(path: str | None = None, _from: InputArtifact | OutputArtifact | None = None, name: str | None = None, step=None, template=None, type: Any | None = None, save: List[PVC | S3Artifact] | None = None, archive: str = 'default', global_name: str | None = None, from_expression: IfExpression | str | None = None, optional: bool = False)¶
Bases:
ArgoVar
Output artifact for OP template
- Parameters:
path – path of the output artifact in the container
_from – the artifact is from another artifact
name – name of the output artifact
type – artifact type
save – place to store the output artifact instead of default storage, can be a list
archive – compress format of the artifact, None for no compression
global_name – global name of the artifact within the workflow
from_expression – the artifact is from an expression
- convert_to_argo()¶
- convert_to_graph()¶
- classmethod from_dict(d)¶
- classmethod from_graph(graph)¶
- get_path_list_parameter()¶
- get_urn_parameter()¶
- handle_path_list()¶
- handle_urn()¶
- pvc(size='1Gi', storage_class=None, access_modes=None)¶
- sub_path(path)¶
- class dflow.OutputParameter(value_from_path: str | None = None, value_from_parameter: InputParameter | OutputParameter | None = None, name: str | None = None, step=None, template=None, type: Any | None = None, global_name: str | None = None, value_from_expression: IfExpression | str | None = None, save_as_artifact: bool = False, save_both: bool = False, **kwargs)¶
Bases:
ArgoVar
Output parameter for OP template
- Parameters:
value_from_path – the value is read from file generated in the container
value_from_parameter – the value is from another parameter
name – name of the output parameter
type – parameter type
default – default value
global_name – global name of the parameter within the workflow
value_from_expression – the value is from an expression
value – specify value directly
- convert_to_argo()¶
- convert_to_argo_artifact()¶
- convert_to_argo_parameter()¶
- convert_to_graph()¶
- expr_as_artifact()¶
- expr_as_parameter()¶
- classmethod from_dict(d)¶
- classmethod from_graph(graph)¶
- repr_as_artifact()¶
- repr_as_parameter()¶
- class dflow.Outputs(parameters: Dict[str, OutputParameter] | None = None, artifacts: Dict[str, OutputArtifact] | None = None, step=None, template=None)¶
Bases:
object
Outputs for OP template
- Parameters:
parameters – output parameters
artifacts – output artifacts
- convert_to_argo()¶
- convert_to_graph()¶
- classmethod from_dict(d)¶
- classmethod from_graph(graph)¶
- set_step(step)¶
- set_template(template)¶
- class dflow.PythonScriptOPTemplate(name: str | None = None, inputs: Inputs | None = None, outputs: Outputs | None = None, memoize_key: str | None = None, pvcs: List[PVC] | None = None, annotations: Dict[str, str] | None = None, labels: Dict[str, str] | None = None, node_selector: Dict[str, str] | None = None, tolerations: List[object] | None = None, affinity: object | None = None, image: str | None = None, command: str | List[str] | None = None, script: str | None = None, volumes: List[object] | None = None, mounts: List[object] | None = None, init_progress: str = '0/1', timeout: str | None = None, retry_strategy: object | None = None, image_pull_policy: str | None = None, requests: Dict[str, str] | None = None, limits: Dict[str, str] | None = None, envs: Dict[str, str] | None = None, init_containers: List[object] | None = None, sidecars: List[object] | None = None, script_rendered: bool = False)¶
Bases:
ScriptOPTemplate
Python script OP template
- Parameters:
name – the name of the OP template
inputs – input parameters and input artifacts
outputs – output parameters and output artifacts
image – image the template uses
command – command to run the script
script – python script
volumes – volumes the template uses
mounts – volumes the template mounts
init_progress – a str representing the initial progress
timeout – timeout of the OP template
retry_strategy – retry strategy of the OP template
memoize_key – memoized key of the OP template
pvcs – PVCs that need to be declared
image_pull_policy – Always, IfNotPresent, Never
annotations – annotations for the OP template
labels – labels for the OP template
node_selector – node selector when scheduling the pod
tolerations – tolerations of taints when scheduling the pod
affinity – affinity when scheduling the pod
requests – a dict of resource requests
limits – a dict of resource limits
envs – environment variables
init_containers – init containers before the template runs
sidecars – sidecar containers
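A minimal sketch of a PythonScriptOPTemplate: the script is plain Python text in which {{inputs.parameters.msg}} is substituted before execution. The template name, image and paths are placeholders:

```python
from dflow import InputParameter, OutputParameter, PythonScriptOPTemplate

templ = PythonScriptOPTemplate(
    name="py-echo",
    image="python:3.9",
    script="with open('/tmp/msg.txt', 'w') as f:\n"
           "    f.write('{{inputs.parameters.msg}}')\n",
)
templ.inputs.parameters["msg"] = InputParameter()
templ.outputs.parameters["msg"] = OutputParameter(value_from_path="/tmp/msg.txt")
```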
- class dflow.RemoteExecutor(host: str, port: int = 22, username: str = 'root', password: str | None = None, private_key_file: PathLike | None = None, workdir: str = '~/dflow/workflows/{{workflow.name}}/{{pod.name}}', command: str | List[str] | None = None, remote_command: str | List[str] | None = None, image: str | None = None, image_pull_policy: str | None = None, map_tmp_dir: bool = True, docker_executable: str | None = None, singularity_executable: str | None = None, podman_executable: str | None = None, action_retries: int = -1)¶
Bases:
Executor
- download(src, dst)¶
- execute(cmd)¶
- get_script(template)¶
- mkdir_and_download(path)¶
- render(template)¶
Render the original template and return a new template; do not modify self in this method, so that the executor remains reusable
- run(image, remote_command)¶
- upload(src, dst)¶
- upload_if_exists(path)¶
- class dflow.Resource¶
Bases:
ABC
- Parameters:
action – action on the Kubernetes resource
success_condition – expression representing success
failure_condition – expression representing failure
- action: str | None = None¶
- failure_condition: str | None = None¶
- abstract get_manifest(template: OPTemplate) OPTemplate ¶
The method to get the manifest (str)
- success_condition: str | None = None¶
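A hedged sketch of a custom Resource: the Kubernetes Job manifest below is purely illustrative (not provided by dflow); only the action, success_condition and failure_condition attributes and the get_manifest method come from the interface above. Such a resource is typically passed to a Step via use_resource.

```python
from dflow import Resource


class EchoJob(Resource):
    # Illustrative values; the conditions are evaluated against the resource status
    action = "create"
    success_condition = "status.succeeded > 0"
    failure_condition = "status.failed > 0"

    def get_manifest(self, template):
        # Return the manifest as a string; a real implementation would embed
        # the template's script (see SlurmJob for a concrete example)
        return """
apiVersion: batch/v1
kind: Job
metadata:
  generateName: echo-job-
spec:
  template:
    spec:
      containers:
      - name: main
        image: %s
        command: ["sh", "-c", "echo running"]
      restartPolicy: Never
""" % template.image
```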
- class dflow.S3Artifact(key: str | None = None, path_list: str | list | None = None, urn: str = '', debug_s3: bool = False, *args, **kwargs)¶
Bases:
object
S3 artifact
- Parameters:
key – key of the s3 artifact
- download(**kwargs)¶
- evalable_repr(imports)¶
- classmethod from_dict(d)¶
- oss()¶
- sub_path(path: str) Any ¶
- to_dict()¶
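For example, an object already present in the artifact repository can be referenced by key and fed to a step as an input artifact; the key, the template and its artifact name are assumptions for illustration:

```python
from dflow import S3Artifact, Step

existing = S3Artifact(key="upload/my-dataset/")   # hypothetical key
step = Step(
    name="consume",
    template=templ,                # a template declaring an input artifact "data"
    artifacts={"data": existing},
)
```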
- class dflow.Secret(value=None, name=None, key='secret')¶
Bases:
object
- class dflow.ShellOPTemplate(name: str | None = None, inputs: Inputs | None = None, outputs: Outputs | None = None, memoize_key: str | None = None, pvcs: List[PVC] | None = None, annotations: Dict[str, str] | None = None, labels: Dict[str, str] | None = None, node_selector: Dict[str, str] | None = None, tolerations: List[object] | None = None, affinity: object | None = None, image: str | None = None, command: str | List[str] | None = None, script: str | None = None, volumes: List[object] | None = None, mounts: List[object] | None = None, init_progress: str = '0/1', timeout: str | None = None, retry_strategy: object | None = None, image_pull_policy: str | None = None, requests: Dict[str, str] | None = None, limits: Dict[str, str] | None = None, envs: Dict[str, str] | None = None, init_containers: List[object] | None = None, sidecars: List[object] | None = None, script_rendered: bool = False)¶
Bases:
ScriptOPTemplate
Shell script OP template
- Parameters:
name – the name of the OP template
inputs – input parameters and input artifacts
outputs – output parameters and output artifacts
image – image the template uses
command – command to run the script
script – shell script
volumes – volumes the template uses
mounts – volumes the template mounts
init_progress – a str representing the initial progress
timeout – timeout of the OP template
retry_strategy – retry strategy of the OP template
memoize_key – memoized key of the OP template
pvcs – PVCs that need to be declared
image_pull_policy – Always, IfNotPresent, Never
annotations – annotations for the OP template
labels – labels for the OP template
node_selector – node selector when scheduling the pod
tolerations – tolerations of taints when scheduling the pod
affinity – affinity when scheduling the pod
requests – a dict of resource requests
limits – a dict of resource limits
envs – environment variables
init_containers – init containers before the template runs
sidecars – sidecar containers
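A minimal sketch of a complete ShellOPTemplate with one input parameter, one input artifact and the corresponding outputs; names, image and paths are illustrative:

```python
from dflow import (InputArtifact, InputParameter, OutputArtifact,
                   OutputParameter, ShellOPTemplate)

hello_templ = ShellOPTemplate(
    name="hello",
    image="alpine:latest",
    script="cp /tmp/foo.txt /tmp/bar.txt "
           "&& echo {{inputs.parameters.msg}} > /tmp/msg.txt",
)
hello_templ.inputs.parameters = {"msg": InputParameter()}
hello_templ.inputs.artifacts = {"foo": InputArtifact(path="/tmp/foo.txt")}
hello_templ.outputs.parameters = {"msg": OutputParameter(value_from_path="/tmp/msg.txt")}
hello_templ.outputs.artifacts = {"bar": OutputArtifact(path="/tmp/bar.txt")}
```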
- class dflow.SlurmJob(header='', node_selector=None, prepare=None, results=None, map_tmp_dir=True, workdir='.', remote_command=None, docker_executable=None, singularity_executable=None, podman_executable=None)¶
Bases:
Resource
- get_manifest(template)¶
The method to get the manifest (str)
- class dflow.SlurmJobTemplate(header: str = '', node_selector: Dict[str, str] | None = None, prepare_image: str | None = None, prepare_image_pull_policy: str | None = None, collect_image: str | None = None, collect_image_pull_policy: str | None = None, workdir: str = 'dflow/workflows/{{workflow.name}}/{{pod.name}}', remote_command: str | List[str] | None = None, docker_executable: str | None = None, singularity_executable: str | None = None, podman_executable: str | None = None)¶
Bases:
Executor
Slurm job template
- Parameters:
header – header for Slurm job
node_selector – node selector
prepare_image – image for preparing data
prepare_image_pull_policy – image pull policy for preparing data
collect_image – image for collecting results
collect_image_pull_policy – image pull policy for collecting results
workdir – remote working directory
remote_command – command for running the script remotely
docker_executable – docker executable to run remotely
singularity_executable – singularity executable to run remotely
podman_executable – podman executable to run remotely
- render(template)¶
Render the original template and return a new template; do not modify self in this method, so that the executor remains reusable
- class dflow.SlurmRemoteExecutor(host: str, port: int = 22, username: str = 'root', password: str | None = None, private_key_file: PathLike | None = None, workdir: str = '~/dflow/workflows/{{workflow.name}}/{{pod.name}}', command: str | List[str] | None = None, remote_command: str | List[str] | None = None, image: str | None = None, image_pull_policy: str | None = None, map_tmp_dir: bool = True, docker_executable: str | None = None, singularity_executable: str | None = None, podman_executable: str | None = None, action_retries: int = -1, header: str = '', interval: int = 3, pvc: PVC | None = None)¶
Bases:
RemoteExecutor
Slurm remote executor
- Parameters:
host – remote host
port – SSH port
username – username
password – password for SSH
private_key_file – private key file for SSH
workdir – remote working directory
command – command for the executor
remote_command – command for running the script remotely
image – image for the executor
image_pull_policy – image pull policy for the executor
map_tmp_dir – map /tmp to ./tmp
docker_executable – docker executable to run remotely
singularity_executable – singularity executable to run remotely
podman_executable – podman executable to run remotely
action_retries – number of retries for actions (upload, execute commands, download); -1 for unlimited retries
header – header for Slurm job
interval – query interval for Slurm
- render(template)¶
Render the original template and return a new template; do not modify self in this method, so that the executor remains reusable
- run(image, remote_command)¶
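A hedged sketch of running a step on a Slurm cluster over SSH; the host, key file, SBATCH header and local file are placeholders, and hello_templ refers to the ShellOPTemplate sketched above:

```python
from dflow import SlurmRemoteExecutor, Step, upload_artifact

executor = SlurmRemoteExecutor(
    host="login.hpc.example.com",                 # placeholder host
    username="myuser",
    private_key_file="/home/myuser/.ssh/id_rsa",  # placeholder key
    header="#!/bin/bash\n#SBATCH -N 1\n#SBATCH -n 4\n#SBATCH -p cpu",
)
slurm_step = Step(
    name="slurm-step",
    template=hello_templ,
    parameters={"msg": "HelloWorld!"},
    artifacts={"foo": upload_artifact("foo.txt")},  # placeholder local file
    executor=executor,   # the rendered script is uploaded and submitted via sbatch
)
```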
- class dflow.Step(name: str, template: OPTemplate, parameters: Dict[str, Any] | None = None, artifacts: Dict[str, S3Artifact | InputArtifact | OutputArtifact | None] | None = None, when: str | None = None, with_param: str | list | InputParameter | OutputParameter | None = None, continue_on_failed: bool = False, continue_on_error: bool = False, continue_on_num_success: int | None = None, continue_on_success_ratio: float | None = None, with_sequence: object | None = None, key: str | None = None, executor: Executor | None = None, use_resource: Resource | None = None, util_image: str | None = None, util_image_pull_policy: str | None = None, util_command: str | List[str] | None = None, parallelism: int | None = None, slices: Slices | None = None, success_hook: HookStep | None = None, running_hook: HookStep | None = None, failure_hook: HookStep | None = None)¶
Bases:
object
- Parameters:
name – the name of the step
template – OP template the step uses
parameters – input parameters passed to the step as arguments
artifacts – input artifacts passed to the step as arguments
when – run the step only if the condition is satisfied
with_param – generate parallel steps with respect to a list as a parameter
continue_on_failed – continue if the step fails
continue_on_error – continue if the step meets error
continue_on_num_success – continue if the number of successful parallel steps is greater than a certain number
continue_on_success_ratio – continue if the success ratio of the generated parallel steps is greater than a certain ratio
with_sequence – generate parallel steps with respect to a sequence
key – the key of the step
executor – define the executor to execute the script
use_resource – use k8s resource
util_image – image for utility step
util_image_pull_policy – image pull policy for utility step
util_command – command for utility step
parallelism – parallelism for sliced step
slices – override slices of OP template
- convert_to_argo(context=None)¶
- convert_to_graph()¶
- exec(scope, parameters, item=None, context=None)¶
- exec_steps(scope, parameters, item=None, context=None)¶
- exec_with_config(scope, parameters, item, conf, s3_conf, cwd, context=None)¶
- property expr¶
- classmethod from_dict(d, templates)¶
- classmethod from_graph(graph, templates)¶
- handle_sub_path_slices_of_artifact_list(slices, artifacts)¶
- load_output_artifacts(stepdir, artifacts)¶
- load_output_parameters(stepdir, parameters)¶
- prepare_argo_arguments(context=None)¶
- record_input_artifacts(stepdir, artifacts, item, scope, ignore_nonexist=False)¶
- record_input_parameters(stepdir, parameters)¶
- record_output_artifacts(stepdir, artifacts)¶
- record_output_parameters(stepdir, parameters)¶
- render_by_executor(context=None)¶
- run(scope, context=None)¶
- run_with_config(scope, context, conf, s3_conf, cwd)¶
- set_artifacts(artifacts)¶
- set_parameters(parameters)¶
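A minimal sketch of wiring a step: the parameter and artifact names must match those declared on the template (hello_templ from the ShellOPTemplate sketch above; foo.txt is a placeholder local file):

```python
from dflow import Step, upload_artifact

art = upload_artifact("foo.txt")           # placeholder local file
step = Step(
    name="hello-step",
    template=hello_templ,
    parameters={"msg": "HelloWorld!"},
    artifacts={"foo": art},
)
# The step is then added to a Workflow and submitted; see the Workflow sketch below
```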
- class dflow.Steps(name: str | None = None, inputs: Inputs | None = None, outputs: Outputs | None = None, steps: List[Step | List[Step]] | None = None, memoize_key: str | None = None, annotations: Dict[str, str] | None = None, parallelism: int | None = None)¶
Bases:
OPTemplate
- Parameters:
name – the name of the steps
inputs – inputs in the template
outputs – outputs in the template
steps – a sequential list of steps
memoize_key – memoized key of the steps
annotations – annotations for the OP template
parallelism – maximum number of running pods for the OP template
- add(step: Step | List[Step]) None ¶
Add a step or a list of parallel steps to the steps
- Parameters:
step – a step or a list of parallel steps to be added to this Steps template
- add_slices(slices, layer=0)¶
- convert_to_argo(memoize_prefix=None, memoize_configmap='dflow', context=None)¶
- convert_to_graph()¶
- copy()¶
- classmethod from_dict(d, templates)¶
- classmethod from_graph(graph, templates)¶
- run(workflow_id=None, context=None)¶
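Because a Steps object is itself an OP template, it can be used as the template of a Step in a larger workflow; a minimal sketch, reusing hello_templ and art from the sketches above:

```python
from dflow import InputParameter, OutputParameter, Step, Steps

sub = Steps(name="sub")
sub.inputs.parameters["msg"] = InputParameter()
inner = Step(
    name="inner",
    template=hello_templ,
    parameters={"msg": sub.inputs.parameters["msg"]},
    artifacts={"foo": art},
)
sub.add(inner)
# Expose the inner step's output parameter as an output of the Steps template
sub.outputs.parameters["msg"] = OutputParameter(
    value_from_parameter=inner.outputs.parameters["msg"])

outer = Step(name="outer", template=sub, parameters={"msg": "hi"})
```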
- class dflow.Task(name: str, template: OPTemplate, dependencies: List[Task | str] | None = None, **kwargs)¶
Bases:
Step
- Parameters:
name – the name of the task
template – OP template the task uses
parameters – input parameters passed to the task as arguments
artifacts – input artifacts passed to the task as arguments
when – run the task only if the condition is satisfied
with_param – generate parallel tasks with respect to a list as a parameter
continue_on_failed – continue if the task fails
continue_on_num_success – continue if the number of successful parallel tasks is greater than a certain number
continue_on_success_ratio – continue if the success ratio of the generated parallel tasks is greater than a certain ratio
with_sequence – generate parallel tasks with respect to a sequence
key – the key of the task
executor – define the executor to execute the script
use_resource – use k8s resource
util_image – image for utility step
util_image_pull_policy – image pull policy for utility step
util_command – command for utility step
dependencies – extra dependencies of the task
- convert_to_argo(context=None)¶
- convert_to_graph()¶
- property expr¶
- classmethod from_dict(d, templates)¶
- set_artifacts(artifacts)¶
- set_parameters(parameters)¶
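A minimal sketch of tasks in a DAG; dependencies are inferred automatically when a task consumes another task's output, and extra ones can be declared via dependencies (hello_templ and art as in the sketches above):

```python
from dflow import DAG, Task, Workflow

dag = DAG()
task1 = Task(name="task1", template=hello_templ,
             parameters={"msg": "first"}, artifacts={"foo": art})
# task2 depends on task1 because it consumes task1's output parameter
task2 = Task(name="task2", template=hello_templ,
             parameters={"msg": task1.outputs.parameters["msg"]},
             artifacts={"foo": art})
dag.add(task1)
dag.add(task2)
wf = Workflow(name="dag-example", dag=dag)
```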
- class dflow.Workflow(name: str = 'workflow', steps: Steps | None = None, dag: DAG | None = None, namespace: str | None = None, id: str | None = None, uid: str | None = None, host: str | None = None, token: str | None = None, k8s_config_file: PathLike | None = None, k8s_api_server: str | None = None, context: Context | Executor | None = None, annotations: Dict[str, str] | None = None, labels: Dict[str, str] | None = None, parallelism: int | None = None, pod_gc_strategy: str | None = None, image_pull_secrets: str | DockerSecret | List[str | DockerSecret] | None = None, artifact_repo_key: str | None = None, parameters: Dict[str, Any] | None = None, on_exit: OPTemplate | None = None)¶
Bases:
object
- Parameters:
name – the name of the workflow
steps – steps used as the entrypoint of the workflow; if not provided, an empty Steps will be used
dag – dag used as the entrypoint of the workflow
namespace – k8s namespace
id – workflow ID in Argo, you can provide it to track an existing workflow
host – URL of the Argo server, will override global config
token – request the Argo server with the token, will override global config
k8s_config_file – Kubernetes configuration file for accessing API server, will override global config
k8s_api_server – URL of the Kubernetes API server, will override global config
context – context for the workflow
annotations – annotations for the workflow
parallelism – maximum number of running pods for the workflow
pod_gc_strategy – pod GC provides the ability to delete pods automatically without deleting the workflow; the strategy must be one of the following:
- OnPodCompletion - delete pods immediately when the pod is completed (including errors/failures)
- OnPodSuccess - delete pods immediately when the pod is successful
- OnWorkflowCompletion - delete pods when the workflow is completed
- OnWorkflowSuccess - delete pods when the workflow is successful
image_pull_secrets – secrets for image registries
artifact_repo_key – use artifact repository reference by key
parameters – global input parameters
- add(step: Step | List[Step] | Task | List[Task]) None ¶
Add a step or a list of parallel steps to the workflow
- Parameters:
step – a step or a list of parallel steps to be added to the entrypoint of the workflow
- convert_to_argo(reuse_step=None)¶
- deduplicate_templates()¶
- delete() None ¶
Delete the workflow
- classmethod from_dict(d)¶
- classmethod from_graph(graph)¶
- classmethod from_graph_json(j, **kwargs)¶
- classmethod from_graph_yaml(y, **kwargs)¶
- classmethod from_json(s)¶
- classmethod from_yaml(s)¶
- get_graph_templates(template, graph_templates=None)¶
- get_k8s_core_v1_api()¶
- handle_reused_artifact(step, name, art, group_key)¶
- handle_reused_artifact_with_copy(step, name, art, group_key)¶
- handle_reused_step(step, global_parameters, global_artifacts)¶
- handle_template(template, memoize_prefix=None, memoize_configmap='dflow')¶
- query(fields: List[str] | None = None, retry: int = 3) ArgoWorkflow ¶
Query the workflow from Argo. If fields is not provided, full information of all steps will be returned [O(n)].
- Parameters:
fields – fields of the workflow to be returned
- Returns:
an ArgoWorkflow object
- query_global_outputs() ArgoWorkflow ¶
Query the global outputs of the workflow from Argo. The function is O(1).
- Returns:
an ArgoWorkflow object
- query_keys_of_steps() List[str] ¶
Query the keys of the existing steps of the workflow from Argo. This function will first try to get the keys from the global outputs, which is O(1); if that fails, it falls back to querying the full steps.
- Returns:
a list of keys
- query_status() str ¶
Query the status of the workflow from Argo. The function is O(1).
- Returns:
Pending, Running, Succeeded, Failed, Error, etc.
- query_step(name: str | List[str] | None = None, key: str | List[str] | None = None, phase: str | List[str] | None = None, id: str | List[str] | None = None, type: str | List[str] | None = None, parent_id: str | None = None, sort_by_generation: bool = False) List[ArgoStep] ¶
Query the existing steps of the workflow from Argo. This function queries the full steps from the server [O(n)] and then filters them with the conditions given in the arguments. If you want to call this function multiple times in succession, it is recommended to call query once and then call get_step repeatedly, e.g. info = wf.query(); step1 = info.get_step(key="step1"); step2 = info.get_step(key="step2")
- Parameters:
name – filter by name of step, support regex
key – filter by key of step
phase – filter by phase of step
id – filter by id of step
type – filter by type of step
parent_id – get sub steps of a specific step
sort_by_generation – sort results by the number of generation from the root node
- Returns:
a list of steps
- query_step_by_key(key: str | List[str], name: str | List[str] | None = None, phase: str | List[str] | None = None, id: str | List[str] | None = None, type: str | List[str] | None = None) List[ArgoStep] ¶
Query the existing steps of the workflow from Argo by key. This function will try to get the key-to-ID map from the global outputs and then query the steps by ID, which is O(m) where m is the number of requested keys; if that fails, it falls back to querying the full steps.
- Parameters:
key – filter by key of step
- Returns:
a list of steps
- resubmit() None ¶
Resubmit the workflow
- resume() None ¶
Resume the workflow
- retry() None ¶
Retry the workflow
- retry_steps(step_ids)¶
- stop() None ¶
Stop the workflow
- submit(reuse_step: List[ArgoStep] | None = None) ArgoWorkflow ¶
Submit the workflow
- Parameters:
reuse_step – a list of steps to be reused in the workflow
- suspend() None ¶
Suspend the workflow
- terminate() None ¶
Terminate the workflow
- to_dict()¶
- to_graph()¶
- to_graph_json(**kwargs)¶
- to_graph_yaml(**kwargs)¶
- to_json(**kwargs)¶
- to_yaml(**kwargs)¶
- wait(interval=1)¶
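A minimal sketch of the submit-and-query cycle, reusing step and hello_templ from the sketches above; the output names are those declared on the template:

```python
import time

from dflow import Workflow, download_artifact

wf = Workflow(name="hello")
wf.add(step)
wf.submit()

# Poll the status (O(1)), then inspect the finished step and fetch its outputs
while wf.query_status() in ["Pending", "Running"]:
    time.sleep(1)
finished = wf.query_step(name="hello-step")[0]
if finished.phase == "Succeeded":
    download_artifact(finished.outputs.artifacts["bar"])
    print(finished.outputs.parameters["msg"].value)
```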
- dflow.argo_concat(param: ArgoVar) ArgoConcat ¶
Return the concatenation of a list of lists which is an Argo parameter
- Parameters:
param – the Argo parameter which is a list of lists
- dflow.argo_enumerate(*args, **kwargs) ArgoVar ¶
Return the enumeration of a list which is an Argo parameter
- Parameters:
param – the Argo parameter which is a list
- dflow.argo_len(param: ArgoVar | S3Artifact) ArgoVar ¶
Return the length of a list which is an Argo parameter
- Parameters:
param – the Argo parameter which is a list
- dflow.argo_range(*args) ArgoVar ¶
Return a str representing a range of integers in Argo. It receives 1-3 arguments, similar to Python's built-in range. Each argument can be an Argo parameter.
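For example, a sketch of fanning out parallel steps over an Argo-side range; each generated step receives one element of the range through {{item}} (hello_templ and art as in the sketches above):

```python
from dflow import Step, argo_range

fan_out = Step(
    name="fan-out",
    template=hello_templ,
    parameters={"msg": "{{item}}"},   # substituted per generated step
    artifacts={"foo": art},
    with_param=argo_range(3),         # items 0, 1, 2
)
```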
- dflow.argo_sequence(count: int | ArgoVar | None = None, start: int | ArgoVar | None = None, end: int | ArgoVar | None = None, format: str | None = None) object ¶
Return a numeric sequence in Argo
- Parameters:
count – number of elements in the sequence (default: 0), not to be used with end, can be an Argo parameter
start – number at which to start the sequence (default: 0), can be an Argo parameter
end – number at which to end the sequence (default: 0), not to be used with count, can be an Argo parameter
format – a printf format string to format the value in the sequence
- dflow.argo_sum(param: ArgoVar) ArgoSum ¶
Return the sum of a list of integers which is an Argo parameter
- Parameters:
param – the Argo parameter which is a list of integers
- dflow.copy_artifact(src, dst, sort=False) S3Artifact ¶
Copy an artifact to another artifact on the server side
- Parameters:
src – source artifact
dst – destination artifact
sort – append the path list of dst after that of src
- dflow.copy_s3(src_key: str, dst_key: str, recursive: bool = True, ignore_catalog: bool = False, **kwargs) None ¶
- dflow.download_artifact(artifact, extract: bool = True, sub_path: str | None = None, slice: int | None = None, path: PathLike = '.', remove_catalog: bool = True, **kwargs) List[str] ¶
Download an artifact from Argo to the local filesystem
- Parameters:
artifact – artifact to be downloaded
extract – extract files if the artifact is compressed
sub_path – download a subdir of an artifact
slice – download a slice of an artifact
path – local path
endpoint – endpoint for Minio
access_key – access key for Minio
secret_key – secret key for Minio
secure – secure or not for Minio
bucket_name – bucket name for Minio
skip_exists – skip files with the same MD5
- dflow.download_s3(key: str, path: PathLike = '.', recursive: bool = True, skip_exists: bool = False, keep_dir: bool = False, **kwargs) str ¶
- dflow.gen_code(graph)¶
- dflow.if_expression(_if: str | ArgoVar, _then: str | ArgoVar, _else: str | ArgoVar) IfExpression ¶
Return an if expression in Argo
- Parameters:
_if – a bool expression, which may be a comparison of two Argo parameters
_then – value returned if the condition is satisfied
_else – value returned if the condition is not satisfied
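A hedged sketch of using the returned expression to pick one of two steps' outputs as an output parameter of a Steps template; the step names, their "score"/"msg" outputs and the surrounding Steps object are assumptions for illustration:

```python
from dflow import OutputParameter, if_expression

# steps is a Steps template containing step1 and step2, each of which
# declares output parameters "score" and "msg" (hypothetical names)
steps.outputs.parameters["msg"] = OutputParameter(
    value_from_expression=if_expression(
        _if="%s > %s" % (step1.outputs.parameters["score"],
                         step2.outputs.parameters["score"]),
        _then=step1.outputs.parameters["msg"],
        _else=step2.outputs.parameters["msg"],
    )
)
```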
- dflow.path_list_of_artifact(art, **kwargs) List[str] ¶
- dflow.path_object_of_artifact(art, **kwargs) list | dict ¶
- dflow.query_archived_workflows(labels: Dict[str, str] | None = None, id: str | None = None) List[ArgoWorkflow] ¶
- dflow.query_workflows(labels: Dict[str, str] | None = None, fields: List[str] | None = None) List[ArgoWorkflow] ¶
- dflow.randstr(length: int = 5) str ¶
- dflow.set_config(**kwargs) None ¶
Set global configurations
- Parameters:
host – host of Argo server
namespace – k8s namespace
token – token for authentication, necessary for reused workflows
k8s_config_file – location of kube config file if it is used for authentication
k8s_api_server – address of Kubernetes API server, necessary for reused workflows
private_key_host_path – path of private key on the Kubernetes nodes
save_path_as_parameter – save catalog of artifacts as parameters
catalog_dir_name – catalog directory name for artifacts
archive_mode – “tar” for archiving with tar, None for no archive
util_image – image for util step
util_image_pull_policy – image pull policy for util step
extender_image – image for dflow extender
extender_image_pull_policy – image pull policy for dflow extender
dispatcher_image – image for dpdispatcher
dispatcher_image_pull_policy – image pull policy for dpdispatcher
save_keys_in_global_outputs – save keys of steps in global outputs
mode – “default” for normal, “debug” for debugging locally
lineage – lineage client, None by default
http_headers – HTTP headers for requesting Argo server
workflow_annotations – default annotations for workflows
overwrite_reused_artifact – overwrite reused artifact
- dflow.set_s3_config(**kwargs) None ¶
Set S3 configurations
- Parameters:
endpoint – endpoint for S3 storage
console – console address for S3 storage
access_key – access key for S3 storage
secret_key – secret key for S3 storage
secure – secure or not
bucket_name – name of S3 bucket
repo_key – key of artifact repository
repo – artifact repository, parsed from repo_key
repo_type – s3 or oss, parsed from repo_key
repo_prefix – prefix of artifact repository, parsed from repo_key
prefix – prefix of storage key
storage_client – client for plugin storage backend
extra_prefixes – extra prefixes ignored by auto-prefixing
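A minimal sketch of configuring dflow before submitting workflows; every address and credential below is a placeholder for your own Argo/S3 (or MinIO) deployment:

```python
from dflow import set_config, set_s3_config

set_config(
    host="https://127.0.0.1:2746",            # placeholder Argo server
    k8s_api_server="https://127.0.0.1:6443",  # placeholder k8s API server
)
set_s3_config(
    endpoint="127.0.0.1:9000",                # placeholder S3/MinIO endpoint
    access_key="admin",
    secret_key="password",
    bucket_name="my-artifacts",
)
```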
- dflow.upload_artifact(path: PathLike | List[PathLike] | Set[PathLike] | Dict[str, PathLike] | list | dict, archive: str = 'default', namespace: str | None = None, dataset_name: str | None = None, **kwargs) S3Artifact ¶
Upload an artifact from the local filesystem to Argo
- Parameters:
path – local path
archive – compress format of the artifact, None for no compression
endpoint – endpoint for Minio
access_key – access key for Minio
secret_key – secret key for Minio
secure – secure or not for Minio
bucket_name – bucket name for Minio
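A minimal sketch of the local round trip; the file names are placeholders, and the returned S3Artifact can be passed to the artifacts argument of Step/Task as shown above:

```python
from dflow import download_artifact, upload_artifact

art = upload_artifact(["data/a.txt", "data/b.txt"])     # placeholder local files
# ... use art as an input artifact of a step, run the workflow ...
local_paths = download_artifact(art, path="restored")   # list of downloaded paths
```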
- dflow.upload_s3(path: PathLike, key: str | None = None, prefix: str | None = None, debug_func=os.symlink, **kwargs) str ¶