123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- import copy
- import os
- from typing import Optional
- from fastapi import APIRouter, Query, Depends, Path, UploadFile, File
- from openpyxl import load_workbook
- from sqlalchemy import or_, text
- from sqlalchemy.ext.asyncio import AsyncSession
- from starlette.background import BackgroundTasks
- from admin.api.endpoints.school.utils import check_row
- from core.config import settings
- from core.security import hashed_password
- from crud.school import crud_school, crud_grade, crud_class
- from crud.user import crud_student
- from models.user import SysUser
- from schemas.base import OrderByField, ReturnField
- from schemas.school.student import (StudentList, StudentDetail, NewStudent,
- StudentInDB, UpdateStudent)
- from utils.depends import get_async_db, get_current_user
- router = APIRouter()
# List students
@router.get("/students",
            response_model=StudentList,
            response_model_exclude_none=True,
            summary="学生列表")
async def get_students(
        page: Optional[int] = None,
        size: Optional[int] = None,
        sno: str = Query("", description="学号"),
        name: str = Query("", description="学生姓名"),
        tid: int = Query(0, description="学生ID"),
        sid: int = Query(0, description="学校ID"),
        gid: int = Query(0, description="年级ID"),
        cid: int = Query(0, description="班级ID"),
        order: OrderByField = Query("-created_at",
                                    description="排序字段,用逗号分隔,升降序以-判断,默认-created_at"),
        res: ReturnField = Query("", description="返回字段,取值: id,name,sno,phone"),
        db: AsyncSession = Depends(get_async_db),
        current_user: SysUser = Depends(get_current_user)):
    """Return the students matching the given query parameters.

    Each non-empty / non-zero query value is forwarded to
    ``crud_student.find_all`` as a filter keyed by the matching column name.
    Pagination is applied only when both ``page`` and ``size`` are >= 1;
    otherwise no offset and no limit are passed.

    Returns:
        A dict with ``total`` and ``data`` as produced by
        ``crud_student.find_all``.
    """
    # (column name, query value) pairs; falsy values mean "not filtered".
    candidates = (
        ("sno", sno),
        ("name", name),
        ("id", tid),
        ("school_id", sid),
        ("grade_id", gid),
        ("class_id", cid),
    )
    filters = {column: value for column, value in candidates if value}

    # Pagination needs a valid page AND a valid size; anything else disables it.
    paginate = (page is not None and page >= 1
                and size is not None and size >= 1)
    if paginate:
        offset = (page - 1) * size
    else:
        offset = None
        size = None

    # A plain string ordering is wrapped as a single textual ORDER BY clause.
    if isinstance(order, str):
        order = [text(order)]

    total, items = await crud_student.find_all(db,
                                               filters=filters,
                                               offset=offset,
                                               limit=size,
                                               order_by=order,
                                               return_fields=res)
    return {"total": total, "data": items}
# Create a student
@router.post("/students",
             response_model=StudentDetail,
             response_model_exclude_none=True,
             summary="创建学生")
async def create_student(info: NewStudent,
                         db: AsyncSession = Depends(get_async_db),
                         current_user: SysUser = Depends(get_current_user)):
    """Create a student after validating the referenced school, grade and class.

    The student's username is the phone number and the initial password is
    the hash of the phone number's last six characters.

    Returns:
        ``{"data": <created student>}`` on success, or an
        ``{"errcode": ..., "mess": ...}`` dict when the school, grade or class
        does not exist (404) or the student number / phone is already taken
        (400).
    """
    # The referenced school must exist.
    db_school = await crud_school.find_one(db, filters={"id": info.school_id})
    if not db_school:
        return {"errcode": 404, "mess": "学校不存在!"}

    # The referenced grade must exist.
    db_grade = await crud_grade.find_one(db, filters={"id": info.grade_id})
    if not db_grade:
        return {"errcode": 404, "mess": "年级不存在!"}

    # The referenced class must exist.
    db_class = await crud_class.find_one(db, filters={"id": info.class_id})
    if not db_class:
        return {"errcode": 404, "mess": "班级不存在!"}

    # Reject a duplicate: same student number within the school, or same phone.
    # Values are passed as bound parameters, never interpolated into the SQL
    # text, so request data cannot alter the statement. sno/phone are bound as
    # strings because the columns were previously compared to quoted literals.
    duplicate_filter = or_(
        text("school_id = :dup_school_id AND sno = :dup_sno").bindparams(
            dup_school_id=info.school_id, dup_sno=str(info.sno)),
        text("phone = :dup_phone").bindparams(dup_phone=str(info.phone)),
    )
    existed = await crud_student.count(db, filters=[duplicate_filter])
    if existed:
        return {"errcode": 400, "mess": "学号或手机号重复!"}

    # Build the DB object: denormalised names plus audit fields.
    obj_in = StudentInDB(**info.dict(by_alias=True),
                         school_name=db_school.name,
                         grade_name=db_grade.name,
                         class_name=db_class.name,
                         username=info.phone,
                         password=hashed_password(info.phone[-6:]),
                         creator_id=current_user.id,
                         creator_name=current_user.username,
                         editor_id=current_user.id,
                         editor_name=current_user.username)
    db_obj = await crud_student.insert_one(db, obj_in)
    return {"data": db_obj}
# Student detail
@router.get("/students/{sid}",
            response_model=StudentDetail,
            response_model_exclude_none=True,
            summary="学生详情")
async def get_student(sid: int = Path(..., description="学生ID"),
                      db: AsyncSession = Depends(get_async_db),
                      current_user: SysUser = Depends(get_current_user)):
    """Return a single student by ID.

    Returns:
        ``{"data": <student>}`` when found; otherwise the same 404 error
        payload the update and delete endpoints use, instead of a
        success-shaped response with no data.
    """
    db_obj = await crud_student.find_one(db, filters={"id": sid})
    if not db_obj:
        return {"errcode": 404, "mess": "学生不存在!"}
    return {"data": db_obj}
# Update a student
@router.put("/students/{sid}",
            response_model=StudentDetail,
            response_model_exclude_none=True,
            summary="更新学生")
async def update_student(info: UpdateStudent,
                         sid: int = Path(..., description="学生ID"),
                         db: AsyncSession = Depends(get_async_db),
                         current_user: SysUser = Depends(get_current_user)):
    """Update a student, re-validating only the references that changed.

    For each of school / grade / class, the target row is looked up only when
    the submitted ID differs from the stored one; the matching ``*_name`` is
    then refreshed on ``info``. A changed phone number must be unique and also
    becomes the new username.

    Returns:
        ``{"data": <updated student>}`` on success, or an
        ``{"errcode": ..., "mess": ...}`` dict for an empty payload (400),
        a missing student/school/grade/class (404) or a duplicate phone (400).
    """
    # Reject a payload in which every field is None.
    info_dict = info.dict(exclude_none=True)
    if not info_dict:
        return {"errcode": 400, "mess": "提交参数为空!"}

    # The student being updated must exist.
    db_obj = await crud_student.find_one(db, filters={"id": sid})
    if not db_obj:
        return {"errcode": 404, "mess": "学生不存在!"}

    # School changed: it must exist; refresh the denormalised name.
    if ("school_id" in info_dict
            and db_obj.school_id != info_dict["school_id"]):
        db_school = await crud_school.find_one(db,
                                               filters={"id": info.school_id})
        if not db_school:
            return {"errcode": 404, "mess": "学校不存在!"}
        info.school_name = db_school.name

    # Grade changed: it must exist; refresh the denormalised name.
    if "grade_id" in info_dict and db_obj.grade_id != info_dict["grade_id"]:
        db_grade = await crud_grade.find_one(db, filters={"id": info.grade_id})
        if not db_grade:
            return {"errcode": 404, "mess": "年级不存在!"}
        info.grade_name = db_grade.name

    # Class changed: it must exist; refresh the denormalised name.
    if "class_id" in info_dict and db_obj.class_id != info_dict["class_id"]:
        db_class = await crud_class.find_one(db, filters={"id": info.class_id})
        if not db_class:
            return {"errcode": 404, "mess": "班级不存在!"}
        info.class_name = db_class.name

    # Phone changed: it must not belong to any other student.
    if "phone" in info_dict and db_obj.phone != info_dict["phone"]:
        # Bound parameters keep the submitted phone out of the SQL text.
        phone_taken = text(
            "id != :dup_self_id AND phone = :dup_phone").bindparams(
                dup_self_id=sid, dup_phone=str(info.phone))
        db_student = await crud_student.find_one(db, filters=[phone_taken])
        if db_student:
            return {"errcode": 400, "mess": "手机号重复!"}
        # The username mirrors the full phone number, as in student creation
        # and bulk import (previously this stored only the last six digits).
        info.username = info.phone

    # Stamp the editor and persist.
    info.editor_id = current_user.id
    info.editor_name = current_user.username
    db_obj = await crud_student.update(db, db_obj, info)
    return {"data": db_obj}
# Delete a student
@router.delete("/students/{sid}",
               response_model=StudentDetail,
               response_model_exclude_none=True,
               summary="删除学生")
async def delete_student(bg_task: BackgroundTasks,
                         sid: int = Path(..., description="学生ID"),
                         db: AsyncSession = Depends(get_async_db),
                         current_user: SysUser = Depends(get_current_user)):
    """Delete the student with the given ID.

    Returns:
        ``{"data": None}`` after deletion, or a 404 error payload when no
        student with that ID exists.
    """
    found = await crud_student.count(db, {"id": sid})
    if found:
        await crud_student.delete(db, obj_id=sid)
        # TODO: delete related data, e.g. via
        # bg_task.add_task(delete_related_object, db, cid=id)
        return {"data": None}
    return {"errcode": 404, "mess": "学生不存在!"}
# Bulk-import students
# NOTE(review): the success payload below is a summary dict, not a student;
# confirm that StudentDetail's `data` field accepts it.
@router.post("/students/bulk",
             response_model=StudentDetail,
             response_model_exclude_none=True,
             summary="批量导入学生")
async def import_student(datafile: UploadFile = File(..., description="数据文件"),
                         db: AsyncSession = Depends(get_async_db),
                         current_user: SysUser = Depends(get_current_user)):
    """Import students from the first worksheet of an uploaded ``.xlsx`` file.

    Rows are read from the second row onwards and passed through
    ``check_row(row, 9)``. Columns used after that: 0 = row label shown in
    error messages, 1 = student number, 2 = name, 3 = sex, 4 = age,
    5 = phone, 6 = school name, 7 = grade name, 8 = class name.
    Valid rows are inserted in batches of 50; invalid rows are skipped and
    reported.

    Returns:
        ``{"data": {"success": int, "fail": int, "errors": [str, ...]}}``,
        or a 400 error payload when the file is not an ``.xlsx`` file.
    """
    # Function-scope import so this handler needs no change to file imports.
    from io import BytesIO

    batch_size = 50

    # Validate the extension; tolerate a missing filename and upper-case ".XLSX".
    if not (datafile.filename or "").lower().endswith(".xlsx"):
        return {"errcode": 400, "mess": "文件格式错误!"}

    # Parse the upload straight from memory. Writing it to disk under the
    # client-supplied filename allowed path traversal, let concurrent uploads
    # overwrite each other, and left the file behind whenever an error
    # happened before the final cleanup.
    content = await datafile.read()
    wb = load_workbook(BytesIO(content))
    ws = wb.worksheets[0]

    errors = []
    success = 0
    pending = []  # validated rows not yet inserted
    # Keys already accepted from this file. Pending rows are not in the
    # database yet, so the DB lookup alone cannot catch in-file duplicates.
    seen_snos = set()
    seen_phones = set()

    sheet_rows = ws.iter_rows(min_row=2,
                              max_col=ws.max_column,
                              max_row=ws.max_row,
                              values_only=True)
    for sheet_row, raw in enumerate(sheet_rows, start=2):
        row = await check_row(raw, 9)
        if row is None:  # blank row
            continue
        if not row:  # incomplete row
            # `row` is falsy here, so it must not be indexed. Label the error
            # with the raw first cell, or the worksheet row number if empty.
            label = raw[0] if raw and raw[0] is not None else sheet_row
            errors.append(f"第{label}行: 某些单元格为空!")
            continue

        # The school must exist.
        db_school = await crud_school.find_one(db, filters={"name": row[6]})
        if not db_school:
            errors.append(f"第{row[0]}行: 学校不存在!")
            continue

        # The grade must exist within that school.
        db_grade = await crud_grade.find_one(db,
                                             filters={
                                                 "school_id": db_school.id,
                                                 "name": row[7]
                                             })
        if not db_grade:
            errors.append(f"第{row[0]}行: 年级不存在!")
            continue

        # The class must exist within that school and grade.
        db_class = await crud_class.find_one(db,
                                             filters={
                                                 "school_id": db_school.id,
                                                 "grade_id": db_grade.id,
                                                 "name": row[8]
                                             })
        if not db_class:
            errors.append(f"第{row[0]}row_placeholder")
            continue

        # Normalise to strings: the values are compared to text columns, and
        # the phone is sliced below to derive the initial password.
        sno = str(row[1])
        phone = str(row[5])
        sno_key = (db_school.id, sno)

        # Duplicate of a row accepted earlier in this same file.
        if sno_key in seen_snos or phone in seen_phones:
            errors.append(f"第{row[0]}行: 学号或手机号重复!")
            continue

        # Duplicate of a student already stored. Bound parameters keep the
        # spreadsheet values out of the SQL text.
        duplicate_filter = or_(
            text("school_id = :dup_school_id AND sno = :dup_sno").bindparams(
                dup_school_id=db_school.id, dup_sno=sno),
            text("phone = :dup_phone").bindparams(dup_phone=phone),
        )
        db_student = await crud_student.find_one(db,
                                                 filters=[duplicate_filter])
        if db_student:
            errors.append(f"第{row[0]}行: 学号或手机号重复!")
            continue

        # Build the student object.
        obj_in = StudentInDB(sno=row[1],
                             username=phone,
                             password=hashed_password(phone[-6:]),
                             name=row[2],
                             sex=1 if row[3] == "男" else 0,
                             age=row[4],
                             phone=phone,
                             sid=db_school.id,
                             school_name=row[6],
                             gid=db_grade.id,
                             grade_name=row[7],
                             cid=db_class.id,
                             class_name=row[8],
                             creator_id=current_user.id,
                             creator_name=current_user.username,
                             editor_id=current_user.id,
                             editor_name=current_user.username)
        pending.append(obj_in)
        seen_snos.add(sno_key)
        seen_phones.add(phone)
        success += 1

        # Flush a full batch; rebinding to a new list leaves the list handed
        # to insert_many untouched, so no deep copy is needed.
        if len(pending) >= batch_size:
            await crud_student.insert_many(db, pending)
            pending = []

    # Flush the final partial batch.
    if pending:
        await crud_student.insert_many(db, pending)

    return {"data": {"success": success, "fail": len(errors), "errors": errors}}
|