From 4d6785dc9fe9b2784c33abc62b930fe3340bf88e Mon Sep 17 00:00:00 2001 From: jingrow Date: Fri, 14 Nov 2025 17:52:37 +0800 Subject: [PATCH] add category auto-linking --- .../web_scrapers_create.json | 22 +++- .../web_scrapers_create.py | 101 ++++++++++++++++-- 2 files changed, 114 insertions(+), 9 deletions(-) diff --git a/apps/jingrow/jingrow/ai/nodes/web_scrapers_create/web_scrapers_create.json b/apps/jingrow/jingrow/ai/nodes/web_scrapers_create/web_scrapers_create.json index 98549f9..0ca886e 100644 --- a/apps/jingrow/jingrow/ai/nodes/web_scrapers_create/web_scrapers_create.json +++ b/apps/jingrow/jingrow/ai/nodes/web_scrapers_create/web_scrapers_create.json @@ -18,7 +18,7 @@ }, "pagetype": { "type": "string", - "title": "页面类型", + "title": "内容页面类型", "description": "要创建记录的页面类型,如:Jsite Product、Jsite Article", "minLength": 1 }, @@ -51,6 +51,23 @@ "title": "默认网站", "description": "Jsite Product 等类型需要 site 字段,请填写网站记录的名称(name),不是 URL。例如:如果网站记录的名称是 'mysite',就填写 'mysite'。可以在网站设置页面查看网站记录的名称。" }, + "category_pagetype": { + "type": "string", + "title": "分类页面类型", + "description": "分类的页面类型,默认为 Jsite Product Category。如果内容页面类型中有分类字段(Link类型),会自动查找或创建分类记录", + "default": "Jsite Product Category" + }, + "category_field": { + "type": "string", + "title": "分类字段名", + "description": "内容页面类型中分类字段的字段名(不是标签),默认为 category。如果不配置,会自动查找包含'分类'或'category'的字段", + "default": "category" + }, + "category_name": { + "type": "string", + "title": "分类名称", + "description": "要关联的分类名称(分类记录的title字段值)。如果配置了此字段,所有创建的记录都会关联到该分类;如果不配置,会尝试从页面提取分类名称" + }, "max_pages": { "type": "integer", "title": "最大页数", @@ -85,6 +102,9 @@ "fields": [ "url", "pagetype", + "category_pagetype", + "category_field", + "category_name", "max_pages", "default_site" ] diff --git a/apps/jingrow/jingrow/ai/nodes/web_scrapers_create/web_scrapers_create.py b/apps/jingrow/jingrow/ai/nodes/web_scrapers_create/web_scrapers_create.py index b0f975f..dc6a0d4 100644 --- a/apps/jingrow/jingrow/ai/nodes/web_scrapers_create/web_scrapers_create.py +++ b/apps/jingrow/jingrow/ai/nodes/web_scrapers_create/web_scrapers_create.py @@ -749,7 +749,37 @@ async def upload_images(images: List[str], record_name: str, record_type: str, f return uploaded_urls -def map_product_data_to_record(product_data: Dict[str, Any], field_map: List[Dict], label2field: Dict, record_type: str, default_site: str = "") -> Dict[str, Any]: +def get_or_create_category(category_name: str, category_pagetype: str, site: str = None) -> Optional[str]: + """查找或创建分类,返回分类记录的name,失败返回None + 统一使用title字段查找和创建分类 + 如果提供了site,会在查找和创建时使用site字段过滤 + """ + if not category_name or not category_pagetype: + return None + + try: + # 通过title字段查找,如果提供了site则加上site过滤 + filters = [["title", "=", category_name]] + if site: + filters.append(["site", "=", site]) + existing = jingrow.get_list(category_pagetype, filters=filters, limit=1) + if existing: + return existing[0].get("name") + + # 不存在则创建(使用title字段,如果提供了site则设置site字段) + category_data = {"title": category_name} + if site: + category_data["site"] = site + created = jingrow.create_pg(category_pagetype, category_data) + if created: + return created.get("name") + except Exception: + pass + + return None + + +def map_product_data_to_record(product_data: Dict[str, Any], field_map: List[Dict], label2field: Dict, record_type: str, default_site: str = "", category_name: str = None, category_field: str = None, category_pagetype: str = None) -> Dict[str, Any]: """将产品数据映射为记录字段""" record_data = {} mapped_fields = set() @@ -813,7 +843,26 @@ def map_product_data_to_record(product_data: Dict[str, Any], field_map: List[Dic if not site_value.startswith(('http://', 'https://')): record_data[site_fieldname] = site_value - # 4. 处理字段长度限制 + # 4. 处理分类字段(Link类型) + if category_name and category_field and category_pagetype: + # 获取site值(从record_data中获取,或使用default_site) + site_value = None + # 先尝试从record_data中获取site字段值 + for key, value in record_data.items(): + if "site" in key.lower(): + site_value = value + break + # 如果record_data中没有,使用default_site + if not site_value and default_site: + site_value = str(default_site).strip() + if site_value.startswith(('http://', 'https://')): + site_value = None + + category_record_name = get_or_create_category(category_name, category_pagetype, site_value) + if category_record_name: + record_data[category_field] = category_record_name + + # 5. 处理字段长度限制 processed_data = {} for key, value in record_data.items(): if isinstance(value, str) and len(value) > 140 and key in ['title', 'name']: @@ -829,15 +878,49 @@ def map_product_data_to_record(product_data: Dict[str, Any], field_map: List[Dic return processed_data -async def create_record_async(product_data: Dict[str, Any], config: Dict[str, Any], label2field: Dict) -> Dict[str, Any]: +async def create_record_async(product_data: Dict[str, Any], config: Dict[str, Any], label2field: Dict, category_name: str = None) -> Dict[str, Any]: """异步创建记录""" try: record_type = config.get("pagetype") field_map = config.get("field_map", []) default_site = config.get("default_site", "") + # 获取分类字段配置 + category_field = None + category_pagetype = None + if category_name: + # 从config获取分类字段配置(优先使用新配置方式) + category_field = config.get("category_field", "category") + category_pagetype = config.get("category_pagetype", "Jsite Product Category") + + # 兼容旧配置方式 + if not category_field or not category_pagetype: + category_config = config.get("category", {}) + if category_config: + category_field = category_field or category_config.get("field") + category_pagetype = category_pagetype or category_config.get("pagetype", "Jsite Product Category") + + # 如果字段名未配置,尝试从label2field自动查找 + if not category_field: + for label, fieldname in label2field.items(): + if "分类" in str(label) or "category" in str(label).lower(): + category_field = fieldname + # 如果pagetype未配置,尝试从字段元数据获取 + if not category_pagetype or category_pagetype == "Jsite Product Category": + try: + meta = jingrow.get_meta(record_type) + if meta and meta.get("success"): + fields = meta.get("data", {}).get("fields", []) + for field in fields: + if field.get("fieldname") == fieldname and field.get("fieldtype") == "Link": + category_pagetype = field.get("options") or "Jsite Product Category" + break + except Exception: + pass + break + # 映射字段 - record_data = map_product_data_to_record(product_data, field_map, label2field, record_type, default_site) + record_data = map_product_data_to_record(product_data, field_map, label2field, record_type, default_site, category_name, category_field, category_pagetype) # 处理图片上传 image_field = None @@ -920,7 +1003,8 @@ async def crawl_and_create_list(crawler, start_url: str, config: Dict[str, Any], current_url = start_url page_num = 1 - category_name = None + # 优先使用配置的分类名称,如果没有配置才从页面提取 + category_name = config.get("category_name") created_count = 0 failed_count = 0 created_records = [] @@ -930,7 +1014,8 @@ async def crawl_and_create_list(crawler, start_url: str, config: Dict[str, Any], # 爬取当前页 products, next_url, cat_name = await crawl_single_list_page(crawler, current_url, page_num, base_url) - if page_num == 1 and cat_name: + # 如果配置中没有分类名称,且是第一页,尝试从页面提取 + if not category_name and page_num == 1 and cat_name: category_name = cat_name if not products: @@ -985,8 +1070,8 @@ async def crawl_and_create_list(crawler, start_url: str, config: Dict[str, Any], if 'description' in detail_info: full_product_data['description'] = detail_info['description'] - # 异步创建记录 - create_result = await create_record_async(full_product_data, config, label2field) + # 异步创建记录(传入分类名称) + create_result = await create_record_async(full_product_data, config, label2field, category_name) if create_result.get('success'): created_count += 1