-
Notifications
You must be signed in to change notification settings - Fork 46
/
Copy pathworkflow_async.py
67 lines (51 loc) · 2.41 KB
/
workflow_async.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
"""
This example describes how to use the workflow with async.
"""
import logging
import os
import time
from typing import Optional
from cozepy import COZE_CN_BASE_URL, Coze, DeviceOAuthApp, TokenAuth, WorkflowExecuteStatus, setup_logging
def get_coze_api_base() -> str:
# The default access is api.coze.cn, but if you need to access api.coze.com,
# please use base_url to configure the api endpoint to access
coze_api_base = os.getenv("COZE_API_BASE")
if coze_api_base:
return coze_api_base
return COZE_CN_BASE_URL # default
def get_coze_api_token(workspace_id: Optional[str] = None) -> str:
# Get an access_token through personal access token or oauth.
coze_api_token = os.getenv("COZE_API_TOKEN")
if coze_api_token:
return coze_api_token
coze_api_base = get_coze_api_base()
device_oauth_app = DeviceOAuthApp(client_id="57294420732781205987760324720643.app.coze", base_url=coze_api_base)
device_code = device_oauth_app.get_device_code(workspace_id)
print(f"Please Open: {device_code.verification_url} to get the access token")
return device_oauth_app.get_access_token(device_code=device_code.device_code, poll=True).access_token
# Init the Coze client through the access_token.
coze = Coze(auth=TokenAuth(token=get_coze_api_token()), base_url=get_coze_api_base())
# Whether to print detailed logs
is_debug = os.getenv("DEBUG")
if is_debug:
setup_logging(logging.DEBUG)
# Create a workflow instance in Coze, copy the last number from the web link as the workflow's ID.
workflow_id = os.getenv("COZE_WORKFLOW_ID") or "your workflow id"
# Call the coze.workflows.runs.create method to create a workflow run. The create method
# is a non-streaming chat and will return a WorkflowRunResult class.
workflow_run = coze.workflows.runs.create(workflow_id=workflow_id, is_async=True)
print("Start async workflow run:", workflow_run.execute_id)
while True:
run_history = coze.workflows.runs.run_histories.retrieve(
workflow_id=workflow_id, execute_id=workflow_run.execute_id
)
if run_history.execute_status == WorkflowExecuteStatus.FAIL:
print("Workflow run fail:", run_history.error_message)
break
elif run_history.execute_status == WorkflowExecuteStatus.RUNNING:
print("Workflow still running, sleep 1s and continue")
time.sleep(1)
continue
else:
print("Workflow run success:", run_history.output)
break