12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- import json
- from typing import Generator, Union, Optional
- import jwt
- from fastapi import security, HTTPException, status, Depends
- from fastapi.security import OAuth2PasswordBearer
- from pydantic import ValidationError
- from sqlalchemy.ext.asyncio import AsyncSession
- from core import security
- from core.config import settings
- from crud.user import crud_admin, crud_teacher, crud_student
- from crud.sysdata.role import crud_role, crud_permission
- from db.asyncsession import LocalAsyncSession
- from models.user import Teacher, Student, Admin
- from schemas.auth import TokenPayload
# OAuth2 bearer-token scheme, injected into get_current_user through Depends().
# Its token URL points at the login route under the versioned API prefix.
# NOTE(review): if settings.API_V1_STR already begins with "/", the resulting
# URL starts with "//" -- confirm against the settings definition.
reusable_oauth2 = OAuth2PasswordBearer(
    tokenUrl=f"/{settings.API_V1_STR}/login",
)
async def get_async_db() -> Generator:
    """Yield an async database session obtained from ``LocalAsyncSession``.

    Used as a FastAPI dependency (see ``Depends(get_async_db)`` in
    ``get_current_user``).  The session is entered as an async context
    manager, so its ``__aexit__`` runs once the generator is resumed or
    closed after the consumer has finished with the session.

    NOTE(review): this is an async generator, so ``AsyncGenerator`` would be
    the more precise return annotation; ``Generator`` is kept unchanged here.
    """
    session_ctx = LocalAsyncSession()
    async with session_ctx as session:
        yield session
def check_access_token(token: str):
    """Decode and validate a JWT access token.

    The token is verified with ``settings.SECRET_KEY`` and
    ``security.ALGORITHM``.  Its ``sub`` claim is expected to be a
    JSON-encoded string; it is parsed and written back into the payload
    before the payload is validated by constructing a ``TokenPayload``.

    Args:
        token: The encoded JWT string.

    Returns:
        The ``TokenPayload`` built from the decoded claims.

    Raises:
        HTTPException: 403 if the token cannot be decoded or verified, if the
            ``sub`` claim is missing or is not valid JSON, or if the payload
            fails ``TokenPayload`` validation.
    """
    try:
        payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[security.ALGORITHM])
        # A correctly signed token can still carry an unusable "sub":
        #   KeyError   -> the claim is absent
        #   TypeError  -> the claim is not a str/bytes value
        #   ValueError -> the claim is not valid JSON (json.JSONDecodeError)
        payload["sub"] = json.loads(payload["sub"])
        token_payload = TokenPayload(**payload)
    except (jwt.PyJWTError, ValidationError, KeyError, TypeError, ValueError) as err:
        # Every malformed-token case maps to the same 403 instead of
        # escaping as an unhandled server error.
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Invalid Access Token!!",
        ) from err
    return token_payload
async def get_current_user(token: str = Depends(reusable_oauth2),
                           db: AsyncSession = Depends(get_async_db)):
    """Resolve the user identified by the bearer token.

    The token is validated with ``check_access_token``.  The parsed ``sub``
    claim supplies ``utype`` (which CRUD object to query) and ``sub`` (the
    username to look up): ``utype`` 0 selects ``crud_admin``, 1 selects
    ``crud_teacher`` and any other value selects ``crud_student``.

    The returned user object gets a ``utype`` attribute copied from the
    token.  For ``utype`` 0 it also gets ``pcodes``: the role's
    comma-separated ``permission_codes`` split into a list, or an empty list
    when the role is missing or has no codes.

    Args:
        token: Bearer token supplied by the ``reusable_oauth2`` dependency.
        db: Async database session supplied by ``get_async_db``.

    Returns:
        The user object returned by the selected CRUD's ``find_one``.

    Raises:
        HTTPException: 403 if the token is invalid or its ``sub`` claim lacks
            the ``utype``/``sub`` entries; 404 if no matching user exists.
    """
    token_payload = check_access_token(token)

    try:
        utype = token_payload.sub["utype"]
        username = token_payload.sub["sub"]
    except (KeyError, TypeError) as err:
        # A validated token whose "sub" lacks the expected entries is still
        # an invalid token from this dependency's point of view.
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Invalid Access Token!!",
        ) from err

    if utype == 0:
        crud = crud_admin
    elif utype == 1:
        crud = crud_teacher
    else:
        crud = crud_student

    user = await crud.find_one(db, filters={"username": username})
    if user is None:
        # The account may have been removed after the token was issued;
        # report that explicitly rather than failing on attribute access.
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )

    user.utype = utype
    if utype == 0:
        role = await crud_role.find_one(db, filters={"id": user.role_id})
        # A missing role grants no permissions instead of raising.
        codes = role.permission_codes if role is not None else None
        user.pcodes = codes.split(",") if codes else []
    return user
|