# japi/apps/add_bg/service.py
# 2025-05-12 02:39:56 +08:00
#
# 312 lines
# 12 KiB
# Python

import sys
import os
import json
import io
import cv2
import numpy as np
from PIL import Image, ImageFilter, ImageDraw, ImageChops
import uuid
import urllib.request
import urllib3
import requests
from pydantic import BaseModel
from typing import Optional
from colorthief import ColorThief
import tempfile
from urllib.parse import urlparse
import torch
import time
import warnings
import gc
import base64
import asyncio
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor
import colorsys
# Silence unneeded warnings: ignore every UserWarning and FutureWarning.
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=FutureWarning)
class AddBgService:
    """Composite an RGBA foreground image onto a generated solid background.

    The background colour is a lightened version of the foreground's dominant
    colour. Optional effects (blur, texture, lighting, foreground rotation)
    are switched on through a config dict whose keys mirror DEFAULT_CONFIG.
    """

    # Default configuration; keys supplied by the caller override these.
    DEFAULT_CONFIG = {
        # Not referenced anywhere inside this class.
        'added_background_marker': "_added_background",
        # Texture overlay on the background ('noise' or 'lines').
        'enable_texture_effect': False,
        'texture_type': 'noise',
        'texture_blend_mode': 'multiply',
        # Gaussian blur applied to the background before compositing.
        'enable_depth_of_field': False,
        'blur_intensity': 15,
        # Encoding used by image_to_base64.
        'output_format': 'png',
        # Lighting mask applied to the final composite.
        'enable_lighting_effect': False,
        'light_intensity': 0.1,
        # Position and radii are fractions of (width, height).
        'light_position': [0.5, 0.3],
        'light_radius_ratio': [0.4, 0.25],
        'light_angle': 45,
        # Gaussian kernel size for softening the light mask.
        'light_blur': 91,
        'light_shape': 'ellipse',
        # Passed to calculate_monochrome_color, which currently ignores it.
        'alpha_background': 0.8,
        # Foreground rotation angle handed to PIL's Image.rotate.
        'design_rotation': 0
    }

    def __init__(self):
        """Initialize the service and pick CUDA when available, else CPU."""
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    @staticmethod
    def _odd_kernel_size(value):
        """Coerce value to a positive odd int (cv2.GaussianBlur rejects even sizes)."""
        size = max(1, int(value))
        return size if size % 2 == 1 else size + 1

    @staticmethod
    def _texture_dimensions(size):
        """Return (width, height) from either a single int or a (width, height) pair."""
        if isinstance(size, (tuple, list)):
            width, height = size
        else:
            width = height = size
        return int(width), int(height)

    @staticmethod
    def _normalize_output_format(output_format):
        """Return (pil_format, mime_subtype) for a user-supplied format name.

        'jpg' is mapped to PIL's 'JPEG' writer; any other name is passed
        through upper-cased for PIL and lower-cased for the data URI.
        """
        name = str(output_format).strip().lower()
        if name in ('jpg', 'jpeg'):
            return 'JPEG', 'jpeg'
        return name.upper(), name

    def apply_lighting_effect(self, image, config):
        """Apply a lighting effect to an image array.

        A filled shape (ellipse, circle or rect) is drawn into a mask, blurred,
        and used to scale the first three channels: pixels outside the light
        are dimmed by ``light_intensity`` while pixels at the centre keep
        their original value.

        Args:
            image: numpy array with at least three channels (H, W, C).
            config: dict of ``light_*`` settings; missing keys fall back to
                DEFAULT_CONFIG.

        Returns:
            uint8 numpy array with the same shape as ``image``.
        """
        defaults = self.DEFAULT_CONFIG
        light_intensity = config.get('light_intensity', defaults['light_intensity'])
        light_position = config.get('light_position', defaults['light_position'])
        light_radius_ratio = config.get('light_radius_ratio', defaults['light_radius_ratio'])
        light_angle = config.get('light_angle', defaults['light_angle'])
        light_blur = config.get('light_blur', defaults['light_blur'])
        light_shape = config.get('light_shape', defaults['light_shape'])

        height, width = image.shape[:2]
        center = (int(light_position[0] * width), int(light_position[1] * height))
        radius = (int(light_radius_ratio[0] * width), int(light_radius_ratio[1] * height))

        mask = np.zeros((height, width), dtype=np.uint8)
        if light_shape == 'ellipse':
            cv2.ellipse(mask, center, radius, light_angle, 0, 360, 255, -1)
        elif light_shape == 'circle':
            cv2.circle(mask, center, min(radius), 255, -1)
        elif light_shape == 'rect':
            top_left = (center[0] - radius[0] // 2, center[1] - radius[1] // 2)
            bottom_right = (center[0] + radius[0] // 2, center[1] + radius[1] // 2)
            cv2.rectangle(mask, top_left, bottom_right, 255, -1)
        # Any other shape leaves the mask empty, i.e. a uniform dimming.

        # An even kernel size makes cv2.GaussianBlur raise; round it up to odd.
        kernel = self._odd_kernel_size(light_blur)
        mask = cv2.GaussianBlur(mask, (kernel, kernel), 0)
        mask = mask.astype(np.float32) / 255

        result = image.astype(np.float32)
        gain = 1 - light_intensity + mask * light_intensity
        # Only the colour channels are scaled; a fourth (alpha) channel is kept.
        result[:, :, :3] *= gain[:, :, np.newaxis]
        # Clip so an out-of-range intensity cannot wrap around in the uint8 cast.
        return np.clip(result, 0, 255).astype(np.uint8)

    def generate_noise_texture(self, size, intensity=64):
        """Generate an opaque RGBA noise texture.

        Args:
            size: edge length for a square texture, or a (width, height) pair.
            intensity: exclusive upper bound for the random channel values.

        Returns:
            PIL image built from the random array, with alpha forced to 255.
        """
        width, height = self._texture_dimensions(size)
        noise = np.random.randint(0, intensity, (height, width, 4), dtype=np.uint8)
        noise[..., 3] = 255  # make the alpha channel fully opaque
        return Image.fromarray(noise)

    def generate_line_texture(self, size, line_width=4, spacing=20, color=(0, 0, 0, 255)):
        """Generate a transparent RGBA texture crossed by a grid of lines.

        Args:
            size: edge length for a square texture, or a (width, height) pair.
            line_width: stroke width of each line.
            spacing: distance between consecutive lines.
            color: RGBA fill colour of the lines.

        Returns:
            PIL RGBA image containing horizontal and vertical lines.
        """
        width, height = self._texture_dimensions(size)
        texture = Image.new('RGBA', (width, height), (255, 255, 255, 0))
        draw = ImageDraw.Draw(texture)
        for y in range(0, height, spacing):
            draw.line([(0, y), (width, y)], fill=color, width=line_width)
        for x in range(0, width, spacing):
            draw.line([(x, 0), (x, height)], fill=color, width=line_width)
        return texture

    def add_texture(self, image, config):
        """Blend a generated texture into a PIL image.

        Args:
            image: PIL image to texture.
            config: dict providing ``texture_type`` ('noise' or 'lines') and
                ``texture_blend_mode`` ('multiply' or 'overlay').

        Returns:
            The blended PIL image, or ``image`` unchanged when the texture type
            or blend mode is not recognised.
        """
        texture_type = config.get('texture_type', self.DEFAULT_CONFIG['texture_type'])
        blend_mode = config.get('texture_blend_mode', self.DEFAULT_CONFIG['texture_blend_mode'])

        # The texture must match the image size exactly: a square texture built
        # from the width alone would crop non-square images during blending.
        if texture_type == 'noise':
            texture = self.generate_noise_texture(image.size)
        elif texture_type == 'lines':
            texture = self.generate_line_texture(image.size)
        else:
            return image

        # ImageChops needs both operands in the same mode.
        if texture.mode != image.mode:
            texture = texture.convert(image.mode)

        if blend_mode == 'multiply':
            return ImageChops.multiply(image, texture)
        if blend_mode == 'overlay':
            return ImageChops.overlay(image, texture)
        return image

    def calculate_dominant_color(self, image):
        """Return the dominant colour of a PIL image as reported by ColorThief.

        The image is serialised to an in-memory PNG because ColorThief is
        handed a file-like object. Best effort: on any failure the error is
        printed and white ``(255, 255, 255)`` is returned.
        """
        try:
            png_buffer = io.BytesIO()
            image.save(png_buffer, format='PNG')
            png_buffer.seek(0)
            color_thief = ColorThief(png_buffer)
            return color_thief.get_color(quality=1)
        except Exception as e:
            print(f"计算主色调失败: {str(e)}")
            # Fall back to white when the dominant colour cannot be computed.
            return (255, 255, 255)

    def rgb_to_hex(self, rgb):
        """Convert an (r, g, b) sequence of ints to a '#rrggbb' string."""
        return '#{:02x}{:02x}{:02x}'.format(rgb[0], rgb[1], rgb[2])

    def calculate_light_color(self, dominant_color, target_lightness=0.92):
        """Return ``dominant_color`` with its HLS lightness replaced.

        Args:
            dominant_color: sequence whose first three items are 0-255 RGB values.
            target_lightness: lightness (0-1) to apply; hue and saturation are kept.

        Returns:
            Tuple of three ints (r, g, b).
        """
        # Only the first three components are used, so an RGBA tuple is accepted.
        r, g, b = [channel / 255.0 for channel in dominant_color[:3]]
        hue, _lightness, saturation = colorsys.rgb_to_hls(r, g, b)
        r2, g2, b2 = colorsys.hls_to_rgb(hue, target_lightness, saturation)
        return (int(r2 * 255), int(g2 * 255), int(b2 * 255))

    def calculate_monochrome_color(self, dominant_color, alpha):
        """Return the background colour derived from ``dominant_color``.

        ``alpha`` is accepted for interface compatibility but is ignored; the
        result is always the dominant colour at lightness 0.92.
        """
        return self.calculate_light_color(dominant_color, target_lightness=0.92)

    def apply_depth_of_field(self, background_image, blur_intensity):
        """Gaussian-blur a background array and return it as an RGBA array.

        Args:
            background_image: numpy array accepted by ``Image.fromarray``.
            blur_intensity: radius passed to ``ImageFilter.GaussianBlur``.
        """
        background_pil = Image.fromarray(background_image).convert("RGBA")
        blurred = background_pil.filter(ImageFilter.GaussianBlur(blur_intensity))
        return np.array(blurred)

    def rotate_image_with_transparency(self, image, angle):
        """Rotate a PIL image by ``angle``, expanding the canvas to fit the result."""
        return image.rotate(angle, expand=True)

    def process_image(self, image, config):
        """Composite ``image`` onto a generated background and apply effects.

        Steps: derive the background colour from the dominant colour, rotate
        the foreground if requested, build the background at the (possibly
        rotated) foreground size, apply blur and texture to the background,
        alpha-blend the foreground over it, then apply the lighting effect.

        Args:
            image: PIL image; converted to RGBA if it is not already.
            config: dict of overrides for DEFAULT_CONFIG (``None`` means none).

        Returns:
            Fully opaque PIL image.

        Raises:
            Exception: wrapping any failure, with the original as ``__cause__``.
        """
        try:
            # Merge the caller's settings over the defaults.
            config = {**self.DEFAULT_CONFIG, **(config or {})}

            # The alpha channel is required for blending below.
            if image.mode != 'RGBA':
                image = image.convert('RGBA')

            try:
                dominant_color = self.calculate_dominant_color(image)
                background_color = self.calculate_monochrome_color(
                    dominant_color, config['alpha_background']
                )
            except Exception:
                # Use a plain white background when colour analysis fails.
                background_color = (255, 255, 255)

            # Rotate first: rotation expands the canvas, and the background
            # must be created at the final foreground size or blending fails.
            foreground_pil = image
            if config['design_rotation'] != 0:
                foreground_pil = self.rotate_image_with_transparency(
                    image, config['design_rotation']
                )
            foreground = np.array(foreground_pil)
            height, width = foreground.shape[:2]

            background_image = np.full(
                (height, width, 4), (*background_color, 255), dtype=np.uint8
            )

            if config['enable_depth_of_field']:
                background_image = self.apply_depth_of_field(
                    background_image, config['blur_intensity']
                )

            if config['enable_texture_effect']:
                background_pil = Image.fromarray(background_image).convert("RGBA")
                background_pil = self.add_texture(background_pil, config)
                background_image = np.array(background_pil)

            # Alpha-blend the foreground over the background; the assignment
            # casts the float result back into the uint8 array.
            alpha = foreground[:, :, 3:4] / 255.0
            background_image[:, :, :3] = (
                background_image[:, :, :3] * (1 - alpha) + foreground[:, :, :3] * alpha
            )
            # Make sure the final image is fully opaque.
            background_image[:, :, 3] = 255

            if config['enable_lighting_effect']:
                background_image = self.apply_lighting_effect(background_image, config)

            return Image.fromarray(background_image)
        except Exception as e:
            raise Exception(f"处理图片失败: {str(e)}") from e

    def image_to_base64(self, image, config):
        """Encode a PIL image as a base64 ``data:image/...`` URI.

        Args:
            image: PIL image to encode.
            config: dict that may contain ``output_format``; defaults to the
                DEFAULT_CONFIG value. 'jpg' is treated as 'jpeg'.

        Returns:
            String of the form ``data:image/<format>;base64,<payload>``.

        Raises:
            Exception: wrapping any encoding failure.
        """
        try:
            config = config or {}
            output_format = config.get('output_format', self.DEFAULT_CONFIG['output_format'])
            pil_format, mime_subtype = self._normalize_output_format(output_format)
            # JPEG cannot store an alpha channel; drop it before saving.
            if pil_format == 'JPEG' and image.mode != 'RGB':
                image = image.convert('RGB')
            buffered = io.BytesIO()
            image.save(buffered, format=pil_format)
            img_str = base64.b64encode(buffered.getvalue()).decode()
            return f"data:image/{mime_subtype};base64,{img_str}"
        except Exception as e:
            raise Exception(f"转换图片为base64失败: {str(e)}") from e

    async def add_background(self, image_path, config=None):
        """Add a background to an image given by URL or local path.

        Args:
            image_path: URL (downloaded via ``download_image``) or a path
                opened with PIL.
            config: optional dict of overrides for DEFAULT_CONFIG.

        Returns:
            ``{'status': 'success', 'image_content': <data URI>}`` on success,
            or ``{'status': 'error', 'message': ...}`` on any failure; this
            method does not raise.
        """
        try:
            # NOTE(review): download_image is a blocking call inside a
            # coroutine; consider off-loading it if this runs on a busy loop.
            if self.is_valid_url(image_path):
                image_content = self.download_image(image_path)
                image = Image.open(io.BytesIO(image_content))
            else:
                # convert() returns a loaded copy, so the file can be closed.
                with Image.open(image_path) as opened:
                    image = opened.convert('RGBA')

            # Make sure the image carries an alpha channel.
            if image.mode != 'RGBA':
                image = image.convert('RGBA')

            processed_image = self.process_image(image, config or {})
            result = self.image_to_base64(processed_image, config or {})
            return {
                'status': 'success',
                'image_content': result
            }
        except Exception as e:
            return {
                'status': 'error',
                'message': f"处理图片失败: {str(e)}"
            }

    async def add_background_from_file(self, file_content, config=None):
        """Add a background to an uploaded image.

        Args:
            file_content: raw bytes of the uploaded image file.
            config: optional dict of overrides for DEFAULT_CONFIG.

        Returns:
            ``{"status": "success", "image_content": <data URI>}``.

        Raises:
            Exception: wrapping any failure (unlike ``add_background``, which
                returns an error dict instead).
        """
        if config is None:
            config = {}
        try:
            # Build a PIL image from the uploaded bytes.
            image = Image.open(io.BytesIO(file_content)).convert("RGBA")
            image_with_bg = self.process_image(image, config)
            image_content = self.image_to_base64(image_with_bg, config)
            return {
                "status": "success",
                "image_content": image_content
            }
        except Exception as e:
            raise Exception(f"处理图片失败: {e}") from e

    def is_valid_url(self, url):
        """Return True when ``url`` parses with both a scheme and a network location."""
        try:
            parsed = urlparse(url)
            return all([parsed.scheme, parsed.netloc])
        except (ValueError, AttributeError, TypeError):
            # urlparse raises these for malformed or non-string input.
            return False

    def download_image(self, url):
        """Download ``url`` and return the response body as bytes.

        Raises:
            Exception: wrapping any request failure or non-2xx status.
        """
        # NOTE(review): the URL is fetched as given, with no host allow-list or
        # size limit; confirm callers only pass trusted URLs.
        try:
            response = requests.get(url, timeout=30)
            response.raise_for_status()
            return response.content
        except Exception as e:
            raise Exception(f"下载图片失败: {str(e)}") from e

    def cleanup(self):
        """Release cached CUDA memory (when CUDA is available) and run the garbage collector."""
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        gc.collect()