diff --git a/.gitignore b/.gitignore index 8ebf5de..4facb6f 100644 --- a/.gitignore +++ b/.gitignore @@ -44,3 +44,6 @@ apps/* # uv/virtualenv:忽略所有目录下的 .venv,保留 uv.lock 追踪 .venv/ **/.venv/ + +# 运行时缓存文件 +.cache/ diff --git a/apps/jingrow/jingrow/ai/nodes/batch_create_records/batch_create_records.json b/apps/jingrow/jingrow/ai/nodes/batch_create_records/batch_create_records.json new file mode 100644 index 0000000..bf5db7a --- /dev/null +++ b/apps/jingrow/jingrow/ai/nodes/batch_create_records/batch_create_records.json @@ -0,0 +1,102 @@ +{ + "metadata": { + "type": "batch_create_records", + "label": "批量创建记录", + "icon": "fa-layer-group", + "color": "#10b981", + "description": "批量创建页面记录,特别适用于处理产品列表等数组数据", + "group": "输出", + "component_type": "GenericNode" + }, + "properties": { + "pagetype": { + "type": "string", + "title": "页面类型", + "description": "要创建记录的页面类型,如:Jsite Product、Jsite Article", + "minLength": 1 + }, + "field_map": { + "type": "array", + "title": "字段映射", + "description": "配置字段映射关系,将上游节点的数据映射到目标字段。如果不配置,将使用自动标签映射", + "items": { + "type": "object", + "properties": { + "from": { + "type": "string", + "title": "来源字段", + "description": "上游节点输出的字段名,如:title、price、images" + }, + "to": { + "type": "string", + "title": "目标字段", + "description": "目标页面类型中的字段名(使用字段标签)" + } + }, + "required": [ + "from", + "to" + ] + } + }, + "default_site": { + "type": "string", + "title": "默认网站", + "description": "Jsite Product 等类型需要 site 字段,请填写网站记录的名称(name),不是 URL。例如:如果网站记录的名称是 'mysite',就填写 'mysite'。可以在网站设置页面查看网站记录的名称。" + }, + "show_output_handle": { + "type": "boolean", + "title": "显示输出接口", + "description": "是否显示节点的输出连接点" + } + }, + "required": [ + "pagetype" + ], + "_layout": { + "tabs": [ + { + "id": "tab_1", + "label": "基础配置", + "sections": [ + { + "id": "section_1", + "label": "基本信息", + "columns": [ + { + "id": "column_1", + "label": "记录设置", + "fields": [ + "pagetype", + "default_site", + "show_output_handle" + ] + } + ] + } + ] + }, + { + "id": "tab_2", + "label": "字段映射", + "sections": [ + { + "id": "section_2", + "label": "映射配置", + "columns": [ + { + "id": "column_2", + "label": "字段映射", + "fields": [ + "field_map" + ] + } + ] + } + ] + } + ], + "activeTab": "tab_2" + } +} + diff --git a/apps/jingrow/jingrow/ai/nodes/batch_create_records/batch_create_records.py b/apps/jingrow/jingrow/ai/nodes/batch_create_records/batch_create_records.py new file mode 100644 index 0000000..f386bf2 --- /dev/null +++ b/apps/jingrow/jingrow/ai/nodes/batch_create_records/batch_create_records.py @@ -0,0 +1,538 @@ +import json +from typing import Dict, Any, Optional, List + +import jingrow + + +def get_branch_upstream_nodes(current_node_id, flow_data): + """获取同一分支的所有上游节点ID列表""" + if not current_node_id or not flow_data: + return [] + + edges = flow_data.get("edges", []) + upstream_nodes = set() + stack = [current_node_id] + + while stack: + node_id = stack.pop() + for edge in edges: + if edge.get("target") == node_id: + source_id = edge.get("source") + if source_id and source_id not in upstream_nodes: + upstream_nodes.add(source_id) + stack.append(source_id) + + return list(upstream_nodes) + + +def execute(context=None, inputs=None, config=None, **kwargs): + """ + 批量创建记录节点 + 用于批量创建记录,特别适用于处理 web_scrapers 节点返回的产品列表 + """ + if context is None: + context = kwargs.get("context", {}) + if inputs is None: + inputs = kwargs.get("inputs", {}) + if config is None: + config = kwargs.get("config", {}) + + try: + # 参数处理 + if not isinstance(inputs, dict): + try: + inputs = json.loads(inputs) if isinstance(inputs, str) else {} + except 
Exception: + inputs = {} + + # 获取 pagetype + record_type = None + if isinstance(inputs, dict): + record_type = inputs.get("pagetype") + if not record_type and isinstance(config, dict): + record_type = config.get("pagetype") + if not isinstance(record_type, str) or not record_type: + return { + "success": False, + "error": "pagetype 必须为非空字符串,请检查配置" + } + + # 获取字段映射配置 + field_map = config.get("field_map", []) + + # 获取默认网站配置(用于 Jsite Product 等需要 site 字段的类型) + default_site = config.get("default_site", "") + + # 从 Jingrow 获取字段 label->fieldname 映射 + label2field = jingrow.get_field_mapping_from_jingrow(record_type) or {} + + # 获取字段元数据(用于检查字段长度限制) + field_metadata = {} + try: + from jingrow.model.page import get_page_instance + pg = get_page_instance(record_type) + meta_result = pg.get_meta() + if isinstance(meta_result, dict) and meta_result.get('success'): + meta_data = meta_result.get('data', {}) + fields = meta_data.get('fields', []) + # 构建字段名到字段元数据的映射 + for field in fields: + fieldname = field.get('fieldname') + if fieldname: + # 尝试多种方式获取长度限制 + max_length = ( + field.get('length') or + field.get('max_length') or + field.get('maxlength') or + (field.get('df') and field.get('df').get('length')) or + None + ) + # 如果是整数,转换为整数 + if max_length is not None: + try: + max_length = int(max_length) + except (ValueError, TypeError): + max_length = None + + field_metadata[fieldname] = { + 'max_length': max_length, + 'fieldtype': field.get('fieldtype'), + 'label': field.get('label') + } + except Exception as e: + # 如果获取元数据失败,记录错误但继续执行 + print(f"获取字段元数据失败: {str(e)}") + pass + + # 如果获取元数据失败,使用已知的常见字段长度限制 + # 根据错误信息,title 字段通常是 140 字符 + # 为常见字段设置默认长度限制(即使元数据获取成功,也作为后备) + common_field_limits = { + 'title': 140, # 从错误信息中得知 + 'name': 140, + } + for fieldname, default_length in common_field_limits.items(): + if fieldname not in field_metadata: + field_metadata[fieldname] = {'max_length': default_length} + elif not field_metadata[fieldname].get('max_length'): + field_metadata[fieldname]['max_length'] = default_length + + # 同时,为所有通过 label2field 映射的字段设置默认限制(如果它们看起来是标题字段) + for label, fieldname in label2field.items(): + if fieldname not in field_metadata or not field_metadata[fieldname].get('max_length'): + # 如果字段标签包含"标题"或"title",且字段名是 title 或 name,设置默认限制 + label_str = str(label).lower() + if ('标题' in str(label) or 'title' in label_str) and fieldname in ['title', 'name']: + if fieldname not in field_metadata: + field_metadata[fieldname] = {'max_length': 140} + elif not field_metadata[fieldname].get('max_length'): + field_metadata[fieldname]['max_length'] = 140 + + # 获取数据数组 + # 支持多种数据源: + # 1. 从 inputs 中获取 products 数组(web_scrapers 节点输出) + # 2. 从 inputs 中获取 items 数组(通用数组) + # 3. 
从上游节点获取 products 或 items + data_array = None + + # 方法1: 从 inputs 直接获取 + if isinstance(inputs, dict): + # web_scrapers 节点返回格式: {"products": [...]} + if "products" in inputs and isinstance(inputs["products"], list): + data_array = inputs["products"] + # 通用格式: {"items": [...]} + elif "items" in inputs and isinstance(inputs["items"], list): + data_array = inputs["items"] + # 如果 inputs 本身是数组 + elif isinstance(inputs, list): + data_array = inputs + + # 方法2: 从上游节点获取 + if not data_array: + current_node_id = context.get("current_node_id") + flow_data = context.get("flow_data", {}) + branch_upstream_nodes = get_branch_upstream_nodes(current_node_id, flow_data) + + if context and "node_results" in context and branch_upstream_nodes: + node_results = context["node_results"] + for upstream_node_id in branch_upstream_nodes: + if upstream_node_id in node_results: + upstream_result = node_results[upstream_node_id] + if isinstance(upstream_result, dict): + # web_scrapers 节点返回格式 + if "products" in upstream_result and isinstance(upstream_result["products"], list): + data_array = upstream_result["products"] + break + # 通用格式 + elif "items" in upstream_result and isinstance(upstream_result["items"], list): + data_array = upstream_result["items"] + break + # 如果有 ai_raw_response,尝试解析 JSON + elif "ai_raw_response" in upstream_result and isinstance(upstream_result["ai_raw_response"], str): + try: + raw_response = upstream_result["ai_raw_response"].strip() + start_brace = raw_response.find('[') + end_brace = raw_response.rfind(']') + + if start_brace != -1 and end_brace != -1 and end_brace > start_brace: + json_content = raw_response[start_brace:end_brace + 1] + parsed_json = json.loads(json_content) + if isinstance(parsed_json, list): + data_array = parsed_json + break + except: + continue + + if not data_array or not isinstance(data_array, list): + return { + "success": False, + "error": "未找到要创建的数据数组。请确保上游节点返回了 'products' 或 'items' 数组,或者直接传入数组数据。" + } + + if len(data_array) == 0: + return { + "success": True, + "message": "数据数组为空,未创建任何记录", + "created_count": 0, + "created_records": [] + } + + # 批量创建记录 + created_records = [] + failed_records = [] + + for index, item_data in enumerate(data_array): + try: + # 确保 item_data 是字典 + if not isinstance(item_data, dict): + failed_records.append({ + "index": index, + "data": item_data, + "error": "数据项不是字典格式" + }) + continue + + # 目标记录数据 + record_data = {} + mapped_fields = set() + + # 1. 优先用 field_map(from→to) + if field_map: + mapped_result = jingrow.map_fields_by_labels(field_map, item_data, label2field) + record_data.update(mapped_result) + mapped_fields.update([m.get("from") for m in field_map if m.get("from")]) + # 同时标记目标字段为已映射,防止被自动映射覆盖 + for mapping in field_map: + to_field = mapping.get("to") + if to_field: + mapped_fields.add(to_field) + + # 调试:检查哪些字段映射失败了 + failed_mappings = [] + for mapping in field_map: + from_field = mapping.get("from") + to_field = mapping.get("to") + if from_field in item_data and from_field not in mapped_result: + # 字段映射失败 + to_fieldname = label2field.get(str(to_field).strip()) + failed_mappings.append({ + "from": from_field, + "to_label": to_field, + "to_fieldname": to_fieldname, + "value": item_data.get(from_field) + }) + + # 2. 自动标签映射:处理未在 field_map 中映射的字段 + for label, value in item_data.items(): + if label in mapped_fields: + continue + fieldname = label2field.get(str(label).strip()) + if fieldname is not None and fieldname not in record_data: + record_data[fieldname] = value + mapped_fields.add(label) + + # 3. 
处理特殊必填字段 + # 对于 Jsite Product,需要 site 字段 + if record_type == "Jsite Product": + # 检查 site 字段是否已映射 + site_fieldname = None + for label, fieldname in label2field.items(): + if "网站" in str(label) or "site" in str(label).lower() or fieldname == "site": + site_fieldname = fieldname + break + + if not site_fieldname: + site_fieldname = "site" + + if site_fieldname not in record_data: + if default_site: + # 验证 default_site 不是 URL + site_value = str(default_site).strip() + if site_value.startswith(('http://', 'https://')): + # 如果是 URL,尝试提取域名部分作为提示 + from urllib.parse import urlparse + parsed = urlparse(site_value) + domain = parsed.netloc or parsed.path.split('/')[0] + # 不设置值,让创建失败并提示用户 + # 但记录到调试信息中 + pass + else: + # 使用配置的网站名称 + record_data[site_fieldname] = site_value + else: + # 如果没有配置 default_site,在错误信息中提示 + # 但继续处理,让创建失败时能看到完整的错误信息 + pass + + # 确保 title 字段存在(如果数据中有 title 但映射失败) + # 检查所有可能的 title 字段名变体 + title_fieldnames = ["title", "name"] # title 和 name 都可能是标题字段 + title_mapped = False + + for title_field in title_fieldnames: + if title_field in record_data and record_data[title_field]: + title_mapped = True + break + + if not title_mapped and "title" in item_data and item_data.get("title"): + # 尝试多种方式映射 title + # 1. 查找 label2field 中 label 包含 "标题" 或 "title" 的字段 + for label, fieldname in label2field.items(): + if ("标题" in str(label) or "title" in str(label).lower()) and fieldname not in record_data: + record_data[fieldname] = item_data.get("title") + title_mapped = True + break + + # 2. 如果还没映射,直接尝试使用 title 或 name 作为字段名 + if not title_mapped: + for title_field in title_fieldnames: + if title_field not in record_data: + record_data[title_field] = item_data.get("title") + title_mapped = True + break + + # 处理复杂数据类型(数组、对象)的序列化 + # 某些字段可能需要转换为 JSON 字符串 + # 同时处理字段长度限制,自动截断超长字段 + processed_record_data = {} + truncated_fields = [] # 记录被截断的字段 + + for key, value in record_data.items(): + if isinstance(value, (list, dict)): + # 如果是数组或对象,尝试转换为 JSON 字符串 + # 但先检查目标字段类型,如果是 JSON 类型字段,可能需要保持原样 + try: + # 先尝试保持原样,如果创建失败再转换 + processed_record_data[key] = value + except: + processed_record_data[key] = json.dumps(value, ensure_ascii=False) + else: + # 处理字符串字段的长度限制 + if isinstance(value, str) and value: + # 对于 title 和 name 字段,强制应用 140 字符限制(根据错误信息) + if key in ['title', 'name'] and len(value) > 140: + original_length = len(value) + processed_record_data[key] = value[:140] + truncated_fields.append({ + "field": key, + "field_label": field_metadata.get(key, {}).get('label', key), + "original_length": original_length, + "max_length": 140, + "truncated_value": processed_record_data[key], + "truncation_method": "强制截断(title/name 字段默认 140 字符)" + }) + else: + # 检查字段是否有长度限制 + field_info = field_metadata.get(key, {}) + max_length = field_info.get('max_length') + + # 如果字段元数据中没有,尝试通过 label2field 查找对应的字段 + if not max_length: + # 尝试查找字段的 label,然后通过 label 查找元数据 + for label, fieldname in label2field.items(): + if fieldname == key: + # 找到了对应的 label,尝试查找元数据 + field_info_from_label = field_metadata.get(fieldname, {}) + if not max_length: + max_length = field_info_from_label.get('max_length') + break + + # 执行截断 + if max_length and isinstance(max_length, (int, float)): + max_length_int = int(max_length) + if len(value) > max_length_int: + # 自动截断超长字段 + original_length = len(value) + processed_record_data[key] = value[:max_length_int] + truncated_fields.append({ + "field": key, + "field_label": field_info.get('label', key), + "original_length": original_length, + "max_length": max_length_int, + "truncated_value": 
processed_record_data[key], + "truncation_method": "元数据截断" + }) + else: + processed_record_data[key] = value + else: + processed_record_data[key] = value + else: + processed_record_data[key] = value + + # 调用 Jingrow 创建记录 + try: + # 直接调用 Page.create 以获取详细的错误信息 + from jingrow.model.page import get_page_instance + pg = get_page_instance(record_type) + result = pg.create(processed_record_data) + + # 初始化 error_msg,确保在所有情况下都有值 + error_msg = '创建记录失败' + + if not isinstance(result, dict) or not result.get('success'): + # 获取详细错误信息 + error_msg = result.get('error', '创建记录失败') + error_detail = result.get('detail', '') + if error_detail: + error_msg = f"{error_msg}: {error_detail}" + + # 添加更详细的调试信息 + debug_info = { + "record_type": record_type, + "mapped_fields_count": len(processed_record_data), + "field_names": list(processed_record_data.keys()), + "response": result, + "label2field_sample": dict(list(label2field.items())[:10]) if label2field else {}, + "default_site_configured": bool(default_site), + "default_site_value": default_site if default_site else None, + "site_field_in_record_data": site_fieldname in processed_record_data if 'site_fieldname' in locals() else False, + "field_metadata_sample": dict(list(field_metadata.items())[:5]) if field_metadata else {}, + "title_field_length": len(processed_record_data.get('title', '')) if 'title' in processed_record_data else None, + "title_field_in_metadata": 'title' in field_metadata, + "title_max_length": field_metadata.get('title', {}).get('max_length') if 'title' in field_metadata else None + } + + # 如果有字段被截断,添加到调试信息 + if 'truncated_fields' in locals() and truncated_fields: + debug_info["truncated_fields"] = truncated_fields + else: + # 如果没有截断信息,但 title 字段存在且超长,说明截断逻辑可能没有执行 + if 'title' in processed_record_data: + title_value = processed_record_data['title'] + if isinstance(title_value, str) and len(title_value) > 140: + debug_info["truncation_warning"] = f"title 字段长度为 {len(title_value)},超过 140,但未执行截断。可能原因:max_length 未正确获取。" + + # 如果 default_site 是 URL,添加警告 + if default_site and str(default_site).strip().startswith(('http://', 'https://')): + debug_info["site_config_warning"] = "default_site 配置看起来是 URL,但 site 字段需要的是网站记录的名称(name),不是 URL。请检查网站设置,找到网站记录的名称并更新配置。" + + # 如果有失败的映射,添加到调试信息中 + if 'failed_mappings' in locals(): + debug_info["failed_mappings"] = failed_mappings + + failed_records.append({ + "index": index, + "data": item_data, + "error": error_msg, + "record_data": processed_record_data, + "debug": debug_info + }) + continue + + created = result.get('data', {}) + if not created: + debug_info = { + "record_type": record_type, + "response": result + } + if 'truncated_fields' in locals() and truncated_fields: + debug_info["truncated_fields"] = truncated_fields + + failed_records.append({ + "index": index, + "data": item_data, + "error": "创建记录失败:返回数据为空", + "record_data": processed_record_data, + "debug": debug_info + }) + continue + + # 记录成功创建,但如果有字段被截断,也记录下来 + if 'truncated_fields' in locals() and truncated_fields: + # 在成功创建的情况下,截断信息可以通过返回结果查看 + pass + + except Exception as create_error: + debug_info = { + "record_type": record_type, + "mapped_fields_count": len(processed_record_data), + "field_names": list(processed_record_data.keys()), + "exception_type": type(create_error).__name__ + } + if 'truncated_fields' in locals() and truncated_fields: + debug_info["truncated_fields"] = truncated_fields + + failed_records.append({ + "index": index, + "data": item_data, + "error": f"创建记录异常: {str(create_error)}", + "record_data": processed_record_data, + 
"debug": debug_info + }) + continue + + created_name = created.get("name") or record_data.get("name") + record_info = { + "index": index, + "name": created_name, + "pagetype": record_type + } + + # 如果有字段被截断,添加到记录信息中 + if 'truncated_fields' in locals() and truncated_fields: + record_info["truncated_fields"] = truncated_fields + + created_records.append(record_info) + + except Exception as e: + failed_records.append({ + "index": index, + "data": item_data, + "error": str(e) + }) + continue + + # 构建返回结果 + result = { + "success": True, + "total_count": len(data_array), + "created_count": len(created_records), + "failed_count": len(failed_records), + "created_records": created_records + } + + if failed_records: + result["failed_records"] = failed_records + result["message"] = f"成功创建 {len(created_records)} 条记录,失败 {len(failed_records)} 条" + else: + result["message"] = f"成功创建 {len(created_records)} 条记录" + + return result + + except Exception as e: + # 构建详细的错误信息 + error_details = { + "error": str(e), + "record_type": record_type if 'record_type' in locals() else None, + "inputs": inputs if 'inputs' in locals() else {}, + "field_map": field_map if 'field_map' in locals() else [] + } + + print(f"BatchCreateRecords异常: {json.dumps(error_details, ensure_ascii=False, indent=2)}") + + return { + "success": False, + "error": str(e), + "error_details": error_details + } + diff --git a/apps/jingrow/jingrow/ai/nodes/web_scrapers/pyproject.toml b/apps/jingrow/jingrow/ai/nodes/web_scrapers/pyproject.toml new file mode 100644 index 0000000..30c764e --- /dev/null +++ b/apps/jingrow/jingrow/ai/nodes/web_scrapers/pyproject.toml @@ -0,0 +1,15 @@ +[project] +name = "jingrow-node-web-scrapers" +version = "1.0.0" +requires-python = ">=3.10" +dependencies = [ + "beautifulsoup4", + "crawl4ai", + "playwright", +] + +[tool.jingrow.after_install] +commands = [ + "python -m playwright install-deps", + "python -m playwright install" +] \ No newline at end of file diff --git a/apps/jingrow/jingrow/ai/nodes/web_scrapers/web_scrapers.json b/apps/jingrow/jingrow/ai/nodes/web_scrapers/web_scrapers.json new file mode 100644 index 0000000..5fe96fd --- /dev/null +++ b/apps/jingrow/jingrow/ai/nodes/web_scrapers/web_scrapers.json @@ -0,0 +1,181 @@ +{ + "metadata": { + "type": "web_scrapers", + "label": "网站采集", + "icon": "fa-spider", + "color": "rgba(71, 180, 133, 1)", + "description": "基于 Crawl4AI 采集网站产品信息(标题、价格、描述、图片等),支持单产品详情页和产品列表页两种模式", + "group": "数据", + "component_type": "GenericNode" + }, + "properties": { + "url": { + "type": "string", + "title": "目标URL", + "description": "要采集的网站URL,支持 Jinja2 模板语法,如:{{ product_url }}", + "format": "uri", + "minLength": 1 + }, + "crawl_mode": { + "type": "string", + "title": "爬取模式", + "description": "选择爬取模式:single(单产品详情页)或 list(产品列表页,支持分页)", + "default": "single", + "enum": [ + "single", + "list" + ], + "enumNames": [ + "单产品详情页", + "产品列表页" + ] + }, + "max_pages": { + "type": "integer", + "title": "最大页数", + "description": "列表页模式下的最大爬取页数(防止无限循环)", + "default": 100, + "minimum": 1, + "maximum": 1000 + }, + "enrich_details": { + "type": "boolean", + "title": "补充产品详情", + "description": "列表页模式下,是否爬取每个产品的详情页以补充详细信息(图片、价格、公司信息、Product Details等)" + }, + "use_ai_extraction": { + "type": "boolean", + "title": "使用AI提取", + "description": "如果启用,将使用AI模型从网页内容中智能提取结构化产品信息(在手动提取的基础上进一步优化)" + }, + "ai_system_message": { + "type": "string", + "title": "系统消息", + "description": "定义AI的角色和行为", + "default": "你是一个专业的产品信息提取助手,请从网页内容中准确提取产品信息。", + "format": "textarea", + "rows": 5 + }, + "text_model": { + "type": "string", 
+ "title": "AI模型", + "description": "选择要使用的AI模型(与 ai_content_generation 节点保持一致)", + "default": "Jingrow", + "enum": [ + "Jingrow", + "DeepSeek", + "Doubao", + "ChatGPT", + "Qwen", + "Wenxin", + "Hunyuan", + "Spark" + ], + "enumNames": [ + "Jingrow", + "DeepSeek", + "Doubao", + "ChatGPT", + "Qwen", + "文心", + "混元", + "讯飞星火" + ] + }, + "max_tokens": { + "type": "integer", + "title": "最大Token数", + "description": "限制生成内容的最大长度", + "default": 2000 + }, + "ai_temperature": { + "type": "number", + "title": "随机性 (Temperature)", + "description": "控制输出的随机性,0为确定性,2为最大随机性", + "default": 0.7 + }, + "ai_top_p": { + "type": "number", + "title": "多样性 (Top-p)", + "description": "控制词汇选择的多样性", + "default": 0.9 + }, + "hide_input_handle": { + "type": "boolean", + "title": "隐藏输入接口", + "description": "是否隐藏节点的输入连接点" + } + }, + "required": [ + "url" + ], + "_layout": { + "tabs": [ + { + "id": "tab_1", + "label": "基础配置", + "sections": [ + { + "id": "section_1", + "label": "基本属性", + "columns": [ + { + "id": "column_1", + "label": "主要设置", + "fields": [ + "url", + "crawl_mode", + "max_pages", + "enrich_details", + "use_ai_extraction", + "ai_system_message" + ] + } + ] + } + ] + }, + { + "id": "tab_2", + "label": "AI模型设置", + "sections": [ + { + "id": "section_2", + "label": "模型参数", + "columns": [ + { + "id": "column_2", + "label": "模型配置", + "fields": [ + "text_model", + "max_tokens", + "ai_temperature", + "ai_top_p" + ] + } + ] + } + ] + }, + { + "id": "tab_3", + "label": "节点设置", + "sections": [ + { + "id": "section_3", + "label": "节点接口", + "columns": [ + { + "id": "column_3", + "fields": [ + "hide_input_handle" + ] + } + ] + } + ] + } + ], + "activeTab": "tab_3" + } +} \ No newline at end of file diff --git a/apps/jingrow/jingrow/ai/nodes/web_scrapers/web_scrapers.py b/apps/jingrow/jingrow/ai/nodes/web_scrapers/web_scrapers.py new file mode 100644 index 0000000..cdb059d --- /dev/null +++ b/apps/jingrow/jingrow/ai/nodes/web_scrapers/web_scrapers.py @@ -0,0 +1,982 @@ +import json +import re +import asyncio +from typing import Dict, Any, Optional, List +from urllib.parse import urljoin, urlparse, parse_qs, urlencode + +import jingrow +from jingrow.utils.jinja import render_template +from bs4 import BeautifulSoup +from crawl4ai import AsyncWebCrawler + + +# ==================== 产品列表页提取函数 ==================== + +def extract_current_category(soup): + """提取当前分类名称""" + # 方法1: 查找 div.sr-txt-title 中的 h1.sr-txt-h2 + title_div = soup.find('div', class_='sr-txt-title') + if title_div: + h1_tag = title_div.find('h1', class_='sr-txt-h2') + if h1_tag: + category_name = h1_tag.get_text(strip=True) + if category_name: + return category_name + + # 方法2: 直接查找 h1.sr-txt-h2 + category_title = soup.find('h1', class_='sr-txt-h2') + if category_title: + category_name = category_title.get_text(strip=True) + if category_name: + return category_name + + # 方法3: 查找包含 itemscope 的 div.sr-txt-title 中的 h1 + title_divs = soup.find_all('div', class_='sr-txt-title') + for div in title_divs: + if div.get('itemscope'): + h1_tag = div.find('h1') + if h1_tag: + category_name = h1_tag.get_text(strip=True) + if category_name: + return category_name + + return None + + +def extract_products_from_page(soup, base_url=''): + """从单个页面提取产品信息""" + products = [] + + # 找到所有产品项容器 + product_items = soup.find_all('div', class_='prod-result-item') + + # 遍历每个产品项,提取信息 + for item in product_items: + product = {} + + # 提取产品ID(从data-prodid属性) + product['product_id'] = item.get('data-prodid', '') + + # 提取产品标题和链接(从 prod-title 中的 a 标签) + title_div = item.find('div', 
class_='prod-title') + if title_div: + title_link = title_div.find('a') + if title_link: + product['title'] = title_link.get_text(strip=True) + product['url'] = title_link.get('href', '') + # 确保URL是完整的 + if product['url'] and not product['url'].startswith('http'): + if product['url'].startswith('//'): + product['url'] = 'https:' + product['url'] + elif product['url'].startswith('/'): + if base_url: + parsed = urlparse(base_url) + product['url'] = f"{parsed.scheme}://{parsed.netloc}" + product['url'] + else: + # 从当前页面URL推断基础URL + product['url'] = base_url + product['url'] + + # 提取产品价格(从 prod-price 中) + price_div = item.find('div', class_='prod-price') + if price_div: + # 方法1: 从title属性获取完整价格信息 + price_title = price_div.get('title', '') + if price_title: + product['price'] = price_title + else: + # 方法2: 从内部文本提取 + price_value = price_div.find('span', class_='value') + price_unit = price_div.find('span', class_='unit') + if price_value: + price_text = price_value.get_text(strip=True) + if price_unit: + price_text += ' ' + price_unit.get_text(strip=True) + product['price'] = price_text + + # 提取最小订购量(从 min-order 中) + min_order_div = item.find('div', class_='min-order') + if min_order_div: + # 从title属性获取 + min_order_title = min_order_div.get('title', '') + if min_order_title: + product['min_order'] = min_order_title + else: + # 从内部文本提取 + min_order_value = min_order_div.find('span', class_='value') + if min_order_value: + product['min_order'] = min_order_value.get_text(strip=True) + + # 提取产品图片(从 prod-image 中的 img 标签) + image_div = item.find('div', class_='prod-image') + if image_div: + img_tag = image_div.find('img') + if img_tag: + # 优先使用 data-original,其次使用 src + img_url = img_tag.get('data-original') or img_tag.get('src', '') + if img_url: + # 确保URL是完整的 + if img_url.startswith('//'): + img_url = 'https:' + img_url + elif img_url.startswith('/'): + if base_url: + parsed = urlparse(base_url) + img_url = f"{parsed.scheme}://{parsed.netloc}" + img_url + else: + img_url = base_url + img_url + product['image'] = img_url + product['image_alt'] = img_tag.get('alt', '') + + # 检查是否有视频标记 + video_mark = item.find('div', class_='prod-video-mark') + product['has_video'] = video_mark is not None + + # 只添加有标题的产品(确保数据完整) + if product.get('title'): + products.append(product) + + return products + + +def get_next_page_url(soup, current_url): + """检查是否有下一页,如果有则返回下一页URL""" + # 查找分页器 + pager = soup.find('div', class_='pager') + if not pager: + return None + + # 方法1: 查找 class 包含 "next" 的 a 标签 + # 排除包含 "disClick" 的(禁用状态,表示没有下一页) + next_links = pager.find_all('a', class_=lambda x: x and 'next' in x if x else False) + + for next_link in next_links: + # 检查是否被禁用 + classes = next_link.get('class', []) + if 'disClick' in classes: + continue # 跳过禁用的链接 + + # 获取href属性 + next_url = next_link.get('href') + if not next_url: + continue + + # 过滤掉 JavaScript URL + if next_url.startswith('javascript:'): + # 尝试从 JavaScript 函数中提取页码,然后构建 URL + # 例如:javascript:submitSearchByPage(2) -> 提取页码 2 + page_match = re.search(r'submitSearchByPage\((\d+)\)', next_url) + if page_match: + page_num = page_match.group(1) + # 尝试从当前 URL 构建下一页 URL + # 查找当前 URL 中的页码参数 + parsed = urlparse(current_url) + query_params = {} + if parsed.query: + query_params = parse_qs(parsed.query) + + # 更新页码参数 + # 常见的页码参数名:page, p, pageNum, pageNumber, pn + page_param_found = False + for param_name in ['page', 'p', 'pageNum', 'pageNumber', 'pn', 'currentPage']: + if param_name in query_params: + query_params[param_name] = [page_num] + page_param_found = True + break + + # 
如果没有找到页码参数,尝试添加 + if not page_param_found: + query_params['page'] = [page_num] + + # 重建 URL + new_query = urlencode(query_params, doseq=True) + next_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}?{new_query}" + else: + # 无法从 JavaScript URL 提取页码,跳过 + continue + + # 确保URL是完整的 + if next_url.startswith('//'): + next_url = 'https:' + next_url + elif next_url.startswith('/'): + # 从当前URL提取基础URL + parsed = urlparse(current_url) + base_url = f"{parsed.scheme}://{parsed.netloc}" + next_url = urljoin(base_url, next_url) + elif not next_url.startswith(('http://', 'https://', 'file://', 'raw:')): + # 相对路径 + parsed = urlparse(current_url) + base_url = f"{parsed.scheme}://{parsed.netloc}" + next_url = urljoin(base_url, next_url) + + # 验证 URL 格式 + if not next_url.startswith(('http://', 'https://', 'file://', 'raw:')): + continue + + # 确保下一页URL与当前URL不同 + if next_url != current_url: + return next_url + + # 方法2: 如果方法1失败,尝试查找所有分页链接,找到当前页的下一个 + all_page_links = pager.find_all('a') + current_page_num = None + + # 尝试从当前 URL 提取页码 + parsed = urlparse(current_url) + if parsed.query: + query_params = parse_qs(parsed.query) + for param_name in ['page', 'p', 'pageNum', 'pageNumber', 'pn', 'currentPage']: + if param_name in query_params: + try: + current_page_num = int(query_params[param_name][0]) + break + except (ValueError, IndexError): + pass + + # 如果找到了当前页码,尝试找到下一页 + if current_page_num is not None: + next_page_num = current_page_num + 1 + # 查找包含下一页数字的链接 + for link in all_page_links: + link_text = link.get_text(strip=True) + try: + if int(link_text) == next_page_num: + href = link.get('href') + if href and not href.startswith('javascript:'): + # 构建下一页 URL + query_params = parse_qs(parsed.query) + for param_name in ['page', 'p', 'pageNum', 'pageNumber', 'pn', 'currentPage']: + if param_name in query_params: + query_params[param_name] = [str(next_page_num)] + break + else: + query_params['page'] = [str(next_page_num)] + + new_query = urlencode(query_params, doseq=True) + next_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}?{new_query}" + if next_url != current_url: + return next_url + except ValueError: + continue + + return None + + +# ==================== 产品详情页提取函数 ==================== + +def extract_product_detail_images(soup, base_url=''): + """从产品详情页提取所有图片""" + images = [] + + # 查找图片容器 + slide_page = soup.find('div', class_='sr-proMainInfo-slide-page') + if slide_page: + # 查找所有图片项 + slide_ul = slide_page.find('ul', class_='sr-proMainInfo-slide-pageUl') + if slide_ul: + # 查找所有 li.J-pic-dot + pic_items = slide_ul.find_all('li', class_='J-pic-dot') + for item in pic_items: + img_tag = item.find('img') + if img_tag: + img_url = img_tag.get('data-original') or img_tag.get('src', '') + if img_url: + # 确保URL是完整的 + if img_url.startswith('//'): + img_url = 'https:' + img_url + elif img_url.startswith('/'): + if base_url: + parsed = urlparse(base_url) + img_url = f"{parsed.scheme}://{parsed.netloc}" + img_url + else: + img_url = base_url + img_url + if img_url not in images: + images.append(img_url) + + return images + + +def extract_product_detail_prices(soup): + """从产品详情页提取价格信息(统一格式:{"US$718.00":"5-19"})""" + prices = [] + + # 方法1: 提取swiper格式的价格(多个价格项) + swiper_wrapper = soup.find('div', class_='swiper-wrapper-div') + if swiper_wrapper: + # 查找所有价格项 + slide_divs = swiper_wrapper.find_all('div', class_='swiper-slide-div') + for slide_div in slide_divs: + # 查找价格容器 + money_container = slide_div.find('div', class_='swiper-money-container') + # 查找数量容器 + unit_container = slide_div.find('div', 
class_='swiper-unit-container') + + if money_container and unit_container: + price_text = money_container.get_text(strip=True) + unit_text = unit_container.get_text(strip=True) + + # 提取数量范围(移除"Pieces"等词,只保留数字范围) + # 使用正则表达式提取数字和符号(如 "5-19", "50+", "20-49") + # 匹配数字范围格式:数字-数字 或 数字+ + unit_match = re.search(r'(\d+\s*[-+]\s*\d+|\d+\+)', unit_text) + if unit_match: + unit_text = unit_match.group(1).strip() + else: + # 如果没匹配到,移除"Pieces"等词,提取数字 + unit_text = unit_text.replace('Pieces', '').replace('Piece', '').strip() + # 提取数字,如果没有范围,添加+号表示最小数量 + num_match = re.search(r'(\d+)', unit_text) + if num_match: + unit_text = num_match.group(1) + '+' + unit_text = ' '.join(unit_text.split()) # 清理多余空格 + + if price_text and unit_text: + # 格式: {"US$718.00":"5-19"} + price_obj = {price_text: unit_text} + prices.append(price_obj) + + # 方法2: 如果swiper格式没有找到,尝试提取only-one-price格式(单个价格范围) + if not prices: + only_one_price = soup.find('div', class_='only-one-priceNum') + if only_one_price: + # 提取价格范围 + price_td = only_one_price.find('td') + if price_td: + price_span = price_td.find('span', class_='only-one-priceNum-td-left') + if price_span: + price_text = price_span.get_text(strip=True) + + # 提取数量信息 + price_info_td = only_one_price.find('td', class_='sa-only-property-price') + unit_text = '' + has_moq = False + if price_info_td: + # 先检查整个容器的文本是否包含MOQ + container_text = price_info_td.get_text(strip=True) + if 'MOQ' in container_text or 'moq' in container_text.lower(): + has_moq = True + + # 查找所有span标签 + quantity_spans = price_info_td.find_all('span') + for span in quantity_spans: + span_text = span.get_text(strip=True) + # 提取数字(可能是 "5 Pieces" 或 "5") + num_match = re.search(r'(\d+)', span_text) + if num_match: + unit_text = num_match.group(1) + # 检查当前span是否包含MOQ + if 'MOQ' in span_text or 'moq' in span_text.lower(): + has_moq = True + break + + # 如果提取到了价格,即使没有数量信息也保存(数量设为空或默认值) + if price_text: + # 如果没有提取到数量,使用默认值"1+"(表示最小订购量为1) + if not unit_text: + unit_text = '1+' + # only-one-price格式通常表示最小订购量,默认添加+号 + elif not unit_text.endswith('+'): + unit_text += '+' + # 统一格式: {"US$1,018.00 - 1,118.00":"5+"} + price_obj = {price_text: unit_text} + prices.append(price_obj) + + return prices + + +def extract_company_info(soup, base_url=''): + """从产品详情页提取公司信息""" + company_info = { + 'company_name': '', + 'company_url': '' + } + + # 查找公司信息容器 + com_div = soup.find('div', class_='sr-com') + if com_div: + # 查找公司信息 + com_info = com_div.find('div', class_='sr-com-info') + if com_info: + # 查找公司标题 + title_div = com_info.find('div', class_='title-txt') + if title_div: + company_link = title_div.find('a') + if company_link: + company_info['company_name'] = company_link.get('title', '') or company_link.get_text(strip=True) + company_info['company_url'] = company_link.get('href', '') + + # 确保URL是完整的 + if company_info['company_url'] and not company_info['company_url'].startswith('http'): + if company_info['company_url'].startswith('//'): + company_info['company_url'] = 'https:' + company_info['company_url'] + elif company_info['company_url'].startswith('/'): + if base_url: + parsed = urlparse(base_url) + company_info['company_url'] = f"{parsed.scheme}://{parsed.netloc}" + company_info['company_url'] + else: + company_info['company_url'] = base_url + company_info['company_url'] + + return company_info + + +def extract_product_details(soup): + """从产品详情页提取Product Details表格信息""" + details = {} + + # 方法1: 查找包含"Product Details"文本的元素 + details_elements = soup.find_all(string=lambda text: text and 'Product Details' in text) + + for details_text in 
details_elements: + # 向上查找包含表格的容器 + parent = details_text.find_parent() + + # 在父元素的后续兄弟元素中查找表格 + current = parent + for _ in range(5): # 最多查找5层 + if current: + # 在当前元素及其后续兄弟中查找表格 + table = current.find('table') + if not table: + # 查找下一个兄弟元素 + next_sibling = current.find_next_sibling() + if next_sibling: + table = next_sibling.find('table') + + if table: + # 提取表格数据 + rows = table.find_all('tr') + for row in rows: + th = row.find('th') + td = row.find('td') + if th and td: + key = th.get_text(strip=True).replace(':', '').strip() + value = td.get_text(strip=True) + if key and value: + details[key] = value + break # 找到表格后退出 + + # 向上查找 + current = current.find_parent() + else: + break + + # 方法2: 如果没找到,直接查找所有包含th和td的表格 + if not details: + tables = soup.find_all('table') + for table in tables: + rows = table.find_all('tr') + # 检查表格是否包含Product Details相关的key + product_detail_keys = ['Customization', 'Type', 'Material', 'Application', 'Max Bear Weight', 'Warranty'] + for row in rows: + th = row.find('th') + td = row.find('td') + if th and td: + key = th.get_text(strip=True).replace(':', '').strip() + value = td.get_text(strip=True) + # 如果key匹配产品详情的关键词,则提取 + if key and value and any(keyword in key for keyword in product_detail_keys): + details[key] = value + + return details + + +def extract_product_title(soup): + """提取产品标题""" + # 优先级:h1.sr-proMainInfo-baseInfoH1 > span > og:title > title标签 + h1_tag = soup.find('h1', class_='sr-proMainInfo-baseInfoH1') + if h1_tag: + title_span = h1_tag.find('span') + if title_span: + return title_span.get_text(strip=True) + + # 如果h1没有找到,尝试从og:title提取 + og_title = soup.find('meta', property='og:title') + if og_title and og_title.get('content'): + return og_title.get('content').replace('[Hot Item]', '').strip() + + # 如果还是没有,从title标签提取 + title_tag = soup.find('title') + if title_tag: + title_text = title_tag.get_text(strip=True) + # 移除 "- Smith Machine and Bench Press price" 等后缀 + if ' - ' in title_text: + return title_text.split(' - ')[0].strip() + else: + return title_text + + return '' + + +# ==================== 爬取函数 ==================== + +async def crawl_product_detail(crawler, product_url, product_index=None, total_products=None, base_url=''): + """爬取单个产品详情页并提取详细信息""" + try: + result = await crawler.arun(url=product_url, wait_for="networkidle", page_timeout=30000) + + if not result.success: + return {} + + soup = BeautifulSoup(result.html, 'html.parser') + detail_info = {} + + # 提取标题 + title = extract_product_title(soup) + if title: + detail_info['title'] = title + + # 提取图片 + images = extract_product_detail_images(soup, base_url) + if images: + detail_info['images'] = images + + # 提取价格 + prices = extract_product_detail_prices(soup) + if prices: + detail_info['prices'] = prices + + # 提取公司信息 + company_info = extract_company_info(soup, base_url) + if company_info.get('company_name'): + detail_info['company_name'] = company_info['company_name'] + detail_info['company_url'] = company_info['company_url'] + + # 提取Product Details + product_details = extract_product_details(soup) + if product_details: + detail_info['Product Details'] = product_details + + return detail_info + + except Exception as e: + return {} + + +async def crawl_single_list_page(crawler, url, page_num, base_url=''): + """爬取单个产品列表页""" + result = await crawler.arun(url=url, wait_for="networkidle") + + if not result.success: + return None, None, None + + soup = BeautifulSoup(result.html, 'html.parser') + + # 提取产品 + products = extract_products_from_page(soup, base_url) + + # 检查是否有下一页 + next_url = 
get_next_page_url(soup, url) + + # 只在第一页提取分类名称 + category_name = None + if page_num == 1: + category_name = extract_current_category(soup) + + return products, next_url, category_name + + +async def crawl_product_list(crawler, start_url, max_pages=100, enrich_details=False, base_url=''): + """爬取产品列表(支持分页)""" + all_products = [] + current_url = start_url + page_num = 1 + category_name = None + + while current_url and page_num <= max_pages: + # 爬取当前页 + products, next_url, cat_name = await crawl_single_list_page(crawler, current_url, page_num, base_url) + + # 保存第一页提取的分类名称 + if page_num == 1 and cat_name: + category_name = cat_name + + if products: + all_products.extend(products) + else: + break + + # 检查是否有下一页 + if next_url: + current_url = next_url + page_num += 1 + # 添加延迟,避免请求过快 + await asyncio.sleep(1) + else: + break + + # 如果需要补充详情,为每个产品爬取详情页 + if enrich_details and all_products: + for i, product in enumerate(all_products, 1): + if not product.get('url'): + continue + + # 爬取产品详情 + detail_info = await crawl_product_detail( + crawler, + product['url'], + i, + len(all_products), + base_url + ) + + # 合并详细信息到产品数据 + if detail_info: + # 更新图片(使用详情页的图片列表) + if 'images' in detail_info: + product['images'] = detail_info['images'] + # 保留第一张图片作为主图 + if detail_info['images']: + product['image'] = detail_info['images'][0] + + # 更新价格(使用详情页的价格) + if 'prices' in detail_info: + product['prices'] = detail_info['prices'] + + # 添加公司信息 + if 'company_name' in detail_info: + product['company_name'] = detail_info['company_name'] + if 'company_url' in detail_info: + product['company_url'] = detail_info['company_url'] + + # 添加Product Details + if 'Product Details' in detail_info: + product['Product Details'] = detail_info['Product Details'] + + # 添加延迟,避免请求过快 + if i < len(all_products): + await asyncio.sleep(0.5) + + # 将分类信息添加到结果中 + result = { + 'products': all_products, + 'category_name': category_name, + 'total_pages': page_num, + 'total_products': len(all_products) + } + + return result + + +# ==================== 节点执行函数 ==================== + +def get_branch_upstream_nodes(current_node_id, flow_data): + """获取同一分支的所有上游节点ID列表""" + if not current_node_id or not flow_data: + return [] + + edges = flow_data.get("edges", []) + upstream_nodes = set() + stack = [current_node_id] + + while stack: + node_id = stack.pop() + for edge in edges: + if edge.get("target") == node_id: + source_id = edge.get("source") + if source_id and source_id not in upstream_nodes: + upstream_nodes.add(source_id) + stack.append(source_id) + + return list(upstream_nodes) + + +def execute(context=None, inputs=None, config=None, **kwargs): + """ + 网站产品信息采集节点 - 基于 Crawl4AI + 支持两种模式: + 1. 单产品详情页模式:爬取单个产品详情页 + 2. 
产品列表页模式:爬取产品列表(支持分页),可选择是否补充详情 + """ + if context is None: + context = kwargs.get("context", {}) + if inputs is None: + inputs = kwargs.get("inputs", {}) + if config is None: + config = kwargs.get("config", {}) + + try: + # 参数处理 + if not isinstance(inputs, dict): + try: + inputs = json.loads(inputs) if isinstance(inputs, str) else {} + except Exception: + inputs = {} + + # 获取配置 + url = config.get("url") or inputs.get("url") + if not url: + return {"success": False, "error": "URL 必填"} + + # 支持从上游节点获取 URL + current_node_id = context.get("current_node_id") + flow_data = context.get("flow_data", {}) + branch_upstream_nodes = get_branch_upstream_nodes(current_node_id, flow_data) + + if context and "node_results" in context and branch_upstream_nodes: + node_results = context["node_results"] + for upstream_node_id in branch_upstream_nodes: + if upstream_node_id in node_results: + upstream_result = node_results[upstream_node_id] + if isinstance(upstream_result, dict): + # 解析 AI 原始响应中的 JSON + if "ai_raw_response" in upstream_result and isinstance(upstream_result["ai_raw_response"], str): + try: + raw_response = upstream_result["ai_raw_response"].strip() + start_brace = raw_response.find('{') + end_brace = raw_response.rfind('}') + + if start_brace != -1 and end_brace != -1 and end_brace > start_brace: + json_content = raw_response[start_brace:end_brace + 1] + parsed_json = json.loads(json_content) + if isinstance(parsed_json, dict): + for k, v in parsed_json.items(): + if k not in inputs or not inputs.get(k): + inputs[k] = v + except: + pass + + # 平铺其他字段到 inputs + for k, v in upstream_result.items(): + if k not in inputs or not inputs.get(k): + inputs[k] = v + + # 如果 inputs 中有 url,优先使用 + if inputs.get("url") and not url: + url = inputs.get("url") + + # 构建模板上下文(支持 Jinja2 模板) + template_context = inputs.copy() + if context and context.get("flow_id"): + flow_id = context["flow_id"] + agent_data = jingrow.get_pg("Local Ai Agent", str(flow_id)) + if agent_data: + for key, value in agent_data.items(): + if key not in template_context or not template_context.get(key): + template_context[key] = value + + # 如果 URL 包含模板语法,进行渲染 + try: + url = render_template(url, template_context) + except Exception as e: + return {"success": False, "error": f"URL 模板渲染失败: {str(e)}"} + + # URL 验证和规范化 + if not url or not isinstance(url, str): + return {"success": False, "error": "URL 不能为空"} + + url = url.strip() + if not url: + return {"success": False, "error": "URL 不能为空"} + + # 确保 URL 有协议前缀 + if not url.startswith(('http://', 'https://', 'file://', 'raw:')): + if url.startswith('//'): + url = 'https:' + url + elif not url.startswith('/'): + url = 'https://' + url + else: + return {"success": False, "error": f"URL 格式不正确: {url}"} + + # 获取模式配置 + crawl_mode = config.get("crawl_mode", "single") # "single" 或 "list" + max_pages = config.get("max_pages", 100) # 列表页模式的最大页数 + enrich_details = config.get("enrich_details", False) # 是否补充产品详情 + + # AI 提取配置(仅用于单产品模式) + use_ai_extraction = config.get("use_ai_extraction", False) + text_model = config.get("text_model", "Jingrow") + ai_system_message = config.get("ai_system_message", "你是一个专业的产品信息提取助手,请从网页内容中准确提取产品信息。") + max_tokens = config.get("max_tokens", 2000) + ai_temperature = config.get("ai_temperature", 0.7) + ai_top_p = config.get("ai_top_p", 0.9) + + # 使用 Crawl4AI 进行爬取 + try: + from crawl4ai import AsyncWebCrawler + import asyncio + except ImportError: + return { + "success": False, + "error": "Crawl4AI 未安装,请运行: uv pip install crawl4ai" + } + + async def crawl_website(): + async with 
AsyncWebCrawler(verbose=True) as crawler: + # 提取基础URL用于相对路径处理 + parsed = urlparse(url) + base_url = f"{parsed.scheme}://{parsed.netloc}" + + if crawl_mode == "list": + # 产品列表页模式 + result = await crawl_product_list( + crawler, + url, + max_pages=max_pages, + enrich_details=enrich_details, + base_url=base_url + ) + + # 构建返回结果 + return { + "success": True, + "url": url, + "mode": "list", + "category_name": result.get('category_name'), + "total_pages": result.get('total_pages', 0), + "total_products": result.get('total_products', 0), + "products": result.get('products', []) + } + else: + # 单产品详情页模式 + result = await crawler.arun( + url=url, + wait_for="networkidle", + page_timeout=30000, + ) + + if not result.success: + return { + "success": False, + "error": f"爬取失败: {result.error_message or '未知错误'}" + } + + html_content = result.html + markdown_content = result.markdown + + # 初始化 BeautifulSoup + soup = BeautifulSoup(html_content, 'html.parser') + + # 提取产品信息 + extracted_data = {} + + # 提取标题 + title = extract_product_title(soup) + if title: + extracted_data['title'] = title + + # 提取图片 + images = extract_product_detail_images(soup, base_url) + if images: + extracted_data['image'] = images + + # 提取价格 + prices = extract_product_detail_prices(soup) + if prices: + extracted_data['price'] = prices + + # 提取公司信息 + company_info = extract_company_info(soup, base_url) + if company_info.get('company_name'): + extracted_data['company_name'] = company_info['company_name'] + extracted_data['url'] = company_info['company_url'] + + # 提取Product Details + product_details = extract_product_details(soup) + if product_details: + extracted_data['Product Details'] = product_details + + # 如果启用了 AI 提取,使用 AI 模型进一步提取和补充信息 + if use_ai_extraction: + try: + from jingrow.utils.jingrow_cloud import call_ai_model + + # 构建 AI 提取提示词 + ai_prompt = f"""请从以下网页内容中提取产品信息,返回 JSON 格式。 + +URL: {url} + +网页内容(Markdown,前5000字符): +{markdown_content[:5000]} + +已提取的基础信息: +{json.dumps(extracted_data, ensure_ascii=False, indent=2)} + +请基于网页内容和完善已有信息,返回完整的 JSON 格式(严格按照以下结构): +{{ + "title": "产品标题", + "price": [{{"US$877.00": "2-9"}}, {{"US$850.00": "10+"}}], // 价格数组,每个元素是 {{"价格": "数量范围"}} 的格式 + "image": ["图片URL1", "图片URL2"], // 图片URL数组 + "Product Details": "产品详细信息文本", + "company_name": "公司名称", + "url": "公司主页URL" +}} + +注意: +- price 必须是数组,每个元素是对象格式 {{"价格": "数量范围"}} +- image 必须是字符串数组 +- 如果某些信息不存在,使用空字符串或空数组""" + + # 调用 AI 模型 + ai_res = call_ai_model( + ai_prompt, + text_model=text_model, + ai_temperature=ai_temperature, + ai_top_p=ai_top_p, + ai_system_message=ai_system_message, + max_tokens=max_tokens, + image_urls=None, + ) + + if ai_res.get("success"): + ai_response = ai_res.get("response", "") + # 尝试解析 JSON + try: + start_brace = ai_response.find('{') + end_brace = ai_response.rfind('}') + if start_brace != -1 and end_brace != -1 and end_brace > start_brace: + json_str = ai_response[start_brace:end_brace + 1] + ai_data = json.loads(json_str) + if isinstance(ai_data, dict): + # 合并 AI 提取的数据,AI 提取的数据优先 + for key, value in ai_data.items(): + if value is not None and value != "" and (not isinstance(value, list) or len(value) > 0): + extracted_data[key] = value + except json.JSONDecodeError: + pass + except Exception: + pass + + # 确保输出格式一致 + if "price" in extracted_data: + if not isinstance(extracted_data["price"], list): + if isinstance(extracted_data["price"], str): + extracted_data["price"] = [extracted_data["price"]] + else: + extracted_data["price"] = [] + + if "image" in extracted_data: + if not isinstance(extracted_data["image"], list): + 
extracted_data["image"] = [] + + # 确保所有必需字段存在 + final_data = { + "title": extracted_data.get("title", ""), + "price": extracted_data.get("price", []), + "image": extracted_data.get("image", []), + "Product Details": extracted_data.get("Product Details", ""), + "company_name": extracted_data.get("company_name", ""), + "url": extracted_data.get("url", "") + } + + # 构建返回结果 + return { + "success": True, + "url": url, + "mode": "single", + "extracted_data": final_data + } + + # 运行异步爬取 + try: + result = asyncio.run(crawl_website()) + return result + except Exception as e: + return { + "success": False, + "error": f"爬取执行失败: {str(e)}" + } + + except Exception as e: + return { + "success": False, + "error": f"网站采集节点执行失败: {str(e)}" + } + diff --git a/apps/jingrow/jingrow/utils/node_dependencies.py b/apps/jingrow/jingrow/utils/node_dependencies.py index 1cc0186..bd010ed 100644 --- a/apps/jingrow/jingrow/utils/node_dependencies.py +++ b/apps/jingrow/jingrow/utils/node_dependencies.py @@ -2,11 +2,13 @@ 节点依赖管理工具 支持节点独立管理自己的依赖,使用 uv pip install 直接安装 """ +import os +import hashlib import subprocess import logging from pathlib import Path from typing import Dict, Any, Optional, List -from jingrow.utils.path import get_jingrow_root +from jingrow.utils.path import get_jingrow_root, get_root_path logger = logging.getLogger(__name__) @@ -117,10 +119,22 @@ def install_node_dependencies(node_type: str, sync: bool = True) -> Dict[str, An ) if result.returncode == 0: + # 检查是否有后安装脚本需要执行 + after_install_result = _run_node_after_install(node_type, node_dir, project_root, content) + + message = f"节点 {node_type} 依赖安装成功" + if after_install_result: + if after_install_result.get("success"): + message += f" ({after_install_result.get('message', '后安装脚本执行成功')})" + else: + logger.warning(f"节点 {node_type} 后安装脚本执行失败: {after_install_result.get('error', '未知错误')}") + message += f" (后安装脚本警告: {after_install_result.get('error', '未知错误')})" + return { "success": True, - "message": f"节点 {node_type} 依赖安装成功", - "output": result.stdout + "message": message, + "output": result.stdout, + "post_install": after_install_result } else: error_msg = result.stderr or result.stdout @@ -222,6 +236,177 @@ def check_node_dependencies(node_type: str) -> Dict[str, Any]: return {"success": None, "message": f"检查依赖时出错: {str(e)}"} +def _run_node_after_install(node_type: str, node_dir: Path, project_root: Path, pyproject_content: str) -> Optional[Dict[str, Any]]: + """ + 执行节点的后安装脚本(如果配置了的话) + + 支持配置格式: + [tool.jingrow.after_install] + commands = [ + "python -m playwright install-deps", + "python -m playwright install" + ] + # 或单条命令 + script = "python -m playwright install-deps && python -m playwright install" + + Args: + node_type: 节点类型 + node_dir: 节点目录 + project_root: 项目根目录 + pyproject_content: pyproject.toml 文件内容 + + Returns: + 执行结果字典,如果没有配置后安装脚本则返回 None + """ + try: + commands = [] + + # 解析 [tool.jingrow.after_install] 配置 + in_after_install = False + in_commands_array = False + bracket_count = 0 + + for line in pyproject_content.split('\n'): + stripped = line.strip() + + # 检测 [tool.jingrow.after_install] 段 + if stripped.startswith('[tool.jingrow.after_install]'): + in_after_install = True + continue + + # 如果遇到新的段,停止解析 + if in_after_install and stripped.startswith('[') and not stripped.startswith('[tool.jingrow'): + break + + if in_after_install: + # 查找 commands = [...] 
数组格式(推荐,支持多条命令) + if stripped.startswith('commands') and '=' in stripped and '[' in stripped: + in_commands_array = True + bracket_count = stripped.count('[') - stripped.count(']') + # 检查是否在同一行结束 + if ']' in stripped: + in_commands_array = False + # 提取同一行的命令 + if '[' in stripped and ']' in stripped: + # 提取数组内容 + array_content = stripped.split('[', 1)[1].rsplit(']', 1)[0] + for cmd in array_content.split(','): + cmd = cmd.strip().strip('"').strip("'") + if cmd: + commands.append(cmd) + continue + elif in_commands_array: + # 继续解析数组内容 + bracket_count += stripped.count('[') - stripped.count(']') + if bracket_count <= 0 and ']' in stripped: + in_commands_array = False + # 如果这一行只有 ],直接跳过 + if stripped.strip() == ']': + break + # 否则提取 ] 之前的内容 + stripped = stripped.rsplit(']', 1)[0] + + # 提取命令(去除引号和逗号) + cmd = stripped.strip('",\' ,') + # 跳过空行、注释和单独的 ] + if cmd and not cmd.startswith('#') and cmd != ']': + commands.append(cmd) + if bracket_count <= 0: + break + continue + # 查找 script = 配置(单条命令) + elif stripped.startswith('script') and '=' in stripped: + script_value = stripped.split('=', 1)[1].strip() + # 处理字符串格式(去除引号) + if script_value.startswith('"') and script_value.endswith('"'): + script_value = script_value[1:-1] + elif script_value.startswith("'") and script_value.endswith("'"): + script_value = script_value[1:-1] + if script_value: + commands = [script_value] + break + + if not commands: + return None + + # 将多条命令合并为一条脚本(用 && 连接) + script = ' && '.join(commands) + + # 检查是否已经执行过相同的脚本 + script_hash = hashlib.md5(script.encode('utf-8')).hexdigest() + # 将标记文件存储在项目根目录的缓存目录 + cache_dir = get_root_path() / '.cache' / 'node_install' + cache_dir.mkdir(parents=True, exist_ok=True) + after_install_marker = cache_dir / f"{node_type}.after_install_done" + + # 如果标记文件存在,检查脚本哈希值是否匹配 + if after_install_marker.exists(): + try: + with open(after_install_marker, 'r', encoding='utf-8') as f: + saved_hash = f.read().strip() + if saved_hash == script_hash: + logger.info(f"节点 {node_type} 的后安装脚本已执行过,跳过") + return { + "success": True, + "message": "后安装脚本已执行过,跳过", + "skipped": True + } + except Exception as e: + logger.warning(f"读取后安装标记文件失败: {str(e)},将重新执行") + + logger.info(f"执行节点 {node_type} 的后安装脚本: {script}") + + # 执行脚本(使用 shell 执行,支持 && 等操作符) + # 将脚本中的 python -m 命令替换为 uv run python -m,确保在正确的环境中执行 + import re + # 替换 python -m 为 uv run python -m(避免重复替换已存在的 uv run) + script = re.sub(r'(? Dict[str, Any]: """ 确保节点依赖已安装(检查并安装)
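
The _run_node_after_install helper above reads [tool.jingrow.after_install] out of pyproject.toml by scanning the file line by line and counting brackets. For comparison, here is a minimal sketch of the same lookup done with a real TOML parser — it assumes the stdlib tomllib module (Python 3.11+) or the third-party tomli package on 3.10, neither of which this patch depends on, so treat it as an illustration of an alternative design rather than the shipped implementation:

import sys

if sys.version_info >= (3, 11):
    import tomllib
else:
    import tomli as tomllib  # assumption: tomli is installed for Python 3.10

def read_after_install_commands(pyproject_content: str) -> list[str]:
    """Return the post-install commands a node declares, or an empty list."""
    data = tomllib.loads(pyproject_content)
    section = data.get("tool", {}).get("jingrow", {}).get("after_install", {})
    commands = section.get("commands")
    if isinstance(commands, list) and commands:
        return [str(cmd) for cmd in commands]  # commands = [...] takes precedence
    script = section.get("script")
    return [script] if isinstance(script, str) and script else []  # single "script" fallback

Parsing the whole file up front avoids the bracket-counting state machine and handles quoting and inline comments for free; the trade-off is requiring tomllib/tomli at runtime.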