add node: web_scrapers_create

jingrow 2025-11-14 04:09:44 +08:00
parent 91818f1786
commit cb1b3887e8
5 changed files with 409 additions and 1040 deletions

View File

@@ -1,538 +0,0 @@
import json
from typing import Dict, Any, Optional, List
import jingrow
def get_branch_upstream_nodes(current_node_id, flow_data):
"""Return the IDs of all upstream nodes on the same branch."""
if not current_node_id or not flow_data:
return []
edges = flow_data.get("edges", [])
upstream_nodes = set()
stack = [current_node_id]
while stack:
node_id = stack.pop()
for edge in edges:
if edge.get("target") == node_id:
source_id = edge.get("source")
if source_id and source_id not in upstream_nodes:
upstream_nodes.add(source_id)
stack.append(source_id)
return list(upstream_nodes)
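# A minimal usage sketch (hypothetical flow_data, not part of the original file): the helper
# walks the "edges" list backwards from the current node and collects every transitive source.
# flow_data = {"edges": [{"source": "scraper_1", "target": "create_1"},
#                        {"source": "start_1", "target": "scraper_1"}]}
# get_branch_upstream_nodes("create_1", flow_data)
# # -> ["scraper_1", "start_1"] (built from a set, so ordering is not guaranteed)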
def execute(context=None, inputs=None, config=None, **kwargs):
"""
Batch-create records node.
Creates records in bulk; particularly suited to the product lists returned by the web_scrapers node.
"""
if context is None:
context = kwargs.get("context", {})
if inputs is None:
inputs = kwargs.get("inputs", {})
if config is None:
config = kwargs.get("config", {})
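# Illustrative config/inputs shapes for this node (hypothetical values, shown as a sketch):
# config = {
#     "pagetype": "Jsite Product",                    # required target page type
#     "field_map": [{"from": "title", "to": "标题"}],  # optional explicit mapping (to = target field label)
#     "default_site": "mysite",                       # site record name, not a URL
# }
# inputs = {"products": [{"title": "Widget A", "price": [{"US$9.99": "1+"}]}]}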
try:
# Normalize parameters
if not isinstance(inputs, dict):
try:
inputs = json.loads(inputs) if isinstance(inputs, str) else {}
except Exception:
inputs = {}
# Resolve pagetype
record_type = None
if isinstance(inputs, dict):
record_type = inputs.get("pagetype")
if not record_type and isinstance(config, dict):
record_type = config.get("pagetype")
if not isinstance(record_type, str) or not record_type:
return {
"success": False,
"error": "pagetype must be a non-empty string; please check the configuration"
}
# Field-mapping configuration
field_map = config.get("field_map", [])
# Default site (used by types such as Jsite Product that require a site field)
default_site = config.get("default_site", "")
# Fetch the label -> fieldname mapping from Jingrow
label2field = jingrow.get_field_mapping_from_jingrow(record_type) or {}
# Field metadata (used to check field length limits)
field_metadata = {}
try:
from jingrow.model.page import get_page_instance
pg = get_page_instance(record_type)
meta_result = pg.get_meta()
if isinstance(meta_result, dict) and meta_result.get('success'):
meta_data = meta_result.get('data', {})
fields = meta_data.get('fields', [])
# Build a fieldname -> metadata map
for field in fields:
fieldname = field.get('fieldname')
if fieldname:
# Try several keys for the length limit
max_length = (
field.get('length') or
field.get('max_length') or
field.get('maxlength') or
(field.get('df') and field.get('df').get('length')) or
None
)
# Coerce the limit to an int when possible
if max_length is not None:
try:
max_length = int(max_length)
except (ValueError, TypeError):
max_length = None
field_metadata[fieldname] = {
'max_length': max_length,
'fieldtype': field.get('fieldtype'),
'label': field.get('label')
}
except Exception as e:
# If metadata retrieval fails, log the error and keep going
print(f"Failed to fetch field metadata: {str(e)}")
# Fall back to known length limits for common fields when metadata is unavailable
# The server errors indicate the title field is usually limited to 140 characters
# Set default limits for common fields (kept as a fallback even when metadata succeeds)
common_field_limits = {
'title': 140, # known from the server error message
'name': 140,
}
for fieldname, default_length in common_field_limits.items():
if fieldname not in field_metadata:
field_metadata[fieldname] = {'max_length': default_length}
elif not field_metadata[fieldname].get('max_length'):
field_metadata[fieldname]['max_length'] = default_length
# Also set a default limit for fields mapped via label2field that look like title fields
for label, fieldname in label2field.items():
if fieldname not in field_metadata or not field_metadata[fieldname].get('max_length'):
# If the label contains "标题" or "title" and the fieldname is title or name, apply the default limit
label_str = str(label).lower()
if ('标题' in str(label) or 'title' in label_str) and fieldname in ['title', 'name']:
if fieldname not in field_metadata:
field_metadata[fieldname] = {'max_length': 140}
elif not field_metadata[fieldname].get('max_length'):
field_metadata[fieldname]['max_length'] = 140
# Locate the data array. Supported sources:
# 1. a "products" array in inputs (web_scrapers node output)
# 2. an "items" array in inputs (generic array)
# 3. "products" or "items" from an upstream node
data_array = None
# Method 1: read directly from inputs
if isinstance(inputs, dict):
# web_scrapers output format: {"products": [...]}
if "products" in inputs and isinstance(inputs["products"], list):
data_array = inputs["products"]
# Generic format: {"items": [...]}
elif "items" in inputs and isinstance(inputs["items"], list):
data_array = inputs["items"]
# inputs itself may be a list
elif isinstance(inputs, list):
data_array = inputs
# Method 2: pull from upstream node results
if not data_array:
current_node_id = context.get("current_node_id")
flow_data = context.get("flow_data", {})
branch_upstream_nodes = get_branch_upstream_nodes(current_node_id, flow_data)
if context and "node_results" in context and branch_upstream_nodes:
node_results = context["node_results"]
for upstream_node_id in branch_upstream_nodes:
if upstream_node_id in node_results:
upstream_result = node_results[upstream_node_id]
if isinstance(upstream_result, dict):
# web_scrapers output format
if "products" in upstream_result and isinstance(upstream_result["products"], list):
data_array = upstream_result["products"]
break
# Generic format
elif "items" in upstream_result and isinstance(upstream_result["items"], list):
data_array = upstream_result["items"]
break
# If there is an ai_raw_response, try to parse JSON from it
elif "ai_raw_response" in upstream_result and isinstance(upstream_result["ai_raw_response"], str):
try:
raw_response = upstream_result["ai_raw_response"].strip()
start_brace = raw_response.find('[')
end_brace = raw_response.rfind(']')
if start_brace != -1 and end_brace != -1 and end_brace > start_brace:
json_content = raw_response[start_brace:end_brace + 1]
parsed_json = json.loads(json_content)
if isinstance(parsed_json, list):
data_array = parsed_json
break
except Exception:
continue
if not data_array or not isinstance(data_array, list):
return {
"success": False,
"error": "No data array found. Make sure an upstream node returns a 'products' or 'items' array, or pass the array directly."
}
if len(data_array) == 0:
return {
"success": True,
"message": "The data array is empty; no records were created",
"created_count": 0,
"created_records": []
}
# Create records in bulk
created_records = []
failed_records = []
for index, item_data in enumerate(data_array):
try:
# Ensure item_data is a dict
if not isinstance(item_data, dict):
failed_records.append({
"index": index,
"data": item_data,
"error": "data item is not a dict"
})
continue
# Data for the target record
record_data = {}
mapped_fields = set()
# 1. Apply field_map first (from -> to)
if field_map:
mapped_result = jingrow.map_fields_by_labels(field_map, item_data, label2field)
record_data.update(mapped_result)
mapped_fields.update([m.get("from") for m in field_map if m.get("from")])
# Also mark target fields as mapped so automatic mapping does not overwrite them
for mapping in field_map:
to_field = mapping.get("to")
if to_field:
mapped_fields.add(to_field)
# Debug: record which mappings failed
failed_mappings = []
for mapping in field_map:
from_field = mapping.get("from")
to_field = mapping.get("to")
if from_field in item_data and from_field not in mapped_result:
# This mapping failed
to_fieldname = label2field.get(str(to_field).strip())
failed_mappings.append({
"from": from_field,
"to_label": to_field,
"to_fieldname": to_fieldname,
"value": item_data.get(from_field)
})
# 2. Automatic label mapping for fields not covered by field_map
for label, value in item_data.items():
if label in mapped_fields:
continue
fieldname = label2field.get(str(label).strip())
if fieldname is not None and fieldname not in record_data:
record_data[fieldname] = value
mapped_fields.add(label)
# 3. Handle special required fields
# Jsite Product requires a site field
if record_type == "Jsite Product":
# Check whether the site field has already been mapped
site_fieldname = None
for label, fieldname in label2field.items():
if "网站" in str(label) or "site" in str(label).lower() or fieldname == "site":
site_fieldname = fieldname
break
if not site_fieldname:
site_fieldname = "site"
if site_fieldname not in record_data:
if default_site:
# Ensure default_site is not a URL
site_value = str(default_site).strip()
if site_value.startswith(('http://', 'https://')):
# If it is a URL, extract the domain only as a hint
from urllib.parse import urlparse
parsed = urlparse(site_value)
domain = parsed.netloc or parsed.path.split('/')[0]
# Do not set a value: let the create fail so the user is prompted,
# and record the details in the debug info instead
pass
else:
# Use the configured site name
record_data[site_fieldname] = site_value
else:
# default_site is not configured: note it in the error message,
# but keep going so the full error is visible when the create fails
pass
# Make sure a title field exists (item_data may contain a title whose mapping failed)
# Check all likely title fieldname variants
title_fieldnames = ["title", "name"] # both may serve as the title field
title_mapped = False
for title_field in title_fieldnames:
if title_field in record_data and record_data[title_field]:
title_mapped = True
break
if not title_mapped and "title" in item_data and item_data.get("title"):
# Try several ways to map the title
# 1. Look for a label2field entry whose label contains "标题" or "title"
for label, fieldname in label2field.items():
if ("标题" in str(label) or "title" in str(label).lower()) and fieldname not in record_data:
record_data[fieldname] = item_data.get("title")
title_mapped = True
break
# 2. Otherwise fall back to using title or name directly as the fieldname
if not title_mapped:
for title_field in title_fieldnames:
if title_field not in record_data:
record_data[title_field] = item_data.get("title")
title_mapped = True
break
# Serialize complex values (lists, dicts): some fields may need a JSON string
# Also enforce field length limits by truncating over-long values
processed_record_data = {}
truncated_fields = [] # fields that were truncated
for key, value in record_data.items():
if isinstance(value, (list, dict)):
# Lists/dicts could be converted to JSON strings, but the target field
# may be a JSON-type field that expects the raw value
try:
# Keep the raw value first; convert only if creation fails
processed_record_data[key] = value
except:
processed_record_data[key] = json.dumps(value, ensure_ascii=False)
else:
# Enforce length limits on string fields
if isinstance(value, str) and value:
# Force the 140-character limit on title and name (per the server error messages)
if key in ['title', 'name'] and len(value) > 140:
original_length = len(value)
processed_record_data[key] = value[:140]
truncated_fields.append({
"field": key,
"field_label": field_metadata.get(key, {}).get('label', key),
"original_length": original_length,
"max_length": 140,
"truncated_value": processed_record_data[key],
"truncation_method": "forced truncation (title/name default to 140 characters)"
})
else:
# Check whether the field has a length limit
field_info = field_metadata.get(key, {})
max_length = field_info.get('max_length')
# If the metadata has no limit, try to find the field via label2field
if not max_length:
# Locate the field's label, then look up its metadata
for label, fieldname in label2field.items():
if fieldname == key:
# Found the matching label; look up its metadata
field_info_from_label = field_metadata.get(fieldname, {})
if not max_length:
max_length = field_info_from_label.get('max_length')
break
# Apply truncation
if max_length and isinstance(max_length, (int, float)):
max_length_int = int(max_length)
if len(value) > max_length_int:
# Automatically truncate the over-long value
original_length = len(value)
processed_record_data[key] = value[:max_length_int]
truncated_fields.append({
"field": key,
"field_label": field_info.get('label', key),
"original_length": original_length,
"max_length": max_length_int,
"truncated_value": processed_record_data[key],
"truncation_method": "metadata-based truncation"
})
else:
processed_record_data[key] = value
else:
processed_record_data[key] = value
else:
processed_record_data[key] = value
# Create the record via Jingrow
try:
# Call Page.create directly to get detailed error information
from jingrow.model.page import get_page_instance
pg = get_page_instance(record_type)
result = pg.create(processed_record_data)
# Initialize error_msg so it always has a value
error_msg = 'failed to create record'
if not isinstance(result, dict) or not result.get('success'):
# Extract detailed error information
error_msg = result.get('error', 'failed to create record')
error_detail = result.get('detail', '')
if error_detail:
error_msg = f"{error_msg}: {error_detail}"
# Attach more detailed debug info
debug_info = {
"record_type": record_type,
"mapped_fields_count": len(processed_record_data),
"field_names": list(processed_record_data.keys()),
"response": result,
"label2field_sample": dict(list(label2field.items())[:10]) if label2field else {},
"default_site_configured": bool(default_site),
"default_site_value": default_site if default_site else None,
"site_field_in_record_data": site_fieldname in processed_record_data if 'site_fieldname' in locals() else False,
"field_metadata_sample": dict(list(field_metadata.items())[:5]) if field_metadata else {},
"title_field_length": len(processed_record_data.get('title', '')) if 'title' in processed_record_data else None,
"title_field_in_metadata": 'title' in field_metadata,
"title_max_length": field_metadata.get('title', {}).get('max_length') if 'title' in field_metadata else None
}
# Include truncation info in the debug payload, if any
if 'truncated_fields' in locals() and truncated_fields:
debug_info["truncated_fields"] = truncated_fields
else:
# No truncation recorded: if title is still over-long, the truncation logic probably did not run
if 'title' in processed_record_data:
title_value = processed_record_data['title']
if isinstance(title_value, str) and len(title_value) > 140:
debug_info["truncation_warning"] = f"title is {len(title_value)} characters, over the 140 limit, but was not truncated; max_length was probably not resolved."
# Warn when default_site looks like a URL
if default_site and str(default_site).strip().startswith(('http://', 'https://')):
debug_info["site_config_warning"] = "default_site looks like a URL, but the site field expects the site record's name, not a URL. Check the site settings, find the record name, and update the configuration."
# Include any failed mappings in the debug info
if 'failed_mappings' in locals():
debug_info["failed_mappings"] = failed_mappings
failed_records.append({
"index": index,
"data": item_data,
"error": error_msg,
"record_data": processed_record_data,
"debug": debug_info
})
continue
created = result.get('data', {})
if not created:
debug_info = {
"record_type": record_type,
"response": result
}
if 'truncated_fields' in locals() and truncated_fields:
debug_info["truncated_fields"] = truncated_fields
failed_records.append({
"index": index,
"data": item_data,
"error": "failed to create record: response data is empty",
"record_data": processed_record_data,
"debug": debug_info
})
continue
# The record was created; keep any truncation information as well
if 'truncated_fields' in locals() and truncated_fields:
# On success the truncation details are visible in the returned result
pass
except Exception as create_error:
debug_info = {
"record_type": record_type,
"mapped_fields_count": len(processed_record_data),
"field_names": list(processed_record_data.keys()),
"exception_type": type(create_error).__name__
}
if 'truncated_fields' in locals() and truncated_fields:
debug_info["truncated_fields"] = truncated_fields
failed_records.append({
"index": index,
"data": item_data,
"error": f"exception while creating record: {str(create_error)}",
"record_data": processed_record_data,
"debug": debug_info
})
continue
created_name = created.get("name") or record_data.get("name")
record_info = {
"index": index,
"name": created_name,
"pagetype": record_type
}
# Attach truncation info to the record entry, if any
if 'truncated_fields' in locals() and truncated_fields:
record_info["truncated_fields"] = truncated_fields
created_records.append(record_info)
except Exception as e:
failed_records.append({
"index": index,
"data": item_data,
"error": str(e)
})
continue
# Build the return value
result = {
"success": True,
"total_count": len(data_array),
"created_count": len(created_records),
"failed_count": len(failed_records),
"created_records": created_records
}
if failed_records:
result["failed_records"] = failed_records
result["message"] = f"created {len(created_records)} record(s), {len(failed_records)} failed"
else:
result["message"] = f"created {len(created_records)} record(s)"
return result
except Exception as e:
# Build detailed error information
error_details = {
"error": str(e),
"record_type": record_type if 'record_type' in locals() else None,
"inputs": inputs if 'inputs' in locals() else {},
"field_map": field_map if 'field_map' in locals() else []
}
print(f"BatchCreateRecords exception: {json.dumps(error_details, ensure_ascii=False, indent=2)}")
return {
"success": False,
"error": str(e),
"error_details": error_details
}

View File

@@ -1,181 +0,0 @@
{
"metadata": {
"type": "web_scrapers",
"label": "Website Scraper",
"icon": "fa-spider",
"color": "rgba(71, 180, 133, 1)",
"description": "Scrapes product information (title, price, description, images, etc.) from a website with Crawl4AI; supports both single product-detail pages and paginated product-list pages",
"group": "Data",
"component_type": "GenericNode"
},
"properties": {
"url": {
"type": "string",
"title": "Target URL",
"description": "URL of the site to scrape; supports Jinja2 template syntax, e.g. {{ product_url }}",
"format": "uri",
"minLength": 1
},
"crawl_mode": {
"type": "string",
"title": "Crawl Mode",
"description": "Crawl mode: single (one product-detail page) or list (product-list page, with pagination support)",
"default": "single",
"enum": [
"single",
"list"
],
"enumNames": [
"Single product-detail page",
"Product-list page"
]
},
"max_pages": {
"type": "integer",
"title": "Max Pages",
"description": "Maximum number of pages to crawl in list mode (guards against infinite pagination loops)",
"default": 100,
"minimum": 1,
"maximum": 1000
},
"enrich_details": {
"type": "boolean",
"title": "Enrich Product Details",
"description": "In list mode, also crawl each product's detail page to enrich the data (images, prices, company info, Product Details, etc.)"
},
"use_ai_extraction": {
"type": "boolean",
"title": "Use AI Extraction",
"description": "If enabled, an AI model extracts structured product information from the page content, refining the manually extracted data"
},
"ai_system_message": {
"type": "string",
"title": "System Message",
"description": "Defines the AI's role and behavior",
"default": "You are a professional product-information extraction assistant; extract product information from the page content accurately.",
"format": "textarea",
"rows": 5
},
"text_model": {
"type": "string",
"title": "AI Model",
"description": "AI model to use (kept consistent with the ai_content_generation node)",
"default": "Jingrow",
"enum": [
"Jingrow",
"DeepSeek",
"Doubao",
"ChatGPT",
"Qwen",
"Wenxin",
"Hunyuan",
"Spark"
],
"enumNames": [
"Jingrow",
"DeepSeek",
"Doubao",
"ChatGPT",
"Qwen",
"文心",
"混元",
"讯飞星火"
]
},
"max_tokens": {
"type": "integer",
"title": "Max Tokens",
"description": "Upper limit on the length of the generated content",
"default": 2000
},
"ai_temperature": {
"type": "number",
"title": "Randomness (Temperature)",
"description": "Controls output randomness: 0 is deterministic, 2 is maximally random",
"default": 0.7
},
"ai_top_p": {
"type": "number",
"title": "Diversity (Top-p)",
"description": "Controls the diversity of token selection",
"default": 0.9
},
"hide_input_handle": {
"type": "boolean",
"title": "Hide Input Handle",
"description": "Whether to hide the node's input connection point"
}
},
"required": [
"url"
],
"_layout": {
"tabs": [
{
"id": "tab_1",
"label": "Basic Configuration",
"sections": [
{
"id": "section_1",
"label": "Basic Properties",
"columns": [
{
"id": "column_1",
"label": "Main Settings",
"fields": [
"url",
"crawl_mode",
"max_pages",
"enrich_details",
"use_ai_extraction",
"ai_system_message"
]
}
]
}
]
},
{
"id": "tab_2",
"label": "AI Model Settings",
"sections": [
{
"id": "section_2",
"label": "Model Parameters",
"columns": [
{
"id": "column_2",
"label": "Model Configuration",
"fields": [
"text_model",
"max_tokens",
"ai_temperature",
"ai_top_p"
]
}
]
}
]
},
{
"id": "tab_3",
"label": "Node Settings",
"sections": [
{
"id": "section_3",
"label": "Node Interface",
"columns": [
{
"id": "column_3",
"fields": [
"hide_input_handle"
]
}
]
}
]
}
],
"activeTab": "tab_3"
}
}
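For reference, a config instance satisfying this schema might look like the following Python dict (a sketch with hypothetical values; only "url" is required):

config = {
    "url": "https://example.com/products",
    "crawl_mode": "list",
    "max_pages": 10,
    "enrich_details": True,
    "use_ai_extraction": False,
    "text_model": "Jingrow",
}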

View File

@@ -1,15 +1,17 @@
[project]
name = "jingrow-node-web-scrapers"
name = "web_scrapers_create"
version = "1.0.0"
requires-python = ">=3.10"
dependencies = [
"beautifulsoup4",
"crawl4ai",
"playwright",
"requests",
]
[tool.jingrow.after_install]
commands = [
"python -m playwright install-deps",
"python -m playwright install"
]
]

View File

@@ -1,14 +1,21 @@
{
"metadata": {
"type": "batch_create_records",
"label": "Batch Create Records",
"icon": "fa-layer-group",
"color": "#10b981",
"description": "Creates page records in bulk; particularly suited to array data such as product lists",
"group": "Output",
"type": "web_scrapers_create",
"label": "Scrape Website and Create",
"icon": "fa-spider",
"color": "rgba(71, 180, 133, 1)",
"description": "Scrapes product information from a website and immediately creates records (all-in-one node); supports field mapping and image upload",
"group": "Data",
"component_type": "GenericNode"
},
"properties": {
"url": {
"type": "string",
"title": "Target URL",
"description": "URL of the site to scrape; supports Jinja2 template syntax, e.g. {{ product_url }}",
"format": "uri",
"minLength": 1
},
"pagetype": {
"type": "string",
"title": "Page Type",
@@ -18,14 +25,14 @@
"field_map": {
"type": "array",
"title": "Field Mapping",
"description": "Maps data from upstream nodes to the target fields. If not configured, automatic label mapping is used",
"description": "Maps the scraped data to the target fields. If not configured, automatic label mapping is used",
"items": {
"type": "object",
"properties": {
"from": {
"type": "string",
"title": "Source Field",
"description": "Field name in the upstream node's output (e.g. title, price, images)"
"description": "Field name in the scraped data (e.g. title, price, images, Product Details)"
},
"to": {
"type": "string",
@@ -44,13 +51,22 @@
"title": "Default Site",
"description": "Types such as Jsite Product require a site field; enter the site record's name, not a URL. For example, if the site record is named 'mysite', enter 'mysite'. The record name can be found on the site settings page."
},
"show_output_handle": {
"max_pages": {
"type": "integer",
"title": "Max Pages",
"description": "Maximum number of pages to crawl in list mode (guards against infinite pagination loops)",
"default": 100,
"minimum": 1,
"maximum": 1000
},
"hide_input_handle": {
"type": "boolean",
"title": "Show Output Handle",
"description": "Whether to show the node's output connection point"
"title": "Hide Input Handle",
"description": "Whether to hide the node's input connection point"
}
},
"required": [
"url",
"pagetype"
],
"_layout": {
@@ -61,15 +77,16 @@
"sections": [
{
"id": "section_1",
"label": "Basic Information",
"label": "Basic Properties",
"columns": [
{
"id": "column_1",
"label": "Record Settings",
"label": "Main Settings",
"fields": [
"url",
"pagetype",
"default_site",
"show_output_handle"
"max_pages",
"default_site"
]
}
]
@@ -82,11 +99,11 @@
"sections": [
{
"id": "section_2",
"label": "Mapping Configuration",
"label": "Field Mapping Configuration",
"columns": [
{
"id": "column_2",
"label": "Field Mapping",
"label": "Mapping Settings",
"fields": [
"field_map"
]
@@ -94,9 +111,27 @@
]
}
]
},
{
"id": "tab_3",
"label": "Node Settings",
"sections": [
{
"id": "section_3",
"label": "Node Interface",
"columns": [
{
"id": "column_3",
"fields": [
"hide_input_handle"
]
}
]
}
]
}
],
"activeTab": "tab_2"
"activeTab": "tab_1"
}
}
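For the new combined node, a typical config (hypothetical values; "url" and "pagetype" are required) could be:

config = {
    "url": "https://example.com/products",
    "pagetype": "Jsite Product",
    "field_map": [{"from": "price", "to": "价格"}],  # optional; "to" is the target field label
    "default_site": "mysite",                        # site record name, not a URL
    "max_pages": 5,
}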

View File

@@ -1,6 +1,8 @@
import json
import re
import asyncio
import requests
import uuid
from typing import Dict, Any, Optional, List
from urllib.parse import urljoin, urlparse, parse_qs, urlencode
@@ -73,7 +75,6 @@ def extract_products_from_page(soup, base_url=''):
parsed = urlparse(base_url)
product['url'] = f"{parsed.scheme}://{parsed.netloc}" + product['url']
else:
# Infer the base URL from the current page URL
product['url'] = base_url + product['url']
# Extract the product price (from prod-price)
@@ -573,90 +574,6 @@ async def crawl_single_list_page(crawler, url, page_num, base_url=''):
return products, next_url, category_name
async def crawl_product_list(crawler, start_url, max_pages=100, enrich_details=False, base_url=''):
"""Crawl a product list (with pagination support)."""
all_products = []
current_url = start_url
page_num = 1
category_name = None
while current_url and page_num <= max_pages:
# Crawl the current page
products, next_url, cat_name = await crawl_single_list_page(crawler, current_url, page_num, base_url)
# Keep the category name extracted from the first page
if page_num == 1 and cat_name:
category_name = cat_name
if products:
all_products.extend(products)
else:
break
# Check for a next page
if next_url:
current_url = next_url
page_num += 1
# Small delay to avoid hammering the site
await asyncio.sleep(1)
else:
break
# Optionally crawl each product's detail page to enrich the data
if enrich_details and all_products:
for i, product in enumerate(all_products, 1):
if not product.get('url'):
continue
# Crawl the product detail page
detail_info = await crawl_product_detail(
crawler,
product['url'],
i,
len(all_products),
base_url
)
# Merge the detail-page data into the product
if detail_info:
# Replace images with the detail-page image list
if 'images' in detail_info:
product['images'] = detail_info['images']
# Keep the first image as the main image
if detail_info['images']:
product['image'] = detail_info['images'][0]
# Replace the price with the detail-page prices
if 'prices' in detail_info:
product['prices'] = detail_info['prices']
# Add company information
if 'company_name' in detail_info:
product['company_name'] = detail_info['company_name']
if 'company_url' in detail_info:
product['company_url'] = detail_info['company_url']
# Add Product Details
if 'Product Details' in detail_info:
product['Product Details'] = detail_info['Product Details']
# Small delay to avoid hammering the site
if i < len(all_products):
await asyncio.sleep(0.5)
# Add the category information to the result
result = {
'products': all_products,
'category_name': category_name,
'total_pages': page_num,
'total_products': len(all_products)
}
return result
# ==================== Node execution functions ====================
def get_branch_upstream_nodes(current_node_id, flow_data):
"""Return the IDs of all upstream nodes on the same branch."""
if not current_node_id or not flow_data:
@@ -678,12 +595,329 @@ def get_branch_upstream_nodes(current_node_id, flow_data):
return list(upstream_nodes)
async def download_and_upload_image(image_url: str, record_name: str, record_type: str, field_name: str = None) -> Optional[str]:
"""Download an image and upload it to the server."""
try:
# Download the image
response = requests.get(image_url, verify=False, timeout=30)
if response.status_code != 200:
return None
# Determine the file extension
parsed_url = urlparse(image_url)
path = parsed_url.path
file_extension = "jpg"
if path:
ext = path.split('.')[-1].lower()
if ext in ['jpg', 'jpeg', 'png', 'gif', 'webp']:
file_extension = ext
# Generate a filename
filename = f"scraped_{uuid.uuid4().hex[:8]}.{file_extension}"
# Upload the image
upload_result = jingrow.upload_file(
file_data=response.content,
filename=filename,
attached_to_pagetype=record_type,
attached_to_name=record_name,
attached_to_field=field_name
)
if upload_result.get('success'):
return upload_result.get('file_url')
return None
except Exception as e:
print(f"[ERROR] image upload failed for {image_url}: {str(e)}")
return None
async def upload_images(images: List[str], record_name: str, record_type: str, field_name: str = None) -> List[str]:
"""Upload a batch of images."""
uploaded_urls = []
for image_url in images:
if not image_url or not image_url.startswith('http'):
continue
uploaded_url = await download_and_upload_image(image_url, record_name, record_type, field_name)
if uploaded_url:
uploaded_urls.append(uploaded_url)
return uploaded_urls
def map_product_data_to_record(product_data: Dict[str, Any], field_map: List[Dict], label2field: Dict, record_type: str, default_site: str = "") -> Dict[str, Any]:
"""Map product data onto record fields."""
record_data = {}
mapped_fields = set()
# Identify image fields (handled separately; they must not be mapped directly)
image_source_fields = set()
image_target_fields = set()
for mapping in field_map:
from_field = mapping.get("from")
to_field = mapping.get("to")
if from_field in ["image", "images"]:
image_source_fields.add(from_field)
if to_field:
image_target_field = label2field.get(str(to_field).strip(), to_field)
if image_target_field:
image_target_fields.add(image_target_field)
# 1. Apply field_map first (excluding image fields)
if field_map:
# Drop image-field mappings
filtered_field_map = [m for m in field_map if m.get("from") not in image_source_fields]
if filtered_field_map:
mapped_result = jingrow.map_fields_by_labels(filtered_field_map, product_data, label2field)
record_data.update(mapped_result)
mapped_fields.update([m.get("from") for m in field_map if m.get("from")])
# 2. Automatic label mapping for the remaining fields (excluding image fields)
for label, value in product_data.items():
if label in mapped_fields or label in image_source_fields:
continue
fieldname = label2field.get(str(label).strip())
if fieldname is not None and fieldname not in record_data and fieldname not in image_target_fields:
record_data[fieldname] = value
mapped_fields.add(label)
# 3. Handle special required fields (if default_site is configured, try to set the site field)
if default_site:
# Look for the site field
site_fieldname = None
for label, fieldname in label2field.items():
if "网站" in str(label) or "site" in str(label).lower() or fieldname == "site":
site_fieldname = fieldname
break
# If nothing matched, fall back to "site" as the fieldname
if not site_fieldname:
# Check whether "site" appears among the fieldnames
if "site" in label2field.values():
site_fieldname = "site"
else:
# Otherwise use the first site-like field found in label2field
for fieldname in label2field.values():
if "site" in fieldname.lower():
site_fieldname = fieldname
break
# If a site field was found and is not set yet, apply the default value
if site_fieldname and site_fieldname not in record_data:
site_value = str(default_site).strip()
# Ensure default_site is not a URL
if not site_value.startswith(('http://', 'https://')):
record_data[site_fieldname] = site_value
# 4. Enforce field length limits
processed_data = {}
for key, value in record_data.items():
if isinstance(value, str) and len(value) > 140 and key in ['title', 'name']:
processed_data[key] = value[:140]
elif isinstance(value, (list, dict)):
try:
processed_data[key] = json.dumps(value, ensure_ascii=False)
except:
processed_data[key] = str(value)
else:
processed_data[key] = value
return processed_data
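# Mapping sketch (hypothetical label2field; map_fields_by_labels is a jingrow helper, so the
# exact output depends on that implementation):
# product_data = {"title": "Widget A", "price": "US$9.99", "images": ["https://example.com/a.jpg"]}
# field_map    = [{"from": "price", "to": "价格"}]
# label2field  = {"标题": "title", "价格": "price"}
# "image"/"images" entries are skipped here and handled later by upload_images();
# title/name strings over 140 characters are truncated, and list/dict values are JSON-encoded.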
async def create_record_async(product_data: Dict[str, Any], config: Dict[str, Any], label2field: Dict) -> Dict[str, Any]:
"""Create a record asynchronously."""
try:
record_type = config.get("pagetype")
field_map = config.get("field_map", [])
default_site = config.get("default_site", "")
# Map fields
record_data = map_product_data_to_record(product_data, field_map, label2field, record_type, default_site)
# Handle image upload
image_field = None
for mapping in field_map:
if mapping.get("from") in ["image", "images"]:
to_field = mapping.get("to")
if to_field:
image_field = label2field.get(str(to_field).strip(), to_field)
break
# If no mapping was found, try to locate an image field automatically
if not image_field:
for label, fieldname in label2field.items():
if "图片" in str(label) or "image" in str(label).lower():
image_field = fieldname
break
# Collect the images to upload
images_to_upload = []
if "images" in product_data and isinstance(product_data["images"], list):
images_to_upload = product_data["images"]
elif "image" in product_data and product_data["image"]:
images_to_upload = [product_data["image"]]
# Create the record first (a record name is needed before uploading images)
# Use the title as the record name
record_name = record_data.get("title") or record_data.get("name") or f"product_{uuid.uuid4().hex[:8]}"
if isinstance(record_name, str) and len(record_name) > 140:
record_name = record_name[:140]
# Create the record
created = jingrow.create_pg(record_type, record_data)
if not created:
return {
"success": False,
"error": "failed to create record",
"data": product_data
}
created_name = created.get('name') or record_name
# Upload images (if any)
update_data = {}
if images_to_upload and image_field:
uploaded_urls = await upload_images(images_to_upload, created_name, record_type, image_field)
if uploaded_urls:
# Only set the first image as the main image (avoid writing an array when there are several)
# Fields that support multiple images could be handled through a different path
update_data[image_field] = uploaded_urls[0]
# Update the record if there is anything to write back
if update_data:
jingrow.update_pg(record_type, created_name, update_data)
return {
"success": True,
"record_name": created_name,
"data": product_data
}
except Exception as e:
return {
"success": False,
"error": str(e),
"data": product_data
}
async def crawl_and_create_list(crawler, start_url: str, config: Dict[str, Any], base_url: str = '', max_pages: int = 100):
"""Crawl a product list and create a record for each product."""
record_type = config.get("pagetype")
if not record_type:
return {
"success": False,
"error": "pagetype must be configured"
}
# Fetch the field mapping
label2field = jingrow.get_field_mapping_from_jingrow(record_type) or {}
current_url = start_url
page_num = 1
category_name = None
created_count = 0
failed_count = 0
created_records = []
failed_records = []
while current_url and page_num <= max_pages:
# Crawl the current page
products, next_url, cat_name = await crawl_single_list_page(crawler, current_url, page_num, base_url)
if page_num == 1 and cat_name:
category_name = cat_name
if not products:
break
# Crawl each product's detail page and create a record
for product in products:
if not product.get('url'):
continue
# Crawl the product detail page
detail_info = await crawl_product_detail(crawler, product['url'], None, None, base_url)
# Merge list-page and detail-page data
full_product_data = product.copy()
if detail_info:
# Prefer the detail-page title (more accurate)
if 'title' in detail_info:
full_product_data['title'] = detail_info['title']
# Extract a subtitle when the title has a suitable form,
# e.g. "main title - subtitle" split on " - "
if 'title' in full_product_data:
title_parts = full_product_data['title'].split(' - ', 1)
if len(title_parts) == 2:
full_product_data['title'] = title_parts[0].strip()
full_product_data['subtitle'] = title_parts[1].strip()
# Prefer the detail-page image list (more complete)
if 'images' in detail_info and detail_info['images']:
full_product_data['images'] = detail_info['images']
full_product_data['image'] = detail_info['images'][0]
elif 'image' in full_product_data and not full_product_data.get('images'):
# If the detail page has no images, fall back to the list-page image
full_product_data['images'] = [full_product_data['image']]
# Prefer the detail-page prices (more detailed)
if 'prices' in detail_info:
full_product_data['prices'] = detail_info['prices']
# Add company information
if 'company_name' in detail_info:
full_product_data['company_name'] = detail_info['company_name']
if 'company_url' in detail_info:
full_product_data['company_url'] = detail_info['company_url']
# Add Product Details (the full product description)
if 'Product Details' in detail_info:
full_product_data['Product Details'] = detail_info['Product Details']
# Create the record asynchronously
create_result = await create_record_async(full_product_data, config, label2field)
if create_result.get('success'):
created_count += 1
created_records.append({
"record_name": create_result.get('record_name'),
"title": full_product_data.get('title')
})
else:
failed_count += 1
failed_records.append({
"title": full_product_data.get('title'),
"error": create_result.get('error')
})
# Small delay to avoid hammering the site
await asyncio.sleep(0.5)
# Check for a next page
if next_url:
current_url = next_url
page_num += 1
await asyncio.sleep(1)
else:
break
return {
"success": True,
"category_name": category_name,
"total_pages": page_num,
"created_count": created_count,
"failed_count": failed_count,
"created_records": created_records,
"failed_records": failed_records
}
def execute(context=None, inputs=None, config=None, **kwargs):
"""
Website product-information scraping node, based on Crawl4AI
Two modes are supported:
1. Single product-detail page: crawl one product detail page
2. Product-list page: crawl a product list (with pagination), optionally enriching details
Scrape-and-create node (all in one)
Each scraped item is immediately turned into a record via an async create
"""
if context is None:
context = kwargs.get("context", {})
@@ -691,7 +925,7 @@ def execute(context=None, inputs=None, config=None, **kwargs):
inputs = kwargs.get("inputs", {})
if config is None:
config = kwargs.get("config", {})
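# On success this node returns the summary built by crawl_and_create_list(), roughly:
# {
#     "success": True,
#     "category_name": "...",   # category extracted from the first list page, if any
#     "total_pages": 3,
#     "created_count": 42,
#     "failed_count": 1,
#     "created_records": [{"record_name": "...", "title": "..."}],
#     "failed_records": [{"title": "...", "error": "..."}],
# }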
try:
# Normalize parameters
if not isinstance(inputs, dict):
@@ -699,12 +933,17 @@ def execute(context=None, inputs=None, config=None, **kwargs):
inputs = json.loads(inputs) if isinstance(inputs, str) else {}
except Exception:
inputs = {}
# Read configuration
url = config.get("url") or inputs.get("url")
if not url:
return {"success": False, "error": "URL is required"}
# Resolve pagetype
record_type = config.get("pagetype")
if not record_type:
return {"success": False, "error": "pagetype must be configured"}
# The URL may also come from an upstream node
current_node_id = context.get("current_node_id")
flow_data = context.get("flow_data", {})
@@ -716,7 +955,6 @@ def execute(context=None, inputs=None, config=None, **kwargs):
if upstream_node_id in node_results:
upstream_result = node_results[upstream_node_id]
if isinstance(upstream_result, dict):
# Parse JSON out of the raw AI response
if "ai_raw_response" in upstream_result and isinstance(upstream_result["ai_raw_response"], str):
try:
raw_response = upstream_result["ai_raw_response"].strip()
@@ -733,16 +971,14 @@ def execute(context=None, inputs=None, config=None, **kwargs):
except:
pass
# Flatten the remaining upstream fields into inputs
for k, v in upstream_result.items():
if k not in inputs or not inputs.get(k):
inputs[k] = v
# Prefer a url found in inputs
if inputs.get("url") and not url:
url = inputs.get("url")
# Build the template context (Jinja2 templates supported)
# Build the template context
template_context = inputs.copy()
if context and context.get("flow_id"):
flow_id = context["flow_id"]
@@ -751,13 +987,13 @@ def execute(context=None, inputs=None, config=None, **kwargs):
for key, value in agent_data.items():
if key not in template_context or not template_context.get(key):
template_context[key] = value
# Render the URL if it contains template syntax
# Render the URL template
try:
url = render_template(url, template_context)
except Exception as e:
return {"success": False, "error": f"URL template rendering failed: {str(e)}"}
# URL validation and normalization
if not url or not isinstance(url, str):
return {"success": False, "error": "URL must not be empty"}
@@ -766,7 +1002,6 @@ def execute(context=None, inputs=None, config=None, **kwargs):
if not url:
return {"success": False, "error": "URL must not be empty"}
# Make sure the URL has a scheme prefix
if not url.startswith(('http://', 'https://', 'file://', 'raw:')):
if url.startswith('//'):
url = 'https:' + url
@@ -774,209 +1009,25 @@ def execute(context=None, inputs=None, config=None, **kwargs):
url = 'https://' + url
else:
return {"success": False, "error": f"Malformed URL: {url}"}
# Mode configuration
crawl_mode = config.get("crawl_mode", "single") # "single" or "list"
max_pages = config.get("max_pages", 100) # max pages in list mode
enrich_details = config.get("enrich_details", False) # whether to enrich product details
# AI extraction settings (single-product mode only)
use_ai_extraction = config.get("use_ai_extraction", False)
text_model = config.get("text_model", "Jingrow")
ai_system_message = config.get("ai_system_message", "You are a professional product-information extraction assistant; extract product information from the page content accurately.")
max_tokens = config.get("max_tokens", 2000)
ai_temperature = config.get("ai_temperature", 0.7)
ai_top_p = config.get("ai_top_p", 0.9)
# Crawl with Crawl4AI
try:
from crawl4ai import AsyncWebCrawler
import asyncio
except ImportError:
return {
"success": False,
"error": "Crawl4AI is not installed; run: uv pip install crawl4ai"
}
async def crawl_website():
# Read configuration
max_pages = config.get("max_pages", 100)
# Extract the base URL
parsed = urlparse(url)
base_url = f"{parsed.scheme}://{parsed.netloc}"
# Run the crawl-and-create pipeline
async def run_crawl():
async with AsyncWebCrawler(verbose=True) as crawler:
# Extract the base URL for resolving relative paths
parsed = urlparse(url)
base_url = f"{parsed.scheme}://{parsed.netloc}"
if crawl_mode == "list":
# Product-list mode
result = await crawl_product_list(
crawler,
url,
max_pages=max_pages,
enrich_details=enrich_details,
base_url=base_url
)
# Build the return value
return {
"success": True,
"url": url,
"mode": "list",
"category_name": result.get('category_name'),
"total_pages": result.get('total_pages', 0),
"total_products": result.get('total_products', 0),
"products": result.get('products', [])
}
else:
# Single product-detail mode
result = await crawler.arun(
url=url,
wait_for="networkidle",
page_timeout=30000,
)
if not result.success:
return {
"success": False,
"error": f"Crawl failed: {result.error_message or 'unknown error'}"
}
html_content = result.html
markdown_content = result.markdown
# Initialize BeautifulSoup
soup = BeautifulSoup(html_content, 'html.parser')
# Extract product information
extracted_data = {}
# Extract the title
title = extract_product_title(soup)
if title:
extracted_data['title'] = title
# Extract images
images = extract_product_detail_images(soup, base_url)
if images:
extracted_data['image'] = images
# Extract prices
prices = extract_product_detail_prices(soup)
if prices:
extracted_data['price'] = prices
# Extract company information
company_info = extract_company_info(soup, base_url)
if company_info.get('company_name'):
extracted_data['company_name'] = company_info['company_name']
extracted_data['url'] = company_info['company_url']
# Extract Product Details
product_details = extract_product_details(soup)
if product_details:
extracted_data['Product Details'] = product_details
# If AI extraction is enabled, use an AI model to extract and enrich the data further
if use_ai_extraction:
try:
from jingrow.utils.jingrow_cloud import call_ai_model
# Build the AI extraction prompt
ai_prompt = f"""Extract the product information from the web page content below and return it as JSON.
URL: {url}
Page content (Markdown, first 5000 characters):
{markdown_content[:5000]}
Basic information extracted so far:
{json.dumps(extracted_data, ensure_ascii=False, indent=2)}
Based on the page content, complete the information above and return full JSON strictly in this structure:
{{
"title": "product title",
"price": [{{"US$877.00": "2-9"}}, {{"US$850.00": "10+"}}], // price array; each element is a {{"price": "quantity range"}} object
"image": ["image URL 1", "image URL 2"], // array of image URLs
"Product Details": "product detail text",
"company_name": "company name",
"url": "company homepage URL"
}}
Notes:
- price must be an array; each element is an object of the form {{"price": "quantity range"}}
- image must be an array of strings
- use an empty string or empty array for missing information"""
# Call the AI model
ai_res = call_ai_model(
ai_prompt,
text_model=text_model,
ai_temperature=ai_temperature,
ai_top_p=ai_top_p,
ai_system_message=ai_system_message,
max_tokens=max_tokens,
image_urls=None,
)
if ai_res.get("success"):
ai_response = ai_res.get("response", "")
# Try to parse the JSON
try:
start_brace = ai_response.find('{')
end_brace = ai_response.rfind('}')
if start_brace != -1 and end_brace != -1 and end_brace > start_brace:
json_str = ai_response[start_brace:end_brace + 1]
ai_data = json.loads(json_str)
if isinstance(ai_data, dict):
# Merge the AI-extracted data (AI results take precedence)
for key, value in ai_data.items():
if value is not None and value != "" and (not isinstance(value, list) or len(value) > 0):
extracted_data[key] = value
except json.JSONDecodeError:
pass
except Exception:
pass
# Normalize the output format
if "price" in extracted_data:
if not isinstance(extracted_data["price"], list):
if isinstance(extracted_data["price"], str):
extracted_data["price"] = [extracted_data["price"]]
else:
extracted_data["price"] = []
if "image" in extracted_data:
if not isinstance(extracted_data["image"], list):
extracted_data["image"] = []
# Make sure all required fields exist
final_data = {
"title": extracted_data.get("title", ""),
"price": extracted_data.get("price", []),
"image": extracted_data.get("image", []),
"Product Details": extracted_data.get("Product Details", ""),
"company_name": extracted_data.get("company_name", ""),
"url": extracted_data.get("url", "")
}
# Build the return value
return {
"success": True,
"url": url,
"mode": "single",
"extracted_data": final_data
}
# Run the async crawl
try:
result = asyncio.run(crawl_website())
return result
except Exception as e:
return {
"success": False,
"error": f"Crawl execution failed: {str(e)}"
}
return await crawl_and_create_list(crawler, url, config, base_url, max_pages)
result = asyncio.run(run_crawl())
return result
except Exception as e:
return {
"success": False,
"error": f"Website scraper node failed: {str(e)}"
"error": f"Execution failed: {str(e)}"
}
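For reference, a minimal standalone invocation of the new node might look like the sketch below (hypothetical values; a real flow engine supplies context, inputs, and node_results):

if __name__ == "__main__":
    config = {
        "url": "https://example.com/products",
        "pagetype": "Jsite Product",
        "default_site": "mysite",  # hypothetical site record name
        "max_pages": 2,
    }
    result = execute(context={}, inputs={}, config=config)
    print(result.get("created_count"), result.get("failed_count"))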