Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Ready for Review] Fix issue where resuming on successful run will fail. #1956

Merged
merged 4 commits into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions metaflow/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ def resume(
)

if step_to_rerun is None:
clone_steps = set()
steps_to_rerun = set()
else:
# validate step name
if step_to_rerun not in obj.graph.nodes:
Expand All @@ -660,7 +660,7 @@ def resume(
step_to_rerun, ",".join(list(obj.graph.nodes.keys()))
)
)
clone_steps = {step_to_rerun}
steps_to_rerun = {step_to_rerun}

if run_id:
# Run-ids that are provided by the metadata service are always integers.
Expand Down Expand Up @@ -688,7 +688,7 @@ def resume(
clone_run_id=origin_run_id,
clone_only=clone_only,
reentrant=reentrant,
clone_steps=clone_steps,
steps_to_rerun=steps_to_rerun,
max_workers=max_workers,
max_num_splits=max_num_splits,
max_log_size=max_log_size * 1024 * 1024,
Expand Down
3 changes: 3 additions & 0 deletions metaflow/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ def __init__(self, flow):
self.name = flow.__name__
self.nodes = self._create_nodes(flow)
self.doc = deindent_docstring(flow.__doc__)
# nodes sorted in topological order.
self.sorted_nodes = []
self._traverse_graph()
self._postprocess()

Expand All @@ -197,6 +199,7 @@ def _postprocess(self):

def _traverse_graph(self):
def traverse(node, seen, split_parents):
self.sorted_nodes.append(node.name)
if node.type in ("split", "foreach"):
node.split_parents = split_parents
split_parents = split_parents + [node.name]
Expand Down
21 changes: 17 additions & 4 deletions metaflow/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def __init__(
clone_run_id=None,
clone_only=False,
reentrant=False,
clone_steps=None,
steps_to_rerun=None,
max_workers=MAX_WORKERS,
max_num_splits=MAX_NUM_SPLITS,
max_log_size=MAX_LOG_SIZE,
Expand Down Expand Up @@ -110,12 +110,21 @@ def __init__(

self._clone_run_id = clone_run_id
self._clone_only = clone_only
self._clone_steps = {} if clone_steps is None else clone_steps
self._cloned_tasks = []
self._cloned_task_index = set()
self._reentrant = reentrant
self._run_url = None

# If steps_to_rerun is specified, we will not clone them in resume mode.
self._steps_to_rerun = steps_to_rerun or {}
# sorted_nodes are in topological order already, so we only need to
# iterate through the nodes once to get a stable set of rerun steps.
for step_name in self._graph.sorted_nodes:
if step_name in self._steps_to_rerun:
out_funcs = self._graph[step_name].out_funcs or []
for next_step in out_funcs:
self._steps_to_rerun.add(next_step)

self._origin_ds_set = None
if clone_run_id:
# resume logic
Expand Down Expand Up @@ -166,7 +175,7 @@ def _new_task(self, step, input_paths=None, **kwargs):
else:
may_clone = all(self._is_cloned[path] for path in input_paths)

if step in self._clone_steps:
if step in self._steps_to_rerun:
may_clone = False
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general, I think with this new approach we can remove all the may_clone, etc. flags that are being set. Keeping them may be confusing because I don't think they are used anymore, so removing them would simplify the code a bit and make it clearer that all the resume logic now lives in one place (the clone functions) instead of being scattered around the runtime code.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, but the code refactor should be in a different PR (I want this fix PR to be concise).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. Let's do a separate PR to clean that up then.


if step == "_parameters":
Expand Down Expand Up @@ -336,7 +345,11 @@ def clone_original_run(self, generate_task_obj=False, verbose=True):
_, step_name, task_id = task_ds.pathspec.split("/")
pathspec_index = task_ds.pathspec_index

if task_ds["_task_ok"] and step_name != "_parameters":
if (
task_ds["_task_ok"]
and step_name != "_parameters"
and (step_name not in self._steps_to_rerun)
):
# "_unbounded_foreach" is a special flag to indicate that the transition is an unbounded foreach.
# Both parent and splitted children tasks will have this flag set. The splitted control/mapper tasks
# have no "foreach_param" because UBF is always followed by a join step.
Expand Down
1 change: 1 addition & 0 deletions test/core/metaflow_test/formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ def __init__(self, graphspec, test):
self.graphspec = graphspec
self.test = test
self.should_resume = getattr(test, "RESUME", False)
self.resume_step = getattr(test, "RESUME_STEP", None)
self.should_fail = getattr(test, "SHOULD_FAIL", False)
self.flow_name = "%sFlow" % self.test.__class__.__name__
self.used = set()
Expand Down
17 changes: 14 additions & 3 deletions test/core/run_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,13 @@ def log(msg, formatter=None, context=None, real_bad=False, real_good=False):


def run_test(formatter, context, debug, checks, env_base, executor):
def run_cmd(mode):
def run_cmd(mode, args=None):
cmd = [context["python"], "-B", "test_flow.py"]
cmd.extend(context["top_options"])
cmd.extend((mode, "--run-id-file", "run-id"))
cmd.append(mode)
if args:
cmd.extend(args)
cmd.extend(("--run-id-file", "run-id"))
cmd.extend(context["run_options"])
return cmd

Expand Down Expand Up @@ -204,9 +207,17 @@ def construct_arg_dicts_from_click_api():
elif formatter.should_resume:
log("Resuming flow", formatter, context)
if executor == "cli":
flow_ret = subprocess.call(run_cmd("resume"), env=env)
flow_ret = subprocess.call(
run_cmd(
"resume",
[formatter.resume_step] if formatter.resume_step else [],
),
env=env,
)
elif executor == "api":
_, resume_level_dict = construct_arg_dicts_from_click_api()
if formatter.resume_step:
resume_level_dict["step_to_rerun"] = formatter.resume_step
result = runner.resume(**resume_level_dict)
flow_ret = result.command_obj.process.returncode
else:
Expand Down
46 changes: 46 additions & 0 deletions test/core/tests/resume_succeeded_step.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from metaflow_test import MetaflowTest, ExpectationFailed, steps


class ResumeSucceededStepTest(MetaflowTest):
    """
    Resuming from a step that already succeeded should rerun that step
    (and everything downstream of it) instead of failing.
    """

    RESUME = True
    # resuming on a successful step.
    RESUME_STEP = "a"
    PRIORITY = 3
    PARAMETERS = {"int_param": {"default": 123}}

    @steps(0, ["start"])
    def step_start(self):
        # A rerun writes the "_r"-suffixed value; a cloned task keeps the original.
        self.data = "start_r" if is_resumed() else "start"

    @steps(0, ["singleton-end"], required=True)
    def step_end(self):
        self.data = "end_r" if is_resumed() else "end"
        raise ResumeFromHere()

    @steps(2, ["all"])
    def step_all(self):
        self.data = "test_r" if is_resumed() else "test"

    def check_results(self, flow, checker):
        # Expected artifact per step after the resume:
        # - "start" is cloned, so it keeps the original (no "_r" suffix);
        # - "a" is the resumed step and "end" runs after it, so both rerun
        #   and carry the "_r" suffix.
        expected = {
            "start": "start",
            "a": "test_r",
            "end": "end_r",
        }
        for step in flow:
            if step.name in expected:
                checker.assert_artifact(step.name, "data", expected[step.name])
Loading