From a499be2115685e8c233c4a493d8ef456bad39e7b Mon Sep 17 00:00:00 2001 From: chait Date: Sat, 8 Mar 2025 02:31:34 +0800 Subject: [PATCH] [update] modify the RefreshAgents interface to use batch operations; --- .../Repositories/IBotSharpRepository.cs | 10 + .../Services/AgentService.RefreshAgents.cs | 73 +++++-- .../FileRepository/FileRepository.Agent.cs | 30 +++ .../FileRepository.AgentTask.cs | 7 + .../Repository/MongoRepository.Agent.cs | 194 +++++++++++++++--- .../Repository/MongoRepository.AgentTask.cs | 16 +- 6 files changed, 275 insertions(+), 55 deletions(-) diff --git a/src/Infrastructure/BotSharp.Abstraction/Repositories/IBotSharpRepository.cs b/src/Infrastructure/BotSharp.Abstraction/Repositories/IBotSharpRepository.cs index d523dd09c..7a2b55e74 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Repositories/IBotSharpRepository.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Repositories/IBotSharpRepository.cs @@ -74,10 +74,18 @@ List GetUserAgents(string userId) => throw new NotImplementedException(); void BulkInsertAgents(List agents) => throw new NotImplementedException(); + ValueTask BulkInsertAgentsAsync(List agents) + => throw new NotImplementedException(); void BulkInsertUserAgents(List userAgents) => throw new NotImplementedException(); + ValueTask BulkInsertUserAgentsAsync(List userAgents) + => throw new NotImplementedException(); bool DeleteAgents() => throw new NotImplementedException(); + Task DeleteAgentsAsync() + => throw new NotImplementedException(); + ValueTask DeleteAgentsAsync(List agentIds) + => throw new NotImplementedException(); bool DeleteAgent(string agentId) => throw new NotImplementedException(); List GetAgentResponses(string agentId, string prefix, string intent) @@ -102,6 +110,8 @@ void InsertAgentTask(AgentTask task) => throw new NotImplementedException(); void BulkInsertAgentTasks(List tasks) => throw new NotImplementedException(); + ValueTask BulkInsertAgentTasksAsync(List tasks) + => throw new NotImplementedException(); void UpdateAgentTask(AgentTask task, AgentTaskField field) => throw new NotImplementedException(); bool DeleteAgentTask(string agentId, List taskIds) diff --git a/src/Infrastructure/BotSharp.Core/Agents/Services/AgentService.RefreshAgents.cs b/src/Infrastructure/BotSharp.Core/Agents/Services/AgentService.RefreshAgents.cs index eb0497aeb..a27c70cfd 100644 --- a/src/Infrastructure/BotSharp.Core/Agents/Services/AgentService.RefreshAgents.cs +++ b/src/Infrastructure/BotSharp.Core/Agents/Services/AgentService.RefreshAgents.cs @@ -1,4 +1,5 @@ using BotSharp.Abstraction.Repositories.Enums; +using BotSharp.Abstraction.Tasks.Models; using System.IO; namespace BotSharp.Core.Agents.Services; @@ -24,27 +25,24 @@ public async Task RefreshAgents() return "Unauthorized user."; } - var agentDir = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, - dbSettings.FileRepository, - _agentSettings.DataDir); - + var agentDir = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, dbSettings.FileRepository, _agentSettings.DataDir); if (!Directory.Exists(agentDir)) { refreshResult = $"Cannot find the directory: {agentDir}"; return refreshResult; } - - var refreshedAgents = new List(); + + List agents = []; + List agentTasks = []; + foreach (var dir in Directory.GetDirectories(agentDir)) { try { - var agentJson = File.ReadAllText(Path.Combine(dir, "agent.json")); - var agent = JsonSerializer.Deserialize(agentJson, _options); - + var (agent, msg) = GetAgentFormJson(Path.Combine(dir, "agent.json")); if (agent == null) { - _logger.LogError($"Cannot find agent in file directory: {dir}"); + _logger.LogError(msg); continue; } @@ -61,16 +59,8 @@ public async Task RefreshAgents() .SetSamples(samples); var tasks = GetTasksFromFile(dir); - - var isAgentDeleted = _db.DeleteAgent(agent.Id); - if (isAgentDeleted) - { - await Task.Delay(100); - _db.BulkInsertAgents(new List { agent }); - _db.BulkInsertAgentTasks(tasks); - refreshedAgents.Add(agent.Name); - _logger.LogInformation($"Agent {agent.Name} has been migrated."); - } + if (!tasks.IsNullOrEmpty()) agentTasks.AddRange(tasks); + agents.Add(agent); } catch (Exception ex) { @@ -78,10 +68,17 @@ public async Task RefreshAgents() } } - if (!refreshedAgents.IsNullOrEmpty()) + if (agents.Count > 0) { + var agentIds = agents.Select(a => a.Id).ToList(); + await _db.DeleteAgentsAsync(agentIds); + await Task.Delay(200); + await _db.BulkInsertAgentsAsync(agents); + await Task.Delay(200); + await _db.BulkInsertAgentTasksAsync(agentTasks); + Utilities.ClearCache(); - refreshResult = $"Agents are migrated!\r\n{string.Join("\r\n", refreshedAgents)}"; + refreshResult = $"Agents are migrated!\r\n{string.Join("\r\n", agents.Select(a => a.Name))}"; } else { @@ -91,4 +88,36 @@ public async Task RefreshAgents() _logger.LogInformation(refreshResult); return refreshResult; } + + private (Agent? agent, string msg) GetAgentFormJson(string agentPath) + { + var agentJson = File.ReadAllText(agentPath); + if (string.IsNullOrWhiteSpace(agentJson)) + return (null, $"Cannot find agent in file path: {agentPath}"); + + var isJson = IsValidedJson(agentJson); + if (isJson) + { + var agent = JsonSerializer.Deserialize(agentJson, _options); + return (agent, "ok"); + } + else + { + return (null, "The agent.json file data is not in JSON format!"); + } + } + + private bool IsValidedJson(string jsonString) + { + try + { + JsonDocument.Parse(jsonString); + return true; + } + catch (JsonException ex) + { + return false; + } + } + } diff --git a/src/Infrastructure/BotSharp.Core/Repository/FileRepository/FileRepository.Agent.cs b/src/Infrastructure/BotSharp.Core/Repository/FileRepository/FileRepository.Agent.cs index d7a6d1218..e6da35e3d 100644 --- a/src/Infrastructure/BotSharp.Core/Repository/FileRepository/FileRepository.Agent.cs +++ b/src/Infrastructure/BotSharp.Core/Repository/FileRepository/FileRepository.Agent.cs @@ -582,6 +582,12 @@ public void BulkInsertAgents(List agents) ResetInnerAgents(); } + public async ValueTask BulkInsertAgentsAsync(List agents) + { + await Task.Delay(100); + BulkInsertAgents(agents); + } + public void BulkInsertUserAgents(List userAgents) { if (userAgents.IsNullOrEmpty()) return; @@ -615,11 +621,35 @@ public void BulkInsertUserAgents(List userAgents) ResetInnerAgents(); } + public async ValueTask BulkInsertUserAgentsAsync(List userAgents) + { + await Task.Delay(200); + BulkInsertUserAgents(userAgents); + } + public bool DeleteAgents() { return false; } + public async Task DeleteAgentsAsync() + { + await Task.Delay(100); + return false; + } + + public async ValueTask DeleteAgentsAsync(List agentIds) + { + bool isDelete = false; + foreach (var agentId in agentIds) + { + isDelete = DeleteAgent(agentId); + await Task.Delay(200); + } + + return isDelete; + } + public bool DeleteAgent(string agentId) { if (string.IsNullOrEmpty(agentId)) return false; diff --git a/src/Infrastructure/BotSharp.Core/Repository/FileRepository/FileRepository.AgentTask.cs b/src/Infrastructure/BotSharp.Core/Repository/FileRepository/FileRepository.AgentTask.cs index 6d47c4cc4..ef02111c1 100644 --- a/src/Infrastructure/BotSharp.Core/Repository/FileRepository/FileRepository.AgentTask.cs +++ b/src/Infrastructure/BotSharp.Core/Repository/FileRepository/FileRepository.AgentTask.cs @@ -142,6 +142,13 @@ public void BulkInsertAgentTasks(List tasks) } + public async ValueTask BulkInsertAgentTasksAsync(List tasks) + { + if (tasks.IsNullOrEmpty()) return; + + await Task.Delay(200); + } + public void UpdateAgentTask(AgentTask task, AgentTaskField field) { if (task == null || string.IsNullOrEmpty(task.Id)) return; diff --git a/src/Plugins/BotSharp.Plugin.MongoStorage/Repository/MongoRepository.Agent.cs b/src/Plugins/BotSharp.Plugin.MongoStorage/Repository/MongoRepository.Agent.cs index 90dd9cdfd..b786476ca 100644 --- a/src/Plugins/BotSharp.Plugin.MongoStorage/Repository/MongoRepository.Agent.cs +++ b/src/Plugins/BotSharp.Plugin.MongoStorage/Repository/MongoRepository.Agent.cs @@ -2,6 +2,7 @@ using BotSharp.Abstraction.Functions.Models; using BotSharp.Abstraction.Repositories.Filters; using BotSharp.Abstraction.Routing.Models; +using Microsoft.Extensions.Logging; namespace BotSharp.Plugin.MongoStorage.Repository; @@ -487,39 +488,39 @@ public void BulkInsertAgents(List agents) { if (agents.IsNullOrEmpty()) return; - var agentDocs = agents.Select(x => new AgentDocument + var agentDocs = agents.Select(x => TransformAgent(x)); + InsertMany("Agents", _dc.Agents, agentDocs); + } + + public async ValueTask BulkInsertAgentsAsync(List agents) + { + if (agents.IsNullOrEmpty()) return; + + var agentDocs = agents.Select(x => TransformAgent(x)); + await InsertManyAsync("Agents", _dc.Agents, agentDocs); + } + + public void BulkInsertUserAgents(List userAgents) + { + if (userAgents.IsNullOrEmpty()) return; + + var filtered = userAgents.Where(x => !string.IsNullOrEmpty(x.UserId) && !string.IsNullOrEmpty(x.AgentId)).ToList(); + if (filtered.IsNullOrEmpty()) return; + + var userAgentDocs = filtered.Select(x => new UserAgentDocument { Id = !string.IsNullOrEmpty(x.Id) ? x.Id : Guid.NewGuid().ToString(), - Name = x.Name, - IconUrl = x.IconUrl, - Description = x.Description, - Instruction = x.Instruction, - Samples = x.Samples ?? [], - IsPublic = x.IsPublic, - Type = x.Type, - InheritAgentId = x.InheritAgentId, - Disabled = x.Disabled, - MergeUtility = x.MergeUtility, - MaxMessageCount = x.MaxMessageCount, - Profiles = x.Profiles ?? [], - Labels = x.Labels ?? [], - LlmConfig = AgentLlmConfigMongoElement.ToMongoElement(x.LlmConfig), - ChannelInstructions = x.ChannelInstructions?.Select(i => ChannelInstructionMongoElement.ToMongoElement(i))?.ToList() ?? [], - Templates = x.Templates?.Select(t => AgentTemplateMongoElement.ToMongoElement(t))?.ToList() ?? [], - Functions = x.Functions?.Select(f => FunctionDefMongoElement.ToMongoElement(f))?.ToList() ?? [], - Responses = x.Responses?.Select(r => AgentResponseMongoElement.ToMongoElement(r))?.ToList() ?? [], - RoutingRules = x.RoutingRules?.Select(r => RoutingRuleMongoElement.ToMongoElement(r))?.ToList() ?? [], - Utilities = x.Utilities?.Select(u => AgentUtilityMongoElement.ToMongoElement(u))?.ToList() ?? [], - KnowledgeBases = x.KnowledgeBases?.Select(k => AgentKnowledgeBaseMongoElement.ToMongoElement(k))?.ToList() ?? [], - Rules = x.Rules?.Select(e => AgentRuleMongoElement.ToMongoElement(e))?.ToList() ?? [], - CreatedTime = x.CreatedDateTime, - UpdatedTime = x.UpdatedDateTime - }).ToList(); + UserId = x.UserId, + AgentId = x.AgentId, + Actions = x.Actions, + CreatedTime = x.CreatedTime, + UpdatedTime = x.UpdatedTime + }); - _dc.Agents.InsertMany(agentDocs); + InsertMany("UserAgents", _dc.UserAgents, userAgentDocs); } - public void BulkInsertUserAgents(List userAgents) + public async ValueTask BulkInsertUserAgentsAsync(List userAgents) { if (userAgents.IsNullOrEmpty()) return; @@ -534,18 +535,59 @@ public void BulkInsertUserAgents(List userAgents) Actions = x.Actions, CreatedTime = x.CreatedTime, UpdatedTime = x.UpdatedTime - }).ToList(); + }); - _dc.UserAgents.InsertMany(userAgentDocs); + await InsertManyAsync("UserAgents", _dc.UserAgents, userAgentDocs); } public bool DeleteAgents() { try { + _dc.Agents.DeleteMany(Builders.Filter.Empty); _dc.UserAgents.DeleteMany(Builders.Filter.Empty); _dc.RoleAgents.DeleteMany(Builders.Filter.Empty); - _dc.Agents.DeleteMany(Builders.Filter.Empty); + _dc.AgentTasks.DeleteMany(Builders.Filter.Empty); + return true; + } + catch + { + return false; + } + } + + public async Task DeleteAgentsAsync() + { + try + { + await _dc.Agents.DeleteManyAsync(Builders.Filter.Empty); + await _dc.UserAgents.DeleteManyAsync(Builders.Filter.Empty); + await _dc.RoleAgents.DeleteManyAsync(Builders.Filter.Empty); + await _dc.AgentTasks.DeleteManyAsync(Builders.Filter.Empty); + return true; + } + catch + { + return false; + } + } + + public async ValueTask DeleteAgentsAsync(List agentIds) + { + if (agentIds.IsNullOrEmpty()) return false; + + try + { + var agentFilter = Builders.Filter.In(x => x.Id, agentIds); + var userAgentFilter = Builders.Filter.In(x => x.AgentId, agentIds); + var roleAgentFilter = Builders.Filter.In(x => x.AgentId, agentIds); + var agentTaskFilter = Builders.Filter.In(x => x.AgentId, agentIds); + + await _dc.Agents.DeleteManyAsync(agentFilter); + await _dc.UserAgents.DeleteManyAsync(userAgentFilter); + await _dc.RoleAgents.DeleteManyAsync(roleAgentFilter); + await _dc.AgentTasks.DeleteManyAsync(agentTaskFilter); + return true; } catch @@ -597,7 +639,7 @@ private Agent TransformAgentDocument(AgentDocument? agentDoc) Profiles = agentDoc.Profiles ?? [], Labels = agentDoc.Labels ?? [], MaxMessageCount = agentDoc.MaxMessageCount, - LlmConfig = AgentLlmConfigMongoElement.ToDomainElement(agentDoc.LlmConfig), + LlmConfig = AgentLlmConfigMongoElement.ToDomainElement(agentDoc.LlmConfig) ?? new(), ChannelInstructions = agentDoc.ChannelInstructions?.Select(i => ChannelInstructionMongoElement.ToDomainElement(i))?.ToList() ?? [], Templates = agentDoc.Templates?.Select(t => AgentTemplateMongoElement.ToDomainElement(t))?.ToList() ?? [], Functions = agentDoc.Functions?.Select(f => FunctionDefMongoElement.ToDomainElement(f)).ToList() ?? [], @@ -608,4 +650,92 @@ private Agent TransformAgentDocument(AgentDocument? agentDoc) Rules = agentDoc.Rules?.Select(e => AgentRuleMongoElement.ToDomainElement(e))?.ToList() ?? [] }; } + + private AgentDocument TransformAgent(Agent? agent) + { + if (agent == null) return new AgentDocument(); + + return new AgentDocument + { + Id = !string.IsNullOrEmpty(agent.Id) ? agent.Id : Guid.NewGuid().ToString(), + Name = agent.Name, + IconUrl = agent.IconUrl, + Description = agent.Description, + Instruction = agent.Instruction ?? string.Empty, + Samples = agent.Samples ?? [], + IsPublic = agent.IsPublic, + Type = agent.Type, + InheritAgentId = agent.InheritAgentId, + Disabled = agent.Disabled, + MergeUtility = agent.MergeUtility, + MaxMessageCount = agent.MaxMessageCount, + Profiles = agent.Profiles ?? [], + Labels = agent.Labels ?? [], + LlmConfig = AgentLlmConfigMongoElement.ToMongoElement(agent.LlmConfig), + ChannelInstructions = agent.ChannelInstructions?.Select(i => ChannelInstructionMongoElement.ToMongoElement(i))?.ToList() ?? [], + Templates = agent.Templates?.Select(t => AgentTemplateMongoElement.ToMongoElement(t))?.ToList() ?? [], + Functions = agent.Functions?.Select(f => FunctionDefMongoElement.ToMongoElement(f))?.ToList() ?? [], + Responses = agent.Responses?.Select(r => AgentResponseMongoElement.ToMongoElement(r))?.ToList() ?? [], + RoutingRules = agent.RoutingRules?.Select(r => RoutingRuleMongoElement.ToMongoElement(r))?.ToList() ?? [], + Utilities = agent.Utilities?.Select(u => AgentUtilityMongoElement.ToMongoElement(u))?.ToList() ?? [], + KnowledgeBases = agent.KnowledgeBases?.Select(k => AgentKnowledgeBaseMongoElement.ToMongoElement(k))?.ToList() ?? [], + Rules = agent.Rules?.Select(e => AgentRuleMongoElement.ToMongoElement(e))?.ToList() ?? [], + CreatedTime = agent.CreatedDateTime, + UpdatedTime = agent.UpdatedDateTime + }; + } + + private void InsertMany(string collectionName, IMongoCollection collection, IEnumerable documents) + { + // Configure InsertManyOptions to skip exceptions and continue inserting + var options = new InsertManyOptions { IsOrdered = false }; + + try + { + // Perform batch insertion operation + collection.InsertMany(documents, options); + _logger.LogInformation($"{collectionName} document inserted successfully."); + } + catch (MongoBulkWriteException ex) + { + StringBuilder strBuilder = new(); + strBuilder.AppendLine($"{collectionName} document could not be inserted:"); + foreach (var writeError in ex.WriteErrors) + { + strBuilder.AppendLine($"Index: {writeError.Index}, Code: {writeError.Code}, Message: {writeError.Message}"); + } + _logger.LogError(strBuilder.ToString()); + } + catch (Exception ex) + { + _logger.LogError($"An error occurred: {ex.Message}"); + } + } + + private async Task InsertManyAsync(string collectionName, IMongoCollection collection, IEnumerable documents) + { + // Configure InsertManyOptions to skip exceptions and continue inserting + var options = new InsertManyOptions { IsOrdered = false }; + + try + { + // Perform batch insertion operation + await collection.InsertManyAsync(documents, options); + _logger.LogInformation($"{collectionName} document inserted successfully."); + } + catch (MongoBulkWriteException ex) + { + StringBuilder strBuilder = new(); + strBuilder.AppendLine($"{collectionName} document could not be inserted:"); + foreach (var writeError in ex.WriteErrors) + { + strBuilder.AppendLine($"Index: {writeError.Index}, Code: {writeError.Code}, Message: {writeError.Message}"); + } + _logger.LogError(strBuilder.ToString()); + } + catch (Exception ex) + { + _logger.LogError($"An error occurred: {ex.Message}"); + } + } } diff --git a/src/Plugins/BotSharp.Plugin.MongoStorage/Repository/MongoRepository.AgentTask.cs b/src/Plugins/BotSharp.Plugin.MongoStorage/Repository/MongoRepository.AgentTask.cs index db3cdfc6c..3097d4252 100644 --- a/src/Plugins/BotSharp.Plugin.MongoStorage/Repository/MongoRepository.AgentTask.cs +++ b/src/Plugins/BotSharp.Plugin.MongoStorage/Repository/MongoRepository.AgentTask.cs @@ -87,7 +87,21 @@ public void BulkInsertAgentTasks(List tasks) return task; }).ToList(); - _dc.AgentTasks.InsertMany(taskDocs); + InsertMany("AgentTasks", _dc.AgentTasks, taskDocs); + } + + public async ValueTask BulkInsertAgentTasksAsync(List tasks) + { + if (tasks.IsNullOrEmpty()) return; + + var taskDocs = tasks.Select(x => + { + var task = AgentTaskDocument.ToMongoModel(x); + task.Id = !string.IsNullOrEmpty(x.Id) ? x.Id : Guid.NewGuid().ToString(); + return task; + }); + + await InsertManyAsync("AgentTasks", _dc.AgentTasks, taskDocs); } public void UpdateAgentTask(AgentTask task, AgentTaskField field)