feat: application flow #3152

Merged: 1 commit, May 27, 2025
Conversation

shaohuzhang1
Contributor

feat: application flow


f2c-ci-robot bot commented May 27, 2025

Adding the "do-not-merge/release-note-label-needed" label because no release-note block was detected, please follow our release note process to remove it.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.


f2c-ci-robot bot commented May 27, 2025

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:

The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

charset='utf-8')

r['Cache-Control'] = 'no-cache'
return r
Contributor Author

Code Review

Issues Identified:

  1. Line Length: Several lines exceed the recommended 80-character limit, which makes the file harder to read.

  2. Comments: Many comments may no longer be relevant in the current context and could be removed.

  3. Docstrings: Some docstrings exist, but they need more specific detail about each function's purpose and the parameters it accepts.

  4. Function Redundancies:

    • to_response and to_response_simple share similar logic but serve different purposes (service vs. simple service). Consider extracting the shared logic into a helper.
  5. Error Handling in Streams:

    try:
        [your code here]
    except Exception as e:
        error_message = str(e)
        # Write context and handler with the error message...

    A broad except like this inside a stream generator can end the response prematurely: by the time the exception is raised, part of the stream may already have been flushed to the client, so the error cannot be reported cleanly.

  6. Variable Names: Names such as answer, chunk, and reasoning_content_chunk could be clearer.

  7. Resource Management: Ensure that resources (such as HTTP connections) are closed after use to prevent leaks.

  8. Code Duplication: There is some redundancy in how responses are processed and formatted into JSON objects.
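To make the stream concern in item 5 concrete, one pattern is to let the generator itself convert an exception into a final, well-formed chunk, so the client always sees a terminated stream. A minimal sketch (the names `safe_stream` and `failing_tokens` are illustrative, not from this PR):

```python
import json

def safe_stream(token_stream):
    """Wrap a token generator so an exception becomes a final error chunk
    rather than an abrupt, half-written response."""
    try:
        for chunk in token_stream:
            yield json.dumps({'content': chunk, 'is_end': False}) + '\n'
    except Exception as e:  # deliberately broad: this is the response boundary
        yield json.dumps({'content': str(e), 'is_end': True, 'error': True}) + '\n'
    else:
        yield json.dumps({'content': '', 'is_end': True}) + '\n'


def failing_tokens():
    """Toy token source that fails mid-stream."""
    yield 'hello'
    raise RuntimeError('model timeout')
```

With this wrapper, a mid-stream failure still produces a parseable terminal chunk instead of a truncated body.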

Optimization Suggestions:

  1. Increase Readability:

    • Break long lines with implicit continuation inside parentheses rather than backslash continuations.
  2. Refactor Logic:

    • Consider merging handle_error_in_stream into a single function with appropriate parameters so that all of the stream-processing logic stays coherent and easy to maintain.
  3. Improve Error Reporting:

    • Capture stack traces or include additional information in the response payload for more detailed error reporting.
  4. Utilize Built-in Stream Processing Libraries:

    • If applicable, consider Python's asyncio (async generators and tasks, rather than the event loop directly) for handling asynchronous streaming efficiently.
  5. Test Thoroughly:

    • Validate behavior under various scenarios, especially edge cases such as empty requests or errors raised mid-generation.
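On suggestion 4: asyncio does not ship a dedicated streaming library, but async generators cover this case well. A hedged sketch of accumulating a full answer while consuming an asynchronous token stream (`stream_answer` and `collect` are hypothetical stand-ins for the PR's model stream, not its actual API):

```python
import asyncio

async def stream_answer(tokens, delay=0.0):
    """Toy async generator standing in for a model's token stream."""
    for token in tokens:
        await asyncio.sleep(delay)  # simulate network latency per token
        yield token

async def collect(tokens):
    """Consume the stream while accumulating the full answer,
    mirroring what a streaming write-context helper would do."""
    answer = ''
    async for chunk in stream_answer(tokens):
        answer += chunk
    return answer

# asyncio.run(collect(['Hel', 'lo'])) returns 'Hello'
```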

By addressing these points, you will enhance the readability, robustness, and efficiency of your codebase while improving user experience related to API calls involving streaming responses.

'answer_tokens': self.context.get('answer_tokens'),
'status': self.status,
'err_message': self.err_message
}
Contributor Author

The code looks generally correct and follows best practices for creating a question node in MaxKB's flow management system. However, there are a few areas where improvements can be made:

Code Optimization Suggestions

  1. Use of functools.reduce:

    • Using reduce to concatenate lists copies the accumulator on every step, which is expensive for large datasets. Consider itertools.chain or a list comprehension instead.
  2. Static Method Replacements:

    • Several @staticmethod definitions could become instance methods where they need node state; keeping the distinction consistent simplifies calls and avoids confusion.
  3. Error Handling:

    • Ensure that all external calls handle exceptions gracefully to prevent runtime errors. Consider try-except blocks around database queries and other critical operations.
  4. String Substitution Optimization:

    • Regular expressions used in substitutions (re.sub) should be tested and, in performance-critical sections, precompiled with re.compile.
  5. Code Consistency:

    • Use consistent naming conventions, spacing, and docstrings throughout the codebase. This improves readability and maintainability.
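To illustrate item 1: flattening nested lists with reduce rebuilds the accumulator on every step, while itertools.chain (or a comprehension) walks each element once. The `batches` data below is a made-up example, not data from the PR:

```python
from functools import reduce
from itertools import chain

batches = [['a', 'b'], ['c'], ['d', 'e']]

# reduce builds a new intermediate list per step: quadratic total copying
flat_reduce = reduce(lambda acc, x: acc + x, batches, [])

# chain (or a comprehension) visits each element exactly once
flat_chain = list(chain.from_iterable(batches))
flat_comp = [item for batch in batches for item in batch]
```

All three produce the same result; the latter two scale linearly with the total number of elements.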

Potential Issues

  1. Resource Management:

    • Ensure that the memory usage is managed properly, especially when dealing with long-running processes such as invoking chat models. Implement appropriate resource limits or garbage collection strategies.
  2. Logging and Monitoring:

    • Add logging at strategic points in the code to monitor its execution and catch unexpected behavior. Using libraries like Python's built-in logging module or third-party tools can help.
  3. Security Concerns:

    • Be cautious about handling sensitive information such as model credentials and user inputs. Ensure proper validation and encoding are applied to protect against security vulnerabilities.
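A minimal sketch of the logging suggestion, assuming a chat-model object that exposes an invoke() method as in this PR; the logger name and the wrapper function are illustrative, not part of the codebase:

```python
import logging
import time

logger = logging.getLogger('maxkb.flow')  # hypothetical logger name

def invoke_with_logging(chat_model, message_list):
    """Wrap a model call with timing and error logging.

    Sketch only: chat_model is assumed to expose invoke(), matching the
    non-streaming branch of the node's execute() method."""
    start = time.time()
    try:
        return chat_model.invoke(message_list)
    except Exception:
        # logger.exception records the full stack trace at ERROR level
        logger.exception('model invocation failed')
        raise
    finally:
        logger.debug('invoke took %.3fs', time.time() - start)
```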

Here’s an updated version incorporating some of these suggestions:

# coding=utf-8
"""
@project: maxkb
@Author: 虎
@file: base_question_node.py
@date:2024/6/4 14:30
@desc:
"""
import re
import time
from functools import reduce
from typing import List, Dict

from django.db.models import QuerySet
from langchain.schema import HumanMessage, SystemMessage
from langchain_core.messages import BaseMessage

from application.flow.i_step_node import NodeResult, INode
from application.flow.step_node.question_node.i_question_node import IQuestionNode
from setting.models import Model
from setting.models_provider import get_model_credential, get_model_instance_by_model_user_id


def _write_context(node_variable: Dict, workflow_variable: Dict, node: INode, workflow, answer: str):
    chat_model = node_variable.get('chat_model')
    message_tokens = chat_model.get_num_tokens_from_messages(node_variable.get('message_list'))
    answer_tokens = chat_model.get_num_tokens(answer)
    node.context['message_tokens'] = message_tokens
    node.context['answer_tokens'] = answer_tokens
    node.context['answer'] = answer
    node.context['history_message'] = node_variable['history_message']
    node.context['question'] = node_variable['question']
    node.context['run_time'] = time.time() - node.context['start_time']
    if workflow.is_result(node, NodeResult(node_variable, workflow_variable)):
        node.answer_text = answer


def write_context_stream(node_variable: Dict, workflow_variable: Dict, node: INode, workflow):
    """
    Write context data (streaming).
    @param node_variable:      node data
    @param workflow_variable:  global data
    @param node:               node instance
    @param workflow:           workflow manager
    """
    response = node_variable.get('result')
    answer = ''
    for chunk in response:
        answer += chunk.content
        yield chunk.content
    _write_context(node_variable, workflow_variable, node, workflow, answer)


def write_context(node_variable: Dict, workflow_variable: Dict, node: INode, workflow):
    """
    Write context data (non-streaming).
    @param node_variable:      node data
    @param workflow_variable:  global data
    @param node:               node instance
    @param workflow:           workflow manager
    """
    response = node_variable.get('result')
    answer = response.content
    _write_context(node_variable, workflow_variable, node, workflow, answer)


def get_default_model_params_setting(model_id):
    model = QuerySet(Model).filter(id=model_id).first()
    credential = get_model_credential(model.provider, model.model_type, model.model_name)
    model_params_setting = credential.get_model_params_setting_form(  
        model.model_name).get_default_form_data()
    return model_params_setting  


class QuestionNode(IQuestionNode):
    def execute(self, model_id, system, prompt, dialogue_number, history_chat_record, stream,
                chat_id, chat_record_id, model_params_setting=None, **kwargs) -> NodeResult:
        if model_params_setting is None:
            model_params_setting = get_default_model_params_setting(model_id)

        try:
            chat_model = get_model_instance_by_model_user_id(
                model_id, self.flow_params_serializer.data.get('user_id'), **model_params_setting)
            history_message = self.get_history_message(history_chat_record, dialogue_number)
            self.context['history_message'] = history_message
            question = self.generate_prompt_question(prompt)
            self.context['question'] = question.content
            system = self.workflow_manage.generate_prompt(system)
            self.context['system'] = system
            message_list = self.generate_message_list(system, prompt, history_message)
            self.context['message_list'] = message_list

            if stream:
                r = chat_model.stream(message_list)
                return NodeResult({'result': r, 'chat_model': chat_model, 'message_list': message_list,
                                   'history_message': history_message, 'question': question.content}, {},
                                  _write_context=write_context_stream)
            else:
                r = chat_model.invoke(message_list)
                return NodeResult({'result': r, 'chat_model': chat_model, 'message_list': message_list,
                                   'history_message': history_message, 'question': question.content}, {},
                                  _write_context=write_context)
        except Exception as e:
            # Record the error on the node so the workflow can surface it
            self.err_message = str(e)
            raise
        
    # Precompiled once: strips embedded <form_rander> markup from messages
    FORM_RANDER_PATTERN = re.compile(r'<form_rander>[\d\D]*?</form_rander>')

    def get_history_message(self, history_chat_record, dialogue_number):
        start_index = len(history_chat_record) - dialogue_number
        history_message = []
        for index in range(max(start_index, 0), len(history_chat_record)):
            human_msg = history_chat_record[index].get_human_message()
            ai_msg = history_chat_record[index].get_ai_message()

            if human_msg.content and isinstance(human_msg.content, str):
                human_msg.content = self.FORM_RANDER_PATTERN.sub('', human_msg.content)
            if ai_msg.content:
                ai_msg.content = self.FORM_RANDER_PATTERN.sub('', ai_msg.content)

            history_message.extend([human_msg, ai_msg])

        return history_message
    
    def generate_prompt_question(self, prompt):
        return HumanMessage(content=self.workflow_manage.generate_prompt(prompt))

    def generate_message_list(self, system: str, prompt: str, history: List[BaseMessage]):
        # System prompt first, then history, then the current question
        return [SystemMessage(content=system), *history, self.generate_prompt_question(prompt)]
    
    
    def reset_message_list(self, message_list: List[BaseMessage], answer_text):
        # Convert langchain messages into plain role/content dicts
        result = [
            {'role': 'user' if isinstance(message, HumanMessage) else 'ai',
             'content': message.content}
            for message in message_list
        ]
        result.append({'role': 'ai', 'content': answer_text})
        return result
 
    
    def group_texts(self, texts: list, max_length: int = 2500) -> list:
        """Group consecutive texts into chunks whose combined length stays under max_length."""
        grouped = []
        current_chunk = []
        current_length = 0
        for text in texts:
            if current_chunk and current_length + len(text) > max_length:
                grouped.append(current_chunk)
                current_chunk = []
                current_length = 0
            current_chunk.append(text)
            current_length += len(text)
        if current_chunk:
            grouped.append(current_chunk)
        return grouped
     
     
    def get_details(self, index: int, **kwargs):
        return {
            'name': self.node.properties.get('stepName'),
            'index': index,
            'status': self.status,
            'answer': self.context.get('answer'),
            'type': self.node.type,
            'message_tokens': self.context.get('message_tokens'),
            'answer_tokens': self.context.get('answer_tokens'),
            'err_message': self.err_message
        }

Key Changes Made:

  1. Replaced @staticmethod decorators with regular methods where practical.
  2. Added error handling in the execute() method to catch and record exceptions raised during model invocation.
  3. Added brief comments explaining the changes where relevant.

Run details
:return: step details
"""
return None
Contributor Author

The provided code has several areas that need attention:

  1. Imports: The # coding=utf-8 directive is unnecessary under Python 3 and should be removed.
  2. Class Names: Class names are written inconsistently (IBaseChatPipelineStep vs. I_BaseChatPipelineStep); settle on one PascalCase form so they are easier to read.
  3. Variable Naming: Names such as _id, document_id, and dataset_id are short and do not convey enough context about their purpose.
  4. Docstrings: Most docstrings omit what each method does and how it handles exceptions.
  5. Type Annotations: Type annotations are good practice, but make sure they match the data types actually used in the code.
  6. Abstract Method Implementation: The IBaseChatPipelineStep.run() method contains only comments and is never actually implemented.
  7. Direct Returns: Some methods return results directly from constructors or helpers without considering whether they should accept parameters instead.

Here are some specific suggestions:

General Refactoring

1. Rename Variables

# Change variable names with descriptive names

2. Improve Docstrings

"""
    @project: maxkb
    @Author:虎
    @file: I_base_chat_pipeline.py
    @date:2024/1/9 17:25
    @desc:
        This file defines the base interface for chat pipeline steps.
"""

3. Use Context Dictionary Consistently

Ensure that all classes have access to a context dictionary through inheritance if necessary.

Code Changes

Constructor Updates

Update constructor parameter names to be more descriptive.

class ParagraphPipelineModel:
    def __init__(
        self,
        paragraph_id: str,
        document_id: int,
        dataset_id: int,
        content: str,
        title: str,
        status: str,
        is_active: bool,
        comprehensive_score: float,
        similarity: float,
        dataset_name: str,
        document_name: str,
        hit_handling_method: str,
        directly_return_similarity: float,
        meta: dict = None,
    ) -> None:
        super().__init__()
        self._paragraph_id = paragraph_id
        self._document_id = document_id
        self._dataset_id = dataset_id
        self._content = content
        self.title = title
        self.status = status
        self.is_active = is_active
        self.comprehensive_score = comprehensive_score
        self.similarity = similarity
        self.dataset_name = dataset_name
        self.document_name = document_name
        self.hit_handling_method = hit_handling_method
        self.directly_return_similarity = directly_return_similarity
        self.meta = meta

Correct Serialization

Ensure serialization logic handles cases where None values might occur.

# requires: from typing import Dict, Union
def to_dict(self) -> Dict[str, Union[str, int, float, bool, list, dict]]:
    return {
        'id': str(self._paragraph_id),
        'document_id': self._document_id,
        'dataset_id': self._dataset_id,
        'content': self._content,
        'title': self.title,
        'status': self.status,
        'is_active': self.is_active,
        'comprehensive_score': self.comprehensive_score,
        'similarity': self.similarity,
        'dataset_name': self.dataset_name,
        'document_name': self.document_name,
        'hit_handling_method': self.hit_handling_method,
        'directly_return_similarity': self.directly_return_similarity,
        'meta': self.meta if self.meta is not None else {},
    }

Implement run() Method

Implement the abstract run() method by calling other necessary parts.

# requires: import time; from rest_framework import serializers
def run(self, manage) -> None:
    """
    Execute this step: validate arguments, then run the main flow,
    recording start and elapsed time in the context.
    """
    start_time = time.time()
    self.context['start_time'] = start_time

    # Validate arguments before running the main flow
    try:
        self.valid_args(manage)
    except serializers.ValidationError:
        manage.set_error_status("Argument validation failed.")
        return

    self._run(manage)
    self.context['run_time'] = time.time() - start_time

By making these changes, you'll improve the clarity, maintainability, and robustness of the given codebase according to Python guidelines.

@shaohuzhang1 shaohuzhang1 merged commit 896fb5f into v2 May 27, 2025
3 of 5 checks passed
@shaohuzhang1 shaohuzhang1 deleted the pr@v2@feat_application_flow branch May 27, 2025 10:24