提交 956e1906 编辑于 作者: Libcat's avatar Libcat 👷
浏览文件

基本功能

上级
流水线 #3569 已失败 ,包含阶段
in 6 minute 和 15 second
# Created by https://www.toptal.com/developers/gitignore/api/python,visualstudiocode
# Edit at https://www.toptal.com/developers/gitignore?templates=python,visualstudiocode
### Python ###
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
pytestdebug.log
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
doc/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
#poetry.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
# .env
.env/
.venv/
env/
venv/
ENV/
env.bak/
venv.bak/
pythonenv*
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# operating system-related files
# file properties cache/storage on macOS
*.DS_Store
# thumbnail cache on Windows
Thumbs.db
# profiling data
.prof
### VisualStudioCode ###
.vscode/*
!.vscode/settings.json
!.vscode/tasks.json
!.vscode/launch.json
!.vscode/extensions.json
*.code-workspace
### VisualStudioCode Patch ###
# Ignore all local history of files
.history
.ionide
# End of https://www.toptal.com/developers/gitignore/api/python,visualstudiocode
\ No newline at end of file
from decimal import Decimal
from typing import List, Optional, Any
from datetime import datetime
from fastapi import FastAPI
from pydantic import BaseModel
from pydantic.utils import GetterDict
import peewee
# Important!
# see https://fastapi.tiangolo.com/zh/advanced/sql-databases-peewee/#create-a-peeweegetterdict-for-the-pydantic-models-schemas
class PeeweeGetterDict(GetterDict):
def get(self, key: Any, default: Any = None):
res = getattr(self._obj, key, default)
if isinstance(res, peewee.ModelSelect):
return list(res)
return res
class Model(BaseModel):
class Config:
orm_mode = True
getter_dict = PeeweeGetterDict
class Response(Model):
"""带提示和自定义数据的操作结果"""
succ: bool
msg: str
class ProductType(Model):
id: int
name: str
# products: List[Product]
class ProductTypeCreate(Model):
name: str
class ProductBase(Model):
"""商品"""
id: int
name: str
price: Decimal
description: str
# product_type: ProductType
image: Optional[str]
class ProductGet(Model):
"""商品"""
id: int
name: str
price: Decimal
description: str
detail: str
inventory: int
sales: int
product_type: ProductType
created_time: datetime
image: Optional[str]
class ProductCreate(Model):
"""商品"""
name: str
description: str
detail: str
inventory: int
price: Decimal
product_type: int
image: Optional[str]
class UserBase(Model):
"""用户"""
id: int
username: str
email: str
is_admin: bool
image: Optional[str]
created_time: datetime
class UserCreate(Model):
"""用户"""
username: str
password: str
email: str
image: Optional[str]
class UserUpdate(Model):
"""用户"""
email: Optional[str]
image: Optional[str]
class UserLogin(Model):
"""用户登录"""
username: str
password: str
class UserPassword(Model):
"""修改密码"""
oldPassword: str
newPassword: str
class UserAddressBase(Model):
"""地址"""
name: str
phone: str
address: str
class UserAddressFull(Model):
"""地址"""
id: int
name: str
phone: str
address: str
created_time: datetime
# 订单中的商品
class OrderItems(Model):
"""订单项目 读取"""
product: ProductBase
amount: int
price: Decimal
total: Decimal
class OrderItemsCreate(Model):
"""订单项目 创建"""
product: int
amount: int
class OrderGet(Model):
"""订单 读取"""
id: int
user: UserBase
address: UserAddressBase
memo: Optional[str]
created_time: datetime
items: List[OrderItems]
total: Decimal
class OrderCreate(Model):
"""订单 创建"""
user: int
address: int
memo: Optional[str]
items: List[OrderItemsCreate]
deleteFromCar: Optional[bool] = True
class OrderResult(Response):
"""下单结果"""
order: Optional[OrderGet]
# 购物车项目
class CarItemBase(Model):
"""购物车项目"""
id: int
product: ProductBase
amount: int
created_time: datetime
class CarItemUpdate(Model):
"""购物车项目 增加/修改/删除通用"""
product: int
amount: Optional[int] = 1
class SigninSignupResult(Response):
"""注册结果"""
user: Optional[UserBase]
from typing import List, Optional, Union
from fastapi import APIRouter, Depends, Response
import apischemas as api
import database as db
from .authorize import get_user
router = APIRouter(tags=["购物车"], prefix="/car")
@router.get('/',
response_model=List[api.CarItemBase],
description="获取用户购物车")
async def get_multi(user: db.User = Depends(get_user)):
return list(db.CarItems.select().where(db.CarItems.user==user.id))
def get_car_item(userid: int, productid: int)->Union[db.CarItems, None]:
return db.CarItems.get_or_none(db.CarItems.user==userid, db.CarItems.product_id==productid)
@router.post('/', response_model=api.CarItemBase, description="添加产品到购物车")
async def create_one(item: api.CarItemUpdate,
user: db.User = Depends(get_user)):
# 先查询购物车中是否已经存在该商品
dbitem = get_car_item(user.id, item.product)
if dbitem:
dbitem.amount += item.amount # 数量求和
dbitem.save()
else:
dbitem = db.CarItems.create(**item.dict(), user=user.id)
return dbitem
@router.delete('/', response_model=api.CarItemBase, description="删除购物车项目")
async def delete_one(item: api.CarItemUpdate, user: db.User = Depends(get_user)):
dbitem = get_car_item(user.id, item.product)
if dbitem:
dbitem.delete_instance()
else:
return Response("该项目不存在", 404)
@router.put('/', response_model=api.CarItemBase, description="修改购物车项目")
async def modify_one(item: api.CarItemUpdate, user: db.User = Depends(get_user)):
dbitem = get_car_item(user.id, item.product)
if dbitem:
dbitem.amount = item.amount
dbitem.save()
return dbitem
\ No newline at end of file
from typing import List, Optional
from fastapi import APIRouter, UploadFile, File
from pathlib import Path
import apischemas as api
import database as db
router = APIRouter(tags=["文件操作"], prefix="/upload")
@router.post("/")
async def create_upload_file(file: UploadFile = File(...)):
content = await file.read()
with open(Path("upload")/Path(file.filename), 'wb') as f:
f.write(content)
return {"filename": file.filename, "content_type": file.content_type, "size": len(content)}
\ No newline at end of file
from typing import List, Optional
from decimal import Decimal
from fastapi import APIRouter, Depends, HTTPException, Response
import apischemas as api
from database import *
from .authorize import get_user
router = APIRouter(tags=["订单系统"], prefix="/order")
@router.get('/', response_model=List[api.OrderGet], description="获取订单列表")
async def get_multiple(user: User = Depends(get_user)):
return list(user.orders.order_by(Order.created_time.desc()))
@router.get('/{id}', response_model=api.OrderGet, description="获取订单")
async def get_order_by_id(id: int):
dbitem = Order.get_or_none(Order.id == id)
if dbitem:
return dbitem
@router.post('/', response_model=api.OrderGet, description="创建订单")
async def create_one(order: api.OrderCreate):
dbitem = None
with dbconnection.atomic() as transaction:
# 先创建一个订单,总金额默认为0
db_order = Order.create(**order.dict())
# 循环创建每个订单项
order_total = Decimal(0)
for item in order.items:
# 查询商品
prod: Product = Product.get_or_none(Product.id == item.product)
# 检查商品存在
if prod == None:
transaction.rollback()
return Response(f"商品{item.product}不存在!", 403)
# 检查商品库存呢
if prod.inventory < item.amount:
transaction.rollback()
return Response(f"商品{item.product}库存不足!", 403)
# 减去商品库存
prod.inventory -= item.amount
prod.sales += item.amount
prod.save()
# 创建订单项 计算总价
OrderItems.create(order=db_order,
product=prod,
amount=item.amount,
price=prod.price,
total=item.amount * prod.price)
(CarItems.delete().where(CarItems.product_id == item.product,
CarItems.user_id == order.user).execute())
order_total += item.amount * prod.price # 累加总价
# 设置订单总价
db_order.total = order_total
db_order.save()
dbitem = db_order
if dbitem:
dbitem = Order.get_by_id(dbitem.id)
return dbitem
@router.delete('/{id}', description="删除订单")
async def delete_one(id: int, user: User = Depends(get_user)):
dbitem:Order = Order.get_or_none(Order.id == id)
if dbitem:
if dbitem.user_id==user.id:
dbitem.delete_instance()
else:
return Response("您没有该订单的权限", 403)
else:
return Response("该订单不存在", 404)
\ No newline at end of file
from decimal import Decimal
from pathlib import Path
from typing import List, Optional
from fastapi import APIRouter, UploadFile, Form, Response
import arrow
import apischemas as api
from database import *
router = APIRouter(tags=["产品CRUD"], prefix="/products")
@router.get('/', response_model=List[api.ProductGet], description="获取产品列表")
async def get_list(product_type: Optional[int] = None,
name: Optional[str] = None):
q = Product.select()
if product_type:
q = q.where(Product.product_type_id == product_type)
if name:
q = q.where(Product.name.contains(name))
return list(q)
@router.get('/{id}', response_model=api.ProductGet, description="获取产品")
async def get_one(id: int):
return Product.get_or_none(Product.id == id)
@router.post('/', response_model=api.ProductGet, description="创建产品")
async def create_one(name: str = Form(...),
description: Optional[str] = Form(""),
product_type: int = Form(...),
price: Optional[Decimal] = Form(0),
inventory: Optional[int] = Form(0),
sales: Optional[int] = Form(0),
image: Optional[UploadFile] = Form(None),
detail: Optional[str] = Form("")):
filename = None
if image:
content = await image.read()
suffix = Path(image.filename).suffix
# 获取图片后缀
if suffix == "" or suffix == None:
suffix = ".jpg"
filename = f"product-{arrow.now().int_timestamp}{suffix}"
# 保存图片
with open("upload/" + filename, 'wb') as f:
f.write(content)
dbitem = Product.create(name=name,
description=description,
product_type=product_type,
price=price,
inventory=inventory,
sales=sales,
image=filename,
detail=detail)
return dbitem
@router.put('/{id}', response_model=api.ProductGet, description="修改产品")
async def update_one(id: int,
name: Optional[str] = Form(None),
description: Optional[str] = Form(None),
product_type: Optional[int] = Form(None),
price: Optional[Decimal] = Form(None),
inventory: Optional[int] = Form(None),
sales: Optional[int] = Form(None),
image: Optional[UploadFile] = Form(None),
detail: Optional[str] = Form(None)):
dbitem: Product = Product.get_or_none(Product.id == id)
if not dbitem:
return Response("该项目不存在", 404)
# 按需更新
if name:
dbitem.name = name
if description:
dbitem.description = description
if product_type:
dbitem.product_type = product_type
if price:
dbitem.price = price
if inventory:
dbitem.inventory = inventory
if sales:
dbitem.sales = sales
if detail:
dbitem.detail = detail
if image:
content = await image.read()
suffix = Path(image.filename).suffix
# 获取图片后缀
if suffix == "" or suffix == None:
suffix = ".jpg"
filename = f"product-{arrow.now().int_timestamp}{suffix}"
# 保存图片
with open("upload/" + filename, 'wb') as f:
f.write(content)
dbitem.image = filename
# 保存
dbitem.save()
return dbitem
@router.delete('/{id}', description="删除产品")
async def delete_one(id: int):
dbitem: Product = Product.get_or_none(Product.id == id)
if dbitem: