1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798 |
- # -*- coding: utf-8 -*-
- import base64
- import os
- from typing import Union, Dict, Any
- import oss2
- import requests
- from core.config import settings
class OSSFileUploader(object):
    """Upload files to an Aliyun OSS bucket and build their public URLs.

    Every ``upload_*`` method stores the object under ``root_name`` and
    returns the object's URL below ``domain`` when OSS answers with HTTP 200,
    or an empty string otherwise.
    """

    def __init__(self, access_key_id: str, access_key_secret: str,
                 bucket_name: str, root_name: str, endpoint: str, domain: str):
        """Keep the connection settings and create the ``oss2`` bucket client.

        Args:
            access_key_id: OSS access key id.
            access_key_secret: OSS access key secret.
            bucket_name: Name of the target bucket.
            root_name: Folder (key prefix) every uploaded object is put under.
            endpoint: OSS endpoint passed to ``oss2.Bucket``.
            domain: Base URL used to build the links returned to callers.
        """
        self.AccessKeyID = access_key_id
        self.AccessKeySecret = access_key_secret
        self.bucket_name = bucket_name
        self.root_name = root_name
        self.auth = oss2.Auth(self.AccessKeyID, self.AccessKeySecret)
        self.endpoint = endpoint
        self.bucket = oss2.Bucket(self.auth, self.endpoint, self.bucket_name)
        self.domain = domain

    def _object_key(self, name: str) -> str:
        """Return *name* prefixed with the configured root folder.

        Object keys always use ``/`` as separator, so they are joined by hand
        instead of with ``os.path.join`` (which is platform dependent and
        drops the root when *name* starts with a slash).
        """
        root = (self.root_name or "").strip("/")
        name = name.lstrip("/")
        return root + "/" + name if root else name

    def get_url(self, status_code: int, filename: str) -> str:
        """Return the URL of *filename* if the upload succeeded.

        Args:
            status_code: HTTP status of the upload response.
            filename: Full object key (root folder included).

        Returns:
            ``domain`` joined with *filename* by a single ``/`` when
            *status_code* is 200, otherwise ``""``.
        """
        if status_code != 200:
            return ""
        base = (self.domain or "").rstrip("/")
        path = filename.lstrip("/")
        return base + "/" + path if base else path

    def upload_from_str(self, filename: str, content: Union[bytes, str],
                        content_type: Dict[str, Any]) -> str:
        """Upload in-memory content given as ``bytes`` or ``str``.

        Args:
            filename: Object name relative to the root folder.
            content: Data to store.
            content_type: Headers sent with the upload, e.g.
                ``{"Content-Type": "image/png"}``.

        Returns:
            The object's URL, or ``""`` if the upload did not return 200.
        """
        key = self._object_key(filename)
        result = self.bucket.put_object(key, content, headers=content_type)
        return self.get_url(result.status, key)

    def upload_from_file(self, localfile: str, ossfile: str,
                         content_type: Dict[str, Any]) -> str:
        """Upload a local file to OSS.

        Args:
            localfile: Path of the file to read (opened in binary mode).
            ossfile: Object name relative to the root folder.
            content_type: Headers sent with the upload.

        Returns:
            The object's URL, or ``""`` if the upload did not return 200.
        """
        key = self._object_key(ossfile)
        with open(localfile, "rb") as stream:
            result = self.bucket.put_object(key, stream, headers=content_type)
        return self.get_url(result.status, key)

    def resumable_upload_from_local(self, localfile: str, ossfile: str,
                                    content_type: Dict[str, Any]) -> str:
        """Upload a local file with ``oss2.resumable_upload``.

        Files of 100 KiB or more are sent as multipart uploads in 100 KiB
        parts on 4 threads; checkpoint data is kept under ``/tmp``.

        Args:
            localfile: Path of the file to upload.
            ossfile: Object name relative to the root folder.
            content_type: Headers sent with the upload.

        Returns:
            The object's URL, or ``""`` if the upload did not return 200.
        """
        key = self._object_key(ossfile)
        result = oss2.resumable_upload(
            self.bucket,
            key,
            localfile,
            headers=content_type,
            store=oss2.ResumableStore(root='/tmp'),
            multipart_threshold=100 * 1024,
            part_size=100 * 1024,
            num_threads=4,
        )
        return self.get_url(result.status, key)

    def upload_from_url(self, fileurl: str, ossfile: str,
                        content_type: Dict[str, Any],
                        timeout: float = 30.0) -> str:
        """Download *fileurl* and store its body in OSS.

        Args:
            fileurl: URL of the resource to fetch.
            ossfile: Object name relative to the root folder.
            content_type: Headers sent with the upload.
            timeout: Seconds to wait for the remote server before
                ``requests`` gives up, so a stalled download cannot block
                the caller forever.

        Returns:
            The object's URL, or ``""`` if the download or the upload failed.
        """
        key = self._object_key(ossfile)
        source = requests.get(fileurl, timeout=timeout)
        # Do not store an error page (4xx/5xx) as if it were the file.
        if not source.ok:
            return ""
        result = self.bucket.put_object(key, source, headers=content_type)
        return self.get_url(result.status, key)

    def delete_file(self, filename: str) -> bool:
        """Delete the object stored under the full key *filename*.

        Note that, unlike the upload methods, the root folder is NOT
        prepended here; pass the complete object key.

        Returns:
            ``True`` when OSS reports success. A successful delete is
            answered with 204 No Content, so any 2xx status is accepted.
        """
        result = self.bucket.delete_object(filename)
        return 200 <= result.status < 300
# Shared uploader instance, configured once at import time from the
# ``OSSINFO`` mapping in the project settings.
ossfile_uploader = OSSFileUploader(
    access_key_id=settings.OSSINFO["AccessKeyID"],
    access_key_secret=settings.OSSINFO["AccessKeySecret"],
    bucket_name=settings.OSSINFO["BucketName"],
    root_name=settings.OSSINFO["RootName"],
    endpoint=settings.OSSINFO["Endpoint"],
    domain=settings.OSSINFO["Domain"],
)

# Only the ready-made instance is exported by ``from ... import *``.
__all__ = ["ossfile_uploader"]
- # if __name__ == '__main__':
- # myoss = OSSFileUploader("<ACCESS_KEY_ID>",
- # "<ACCESS_KEY_SECRET>", "scxjcclub",
- # "say365", "http://oss-cn-beijing.aliyuncs.com",
- # 'http://scxjcclub.oss-cn-beijing.aliyuncs.com')
- # with open("../ttt.txt", "rb") as f:
- # url = myoss.upload_from_str("oohtest.png",
- # base64.b64decode(f.read()),
- # content_type={"Content-Type": "image/png"})
- # localfile = '/tmp/test.txt'
- # ossfile = 'test/test.txt'
- # url = myoss.resumable_upload_from_local(localfile, ossfile,
- #     content_type={"Content-Type": "text/plain"})
- # print(url)
|