知乎专栏 | 多维度架构 |
pip install python-multipart
# Demo: read HTML form fields in a FastAPI handler and exercise it with the
# in-process test client.
from fastapi import FastAPI, Form
from starlette.responses import Response
from starlette.testclient import TestClient

app = FastAPI()


@app.post("/form")
async def login(username: str = Form(), password: str = Form()):
    """Echo the submitted ``username`` and ``password`` form fields."""
    return dict(username=username, password=password)


client = TestClient(app)
data = dict(username="netkiller", password="123456")
# ``data=`` sends the mapping as form fields.
response = client.post("/form", data=data)
print(response.content.decode())
# Demo: accept a JSON object as the request body and exercise the route with
# the in-process test client.
from typing import Dict

from fastapi import FastAPI, Request  # Request can also come from starlette.requests
from starlette.responses import Response
from starlette.testclient import TestClient

app = FastAPI()


@app.post("/json")
async def json(item: dict):
    """Print the received body and reply with the string ``"OK"``."""
    print(item)
    return "OK"


client = TestClient(app)
data = dict(key="value")
# ``json=`` makes the client serialise the mapping as the request body.
response = client.post("/json", json=data)
print(response.content.decode())
# Demo: read the raw, unparsed request body in a handler.
from fastapi import FastAPI, Request  # Request can also come from starlette.requests
from starlette.responses import Response
from starlette.testclient import TestClient

app = FastAPI()


@app.post("/webhook")
async def the_webhook(request: Request):
    """Read the raw request body and hand it back as the handler result."""
    raw_body = await request.body()
    return raw_body


# Two sample payloads; the second assignment replaces the first, so only the
# short payload is actually posted.
data = b"""EURUSD Less Than 1.09092 {"Condition": "value"} [3,4,5,] {}"""
data = b"""EURUSD Less Than 1.09092"""

client = TestClient(app)
response = client.post("/webhook", data=data)
print(response.content.decode())
pip install slowapi
# Demo: throttle FastAPI routes with slowapi. Requests are bucketed by the
# value returned from ``get_remote_address``.
from fastapi import FastAPI, Request, Response
from fastapi.responses import PlainTextResponse
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
# The limiter is stored on the application state and the rate-limit exception
# is mapped to slowapi's bundled handler.
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)


@app.get("/home")
@limiter.limit("5/minute")
async def homepage(request: Request):
    """Return a plain-text body; limited to 5 calls per minute."""
    return PlainTextResponse("test")


# Given its own name so it no longer rebinds ``homepage`` at module level.
# NOTE(review): slowapi appears to track limits per function name - confirm.
@app.get("/mars")
@limiter.limit("5/minute")
async def mars(request: Request, response: Response):
    """Return a JSON object; limited to 5 calls per minute."""
    return {"key": "value"}
# Four handlers that each wait one second, to compare how blocking and
# non-blocking sleeps behave in async and sync route functions.
import asyncio
import time

from fastapi import APIRouter

router = APIRouter()


@router.get("/a")
async def a():
    """Async handler that calls the blocking ``time.sleep`` directly."""
    time.sleep(1)
    return {"message": "异步模式,但是同步执行sleep函数,执行过程是串行的"}


@router.get("/b")
async def b():
    """Async handler that runs ``time.sleep`` in the loop's default executor."""
    # get_running_loop() is the documented way to obtain the loop from
    # inside a coroutine.
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, time.sleep, 1)
    return {"message": "异步线程池中运行sleep函数"}


@router.get("/c")
async def c():
    """Async handler that awaits the non-blocking ``asyncio.sleep``."""
    await asyncio.sleep(1)
    return {"message": "异步模式,且异步执行sleep函数"}


@router.get("/d")
def d():
    """Plain synchronous handler that calls ``time.sleep``."""
    time.sleep(1)
    return {"message": "同步模式"}
pip install fastapi-cache2
# Demo: cache handler results in Redis with fastapi-cache.
from fastapi import FastAPI
from fastapi_cache import FastAPICache
from fastapi_cache.backends.redis import RedisBackend
from fastapi_cache.decorator import cache
from redis import asyncio as aioredis
from starlette.requests import Request
from starlette.responses import Response

app = FastAPI()


@cache()
async def get_cache():
    """Cached coroutine that is not bound to any route; returns ``1``."""
    return 1


@app.get("/")
@cache(expire=60)
async def index():
    """Root route decorated with ``cache(expire=60)``."""
    return {"hello": "world"}


@app.on_event("startup")
async def startup():
    """Create the Redis client and initialise the cache at startup."""
    redis = aioredis.from_url(
        "redis://localhost",
        encoding="utf8",
        decode_responses=True,
    )
    backend = RedisBackend(redis)
    FastAPICache.init(backend, prefix="fastapi-cache")
@app.get("/")
@cache(expire=60, coder=JsonCoder)
async def index():
    """Root route cached with ``expire=60`` and ``JsonCoder`` as the coder."""
    return {"hello": "world"}
def my_key_builder(
    func,
    namespace: Optional[str] = "",
    request: Request = None,
    response: Response = None,
    *args,
    **kwargs,
):
    """Build a cache key for ``func``.

    The key is the colon-separated sequence: configured cache prefix,
    ``namespace``, the function's module, the function's name, the extra
    positional arguments and the extra keyword arguments. ``request`` and
    ``response`` are accepted but do not take part in the key.
    """
    segments = (
        FastAPICache.get_prefix(),
        namespace,
        func.__module__,
        func.__name__,
        args,
        kwargs,
    )
    return ":".join(str(segment) for segment in segments)


@app.get("/")
@cache(expire=60, coder=JsonCoder, key_builder=my_key_builder)
async def index():
    """Root route cached with the custom key builder above."""
    return {"hello": "world"}
security = HTTPBasic()


def auth(credentials: Annotated[HTTPBasicCredentials, Depends(security)]):
    """Validate HTTP Basic credentials and return the user name.

    Both fields are compared with ``compare_digest`` against the fixed
    value ``b"admin"``.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Basic`` header when
            either the user name or the password does not match.
    """
    # Both comparisons always run before the result is inspected.
    username_ok = compare_digest(credentials.username.encode("utf8"), b"admin")
    password_ok = compare_digest(credentials.password.encode("utf8"), b"admin")

    if username_ok and password_ok:
        return credentials.username

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Incorrect username or password",
        headers={"WWW-Authenticate": "Basic"},
    )
应用于方法
@app.get("/who")
@cache(expire=60)
def read_current_user(username: Annotated[str, Depends(auth)]):
    """Return the user name supplied by the ``auth`` dependency."""
    return dict(username=username)
全局配置
# Passing ``auth`` in ``dependencies`` attaches it at application level
# instead of to a single handler.
app = FastAPI(
    title="netkiller",
    description="HTTP Auth 测试",
    dependencies=[Depends(auth)],
)
SSE 协议格式
协议字段之间使用 \r\n 分隔,每条消息的结尾处使用两个 \r\n。
event: message\r\ndata: \xe4\xb8\x83\r\nretry: 15000\r\n\r\n
event: message\r\ndata: \xe5\xa4\x95\r\nretry: 15000\r\n\r\n
event: 表示事件类型,例如 message 和 error,前端会分别触发 onmessage 或 onerror 事件。
retry: 重试时间,让客户端在retry时间后进行重试,单位是毫秒。
data: 具体的数据。
pip install sse_starlette
服务器端
# Demo server: streams a fixed list of book titles as Server-Sent Events.
import asyncio

import uvicorn
from fastapi import FastAPI, Request
from sse_starlette.sse import EventSourceResponse

app = FastAPI()


@app.get("/")
async def home():
    """Return a fixed greeting object."""
    return {"message": "Hello World"}


@app.get("/sse")
async def sse(request: Request):
    """Stream one ``message`` event per book title, 0.5 s apart."""

    async def book_events(req: Request):
        titles = [
            "Netkiller Linux 手札",
            "Netkiller MySQL 手札",
            "Netkiller Python 手札",
            "Netkiller Spring 手札",
            "Netkiller Java 手札",
            "Netkiller FreeBSD 手札",
            "Netkiller Network 手札",
            "Netkiller Blockchain 手札",
        ]
        for title in titles:
            # Stop producing events once the client has gone away.
            if await req.is_disconnected():
                print("连接已中断")
                break
            yield dict(event="message", retry=15000, data=title)
            await asyncio.sleep(0.5)

    return EventSourceResponse(book_events(request))


if __name__ == "__main__":
    try:
        uvicorn.run(app=app, host="0.0.0.0", port=8080, log_level="info")
    except KeyboardInterrupt:
        print("Crtl+C Pressed. Shutting down.")
客户端
#!/usr/bin/python
# -*-coding:utf-8-*-
import requests


def test():
    """Connect to the local SSE endpoint and print each received chunk."""
    url = "http://127.0.0.1:8080/sse"
    # A GET request has no body to describe, so the event-stream media type
    # is sent in ``Accept`` rather than ``Content-Type``.
    headers = {"Accept": "text/event-stream"}
    # The context manager releases the streamed connection once the loop ends
    # or raises.
    with requests.get(url, headers=headers, stream=True) as response:
        for chunk in response.iter_content(chunk_size=1024, decode_unicode=True):
            print(chunk)


if __name__ == "__main__":
    test()
直接打印返回结果
<models.model.Picture object at 0x1063e9910>
导入依赖,然后使用 jsonable_encoder 编码
from fastapi.responses import JSONResponse from fastapi.encoders import jsonable_encoder
@router.get(
    "/{android_id}/open/{id}",
    summary="打开绘本",
    description="开始阅读绘本作品",
)
async def open(id: int):
    """Load one picture book with its pictures and return them as JSON.

    The book row and its joined picture rows are collected into a dict and
    passed through ``jsonable_encoder`` before being wrapped in the result
    object. Any exception raised while querying is captured in the result
    object instead of propagating.
    """
    session = database.session()
    book = {}
    try:
        from sqlalchemy.orm import load_only

        book_query = session.query(PictureBook).filter(PictureBook.id == id)
        book["book"] = book_query.first()

        # Only the image and story columns are requested for the pictures.
        story_query = (
            session.query(Picture)
            .join(PictureBookHasPicture, Picture.id == PictureBookHasPicture.picture_id)
            .filter(PictureBookHasPicture.picture_book_id == id)
            .options(load_only(Picture.image, Picture.story))
        )
        book["story"] = story_query.all()

        if not book["book"]:
            response = Response(False, "打开绘本没有数据", None)
        else:
            response = Response(True, "打开绘本", jsonable_encoder(book))
    except Exception as e:
        # NOTE(review): the exception object itself is passed as the message;
        # confirm that Response can serialise it.
        response = Response(False, e, None)
        logger.error(response)
    finally:
        session.close()
    logger.info(response)
    return JSONResponse(content=response)
import io
import logging

import qrcode


@router.get("/{android_id}/qrcode")
async def device_bind_qrcode(android_id: str):
    """Render ``android_id`` as a QR code and return it as a PNG response.

    On any exception the error is logged and ``None`` is returned.
    """
    try:
        data = f"{android_id}"
        builder = qrcode.QRCode(
            version=1,
            error_correction=qrcode.constants.ERROR_CORRECT_L,
            box_size=20,
            border=2,
        )
        builder.add_data(data)
        builder.make(fit=True)
        image = builder.make_image(fill_color="black", back_color="white")

        # Encode the image into memory so nothing is written to disk.
        png_buffer = io.BytesIO()
        image.save(png_buffer, "PNG")

        logger.info(f"QR Code: {data}")
        return Response(content=png_buffer.getvalue(), media_type="image/png")
    except Exception as e:
        logger.error(e)
        return None