前言
在写项目时,经常需要验证前端功能。前端需要获取后端数据,后端还没有写好。需要快速写一个接口,来给前端提供数据。利用fastapi快速给定义一个接口,和前端交互。
相关文章:利用FastAPI快速开发服务模块
过程
需要先注册路由->设计结构体->定义接口->编写处理函数
1. 注册路由
routes/api.py
api_router.include_router(pay.router, tags=["pay"])
2. 结构体
app/schemas/pay.py
from pydantic import BaseModel
class Payment(BaseModel):
"""
订单请求
"""
merno: str
firstname: str
lastname: str
url: str = None
ip: str
orderno: str
total: float
currency: str
email: str
notifyurl: str
returnurl: str
telephone: str = None
country: str
state: str
city: str
zipcode: str
address: str
useragent: str = None
goods: str
sign: str
class PayResponse(BaseModel):
"""
订单请求返回
"""
code: int
msg: str
data: dict
class Notify(BaseModel):
"""
订单请求
"""
orderno: str
total: float
status: str
transno: str
sign: str
3. api接口
app/http/api/pay.py
from fastapi import APIRouter, Depends
from app.http.deps import get_db
from app.schemas.pay import Payment, PayResponse, Notify
from app.services.pay.payment import ParameterHandle, NotifyHandle
router = APIRouter(
prefix="/pay"
)
@router.post("/submit", response_model=PayResponse)
def submit(item: Payment):
"""
生成支付订单接口
"""
payment = item.dict()
para = ParameterHandle(payment)
return para.respond()
@router.post("/notify", response_model=str)
def submit(item: Notify):
"""
生成支付订单接口
"""
notify = item.dict()
noti = NotifyHandle(notify)
return noti.respond()
4. 逻辑处理
from app.schemas.pay import Payment, Notify
from app.services.pay.superxpay import *
import logging
import json
def create_payment_response_from_submit(code, result):
"""
构建返回函数
"""
resp_dict = {
"code": code,
"msg": '成功',
"data": result
}
return resp_dict
class ParameterHandle:
def __init__(self, request_json: Payment):
self.req_json = request_json
def respond(self):
rep_json = {}
#记录请求
logging.debug(json.dumps(self.req_json, indent=4))
if verify_md5_sign(0, self.req_json):
ref = get_order_ref()
get_payurl_by_post(self.req_json, ref)
payurl = 'http://www.testpay.url'
logging.debug(payurl)
rep_json['payurl']= payurl
return create_payment_response_from_submit(1, rep_json)
class NotifyHandle:
def __init__(self, request_json: Notify):
self.req_json = request_json
def respond(self):
#记录请求
logging.debug(json.dumps(self.req_json, indent=4))
if verify_md5_sign(1, self.req_json):
pay_sucess_notify_aweb(self.req_json)
return 'SUCCESS'
评论 (0)