From 8b25b8ccec1b6bdf8f1827f2a0d2225783e9619d Mon Sep 17 00:00:00 2001 From: jingrow Date: Thu, 30 Oct 2025 22:18:09 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=97=A5=E5=BF=97=E8=BE=93?= =?UTF-8?q?=E5=87=BA=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/jingrow/jingrow/__init__.py | 32 +- .../jingrow/jingrow/adapters/local_adapter.py | 295 +----------------- 2 files changed, 27 insertions(+), 300 deletions(-) diff --git a/apps/jingrow/jingrow/__init__.py b/apps/jingrow/jingrow/__init__.py index 73f1e3e..4666be7 100644 --- a/apps/jingrow/jingrow/__init__.py +++ b/apps/jingrow/jingrow/__init__.py @@ -29,13 +29,15 @@ for _lvl in list(_ANSI.keys()): def _ensure_logging_configured() -> None: - if not _root_logger.handlers: + # 统一在 root logger 上配置输出与格式,这样通过名为 "jingrow" 的 logger 打印时, + # 也会以 "jingrow - ERROR - ... - 时间" 的格式输出,并带颜色的级别名。 + root_logger = logging.getLogger() + if not root_logger.handlers: handler = logging.StreamHandler() - # 模块名 - 级别(已上色) - 消息 - 时间 - formatter = logging.Formatter("%(name)s - %(levelname)s - %(message)s - %(asctime)s") + # 统一样式:不含 logger 名称,只输出级别、消息、时间 + formatter = logging.Formatter("%(levelname)s - %(message)s - %(asctime)s") handler.setFormatter(formatter) - _root_logger.addHandler(handler) - # 从配置文件(Config)读取日志级别(默认 INFO) + root_logger.addHandler(handler) try: level_name = str(getattr(Config, 'log_level', 'INFO')).upper() except Exception: @@ -44,7 +46,25 @@ def _ensure_logging_configured() -> None: level = getattr(logging, level_name, logging.INFO) except Exception: level = logging.INFO - _root_logger.setLevel(level) + root_logger.setLevel(level) + + # 如果已存在其他 handler,但需要强制向控制台输出(例如运行在 API 模式), + # 则在设置了环境变量 JINGROW_STREAM_LOGGING 的情况下,补充一个 StreamHandler。 + try: + force_stream = os.environ.get("JINGROW_STREAM_LOGGING") + except Exception: + force_stream = None + if force_stream and not any(isinstance(h, logging.StreamHandler) for h in root_logger.handlers): + _sh = logging.StreamHandler() + _sh.setFormatter(logging.Formatter("%(levelname)s - %(message)s - %(asctime)s")) + root_logger.addHandler(_sh) + + # 使用 root 的处理器,避免 jingrow 自带 handler 导致重复输出 + if _root_logger.handlers: + _root_logger.handlers.clear() + _root_logger.propagate = True + # 与 root 保持同级别,避免级别不一致导致丢日志 + _root_logger.setLevel(root_logger.level) def _init_adapter(run_mode: str = "api"): global _adapter diff --git a/apps/jingrow/jingrow/adapters/local_adapter.py b/apps/jingrow/jingrow/adapters/local_adapter.py index 3ef316e..87cac2d 100644 --- a/apps/jingrow/jingrow/adapters/local_adapter.py +++ b/apps/jingrow/jingrow/adapters/local_adapter.py @@ -7,305 +7,12 @@ Jingrow 本地数据库适配器 """ from typing import Dict, List, Any, Optional, Union -import logging import os from jingrow.model.page import Page from jingrow.config import Config -logger = logging.getLogger(__name__) class LocalAdapter: """本地数据库适配器 - 直接操作本地数据库""" - def __init__(self, **config): - self.db_host = Config.jingrow_db_host - self.db_port = Config.jingrow_db_port - self.db_name = Config.jingrow_db_name - self.db_user = Config.jingrow_db_user - self.db_password = Config.jingrow_db_password - self.db_type = Config.jingrow_db_type - - logger.info(f"Local adapter initialized with database: {self.db_name}@{self.db_host}:{self.db_port}") - - def _get_db_connection(self): - """获取数据库连接""" - try: - if self.db_type.lower() == 'mariadb' or self.db_type.lower() == 'mysql': - import pymysql - return pymysql.connect( - host=self.db_host, - port=int(self.db_port), - user=self.db_user, - password=self.db_password, - database=self.db_name, - charset='utf8mb4' - ) - elif self.db_type.lower() == 'postgresql': - import psycopg2 - return psycopg2.connect( - host=self.db_host, - port=int(self.db_port), - user=self.db_user, - password=self.db_password, - database=self.db_name - ) - else: - raise ValueError(f"Unsupported database type: {self.db_type}") - except Exception as e: - logger.error(f"Failed to connect to database: {e}") - raise - - def _execute_query(self, query: str, params: tuple = None): - """执行数据库查询""" - conn = None - try: - conn = self._get_db_connection() - cursor = conn.cursor() - cursor.execute(query, params) - - if query.strip().upper().startswith('SELECT'): - return cursor.fetchall() - else: - conn.commit() - return cursor.rowcount - except Exception as e: - if conn: - conn.rollback() - logger.error(f"Database query failed: {e}") - raise - finally: - if conn: - conn.close() - - def get_pg(self, pagetype: str, name: str = None, **kwargs): - """获取页面类型文档""" - logger.info(f"Local get_pg: {pagetype}, {name}") - - if not name: - return None - - # 查询数据库 - query = f"SELECT * FROM `tab{pagetype}` WHERE name = %s" - results = self._execute_query(query, (name,)) - - if not results: - return None - - # 转换为 Page 对象 - class MockPage(Page): - def __init__(self, pagetype, name=None, **kwargs): - d = {"pagetype": pagetype, **kwargs} - if name is not None: - d["name"] = name - super().__init__(d) - - # 获取列名 - conn = self._get_db_connection() - cursor = conn.cursor() - cursor.execute(f"DESCRIBE `tab{pagetype}`") - columns = [row[0] for row in cursor.fetchall()] - conn.close() - - # 构建数据字典 - data = dict(zip(columns, results[0])) - - return MockPage(pagetype, name, **data) - - def get_list(self, pagetype: str, *args, **kwargs): - """获取页面类型列表""" - logger.info(f"Local get_list: {pagetype}") - - # 构建查询条件 - where_conditions = [] - params = [] - - filters = kwargs.get('filters', {}) - for field, value in filters.items(): - where_conditions.append(f"`{field}` = %s") - params.append(value) - - # 构建查询语句 - query = f"SELECT * FROM `tab{pagetype}`" - if where_conditions: - query += " WHERE " + " AND ".join(where_conditions) - - # 添加排序 - order_by = kwargs.get('order_by') - if order_by: - query += f" ORDER BY {order_by}" - - # 添加限制 - limit_start = kwargs.get('limit_start', 0) - limit_page_length = kwargs.get('limit_page_length', 20) - query += f" LIMIT {limit_start}, {limit_page_length}" - - results = self._execute_query(query, tuple(params)) - - # 转换为 Page 对象列表 - class MockPage(Page): - def __init__(self, pagetype, name=None, **kwargs): - d = {"pagetype": pagetype, **kwargs} - if name is not None: - d["name"] = name - super().__init__(d) - - # 获取列名 - conn = self._get_db_connection() - cursor = conn.cursor() - cursor.execute(f"DESCRIBE `tab{pagetype}`") - columns = [row[0] for row in cursor.fetchall()] - conn.close() - - pages = [] - for row in results: - data = dict(zip(columns, row)) - name = data.get('name') - pages.append(MockPage(pagetype, name, **data)) - - return pages - - def new_pg(self, pagetype: str, **kwargs): - """创建新的页面类型文档""" - logger.info(f"Local new_pg: {pagetype}") - - - class MockPage(Page): - def __init__(self, pagetype, name=None, **kwargs): - d = {"pagetype": pagetype, **kwargs} - if name is not None: - d["name"] = name - super().__init__(d) - - return MockPage(pagetype, None, **kwargs) - - def save_pg(self, pg): - """保存页面类型文档""" - logger.info(f"Local save_pg: {pg.pagetype}, {pg.name}") - - # 获取表结构 - conn = self._get_db_connection() - cursor = conn.cursor() - cursor.execute(f"DESCRIBE `tab{pg.pagetype}`") - columns = [row[0] for row in cursor.fetchall()] - conn.close() - - # 构建数据字典 - data = pg.as_dict() - - # 检查文档是否存在 - existing = self.get_pg(pg.pagetype, pg.name) - - if existing: - # 更新现有文档 - set_clauses = [] - params = [] - - for field, value in data.items(): - if field in columns and field not in ['name', 'doctype']: - set_clauses.append(f"`{field}` = %s") - params.append(value) - - if set_clauses: - query = f"UPDATE `tab{pg.pagetype}` SET {', '.join(set_clauses)} WHERE name = %s" - params.append(pg.name) - self._execute_query(query, tuple(params)) - else: - # 插入新文档 - fields = [] - placeholders = [] - values = [] - - for field, value in data.items(): - if field in columns: - fields.append(f"`{field}`") - placeholders.append("%s") - values.append(value) - - if fields: - query = f"INSERT INTO `tab{pg.pagetype}` ({', '.join(fields)}) VALUES ({', '.join(placeholders)})" - self._execute_query(query, tuple(values)) - - return pg - - def delete_pg(self, pagetype: str, name: str): - """删除页面类型文档""" - logger.info(f"Local delete_pg: {pagetype}, {name}") - - query = f"DELETE FROM `tab{pagetype}` WHERE name = %s" - self._execute_query(query, (name,)) - - return True - - def db_exists(self, pagetype: str, filters: Dict[str, Any] = None): - """检查文档是否存在""" - logger.info(f"Local db_exists: {pagetype}, {filters}") - - if not filters: - return False - - where_conditions = [] - params = [] - - for field, value in filters.items(): - where_conditions.append(f"`{field}` = %s") - params.append(value) - - query = f"SELECT COUNT(*) FROM `tab{pagetype}` WHERE {' AND '.join(where_conditions)}" - results = self._execute_query(query, tuple(params)) - - return results[0][0] > 0 if results else False - - def db_get_value(self, pagetype: str, filters: Dict[str, Any] = None, fieldname: str = None): - """获取数据库值""" - logger.info(f"Local db_get_value: {pagetype}, {filters}, {fieldname}") - - if not filters or not fieldname: - return None - - where_conditions = [] - params = [] - - for field, value in filters.items(): - where_conditions.append(f"`{field}` = %s") - params.append(value) - - query = f"SELECT `{fieldname}` FROM `tab{pagetype}` WHERE {' AND '.join(where_conditions)} LIMIT 1" - results = self._execute_query(query, tuple(params)) - - return results[0][0] if results else None - - def db_set_value(self, pagetype: str, filters: Dict[str, Any] = None, fieldname: str = None, value: Any = None): - """设置数据库值""" - logger.info(f"Local db_set_value: {pagetype}, {filters}, {fieldname}, {value}") - - if not filters or not fieldname: - return False - - where_conditions = [] - params = [] - - for field, field_value in filters.items(): - where_conditions.append(f"`{field}` = %s") - params.append(field_value) - - query = f"UPDATE `tab{pagetype}` SET `{fieldname}` = %s WHERE {' AND '.join(where_conditions)}" - params.insert(0, value) - - self._execute_query(query, tuple(params)) - return True - - def log_error(self, title: str, message: str): - """记录错误日志""" - logger.error(f"[{title}] {message}") - - # 也可以记录到数据库 - try: - query = "INSERT INTO `tabError Log` (title, error, method, error_trace) VALUES (%s, %s, %s, %s)" - self._execute_query(query, (title, message, "jingrow_local", "")) - except Exception as e: - logger.warning(f"Failed to log error to database: {e}") - - def throw(self, title: str, message: str): - """抛出异常""" - from ..exceptions import JingrowException - raise JingrowException(title, message) + pass \ No newline at end of file