#!/usr/bin/env python # -*- coding: utf-8 -*- from typing import Any import aioredis from core.config import settings # 同步Redis # class RedisCli(object): # # def __init__(self, # *, # host: str, # port: int = 6379, # password: str = "", # db: int = 0, # socket_timeout: int = 5): # self._redis_client = None # self.host = host # self.port = port # self.password = password # self.db = db # self.socket_timeout = socket_timeout # # def init_redis_connect(self) -> None: # """ # 初始化连接 # :return: # """ # try: # self._redis_client = redis.Redis(host=self.host, # port=self.port, # password=self.password, # socket_timeout=self.socket_timeout, # decode_responses=True) # if not self._redis_client.ping(): # print("Connect Redis Timeout!") # sys.exit() # except (redis.AuthenticationError, Exception) as ex: # print(f"Connect Redis Error: {str(ex)}") # sys.exit() # # def __getattr__(self, item): # return getattr(self._redis_client, item) # # def __getitem__(self, item): # return self._redis_client[item] # # def __setitem__(self, key, value): # self._redis_client[key] = value # # def __delitem__(self, key): # del self._redis_client[key] # # # redis_client = RedisCli(host=settings.REDIS_HOST, # port=settings.REDIS_PORT, # password=settings.REDIS_PASSWORD, # db=settings.REDIS_DB) # 异步Redis class AsyncRedis(object): def __init__(self, host: str, port: int = 6379, password: str = "", db: int = 0): self._redis = None self.host = host self.port = port self.password = password self.db = db async def get_redis_pool(self): if not self._redis: params = { "address": f"redis://{self.host}:{self.port}", "db": self.db } if self.password: params["password"] = self.password self._redis = await aioredis.create_redis_pool(**params) return self._redis async def close(self): if self._redis: print("Close Redis Pool!!") self._redis.close() await self._redis.wait_closed() async def set(self, key: str, value: Any, *, expire: int = 9): r = await self.get_redis_pool() await r.set(key, value, expire=expire) async def get(self, key: str): r = await self.get_redis_pool() val = await r.get(key) return val async def delete(self, key: str): r = await self.get_redis_pool() await r.delete(key) redis_client = AsyncRedis(host=settings.REDIS_HOST, port=settings.REDIS_PORT, password=settings.REDIS_PASSWORD, db=settings.REDIS_DB) __all__ = ["redis_client"]