fileuploader.py 3.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. # -*- coding: utf-8 -*-
  2. import base64
  3. import os
  4. from typing import Union, Dict, Any
  5. import oss2
  6. import requests
  7. from core.config import settings
  class OSSFileUploader:
      """Upload objects to, and delete objects from, a bucket through ``oss2``.

      Every upload method prefixes the target name with ``root_name`` to form
      the object key and returns the public URL (``domain`` + key) when the
      bucket answers with HTTP 200, or an empty string otherwise.
      """

      def __init__(self, access_key_id: str, access_key_secret: str,
                   bucket_name: str, root_name: str, endpoint: str,
                   domain: str):
          """Store the settings and build the ``oss2`` auth and bucket handles.

          Args:
              access_key_id: Access key ID handed to ``oss2.Auth``.
              access_key_secret: Access key secret handed to ``oss2.Auth``.
              bucket_name: Bucket name handed to ``oss2.Bucket``.
              root_name: Prefix prepended to object keys by the upload methods.
              endpoint: Endpoint handed to ``oss2.Bucket``.
              domain: Base URL that ``get_url`` prepends to object keys.
          """
          self.AccessKeyID = access_key_id
          self.AccessKeySecret = access_key_secret
          self.bucket_name = bucket_name
          self.root_name = root_name
          self.auth = oss2.Auth(self.AccessKeyID, self.AccessKeySecret)
          self.endpoint = endpoint
          self.bucket = oss2.Bucket(self.auth, self.endpoint, self.bucket_name)
          self.domain = domain

      @staticmethod
      def _join_key(prefix: str, name: str) -> str:
          """Join ``prefix`` and ``name`` with exactly one forward slash.

          URLs and object keys always use ``/``; ``os.path.join`` would use the
          platform separator and would discard ``prefix`` entirely when
          ``name`` starts with a slash. An empty ``prefix`` yields ``name``
          (without leading slashes) unchanged.
          """
          name = name.lstrip("/")
          if not prefix:
              return name
          return prefix.rstrip("/") + "/" + name

      def get_url(self, status_code: int, filename: str) -> str:
          """Return the public URL of ``filename``, or ``""`` on failure.

          Args:
              status_code: HTTP status of the upload response; only 200 counts
                  as success.
              filename: Full object key (already including ``root_name``).
          """
          if status_code != 200:
              return ""
          return self._join_key(self.domain, filename)

      def upload_from_str(self, filename: str, content: Union[bytes, str],
                          content_type: Dict[str, Any]) -> str:
          """Upload in-memory content (``bytes`` or ``str``).

          Args:
              filename: Target name, relative to ``root_name``.
              content: Payload passed to ``bucket.put_object``.
              content_type: Mapping passed to ``put_object`` as ``headers``.

          Returns:
              The public URL on HTTP 200, otherwise ``""``.
          """
          filename = self._join_key(self.root_name, filename)
          resp = self.bucket.put_object(filename, content, headers=content_type)
          return self.get_url(resp.status, filename)

      def upload_from_file(self, localfile: str, ossfile: str,
                           content_type: Dict[str, Any]) -> str:
          """Upload a local file to OSS.

          Args:
              localfile: Path of the file to read (opened in binary mode).
              ossfile: Target name, relative to ``root_name``.
              content_type: Mapping passed to ``put_object`` as ``headers``.

          Returns:
              The public URL on HTTP 200, otherwise ``""``.
          """
          ossfile = self._join_key(self.root_name, ossfile)
          with open(localfile, "rb") as f:
              resp = self.bucket.put_object(ossfile, f, headers=content_type)
          return self.get_url(resp.status, ossfile)

      def resumable_upload_from_local(self, localfile: str, ossfile: str,
                                      content_type: Dict[str, Any]) -> str:
          """Upload a local file through ``oss2.resumable_upload``.

          Args:
              localfile: Path of the file to upload.
              ossfile: Target name, relative to ``root_name``.
              content_type: Mapping passed to ``resumable_upload`` as ``headers``.

          Returns:
              The public URL on HTTP 200, otherwise ``""``.
          """
          ossfile = self._join_key(self.root_name, ossfile)
          resp = oss2.resumable_upload(
              self.bucket,
              ossfile,
              localfile,
              headers=content_type,
              # The resumable store is rooted at the hard-coded /tmp directory.
              store=oss2.ResumableStore(root='/tmp'),
              # Threshold and part size are both 100 KiB.
              multipart_threshold=100 * 1024,
              part_size=100 * 1024,
              num_threads=4)
          return self.get_url(resp.status, ossfile)

      def upload_from_url(self, fileurl: str, ossfile: str,
                          content_type: Dict[str, Any]) -> str:
          """Fetch ``fileurl`` over HTTP and upload the response body.

          Args:
              fileurl: URL fetched with ``requests.get``.
              ossfile: Target name, relative to ``root_name``.
              content_type: Mapping passed to ``put_object`` as ``headers``.

          Returns:
              The public URL on HTTP 200 from the bucket; ``""`` when the source
              URL answers with an error status or the upload is not a 200.
          """
          ossfile = self._join_key(self.root_name, ossfile)
          source = requests.get(fileurl)
          if not source.ok:
              # Never store an HTTP error page under the target key.
              return ""
          resp = self.bucket.put_object(ossfile, source, headers=content_type)
          return self.get_url(resp.status, ossfile)

      def delete_file(self, filename: str) -> bool:
          """Delete an object and report whether the request succeeded.

          ``filename`` is passed to the bucket as-is: unlike the upload methods,
          ``root_name`` is NOT prepended, so pass the full object key.

          Returns:
              ``True`` for any 2xx response. OSS answers a successful delete
              with 204 No Content, so comparing against 200 alone would report
              every successful delete as a failure.
          """
          resp = self.bucket.delete_object(filename)
          return 200 <= resp.status < 300
  # Module-level uploader instance, configured from settings.OSSINFO and
  # exported as this module's only public name.
  ossfile_uploader = OSSFileUploader(
      access_key_id=settings.OSSINFO["AccessKeyID"],
      access_key_secret=settings.OSSINFO["AccessKeySecret"],
      bucket_name=settings.OSSINFO["BucketName"],
      root_name=settings.OSSINFO["RootName"],
      endpoint=settings.OSSINFO["Endpoint"],
      domain=settings.OSSINFO["Domain"],
  )

  __all__ = ["ossfile_uploader"]
  # Example usage. Never commit real credentials: load them from configuration.
  # if __name__ == '__main__':
  #     myoss = OSSFileUploader("<ACCESS_KEY_ID>",
  #                             "<ACCESS_KEY_SECRET>", "scxjcclub",
  #                             "say365", "http://oss-cn-beijing.aliyuncs.com",
  #                             'http://scxjcclub.oss-cn-beijing.aliyuncs.com')
  #     with open("../ttt.txt", "rb") as f:
  #         url = myoss.upload_from_str("oohtest.png",
  #                                     base64.b64decode(f.read()),
  #                                     content_type={"Content-Type": "image/png"})
  #     localfile = '/tmp/test.txt'
  #     ossfile = 'test/test.txt'
  #     url = myoss.resumable_upload_from_local(
  #         localfile, ossfile, content_type={"Content-Type": "text/plain"})
  #     print(url)