common.py 1.1 KB

123456789101112131415161718192021222324252627282930313233343536
  1. # -*- coding: utf-8 -*-
  2. import base64
  3. from typing import Tuple, Union, List, Dict, Any
  4. from starlette.datastructures import UploadFile
  5. from schemas.common import UploadBase64File
  6. from utils.fileuploader import ossfile_uploader
  7. async def upload_oss_file(file: Union[UploadFile, UploadBase64File],
  8. content_type: Dict[str, Any]) -> Tuple[bool, str]:
  9. try:
  10. if isinstance(file, UploadFile):
  11. content = await file.read()
  12. else:
  13. content = base64.b64decode(file.content[22:])
  14. url = ossfile_uploader.upload_from_str(file.filename, content,
  15. content_type)
  16. except Exception as ex:
  17. print(f"[Error] Upload File: {str(ex)}")
  18. return False, str(ex)
  19. return True, url
  20. async def delete_oss_file(filenames: Union[str, List[str]]):
  21. if isinstance(filenames, str):
  22. filenames = [filenames]
  23. try:
  24. for name in filenames:
  25. ossfile_uploader.delete_file(name)
  26. except Exception as ex:
  27. print(f"[Error] Delete File: {str(ex)}")
  28. return False, str(ex)