# -*- coding: utf-8 -*- import os from typing import Union, Dict, Any import oss2 import requests from core.config import settings class OSSFileUploader(object): def __init__(self, access_key_id: str, access_key_secret: str, bucket_name: str, root_name: str, endpoint: str, domain: str): self.AccessKeyID = access_key_id self.AccessKeySecret = access_key_secret self.bucket_name = bucket_name self.root_name = root_name self.auth = oss2.Auth(self.AccessKeyID, self.AccessKeySecret) self.endpoint = endpoint self.bucket = oss2.Bucket(self.auth, self.endpoint, self.bucket_name) self.domain = domain def get_url(self, status_code: int, filename: str) -> str: return os.path.join(self.domain, filename) if status_code == 200 else "" def upload_from_str(self, filename: str, content: Union[bytes, str], content_type: Dict[str, Any]): """ 通过字符串上传, byte, str """ filename = os.path.join(self.root_name, filename) resp = self.bucket.put_object(filename, content, headers=content_type) return self.get_url(resp.status, filename) def upload_from_file(self, localfile: str, ossfile: str, content_type: Dict[str, Any]): """ 上传本地文件到oss """ ossfile = os.path.join(self.root_name, ossfile) with open(localfile, "rb") as f: resp = self.bucket.put_object(ossfile, f, headers=content_type) return self.get_url(resp.status, ossfile) def resumable_upload_from_local(self, localfile: str, ossfile: str, content_type: Dict[str, Any]): ossfile = os.path.join(self.root_name, ossfile) resp = oss2.resumable_upload(self.bucket, ossfile, localfile, headers=content_type, store=oss2.ResumableStore(root='/tmp'), multipart_threshold=100 * 1024, part_size=100 * 1024, num_threads=4) return self.get_url(resp.status, ossfile) def upload_from_url(self, fileurl: str, ossfile: str, content_type: Dict[str, Any]): """ 通过URL上传 """ ossfile = os.path.join(self.root_name, ossfile) resp = self.bucket.put_object(ossfile, requests.get(fileurl), headers=content_type) return self.get_url(resp.status, ossfile) def delete_file(self, filename: str): resp = self.bucket.delete_object(filename) return True if resp.status == 200 else False def get_preview_url(self, fileurl: str) -> str: filename = fileurl.replace(f"{self.domain}/", "") style_str = "imm/previewdoc,copy_0" expire = 60 * 10 url = self.bucket.sign_url("GET", filename, expire, params={"x-oss-process": style_str}) return url ossfile_uploader = OSSFileUploader( settings.OSSINFO["AccessKeyID"], settings.OSSINFO["AccessKeySecret"], settings.OSSINFO["BucketName"], settings.OSSINFO["RootName"], settings.OSSINFO["Endpoint"], settings.OSSINFO["Domain"], ) __all__ = ["ossfile_uploader"] if __name__ == '__main__': myoss = OSSFileUploader( "LTAIbkpKrzQViemJ", "9y9mqNFMMDfUEBGvWqHt3wGas2W5ML", "scxjcclub", "say365", "http://oss-cn-beijing.aliyuncs.com", 'http://scxjcclub.oss-cn-beijing.aliyuncs.com', ) # with open("../ttt.txt", "rb") as f: # url = myoss.upload_from_str("oohtest.png", # base64.b64decode(f.read()), # content_type={"Content-Type": "image/png"}) # localfile = '/tmp/test.txt' # ossfile = 'test/test.txt' # url = myoss.resumable_upload_from_local(localfile, ossfile) # 获取预览URL preview_url = myoss.get_preview_url("say365/高中数学人教A版必修5导学案-1.1正弦定理.doc") print(preview_url)