jcloud/jcloud/api/jlocal.py
jingrow e5c818d1a5 feat: 为 Local Tool 添加完整的 API 端点
- 新增 get_local_tool_list: 获取公开工具列表
- 新增 get_my_local_tool_list: 获取当前团队工具列表
- 新增 get_local_tool: 获取工具详情
- 新增 create_local_tool: 创建工具
- 新增 update_local_tool: 更新工具
- 新增 delete_local_tool: 删除工具

参考 Local Node 和 Local App 的实现模式,包含团队权限验证和批量查询优化
2025-11-21 11:37:41 +08:00

1038 lines
27 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
# Copyright (c) 2024, JINGROW
# For license information, please see license.txt
import jingrow
from jcloud.api.client import dashboard_whitelist
from jcloud.utils import get_current_team
def _get_team_user_names(apps):
"""批量获取团队关联用户名称避免N+1查询问题"""
if not apps:
return {}
# 收集所有团队ID
team_ids = [app.team for app in apps if app.team]
if not team_ids:
return {}
# 批量获取团队信息
teams = jingrow.get_all(
"Team",
filters={"name": ["in", team_ids]},
fields=["name", "user"]
)
# 收集所有用户ID
user_ids = [team.user for team in teams if team.user]
if not user_ids:
return {}
# 批量获取用户信息
users = jingrow.get_all(
"User",
filters={"name": ["in", user_ids]},
fields=["name"]
)
# 构建映射关系
team_to_user = {team.name: team.user for team in teams if team.user}
user_names = {user.name: user.name for user in users}
# 返回团队到用户名称的映射
result = {}
for team_id, user_id in team_to_user.items():
if user_id in user_names:
result[team_id] = user_names[user_id]
return result
@jingrow.whitelist(allow_guest=True)
def get_local_app_list(filters=None, order_by=None, limit_start=None, limit_page_length=None):
    """Return the public Local App records, optionally filtered and paged.

    The endpoint is whitelisted with ``allow_guest=True``, so ``public = 1``
    is applied *after* the caller's filters are merged: a caller-supplied
    ``public`` key cannot widen the result to non-public records.

    Args:
        filters: Extra filter conditions, as a dict or a JSON string
            encoding one.
        order_by: Sort clause forwarded to ``jingrow.get_all``; defaults to
            ``"app_name asc"``.
        limit_start: Forwarded to ``jingrow.get_all``.
        limit_page_length: Forwarded to ``jingrow.get_all``.

    Returns:
        list[dict]: One dict per app. ``team`` holds the name of the owning
        team's user, or ``None`` when it cannot be resolved.
    """
    import json

    query_filters = {}
    if filters:
        if isinstance(filters, str):
            filters = json.loads(filters)
        query_filters.update(filters)
    # Enforced last so it always wins over anything the caller sent.
    query_filters["public"] = 1

    # NOTE(review): order_by is forwarded unchanged from the caller --
    # confirm that jingrow.get_all validates it.
    if not order_by:
        order_by = "app_name asc"

    fields = [
        "name", "app_name", "title", "subtitle", "category",
        "enabled", "public", "team", "status", "repository_url",
        "file_url", "app_image", "creation", "modified",
    ]
    apps = jingrow.get_all(
        "Local App",
        filters=query_filters,
        fields=fields,
        order_by=order_by,
        limit_start=limit_start,
        limit_page_length=limit_page_length,
    )

    # One batched lookup for every team referenced by the result set.
    team_user_names = _get_team_user_names(apps)

    result = []
    for app in apps:
        app_data = {field: getattr(app, field) for field in fields}
        # Expose the team's user name instead of the raw team id.
        app_data["team"] = team_user_names.get(app.team)
        result.append(app_data)
    return result
@jingrow.whitelist(allow_guest=True)
def get_local_node_list(filters=None, order_by=None, limit_start=None, limit_page_length=None):
    """Return the public Local Node records, optionally filtered and paged.

    The endpoint is whitelisted with ``allow_guest=True``, so ``public = 1``
    is applied *after* the caller's filters are merged: a caller-supplied
    ``public`` key cannot widen the result to non-public records.

    Args:
        filters: Extra filter conditions, as a dict or a JSON string
            encoding one.
        order_by: Sort clause forwarded to ``jingrow.get_all``; defaults to
            ``"node_type asc"``.
        limit_start: Forwarded to ``jingrow.get_all``.
        limit_page_length: Forwarded to ``jingrow.get_all``.

    Returns:
        list[dict]: One dict per node. ``team`` holds the name of the owning
        team's user, or ``None`` when it cannot be resolved.
    """
    import json

    query_filters = {}
    if filters:
        if isinstance(filters, str):
            filters = json.loads(filters)
        query_filters.update(filters)
    # Enforced last so it always wins over anything the caller sent.
    query_filters["public"] = 1

    # NOTE(review): order_by is forwarded unchanged from the caller --
    # confirm that jingrow.get_all validates it.
    if not order_by:
        order_by = "node_type asc"

    fields = [
        "name", "node_type", "title", "subtitle",
        "enabled", "public", "team", "status", "repository_url",
        "file_url", "node_image", "creation", "modified",
    ]
    nodes = jingrow.get_all(
        "Local Node",
        filters=query_filters,
        fields=fields,
        order_by=order_by,
        limit_start=limit_start,
        limit_page_length=limit_page_length,
    )

    # One batched lookup for every team referenced by the result set.
    team_user_names = _get_team_user_names(nodes)

    result = []
    for node in nodes:
        node_data = {field: getattr(node, field) for field in fields}
        # Expose the team's user name instead of the raw team id.
        node_data["team"] = team_user_names.get(node.team)
        result.append(node_data)
    return result
@jingrow.whitelist(allow_guest=True)
def get_local_agent_list(filters=None, order_by=None, limit_start=None, limit_page_length=None):
    """Return the public Local Agent records, optionally filtered and paged.

    The endpoint is whitelisted with ``allow_guest=True``, so ``public = 1``
    is applied *after* the caller's filters are merged: a caller-supplied
    ``public`` key cannot widen the result to non-public records.

    Args:
        filters: Extra filter conditions, as a dict or a JSON string
            encoding one.
        order_by: Sort clause forwarded to ``jingrow.get_all``; defaults to
            ``"agent_name asc"``.
        limit_start: Forwarded to ``jingrow.get_all``.
        limit_page_length: Forwarded to ``jingrow.get_all``.

    Returns:
        list[dict]: One dict per agent. ``team`` holds the name of the owning
        team's user, or ``None`` when it cannot be resolved.
    """
    import json

    query_filters = {}
    if filters:
        if isinstance(filters, str):
            filters = json.loads(filters)
        query_filters.update(filters)
    # Enforced last so it always wins over anything the caller sent.
    query_filters["public"] = 1

    # NOTE(review): order_by is forwarded unchanged from the caller --
    # confirm that jingrow.get_all validates it.
    if not order_by:
        order_by = "agent_name asc"

    fields = [
        "name", "agent_name", "title", "subtitle",
        "enabled", "public", "team", "status", "repository_url",
        "file_url", "agent_image", "creation", "modified",
    ]
    agents = jingrow.get_all(
        "Local Agent",
        filters=query_filters,
        fields=fields,
        order_by=order_by,
        limit_start=limit_start,
        limit_page_length=limit_page_length,
    )

    # One batched lookup for every team referenced by the result set.
    team_user_names = _get_team_user_names(agents)

    result = []
    for agent in agents:
        agent_data = {field: getattr(agent, field) for field in fields}
        # Expose the team's user name instead of the raw team id.
        agent_data["team"] = team_user_names.get(agent.team)
        result.append(agent_data)
    return result
@jingrow.whitelist(allow_guest=True)
def get_local_tool_list(filters=None, order_by=None, limit_start=None, limit_page_length=None):
    """Return the public Local Tool records, optionally filtered and paged.

    The endpoint is whitelisted with ``allow_guest=True``, so ``public = 1``
    is applied *after* the caller's filters are merged: a caller-supplied
    ``public`` key cannot widen the result to non-public records.

    Args:
        filters: Extra filter conditions, as a dict or a JSON string
            encoding one.
        order_by: Sort clause forwarded to ``jingrow.get_all``; defaults to
            ``"tool_name asc"``.
        limit_start: Forwarded to ``jingrow.get_all``.
        limit_page_length: Forwarded to ``jingrow.get_all``.

    Returns:
        list[dict]: One dict per tool. ``team`` holds the name of the owning
        team's user, or ``None`` when it cannot be resolved.
    """
    import json

    query_filters = {}
    if filters:
        if isinstance(filters, str):
            filters = json.loads(filters)
        query_filters.update(filters)
    # Enforced last so it always wins over anything the caller sent.
    query_filters["public"] = 1

    # NOTE(review): order_by is forwarded unchanged from the caller --
    # confirm that jingrow.get_all validates it.
    if not order_by:
        order_by = "tool_name asc"

    fields = [
        "name", "tool_name", "title", "subtitle",
        "enabled", "public", "team", "status", "repository_url",
        "file_url", "tool_image", "creation", "modified",
    ]
    tools = jingrow.get_all(
        "Local Tool",
        filters=query_filters,
        fields=fields,
        order_by=order_by,
        limit_start=limit_start,
        limit_page_length=limit_page_length,
    )

    # One batched lookup for every team referenced by the result set.
    team_user_names = _get_team_user_names(tools)

    result = []
    for tool in tools:
        tool_data = {field: getattr(tool, field) for field in fields}
        # Expose the team's user name instead of the raw team id.
        tool_data["team"] = team_user_names.get(tool.team)
        result.append(tool_data)
    return result
@jingrow.whitelist(allow_guest=True)
def get_local_app(name):
    """Return the detail dict of one Local App.

    Args:
        name: Name of the Local App record.

    Returns:
        dict: The app's fields, with ``team`` replaced by the name of the
        owning team's user (``None`` when unresolved); or
        ``{"success": False, "message": ...}`` when no such record exists.
    """
    # NOTE(review): unlike the public list endpoint, this does not check the
    # record's ``public`` flag -- confirm whether guests are meant to be able
    # to read non-public apps by name.
    if jingrow.db.exists("Local App", name):
        record = jingrow.get_pg("Local App", name)
        owner_names = _get_team_user_names([record])
        detail_fields = (
            "name", "app_name", "title", "subtitle", "description",
            "category", "enabled", "public", "team", "status",
            "repository_url", "file_url", "app_image", "creation", "modified",
        )
        detail = {key: getattr(record, key) for key in detail_fields}
        # Swap the raw team id for the resolved user name.
        detail["team"] = owner_names.get(record.team)
        return detail
    return {"success": False, "message": "应用不存在"}
@dashboard_whitelist()
def get_my_local_app_list(filters=None, order_by=None, limit_start=None, limit_page_length=None):
    """Return the Local App records owned by the caller's current team.

    The ``team`` condition is applied *after* the caller's filters are
    merged, so a caller-supplied ``team`` key cannot redirect the query to
    another team's records.

    Args:
        filters: Extra filter conditions, as a dict or a JSON string
            encoding one.
        order_by: Sort clause forwarded to ``jingrow.get_all``; defaults to
            ``"app_name asc"``.
        limit_start: Forwarded to ``jingrow.get_all``.
        limit_page_length: Forwarded to ``jingrow.get_all``.

    Returns:
        list[dict]: One dict per app, with ``team`` holding the name of the
        owning team's user; or ``{"success": False, "message": ...}`` when
        no current team is found.
    """
    import json

    team = get_current_team()
    if not team:
        return {"success": False, "message": "未找到当前团队信息"}

    query_filters = {}
    if filters:
        if isinstance(filters, str):
            filters = json.loads(filters)
        query_filters.update(filters)
    # Enforced last so the result is always scoped to the caller's team.
    query_filters["team"] = team

    if not order_by:
        order_by = "app_name asc"

    fields = [
        "name", "app_name", "title", "subtitle", "category",
        "enabled", "public", "team", "status", "repository_url",
        "file_url", "app_image", "creation", "modified",
    ]
    apps = jingrow.get_all(
        "Local App",
        filters=query_filters,
        fields=fields,
        order_by=order_by,
        limit_start=limit_start,
        limit_page_length=limit_page_length,
    )

    team_user_names = _get_team_user_names(apps)

    result = []
    for app in apps:
        app_data = {field: getattr(app, field) for field in fields}
        # Expose the team's user name instead of the raw team id.
        app_data["team"] = team_user_names.get(app.team)
        result.append(app_data)
    return result
@dashboard_whitelist()
def create_local_app(app_data):
    """Create a Local App owned by the caller's current team.

    Args:
        app_data: Dict (or JSON string encoding one) with the required keys
            ``app_name`` and ``title`` and the optional keys ``subtitle``,
            ``description``, ``category``, ``repository_url``, ``file_url``
            and ``app_image``.

    Returns:
        dict: ``{"success": True, "name": ..., "message": ...}`` on success,
        otherwise ``{"success": False, "message": ...}``. The duplicate-name
        failure additionally carries the legacy ``error`` key.
    """
    import json

    team = get_current_team()
    # Same guard as the update/delete endpoints: never create an app that
    # is not attached to a team.
    if not team:
        return {"success": False, "message": "未找到当前团队信息"}

    if isinstance(app_data, str):
        try:
            app_data = json.loads(app_data)
        except json.JSONDecodeError:
            return {"success": False, "message": "JSON 数据格式错误"}
    if not isinstance(app_data, dict):
        return {"success": False, "message": "JSON 数据格式错误"}

    # Report missing required keys instead of failing with a KeyError.
    missing = [key for key in ("app_name", "title") if key not in app_data]
    if missing:
        return {"success": False, "message": "缺少必填字段: " + ", ".join(missing)}

    if jingrow.db.exists("Local App", {"app_name": app_data["app_name"]}):
        duplicate_message = "应用名称已存在,请使用其他名称"
        # "error" is kept for existing callers; "success"/"message" match the
        # shape returned by the other endpoints in this module.
        return {"success": False, "error": duplicate_message, "message": duplicate_message}

    app = jingrow.get_pg({
        "pagetype": "Local App",
        "app_name": app_data["app_name"],
        "title": app_data["title"],
        "subtitle": app_data.get("subtitle", ""),
        "description": app_data.get("description", ""),
        "category": app_data.get("category"),
        # Ownership always comes from the session, never from the payload.
        "team": team,
        "repository_url": app_data.get("repository_url"),
        "file_url": app_data.get("file_url"),
        "app_image": app_data.get("app_image"),
    })
    app.insert()
    return {"success": True, "name": app.name, "message": "应用创建成功"}
@dashboard_whitelist()
def update_local_app(name, app_data):
    """Update fields of a Local App owned by the caller's current team.

    Args:
        name: Name of the Local App record to update.
        app_data: Dict of ``{field: new value}`` or a JSON string encoding
            one. Keys that are not attributes of the record, or that are
            protected (see below), are ignored.

    Returns:
        dict: ``{"success": True, "name", "updated_fields", "message"}`` on
        success, otherwise ``{"success": False, "message": ...}``.
    """
    import json

    team = get_current_team()
    if not team:
        return {"success": False, "message": "未找到当前团队信息"}
    if not jingrow.db.exists("Local App", name):
        return {"success": False, "message": "应用不存在"}

    app = jingrow.get_pg("Local App", name)
    # Ownership check: only the owning team may modify the record.
    if app.team != team:
        return {"success": False, "message": "您没有权限操作此应用"}

    if isinstance(app_data, str):
        try:
            app_data = json.loads(app_data)
        except json.JSONDecodeError:
            return {"success": False, "message": "JSON 数据格式错误"}
    # Valid JSON that is not an object (list, number, null) carries no fields.
    if not isinstance(app_data, dict):
        return {"success": False, "message": "JSON 数据格式错误"}

    # The record is saved with ignore_permissions=True, so the payload must
    # not be able to change ownership, identity or bookkeeping attributes.
    # NOTE(review): names other than name/team/creation/modified/pagetype are
    # assumed framework metadata -- confirm against the pagetype definition;
    # an explicit allow-list of editable fields would be stricter. Whether
    # callers may set ``status`` should be confirmed as well.
    protected_fields = {
        "name", "team", "owner", "creation", "modified", "modified_by",
        "pagetype", "docstatus", "idx", "flags", "meta",
        "parent", "parenttype", "parentfield",
    }

    updated_fields = []
    for field, value in app_data.items():
        if not isinstance(field, str) or field.startswith("_") or field in protected_fields:
            continue
        # Skip unknown attributes and never overwrite methods such as save().
        if not hasattr(app, field) or callable(getattr(app, field)):
            continue
        setattr(app, field, value)
        updated_fields.append(field)

    if not updated_fields:
        return {"success": False, "message": "没有有效的字段被更新"}

    # Team ownership was verified manually above.
    app.save(ignore_permissions=True)
    return {"success": True, "name": app.name, "updated_fields": updated_fields, "message": "应用更新成功"}
@dashboard_whitelist()
def delete_local_app(name):
    """Delete a Local App that belongs to the caller's current team.

    Args:
        name: Name of the Local App record to delete.

    Returns:
        dict: ``{"success": True, "message": ...}`` after deletion, or
        ``{"success": False, "message": ...}`` when there is no current
        team, the record does not exist, or it belongs to another team.
    """
    current_team = get_current_team()
    if not current_team:
        return {"success": False, "message": "未找到当前团队信息"}

    if not jingrow.db.exists("Local App", name):
        return {"success": False, "message": "应用不存在"}

    record = jingrow.get_pg("Local App", name)
    if record.team == current_team:
        # Ownership was just verified, so the permission check is bypassed.
        record.delete(ignore_permissions=True)
        return {"success": True, "message": "应用删除成功"}

    return {"success": False, "message": "您没有权限操作此应用"}
@dashboard_whitelist()
def get_my_local_node_list(filters=None, order_by=None, limit_start=None, limit_page_length=None):
    """Return the Local Node records owned by the caller's current team.

    The ``team`` condition is applied *after* the caller's filters are
    merged, so a caller-supplied ``team`` key cannot redirect the query to
    another team's records.

    Args:
        filters: Extra filter conditions, as a dict or a JSON string
            encoding one.
        order_by: Sort clause forwarded to ``jingrow.get_all``; defaults to
            ``"node_type asc"``.
        limit_start: Forwarded to ``jingrow.get_all``.
        limit_page_length: Forwarded to ``jingrow.get_all``.

    Returns:
        list[dict]: One dict per node, with ``team`` holding the name of the
        owning team's user; or ``{"success": False, "message": ...}`` when
        no current team is found.
    """
    import json

    team = get_current_team()
    if not team:
        return {"success": False, "message": "未找到当前团队信息"}

    query_filters = {}
    if filters:
        if isinstance(filters, str):
            filters = json.loads(filters)
        query_filters.update(filters)
    # Enforced last so the result is always scoped to the caller's team.
    query_filters["team"] = team

    if not order_by:
        order_by = "node_type asc"

    fields = [
        "name", "node_type", "title", "subtitle",
        "enabled", "public", "team", "status", "repository_url",
        "file_url", "node_image", "creation", "modified",
    ]
    nodes = jingrow.get_all(
        "Local Node",
        filters=query_filters,
        fields=fields,
        order_by=order_by,
        limit_start=limit_start,
        limit_page_length=limit_page_length,
    )

    team_user_names = _get_team_user_names(nodes)

    result = []
    for node in nodes:
        node_data = {field: getattr(node, field) for field in fields}
        # Expose the team's user name instead of the raw team id.
        node_data["team"] = team_user_names.get(node.team)
        result.append(node_data)
    return result
@jingrow.whitelist(allow_guest=True)
def get_local_node(name):
    """Return the detail dict of one Local Node.

    Args:
        name: Name of the Local Node record.

    Returns:
        dict: The node's fields, with ``team`` replaced by the name of the
        owning team's user (``None`` when unresolved); or
        ``{"success": False, "message": ...}`` when no such record exists.
    """
    # NOTE(review): unlike the public list endpoint, this does not check the
    # record's ``public`` flag -- confirm whether guests are meant to be able
    # to read non-public nodes by name.
    if jingrow.db.exists("Local Node", name):
        record = jingrow.get_pg("Local Node", name)
        owner_names = _get_team_user_names([record])
        detail_fields = (
            "name", "node_type", "title", "subtitle", "description",
            "enabled", "public", "team", "status",
            "repository_url", "file_url", "node_image", "creation", "modified",
        )
        detail = {key: getattr(record, key) for key in detail_fields}
        # Swap the raw team id for the resolved user name.
        detail["team"] = owner_names.get(record.team)
        return detail
    return {"success": False, "message": "节点不存在"}
@dashboard_whitelist()
def create_local_node(node_data):
    """Create a Local Node owned by the caller's current team.

    Args:
        node_data: Dict (or JSON string encoding one) with the required keys
            ``node_type`` and ``title`` and the optional keys ``subtitle``,
            ``description``, ``repository_url``, ``file_url`` and
            ``node_image``.

    Returns:
        dict: ``{"success": True, "name": ..., "message": ...}`` on success,
        otherwise ``{"success": False, "message": ...}``. The duplicate-type
        failure additionally carries the legacy ``error`` key.
    """
    import json

    team = get_current_team()
    # Same guard as the update/delete endpoints: never create a node that
    # is not attached to a team.
    if not team:
        return {"success": False, "message": "未找到当前团队信息"}

    if isinstance(node_data, str):
        try:
            node_data = json.loads(node_data)
        except json.JSONDecodeError:
            return {"success": False, "message": "JSON 数据格式错误"}
    if not isinstance(node_data, dict):
        return {"success": False, "message": "JSON 数据格式错误"}

    # Report missing required keys instead of failing with a KeyError.
    missing = [key for key in ("node_type", "title") if key not in node_data]
    if missing:
        return {"success": False, "message": "缺少必填字段: " + ", ".join(missing)}

    if jingrow.db.exists("Local Node", {"node_type": node_data["node_type"]}):
        duplicate_message = "节点类型已存在,请使用其他类型"
        # "error" is kept for existing callers; "success"/"message" match the
        # shape returned by the other endpoints in this module.
        return {"success": False, "error": duplicate_message, "message": duplicate_message}

    node = jingrow.get_pg({
        "pagetype": "Local Node",
        "node_type": node_data["node_type"],
        "title": node_data["title"],
        "subtitle": node_data.get("subtitle", ""),
        "description": node_data.get("description", ""),
        # Ownership always comes from the session, never from the payload.
        "team": team,
        "repository_url": node_data.get("repository_url"),
        "file_url": node_data.get("file_url"),
        "node_image": node_data.get("node_image"),
    })
    node.insert()
    return {"success": True, "name": node.name, "message": "节点创建成功"}
@dashboard_whitelist()
def update_local_node(name, node_data):
    """Update fields of a Local Node owned by the caller's current team.

    Args:
        name: Name of the Local Node record to update.
        node_data: Dict of ``{field: new value}`` or a JSON string encoding
            one. Keys that are not attributes of the record, or that are
            protected (see below), are ignored.

    Returns:
        dict: ``{"success": True, "name", "updated_fields", "message"}`` on
        success, otherwise ``{"success": False, "message": ...}``.
    """
    import json

    team = get_current_team()
    if not team:
        return {"success": False, "message": "未找到当前团队信息"}
    if not jingrow.db.exists("Local Node", name):
        return {"success": False, "message": "节点不存在"}

    node = jingrow.get_pg("Local Node", name)
    # Ownership check: only the owning team may modify the record.
    if node.team != team:
        return {"success": False, "message": "您没有权限操作此节点"}

    if isinstance(node_data, str):
        try:
            node_data = json.loads(node_data)
        except json.JSONDecodeError:
            return {"success": False, "message": "JSON 数据格式错误"}
    # Valid JSON that is not an object (list, number, null) carries no fields.
    if not isinstance(node_data, dict):
        return {"success": False, "message": "JSON 数据格式错误"}

    # The record is saved with ignore_permissions=True, so the payload must
    # not be able to change ownership, identity or bookkeeping attributes.
    # NOTE(review): names other than name/team/creation/modified/pagetype are
    # assumed framework metadata -- confirm against the pagetype definition;
    # an explicit allow-list of editable fields would be stricter. Whether
    # callers may set ``status`` should be confirmed as well.
    protected_fields = {
        "name", "team", "owner", "creation", "modified", "modified_by",
        "pagetype", "docstatus", "idx", "flags", "meta",
        "parent", "parenttype", "parentfield",
    }

    updated_fields = []
    for field, value in node_data.items():
        if not isinstance(field, str) or field.startswith("_") or field in protected_fields:
            continue
        # Skip unknown attributes and never overwrite methods such as save().
        if not hasattr(node, field) or callable(getattr(node, field)):
            continue
        setattr(node, field, value)
        updated_fields.append(field)

    if not updated_fields:
        return {"success": False, "message": "没有有效的字段被更新"}

    # Team ownership was verified manually above.
    node.save(ignore_permissions=True)
    return {"success": True, "name": node.name, "updated_fields": updated_fields, "message": "节点更新成功"}
@dashboard_whitelist()
def get_my_local_agent_list(filters=None, order_by=None, limit_start=None, limit_page_length=None):
    """Return the Local Agent records owned by the caller's current team.

    The ``team`` condition is applied *after* the caller's filters are
    merged, so a caller-supplied ``team`` key cannot redirect the query to
    another team's records.

    Args:
        filters: Extra filter conditions, as a dict or a JSON string
            encoding one.
        order_by: Sort clause forwarded to ``jingrow.get_all``; defaults to
            ``"agent_name asc"``.
        limit_start: Forwarded to ``jingrow.get_all``.
        limit_page_length: Forwarded to ``jingrow.get_all``.

    Returns:
        list[dict]: One dict per agent, with ``team`` holding the name of the
        owning team's user; or ``{"success": False, "message": ...}`` when
        no current team is found.
    """
    import json

    team = get_current_team()
    if not team:
        return {"success": False, "message": "未找到当前团队信息"}

    query_filters = {}
    if filters:
        if isinstance(filters, str):
            filters = json.loads(filters)
        query_filters.update(filters)
    # Enforced last so the result is always scoped to the caller's team.
    query_filters["team"] = team

    if not order_by:
        order_by = "agent_name asc"

    fields = [
        "name", "agent_name", "title", "subtitle",
        "enabled", "public", "team", "status", "repository_url",
        "file_url", "agent_image", "creation", "modified",
    ]
    agents = jingrow.get_all(
        "Local Agent",
        filters=query_filters,
        fields=fields,
        order_by=order_by,
        limit_start=limit_start,
        limit_page_length=limit_page_length,
    )

    team_user_names = _get_team_user_names(agents)

    result = []
    for agent in agents:
        agent_data = {field: getattr(agent, field) for field in fields}
        # Expose the team's user name instead of the raw team id.
        agent_data["team"] = team_user_names.get(agent.team)
        result.append(agent_data)
    return result
@dashboard_whitelist()
def get_my_local_tool_list(filters=None, order_by=None, limit_start=None, limit_page_length=None):
    """Return the Local Tool records owned by the caller's current team.

    The ``team`` condition is applied *after* the caller's filters are
    merged, so a caller-supplied ``team`` key cannot redirect the query to
    another team's records.

    Args:
        filters: Extra filter conditions, as a dict or a JSON string
            encoding one.
        order_by: Sort clause forwarded to ``jingrow.get_all``; defaults to
            ``"tool_name asc"``.
        limit_start: Forwarded to ``jingrow.get_all``.
        limit_page_length: Forwarded to ``jingrow.get_all``.

    Returns:
        list[dict]: One dict per tool, with ``team`` holding the name of the
        owning team's user; or ``{"success": False, "message": ...}`` when
        no current team is found.
    """
    import json

    team = get_current_team()
    if not team:
        return {"success": False, "message": "未找到当前团队信息"}

    query_filters = {}
    if filters:
        if isinstance(filters, str):
            filters = json.loads(filters)
        query_filters.update(filters)
    # Enforced last so the result is always scoped to the caller's team.
    query_filters["team"] = team

    if not order_by:
        order_by = "tool_name asc"

    fields = [
        "name", "tool_name", "title", "subtitle",
        "enabled", "public", "team", "status", "repository_url",
        "file_url", "tool_image", "creation", "modified",
    ]
    tools = jingrow.get_all(
        "Local Tool",
        filters=query_filters,
        fields=fields,
        order_by=order_by,
        limit_start=limit_start,
        limit_page_length=limit_page_length,
    )

    team_user_names = _get_team_user_names(tools)

    result = []
    for tool in tools:
        tool_data = {field: getattr(tool, field) for field in fields}
        # Expose the team's user name instead of the raw team id.
        tool_data["team"] = team_user_names.get(tool.team)
        result.append(tool_data)
    return result
@jingrow.whitelist(allow_guest=True)
def get_local_agent(name):
    """Return the detail dict of one Local Agent, including ``agent_flow``.

    Args:
        name: Name of the Local Agent record.

    Returns:
        dict: The agent's fields, with ``team`` replaced by the name of the
        owning team's user (``None`` when unresolved); or
        ``{"success": False, "message": ...}`` when no such record exists.
    """
    # NOTE(review): unlike the public list endpoint, this does not check the
    # record's ``public`` flag -- confirm whether guests are meant to be able
    # to read non-public agents (and their agent_flow) by name.
    if jingrow.db.exists("Local Agent", name):
        record = jingrow.get_pg("Local Agent", name)
        owner_names = _get_team_user_names([record])
        detail_fields = (
            "name", "agent_name", "title", "subtitle", "description",
            "enabled", "public", "team", "status",
            "repository_url", "file_url", "agent_image", "agent_flow",
            "creation", "modified",
        )
        detail = {key: getattr(record, key) for key in detail_fields}
        # Swap the raw team id for the resolved user name.
        detail["team"] = owner_names.get(record.team)
        return detail
    return {"success": False, "message": "Agent不存在"}
@jingrow.whitelist(allow_guest=True)
def get_local_tool(name):
    """Return the detail dict of one Local Tool.

    Args:
        name: Name of the Local Tool record.

    Returns:
        dict: The tool's fields, with ``team`` replaced by the name of the
        owning team's user (``None`` when unresolved); or
        ``{"success": False, "message": ...}`` when no such record exists.
    """
    # NOTE(review): unlike the public list endpoint, this does not check the
    # record's ``public`` flag -- confirm whether guests are meant to be able
    # to read non-public tools by name.
    if jingrow.db.exists("Local Tool", name):
        record = jingrow.get_pg("Local Tool", name)
        owner_names = _get_team_user_names([record])
        detail_fields = (
            "name", "tool_name", "title", "subtitle", "description",
            "enabled", "public", "team", "status",
            "repository_url", "file_url", "tool_image", "creation", "modified",
        )
        detail = {key: getattr(record, key) for key in detail_fields}
        # Swap the raw team id for the resolved user name.
        detail["team"] = owner_names.get(record.team)
        return detail
    return {"success": False, "message": "工具不存在"}
@dashboard_whitelist()
def create_local_agent(agent_data):
    """Create a Local Agent owned by the caller's current team.

    Args:
        agent_data: Dict (or JSON string encoding one) with the required keys
            ``agent_name`` and ``title`` and the optional keys ``subtitle``,
            ``description``, ``repository_url``, ``file_url``,
            ``agent_image`` and ``agent_flow``.

    Returns:
        dict: ``{"success": True, "name": ..., "message": ...}`` on success,
        otherwise ``{"success": False, "message": ...}``. The duplicate-name
        failure additionally carries the legacy ``error`` key.
    """
    import json

    team = get_current_team()
    # Same guard as the update/delete endpoints: never create an agent that
    # is not attached to a team.
    if not team:
        return {"success": False, "message": "未找到当前团队信息"}

    if isinstance(agent_data, str):
        try:
            agent_data = json.loads(agent_data)
        except json.JSONDecodeError:
            return {"success": False, "message": "JSON 数据格式错误"}
    if not isinstance(agent_data, dict):
        return {"success": False, "message": "JSON 数据格式错误"}

    # Report missing required keys instead of failing with a KeyError.
    missing = [key for key in ("agent_name", "title") if key not in agent_data]
    if missing:
        return {"success": False, "message": "缺少必填字段: " + ", ".join(missing)}

    if jingrow.db.exists("Local Agent", {"agent_name": agent_data["agent_name"]}):
        duplicate_message = "Agent名称已存在请使用其他名称"
        # "error" is kept for existing callers; "success"/"message" match the
        # shape returned by the other endpoints in this module.
        return {"success": False, "error": duplicate_message, "message": duplicate_message}

    agent = jingrow.get_pg({
        "pagetype": "Local Agent",
        "agent_name": agent_data["agent_name"],
        "title": agent_data["title"],
        "subtitle": agent_data.get("subtitle", ""),
        "description": agent_data.get("description", ""),
        # Ownership always comes from the session, never from the payload.
        "team": team,
        "repository_url": agent_data.get("repository_url"),
        "file_url": agent_data.get("file_url"),
        "agent_image": agent_data.get("agent_image"),
        "agent_flow": agent_data.get("agent_flow"),
    })
    agent.insert()
    return {"success": True, "name": agent.name, "message": "Agent创建成功"}
@dashboard_whitelist()
def create_local_tool(tool_data):
    """Create a Local Tool owned by the caller's current team.

    Args:
        tool_data: Dict (or JSON string encoding one) with the required keys
            ``tool_name`` and ``title`` and the optional keys ``subtitle``,
            ``description``, ``repository_url``, ``file_url`` and
            ``tool_image``.

    Returns:
        dict: ``{"success": True, "name": ..., "message": ...}`` on success,
        otherwise ``{"success": False, "message": ...}``. The duplicate-name
        failure additionally carries the legacy ``error`` key.
    """
    import json

    team = get_current_team()
    # Same guard as the update/delete endpoints: never create a tool that
    # is not attached to a team.
    if not team:
        return {"success": False, "message": "未找到当前团队信息"}

    if isinstance(tool_data, str):
        try:
            tool_data = json.loads(tool_data)
        except json.JSONDecodeError:
            return {"success": False, "message": "JSON 数据格式错误"}
    if not isinstance(tool_data, dict):
        return {"success": False, "message": "JSON 数据格式错误"}

    # Report missing required keys instead of failing with a KeyError.
    missing = [key for key in ("tool_name", "title") if key not in tool_data]
    if missing:
        return {"success": False, "message": "缺少必填字段: " + ", ".join(missing)}

    if jingrow.db.exists("Local Tool", {"tool_name": tool_data["tool_name"]}):
        duplicate_message = "工具名称已存在,请使用其他名称"
        # "error" is kept for existing callers; "success"/"message" match the
        # shape returned by the other endpoints in this module.
        return {"success": False, "error": duplicate_message, "message": duplicate_message}

    tool = jingrow.get_pg({
        "pagetype": "Local Tool",
        "tool_name": tool_data["tool_name"],
        "title": tool_data["title"],
        "subtitle": tool_data.get("subtitle", ""),
        "description": tool_data.get("description", ""),
        # Ownership always comes from the session, never from the payload.
        "team": team,
        "repository_url": tool_data.get("repository_url"),
        "file_url": tool_data.get("file_url"),
        "tool_image": tool_data.get("tool_image"),
    })
    tool.insert()
    return {"success": True, "name": tool.name, "message": "工具创建成功"}
@dashboard_whitelist()
def update_local_agent(name, agent_data):
    """Update fields of a Local Agent owned by the caller's current team.

    Args:
        name: Name of the Local Agent record to update.
        agent_data: Dict of ``{field: new value}`` or a JSON string encoding
            one. Keys that are not attributes of the record, or that are
            protected (see below), are ignored.

    Returns:
        dict: ``{"success": True, "name", "updated_fields", "message"}`` on
        success, otherwise ``{"success": False, "message": ...}``.
    """
    import json

    team = get_current_team()
    if not team:
        return {"success": False, "message": "未找到当前团队信息"}
    if not jingrow.db.exists("Local Agent", name):
        return {"success": False, "message": "Agent不存在"}

    agent = jingrow.get_pg("Local Agent", name)
    # Ownership check: only the owning team may modify the record.
    if agent.team != team:
        return {"success": False, "message": "您没有权限操作此Agent"}

    if isinstance(agent_data, str):
        try:
            agent_data = json.loads(agent_data)
        except json.JSONDecodeError:
            return {"success": False, "message": "JSON 数据格式错误"}
    # Valid JSON that is not an object (list, number, null) carries no fields.
    if not isinstance(agent_data, dict):
        return {"success": False, "message": "JSON 数据格式错误"}

    # The record is saved with ignore_permissions=True, so the payload must
    # not be able to change ownership, identity or bookkeeping attributes.
    # NOTE(review): names other than name/team/creation/modified/pagetype are
    # assumed framework metadata -- confirm against the pagetype definition;
    # an explicit allow-list of editable fields would be stricter. Whether
    # callers may set ``status`` should be confirmed as well.
    protected_fields = {
        "name", "team", "owner", "creation", "modified", "modified_by",
        "pagetype", "docstatus", "idx", "flags", "meta",
        "parent", "parenttype", "parentfield",
    }

    updated_fields = []
    for field, value in agent_data.items():
        if not isinstance(field, str) or field.startswith("_") or field in protected_fields:
            continue
        # Skip unknown attributes and never overwrite methods such as save().
        if not hasattr(agent, field) or callable(getattr(agent, field)):
            continue
        setattr(agent, field, value)
        updated_fields.append(field)

    if not updated_fields:
        return {"success": False, "message": "没有有效的字段被更新"}

    # Team ownership was verified manually above.
    agent.save(ignore_permissions=True)
    return {"success": True, "name": agent.name, "updated_fields": updated_fields, "message": "Agent更新成功"}
@dashboard_whitelist()
def update_local_tool(name, tool_data):
    """Update fields of a Local Tool owned by the caller's current team.

    Args:
        name: Name of the Local Tool record to update.
        tool_data: Dict of ``{field: new value}`` or a JSON string encoding
            one. Keys that are not attributes of the record, or that are
            protected (see below), are ignored.

    Returns:
        dict: ``{"success": True, "name", "updated_fields", "message"}`` on
        success, otherwise ``{"success": False, "message": ...}``.
    """
    import json

    team = get_current_team()
    if not team:
        return {"success": False, "message": "未找到当前团队信息"}
    if not jingrow.db.exists("Local Tool", name):
        return {"success": False, "message": "工具不存在"}

    tool = jingrow.get_pg("Local Tool", name)
    # Ownership check: only the owning team may modify the record.
    if tool.team != team:
        return {"success": False, "message": "您没有权限操作此工具"}

    if isinstance(tool_data, str):
        try:
            tool_data = json.loads(tool_data)
        except json.JSONDecodeError:
            return {"success": False, "message": "JSON 数据格式错误"}
    # Valid JSON that is not an object (list, number, null) carries no fields.
    if not isinstance(tool_data, dict):
        return {"success": False, "message": "JSON 数据格式错误"}

    # The record is saved with ignore_permissions=True, so the payload must
    # not be able to change ownership, identity or bookkeeping attributes.
    # NOTE(review): names other than name/team/creation/modified/pagetype are
    # assumed framework metadata -- confirm against the pagetype definition;
    # an explicit allow-list of editable fields would be stricter. Whether
    # callers may set ``status`` should be confirmed as well.
    protected_fields = {
        "name", "team", "owner", "creation", "modified", "modified_by",
        "pagetype", "docstatus", "idx", "flags", "meta",
        "parent", "parenttype", "parentfield",
    }

    updated_fields = []
    for field, value in tool_data.items():
        if not isinstance(field, str) or field.startswith("_") or field in protected_fields:
            continue
        # Skip unknown attributes and never overwrite methods such as save().
        if not hasattr(tool, field) or callable(getattr(tool, field)):
            continue
        setattr(tool, field, value)
        updated_fields.append(field)

    if not updated_fields:
        return {"success": False, "message": "没有有效的字段被更新"}

    # Team ownership was verified manually above.
    tool.save(ignore_permissions=True)
    return {"success": True, "name": tool.name, "updated_fields": updated_fields, "message": "工具更新成功"}
@dashboard_whitelist()
def delete_local_node(name):
    """Delete a Local Node that belongs to the caller's current team.

    Args:
        name: Name of the Local Node record to delete.

    Returns:
        dict: ``{"success": True, "message": ...}`` after deletion, or
        ``{"success": False, "message": ...}`` when there is no current
        team, the record does not exist, or it belongs to another team.
    """
    current_team = get_current_team()
    if not current_team:
        return {"success": False, "message": "未找到当前团队信息"}

    if not jingrow.db.exists("Local Node", name):
        return {"success": False, "message": "节点不存在"}

    record = jingrow.get_pg("Local Node", name)
    if record.team == current_team:
        # Ownership was just verified, so the permission check is bypassed.
        record.delete(ignore_permissions=True)
        return {"success": True, "message": "节点删除成功"}

    return {"success": False, "message": "您没有权限操作此节点"}
@dashboard_whitelist()
def delete_local_agent(name):
    """Delete a Local Agent that belongs to the caller's current team.

    Args:
        name: Name of the Local Agent record to delete.

    Returns:
        dict: ``{"success": True, "message": ...}`` after deletion, or
        ``{"success": False, "message": ...}`` when there is no current
        team, the record does not exist, or it belongs to another team.
    """
    current_team = get_current_team()
    if not current_team:
        return {"success": False, "message": "未找到当前团队信息"}

    if not jingrow.db.exists("Local Agent", name):
        return {"success": False, "message": "Agent不存在"}

    record = jingrow.get_pg("Local Agent", name)
    if record.team == current_team:
        # Ownership was just verified, so the permission check is bypassed.
        record.delete(ignore_permissions=True)
        return {"success": True, "message": "Agent删除成功"}

    return {"success": False, "message": "您没有权限操作此Agent"}
@dashboard_whitelist()
def delete_local_tool(name):
    """Delete a Local Tool that belongs to the caller's current team.

    Args:
        name: Name of the Local Tool record to delete.

    Returns:
        dict: ``{"success": True, "message": ...}`` after deletion, or
        ``{"success": False, "message": ...}`` when there is no current
        team, the record does not exist, or it belongs to another team.
    """
    current_team = get_current_team()
    if not current_team:
        return {"success": False, "message": "未找到当前团队信息"}

    if not jingrow.db.exists("Local Tool", name):
        return {"success": False, "message": "工具不存在"}

    record = jingrow.get_pg("Local Tool", name)
    if record.team == current_team:
        # Ownership was just verified, so the permission check is bypassed.
        record.delete(ignore_permissions=True)
        return {"success": True, "message": "工具删除成功"}

    return {"success": False, "message": "您没有权限操作此工具"}