# -*- coding: utf-8 -*-
"""Async helpers that upload and delete files through ``ossfile_uploader``."""
import base64
from typing import Tuple, Union, List, Dict, Any

from starlette.datastructures import UploadFile

from schemas.common import UploadBase64File
from utils.fileuploader import ossfile_uploader


def _decode_base64_content(content: str) -> bytes:
    """Decode base64 text, dropping a leading data-URL header if present.

    Accepts either a raw base64 string or a data URL of the form
    ``data:<mime>;base64,<payload>``. The header length depends on the MIME
    type, so it is removed by splitting on the first comma rather than by
    slicing a fixed number of characters.
    """
    header, sep, payload = content.partition(",")
    if sep and header.startswith("data:"):
        content = payload
    return base64.b64decode(content)


async def upload_oss_file(
    file: Union[UploadFile, UploadBase64File],
    content_type: Dict[str, Any],
) -> Tuple[bool, str]:
    """Upload *file* through ``ossfile_uploader.upload_from_str``.

    Args:
        file: Either an ``UploadFile`` (its bytes are read asynchronously) or
            an ``UploadBase64File`` whose ``content`` is base64 text,
            optionally prefixed with a data-URL header.
        content_type: Passed through unchanged to ``upload_from_str``.

    Returns:
        ``(True, result)`` on success, where ``result`` is whatever
        ``upload_from_str`` returns (presumably the file URL -- confirm
        against the uploader). ``(False, error_message)`` if anything raised.
    """
    try:
        if isinstance(file, UploadFile):
            content = await file.read()
        else:
            content = _decode_base64_content(file.content)
        url = ossfile_uploader.upload_from_str(file.filename, content, content_type)
    except Exception as ex:
        # Best-effort boundary: report the failure to the caller instead of
        # propagating it.
        print(f"[Error] Upload File: {str(ex)}")
        return False, str(ex)
    return True, url


async def delete_oss_file(filenames: Union[str, List[str]]) -> Tuple[bool, str]:
    """Delete one or more files through ``ossfile_uploader.delete_file``.

    Args:
        filenames: A single file name or a list of file names.

    Returns:
        ``(True, "")`` when every deletion succeeded, otherwise
        ``(False, error_message)``. Deletion stops at the first failure, so
        names after the failing one are not attempted.
    """
    if isinstance(filenames, str):
        filenames = [filenames]
    try:
        for name in filenames:
            ossfile_uploader.delete_file(name)
    except Exception as ex:
        print(f"[Error] Delete File: {str(ex)}")
        return False, str(ex)
    # Report success explicitly so the result has the same shape as
    # upload_oss_file and can always be unpacked as (ok, message).
    return True, ""