import asyncio
import base64
import io
import os
import tempfile
import traceback
from pathlib import Path
from urllib.parse import urlparse

import requests
from PIL import Image

from settings import settings


class JvectorService:
    """Convert raster images to vector graphics through the vectorizer.ai API.

    The service can optionally relay an image through an intermediate upload
    server (``settings.upload_url``) so that the vectorizer can fetch it by
    URL. Synchronous methods perform blocking HTTP calls; the ``async``
    methods run that blocking work in the event loop's default executor so
    the loop stays responsive.
    """

    # Endpoint of the vectorizer.ai conversion API.
    VECTORIZER_API_URL = "https://vectorizer.ai/api/v1/vectorize"

    # (connect, read) timeout in seconds applied to every outgoing HTTP
    # request. Without a timeout ``requests`` waits forever on a stalled peer.
    # Override on the class or on an instance if conversions need longer.
    request_timeout = (10, 180)

    # NOTE(security): TLS certificate verification is disabled for image
    # downloads and for calls to the intermediate upload server, matching the
    # historical behaviour of this service. Set to True (or to a CA bundle
    # path) once those endpoints present valid certificates.
    verify_tls = False

    def __init__(self):
        """Initialise the service from the project ``settings`` object."""
        self.upload_url = settings.upload_url
        self.vector_api_id = settings.vector_api_id
        self.vector_api_secret = settings.vector_api_secret
        self.vector_mode = settings.vector_mode

    def _get_config(self, key):
        """Return the configuration value for ``key``.

        Only ``"upload_url"`` is currently backed by ``settings``; every other
        key yields an empty string.
        """
        if key == "upload_url":
            return settings.upload_url
        return ""

    def _post_to_upload_server(self, file_name, data):
        """POST ``data`` as multipart field ``file`` to the upload server.

        Args:
            file_name: File name announced in the multipart part.
            data: Raw bytes of the file to upload.

        Returns:
            The ``requests.Response`` from the upload server. The status code
            is not checked here; callers decide how to report failures.
        """
        return requests.post(
            self.upload_url,
            files={"file": (file_name, data)},
            verify=self.verify_tls,
            timeout=self.request_timeout,
        )

    def upload_image_to_intermediate_server(self, image_url):
        """Download an image and re-upload it to the intermediate server.

        WebP images are converted to PNG before being uploaded.

        Args:
            image_url: URL of the image to fetch.

        Returns:
            The ``file_url`` value from the upload server's JSON response.

        Raises:
            Exception: If the download, the conversion or the upload fails.
                The underlying error is attached as ``__cause__``.
        """
        try:
            response = requests.get(
                image_url,
                verify=self.verify_tls,
                timeout=self.request_timeout,
            )
            response.raise_for_status()
            image_data = response.content

            # A URL whose path ends in "/" has no file name; fall back to a
            # generic one so the multipart part always carries a name.
            file_name = Path(urlparse(image_url).path).name or "image.png"

            if Path(file_name).suffix.lower() == ".webp":
                with Image.open(io.BytesIO(image_data)) as image:
                    png_buffer = io.BytesIO()
                    image.save(png_buffer, format="PNG")
                image_data = png_buffer.getvalue()
                # with_suffix swaps only the real extension, whatever its
                # letter case, so the name always matches the PNG payload.
                file_name = Path(file_name).with_suffix(".png").name

            upload_response = self._post_to_upload_server(file_name, image_data)
            if upload_response.status_code == 200:
                return upload_response.json()["file_url"]

            error_msg = (
                f"上传失败. 状态码: {upload_response.status_code}, "
                f"{upload_response.text}"
            )
            print(error_msg)
            raise Exception(error_msg)
        except Exception as e:
            error_msg = f"上传图像到中间服务器失败: {str(e)}, URL: {image_url}"
            print(error_msg)
            traceback.print_exc()
            raise Exception(error_msg) from e

    def convert_image_to_vector(self, image_url):
        """Ask vectorizer.ai to vectorize the image at ``image_url``.

        Args:
            image_url: Publicly reachable URL of the image to convert.

        Returns:
            The raw response body (bytes) returned by the API.

        Raises:
            Exception: If the request fails or returns an error status. The
                underlying error is attached as ``__cause__``.
        """
        try:
            response = requests.post(
                self.VECTORIZER_API_URL,
                data={"image.url": image_url, "mode": self.vector_mode},
                auth=(self.vector_api_id, self.vector_api_secret),
                timeout=self.request_timeout,
            )
            response.raise_for_status()
            return response.content
        except Exception as e:
            error_msg = f"转换图像为矢量图失败: {str(e)}, URL: {image_url}"
            print(error_msg)
            traceback.print_exc()
            raise Exception(error_msg) from e

    def svg_to_base64(self, svg_content):
        """Return ``svg_content`` encoded as a base64 text string.

        Args:
            svg_content: SVG document as bytes, or as ``str`` (encoded as
                UTF-8 before base64 encoding).
        """
        if isinstance(svg_content, str):
            svg_content = svg_content.encode("utf-8")
        return base64.b64encode(svg_content).decode("utf-8")

    def _upload_and_convert(self, file_content):
        """Upload raw image bytes, then vectorize the uploaded file (blocking)."""
        upload_response = self._post_to_upload_server("image.png", file_content)
        if upload_response.status_code != 200:
            raise Exception(f"上传失败. 状态码: {upload_response.status_code}")
        intermediate_url = upload_response.json()["file_url"]
        return self.convert_image_to_vector(intermediate_url)

    async def vectorize_image(self, image_url):
        """Vectorize the image at ``image_url``.

        The blocking HTTP call runs in the default executor so the event loop
        is not stalled while the conversion is in progress.

        Args:
            image_url: URL of the input image.

        Returns:
            ``{"status": "success", "svg_content": <base64 string>}``.

        Raises:
            Exception: If the conversion fails.
        """
        try:
            loop = asyncio.get_running_loop()
            vector_content = await loop.run_in_executor(
                None, self.convert_image_to_vector, image_url
            )
            return {
                "status": "success",
                "svg_content": self.svg_to_base64(vector_content),
            }
        except asyncio.CancelledError:
            # Never disguise task cancellation as a conversion failure.
            raise
        except Exception as e:
            raise Exception(f"矢量图转换失败: {str(e)}") from e

    async def vectorize_from_file(self, file_content):
        """Vectorize an image supplied as raw file content.

        The bytes are uploaded to the intermediate server straight from
        memory and the resulting URL is handed to the vectorizer.

        Args:
            file_content: Raw bytes of the uploaded image.

        Returns:
            ``{"status": "success", "svg_content": <base64 string>}``.

        Raises:
            Exception: If the content is empty, or the upload or the
                conversion fails.
        """
        try:
            if not file_content:
                raise ValueError("文件内容为空")
            loop = asyncio.get_running_loop()
            vector_content = await loop.run_in_executor(
                None, self._upload_and_convert, file_content
            )
            return {
                "status": "success",
                "svg_content": self.svg_to_base64(vector_content),
            }
        except asyncio.CancelledError:
            # Never disguise task cancellation as a processing failure.
            raise
        except Exception as e:
            raise Exception(f"处理文件失败: {str(e)}") from e

    async def process_batch(self, urls):
        """Vectorize several image URLs, yielding one result per URL.

        Args:
            urls: Sized collection of image URLs. Each item is converted with
                ``str()`` before use.

        Yields:
            A dict per URL with its 1-based ``index``, the ``total`` count,
            ``original_url``, ``status`` (``"success"`` or ``"error"``), the
            running ``success_count`` / ``error_count``, a ``message``, and
            either ``svg_content`` (success) or ``error`` (failure).
        """
        total = len(urls)
        success_count = 0
        error_count = 0

        for i, url in enumerate(urls, 1):
            try:
                url_str = str(url)
                result = await self.vectorize_image(url_str)
                success_count += 1
                yield {
                    "index": i,
                    "total": total,
                    "original_url": url_str,
                    "status": "success",
                    "svg_content": result["svg_content"],
                    "success_count": success_count,
                    "error_count": error_count,
                    "message": "处理成功",
                }
            except Exception as e:
                error_count += 1
                yield {
                    "index": i,
                    "total": total,
                    "original_url": str(url),
                    "status": "error",
                    "error": str(e),
                    "success_count": success_count,
                    "error_count": error_count,
                    "message": f"处理失败: {str(e)}",
                }

            # Hand control back to the event loop between items.
            await asyncio.sleep(0)

    def is_valid_url(self, url):
        """Return True when ``url`` parses with both a scheme and a host."""
        try:
            result = urlparse(url)
            return all([result.scheme, result.netloc])
        except (ValueError, TypeError, AttributeError):
            # urlparse raises ValueError for malformed input (e.g. a broken
            # IPv6 literal) and TypeError/AttributeError for non-string input.
            return False

    def download_file(self, url, filename):
        """Stream the resource at ``url`` into the local file ``filename``.

        Args:
            url: URL to download.
            filename: Destination path; overwritten if it already exists.

        Returns:
            ``filename``, unchanged.

        Raises:
            requests.RequestException: If the request fails or returns an
                error status.
        """
        # The context manager releases the pooled connection even when the
        # body is only partially consumed.
        with requests.get(
            url, stream=True, timeout=self.request_timeout
        ) as response:
            response.raise_for_status()
            with open(filename, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:  # skip keep-alive chunks
                        f.write(chunk)
        return filename