fix: Argo workflows joins failing with wide foreach flows #1655

Closed · wants to merge 2 commits
12 changes: 10 additions & 2 deletions metaflow/plugins/argo/argo_workflows.py
@@ -1034,9 +1034,16 @@ def _container_templates(self):
# Ideally, we would like these task ids to be the same as node name
# (modulo retry suffix) on Argo Workflows but that doesn't seem feasible
# right now.

task_str = node.name + "-{{workflow.creationTimestamp}}"
# The input paths can get quite large for foreach joins, so we take care not
# to repeat them unnecessarily in the generated scripts - Argo enforces a
# maximum template size.
# Note: We inline a Python script here because jq might not be present on the
# image, and bundling this as a utility will not work since the metaflow
# module is initialized only after this line.
# Read input-paths directly from the ARGO_TEMPLATE environment variable.
input_paths_expr = """export INPUT_PATHS=$(python -c \\"import sys, os, json; template = json.loads(os.environ['ARGO_TEMPLATE']); input_paths = next((param['value'] for param in template.get('inputs', {'parameters': []})['parameters'] if param['name'] == 'input-paths'), None); sys.stdout.write(input_paths if input_paths else '')\\")"""
if node.name != "start":
task_str += "-{{inputs.parameters.input-paths}}"
task_str += "-$(echo $INPUT_PATHS)"
if any(self.graph[n].type == "foreach" for n in node.in_funcs):
task_str += "-{{inputs.parameters.split-index}}"
# Generated task_ids need to be non-numeric - see register_task_id in
@@ -1087,6 +1094,7 @@ def _container_templates(self):
# env var.
'${METAFLOW_INIT_SCRIPT:+eval \\"${METAFLOW_INIT_SCRIPT}\\"}',
"mkdir -p $PWD/.logs",
input_paths_expr,
task_id_expr,
mflog_expr,
]
@@ -1098,7 +1106,7 @@ def _container_templates(self):
node.name, self.flow_datastore.TYPE
)

input_paths = "{{inputs.parameters.input-paths}}"
input_paths = "$(echo $INPUT_PATHS)"

top_opts_dict = {
"with": [
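For readability, the inlined python -c one-liner that input_paths_expr exports into INPUT_PATHS can be unrolled roughly as follows. This is only a sketch; it assumes, as the diff does, that Argo exposes the JSON spec of the executing template via the ARGO_TEMPLATE environment variable.

# Unrolled sketch of the one-liner above; assumes ARGO_TEMPLATE holds the
# JSON spec of the currently executing Argo template.
import json
import os
import sys

def read_input_paths_from_argo_template():
    template = json.loads(os.environ["ARGO_TEMPLATE"])
    parameters = template.get("inputs", {"parameters": []})["parameters"]
    # Pick out the 'input-paths' input parameter; fall back to an empty string.
    input_paths = next(
        (param["value"] for param in parameters if param["name"] == "input-paths"),
        None,
    )
    sys.stdout.write(input_paths if input_paths else "")

if __name__ == "__main__":
    read_input_paths_from_argo_template()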
2 changes: 1 addition & 1 deletion metaflow/plugins/argo/process_input_paths.py
@@ -11,7 +11,7 @@ def process_input_paths(input_paths):
flow, run_id, task_ids = input_paths.split("/")
task_ids = re.sub("[\[\]{}]", "", task_ids)
task_ids = task_ids.split(",")
tasks = [t.split(":")[1] for t in task_ids]
tasks = [t.split(":")[1].strip('"') for t in task_ids]
return "{}/{}/:{}".format(flow, run_id, ",".join(tasks))


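A hypothetical before/after of the quote-stripping change above; the flow name, run id, and task ids below are made up, and the input-paths format is inferred from the parsing logic in process_input_paths.

# Hypothetical usage of process_input_paths; example values are made up.
from metaflow.plugins.argo.process_input_paths import process_input_paths

# With INPUT_PATHS now read from ARGO_TEMPLATE, foreach-join task ids arrive
# JSON-quoted, e.g. {"0":"t-123","1":"t-456"}.
paths = 'MyFlow/argo-run-1/{"0":"t-123","1":"t-456"}'

print(process_input_paths(paths))
# Before the fix: MyFlow/argo-run-1/:"t-123","t-456"
# After the fix:  MyFlow/argo-run-1/:t-123,t-456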