add after_install hook for node dependencies with execution caching

jingrow 2025-11-14 01:43:14 +08:00
parent 8a09f790fd
commit 0933f37ab3
7 changed files with 2009 additions and 3 deletions
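The centerpiece of the change is a post-install hook in the node dependency manager: after a node's dependencies are installed with uv pip install, any commands declared under [tool.jingrow.after_install] in the node's pyproject.toml are executed once, and the run is cached under .cache/node_install/ keyed by an MD5 hash of the combined script. A minimal sketch of that caching check, assuming the marker-file layout used in the diff below (illustrative, not the exact implementation):

import hashlib
from pathlib import Path

def after_install_already_ran(node_type: str, script: str, root: Path) -> bool:
    # The marker file stores the MD5 of the last script that ran successfully.
    marker = root / ".cache" / "node_install" / f"{node_type}.after_install_done"
    script_hash = hashlib.md5(script.encode("utf-8")).hexdigest()
    return marker.exists() and marker.read_text(encoding="utf-8").strip() == script_hash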

.gitignore

@@ -44,3 +44,6 @@ apps/*
# uv/virtualenv: ignore .venv in every directory (keep uv.lock tracked)
.venv/
**/.venv/
# runtime cache files
.cache/

New file: batch_create_records node schema

@@ -0,0 +1,102 @@
{
"metadata": {
"type": "batch_create_records",
"label": "Batch Create Records",
"icon": "fa-layer-group",
"color": "#10b981",
"description": "Create page records in bulk; especially useful for array data such as product lists",
"group": "Output",
"component_type": "GenericNode"
},
"properties": {
"pagetype": {
"type": "string",
"title": "Page Type",
"description": "The page type to create records for (e.g. Jsite Product, Jsite Article)",
"minLength": 1
},
"field_map": {
"type": "array",
"title": "Field Mapping",
"description": "Maps data from upstream nodes onto target fields. If omitted, automatic label mapping is used",
"items": {
"type": "object",
"properties": {
"from": {
"type": "string",
"title": "Source Field",
"description": "Field name in the upstream node output (e.g. title, price, images)"
},
"to": {
"type": "string",
"title": "Target Field",
"description": "Field name in the target page type (use the field label)"
}
},
"required": [
"from",
"to"
]
}
},
"default_site": {
"type": "string",
"title": "Default Site",
"description": "Types such as Jsite Product require a site field. Enter the site record's name, not a URL. For example, if the site record is named 'mysite', enter 'mysite'. The record name can be found on the site settings page."
},
"show_output_handle": {
"type": "boolean",
"title": "Show Output Handle",
"description": "Whether to show the node's output connection point"
}
},
"required": [
"pagetype"
],
"_layout": {
"tabs": [
{
"id": "tab_1",
"label": "基础配置",
"sections": [
{
"id": "section_1",
"label": "基本信息",
"columns": [
{
"id": "column_1",
"label": "记录设置",
"fields": [
"pagetype",
"default_site",
"show_output_handle"
]
}
]
}
]
},
{
"id": "tab_2",
"label": "字段映射",
"sections": [
{
"id": "section_2",
"label": "映射配置",
"columns": [
{
"id": "column_2",
"label": "字段映射",
"fields": [
"field_map"
]
}
]
}
]
}
],
"activeTab": "tab_2"
}
}
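For illustration, a configuration for this node that maps scraped product fields onto a target page type might look like the sketch below; the target field labels ("Title", "Price", "Images") are hypothetical and depend on the actual pagetype metadata:

config = {
    "pagetype": "Jsite Product",
    "default_site": "mysite",  # the site record's name, not a URL
    "field_map": [
        {"from": "title", "to": "Title"},
        {"from": "price", "to": "Price"},
        {"from": "images", "to": "Images"},
    ],
}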

New file: batch_create_records node implementation

@@ -0,0 +1,538 @@
import json
from typing import Dict, Any, Optional, List
import jingrow
def get_branch_upstream_nodes(current_node_id, flow_data):
"""获取同一分支的所有上游节点ID列表"""
if not current_node_id or not flow_data:
return []
edges = flow_data.get("edges", [])
upstream_nodes = set()
stack = [current_node_id]
while stack:
node_id = stack.pop()
for edge in edges:
if edge.get("target") == node_id:
source_id = edge.get("source")
if source_id and source_id not in upstream_nodes:
upstream_nodes.add(source_id)
stack.append(source_id)
return list(upstream_nodes)
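# Usage sketch (illustration only, not called by this node): in a hypothetical
# three-node flow, walking target -> source edges from the final node collects
# every ancestor on its branch.
#   flow_data = {"edges": [
#       {"source": "scraper_1", "target": "ai_1"},
#       {"source": "ai_1", "target": "batch_create_1"},
#   ]}
#   get_branch_upstream_nodes("batch_create_1", flow_data)
#   # -> ["ai_1", "scraper_1"] (order not guaranteed)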
def execute(context=None, inputs=None, config=None, **kwargs):
"""
批量创建记录节点
用于批量创建记录特别适用于处理 web_scrapers 节点返回的产品列表
"""
if context is None:
context = kwargs.get("context", {})
if inputs is None:
inputs = kwargs.get("inputs", {})
if config is None:
config = kwargs.get("config", {})
try:
# 参数处理
if not isinstance(inputs, dict):
try:
inputs = json.loads(inputs) if isinstance(inputs, str) else {}
except Exception:
inputs = {}
# 获取 pagetype
record_type = None
if isinstance(inputs, dict):
record_type = inputs.get("pagetype")
if not record_type and isinstance(config, dict):
record_type = config.get("pagetype")
if not isinstance(record_type, str) or not record_type:
return {
"success": False,
"error": "pagetype 必须为非空字符串,请检查配置"
}
# 获取字段映射配置
field_map = config.get("field_map", [])
# 获取默认网站配置(用于 Jsite Product 等需要 site 字段的类型)
default_site = config.get("default_site", "")
# 从 Jingrow 获取字段 label->fieldname 映射
label2field = jingrow.get_field_mapping_from_jingrow(record_type) or {}
# 获取字段元数据(用于检查字段长度限制)
field_metadata = {}
try:
from jingrow.model.page import get_page_instance
pg = get_page_instance(record_type)
meta_result = pg.get_meta()
if isinstance(meta_result, dict) and meta_result.get('success'):
meta_data = meta_result.get('data', {})
fields = meta_data.get('fields', [])
# 构建字段名到字段元数据的映射
for field in fields:
fieldname = field.get('fieldname')
if fieldname:
# 尝试多种方式获取长度限制
max_length = (
field.get('length') or
field.get('max_length') or
field.get('maxlength') or
(field.get('df') and field.get('df').get('length')) or
None
)
# 如果是整数,转换为整数
if max_length is not None:
try:
max_length = int(max_length)
except (ValueError, TypeError):
max_length = None
field_metadata[fieldname] = {
'max_length': max_length,
'fieldtype': field.get('fieldtype'),
'label': field.get('label')
}
except Exception as e:
# 如果获取元数据失败,记录错误但继续执行
print(f"获取字段元数据失败: {str(e)}")
pass
# 如果获取元数据失败,使用已知的常见字段长度限制
# 根据错误信息title 字段通常是 140 字符
# 为常见字段设置默认长度限制(即使元数据获取成功,也作为后备)
common_field_limits = {
'title': 140, # 从错误信息中得知
'name': 140,
}
for fieldname, default_length in common_field_limits.items():
if fieldname not in field_metadata:
field_metadata[fieldname] = {'max_length': default_length}
elif not field_metadata[fieldname].get('max_length'):
field_metadata[fieldname]['max_length'] = default_length
# 同时,为所有通过 label2field 映射的字段设置默认限制(如果它们看起来是标题字段)
for label, fieldname in label2field.items():
if fieldname not in field_metadata or not field_metadata[fieldname].get('max_length'):
# 如果字段标签包含"标题"或"title",且字段名是 title 或 name设置默认限制
label_str = str(label).lower()
if ('标题' in str(label) or 'title' in label_str) and fieldname in ['title', 'name']:
if fieldname not in field_metadata:
field_metadata[fieldname] = {'max_length': 140}
elif not field_metadata[fieldname].get('max_length'):
field_metadata[fieldname]['max_length'] = 140
# 获取数据数组
# 支持多种数据源:
# 1. 从 inputs 中获取 products 数组web_scrapers 节点输出)
# 2. 从 inputs 中获取 items 数组(通用数组)
# 3. 从上游节点获取 products 或 items
data_array = None
# 方法1: 从 inputs 直接获取
if isinstance(inputs, dict):
# web_scrapers 节点返回格式: {"products": [...]}
if "products" in inputs and isinstance(inputs["products"], list):
data_array = inputs["products"]
# 通用格式: {"items": [...]}
elif "items" in inputs and isinstance(inputs["items"], list):
data_array = inputs["items"]
# 如果 inputs 本身是数组
elif isinstance(inputs, list):
data_array = inputs
# 方法2: 从上游节点获取
if not data_array:
current_node_id = context.get("current_node_id")
flow_data = context.get("flow_data", {})
branch_upstream_nodes = get_branch_upstream_nodes(current_node_id, flow_data)
if context and "node_results" in context and branch_upstream_nodes:
node_results = context["node_results"]
for upstream_node_id in branch_upstream_nodes:
if upstream_node_id in node_results:
upstream_result = node_results[upstream_node_id]
if isinstance(upstream_result, dict):
# web_scrapers 节点返回格式
if "products" in upstream_result and isinstance(upstream_result["products"], list):
data_array = upstream_result["products"]
break
# 通用格式
elif "items" in upstream_result and isinstance(upstream_result["items"], list):
data_array = upstream_result["items"]
break
# 如果有 ai_raw_response尝试解析 JSON
elif "ai_raw_response" in upstream_result and isinstance(upstream_result["ai_raw_response"], str):
try:
raw_response = upstream_result["ai_raw_response"].strip()
start_brace = raw_response.find('[')
end_brace = raw_response.rfind(']')
if start_brace != -1 and end_brace != -1 and end_brace > start_brace:
json_content = raw_response[start_brace:end_brace + 1]
parsed_json = json.loads(json_content)
if isinstance(parsed_json, list):
data_array = parsed_json
break
except:
continue
if not data_array or not isinstance(data_array, list):
return {
"success": False,
"error": "未找到要创建的数据数组。请确保上游节点返回了 'products''items' 数组,或者直接传入数组数据。"
}
if len(data_array) == 0:
return {
"success": True,
"message": "数据数组为空,未创建任何记录",
"created_count": 0,
"created_records": []
}
# 批量创建记录
created_records = []
failed_records = []
for index, item_data in enumerate(data_array):
try:
# 确保 item_data 是字典
if not isinstance(item_data, dict):
failed_records.append({
"index": index,
"data": item_data,
"error": "数据项不是字典格式"
})
continue
# 目标记录数据
record_data = {}
mapped_fields = set()
# 1. 优先用 field_mapfrom→to
if field_map:
mapped_result = jingrow.map_fields_by_labels(field_map, item_data, label2field)
record_data.update(mapped_result)
mapped_fields.update([m.get("from") for m in field_map if m.get("from")])
# 同时标记目标字段为已映射,防止被自动映射覆盖
for mapping in field_map:
to_field = mapping.get("to")
if to_field:
mapped_fields.add(to_field)
# 调试:检查哪些字段映射失败了
failed_mappings = []
for mapping in field_map:
from_field = mapping.get("from")
to_field = mapping.get("to")
if from_field in item_data and from_field not in mapped_result:
# 字段映射失败
to_fieldname = label2field.get(str(to_field).strip())
failed_mappings.append({
"from": from_field,
"to_label": to_field,
"to_fieldname": to_fieldname,
"value": item_data.get(from_field)
})
# 2. 自动标签映射:处理未在 field_map 中映射的字段
for label, value in item_data.items():
if label in mapped_fields:
continue
fieldname = label2field.get(str(label).strip())
if fieldname is not None and fieldname not in record_data:
record_data[fieldname] = value
mapped_fields.add(label)
# 3. 处理特殊必填字段
# 对于 Jsite Product需要 site 字段
if record_type == "Jsite Product":
# 检查 site 字段是否已映射
site_fieldname = None
for label, fieldname in label2field.items():
if "网站" in str(label) or "site" in str(label).lower() or fieldname == "site":
site_fieldname = fieldname
break
if not site_fieldname:
site_fieldname = "site"
if site_fieldname not in record_data:
if default_site:
# 验证 default_site 不是 URL
site_value = str(default_site).strip()
if site_value.startswith(('http://', 'https://')):
# 如果是 URL尝试提取域名部分作为提示
from urllib.parse import urlparse
parsed = urlparse(site_value)
domain = parsed.netloc or parsed.path.split('/')[0]
# 不设置值,让创建失败并提示用户
# 但记录到调试信息中
pass
else:
# 使用配置的网站名称
record_data[site_fieldname] = site_value
else:
# 如果没有配置 default_site在错误信息中提示
# 但继续处理,让创建失败时能看到完整的错误信息
pass
# 确保 title 字段存在(如果数据中有 title 但映射失败)
# 检查所有可能的 title 字段名变体
title_fieldnames = ["title", "name"] # title 和 name 都可能是标题字段
title_mapped = False
for title_field in title_fieldnames:
if title_field in record_data and record_data[title_field]:
title_mapped = True
break
if not title_mapped and "title" in item_data and item_data.get("title"):
# 尝试多种方式映射 title
# 1. 查找 label2field 中 label 包含 "标题" 或 "title" 的字段
for label, fieldname in label2field.items():
if ("标题" in str(label) or "title" in str(label).lower()) and fieldname not in record_data:
record_data[fieldname] = item_data.get("title")
title_mapped = True
break
# 2. 如果还没映射,直接尝试使用 title 或 name 作为字段名
if not title_mapped:
for title_field in title_fieldnames:
if title_field not in record_data:
record_data[title_field] = item_data.get("title")
title_mapped = True
break
# 处理复杂数据类型(数组、对象)的序列化
# 某些字段可能需要转换为 JSON 字符串
# 同时处理字段长度限制,自动截断超长字段
processed_record_data = {}
truncated_fields = [] # 记录被截断的字段
for key, value in record_data.items():
if isinstance(value, (list, dict)):
# 如果是数组或对象,尝试转换为 JSON 字符串
# 但先检查目标字段类型,如果是 JSON 类型字段,可能需要保持原样
try:
# 先尝试保持原样,如果创建失败再转换
processed_record_data[key] = value
except:
processed_record_data[key] = json.dumps(value, ensure_ascii=False)
else:
# 处理字符串字段的长度限制
if isinstance(value, str) and value:
# 对于 title 和 name 字段,强制应用 140 字符限制(根据错误信息)
if key in ['title', 'name'] and len(value) > 140:
original_length = len(value)
processed_record_data[key] = value[:140]
truncated_fields.append({
"field": key,
"field_label": field_metadata.get(key, {}).get('label', key),
"original_length": original_length,
"max_length": 140,
"truncated_value": processed_record_data[key],
"truncation_method": "强制截断title/name 字段默认 140 字符)"
})
else:
# 检查字段是否有长度限制
field_info = field_metadata.get(key, {})
max_length = field_info.get('max_length')
# 如果字段元数据中没有,尝试通过 label2field 查找对应的字段
if not max_length:
# 尝试查找字段的 label然后通过 label 查找元数据
for label, fieldname in label2field.items():
if fieldname == key:
# 找到了对应的 label尝试查找元数据
field_info_from_label = field_metadata.get(fieldname, {})
if not max_length:
max_length = field_info_from_label.get('max_length')
break
# 执行截断
if max_length and isinstance(max_length, (int, float)):
max_length_int = int(max_length)
if len(value) > max_length_int:
# 自动截断超长字段
original_length = len(value)
processed_record_data[key] = value[:max_length_int]
truncated_fields.append({
"field": key,
"field_label": field_info.get('label', key),
"original_length": original_length,
"max_length": max_length_int,
"truncated_value": processed_record_data[key],
"truncation_method": "元数据截断"
})
else:
processed_record_data[key] = value
else:
processed_record_data[key] = value
else:
processed_record_data[key] = value
# 调用 Jingrow 创建记录
try:
# 直接调用 Page.create 以获取详细的错误信息
from jingrow.model.page import get_page_instance
pg = get_page_instance(record_type)
result = pg.create(processed_record_data)
# 初始化 error_msg确保在所有情况下都有值
error_msg = '创建记录失败'
if not isinstance(result, dict) or not result.get('success'):
# 获取详细错误信息
error_msg = result.get('error', '创建记录失败')
error_detail = result.get('detail', '')
if error_detail:
error_msg = f"{error_msg}: {error_detail}"
# 添加更详细的调试信息
debug_info = {
"record_type": record_type,
"mapped_fields_count": len(processed_record_data),
"field_names": list(processed_record_data.keys()),
"response": result,
"label2field_sample": dict(list(label2field.items())[:10]) if label2field else {},
"default_site_configured": bool(default_site),
"default_site_value": default_site if default_site else None,
"site_field_in_record_data": site_fieldname in processed_record_data if 'site_fieldname' in locals() else False,
"field_metadata_sample": dict(list(field_metadata.items())[:5]) if field_metadata else {},
"title_field_length": len(processed_record_data.get('title', '')) if 'title' in processed_record_data else None,
"title_field_in_metadata": 'title' in field_metadata,
"title_max_length": field_metadata.get('title', {}).get('max_length') if 'title' in field_metadata else None
}
# 如果有字段被截断,添加到调试信息
if 'truncated_fields' in locals() and truncated_fields:
debug_info["truncated_fields"] = truncated_fields
else:
# 如果没有截断信息,但 title 字段存在且超长,说明截断逻辑可能没有执行
if 'title' in processed_record_data:
title_value = processed_record_data['title']
if isinstance(title_value, str) and len(title_value) > 140:
debug_info["truncation_warning"] = f"title 字段长度为 {len(title_value)},超过 140但未执行截断。可能原因max_length 未正确获取。"
# 如果 default_site 是 URL添加警告
if default_site and str(default_site).strip().startswith(('http://', 'https://')):
debug_info["site_config_warning"] = "default_site 配置看起来是 URL但 site 字段需要的是网站记录的名称name不是 URL。请检查网站设置找到网站记录的名称并更新配置。"
# 如果有失败的映射,添加到调试信息中
if 'failed_mappings' in locals():
debug_info["failed_mappings"] = failed_mappings
failed_records.append({
"index": index,
"data": item_data,
"error": error_msg,
"record_data": processed_record_data,
"debug": debug_info
})
continue
created = result.get('data', {})
if not created:
debug_info = {
"record_type": record_type,
"response": result
}
if 'truncated_fields' in locals() and truncated_fields:
debug_info["truncated_fields"] = truncated_fields
failed_records.append({
"index": index,
"data": item_data,
"error": "创建记录失败:返回数据为空",
"record_data": processed_record_data,
"debug": debug_info
})
continue
# 记录成功创建,但如果有字段被截断,也记录下来
if 'truncated_fields' in locals() and truncated_fields:
# 在成功创建的情况下,截断信息可以通过返回结果查看
pass
except Exception as create_error:
debug_info = {
"record_type": record_type,
"mapped_fields_count": len(processed_record_data),
"field_names": list(processed_record_data.keys()),
"exception_type": type(create_error).__name__
}
if 'truncated_fields' in locals() and truncated_fields:
debug_info["truncated_fields"] = truncated_fields
failed_records.append({
"index": index,
"data": item_data,
"error": f"创建记录异常: {str(create_error)}",
"record_data": processed_record_data,
"debug": debug_info
})
continue
created_name = created.get("name") or record_data.get("name")
record_info = {
"index": index,
"name": created_name,
"pagetype": record_type
}
# 如果有字段被截断,添加到记录信息中
if 'truncated_fields' in locals() and truncated_fields:
record_info["truncated_fields"] = truncated_fields
created_records.append(record_info)
except Exception as e:
failed_records.append({
"index": index,
"data": item_data,
"error": str(e)
})
continue
# 构建返回结果
result = {
"success": True,
"total_count": len(data_array),
"created_count": len(created_records),
"failed_count": len(failed_records),
"created_records": created_records
}
if failed_records:
result["failed_records"] = failed_records
result["message"] = f"成功创建 {len(created_records)} 条记录,失败 {len(failed_records)}"
else:
result["message"] = f"成功创建 {len(created_records)} 条记录"
return result
except Exception as e:
# 构建详细的错误信息
error_details = {
"error": str(e),
"record_type": record_type if 'record_type' in locals() else None,
"inputs": inputs if 'inputs' in locals() else {},
"field_map": field_map if 'field_map' in locals() else []
}
print(f"BatchCreateRecords异常: {json.dumps(error_details, ensure_ascii=False, indent=2)}")
return {
"success": False,
"error": str(e),
"error_details": error_details
}
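A minimal invocation sketch for this node, assuming the scraped items are passed straight in through inputs; record creation still goes through jingrow's Page.create, so this only illustrates the calling convention and the result shape:

result = execute(
    context={},
    inputs={"products": [{"title": "Folding Bench", "price": "US$99.00"}]},
    config={"pagetype": "Jsite Article"},
)
# On success the node reports counts and the created record names, roughly:
# {"success": True, "total_count": 1, "created_count": 1, "failed_count": 0,
#  "created_records": [{"index": 0, "name": "...", "pagetype": "Jsite Article"}],
#  "message": "..."}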

New file: pyproject.toml for the web_scrapers node

@@ -0,0 +1,15 @@
[project]
name = "jingrow-node-web-scrapers"
version = "1.0.0"
requires-python = ">=3.10"
dependencies = [
"beautifulsoup4",
"crawl4ai",
"playwright",
]
[tool.jingrow.after_install]
commands = [
"python -m playwright install-deps",
"python -m playwright install"
]
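These commands are not executed verbatim: the dependency manager in this commit rewrites python -m to uv run python -m before running them, so the Playwright browsers are installed into the project's uv-managed environment. A small sketch of that rewrite, mirroring the regex used further down in the diff:

import re

script = "python -m playwright install-deps && python -m playwright install"
script = re.sub(r'(?<!uv run )\bpython -m\b', 'uv run python -m', script)
# -> "uv run python -m playwright install-deps && uv run python -m playwright install"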

New file: web_scrapers node schema

@@ -0,0 +1,181 @@
{
"metadata": {
"type": "web_scrapers",
"label": "网站采集",
"icon": "fa-spider",
"color": "rgba(71, 180, 133, 1)",
"description": "基于 Crawl4AI 采集网站产品信息(标题、价格、描述、图片等),支持单产品详情页和产品列表页两种模式",
"group": "数据",
"component_type": "GenericNode"
},
"properties": {
"url": {
"type": "string",
"title": "目标URL",
"description": "要采集的网站URL支持 Jinja2 模板语法,如:{{ product_url }}",
"format": "uri",
"minLength": 1
},
"crawl_mode": {
"type": "string",
"title": "爬取模式",
"description": "选择爬取模式single单产品详情页或 list产品列表页支持分页",
"default": "single",
"enum": [
"single",
"list"
],
"enumNames": [
"单产品详情页",
"产品列表页"
]
},
"max_pages": {
"type": "integer",
"title": "最大页数",
"description": "列表页模式下的最大爬取页数(防止无限循环)",
"default": 100,
"minimum": 1,
"maximum": 1000
},
"enrich_details": {
"type": "boolean",
"title": "补充产品详情",
"description": "列表页模式下是否爬取每个产品的详情页以补充详细信息图片、价格、公司信息、Product Details等"
},
"use_ai_extraction": {
"type": "boolean",
"title": "使用AI提取",
"description": "如果启用将使用AI模型从网页内容中智能提取结构化产品信息在手动提取的基础上进一步优化"
},
"ai_system_message": {
"type": "string",
"title": "系统消息",
"description": "定义AI的角色和行为",
"default": "你是一个专业的产品信息提取助手,请从网页内容中准确提取产品信息。",
"format": "textarea",
"rows": 5
},
"text_model": {
"type": "string",
"title": "AI模型",
"description": "选择要使用的AI模型与 ai_content_generation 节点保持一致)",
"default": "Jingrow",
"enum": [
"Jingrow",
"DeepSeek",
"Doubao",
"ChatGPT",
"Qwen",
"Wenxin",
"Hunyuan",
"Spark"
],
"enumNames": [
"Jingrow",
"DeepSeek",
"Doubao",
"ChatGPT",
"Qwen",
"文心",
"混元",
"讯飞星火"
]
},
"max_tokens": {
"type": "integer",
"title": "最大Token数",
"description": "限制生成内容的最大长度",
"default": 2000
},
"ai_temperature": {
"type": "number",
"title": "随机性 (Temperature)",
"description": "控制输出的随机性0为确定性2为最大随机性",
"default": 0.7
},
"ai_top_p": {
"type": "number",
"title": "多样性 (Top-p)",
"description": "控制词汇选择的多样性",
"default": 0.9
},
"hide_input_handle": {
"type": "boolean",
"title": "隐藏输入接口",
"description": "是否隐藏节点的输入连接点"
}
},
"required": [
"url"
],
"_layout": {
"tabs": [
{
"id": "tab_1",
"label": "基础配置",
"sections": [
{
"id": "section_1",
"label": "基本属性",
"columns": [
{
"id": "column_1",
"label": "主要设置",
"fields": [
"url",
"crawl_mode",
"max_pages",
"enrich_details",
"use_ai_extraction",
"ai_system_message"
]
}
]
}
]
},
{
"id": "tab_2",
"label": "AI模型设置",
"sections": [
{
"id": "section_2",
"label": "模型参数",
"columns": [
{
"id": "column_2",
"label": "模型配置",
"fields": [
"text_model",
"max_tokens",
"ai_temperature",
"ai_top_p"
]
}
]
}
]
},
{
"id": "tab_3",
"label": "节点设置",
"sections": [
{
"id": "section_3",
"label": "节点接口",
"columns": [
{
"id": "column_3",
"fields": [
"hide_input_handle"
]
}
]
}
]
}
],
"activeTab": "tab_3"
}
}
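A hypothetical list-mode configuration for this node (the URL is a placeholder):

config = {
    "url": "https://www.example.com/products?page=1",
    "crawl_mode": "list",
    "max_pages": 5,
    "enrich_details": True,
    "use_ai_extraction": False,
}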

New file: web_scrapers node implementation

@@ -0,0 +1,982 @@
import json
import re
import asyncio
from typing import Dict, Any, Optional, List
from urllib.parse import urljoin, urlparse, parse_qs, urlencode
import jingrow
from jingrow.utils.jinja import render_template
from bs4 import BeautifulSoup
from crawl4ai import AsyncWebCrawler
# ==================== Product list page extraction ====================
def extract_current_category(soup):
"""提取当前分类名称"""
# 方法1: 查找 div.sr-txt-title 中的 h1.sr-txt-h2
title_div = soup.find('div', class_='sr-txt-title')
if title_div:
h1_tag = title_div.find('h1', class_='sr-txt-h2')
if h1_tag:
category_name = h1_tag.get_text(strip=True)
if category_name:
return category_name
# 方法2: 直接查找 h1.sr-txt-h2
category_title = soup.find('h1', class_='sr-txt-h2')
if category_title:
category_name = category_title.get_text(strip=True)
if category_name:
return category_name
# 方法3: 查找包含 itemscope 的 div.sr-txt-title 中的 h1
title_divs = soup.find_all('div', class_='sr-txt-title')
for div in title_divs:
if div.get('itemscope'):
h1_tag = div.find('h1')
if h1_tag:
category_name = h1_tag.get_text(strip=True)
if category_name:
return category_name
return None
def extract_products_from_page(soup, base_url=''):
"""从单个页面提取产品信息"""
products = []
# 找到所有产品项容器
product_items = soup.find_all('div', class_='prod-result-item')
# 遍历每个产品项,提取信息
for item in product_items:
product = {}
# 提取产品ID从data-prodid属性
product['product_id'] = item.get('data-prodid', '')
# 提取产品标题和链接(从 prod-title 中的 a 标签)
title_div = item.find('div', class_='prod-title')
if title_div:
title_link = title_div.find('a')
if title_link:
product['title'] = title_link.get_text(strip=True)
product['url'] = title_link.get('href', '')
# 确保URL是完整的
if product['url'] and not product['url'].startswith('http'):
if product['url'].startswith('//'):
product['url'] = 'https:' + product['url']
elif product['url'].startswith('/'):
if base_url:
parsed = urlparse(base_url)
product['url'] = f"{parsed.scheme}://{parsed.netloc}" + product['url']
else:
# 从当前页面URL推断基础URL
product['url'] = base_url + product['url']
# 提取产品价格(从 prod-price 中)
price_div = item.find('div', class_='prod-price')
if price_div:
# 方法1: 从title属性获取完整价格信息
price_title = price_div.get('title', '')
if price_title:
product['price'] = price_title
else:
# 方法2: 从内部文本提取
price_value = price_div.find('span', class_='value')
price_unit = price_div.find('span', class_='unit')
if price_value:
price_text = price_value.get_text(strip=True)
if price_unit:
price_text += ' ' + price_unit.get_text(strip=True)
product['price'] = price_text
# 提取最小订购量(从 min-order 中)
min_order_div = item.find('div', class_='min-order')
if min_order_div:
# 从title属性获取
min_order_title = min_order_div.get('title', '')
if min_order_title:
product['min_order'] = min_order_title
else:
# 从内部文本提取
min_order_value = min_order_div.find('span', class_='value')
if min_order_value:
product['min_order'] = min_order_value.get_text(strip=True)
# 提取产品图片(从 prod-image 中的 img 标签)
image_div = item.find('div', class_='prod-image')
if image_div:
img_tag = image_div.find('img')
if img_tag:
# 优先使用 data-original其次使用 src
img_url = img_tag.get('data-original') or img_tag.get('src', '')
if img_url:
# 确保URL是完整的
if img_url.startswith('//'):
img_url = 'https:' + img_url
elif img_url.startswith('/'):
if base_url:
parsed = urlparse(base_url)
img_url = f"{parsed.scheme}://{parsed.netloc}" + img_url
else:
img_url = base_url + img_url
product['image'] = img_url
product['image_alt'] = img_tag.get('alt', '')
# 检查是否有视频标记
video_mark = item.find('div', class_='prod-video-mark')
product['has_video'] = video_mark is not None
# 只添加有标题的产品(确保数据完整)
if product.get('title'):
products.append(product)
return products
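# Shape of a single extracted item as built by the loop above (values are hypothetical):
#   {
#       "product_id": "123456",
#       "title": "Commercial Smith Machine",
#       "url": "https://www.example.com/product/123456.html",
#       "price": "US$718.00-1,018.00 / Piece",
#       "min_order": "1 Piece",
#       "image": "https://img.example.com/123456.jpg",
#       "image_alt": "Commercial Smith Machine",
#       "has_video": False,
#   }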
def get_next_page_url(soup, current_url):
"""检查是否有下一页如果有则返回下一页URL"""
# 查找分页器
pager = soup.find('div', class_='pager')
if not pager:
return None
# 方法1: 查找 class 包含 "next" 的 a 标签
# 排除包含 "disClick" 的(禁用状态,表示没有下一页)
next_links = pager.find_all('a', class_=lambda x: x and 'next' in x if x else False)
for next_link in next_links:
# 检查是否被禁用
classes = next_link.get('class', [])
if 'disClick' in classes:
continue # 跳过禁用的链接
# 获取href属性
next_url = next_link.get('href')
if not next_url:
continue
# 过滤掉 JavaScript URL
if next_url.startswith('javascript:'):
# 尝试从 JavaScript 函数中提取页码,然后构建 URL
# 例如javascript:submitSearchByPage(2) -> 提取页码 2
page_match = re.search(r'submitSearchByPage\((\d+)\)', next_url)
if page_match:
page_num = page_match.group(1)
# 尝试从当前 URL 构建下一页 URL
# 查找当前 URL 中的页码参数
parsed = urlparse(current_url)
query_params = {}
if parsed.query:
query_params = parse_qs(parsed.query)
# 更新页码参数
# 常见的页码参数名page, p, pageNum, pageNumber, pn
page_param_found = False
for param_name in ['page', 'p', 'pageNum', 'pageNumber', 'pn', 'currentPage']:
if param_name in query_params:
query_params[param_name] = [page_num]
page_param_found = True
break
# 如果没有找到页码参数,尝试添加
if not page_param_found:
query_params['page'] = [page_num]
# 重建 URL
new_query = urlencode(query_params, doseq=True)
next_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}?{new_query}"
else:
# 无法从 JavaScript URL 提取页码,跳过
continue
# 确保URL是完整的
if next_url.startswith('//'):
next_url = 'https:' + next_url
elif next_url.startswith('/'):
# 从当前URL提取基础URL
parsed = urlparse(current_url)
base_url = f"{parsed.scheme}://{parsed.netloc}"
next_url = urljoin(base_url, next_url)
elif not next_url.startswith(('http://', 'https://', 'file://', 'raw:')):
# 相对路径
parsed = urlparse(current_url)
base_url = f"{parsed.scheme}://{parsed.netloc}"
next_url = urljoin(base_url, next_url)
# 验证 URL 格式
if not next_url.startswith(('http://', 'https://', 'file://', 'raw:')):
continue
# 确保下一页URL与当前URL不同
if next_url != current_url:
return next_url
# 方法2: 如果方法1失败尝试查找所有分页链接找到当前页的下一个
all_page_links = pager.find_all('a')
current_page_num = None
# 尝试从当前 URL 提取页码
parsed = urlparse(current_url)
if parsed.query:
query_params = parse_qs(parsed.query)
for param_name in ['page', 'p', 'pageNum', 'pageNumber', 'pn', 'currentPage']:
if param_name in query_params:
try:
current_page_num = int(query_params[param_name][0])
break
except (ValueError, IndexError):
pass
# 如果找到了当前页码,尝试找到下一页
if current_page_num is not None:
next_page_num = current_page_num + 1
# 查找包含下一页数字的链接
for link in all_page_links:
link_text = link.get_text(strip=True)
try:
if int(link_text) == next_page_num:
href = link.get('href')
if href and not href.startswith('javascript:'):
# 构建下一页 URL
query_params = parse_qs(parsed.query)
for param_name in ['page', 'p', 'pageNum', 'pageNumber', 'pn', 'currentPage']:
if param_name in query_params:
query_params[param_name] = [str(next_page_num)]
break
else:
query_params['page'] = [str(next_page_num)]
new_query = urlencode(query_params, doseq=True)
next_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}?{new_query}"
if next_url != current_url:
return next_url
except ValueError:
continue
return None
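# Example of the javascript: pager handling above, with a placeholder URL:
#   current_url = "https://www.example.com/search?word=bench&page=1"
#   href = "javascript:submitSearchByPage(2)"
#   The page number "2" is pulled from the href and written back into the "page"
#   query parameter, yielding "https://www.example.com/search?word=bench&page=2".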
# ==================== Product detail page extraction ====================
def extract_product_detail_images(soup, base_url=''):
"""从产品详情页提取所有图片"""
images = []
# 查找图片容器
slide_page = soup.find('div', class_='sr-proMainInfo-slide-page')
if slide_page:
# 查找所有图片项
slide_ul = slide_page.find('ul', class_='sr-proMainInfo-slide-pageUl')
if slide_ul:
# 查找所有 li.J-pic-dot
pic_items = slide_ul.find_all('li', class_='J-pic-dot')
for item in pic_items:
img_tag = item.find('img')
if img_tag:
img_url = img_tag.get('data-original') or img_tag.get('src', '')
if img_url:
# 确保URL是完整的
if img_url.startswith('//'):
img_url = 'https:' + img_url
elif img_url.startswith('/'):
if base_url:
parsed = urlparse(base_url)
img_url = f"{parsed.scheme}://{parsed.netloc}" + img_url
else:
img_url = base_url + img_url
if img_url not in images:
images.append(img_url)
return images
def extract_product_detail_prices(soup):
"""从产品详情页提取价格信息(统一格式:{"US$718.00":"5-19"}"""
prices = []
# 方法1: 提取swiper格式的价格多个价格项
swiper_wrapper = soup.find('div', class_='swiper-wrapper-div')
if swiper_wrapper:
# 查找所有价格项
slide_divs = swiper_wrapper.find_all('div', class_='swiper-slide-div')
for slide_div in slide_divs:
# 查找价格容器
money_container = slide_div.find('div', class_='swiper-money-container')
# 查找数量容器
unit_container = slide_div.find('div', class_='swiper-unit-container')
if money_container and unit_container:
price_text = money_container.get_text(strip=True)
unit_text = unit_container.get_text(strip=True)
# 提取数量范围(移除"Pieces"等词,只保留数字范围)
# 使用正则表达式提取数字和符号(如 "5-19", "50+", "20-49"
# 匹配数字范围格式:数字-数字 或 数字+
unit_match = re.search(r'(\d+\s*[-+]\s*\d+|\d+\+)', unit_text)
if unit_match:
unit_text = unit_match.group(1).strip()
else:
# 如果没匹配到,移除"Pieces"等词,提取数字
unit_text = unit_text.replace('Pieces', '').replace('Piece', '').strip()
# 提取数字,如果没有范围,添加+号表示最小数量
num_match = re.search(r'(\d+)', unit_text)
if num_match:
unit_text = num_match.group(1) + '+'
unit_text = ' '.join(unit_text.split()) # 清理多余空格
if price_text and unit_text:
# 格式: {"US$718.00":"5-19"}
price_obj = {price_text: unit_text}
prices.append(price_obj)
# 方法2: 如果swiper格式没有找到尝试提取only-one-price格式单个价格范围
if not prices:
only_one_price = soup.find('div', class_='only-one-priceNum')
if only_one_price:
# 提取价格范围
price_td = only_one_price.find('td')
if price_td:
price_span = price_td.find('span', class_='only-one-priceNum-td-left')
if price_span:
price_text = price_span.get_text(strip=True)
# 提取数量信息
price_info_td = only_one_price.find('td', class_='sa-only-property-price')
unit_text = ''
has_moq = False
if price_info_td:
# 先检查整个容器的文本是否包含MOQ
container_text = price_info_td.get_text(strip=True)
if 'MOQ' in container_text or 'moq' in container_text.lower():
has_moq = True
# 查找所有span标签
quantity_spans = price_info_td.find_all('span')
for span in quantity_spans:
span_text = span.get_text(strip=True)
# 提取数字(可能是 "5 Pieces" 或 "5"
num_match = re.search(r'(\d+)', span_text)
if num_match:
unit_text = num_match.group(1)
# 检查当前span是否包含MOQ
if 'MOQ' in span_text or 'moq' in span_text.lower():
has_moq = True
break
# 如果提取到了价格,即使没有数量信息也保存(数量设为空或默认值)
if price_text:
# 如果没有提取到数量,使用默认值"1+"表示最小订购量为1
if not unit_text:
unit_text = '1+'
# only-one-price格式通常表示最小订购量默认添加+号
elif not unit_text.endswith('+'):
unit_text += '+'
# 统一格式: {"US$1,018.00 - 1,118.00":"5+"}
price_obj = {price_text: unit_text}
prices.append(price_obj)
return prices
def extract_company_info(soup, base_url=''):
"""从产品详情页提取公司信息"""
company_info = {
'company_name': '',
'company_url': ''
}
# 查找公司信息容器
com_div = soup.find('div', class_='sr-com')
if com_div:
# 查找公司信息
com_info = com_div.find('div', class_='sr-com-info')
if com_info:
# 查找公司标题
title_div = com_info.find('div', class_='title-txt')
if title_div:
company_link = title_div.find('a')
if company_link:
company_info['company_name'] = company_link.get('title', '') or company_link.get_text(strip=True)
company_info['company_url'] = company_link.get('href', '')
# 确保URL是完整的
if company_info['company_url'] and not company_info['company_url'].startswith('http'):
if company_info['company_url'].startswith('//'):
company_info['company_url'] = 'https:' + company_info['company_url']
elif company_info['company_url'].startswith('/'):
if base_url:
parsed = urlparse(base_url)
company_info['company_url'] = f"{parsed.scheme}://{parsed.netloc}" + company_info['company_url']
else:
company_info['company_url'] = base_url + company_info['company_url']
return company_info
def extract_product_details(soup):
"""从产品详情页提取Product Details表格信息"""
details = {}
# 方法1: 查找包含"Product Details"文本的元素
details_elements = soup.find_all(string=lambda text: text and 'Product Details' in text)
for details_text in details_elements:
# 向上查找包含表格的容器
parent = details_text.find_parent()
# 在父元素的后续兄弟元素中查找表格
current = parent
for _ in range(5): # 最多查找5层
if current:
# 在当前元素及其后续兄弟中查找表格
table = current.find('table')
if not table:
# 查找下一个兄弟元素
next_sibling = current.find_next_sibling()
if next_sibling:
table = next_sibling.find('table')
if table:
# 提取表格数据
rows = table.find_all('tr')
for row in rows:
th = row.find('th')
td = row.find('td')
if th and td:
key = th.get_text(strip=True).replace(':', '').strip()
value = td.get_text(strip=True)
if key and value:
details[key] = value
break # 找到表格后退出
# 向上查找
current = current.find_parent()
else:
break
# 方法2: 如果没找到直接查找所有包含th和td的表格
if not details:
tables = soup.find_all('table')
for table in tables:
rows = table.find_all('tr')
# 检查表格是否包含Product Details相关的key
product_detail_keys = ['Customization', 'Type', 'Material', 'Application', 'Max Bear Weight', 'Warranty']
for row in rows:
th = row.find('th')
td = row.find('td')
if th and td:
key = th.get_text(strip=True).replace(':', '').strip()
value = td.get_text(strip=True)
# 如果key匹配产品详情的关键词则提取
if key and value and any(keyword in key for keyword in product_detail_keys):
details[key] = value
return details
def extract_product_title(soup):
"""提取产品标题"""
# 优先级h1.sr-proMainInfo-baseInfoH1 > span > og:title > title标签
h1_tag = soup.find('h1', class_='sr-proMainInfo-baseInfoH1')
if h1_tag:
title_span = h1_tag.find('span')
if title_span:
return title_span.get_text(strip=True)
# 如果h1没有找到尝试从og:title提取
og_title = soup.find('meta', property='og:title')
if og_title and og_title.get('content'):
return og_title.get('content').replace('[Hot Item]', '').strip()
# 如果还是没有从title标签提取
title_tag = soup.find('title')
if title_tag:
title_text = title_tag.get_text(strip=True)
# 移除 "- Smith Machine and Bench Press price" 等后缀
if ' - ' in title_text:
return title_text.split(' - ')[0].strip()
else:
return title_text
return ''
# ==================== Crawling functions ====================
async def crawl_product_detail(crawler, product_url, product_index=None, total_products=None, base_url=''):
"""爬取单个产品详情页并提取详细信息"""
try:
result = await crawler.arun(url=product_url, wait_for="networkidle", page_timeout=30000)
if not result.success:
return {}
soup = BeautifulSoup(result.html, 'html.parser')
detail_info = {}
# 提取标题
title = extract_product_title(soup)
if title:
detail_info['title'] = title
# 提取图片
images = extract_product_detail_images(soup, base_url)
if images:
detail_info['images'] = images
# 提取价格
prices = extract_product_detail_prices(soup)
if prices:
detail_info['prices'] = prices
# 提取公司信息
company_info = extract_company_info(soup, base_url)
if company_info.get('company_name'):
detail_info['company_name'] = company_info['company_name']
detail_info['company_url'] = company_info['company_url']
# 提取Product Details
product_details = extract_product_details(soup)
if product_details:
detail_info['Product Details'] = product_details
return detail_info
except Exception as e:
return {}
async def crawl_single_list_page(crawler, url, page_num, base_url=''):
"""爬取单个产品列表页"""
result = await crawler.arun(url=url, wait_for="networkidle")
if not result.success:
return None, None, None
soup = BeautifulSoup(result.html, 'html.parser')
# 提取产品
products = extract_products_from_page(soup, base_url)
# 检查是否有下一页
next_url = get_next_page_url(soup, url)
# 只在第一页提取分类名称
category_name = None
if page_num == 1:
category_name = extract_current_category(soup)
return products, next_url, category_name
async def crawl_product_list(crawler, start_url, max_pages=100, enrich_details=False, base_url=''):
"""爬取产品列表(支持分页)"""
all_products = []
current_url = start_url
page_num = 1
category_name = None
while current_url and page_num <= max_pages:
# 爬取当前页
products, next_url, cat_name = await crawl_single_list_page(crawler, current_url, page_num, base_url)
# 保存第一页提取的分类名称
if page_num == 1 and cat_name:
category_name = cat_name
if products:
all_products.extend(products)
else:
break
# 检查是否有下一页
if next_url:
current_url = next_url
page_num += 1
# 添加延迟,避免请求过快
await asyncio.sleep(1)
else:
break
# 如果需要补充详情,为每个产品爬取详情页
if enrich_details and all_products:
for i, product in enumerate(all_products, 1):
if not product.get('url'):
continue
# 爬取产品详情
detail_info = await crawl_product_detail(
crawler,
product['url'],
i,
len(all_products),
base_url
)
# 合并详细信息到产品数据
if detail_info:
# 更新图片(使用详情页的图片列表)
if 'images' in detail_info:
product['images'] = detail_info['images']
# 保留第一张图片作为主图
if detail_info['images']:
product['image'] = detail_info['images'][0]
# 更新价格(使用详情页的价格)
if 'prices' in detail_info:
product['prices'] = detail_info['prices']
# 添加公司信息
if 'company_name' in detail_info:
product['company_name'] = detail_info['company_name']
if 'company_url' in detail_info:
product['company_url'] = detail_info['company_url']
# 添加Product Details
if 'Product Details' in detail_info:
product['Product Details'] = detail_info['Product Details']
# 添加延迟,避免请求过快
if i < len(all_products):
await asyncio.sleep(0.5)
# 将分类信息添加到结果中
result = {
'products': all_products,
'category_name': category_name,
'total_pages': page_num,
'total_products': len(all_products)
}
return result
# ==================== Node execution ====================
def get_branch_upstream_nodes(current_node_id, flow_data):
"""获取同一分支的所有上游节点ID列表"""
if not current_node_id or not flow_data:
return []
edges = flow_data.get("edges", [])
upstream_nodes = set()
stack = [current_node_id]
while stack:
node_id = stack.pop()
for edge in edges:
if edge.get("target") == node_id:
source_id = edge.get("source")
if source_id and source_id not in upstream_nodes:
upstream_nodes.add(source_id)
stack.append(source_id)
return list(upstream_nodes)
def execute(context=None, inputs=None, config=None, **kwargs):
"""
网站产品信息采集节点 - 基于 Crawl4AI
支持两种模式
1. 单产品详情页模式爬取单个产品详情页
2. 产品列表页模式爬取产品列表支持分页可选择是否补充详情
"""
if context is None:
context = kwargs.get("context", {})
if inputs is None:
inputs = kwargs.get("inputs", {})
if config is None:
config = kwargs.get("config", {})
try:
# 参数处理
if not isinstance(inputs, dict):
try:
inputs = json.loads(inputs) if isinstance(inputs, str) else {}
except Exception:
inputs = {}
# 获取配置
url = config.get("url") or inputs.get("url")
if not url:
return {"success": False, "error": "URL 必填"}
# 支持从上游节点获取 URL
current_node_id = context.get("current_node_id")
flow_data = context.get("flow_data", {})
branch_upstream_nodes = get_branch_upstream_nodes(current_node_id, flow_data)
if context and "node_results" in context and branch_upstream_nodes:
node_results = context["node_results"]
for upstream_node_id in branch_upstream_nodes:
if upstream_node_id in node_results:
upstream_result = node_results[upstream_node_id]
if isinstance(upstream_result, dict):
# 解析 AI 原始响应中的 JSON
if "ai_raw_response" in upstream_result and isinstance(upstream_result["ai_raw_response"], str):
try:
raw_response = upstream_result["ai_raw_response"].strip()
start_brace = raw_response.find('{')
end_brace = raw_response.rfind('}')
if start_brace != -1 and end_brace != -1 and end_brace > start_brace:
json_content = raw_response[start_brace:end_brace + 1]
parsed_json = json.loads(json_content)
if isinstance(parsed_json, dict):
for k, v in parsed_json.items():
if k not in inputs or not inputs.get(k):
inputs[k] = v
except:
pass
# 平铺其他字段到 inputs
for k, v in upstream_result.items():
if k not in inputs or not inputs.get(k):
inputs[k] = v
# 如果 inputs 中有 url优先使用
if inputs.get("url") and not url:
url = inputs.get("url")
# 构建模板上下文(支持 Jinja2 模板)
template_context = inputs.copy()
if context and context.get("flow_id"):
flow_id = context["flow_id"]
agent_data = jingrow.get_pg("Local Ai Agent", str(flow_id))
if agent_data:
for key, value in agent_data.items():
if key not in template_context or not template_context.get(key):
template_context[key] = value
# 如果 URL 包含模板语法,进行渲染
try:
url = render_template(url, template_context)
except Exception as e:
return {"success": False, "error": f"URL 模板渲染失败: {str(e)}"}
# URL 验证和规范化
if not url or not isinstance(url, str):
return {"success": False, "error": "URL 不能为空"}
url = url.strip()
if not url:
return {"success": False, "error": "URL 不能为空"}
# 确保 URL 有协议前缀
if not url.startswith(('http://', 'https://', 'file://', 'raw:')):
if url.startswith('//'):
url = 'https:' + url
elif not url.startswith('/'):
url = 'https://' + url
else:
return {"success": False, "error": f"URL 格式不正确: {url}"}
# 获取模式配置
crawl_mode = config.get("crawl_mode", "single") # "single" 或 "list"
max_pages = config.get("max_pages", 100) # 列表页模式的最大页数
enrich_details = config.get("enrich_details", False) # 是否补充产品详情
# AI 提取配置(仅用于单产品模式)
use_ai_extraction = config.get("use_ai_extraction", False)
text_model = config.get("text_model", "Jingrow")
ai_system_message = config.get("ai_system_message", "你是一个专业的产品信息提取助手,请从网页内容中准确提取产品信息。")
max_tokens = config.get("max_tokens", 2000)
ai_temperature = config.get("ai_temperature", 0.7)
ai_top_p = config.get("ai_top_p", 0.9)
# 使用 Crawl4AI 进行爬取
try:
from crawl4ai import AsyncWebCrawler
import asyncio
except ImportError:
return {
"success": False,
"error": "Crawl4AI 未安装,请运行: uv pip install crawl4ai"
}
async def crawl_website():
async with AsyncWebCrawler(verbose=True) as crawler:
# 提取基础URL用于相对路径处理
parsed = urlparse(url)
base_url = f"{parsed.scheme}://{parsed.netloc}"
if crawl_mode == "list":
# 产品列表页模式
result = await crawl_product_list(
crawler,
url,
max_pages=max_pages,
enrich_details=enrich_details,
base_url=base_url
)
# 构建返回结果
return {
"success": True,
"url": url,
"mode": "list",
"category_name": result.get('category_name'),
"total_pages": result.get('total_pages', 0),
"total_products": result.get('total_products', 0),
"products": result.get('products', [])
}
else:
# 单产品详情页模式
result = await crawler.arun(
url=url,
wait_for="networkidle",
page_timeout=30000,
)
if not result.success:
return {
"success": False,
"error": f"爬取失败: {result.error_message or '未知错误'}"
}
html_content = result.html
markdown_content = result.markdown
# 初始化 BeautifulSoup
soup = BeautifulSoup(html_content, 'html.parser')
# 提取产品信息
extracted_data = {}
# 提取标题
title = extract_product_title(soup)
if title:
extracted_data['title'] = title
# 提取图片
images = extract_product_detail_images(soup, base_url)
if images:
extracted_data['image'] = images
# 提取价格
prices = extract_product_detail_prices(soup)
if prices:
extracted_data['price'] = prices
# 提取公司信息
company_info = extract_company_info(soup, base_url)
if company_info.get('company_name'):
extracted_data['company_name'] = company_info['company_name']
extracted_data['url'] = company_info['company_url']
# 提取Product Details
product_details = extract_product_details(soup)
if product_details:
extracted_data['Product Details'] = product_details
# 如果启用了 AI 提取,使用 AI 模型进一步提取和补充信息
if use_ai_extraction:
try:
from jingrow.utils.jingrow_cloud import call_ai_model
# 构建 AI 提取提示词
ai_prompt = f"""请从以下网页内容中提取产品信息,返回 JSON 格式。
URL: {url}
网页内容Markdown前5000字符:
{markdown_content[:5000]}
已提取的基础信息
{json.dumps(extracted_data, ensure_ascii=False, indent=2)}
请基于网页内容和完善已有信息返回完整的 JSON 格式严格按照以下结构
{{
"title": "产品标题",
"price": [{{"US$877.00": "2-9"}}, {{"US$850.00": "10+"}}], // 价格数组每个元素是 {{"价格": "数量范围"}} 的格式
"image": ["图片URL1", "图片URL2"], // 图片URL数组
"Product Details": "产品详细信息文本",
"company_name": "公司名称",
"url": "公司主页URL"
}}
注意
- price 必须是数组每个元素是对象格式 {{"价格": "数量范围"}}
- image 必须是字符串数组
- 如果某些信息不存在使用空字符串或空数组"""
# 调用 AI 模型
ai_res = call_ai_model(
ai_prompt,
text_model=text_model,
ai_temperature=ai_temperature,
ai_top_p=ai_top_p,
ai_system_message=ai_system_message,
max_tokens=max_tokens,
image_urls=None,
)
if ai_res.get("success"):
ai_response = ai_res.get("response", "")
# 尝试解析 JSON
try:
start_brace = ai_response.find('{')
end_brace = ai_response.rfind('}')
if start_brace != -1 and end_brace != -1 and end_brace > start_brace:
json_str = ai_response[start_brace:end_brace + 1]
ai_data = json.loads(json_str)
if isinstance(ai_data, dict):
# 合并 AI 提取的数据AI 提取的数据优先
for key, value in ai_data.items():
if value is not None and value != "" and (not isinstance(value, list) or len(value) > 0):
extracted_data[key] = value
except json.JSONDecodeError:
pass
except Exception:
pass
# 确保输出格式一致
if "price" in extracted_data:
if not isinstance(extracted_data["price"], list):
if isinstance(extracted_data["price"], str):
extracted_data["price"] = [extracted_data["price"]]
else:
extracted_data["price"] = []
if "image" in extracted_data:
if not isinstance(extracted_data["image"], list):
extracted_data["image"] = []
# 确保所有必需字段存在
final_data = {
"title": extracted_data.get("title", ""),
"price": extracted_data.get("price", []),
"image": extracted_data.get("image", []),
"Product Details": extracted_data.get("Product Details", ""),
"company_name": extracted_data.get("company_name", ""),
"url": extracted_data.get("url", "")
}
# 构建返回结果
return {
"success": True,
"url": url,
"mode": "single",
"extracted_data": final_data
}
# 运行异步爬取
try:
result = asyncio.run(crawl_website())
return result
except Exception as e:
return {
"success": False,
"error": f"爬取执行失败: {str(e)}"
}
except Exception as e:
return {
"success": False,
"error": f"网站采集节点执行失败: {str(e)}"
}

Modified: node dependency management utility

@@ -2,11 +2,13 @@
Node dependency management utilities.
Nodes manage their own dependencies independently, installed directly with uv pip install.
"""
import os
import hashlib
import subprocess
import logging
from pathlib import Path
from typing import Dict, Any, Optional, List
from jingrow.utils.path import get_jingrow_root, get_root_path
logger = logging.getLogger(__name__)
@@ -117,10 +119,22 @@ def install_node_dependencies(node_type: str, sync: bool = True) -> Dict[str, An
)
if result.returncode == 0:
# Check whether a post-install script needs to run
after_install_result = _run_node_after_install(node_type, node_dir, project_root, content)
message = f"节点 {node_type} 依赖安装成功"
if after_install_result:
if after_install_result.get("success"):
message += f" ({after_install_result.get('message', '后安装脚本执行成功')})"
else:
logger.warning(f"节点 {node_type} 后安装脚本执行失败: {after_install_result.get('error', '未知错误')}")
message += f" (后安装脚本警告: {after_install_result.get('error', '未知错误')})"
return {
"success": True,
"message": message,
"output": result.stdout,
"post_install": after_install_result
}
else:
error_msg = result.stderr or result.stdout
@@ -222,6 +236,177 @@ def check_node_dependencies(node_type: str) -> Dict[str, Any]:
return {"success": None, "message": f"检查依赖时出错: {str(e)}"} return {"success": None, "message": f"检查依赖时出错: {str(e)}"}
def _run_node_after_install(node_type: str, node_dir: Path, project_root: Path, pyproject_content: str) -> Optional[Dict[str, Any]]:
"""
执行节点的后安装脚本如果配置了的话
支持配置格式
[tool.jingrow.after_install]
commands = [
"python -m playwright install-deps",
"python -m playwright install"
]
# 或单条命令
script = "python -m playwright install-deps && python -m playwright install"
Args:
node_type: 节点类型
node_dir: 节点目录
project_root: 项目根目录
pyproject_content: pyproject.toml 文件内容
Returns:
执行结果字典如果没有配置后安装脚本则返回 None
"""
try:
commands = []
# 解析 [tool.jingrow.after_install] 配置
in_after_install = False
in_commands_array = False
bracket_count = 0
for line in pyproject_content.split('\n'):
stripped = line.strip()
# 检测 [tool.jingrow.after_install] 段
if stripped.startswith('[tool.jingrow.after_install]'):
in_after_install = True
continue
# 如果遇到新的段,停止解析
if in_after_install and stripped.startswith('[') and not stripped.startswith('[tool.jingrow'):
break
if in_after_install:
# 查找 commands = [...] 数组格式(推荐,支持多条命令)
if stripped.startswith('commands') and '=' in stripped and '[' in stripped:
in_commands_array = True
bracket_count = stripped.count('[') - stripped.count(']')
# 检查是否在同一行结束
if ']' in stripped:
in_commands_array = False
# 提取同一行的命令
if '[' in stripped and ']' in stripped:
# 提取数组内容
array_content = stripped.split('[', 1)[1].rsplit(']', 1)[0]
for cmd in array_content.split(','):
cmd = cmd.strip().strip('"').strip("'")
if cmd:
commands.append(cmd)
continue
elif in_commands_array:
# 继续解析数组内容
bracket_count += stripped.count('[') - stripped.count(']')
if bracket_count <= 0 and ']' in stripped:
in_commands_array = False
# 如果这一行只有 ],直接跳过
if stripped.strip() == ']':
break
# 否则提取 ] 之前的内容
stripped = stripped.rsplit(']', 1)[0]
# 提取命令(去除引号和逗号)
cmd = stripped.strip('",\' ,')
# 跳过空行、注释和单独的 ]
if cmd and not cmd.startswith('#') and cmd != ']':
commands.append(cmd)
if bracket_count <= 0:
break
continue
# 查找 script = 配置(单条命令)
elif stripped.startswith('script') and '=' in stripped:
script_value = stripped.split('=', 1)[1].strip()
# 处理字符串格式(去除引号)
if script_value.startswith('"') and script_value.endswith('"'):
script_value = script_value[1:-1]
elif script_value.startswith("'") and script_value.endswith("'"):
script_value = script_value[1:-1]
if script_value:
commands = [script_value]
break
if not commands:
return None
# 将多条命令合并为一条脚本(用 && 连接)
script = ' && '.join(commands)
# 检查是否已经执行过相同的脚本
script_hash = hashlib.md5(script.encode('utf-8')).hexdigest()
# 将标记文件存储在项目根目录的缓存目录
cache_dir = get_root_path() / '.cache' / 'node_install'
cache_dir.mkdir(parents=True, exist_ok=True)
after_install_marker = cache_dir / f"{node_type}.after_install_done"
# 如果标记文件存在,检查脚本哈希值是否匹配
if after_install_marker.exists():
try:
with open(after_install_marker, 'r', encoding='utf-8') as f:
saved_hash = f.read().strip()
if saved_hash == script_hash:
logger.info(f"节点 {node_type} 的后安装脚本已执行过,跳过")
return {
"success": True,
"message": "后安装脚本已执行过,跳过",
"skipped": True
}
except Exception as e:
logger.warning(f"读取后安装标记文件失败: {str(e)},将重新执行")
logger.info(f"执行节点 {node_type} 的后安装脚本: {script}")
# 执行脚本(使用 shell 执行,支持 && 等操作符)
# 将脚本中的 python -m 命令替换为 uv run python -m确保在正确的环境中执行
import re
# 替换 python -m 为 uv run python -m避免重复替换已存在的 uv run
script = re.sub(r'(?<!uv run )\bpython -m\b', 'uv run python -m', script)
result = subprocess.run(
script,
shell=True,
cwd=project_root,
capture_output=True,
text=True,
timeout=600
)
if result.returncode == 0:
# 保存执行成功的标记
try:
with open(after_install_marker, 'w', encoding='utf-8') as f:
f.write(script_hash)
logger.debug(f"已保存节点 {node_type} 的后安装脚本执行标记")
except Exception as e:
logger.warning(f"保存后安装标记文件失败: {str(e)}")
return {
"success": True,
"message": "后安装脚本执行成功",
"output": result.stdout
}
else:
error_msg = result.stderr or result.stdout
return {
"success": False,
"error": f"后安装脚本执行失败: {error_msg}",
"stderr": result.stderr,
"stdout": result.stdout
}
except subprocess.TimeoutExpired:
return {
"success": False,
"error": "后安装脚本执行超时"
}
except Exception as e:
logger.warning(f"执行节点 {node_type} 后安装脚本时发生异常: {str(e)}")
return {
"success": False,
"error": f"后安装脚本执行异常: {str(e)}"
}
def ensure_node_dependencies(node_type: str) -> Dict[str, Any]:
"""
Ensure a node's dependencies are installed (check, then install).
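The after_install section above is parsed line by line rather than with a TOML library. On Python 3.11+ the same configuration could be read with the standard-library tomllib; a sketch of that alternative, not part of this commit:

import tomllib
from pathlib import Path

def read_after_install_commands(pyproject_path: Path) -> list:
    # Returns the configured post-install commands, or an empty list if none are declared.
    data = tomllib.loads(pyproject_path.read_text(encoding="utf-8"))
    section = data.get("tool", {}).get("jingrow", {}).get("after_install", {})
    if "commands" in section:
        return list(section["commands"])
    if "script" in section:
        return [section["script"]]
    return []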