#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""FastAPI dependencies: async DB session provider and access-token user lookup."""
import json
from typing import AsyncGenerator, Generator, Union, Optional

import jwt
# NOTE: this name is rebound by ``from core import security`` below, so every
# ``security.<attr>`` reference in this module resolves to ``core.security``.
from fastapi import security, HTTPException, status, Depends
from fastapi.security import OAuth2PasswordBearer
from pydantic import ValidationError
from sqlalchemy.ext.asyncio import AsyncSession

from core import security
from core.config import settings
from crud.user import crud_admin, crud_teacher, crud_student
from crud.sysdata.role import crud_role, crud_permission
from db.asyncsession import LocalAsyncSession
from models.user import Teacher, Student, Admin
from schemas.auth import TokenPayload

# OAuth2 bearer scheme; ``tokenUrl`` is the login endpoint advertised to clients.
reusable_oauth2 = OAuth2PasswordBearer(tokenUrl=f"/{settings.API_V1_STR}/login")

# Single detail message used for every token-related rejection.
_INVALID_TOKEN_DETAIL = "Invalid Access Token!!"


def _invalid_token_error() -> HTTPException:
    """Build the 403 error raised for any malformed or unverifiable token."""
    return HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail=_INVALID_TOKEN_DETAIL,
    )


async def get_async_db() -> AsyncGenerator[AsyncSession, None]:
    """Yield a database session for use as a FastAPI dependency.

    The session is produced by ``LocalAsyncSession()`` used as an async
    context manager, so the context is exited once the dependency finishes.
    """
    async with LocalAsyncSession() as db:
        yield db


def check_access_token(token: str) -> TokenPayload:
    """Decode and validate an access token.

    The ``sub`` claim is expected to hold a JSON-encoded string; it is decoded
    and replaced in the payload before the payload is validated against
    ``TokenPayload``.

    Args:
        token: The encoded JWT.

    Returns:
        The validated ``TokenPayload``.

    Raises:
        HTTPException: 403 when the token cannot be decoded/verified, the
            ``sub`` claim is missing or is not valid JSON, or the payload
            fails schema validation.
    """
    try:
        payload = jwt.decode(
            token, settings.SECRET_KEY, algorithms=[security.ALGORITHM]
        )
        payload["sub"] = json.loads(payload["sub"])
        token_payload = TokenPayload(**payload)
    except (
        jwt.PyJWTError,
        ValidationError,
        KeyError,    # no "sub" claim in the payload
        TypeError,   # "sub" claim is not a str/bytes value
        ValueError,  # "sub" claim is not valid JSON (json.JSONDecodeError)
    ) as exc:
        raise _invalid_token_error() from exc
    return token_payload


async def get_current_user(
    token: str = Depends(reusable_oauth2),
    db: AsyncSession = Depends(get_async_db),
):
    """Resolve the user identified by the bearer token.

    The decoded subject carries ``utype`` (0 selects the admin CRUD, 1 the
    teacher CRUD, anything else the student CRUD) and ``sub`` (the username
    used for the lookup). The returned object gets a ``utype`` attribute, and
    for ``utype == 0`` also a ``pcodes`` list built by splitting the role's
    comma-separated ``permission_codes``.

    Raises:
        HTTPException: 403 when the token is invalid or its subject lacks the
            expected keys; 404 when no matching user exists.
    """
    token_payload = check_access_token(token)

    try:
        subject = token_payload.sub
        utype = subject["utype"]
        username = subject["sub"]
    except (KeyError, TypeError) as exc:
        # The subject decoded fine but does not have the expected shape.
        raise _invalid_token_error() from exc

    if utype == 0:
        crud = crud_admin
    elif utype == 1:
        crud = crud_teacher
    else:
        crud = crud_student

    user = await crud.find_one(db, filters={"username": username})
    if user is None:
        # A correctly signed token may still reference an account that no
        # longer exists; report it instead of failing with AttributeError.
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )

    user.utype = utype
    if utype == 0:
        role = await crud_role.find_one(db, filters={"id": user.role_id})
        # Fail closed: a missing role or empty code string grants nothing.
        codes = role.permission_codes if role is not None else None
        user.pcodes = codes.split(",") if codes else []
    return user