1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- import json
- from typing import Generator, Union, Optional
- import jwt
- from fastapi import security, HTTPException, status, Depends
- from fastapi.security import OAuth2PasswordBearer
- from pydantic import ValidationError
- from sqlalchemy.ext.asyncio import AsyncSession
- from core import security
- from core.config import settings
- from crud.user import crud_admin, crud_teacher, crud_student
- from db.asyncsession import LocalAsyncSession
- from models.user import Teacher, Student, SysUser
- from schemas.auth import TokenPayload
- reusable_oauth2 = OAuth2PasswordBearer(tokenUrl=f"/{settings.API_V1_STR}/login")
- async def get_async_db() -> Generator:
- async with LocalAsyncSession() as db:
- yield db
- def check_access_token(token: str):
- try:
- payload = jwt.decode(token,
- settings.SECRET_KEY,
- algorithms=[security.ALGORITHM])
- payload["sub"] = json.loads(payload["sub"])
- token_payload = TokenPayload(**payload)
- except (jwt.PyJWTError, ValidationError):
- raise HTTPException(status_code=status.HTTP_403_FORBIDDEN,
- detail="Invalid Access Token!!")
- return token_payload
- async def get_current_user(token: str = Depends(reusable_oauth2),
- db: AsyncSession = Depends(get_async_db)
- ) -> Optional[Union[SysUser, Teacher, Student]]:
- token_payload = check_access_token(token)
- if token_payload.sub["utype"] == 0:
- crud = crud_admin
- elif token_payload.sub["utype"] == 1:
- crud = crud_teacher
- else:
- crud = crud_student
- user = await crud.find_one(db, {"username": token_payload.sub["sub"]})
- return user
|