#!/usr/bin/env python # -*- coding: utf-8 -*- import os from typing import Any, List, Dict from fastapi import Depends, Query, Path, UploadFile, File from openpyxl import load_workbook from sqlalchemy import text from sqlalchemy.ext.asyncio import AsyncSession from admin.api.endpoints.school.utils import check_filetype, check_row from common.const import PERIODS, SUBJECTS, RESOURCE_TYPES, WORK_RESOURCE_TYPES from core.config import settings from crud.base import CrudManager from crud.marktask import crud_task from crud.paper import crud_paper from crud.resource import CATEGORY_CRUDS, RESOURCE_CRUDS from models.resource import CATEGORY_MODES from models.user import Admin from schemas.resource import NewCategory, UpdateCategory from schemas.resource.category import CategoryInDB from utils.depends import get_async_db, get_current_user # 学段列表 async def get_periods(current_user: Admin = Depends(get_current_user)): data = [{"name": item} for item in PERIODS] return {"data": data} # 学科列表 async def get_subjects(current_user: Admin = Depends(get_current_user)): data = [{"name": item} for item in SUBJECTS] return {"data": data} async def get_work_types(current_user: Admin = Depends(get_current_user)): data = [{"name": item} for item in WORK_RESOURCE_TYPES] return {"data": data} # 分类 async def get_category_tree(crud: CrudManager, db: AsyncSession = Depends(get_async_db), root: List[Any] = None): for item in root: children = await crud.fetch_all(db, filters=[text(f"pid={item.id}")]) item.children = children await get_category_tree(crud, db, item.children) async def get_categories(ctype: str = Query(..., description="资源类型,exam / work"), period: str = Query(None, description="学段名称,eg:小学/初中/高中"), subject: str = Query(None, description="学科名称,eg: 语文/数学..."), db: AsyncSession = Depends(get_async_db), current_user: Admin = Depends(get_current_user)): if ctype not in RESOURCE_TYPES: return {"errcode": 400, "mess": "资源类型错误!"} else: model = CATEGORY_MODES[ctype] crud = CATEGORY_CRUDS[ctype] _q = [model.pid == 0] if period: _q.append(model.period == period) if subject: _q.append(model.subject == subject) root = await crud.fetch_all(db, filters=_q) await get_category_tree(crud, db, root) return {"data": root} # 创建资源分类 async def create_category(info: NewCategory, db: AsyncSession = Depends(get_async_db), current_user: Admin = Depends(get_current_user)): # 判断提交参数是否为空 info_dict = info.dict(exclude_none=True) if not info_dict: return {"errcode": 400, "mess": "缺少分类信息!"} # 分类名称 if not info_dict["name"]: return {"errcode": 400, "mess": "缺少分类名称!"} # 分类类型,work / exam if info.ctype not in RESOURCE_TYPES: return {"errcode": 400, "mess": "分类类型错误!"} else: crud = CATEGORY_CRUDS[info.ctype] # 分类 if info.pid: # 判断上级分类是否存在 category = await crud.find_one(db, filters={"id": info.pid}) if not category: return {"errcode": 404, "mess": "上级分类不存在!"} else: info.pname = category.name # 判断同级分类是否存在重复 _q = {"name": info.name, "pid": info.pid, "period": info.period, "subject": info.subject} existed = await crud.count(db, filters=_q) if existed: return {"errcode": 400, "mess": "存在同名分类!"} # 创建 delattr(info, "ctype") db_obj = await crud.insert_one(db, info) return {"data": db_obj} # 更新资源分类 async def update_category(info: UpdateCategory, cid: int = Path(..., description="分类ID"), db: AsyncSession = Depends(get_async_db), current_user: Admin = Depends(get_current_user)): # 上级分类ID不能与自身相同 if info.pid == cid: return {"errcode": 400, "mess": "上级分类不能等于自身!"} if info.ctype not in RESOURCE_TYPES: return {"errcode": 400, "mess": "分类类型错误!"} # 判断提交参数是否为空 info_dict = info.dict(exclude_none=True) if not info_dict: return {"errcode": 400, "mess": "提交参数为空!"} # 判断分类是否存在 crud = CATEGORY_CRUDS[info_dict["ctype"]] model = CATEGORY_MODES[info_dict["ctype"]] db_obj = await crud.find_one(db, filters={"id": cid}) if not db_obj: return {"errcode": 404, "mess": "分类不存在!"} # 如果修改的不是一级分类,则判断上级分类是否存在 if info.pid != db_obj.pid: if bool(info.pid) ^ bool(info.pname): return {"errcode": 400, "mess": "缺少上级分类ID或名称"} if info.pid: # 如果不是修改为一级分类 existed = await crud.count(db, filters={"id": info.pid, "name": info.pname}) if not existed: return {"errcode": 404, "mess": "上级分类不存在!"} else: info_dict.pop("pid", None) info_dict.pop("pname", None) # 判断同级分类是否存在重复 if ("name" in info_dict) and (info_dict["name"] != db_obj.name): existed = await crud.count( db, filters=[model.id != cid, model.name == info.name, model.pid == info.pid]) if existed: return {"errcode": 400, "mess": "存在同名分类!"} else: info_dict.pop("name", None) # 判断学段是否变更 if ("period" in info_dict) and (info.period == db_obj.period): info_dict.pop("period", None) # 判断学科是否变更 if ("subject" in info_dict) and (info.subject == db_obj.subject): info_dict.pop("subject", None) # 更新 if info_dict: info_dict.pop("ctype", None) db_obj = await crud.update(db, db_obj, info) return {"data": db_obj} # 删除资源分类 async def delete_category(cid: int = Path(..., description="资源分类ID"), ctype: str = Query(..., description="分类类型,exam / work"), db: AsyncSession = Depends(get_async_db), current_user: Admin = Depends(get_current_user)): if ctype not in RESOURCE_TYPES: return {"errcode": 400, "mess": "分类类型错误!"} else: crud = CATEGORY_CRUDS[ctype] existed = await crud.count(db, filters={"id": cid}) if not existed: return {"errcode": 404, "mess": "分类不存在!"} # 判断是否存在关联资源 _q = {"category_id": cid} total = await RESOURCE_CRUDS[ctype].count(db, filters=_q) if not total: total = await crud_paper.count(db, filters=_q) # 判断是否存在关联试卷 if not total: total = await crud_task.count(db, filters=_q) # 判断是否存在关联阅卷任务 # 执行删除 if total: return {"errcode": 400, "mess": "不能删除有资源关联的分类"} else: await crud.delete(db, obj_id=cid) return {"data": None} # 批量导入分类 async def import_category(ctype: str = Path(..., description="资源类型,exam / work"), datafile: UploadFile = File(..., description="数据文件"), db: AsyncSession = Depends(get_async_db), current_user: Admin = Depends(get_current_user)): # 判断文件格式 if not check_filetype(datafile.filename, ".xlsx"): return {"errcode": 400, "mess": "文件格式错误!"} # 判断分类类型 if ctype not in RESOURCE_TYPES: return {"errcode": 400, "mess": "资源类型错误!"} else: crud = CATEGORY_CRUDS[ctype] # 把文件写入磁盘,再加载回来 disk_file = os.path.join(settings.UPLOADER_PATH, datafile.filename) content = await datafile.read() with open(disk_file, "wb") as f: f.write(content) # 返回结果 errors = [] success = 0 counter = 2 category_maps = {} row_length = 6 if ctype == "work" else 5 # 使用openpyxl读取文件,生成分类字典 wb = load_workbook(disk_file) ws = wb.worksheets[0] for row in ws.iter_rows(min_row=2, max_col=ws.max_column, max_row=ws.max_row, values_only=True): row = await check_row(row, row_length) if row is None: # 空行 continue elif not row: # 字段不完整 errors.append(f"第{counter}行: 某些单元格为空!") continue # 判断学段是否正确 if row[0] not in PERIODS: errors.append(f"第{counter}行: 分类学段错误!") continue # 判断学科是否正确 if row[1] not in SUBJECTS: errors.append(f"第{counter}行: 分类科目错误!") continue # 判断分类名称是否为空 if not row[row_length - 1]: errors.append(f"第{counter}行: 分类名称错误!") continue # 解析模版,获取所有的分类 if row[0] not in category_maps: category_maps[row[0]] = {} if row[1] not in category_maps[row[0]]: category_maps[row[0]][row[1]] = {} if row[2] not in category_maps[row[0]][row[1]]: category_maps[row[0]][row[1]][row[2]] = {} if row[3] not in category_maps[row[0]][row[1]][row[2]]: category_maps[row[0]][row[1]][row[2]][row[3]] = {} if row[4] not in category_maps[row[0]][row[1]][row[2]][row[3]]: category_maps[row[0]][row[1]][row[2]][row[3]][row[4]] = {} if ctype == "work": if row[5] not in category_maps[row[0]][row[1]][row[2]][row[3]][row[4]]: category_maps[row[0]][row[1]][row[2]][row[3]][row[4]][row[5]] = counter counter += 1 # 解析分类字典,进行分类创建 for period in category_maps: # 学段 for sub in category_maps[period]: # 学科 for c1 in category_maps[period][sub]: # 一级分类 _q = {"period": period, "subject": sub, "name": c1, "pid": 0} fields = {"name": c1, "pid": 0, "pname": "", "period": period, "subject": sub} db_c1 = await get_or_create(crud, db, _q, fields) for c2 in category_maps[period][sub][c1]: # 二级分类 _q.update({"name": c2, "pid": db_c1.id}) fields.update({"name": c2, "pid": db_c1.id, "pname": c1}) db_c2 = await get_or_create(crud, db, _q, fields) for c3 in category_maps[period][sub][c1][c2]: # 三级分类 _q.update({"name": c3, "pid": db_c2.id}) fields.update({"name": c3, "pid": db_c2.id, "pname": c2}) db_c3 = await get_or_create(crud, db, _q, fields) if ctype == "work": for c4 in category_maps[period][sub][c1][c2][c3]: # 四级分类 if not c4: continue _q.update({"name": c4, "pid": db_c3.id}) fields.update({"name": c4, "pid": db_c3.id, "pname": c3}) await get_or_create(crud, db, _q, fields) success += 1 else: success += 1 # 删除上传文件 os.remove(disk_file) return {"data": {"success": success, "fail": len(errors), "errors": errors}} async def get_or_create(crud: CrudManager, db: AsyncSession, filters: Dict[str, Any], fields: Dict[str, Any]): db_obj = await crud.find_one(db, filters=filters, return_fields=["id"]) if not db_obj: obj_in = CategoryInDB(**fields) db_obj = await crud.insert_one(db, obj_in) return db_obj