Skip to content

rebase #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Sep 16, 2020
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ docker run --network=host --rm ubercadence/cli:master --do sample domain registe
## Installation cadence-python

```
pip install cadence-client==1.0.0b2
pip install cadence-client==1.0.0b3
```

## Hello World Sample

```
```python
import sys
import logging
from cadence.activity_method import activity_method
Expand All @@ -63,7 +63,7 @@ class GreetingActivities:
# Activities Implementation
class GreetingActivitiesImpl:
def compose_greeting(self, greeting: str, name: str):
return greeting + " " + name + "!"
return f"{greeting} {name}!"


# Workflow Interface
Expand Down
15 changes: 12 additions & 3 deletions cadence/decision_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
HistoryEvent, EventType, WorkflowType, ScheduleActivityTaskDecisionAttributes, \
CancelWorkflowExecutionDecisionAttributes, StartTimerDecisionAttributes, TimerFiredEventAttributes, \
FailWorkflowExecutionDecisionAttributes, RecordMarkerDecisionAttributes, Header, WorkflowQuery, \
RespondQueryTaskCompletedRequest, QueryTaskCompletedType, QueryWorkflowResponse
RespondQueryTaskCompletedRequest, QueryTaskCompletedType, QueryWorkflowResponse, DecisionTaskFailedCause
from cadence.conversions import json_to_args, args_to_json
from cadence.decisions import DecisionId, DecisionTarget
from cadence.exception_handling import serialize_exception, deserialize_exception
Expand Down Expand Up @@ -537,6 +537,7 @@ class ReplayDecider:
decision_events: DecisionEvents = None
decisions: OrderedDict[DecisionId, DecisionStateMachine] = field(default_factory=OrderedDict)
decision_context: DecisionContext = None
workflow_id: str = None

activity_id_to_scheduled_event_id: Dict[str, int] = field(default_factory=dict)

Expand Down Expand Up @@ -674,6 +675,11 @@ def handle_activity_task_failed(self, event: HistoryEvent):
def handle_activity_task_timed_out(self, event: HistoryEvent):
self.decision_context.handle_activity_task_timed_out(event)

def handle_decision_task_failed(self, event: HistoryEvent):
attr = event.decision_task_failed_event_attributes
if attr and attr.cause == DecisionTaskFailedCause.RESET_WORKFLOW:
self.decision_context.set_current_run_id(attr.new_run_id)

def handle_workflow_execution_signaled(self, event: HistoryEvent):
signaled_event_attributes = event.workflow_execution_signaled_event_attributes
signal_input = signaled_event_attributes.input
Expand Down Expand Up @@ -816,6 +822,7 @@ def on_timer_canceled(self: ReplayDecider, event: HistoryEvent):
EventType.DecisionTaskScheduled: noop,
EventType.DecisionTaskStarted: noop, # Filtered by HistoryHelper
EventType.DecisionTaskTimedOut: noop, # TODO: check
EventType.DecisionTaskFailed: ReplayDecider.handle_decision_task_failed,
EventType.ActivityTaskScheduled: ReplayDecider.handle_activity_task_scheduled,
EventType.ActivityTaskStarted: ReplayDecider.handle_activity_task_started,
EventType.ActivityTaskCompleted: ReplayDecider.handle_activity_task_completed,
Expand Down Expand Up @@ -904,14 +911,16 @@ def poll(self) -> Optional[PollForDecisionTaskResponse]:

def process_task(self, decision_task: PollForDecisionTaskResponse) -> List[Decision]:
execution_id = str(decision_task.workflow_execution)
decider = ReplayDecider(execution_id, decision_task.workflow_type, self.worker)
decider = ReplayDecider(execution_id, decision_task.workflow_type, self.worker,
workflow_id=decision_task.workflow_execution.workflow_id)
decisions: List[Decision] = decider.decide(decision_task.history.events)
decider.destroy()
return decisions

def process_query(self, decision_task: PollForDecisionTaskResponse) -> bytes:
execution_id = str(decision_task.workflow_execution)
decider = ReplayDecider(execution_id, decision_task.workflow_type, self.worker)
decider = ReplayDecider(execution_id, decision_task.workflow_type, self.worker,
workflow_id=decision_task.workflow_execution.workflow_id)
decider.decide(decision_task.history.events)
try:
result = decider.query(decision_task, decision_task.query)
Expand Down
15 changes: 14 additions & 1 deletion cadence/tests/test_decision_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@

from cadence.cadence_types import HistoryEvent, EventType, PollForDecisionTaskResponse, \
ScheduleActivityTaskDecisionAttributes, WorkflowExecutionStartedEventAttributes, Decision, \
ActivityTaskStartedEventAttributes, MarkerRecordedEventAttributes
ActivityTaskStartedEventAttributes, MarkerRecordedEventAttributes, DecisionTaskFailedEventAttributes, \
DecisionTaskFailedCause
from cadence.clock_decision_context import VERSION_MARKER_NAME
from cadence.decision_loop import HistoryHelper, is_decision_event, DecisionTaskLoop, ReplayDecider, DecisionEvents, \
nano_to_milli
Expand Down Expand Up @@ -365,6 +366,18 @@ def test_handle_activity_task_started(self):
args, kwargs = state_machine.handle_started_event.call_args_list[0]
self.assertIn(event, args)

def test_handle_decision_task_failed(self):
event = HistoryEvent(event_id=15)
event.event_type = EventType.DecisionTaskFailed
event.decision_task_failed_event_attributes = DecisionTaskFailedEventAttributes()
event.decision_task_failed_event_attributes.cause = DecisionTaskFailedCause.RESET_WORKFLOW
event.decision_task_failed_event_attributes.new_run_id = "the-new-run-id"
self.decider.decision_context = decision_context = MagicMock()
self.decider.handle_decision_task_failed(event)
decision_context.set_current_run_id.assert_called()
args, kwargs = decision_context.set_current_run_id.call_args_list[0]
assert args[0] == "the-new-run-id"

def tearDown(self) -> None:
self.decider.destroy()

Expand Down
14 changes: 13 additions & 1 deletion cadence/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from typing import Callable, List, Type, Dict, Tuple
from uuid import uuid4

from six import reraise

from cadence.activity import ActivityCompletionClient
from cadence.activity_method import RetryParameters, ActivityOptions
Expand Down Expand Up @@ -90,6 +89,19 @@ def get_logger(name):
task: ITask = ITask.current()
return task.decider.decision_context.get_logger(name)

@staticmethod
def get_workflow_id():
from cadence.decision_loop import ITask
task: ITask = ITask.current()
return task.decider.workflow_id

@staticmethod
def get_execution_id():
from cadence.decision_loop import ITask
task: ITask = ITask.current()
return task.decider.execution_id



class WorkflowStub:
pass
Expand Down
1 change: 0 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
more-itertools==7.0.0
ply==3.11
six==1.12.0
tblib==1.6.0
thriftrw==1.7.2
dataclasses-json==0.3.8
5 changes: 3 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

setuptools.setup(
name="cadence-client",
version="1.0.0-beta2",
version="1.0.0-beta3",
author="Mohammed Firdaus",
author_email="firdaus.halim@gmail.com",
description="Python framework for Cadence Workflow Service",
Expand All @@ -17,12 +17,13 @@
"dataclasses-json>=0.3.8",
"more-itertools>=7.0.0",
"ply>=3.11",
"six>=1.12.0",
"tblib>=1.6.0",
"thriftrw>=1.7.2",
],
classifiers=[
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
],
Expand Down