优化get_pagetype_module_app,如果module是custom类型,app从package取值

This commit is contained in:
jingrow 2025-10-29 17:17:17 +08:00
parent 92c96474b9
commit be9df68ce0

View File

@ -4,9 +4,12 @@ import os
import shutil
import subprocess
import tempfile
import datetime
from pathlib import Path
from typing import Dict, List, Any, Optional
from jingrow.ai.utils.node_schema import get_node_schema_fields
from jingrow.modules.utils import get_pagetype_module, get_module_app
from jingrow.modules.export_file import export_pg
from jingrow import _
@jingrow.whitelist()
@ -647,14 +650,17 @@ def get_pagetype_module_app(pagetype: str) -> Dict[str, Any]:
module = get_pagetype_module(pagetype)
module_scrub = jingrow.scrub(module)
# 首先从 module_app 映射获取O(1) 字典查找)
# 先查缓存
app = jingrow.local.module_app.get(module_scrub)
# 如果找不到,查询数据库(可能是 custom module
# 未命中则一次性读取 Module Defcustom 优先 package否则取 app_name最后兜底
if app is None:
app_name = jingrow.db.get_value("Module Def", {"module_name": module}, "app_name", ignore=True)
app = app_name
# 将结果回写到 module_app 映射,下次直接命中(优化性能)
module_def = jingrow.get_pg("Module Def", {"module_name": module})
if module_def.custom:
app = module_def.package or module_def.app_name
else:
app = get_module_app(module)
jingrow.local.module_app[module_scrub] = app
return {
@ -739,96 +745,185 @@ def sync_app_files(app_name: str, app_path: str, force: bool = True) -> Dict[str
@jingrow.whitelist()
def install_package(package_file_url: str) -> Dict[str, Any]:
"""一键安装包到数据库"""
try:
"""一键安装包到数据库"""
try:
filename = package_file_url.lstrip('/files/').lstrip('files/')
site_path = jingrow.get_site_path()
file_path = os.path.join(site_path, "public", "files", filename)
file_path = os.path.abspath(file_path)
if not os.path.exists(file_path):
return {
'success': False,
'error': f'文件不存在: {file_path}'
}
temp_dir = tempfile.mkdtemp(prefix="jingrow_package_install_")
try:
result = subprocess.run(
["tar", "xzf", file_path, "-C", temp_dir],
capture_output=True,
text=True,
check=False
)
if result.returncode != 0:
return {'success': False, 'error': f"解压失败: {result.stderr}"}
root_items = os.listdir(temp_dir)
if not root_items:
return {'success': False, 'error': '压缩包为空'}
root_dir = os.path.join(temp_dir, root_items[0]) if len(root_items) == 1 and os.path.isdir(os.path.join(temp_dir, root_items[0])) else temp_dir
package_name = os.path.basename(root_dir)
package_json_path = os.path.join(root_dir, f"{package_name}.json")
if not os.path.exists(package_json_path):
# 搜索所有 json 文件
for root, dirs, files in os.walk(root_dir):
for file in files:
if file.endswith('.json'):
try:
with open(os.path.join(root, file), 'r', encoding='utf-8') as f:
data = json.load(f)
if data.get('pagetype') == 'Package':
package_json_path = os.path.join(root, file)
break
except:
pass
if os.path.exists(package_json_path):
break
if os.path.exists(package_json_path):
break
if not os.path.exists(package_json_path):
return {'success': False, 'error': '找不到 Package.json 文件'}
from jingrow.modules.import_file import import_pg, import_file_by_path
from jingrow.model.sync import get_pg_files
with open(package_json_path, 'r', encoding='utf-8') as f:
pg_dict = json.load(f)
package_doc = import_pg(pg_dict, ignore_version=True)
jingrow.flags.package = package_doc
files = []
for module in os.listdir(root_dir):
module_path = os.path.join(root_dir, module)
if os.path.isdir(module_path):
files = get_pg_files(files, module_path)
imported_files = []
for file_path_item in files:
try:
import_file_by_path(file_path_item, force=True, ignore_version=True)
imported_files.append(file_path_item)
except Exception:
pass
shutil.rmtree(temp_dir)
return {
'success': True,
'message': f'扩展包 {package_name} 安装成功',
'package_name': package_name,
'imported_files': imported_files,
'file_count': len(imported_files)
}
except Exception as e:
return {'success': False, 'error': str(e)}
except Exception as e:
return {'success': False, 'error': str(e)}
filename = package_file_url.lstrip('/files/').lstrip('files/')
site_path = jingrow.get_site_path()
file_path = os.path.join(site_path, "public", "files", filename)
file_path = os.path.abspath(file_path)
if not os.path.exists(file_path):
return {
'success': False,
'error': f'文件不存在: {file_path}'
}
temp_dir = tempfile.mkdtemp(prefix="jingrow_package_install_")
try:
result = subprocess.run(
["tar", "xzf", file_path, "-C", temp_dir],
capture_output=True,
text=True,
check=False
)
if result.returncode != 0:
return {'success': False, 'error': f"解压失败: {result.stderr}"}
root_items = os.listdir(temp_dir)
if not root_items:
return {'success': False, 'error': '压缩包为空'}
root_dir = os.path.join(temp_dir, root_items[0]) if len(root_items) == 1 and os.path.isdir(os.path.join(temp_dir, root_items[0])) else temp_dir
package_name = os.path.basename(root_dir)
package_json_path = os.path.join(root_dir, f"{package_name}.json")
if not os.path.exists(package_json_path):
# 搜索所有 json 文件
for root, dirs, files in os.walk(root_dir):
for file in files:
if file.endswith('.json'):
try:
with open(os.path.join(root, file), 'r', encoding='utf-8') as f:
data = json.load(f)
if data.get('pagetype') == 'Package':
package_json_path = os.path.join(root, file)
break
except:
pass
if os.path.exists(package_json_path):
break
if os.path.exists(package_json_path):
break
if not os.path.exists(package_json_path):
return {'success': False, 'error': '找不到 Package.json 文件'}
from jingrow.modules.import_file import import_pg, import_file_by_path
from jingrow.model.sync import get_pg_files
with open(package_json_path, 'r', encoding='utf-8') as f:
pg_dict = json.load(f)
package_doc = import_pg(pg_dict, ignore_version=True)
jingrow.flags.package = package_doc
files = []
for module in os.listdir(root_dir):
module_path = os.path.join(root_dir, module)
if os.path.isdir(module_path):
files = get_pg_files(files, module_path)
imported_files = []
for file_path_item in files:
try:
import_file_by_path(file_path_item, force=True, ignore_version=True)
imported_files.append(file_path_item)
except Exception:
pass
shutil.rmtree(temp_dir)
return {
'success': True,
'message': f'扩展包 {package_name} 安装成功',
'package_name': package_name,
'imported_files': imported_files,
'file_count': len(imported_files)
}
except Exception as e:
return {'success': False, 'error': str(e)}
except Exception as e:
return {'success': False, 'error': str(e)}
@jingrow.whitelist()
def export_app_metadata(app_name: str) -> Dict[str, Any]:
"""导出应用扩展包元数据 - 直接返回Package、Module、PageType的JSON数据结构"""
try:
# 1. 根据 app_name 查找对应的 Package
package = None
packages = jingrow.get_all("Package", filters={"name": app_name})
if packages:
package = jingrow.get_pg("Package", packages[0].name)
else:
# 如果没有找到,尝试使用 app_name 作为 package_name 查找
packages = jingrow.get_all("Package", filters={"package_name": app_name})
if packages:
package = jingrow.get_pg("Package", packages[0].name)
if not package:
return {
"success": False,
"error": f"未找到应用 '{app_name}' 对应的 Package"
}
# 2. 收集Package元数据
metadata = {
"package": package.as_dict(no_nulls=True),
"package_name": package.package_name,
"readme": str(package.readme or ""),
"license": str(package.license or "") if package.license else None,
"modules": {}
}
# 3. 收集所有Module及其文档的元数据
modules = jingrow.get_all("Module Def", filters={"package": package.name})
for m in modules:
try:
module = jingrow.get_pg("Module Def", m.name)
if not module or not hasattr(module, 'meta'):
continue
# 收集该模块下的所有文档
module_docs = {}
for l in module.meta.links:
if l.link_pagetype == "Module Def":
continue
try:
# 获取该类型的所有文档
docs = jingrow.get_all(l.link_pagetype, filters={"module": m.name})
for d in docs:
try:
doc = jingrow.get_pg(l.link_pagetype, d.name)
# 导出文档数据
doc_export = doc.as_dict(no_nulls=True)
# 添加到模块文档
pagetype_name = l.link_pagetype
if pagetype_name not in module_docs:
module_docs[pagetype_name] = {}
module_docs[pagetype_name][doc.name] = doc_export
except Exception:
continue
except Exception:
continue
metadata["modules"][m.name] = {
"module_info": m.as_dict(no_nulls=True),
"docs": module_docs
}
except Exception:
continue
# 4. 返回元数据
result = {
"success": True,
"metadata": metadata
}
return result
except Exception as e:
import traceback
jingrow.log_error("Export app metadata error", str(e))
return {
"success": False,
"error": f"导出扩展包元数据失败: {str(e)}"
}