优化日志输出逻辑

This commit is contained in:
jingrow 2025-10-30 22:18:09 +08:00
parent ba31f5adc7
commit 8b25b8ccec
2 changed files with 27 additions and 300 deletions

View File

@ -29,13 +29,15 @@ for _lvl in list(_ANSI.keys()):
def _ensure_logging_configured() -> None:
if not _root_logger.handlers:
# 统一在 root logger 上配置输出与格式,这样通过名为 "jingrow" 的 logger 打印时,
# 也会以 "jingrow - ERROR - ... - 时间" 的格式输出,并带颜色的级别名。
root_logger = logging.getLogger()
if not root_logger.handlers:
handler = logging.StreamHandler()
# 模块名 - 级别(已上色) - 消息 - 时间
formatter = logging.Formatter("%(name)s - %(levelname)s - %(message)s - %(asctime)s")
# 统一样式:不含 logger 名称,只输出级别、消息、时间
formatter = logging.Formatter("%(levelname)s - %(message)s - %(asctime)s")
handler.setFormatter(formatter)
_root_logger.addHandler(handler)
# 从配置文件Config读取日志级别默认 INFO
root_logger.addHandler(handler)
try:
level_name = str(getattr(Config, 'log_level', 'INFO')).upper()
except Exception:
@ -44,7 +46,25 @@ def _ensure_logging_configured() -> None:
level = getattr(logging, level_name, logging.INFO)
except Exception:
level = logging.INFO
_root_logger.setLevel(level)
root_logger.setLevel(level)
# 如果已存在其他 handler但需要强制向控制台输出例如运行在 API 模式),
# 则在设置了环境变量 JINGROW_STREAM_LOGGING 的情况下,补充一个 StreamHandler。
try:
force_stream = os.environ.get("JINGROW_STREAM_LOGGING")
except Exception:
force_stream = None
if force_stream and not any(isinstance(h, logging.StreamHandler) for h in root_logger.handlers):
_sh = logging.StreamHandler()
_sh.setFormatter(logging.Formatter("%(levelname)s - %(message)s - %(asctime)s"))
root_logger.addHandler(_sh)
# 使用 root 的处理器,避免 jingrow 自带 handler 导致重复输出
if _root_logger.handlers:
_root_logger.handlers.clear()
_root_logger.propagate = True
# 与 root 保持同级别,避免级别不一致导致丢日志
_root_logger.setLevel(root_logger.level)
def _init_adapter(run_mode: str = "api"):
global _adapter

View File

@ -7,305 +7,12 @@ Jingrow 本地数据库适配器
"""
from typing import Dict, List, Any, Optional, Union
import logging
import os
from jingrow.model.page import Page
from jingrow.config import Config
logger = logging.getLogger(__name__)
class LocalAdapter:
"""本地数据库适配器 - 直接操作本地数据库"""
def __init__(self, **config):
self.db_host = Config.jingrow_db_host
self.db_port = Config.jingrow_db_port
self.db_name = Config.jingrow_db_name
self.db_user = Config.jingrow_db_user
self.db_password = Config.jingrow_db_password
self.db_type = Config.jingrow_db_type
logger.info(f"Local adapter initialized with database: {self.db_name}@{self.db_host}:{self.db_port}")
def _get_db_connection(self):
"""获取数据库连接"""
try:
if self.db_type.lower() == 'mariadb' or self.db_type.lower() == 'mysql':
import pymysql
return pymysql.connect(
host=self.db_host,
port=int(self.db_port),
user=self.db_user,
password=self.db_password,
database=self.db_name,
charset='utf8mb4'
)
elif self.db_type.lower() == 'postgresql':
import psycopg2
return psycopg2.connect(
host=self.db_host,
port=int(self.db_port),
user=self.db_user,
password=self.db_password,
database=self.db_name
)
else:
raise ValueError(f"Unsupported database type: {self.db_type}")
except Exception as e:
logger.error(f"Failed to connect to database: {e}")
raise
def _execute_query(self, query: str, params: tuple = None):
"""执行数据库查询"""
conn = None
try:
conn = self._get_db_connection()
cursor = conn.cursor()
cursor.execute(query, params)
if query.strip().upper().startswith('SELECT'):
return cursor.fetchall()
else:
conn.commit()
return cursor.rowcount
except Exception as e:
if conn:
conn.rollback()
logger.error(f"Database query failed: {e}")
raise
finally:
if conn:
conn.close()
def get_pg(self, pagetype: str, name: str = None, **kwargs):
"""获取页面类型文档"""
logger.info(f"Local get_pg: {pagetype}, {name}")
if not name:
return None
# 查询数据库
query = f"SELECT * FROM `tab{pagetype}` WHERE name = %s"
results = self._execute_query(query, (name,))
if not results:
return None
# 转换为 Page 对象
class MockPage(Page):
def __init__(self, pagetype, name=None, **kwargs):
d = {"pagetype": pagetype, **kwargs}
if name is not None:
d["name"] = name
super().__init__(d)
# 获取列名
conn = self._get_db_connection()
cursor = conn.cursor()
cursor.execute(f"DESCRIBE `tab{pagetype}`")
columns = [row[0] for row in cursor.fetchall()]
conn.close()
# 构建数据字典
data = dict(zip(columns, results[0]))
return MockPage(pagetype, name, **data)
def get_list(self, pagetype: str, *args, **kwargs):
"""获取页面类型列表"""
logger.info(f"Local get_list: {pagetype}")
# 构建查询条件
where_conditions = []
params = []
filters = kwargs.get('filters', {})
for field, value in filters.items():
where_conditions.append(f"`{field}` = %s")
params.append(value)
# 构建查询语句
query = f"SELECT * FROM `tab{pagetype}`"
if where_conditions:
query += " WHERE " + " AND ".join(where_conditions)
# 添加排序
order_by = kwargs.get('order_by')
if order_by:
query += f" ORDER BY {order_by}"
# 添加限制
limit_start = kwargs.get('limit_start', 0)
limit_page_length = kwargs.get('limit_page_length', 20)
query += f" LIMIT {limit_start}, {limit_page_length}"
results = self._execute_query(query, tuple(params))
# 转换为 Page 对象列表
class MockPage(Page):
def __init__(self, pagetype, name=None, **kwargs):
d = {"pagetype": pagetype, **kwargs}
if name is not None:
d["name"] = name
super().__init__(d)
# 获取列名
conn = self._get_db_connection()
cursor = conn.cursor()
cursor.execute(f"DESCRIBE `tab{pagetype}`")
columns = [row[0] for row in cursor.fetchall()]
conn.close()
pages = []
for row in results:
data = dict(zip(columns, row))
name = data.get('name')
pages.append(MockPage(pagetype, name, **data))
return pages
def new_pg(self, pagetype: str, **kwargs):
"""创建新的页面类型文档"""
logger.info(f"Local new_pg: {pagetype}")
class MockPage(Page):
def __init__(self, pagetype, name=None, **kwargs):
d = {"pagetype": pagetype, **kwargs}
if name is not None:
d["name"] = name
super().__init__(d)
return MockPage(pagetype, None, **kwargs)
def save_pg(self, pg):
"""保存页面类型文档"""
logger.info(f"Local save_pg: {pg.pagetype}, {pg.name}")
# 获取表结构
conn = self._get_db_connection()
cursor = conn.cursor()
cursor.execute(f"DESCRIBE `tab{pg.pagetype}`")
columns = [row[0] for row in cursor.fetchall()]
conn.close()
# 构建数据字典
data = pg.as_dict()
# 检查文档是否存在
existing = self.get_pg(pg.pagetype, pg.name)
if existing:
# 更新现有文档
set_clauses = []
params = []
for field, value in data.items():
if field in columns and field not in ['name', 'doctype']:
set_clauses.append(f"`{field}` = %s")
params.append(value)
if set_clauses:
query = f"UPDATE `tab{pg.pagetype}` SET {', '.join(set_clauses)} WHERE name = %s"
params.append(pg.name)
self._execute_query(query, tuple(params))
else:
# 插入新文档
fields = []
placeholders = []
values = []
for field, value in data.items():
if field in columns:
fields.append(f"`{field}`")
placeholders.append("%s")
values.append(value)
if fields:
query = f"INSERT INTO `tab{pg.pagetype}` ({', '.join(fields)}) VALUES ({', '.join(placeholders)})"
self._execute_query(query, tuple(values))
return pg
def delete_pg(self, pagetype: str, name: str):
"""删除页面类型文档"""
logger.info(f"Local delete_pg: {pagetype}, {name}")
query = f"DELETE FROM `tab{pagetype}` WHERE name = %s"
self._execute_query(query, (name,))
return True
def db_exists(self, pagetype: str, filters: Dict[str, Any] = None):
"""检查文档是否存在"""
logger.info(f"Local db_exists: {pagetype}, {filters}")
if not filters:
return False
where_conditions = []
params = []
for field, value in filters.items():
where_conditions.append(f"`{field}` = %s")
params.append(value)
query = f"SELECT COUNT(*) FROM `tab{pagetype}` WHERE {' AND '.join(where_conditions)}"
results = self._execute_query(query, tuple(params))
return results[0][0] > 0 if results else False
def db_get_value(self, pagetype: str, filters: Dict[str, Any] = None, fieldname: str = None):
"""获取数据库值"""
logger.info(f"Local db_get_value: {pagetype}, {filters}, {fieldname}")
if not filters or not fieldname:
return None
where_conditions = []
params = []
for field, value in filters.items():
where_conditions.append(f"`{field}` = %s")
params.append(value)
query = f"SELECT `{fieldname}` FROM `tab{pagetype}` WHERE {' AND '.join(where_conditions)} LIMIT 1"
results = self._execute_query(query, tuple(params))
return results[0][0] if results else None
def db_set_value(self, pagetype: str, filters: Dict[str, Any] = None, fieldname: str = None, value: Any = None):
"""设置数据库值"""
logger.info(f"Local db_set_value: {pagetype}, {filters}, {fieldname}, {value}")
if not filters or not fieldname:
return False
where_conditions = []
params = []
for field, field_value in filters.items():
where_conditions.append(f"`{field}` = %s")
params.append(field_value)
query = f"UPDATE `tab{pagetype}` SET `{fieldname}` = %s WHERE {' AND '.join(where_conditions)}"
params.insert(0, value)
self._execute_query(query, tuple(params))
return True
def log_error(self, title: str, message: str):
"""记录错误日志"""
logger.error(f"[{title}] {message}")
# 也可以记录到数据库
try:
query = "INSERT INTO `tabError Log` (title, error, method, error_trace) VALUES (%s, %s, %s, %s)"
self._execute_query(query, (title, message, "jingrow_local", ""))
except Exception as e:
logger.warning(f"Failed to log error to database: {e}")
def throw(self, title: str, message: str):
"""抛出异常"""
from ..exceptions import JingrowException
raise JingrowException(title, message)
pass