# app.py import os import oss2 import sys import uuid import shutil import time import gradio as gr import requests from pathlib import Path from datetime import datetime, timedelta import dashscope import spaces # Required by ZeroGPU environment @spaces.GPU(duration=1) def _gpu_placeholder(): pass # from dashscope.utils.oss_utils import check_and_upload_local DASHSCOPE_API_KEY = os.getenv("DASHSCOPE_API_KEY") dashscope.api_key = DASHSCOPE_API_KEY def get_upload_policy(api_key, model_name): """获取文件上传凭证""" url = "https://dashscope.aliyuncs.com/api/v1/uploads" headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json" } params = { "action": "getPolicy", "model": model_name } response = requests.get(url, headers=headers, params=params) if response.status_code != 200: raise Exception(f"Failed to get upload policy: {response.text}") return response.json()['data'] def upload_file_to_oss(policy_data, file_path): """将文件上传到临时存储OSS""" file_name = Path(file_path).name key = f"{policy_data['upload_dir']}/{file_name}" with open(file_path, 'rb') as file: files = { 'OSSAccessKeyId': (None, policy_data['oss_access_key_id']), 'Signature': (None, policy_data['signature']), 'policy': (None, policy_data['policy']), 'x-oss-object-acl': (None, policy_data['x_oss_object_acl']), 'x-oss-forbid-overwrite': (None, policy_data['x_oss_forbid_overwrite']), 'key': (None, key), 'success_action_status': (None, '200'), 'file': (file_name, file) } response = requests.post(policy_data['upload_host'], files=files) if response.status_code != 200: raise Exception(f"Failed to upload file: {response.text}") return f"oss://{key}" def upload_file_and_get_url(api_key, model_name, file_path): """上传文件并获取URL""" # 1. 获取上传凭证,上传凭证接口有限流,超出限流将导致请求失败 policy_data = get_upload_policy(api_key, model_name) # 2. 上传文件到OSS oss_url = upload_file_to_oss(policy_data, file_path) return oss_url class WanAnimateApp: def __init__(self, url, get_url): self.url = url self.get_url = get_url def predict( self, ref_img, video, model_id, model, ): # Upload files to OSS if needed and get URLs image_url = upload_file_and_get_url(DASHSCOPE_API_KEY, model_id, ref_img) video_url = upload_file_and_get_url(DASHSCOPE_API_KEY, model_id, video) # Prepare the request payload payload = { "model": model_id, "input": { "image_url": image_url, "video_url": video_url }, "parameters": { "check_image": True, "mode": model, } } # Set up headers headers = { "X-DashScope-Async": "enable", "X-DashScope-OssResourceResolve": "enable", "Authorization": f"Bearer {DASHSCOPE_API_KEY}", "Content-Type": "application/json" } # Make the initial API request url = self.url response = requests.post(url, json=payload, headers=headers, timeout=60) # Check if request was successful if response.status_code != 200: raise Exception(f"Initial request failed with status code {response.status_code}: {response.text}") # Get the task ID from response result = response.json() task_id = result.get("output", {}).get("task_id") if not task_id: raise Exception("Failed to get task ID from response") # Poll for results get_url = f"{self.get_url}/{task_id}" headers = { "Authorization": f"Bearer {DASHSCOPE_API_KEY}", "Content-Type": "application/json" } while True: response = requests.get(get_url, headers=headers, timeout=60) if response.status_code != 200: raise Exception(f"Failed to get task status: {response.status_code}: {response.text}") result = response.json() print(result) task_status = result.get("output", {}).get("task_status") if task_status == "SUCCEEDED": # Task completed successfully, return video URL video_url = result["output"]["results"]["video_url"] return video_url, "SUCCEEDED" elif task_status == "PENDING" or task_status == "RUNNING": # Task is still running, wait and retry time.sleep(10) # Wait 10 seconds before polling again else: # Task failed or unknown, raise an exception with error message error_msg = result.get("output", {}).get("message", "Unknown error") code_msg = result.get("output", {}).get("code", "Unknown code") print(f"\n\nTask failed: {error_msg} Code: {code_msg} TaskId: {task_id}\n\n") return None, f"Task failed: {error_msg} Code: {code_msg} TaskId: {task_id}" def start_app(): import argparse parser = argparse.ArgumentParser(description="Wan2.2-Animate 视频生成工具") args = parser.parse_args() url = "https://dashscope.aliyuncs.com/api/v1/services/aigc/image2video/video-synthesis/" get_url = f"https://dashscope.aliyuncs.com/api/v1/tasks/" app = WanAnimateApp(url=url, get_url=get_url) with gr.Blocks(title="Wan2.2-Animate 视频生成") as demo: gr.HTML("""