123456789101112131415161718192021222324252627282930313233343536 |
- # -*- coding: utf-8 -*-
- import base64
- from typing import Tuple, Union, List, Dict, Any
- from starlette.datastructures import UploadFile
- from schemas.common import UploadBase64File
- from utils.fileuploader import ossfile_uploader
- async def upload_oss_file(file: Union[UploadFile, UploadBase64File],
- content_type: Dict[str, Any]) -> Tuple[bool, str]:
- try:
- if isinstance(file, UploadFile):
- content = await file.read()
- else:
- content = base64.b64decode(file.content[22:])
- url = ossfile_uploader.upload_from_str(file.filename, content,
- content_type)
- except Exception as ex:
- print(f"[Error] Upload File: {str(ex)}")
- return False, str(ex)
- return True, url
- async def delete_oss_file(filenames: Union[str, List[str]]):
- if isinstance(filenames, str):
- filenames = [filenames]
- try:
- for name in filenames:
- ossfile_uploader.delete_file(name)
- except Exception as ex:
- print(f"[Error] Delete File: {str(ex)}")
- return False, str(ex)
|