From cb1b3887e8a91106e72d1622adbe1f17561fba4b Mon Sep 17 00:00:00 2001
From: jingrow
Date: Fri, 14 Nov 2025 04:09:44 +0800
Subject: [PATCH] add node: web_scrapers_create

---
 .../batch_create_records.py                   | 538 ---------------
 .../ai/nodes/web_scrapers/web_scrapers.json   | 181 -----
 .../pyproject.toml                            |   6 +-
 .../web_scrapers_create.json}                 |  71 +-
 .../web_scrapers_create.py}                   | 653 ++++++++++--------
 5 files changed, 409 insertions(+), 1040 deletions(-)
 delete mode 100644 apps/jingrow/jingrow/ai/nodes/batch_create_records/batch_create_records.py
 delete mode 100644 apps/jingrow/jingrow/ai/nodes/web_scrapers/web_scrapers.json
 rename apps/jingrow/jingrow/ai/nodes/{web_scrapers => web_scrapers_create}/pyproject.toml (83%)
 rename apps/jingrow/jingrow/ai/nodes/{batch_create_records/batch_create_records.json => web_scrapers_create/web_scrapers_create.json} (51%)
 rename apps/jingrow/jingrow/ai/nodes/{web_scrapers/web_scrapers.py => web_scrapers_create/web_scrapers_create.py} (70%)

diff --git a/apps/jingrow/jingrow/ai/nodes/batch_create_records/batch_create_records.py b/apps/jingrow/jingrow/ai/nodes/batch_create_records/batch_create_records.py
deleted file mode 100644
index f386bf2..0000000
--- a/apps/jingrow/jingrow/ai/nodes/batch_create_records/batch_create_records.py
+++ /dev/null
@@ -1,538 +0,0 @@
-import json
-from typing import Dict, Any, Optional, List
-
-import jingrow
-
-
-def get_branch_upstream_nodes(current_node_id, flow_data):
-    """Get the IDs of all upstream nodes on the same branch"""
-    if not current_node_id or not flow_data:
-        return []
-
-    edges = flow_data.get("edges", [])
-    upstream_nodes = set()
-    stack = [current_node_id]
-
-    while stack:
-        node_id = stack.pop()
-        for edge in edges:
-            if edge.get("target") == node_id:
-                source_id = edge.get("source")
-                if source_id and source_id not in upstream_nodes:
-                    upstream_nodes.add(source_id)
-                    stack.append(source_id)
-
-    return list(upstream_nodes)
-
-
-def execute(context=None, inputs=None, config=None, **kwargs):
-    """
-    Batch record creation node
-    Creates records in bulk; designed for the product lists returned by the web_scrapers node
-    """
-    if context is None:
-        context = kwargs.get("context", {})
-    if inputs is None:
-        inputs = kwargs.get("inputs", {})
-    if config is None:
-        config = kwargs.get("config", {})
-
-    try:
-        # Normalize inputs
-        if not isinstance(inputs, dict):
-            try:
-                inputs = json.loads(inputs) if isinstance(inputs, str) else {}
-            except Exception:
-                inputs = {}
-
-        # Resolve pagetype
-        record_type = None
-        if isinstance(inputs, dict):
-            record_type = inputs.get("pagetype")
-        if not record_type and isinstance(config, dict):
-            record_type = config.get("pagetype")
-        if not isinstance(record_type, str) or not record_type:
-            return {
-                "success": False,
-                "error": "pagetype must be a non-empty string; check the configuration"
-            }
-
-        # Field-mapping configuration
-        field_map = config.get("field_map", [])
-
-        # Default site (for types such as Jsite Product that require a site field)
-        default_site = config.get("default_site", "")
-
-        # Fetch the label -> fieldname mapping from Jingrow
-        label2field = jingrow.get_field_mapping_from_jingrow(record_type) or {}
-
-        # Fetch field metadata (used to check field length limits)
-        field_metadata = {}
-        try:
-            from jingrow.model.page import get_page_instance
-            pg = get_page_instance(record_type)
-            meta_result = pg.get_meta()
-            if isinstance(meta_result, dict) and meta_result.get('success'):
-                meta_data = meta_result.get('data', {})
-                fields = meta_data.get('fields', [])
-                # Build a fieldname -> metadata map
-                for field in fields:
-                    fieldname = field.get('fieldname')
-                    if fieldname:
-                        # Try several keys for the length limit
-                        max_length = (
-                            field.get('length') or
-                            field.get('max_length') or
-                            field.get('maxlength') or
-                            (field.get('df') and field.get('df').get('length')) or
-                            None
-                        )
-                        # Convert to int if numeric
-                        if max_length is not None:
-                            try:
-                                max_length = int(max_length)
-                            except (ValueError, TypeError):
-                                max_length = None
-
-                        field_metadata[fieldname] = {
-                            'max_length': max_length,
-                            'fieldtype': field.get('fieldtype'),
-                            'label': field.get('label')
-                        }
-        except Exception as e:
-            # If metadata retrieval fails, log the error and keep going
-            print(f"Failed to fetch field metadata: {str(e)}")
-            pass
-
-        # If metadata retrieval failed, fall back to known common field length limits.
-        # From the error messages, the title field is usually 140 characters.
-        # Set default limits for common fields (kept as a fallback even when metadata succeeds).
-        common_field_limits = {
-            'title': 140,  # known from the error message
-            'name': 140,
-        }
-        for fieldname, default_length in common_field_limits.items():
-            if fieldname not in field_metadata:
-                field_metadata[fieldname] = {'max_length': default_length}
-            elif not field_metadata[fieldname].get('max_length'):
-                field_metadata[fieldname]['max_length'] = default_length
-
-        # Also set default limits for fields mapped via label2field that look like title fields
-        for label, fieldname in label2field.items():
-            if fieldname not in field_metadata or not field_metadata[fieldname].get('max_length'):
-                # If the label contains "标题" or "title" and the fieldname is title or name, default to 140
-                label_str = str(label).lower()
-                if ('标题' in str(label) or 'title' in label_str) and fieldname in ['title', 'name']:
-                    if fieldname not in field_metadata:
-                        field_metadata[fieldname] = {'max_length': 140}
-                    elif not field_metadata[fieldname].get('max_length'):
-                        field_metadata[fieldname]['max_length'] = 140
-
-        # Resolve the data array.
-        # Supported sources:
-        # 1. a products array in inputs (web_scrapers node output)
-        # 2. an items array in inputs (generic array)
-        # 3. products or items from an upstream node
-        data_array = None
-
-        # Option 1: read directly from inputs
-        if isinstance(inputs, dict):
-            # web_scrapers output format: {"products": [...]}
-            if "products" in inputs and isinstance(inputs["products"], list):
-                data_array = inputs["products"]
-            # generic format: {"items": [...]}
-            elif "items" in inputs and isinstance(inputs["items"], list):
-                data_array = inputs["items"]
-        # If inputs itself is a list
-        elif isinstance(inputs, list):
-            data_array = inputs
-
-        # Option 2: read from upstream nodes
-        if not data_array:
-            current_node_id = context.get("current_node_id")
-            flow_data = context.get("flow_data", {})
-            branch_upstream_nodes = get_branch_upstream_nodes(current_node_id, flow_data)
-
-            if context and "node_results" in context and branch_upstream_nodes:
-                node_results = context["node_results"]
-                for upstream_node_id in branch_upstream_nodes:
-                    if upstream_node_id in node_results:
-                        upstream_result = node_results[upstream_node_id]
-                        if isinstance(upstream_result, dict):
-                            # web_scrapers output format
-                            if "products" in upstream_result and isinstance(upstream_result["products"], list):
-                                data_array = upstream_result["products"]
-                                break
-                            # generic format
-                            elif "items" in upstream_result and isinstance(upstream_result["items"], list):
-                                data_array = upstream_result["items"]
-                                break
-                            # If there is an ai_raw_response, try to parse it as JSON
-                            elif "ai_raw_response" in upstream_result and isinstance(upstream_result["ai_raw_response"], str):
-                                try:
-                                    raw_response = upstream_result["ai_raw_response"].strip()
-                                    start_brace = raw_response.find('[')
-                                    end_brace = raw_response.rfind(']')
-
-                                    if start_brace != -1 and end_brace != -1 and end_brace > start_brace:
-                                        json_content = raw_response[start_brace:end_brace + 1]
-                                        parsed_json = json.loads(json_content)
-                                        if isinstance(parsed_json, list):
-                                            data_array = parsed_json
-                                            break
-                                except:
-                                    continue
-
-        if not data_array or not isinstance(data_array, list):
-            return {
-                "success": False,
-                "error": "No data array found. Make sure the upstream node returns a 'products' or 'items' array, or pass an array directly."
-            }
-
-        if len(data_array) == 0:
-            return {
-                "success": True,
-                "message": "Data array is empty; no records were created",
-                "created_count": 0,
"created_records": [] - } - - # 批量创建记录 - created_records = [] - failed_records = [] - - for index, item_data in enumerate(data_array): - try: - # 确保 item_data 是字典 - if not isinstance(item_data, dict): - failed_records.append({ - "index": index, - "data": item_data, - "error": "数据项不是字典格式" - }) - continue - - # 目标记录数据 - record_data = {} - mapped_fields = set() - - # 1. 优先用 field_map(from→to) - if field_map: - mapped_result = jingrow.map_fields_by_labels(field_map, item_data, label2field) - record_data.update(mapped_result) - mapped_fields.update([m.get("from") for m in field_map if m.get("from")]) - # 同时标记目标字段为已映射,防止被自动映射覆盖 - for mapping in field_map: - to_field = mapping.get("to") - if to_field: - mapped_fields.add(to_field) - - # 调试:检查哪些字段映射失败了 - failed_mappings = [] - for mapping in field_map: - from_field = mapping.get("from") - to_field = mapping.get("to") - if from_field in item_data and from_field not in mapped_result: - # 字段映射失败 - to_fieldname = label2field.get(str(to_field).strip()) - failed_mappings.append({ - "from": from_field, - "to_label": to_field, - "to_fieldname": to_fieldname, - "value": item_data.get(from_field) - }) - - # 2. 自动标签映射:处理未在 field_map 中映射的字段 - for label, value in item_data.items(): - if label in mapped_fields: - continue - fieldname = label2field.get(str(label).strip()) - if fieldname is not None and fieldname not in record_data: - record_data[fieldname] = value - mapped_fields.add(label) - - # 3. 处理特殊必填字段 - # 对于 Jsite Product,需要 site 字段 - if record_type == "Jsite Product": - # 检查 site 字段是否已映射 - site_fieldname = None - for label, fieldname in label2field.items(): - if "网站" in str(label) or "site" in str(label).lower() or fieldname == "site": - site_fieldname = fieldname - break - - if not site_fieldname: - site_fieldname = "site" - - if site_fieldname not in record_data: - if default_site: - # 验证 default_site 不是 URL - site_value = str(default_site).strip() - if site_value.startswith(('http://', 'https://')): - # 如果是 URL,尝试提取域名部分作为提示 - from urllib.parse import urlparse - parsed = urlparse(site_value) - domain = parsed.netloc or parsed.path.split('/')[0] - # 不设置值,让创建失败并提示用户 - # 但记录到调试信息中 - pass - else: - # 使用配置的网站名称 - record_data[site_fieldname] = site_value - else: - # 如果没有配置 default_site,在错误信息中提示 - # 但继续处理,让创建失败时能看到完整的错误信息 - pass - - # 确保 title 字段存在(如果数据中有 title 但映射失败) - # 检查所有可能的 title 字段名变体 - title_fieldnames = ["title", "name"] # title 和 name 都可能是标题字段 - title_mapped = False - - for title_field in title_fieldnames: - if title_field in record_data and record_data[title_field]: - title_mapped = True - break - - if not title_mapped and "title" in item_data and item_data.get("title"): - # 尝试多种方式映射 title - # 1. 查找 label2field 中 label 包含 "标题" 或 "title" 的字段 - for label, fieldname in label2field.items(): - if ("标题" in str(label) or "title" in str(label).lower()) and fieldname not in record_data: - record_data[fieldname] = item_data.get("title") - title_mapped = True - break - - # 2. 
-                    # 2. If still unmapped, try title or name directly as the fieldname
-                    if not title_mapped:
-                        for title_field in title_fieldnames:
-                            if title_field not in record_data:
-                                record_data[title_field] = item_data.get("title")
-                                title_mapped = True
-                                break
-
-                # Serialize complex data types (arrays, objects);
-                # some fields may need to be converted to JSON strings.
-                # Also enforce field length limits by truncating overlong values.
-                processed_record_data = {}
-                truncated_fields = []  # fields that were truncated
-
-                for key, value in record_data.items():
-                    if isinstance(value, (list, dict)):
-                        # For arrays/objects, try converting to a JSON string,
-                        # but first check the target field type; JSON fields may need the raw value
-                        try:
-                            # Keep the raw value first; convert only if creation fails
-                            processed_record_data[key] = value
-                        except:
-                            processed_record_data[key] = json.dumps(value, ensure_ascii=False)
-                    else:
-                        # Enforce length limits on string fields
-                        if isinstance(value, str) and value:
-                            # Force the 140-character limit on title and name (per the error message)
-                            if key in ['title', 'name'] and len(value) > 140:
-                                original_length = len(value)
-                                processed_record_data[key] = value[:140]
-                                truncated_fields.append({
-                                    "field": key,
-                                    "field_label": field_metadata.get(key, {}).get('label', key),
-                                    "original_length": original_length,
-                                    "max_length": 140,
-                                    "truncated_value": processed_record_data[key],
-                                    "truncation_method": "forced truncation (title/name default to 140 characters)"
-                                })
-                            else:
-                                # Check whether the field has a length limit
-                                field_info = field_metadata.get(key, {})
-                                max_length = field_info.get('max_length')
-
-                                # If not in the metadata, try to find the field via label2field
-                                if not max_length:
-                                    # Find the field's label, then look up its metadata
-                                    for label, fieldname in label2field.items():
-                                        if fieldname == key:
-                                            # Found the label; look up its metadata
-                                            field_info_from_label = field_metadata.get(fieldname, {})
-                                            if not max_length:
-                                                max_length = field_info_from_label.get('max_length')
-                                            break
-
-                                # Truncate
-                                if max_length and isinstance(max_length, (int, float)):
-                                    max_length_int = int(max_length)
-                                    if len(value) > max_length_int:
-                                        # Truncate the overlong field
-                                        original_length = len(value)
-                                        processed_record_data[key] = value[:max_length_int]
-                                        truncated_fields.append({
-                                            "field": key,
-                                            "field_label": field_info.get('label', key),
-                                            "original_length": original_length,
-                                            "max_length": max_length_int,
-                                            "truncated_value": processed_record_data[key],
-                                            "truncation_method": "metadata truncation"
-                                        })
-                                    else:
-                                        processed_record_data[key] = value
-                                else:
-                                    processed_record_data[key] = value
-                        else:
-                            processed_record_data[key] = value
-
-                # Call Jingrow to create the record
-                try:
-                    # Call Page.create directly to get detailed error information
-                    from jingrow.model.page import get_page_instance
-                    pg = get_page_instance(record_type)
-                    result = pg.create(processed_record_data)
-
-                    # Initialize error_msg so it always has a value
-                    error_msg = 'Failed to create record'
-
-                    if not isinstance(result, dict) or not result.get('success'):
-                        # Extract detailed error information
-                        error_msg = result.get('error', 'Failed to create record')
-                        error_detail = result.get('detail', '')
-                        if error_detail:
-                            error_msg = f"{error_msg}: {error_detail}"
-
-                        # Add more detailed debug info
-                        debug_info = {
-                            "record_type": record_type,
-                            "mapped_fields_count": len(processed_record_data),
-                            "field_names": list(processed_record_data.keys()),
-                            "response": result,
-                            "label2field_sample": dict(list(label2field.items())[:10]) if label2field else {},
-                            "default_site_configured": bool(default_site),
-                            "default_site_value": default_site if default_site else None,
-                            "site_field_in_record_data": site_fieldname in processed_record_data if 'site_fieldname' in locals() else False,
-                            "field_metadata_sample": dict(list(field_metadata.items())[:5]) if field_metadata else {},
-                            "title_field_length": len(processed_record_data.get('title', '')) if 'title' in processed_record_data else None,
-                            "title_field_in_metadata": 'title' in field_metadata,
-                            "title_max_length": field_metadata.get('title', {}).get('max_length') if 'title' in field_metadata else None
-                        }
-
-                        # Record truncated fields in the debug info
-                        if 'truncated_fields' in locals() and truncated_fields:
-                            debug_info["truncated_fields"] = truncated_fields
-                        else:
-                            # No truncation info: if title exists and is overlong, the truncation logic may not have run
-                            if 'title' in processed_record_data:
-                                title_value = processed_record_data['title']
-                                if isinstance(title_value, str) and len(title_value) > 140:
-                                    debug_info["truncation_warning"] = f"title is {len(title_value)} characters, over 140, but was not truncated. Likely cause: max_length was not resolved correctly."
-
-                        # Warn if default_site looks like a URL
-                        if default_site and str(default_site).strip().startswith(('http://', 'https://')):
-                            debug_info["site_config_warning"] = "default_site looks like a URL, but the site field expects the site record's name, not a URL. Check the site settings, find the record name, and update the configuration."
-
-                        # Include failed mappings in the debug info
-                        if 'failed_mappings' in locals():
-                            debug_info["failed_mappings"] = failed_mappings
-
-                        failed_records.append({
-                            "index": index,
-                            "data": item_data,
-                            "error": error_msg,
-                            "record_data": processed_record_data,
-                            "debug": debug_info
-                        })
-                        continue
-
-                    created = result.get('data', {})
-                    if not created:
-                        debug_info = {
-                            "record_type": record_type,
-                            "response": result
-                        }
-                        if 'truncated_fields' in locals() and truncated_fields:
-                            debug_info["truncated_fields"] = truncated_fields
-
-                        failed_records.append({
-                            "index": index,
-                            "data": item_data,
-                            "error": "Failed to create record: empty response data",
-                            "record_data": processed_record_data,
-                            "debug": debug_info
-                        })
-                        continue
-
-                    # Creation succeeded; keep track of any truncated fields as well
-                    if 'truncated_fields' in locals() and truncated_fields:
-                        # On success, truncation info is visible in the returned result
-                        pass
-
-                except Exception as create_error:
-                    debug_info = {
-                        "record_type": record_type,
-                        "mapped_fields_count": len(processed_record_data),
-                        "field_names": list(processed_record_data.keys()),
-                        "exception_type": type(create_error).__name__
-                    }
-                    if 'truncated_fields' in locals() and truncated_fields:
-                        debug_info["truncated_fields"] = truncated_fields
-
-                    failed_records.append({
-                        "index": index,
-                        "data": item_data,
-                        "error": f"Exception while creating record: {str(create_error)}",
-                        "record_data": processed_record_data,
-                        "debug": debug_info
-                    })
-                    continue
-
-                created_name = created.get("name") or record_data.get("name")
-                record_info = {
-                    "index": index,
-                    "name": created_name,
-                    "pagetype": record_type
-                }
-
-                # Include truncated fields in the record info
-                if 'truncated_fields' in locals() and truncated_fields:
-                    record_info["truncated_fields"] = truncated_fields
-
-                created_records.append(record_info)
-
-            except Exception as e:
-                failed_records.append({
-                    "index": index,
-                    "data": item_data,
-                    "error": str(e)
-                })
-                continue
-
-        # Build the result
-        result = {
-            "success": True,
-            "total_count": len(data_array),
-            "created_count": len(created_records),
-            "failed_count": len(failed_records),
-            "created_records": created_records
-        }
-
-        if failed_records:
-            result["failed_records"] = failed_records
-            result["message"] = f"Created {len(created_records)} records, {len(failed_records)} failed"
-        else:
-            result["message"] = f"Created {len(created_records)} records"
-
-        return result
-
-    except Exception as e:
-        # Build detailed error information
-        error_details = {
-            "error": str(e),
-            "record_type": record_type if 'record_type' in locals() else None,
-            "inputs": inputs if 'inputs' in locals() else {},
-            "field_map": field_map if 'field_map' in locals() else []
-        }
-
-        print(f"BatchCreateRecords exception: {json.dumps(error_details, ensure_ascii=False, indent=2)}")
-
-        return {
-            "success": False,
-            "error": str(e),
-            "error_details": error_details
-        }
-
diff --git a/apps/jingrow/jingrow/ai/nodes/web_scrapers/web_scrapers.json b/apps/jingrow/jingrow/ai/nodes/web_scrapers/web_scrapers.json
deleted file mode 100644
index 5fe96fd..0000000
--- a/apps/jingrow/jingrow/ai/nodes/web_scrapers/web_scrapers.json
+++ /dev/null
@@ -1,181 +0,0 @@
-{
-  "metadata": {
-    "type": "web_scrapers",
-    "label": "Web scraping",
-    "icon": "fa-spider",
-    "color": "rgba(71, 180, 133, 1)",
-    "description": "Scrapes product information (title, price, description, images, etc.) via Crawl4AI; supports single product detail pages and product list pages",
-    "group": "Data",
-    "component_type": "GenericNode"
-  },
-  "properties": {
-    "url": {
-      "type": "string",
-      "title": "Target URL",
-      "description": "URL of the site to scrape; supports Jinja2 template syntax, e.g. {{ product_url }}",
-      "format": "uri",
-      "minLength": 1
-    },
-    "crawl_mode": {
-      "type": "string",
-      "title": "Crawl mode",
-      "description": "Crawl mode: single (single product detail page) or list (product list page with pagination)",
-      "default": "single",
-      "enum": [
-        "single",
-        "list"
-      ],
-      "enumNames": [
-        "Single product detail page",
-        "Product list page"
-      ]
-    },
-    "max_pages": {
-      "type": "integer",
-      "title": "Max pages",
-      "description": "Maximum number of pages to crawl in list mode (guards against infinite loops)",
-      "default": 100,
-      "minimum": 1,
-      "maximum": 1000
-    },
-    "enrich_details": {
-      "type": "boolean",
-      "title": "Enrich product details",
-      "description": "In list mode, whether to crawl each product's detail page for additional information (images, prices, company info, Product Details, etc.)"
-    },
-    "use_ai_extraction": {
-      "type": "boolean",
-      "title": "Use AI extraction",
-      "description": "If enabled, an AI model extracts structured product information from the page content (refining the manual extraction)"
-    },
-    "ai_system_message": {
-      "type": "string",
-      "title": "System message",
-      "description": "Defines the AI's role and behavior",
-      "default": "You are a professional product-information extraction assistant; extract product information from web content accurately.",
-      "format": "textarea",
-      "rows": 5
-    },
-    "text_model": {
-      "type": "string",
-      "title": "AI model",
-      "description": "AI model to use (consistent with the ai_content_generation node)",
-      "default": "Jingrow",
-      "enum": [
-        "Jingrow",
-        "DeepSeek",
-        "Doubao",
-        "ChatGPT",
-        "Qwen",
-        "Wenxin",
-        "Hunyuan",
-        "Spark"
-      ],
-      "enumNames": [
-        "Jingrow",
-        "DeepSeek",
-        "Doubao",
-        "ChatGPT",
-        "Qwen",
-        "Wenxin",
-        "Hunyuan",
-        "iFlytek Spark"
-      ]
-    },
-    "max_tokens": {
-      "type": "integer",
-      "title": "Max tokens",
-      "description": "Limits the maximum length of generated content",
-      "default": 2000
-    },
-    "ai_temperature": {
-      "type": "number",
-      "title": "Temperature",
-      "description": "Controls output randomness: 0 is deterministic, 2 is maximally random",
-      "default": 0.7
-    },
-    "ai_top_p": {
-      "type": "number",
-      "title": "Top-p",
-      "description": "Controls vocabulary diversity",
-      "default": 0.9
-    },
-    "hide_input_handle": {
-      "type": "boolean",
-      "title": "Hide input handle",
-      "description": "Whether to hide the node's input connection point"
-    }
-  },
-  "required": [
-    "url"
-  ],
-  "_layout": {
-    "tabs": [
-      {
-        "id": "tab_1",
-        "label": "Basic configuration",
-        "sections": [
-          {
-            "id": "section_1",
-            "label": "Basic properties",
-            "columns": [
-              {
-                "id": "column_1",
-                "label": "Main settings",
-                "fields": [
-                  "url",
-                  "crawl_mode",
-                  "max_pages",
-                  "enrich_details",
-                  "use_ai_extraction",
-                  "ai_system_message"
-                ]
-              }
-            ]
-          }
-        ]
-      },
-      {
-        "id": "tab_2",
-        "label": "AI model settings",
-        "sections": [
-          {
-            "id": "section_2",
-            "label": "Model parameters",
-            "columns": [
-              {
-                "id": "column_2",
-                "label": "Model configuration",
-                "fields": [
-                  "text_model",
-                  "max_tokens",
-                  "ai_temperature",
-                  "ai_top_p"
-                ]
-              }
-            ]
-          }
-        ]
-      },
-      {
-        "id": "tab_3",
-        "label": "Node settings",
-        "sections": [
-          {
-            "id": "section_3",
-            "label": "Node handles",
-            "columns": [
-              {
-                "id": "column_3",
-                "fields": [
-                  "hide_input_handle"
-                ]
-              }
-            ]
-          }
-        ]
-      }
-    ],
-    "activeTab": "tab_3"
-  }
-}
\ No newline at end of file
diff --git a/apps/jingrow/jingrow/ai/nodes/web_scrapers/pyproject.toml b/apps/jingrow/jingrow/ai/nodes/web_scrapers_create/pyproject.toml
similarity index 83%
rename from apps/jingrow/jingrow/ai/nodes/web_scrapers/pyproject.toml
rename to apps/jingrow/jingrow/ai/nodes/web_scrapers_create/pyproject.toml
index 30c764e..8016de5 100644
--- a/apps/jingrow/jingrow/ai/nodes/web_scrapers/pyproject.toml
+++ b/apps/jingrow/jingrow/ai/nodes/web_scrapers_create/pyproject.toml
@@ -1,15 +1,17 @@
 [project]
-name = "jingrow-node-web-scrapers"
+name = "web_scrapers_create"
 version = "1.0.0"
 requires-python = ">=3.10"
 dependencies = [
     "beautifulsoup4",
     "crawl4ai",
     "playwright",
+    "requests",
 ]
 
 [tool.jingrow.after_install]
 commands = [
     "python -m playwright install-deps",
     "python -m playwright install"
-]
\ No newline at end of file
+]
+
diff --git a/apps/jingrow/jingrow/ai/nodes/batch_create_records/batch_create_records.json b/apps/jingrow/jingrow/ai/nodes/web_scrapers_create/web_scrapers_create.json
similarity index 51%
rename from apps/jingrow/jingrow/ai/nodes/batch_create_records/batch_create_records.json
rename to apps/jingrow/jingrow/ai/nodes/web_scrapers_create/web_scrapers_create.json
index bf5db7a..98549f9 100644
--- a/apps/jingrow/jingrow/ai/nodes/batch_create_records/batch_create_records.json
+++ b/apps/jingrow/jingrow/ai/nodes/web_scrapers_create/web_scrapers_create.json
@@ -1,14 +1,21 @@
 {
   "metadata": {
-    "type": "batch_create_records",
-    "label": "Batch create records",
-    "icon": "fa-layer-group",
-    "color": "#10b981",
-    "description": "Creates page records in bulk; designed for array data such as product lists",
-    "group": "Output",
+    "type": "web_scrapers_create",
+    "label": "Scrape and create",
+    "icon": "fa-spider",
+    "color": "rgba(71, 180, 133, 1)",
+    "description": "Scrapes product information and creates records immediately (all-in-one node); supports field mapping and image upload",
+    "group": "Data",
     "component_type": "GenericNode"
   },
   "properties": {
+    "url": {
+      "type": "string",
+      "title": "Target URL",
+      "description": "URL of the site to scrape; supports Jinja2 template syntax, e.g. {{ product_url }}",
+      "format": "uri",
+      "minLength": 1
+    },
     "pagetype": {
       "type": "string",
       "title": "Page type",
@@ -18,14 +25,14 @@
     "field_map": {
       "type": "array",
       "title": "Field mapping",
-      "description": "Maps upstream node data to target fields. If omitted, automatic label mapping is used",
+      "description": "Maps scraped data to target fields. If omitted, automatic label mapping is used",
       "items": {
         "type": "object",
         "properties": {
           "from": {
             "type": "string",
             "title": "Source field",
-            "description": "Field name in the upstream node's output, e.g. title, price, images"
+            "description": "Field name in the scraped data, e.g. title, price, images, Product Details"
           },
           "to": {
             "type": "string",
@@ -44,13 +51,22 @@
       "title": "Default site",
       "description": "Types such as Jsite Product require a site field. Enter the site record's name, not a URL; e.g. if the record is named 'mysite', enter 'mysite'. The record name can be found on the site settings page."
     },
-    "show_output_handle": {
+    "max_pages": {
+      "type": "integer",
+      "title": "Max pages",
+      "description": "Maximum number of pages to crawl in list mode (guards against infinite loops)",
+      "default": 100,
+      "minimum": 1,
+      "maximum": 1000
+    },
+    "hide_input_handle": {
       "type": "boolean",
-      "title": "Show output handle",
-      "description": "Whether to show the node's output connection point"
+      "title": "Hide input handle",
+      "description": "Whether to hide the node's input connection point"
     }
   },
   "required": [
+    "url",
     "pagetype"
   ],
   "_layout": {
@@ -61,15 +77,16 @@
         "sections": [
          {
            "id": "section_1",
-            "label": "Basic info",
+            "label": "Basic properties",
            "columns": [
              {
                "id": "column_1",
-                "label": "Record settings",
+                "label": "Main settings",
                "fields": [
+                  "url",
                  "pagetype",
-                  "default_site",
-                  "show_output_handle"
+                  "max_pages",
+                  "default_site"
                ]
              }
            ]
@@ -82,11 +99,11 @@
         "sections": [
          {
            "id": "section_2",
-            "label": "Mapping configuration",
+            "label": "Field-mapping configuration",
            "columns": [
              {
                "id": "column_2",
-                "label": "Field mapping",
+                "label": "Mapping settings",
                "fields": [
                  "field_map"
                ]
@@ -94,9 +111,27 @@
             ]
           }
         ]
+      },
+      {
+        "id": "tab_3",
+        "label": "Node settings",
+        "sections": [
+          {
+            "id": "section_3",
+            "label": "Node handles",
+            "columns": [
+              {
+                "id": "column_3",
+                "fields": [
+                  "hide_input_handle"
+                ]
+              }
+            ]
+          }
+        ]
       }
     ],
-    "activeTab": "tab_2"
+    "activeTab": "tab_1"
   }
 }
diff --git a/apps/jingrow/jingrow/ai/nodes/web_scrapers/web_scrapers.py b/apps/jingrow/jingrow/ai/nodes/web_scrapers_create/web_scrapers_create.py
similarity index 70%
rename from apps/jingrow/jingrow/ai/nodes/web_scrapers/web_scrapers.py
rename to apps/jingrow/jingrow/ai/nodes/web_scrapers_create/web_scrapers_create.py
index cdb059d..9ba8232 100644
--- a/apps/jingrow/jingrow/ai/nodes/web_scrapers/web_scrapers.py
+++ b/apps/jingrow/jingrow/ai/nodes/web_scrapers_create/web_scrapers_create.py
@@ -1,6 +1,8 @@
 import json
 import re
 import asyncio
+import requests
+import uuid
 from typing import Dict, Any, Optional, List
 from urllib.parse import urljoin, urlparse, parse_qs, urlencode
@@ -73,7 +75,6 @@ def extract_products_from_page(soup, base_url=''):
                     parsed = urlparse(base_url)
                     product['url'] = f"{parsed.scheme}://{parsed.netloc}" + product['url']
                 else:
-                    # Infer the base URL from the current page URL
                     product['url'] = base_url + product['url']
 
             # Extract the product price (from prod-price)
@@ -573,90 +574,6 @@ async def crawl_single_list_page(crawler, url, page_num, base_url=''):
     return products, next_url, category_name
 
 
-async def crawl_product_list(crawler, start_url, max_pages=100, enrich_details=False, base_url=''):
-    """Crawl a product list (with pagination support)"""
-    all_products = []
-    current_url = start_url
-    page_num = 1
-    category_name = None
-
-    while current_url and page_num <= max_pages:
-        # Crawl the current page
-        products, next_url, cat_name = await crawl_single_list_page(crawler, current_url, page_num, base_url)
-
-        # Keep the category name extracted on the first page
-        if page_num == 1 and cat_name:
-            category_name = cat_name
-
-        if products:
-            all_products.extend(products)
-        else:
-            break
-
-        # Check for a next page
-        if next_url:
-            current_url = next_url
-            page_num += 1
-            # Delay to avoid sending requests too quickly
-            await asyncio.sleep(1)
-        else:
-            break
-
-    # If details are requested, crawl each product's detail page
-    if enrich_details and all_products:
-        for i, product in enumerate(all_products, 1):
-            if not product.get('url'):
-                continue
-
-            # Crawl the product detail page
-            detail_info = await crawl_product_detail(
-                crawler,
-                product['url'],
-                i,
-                len(all_products),
-                base_url
-            )
-
-            # Merge detail info into the product data
-            if detail_info:
-                # Update images (use the detail page's image list)
-                if 'images' in detail_info:
-                    product['images'] = detail_info['images']
-                    # Keep the first image as the main image
-                    if detail_info['images']:
-                        product['image'] = detail_info['images'][0]
-
-                # Update prices (use the detail page's prices)
-                if 'prices' in detail_info:
-                    product['prices'] = detail_info['prices']
-
-                # Add company info
-                if 'company_name' in detail_info:
-                    product['company_name'] = detail_info['company_name']
-                if 'company_url' in detail_info:
-                    product['company_url'] = detail_info['company_url']
-
-                # Add Product Details
-                if 'Product Details' in detail_info:
-                    product['Product Details'] = detail_info['Product Details']
-
-            # Delay to avoid sending requests too quickly
-            if i < len(all_products):
-                await asyncio.sleep(0.5)
-
-    # Attach category info to the result
-    result = {
-        'products': all_products,
-        'category_name': category_name,
-        'total_pages': page_num,
-        'total_products': len(all_products)
-    }
-
-    return result
-
-
-# ==================== Node execution functions ====================
-
 def get_branch_upstream_nodes(current_node_id, flow_data):
     """Get the IDs of all upstream nodes on the same branch"""
     if not current_node_id or not flow_data:
         return []
@@ -678,12 +595,329 @@ def get_branch_upstream_nodes(current_node_id, flow_data):
 
     edges = flow_data.get("edges", [])
     upstream_nodes = set()
     stack = [current_node_id]
 
     while stack:
         node_id = stack.pop()
         for edge in edges:
             if edge.get("target") == node_id:
                 source_id = edge.get("source")
                 if source_id and source_id not in upstream_nodes:
                     upstream_nodes.add(source_id)
                     stack.append(source_id)
 
     return list(upstream_nodes)
 
 
+async def download_and_upload_image(image_url: str, record_name: str, record_type: str, field_name: str = None) -> Optional[str]:
+    """Download an image and upload it to the server"""
+    try:
+        # Download the image
+        response = requests.get(image_url, verify=False, timeout=30)
+        if response.status_code != 200:
+            return None
+
+        # Determine the file extension
+        parsed_url = urlparse(image_url)
+        path = parsed_url.path
+        file_extension = "jpg"
+        if path:
+            ext = path.split('.')[-1].lower()
+            if ext in ['jpg', 'jpeg', 'png', 'gif', 'webp']:
+                file_extension = ext
+
+        # Generate a filename
+        filename = f"scraped_{uuid.uuid4().hex[:8]}.{file_extension}"
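+        # NOTE: requests.get above is synchronous (it blocks the event loop) and
+        # uses verify=False to tolerate scraped hosts with broken TLS; records are
+        # created sequentially, so this is acceptable, but an async HTTP client
+        # could be substituted if concurrent downloads are ever needed.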
f"scraped_{uuid.uuid4().hex[:8]}.{file_extension}" + + # 上传图片 + upload_result = jingrow.upload_file( + file_data=response.content, + filename=filename, + attached_to_pagetype=record_type, + attached_to_name=record_name, + attached_to_field=field_name + ) + + if upload_result.get('success'): + return upload_result.get('file_url') + return None + except Exception as e: + print(f"[ERROR] 图片上传失败 {image_url}: {str(e)}") + return None + + +async def upload_images(images: List[str], record_name: str, record_type: str, field_name: str = None) -> List[str]: + """批量上传图片""" + uploaded_urls = [] + for image_url in images: + if not image_url or not image_url.startswith('http'): + continue + uploaded_url = await download_and_upload_image(image_url, record_name, record_type, field_name) + if uploaded_url: + uploaded_urls.append(uploaded_url) + return uploaded_urls + + +def map_product_data_to_record(product_data: Dict[str, Any], field_map: List[Dict], label2field: Dict, record_type: str, default_site: str = "") -> Dict[str, Any]: + """将产品数据映射为记录字段""" + record_data = {} + mapped_fields = set() + + # 识别图片字段(需要特殊处理,不能直接映射) + image_source_fields = set() + image_target_fields = set() + for mapping in field_map: + from_field = mapping.get("from") + to_field = mapping.get("to") + if from_field in ["image", "images"]: + image_source_fields.add(from_field) + if to_field: + image_target_field = label2field.get(str(to_field).strip(), to_field) + if image_target_field: + image_target_fields.add(image_target_field) + + # 1. 优先使用 field_map 映射(排除图片字段) + if field_map: + # 过滤掉图片字段的映射 + filtered_field_map = [m for m in field_map if m.get("from") not in image_source_fields] + if filtered_field_map: + mapped_result = jingrow.map_fields_by_labels(filtered_field_map, product_data, label2field) + record_data.update(mapped_result) + mapped_fields.update([m.get("from") for m in field_map if m.get("from")]) + + # 2. 自动标签映射:处理未映射的字段(排除图片字段) + for label, value in product_data.items(): + if label in mapped_fields or label in image_source_fields: + continue + fieldname = label2field.get(str(label).strip()) + if fieldname is not None and fieldname not in record_data and fieldname not in image_target_fields: + record_data[fieldname] = value + mapped_fields.add(label) + + # 3. 处理特殊必填字段(如果配置了 default_site,尝试设置 site 字段) + if default_site: + # 查找 site 字段 + site_fieldname = None + for label, fieldname in label2field.items(): + if "网站" in str(label) or "site" in str(label).lower() or fieldname == "site": + site_fieldname = fieldname + break + + # 如果没找到,尝试使用 "site" 作为字段名 + if not site_fieldname: + # 检查 "site" 是否在字段名集合中 + if "site" in label2field.values(): + site_fieldname = "site" + else: + # 如果 label2field 中有 site 相关的字段,使用第一个 + for fieldname in label2field.values(): + if "site" in fieldname.lower(): + site_fieldname = fieldname + break + + # 如果找到了 site 字段且未设置,则设置默认值 + if site_fieldname and site_fieldname not in record_data: + site_value = str(default_site).strip() + # 验证 default_site 不是 URL + if not site_value.startswith(('http://', 'https://')): + record_data[site_fieldname] = site_value + + # 4. 
+    # 4. Enforce field length limits
+    processed_data = {}
+    for key, value in record_data.items():
+        if isinstance(value, str) and len(value) > 140 and key in ['title', 'name']:
+            processed_data[key] = value[:140]
+        elif isinstance(value, (list, dict)):
+            try:
+                processed_data[key] = json.dumps(value, ensure_ascii=False)
+            except:
+                processed_data[key] = str(value)
+        else:
+            processed_data[key] = value
+
+    return processed_data
+
+
+async def create_record_async(product_data: Dict[str, Any], config: Dict[str, Any], label2field: Dict) -> Dict[str, Any]:
+    """Create a record asynchronously"""
+    try:
+        record_type = config.get("pagetype")
+        field_map = config.get("field_map", [])
+        default_site = config.get("default_site", "")
+
+        # Map the fields
+        record_data = map_product_data_to_record(product_data, field_map, label2field, record_type, default_site)
+
+        # Handle image upload
+        image_field = None
+        for mapping in field_map:
+            if mapping.get("from") in ["image", "images"]:
+                to_field = mapping.get("to")
+                if to_field:
+                    image_field = label2field.get(str(to_field).strip(), to_field)
+                break
+
+        # If no mapping was found, look for an image field automatically
+        if not image_field:
+            for label, fieldname in label2field.items():
+                if "图片" in str(label) or "image" in str(label).lower():
+                    image_field = fieldname
+                    break
+
+        # Collect images to upload
+        images_to_upload = []
+        if "images" in product_data and isinstance(product_data["images"], list):
+            images_to_upload = product_data["images"]
+        elif "image" in product_data and product_data["image"]:
+            images_to_upload = [product_data["image"]]
+
+        # Create the record first (record_name is needed to upload images)
+        # Use the title as the record name
+        record_name = record_data.get("title") or record_data.get("name") or f"product_{uuid.uuid4().hex[:8]}"
+        if isinstance(record_name, str) and len(record_name) > 140:
+            record_name = record_name[:140]
+
+        # Create the record
+        created = jingrow.create_pg(record_type, record_data)
+
+        if not created:
+            return {
+                "success": False,
+                "error": "Failed to create record",
+                "data": product_data
+            }
+
+        created_name = created.get('name') or record_name
+
+        # Upload images (if any)
+        update_data = {}
+        if images_to_upload and image_field:
+            uploaded_urls = await upload_images(images_to_upload, created_name, record_type, image_field)
+            if uploaded_urls:
+                # Only set the first image as the main image (avoids writing an array
+                # when there are multiple images); multi-image fields can be handled separately
+                update_data[image_field] = uploaded_urls[0]
+
+        # Update the record if there is anything to write back
+        if update_data:
+            jingrow.update_pg(record_type, created_name, update_data)
+
+        return {
+            "success": True,
+            "record_name": created_name,
+            "data": product_data
+        }
+    except Exception as e:
+        return {
+            "success": False,
+            "error": str(e),
+            "data": product_data
+        }
+
+
+async def crawl_and_create_list(crawler, start_url: str, config: Dict[str, Any], base_url: str = '', max_pages: int = 100):
+    """Crawl a product list and create one record per item"""
+    record_type = config.get("pagetype")
+    if not record_type:
+        return {
+            "success": False,
+            "error": "pagetype must be configured"
+        }
+
+    # Fetch the field mapping
+    label2field = jingrow.get_field_mapping_from_jingrow(record_type) or {}
+
+    current_url = start_url
+    page_num = 1
+    category_name = None
+    created_count = 0
+    failed_count = 0
+    created_records = []
+    failed_records = []
+
+    while current_url and page_num <= max_pages:
+        # Crawl the current page
+        products, next_url, cat_name = await crawl_single_list_page(crawler, current_url, page_num, base_url)
+
+        if page_num == 1 and cat_name:
+            category_name = cat_name
+
+        if not products:
+            break
+
+        # Crawl each product's details and create a record
+        for product in products:
+            if not product.get('url'):
+                continue
+
+            # Crawl the product detail page
+            detail_info = await crawl_product_detail(crawler, product['url'], None, None, base_url)
+
+            # Merge the data (list-page data + detail-page data)
+            full_product_data = product.copy()
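+            # Detail-page values below take precedence over list-page values,
+            # since the detail page is the more complete source.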
+            if detail_info:
+                # Update the title (the detail page's title is more accurate)
+                if 'title' in detail_info:
+                    full_product_data['title'] = detail_info['title']
+
+                # Extract a subtitle (when there is a suitable source);
+                # e.g. a subtitle can be split out of "Main title - Subtitle"
+                if 'title' in full_product_data:
+                    title_parts = full_product_data['title'].split(' - ', 1)
+                    if len(title_parts) == 2:
+                        full_product_data['title'] = title_parts[0].strip()
+                        full_product_data['subtitle'] = title_parts[1].strip()
+
+                # Update images (the detail page's image list is more complete)
+                if 'images' in detail_info and detail_info['images']:
+                    full_product_data['images'] = detail_info['images']
+                    full_product_data['image'] = detail_info['images'][0]
+                elif 'image' in full_product_data and not full_product_data.get('images'):
+                    # If the detail page has no images, fall back to the list-page image
+                    full_product_data['images'] = [full_product_data['image']]
+
+                # Update prices (the detail page's prices are more detailed)
+                if 'prices' in detail_info:
+                    full_product_data['prices'] = detail_info['prices']
+
+                # Add company info
+                if 'company_name' in detail_info:
+                    full_product_data['company_name'] = detail_info['company_name']
+                if 'company_url' in detail_info:
+                    full_product_data['company_url'] = detail_info['company_url']
+
+                # Add Product Details (the full product details)
+                if 'Product Details' in detail_info:
+                    full_product_data['Product Details'] = detail_info['Product Details']
+
+            # Create the record asynchronously
+            create_result = await create_record_async(full_product_data, config, label2field)
+
+            if create_result.get('success'):
+                created_count += 1
+                created_records.append({
+                    "record_name": create_result.get('record_name'),
+                    "title": full_product_data.get('title')
+                })
+            else:
+                failed_count += 1
+                failed_records.append({
+                    "title": full_product_data.get('title'),
+                    "error": create_result.get('error')
+                })
+
+            # Delay to avoid sending requests too quickly
+            await asyncio.sleep(0.5)
+
+        # Check for a next page
+        if next_url:
+            current_url = next_url
+            page_num += 1
+            await asyncio.sleep(1)
+        else:
+            break
+
+    return {
+        "success": True,
+        "category_name": category_name,
+        "total_pages": page_num,
+        "created_count": created_count,
+        "failed_count": failed_count,
+        "created_records": created_records,
+        "failed_records": failed_records
+    }
+
+
 def execute(context=None, inputs=None, config=None, **kwargs):
     """
-    Web product scraping node - based on Crawl4AI
-    Supports two modes:
-    1. Single-product detail mode: crawl a single product detail page
-    2. Product-list mode: crawl a product list (paginated), optionally enriching details
+    Scrape-and-create node (all-in-one)
+    Creates a record asynchronously right after each item is scraped
     """
     if context is None:
         context = kwargs.get("context", {})
@@ -691,7 +925,7 @@ def execute(context=None, inputs=None, config=None, **kwargs):
         inputs = kwargs.get("inputs", {})
     if config is None:
         config = kwargs.get("config", {})
-    
+
     try:
         # Normalize inputs
        if not isinstance(inputs, dict):
@@ -699,12 +933,17 @@ def execute(context=None, inputs=None, config=None, **kwargs):
                 inputs = json.loads(inputs) if isinstance(inputs, str) else {}
             except Exception:
                 inputs = {}
-        
+
         # Read the configuration
         url = config.get("url") or inputs.get("url")
         if not url:
             return {"success": False, "error": "URL is required"}
-        
+
+        # Resolve pagetype
+        record_type = config.get("pagetype")
+        if not record_type:
+            return {"success": False, "error": "pagetype must be configured"}
+
         # Allow the URL to come from an upstream node
         current_node_id = context.get("current_node_id")
         flow_data = context.get("flow_data", {})
@@ -716,7 +955,6 @@ def execute(context=None, inputs=None, config=None, **kwargs):
             if upstream_node_id in node_results:
                 upstream_result = node_results[upstream_node_id]
                 if isinstance(upstream_result, dict):
-                    # Parse the JSON embedded in the raw AI response
                     if "ai_raw_response" in upstream_result and isinstance(upstream_result["ai_raw_response"], str):
                         try:
                             raw_response = upstream_result["ai_raw_response"].strip()
@@ -733,16 +971,14 @@ def execute(context=None, inputs=None, config=None, **kwargs):
                         except:
                             pass
 
-                    # Flatten the remaining fields into inputs
                     for k, v in upstream_result.items():
                         if k not in inputs or not inputs.get(k):
                             inputs[k] = v
 
-        # Prefer a url found in inputs
         if inputs.get("url") and not url:
             url = inputs.get("url")
-        
-        # Build the template context (supports Jinja2 templates)
+
+        # Build the template context
         template_context = inputs.copy()
         if context and context.get("flow_id"):
             flow_id = context["flow_id"]
@@ -751,13 +987,13 @@ def execute(context=None, inputs=None, config=None, **kwargs):
             for key, value in agent_data.items():
                 if key not in template_context or not template_context.get(key):
                     template_context[key] = value
-        
-        # Render the URL if it contains template syntax
+
+        # Render the URL template
         try:
             url = render_template(url, template_context)
         except Exception as e:
             return {"success": False, "error": f"URL template rendering failed: {str(e)}"}
-        
+
         # Validate and normalize the URL
         if not url or not isinstance(url, str):
             return {"success": False, "error": "URL must not be empty"}
@@ -766,7 +1002,6 @@ def execute(context=None, inputs=None, config=None, **kwargs):
         if not url:
             return {"success": False, "error": "URL must not be empty"}
 
-        # Make sure the URL has a protocol prefix
         if not url.startswith(('http://', 'https://', 'file://', 'raw:')):
             if url.startswith('//'):
                 url = 'https:' + url
@@ -774,209 +1009,25 @@ def execute(context=None, inputs=None, config=None, **kwargs):
                 url = 'https://' + url
             else:
                 return {"success": False, "error": f"Malformed URL: {url}"}
-        
-        # Read the mode configuration
-        crawl_mode = config.get("crawl_mode", "single")  # "single" or "list"
-        max_pages = config.get("max_pages", 100)  # max pages in list mode
-        enrich_details = config.get("enrich_details", False)  # whether to enrich product details
-
-        # AI extraction configuration (single-product mode only)
-        use_ai_extraction = config.get("use_ai_extraction", False)
-        text_model = config.get("text_model", "Jingrow")
-        ai_system_message = config.get("ai_system_message", "You are a professional product-information extraction assistant; extract product information from web content accurately.")
-        max_tokens = config.get("max_tokens", 2000)
-        ai_temperature = config.get("ai_temperature", 0.7)
-        ai_top_p = config.get("ai_top_p", 0.9)
-
-        # Crawl with Crawl4AI
-        try:
-            from crawl4ai import AsyncWebCrawler
-            import asyncio
-        except ImportError:
-            return {
-                "success": False,
-                "error": "Crawl4AI is not installed; run: uv pip install crawl4ai"
-            }
-
-        async def crawl_website():
+
+        # Read the configuration
+        max_pages = config.get("max_pages", 100)
+
+        # Extract the base URL
+        parsed = urlparse(url)
+        base_url = f"{parsed.scheme}://{parsed.netloc}"
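+        # (Assumption) AsyncWebCrawler is expected to be imported at module
+        # level now that this patch removes the old in-function
+        # "from crawl4ai import AsyncWebCrawler"; otherwise run_crawl below
+        # would raise NameError.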
f"{parsed.scheme}://{parsed.netloc}" + + # 执行爬取和创建 + async def run_crawl(): async with AsyncWebCrawler(verbose=True) as crawler: - # 提取基础URL用于相对路径处理 - parsed = urlparse(url) - base_url = f"{parsed.scheme}://{parsed.netloc}" - - if crawl_mode == "list": - # 产品列表页模式 - result = await crawl_product_list( - crawler, - url, - max_pages=max_pages, - enrich_details=enrich_details, - base_url=base_url - ) - - # 构建返回结果 - return { - "success": True, - "url": url, - "mode": "list", - "category_name": result.get('category_name'), - "total_pages": result.get('total_pages', 0), - "total_products": result.get('total_products', 0), - "products": result.get('products', []) - } - else: - # 单产品详情页模式 - result = await crawler.arun( - url=url, - wait_for="networkidle", - page_timeout=30000, - ) - - if not result.success: - return { - "success": False, - "error": f"爬取失败: {result.error_message or '未知错误'}" - } - - html_content = result.html - markdown_content = result.markdown - - # 初始化 BeautifulSoup - soup = BeautifulSoup(html_content, 'html.parser') - - # 提取产品信息 - extracted_data = {} - - # 提取标题 - title = extract_product_title(soup) - if title: - extracted_data['title'] = title - - # 提取图片 - images = extract_product_detail_images(soup, base_url) - if images: - extracted_data['image'] = images - - # 提取价格 - prices = extract_product_detail_prices(soup) - if prices: - extracted_data['price'] = prices - - # 提取公司信息 - company_info = extract_company_info(soup, base_url) - if company_info.get('company_name'): - extracted_data['company_name'] = company_info['company_name'] - extracted_data['url'] = company_info['company_url'] - - # 提取Product Details - product_details = extract_product_details(soup) - if product_details: - extracted_data['Product Details'] = product_details - - # 如果启用了 AI 提取,使用 AI 模型进一步提取和补充信息 - if use_ai_extraction: - try: - from jingrow.utils.jingrow_cloud import call_ai_model - - # 构建 AI 提取提示词 - ai_prompt = f"""请从以下网页内容中提取产品信息,返回 JSON 格式。 - -URL: {url} - -网页内容(Markdown,前5000字符): -{markdown_content[:5000]} - -已提取的基础信息: -{json.dumps(extracted_data, ensure_ascii=False, indent=2)} - -请基于网页内容和完善已有信息,返回完整的 JSON 格式(严格按照以下结构): -{{ - "title": "产品标题", - "price": [{{"US$877.00": "2-9"}}, {{"US$850.00": "10+"}}], // 价格数组,每个元素是 {{"价格": "数量范围"}} 的格式 - "image": ["图片URL1", "图片URL2"], // 图片URL数组 - "Product Details": "产品详细信息文本", - "company_name": "公司名称", - "url": "公司主页URL" -}} - -注意: -- price 必须是数组,每个元素是对象格式 {{"价格": "数量范围"}} -- image 必须是字符串数组 -- 如果某些信息不存在,使用空字符串或空数组""" - - # 调用 AI 模型 - ai_res = call_ai_model( - ai_prompt, - text_model=text_model, - ai_temperature=ai_temperature, - ai_top_p=ai_top_p, - ai_system_message=ai_system_message, - max_tokens=max_tokens, - image_urls=None, - ) - - if ai_res.get("success"): - ai_response = ai_res.get("response", "") - # 尝试解析 JSON - try: - start_brace = ai_response.find('{') - end_brace = ai_response.rfind('}') - if start_brace != -1 and end_brace != -1 and end_brace > start_brace: - json_str = ai_response[start_brace:end_brace + 1] - ai_data = json.loads(json_str) - if isinstance(ai_data, dict): - # 合并 AI 提取的数据,AI 提取的数据优先 - for key, value in ai_data.items(): - if value is not None and value != "" and (not isinstance(value, list) or len(value) > 0): - extracted_data[key] = value - except json.JSONDecodeError: - pass - except Exception: - pass - - # 确保输出格式一致 - if "price" in extracted_data: - if not isinstance(extracted_data["price"], list): - if isinstance(extracted_data["price"], str): - extracted_data["price"] = [extracted_data["price"]] - else: - extracted_data["price"] = [] - - if 
"image" in extracted_data: - if not isinstance(extracted_data["image"], list): - extracted_data["image"] = [] - - # 确保所有必需字段存在 - final_data = { - "title": extracted_data.get("title", ""), - "price": extracted_data.get("price", []), - "image": extracted_data.get("image", []), - "Product Details": extracted_data.get("Product Details", ""), - "company_name": extracted_data.get("company_name", ""), - "url": extracted_data.get("url", "") - } - - # 构建返回结果 - return { - "success": True, - "url": url, - "mode": "single", - "extracted_data": final_data - } - - # 运行异步爬取 - try: - result = asyncio.run(crawl_website()) - return result - except Exception as e: - return { - "success": False, - "error": f"爬取执行失败: {str(e)}" - } - + return await crawl_and_create_list(crawler, url, config, base_url, max_pages) + + result = asyncio.run(run_crawl()) + return result + except Exception as e: return { "success": False, - "error": f"网站采集节点执行失败: {str(e)}" + "error": f"执行失败: {str(e)}" }