Home | 简体中文 | 繁体中文 | 杂文 | Github | 知乎专栏 | 51CTO学院 | CSDN程序员研修院 | OSChina 博客 | 腾讯云社区 | 阿里云栖社区 | Facebook | Linkedin | Youtube | 打赏(Donations) | About
知乎专栏多维度架构

5.4. FastAPI

5.4.1. Post Request

5.4.1.1. From 数据

			
pip install python-multipart
			
			
            	
from fastapi import FastAPI, Form
# from starlette.requests import Request
from starlette.responses import Response
from starlette.testclient import TestClient

app = FastAPI()

@app.post("/form")
async def login(username: str = Form(), password: str = Form()):
    return {"username": username, "password": password}

client = TestClient(app)
data = {"username": "netkiller", "password": "123456"}
response = client.post("/form", data=data)
print(response.content.decode())
            	
			

5.4.1.2. Json 数据转为 dict

             
from fastapi import FastAPI, Request
from typing import Dict

# from starlette.requests import Request
from starlette.responses import Response
from starlette.testclient import TestClient

app = FastAPI()


@app.post("/json")
async def json(item: dict):
    print(item)
    return "OK"


client = TestClient(app)
data = {"key": "value"}
response = client.post("/json", json=data)
print(response.content.decode())      
            
			

5.4.1.3. Data 原始数据

		    
from fastapi import FastAPI, Request

# from starlette.requests import Request
from starlette.responses import Response
from starlette.testclient import TestClient

app = FastAPI()


@app.post("/webhook")
async def the_webhook(request: Request):
    return await request.body()


data = b"""EURUSD Less Than 1.09092
{"Condition": "value"}
[3,4,5,]
{}"""

data = b"""EURUSD Less Than 1.09092"""

client = TestClient(app)
response = client.post("/webhook", data=data)
print(response.content.decode())
            
			

5.4.1.4. POST 接收 JSON 数据

			
@app.post("/android/notification", summary="通知", description=f"通知接口", tags=["android"])
async def notification(request: Request):
    jsonText = (await request.body()).decode()
    print(jsonText)
    print(await request.json())
    return {"status": true, "data": {}, "msg": "成功"}			
			
			

5.4.2. api_route

		
@app.api_route("/", methods=["GET", "POST"])
async def handler():
    return {}		
		
		

5.4.3. slowapi 流向控制

		
pip install slowapi		
		
		

		
from fastapi import FastAPI
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address


limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.get("/home")
@limiter.limit("5/minute")
async def homepage(request: Request):
    return PlainTextResponse("test")

@app.get("/mars")
@limiter.limit("5/minute")
async def homepage(request: Request, response: Response):
    return {"key": "value"}		
		
		

5.4.4. 异步执行

		
from fastapi import APIRouter
import time
import asyncio
 
router = APIRouter()

 
@router.get("/a")
async def a():
    time.sleep(1)
    return {"message": "异步模式,但是同步执行sleep函数,执行过程是串行的"}
 
 
@router.get("/b")
async def b():
    loop = asyncio.get_event_loop()
    await loop.run_in_executor(None, time.sleep, 1)
    return {"message": "异步线程池中运行sleep函数"}
 
 
@router.get("/c")
async def c():
    await asyncio.sleep(1)
    return {"message": "异步模式,且异步执行sleep函数"}
 
 
@router.get("/d")
def d():
    time.sleep(1)
    return {"message": "同步模式"}
		
		

5.4.5. 缓存

		
pip install fastapi-cache2
		
		
		
from fastapi import FastAPI
from starlette.requests import Request
from starlette.responses import Response

from fastapi_cache import FastAPICache
from fastapi_cache.backends.redis import RedisBackend
from fastapi_cache.decorator import cache

from redis import asyncio as aioredis

app = FastAPI()


@cache()
async def get_cache():
    return 1


@app.get("/")
@cache(expire=60)
async def index():
    return dict(hello="world")


@app.on_event("startup")
async def startup():
    redis = aioredis.from_url("redis://localhost", encoding="utf8", decode_responses=True)
    FastAPICache.init(RedisBackend(redis), prefix="fastapi-cache")		
		
		

5.4.5.1. 缓存 Json 数据结构

			
@app.get("/")
@cache(expire=60, coder=JsonCoder)
async def index():
    return dict(hello="world")			
			
			

5.4.5.2. 自定义 key

			
def my_key_builder(
        func,
        namespace: Optional[str] = "",
        request: Request = None,
        response: Response = None,
        *args,
        **kwargs,
):
    prefix = FastAPICache.get_prefix()
    cache_key = f"{prefix}:{namespace}:{func.__module__}:{func.__name__}:{args}:{kwargs}"
    return cache_key


@app.get("/")
@cache(expire=60, coder=JsonCoder, key_builder=my_key_builder)
async def index():
    return dict(hello="world")			
			
			

5.4.6. HTTP Auth

		
security = HTTPBasic()

def auth(credentials: Annotated[HTTPBasicCredentials, Depends(security)]):
    current_username_bytes = credentials.username.encode("utf8")
    correct_username_bytes = b"admin"
    is_correct_username = compare_digest(current_username_bytes, correct_username_bytes)
    current_password_bytes = credentials.password.encode("utf8")
    correct_password_bytes = b"admin"
    is_correct_password = compare_digest(current_password_bytes, correct_password_bytes)
    if not (is_correct_username and is_correct_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Basic"},
        )
    return credentials.username

		
		

应用与方法

		
@app.get("/who")
@cache(expire=60)
def read_current_user(username: Annotated[str, Depends(auth)]):
    return {"username": username}		
		
		

全局配置

		
app = FastAPI(title="netkiller", description="HTTP Auth 测试", dependencies=[Depends(auth)])			
		
		

5.4.7. SSE

SSE 协议格式



协议字段之间使用\r\n 分割,数据结尾处使用两个\r\n。

event: message\r\ndata: \xe4\xb8\x83\r\nretry: 15000\r\n\r\n
event: message\r\ndata: \xe5\xa4\x95\r\nretry: 15000\r\n\r\n

event: 表示事件,message和error,对应前端会分别触发onmessage或onerror事件。
retry: 重试时间,让客户端在retry时间后进行重试,单位是毫秒。
data: 具体的数据。

		
pip install sse_starlette		
		
		

服务器端

		
from fastapi import FastAPI, Request
from sse_starlette.sse import EventSourceResponse
import asyncio
import uvicorn

app = FastAPI()


@app.get("/")
async def home():
    return {"message": "Hello World"}


@app.get("/sse")
async def sse(request: Request):
    async def ServerSendEvents(request: Request):
        books = ["Netkiller Linux 手札", "Netkiller MySQL 手札", "Netkiller Python 手札", "Netkiller Spring 手札", "Netkiller Java 手札", "Netkiller FreeBSD 手札", "Netkiller Network 手札", "Netkiller Blockchain 手札"]
        for book in books:
            if await request.is_disconnected():
                print("连接已中断")
                break
            yield {"event": "message", "retry": 15000, "data": book}

            await asyncio.sleep(0.5)

    g = ServerSendEvents(request)
    return EventSourceResponse(g)


if __name__ == "__main__":
    try:
        uvicorn.run(app=app, host="0.0.0.0", port=8080, log_level="info")
    except KeyboardInterrupt:
        print("Crtl+C Pressed. Shutting down.")		
		
		

客户端

		
#!/usr/bin/python
# -*-coding:utf-8-*-
import requests


def test():
    url = r"http://127.0.0.1:8080/sse"
    headers = {"Content-Type": "text/event-stream"}
    response = requests.get(url, headers=headers, stream=True)
    for chunk in response.iter_content(chunk_size=1024, decode_unicode=True):
        print(chunk)


if __name__ == "__main__":
    test()		
		
		

5.4.8. 解决 Sqlalchemy 返回模型无法打印的问题

直接打印返回结果

		
<models.model.Picture object at 0x1063e9910>		
		
		

倒入依赖,然后使用 jsonable_encoder 编码

		
from fastapi.responses import JSONResponse
from fastapi.encoders import jsonable_encoder		
		
		
		
@router.get(
    "/{android_id}/open/{id}",
    summary="打开绘本",
    description="开始阅读绘本作品",
)
async def open(id: int):
    session = database.session()
    book = {}
    try:
        from sqlalchemy.orm import load_only

        book["book"] = session.query(PictureBook).filter(PictureBook.id == id).first()
        book["story"] = (
            session.query(Picture)
            .join(PictureBookHasPicture, Picture.id == PictureBookHasPicture.picture_id)
            .filter(PictureBookHasPicture.picture_book_id == id)
            .options(load_only(Picture.image, Picture.story))
            .all()
        )
        if book["book"]:
            response = Response(True, "打开绘本", jsonable_encoder(book))
        else:
            response = Response(False, "打开绘本没有数据", None)
    except Exception as e:
        response = Response(False, e, None)
        logger.error(response)
    finally:
        session.close()
    logger.info(response)
    return JSONResponse(content=response)		
		
		

5.4.9. 返回图片

		
import qrcode, logging, io
		
@router.get("/{android_id}/qrcode")
async def device_bind_qrcode(android_id: str):
    try:
        data = f"{android_id}"
        qr = qrcode.QRCode(
            version=1,
            error_correction=qrcode.constants.ERROR_CORRECT_L,
            box_size=20,
            border=2,
        )
        qr.add_data(data)
        qr.make(fit=True)
        img = qr.make_image(fill_color="black", back_color="white")
        out = io.BytesIO()
        img.save(out, "PNG")
        logger.info(f"QR Code: {data}")
        return Response(content=out.getvalue(), media_type="image/png")
    except Exception as e:
        logger.error(e)
        return None		
		
		

5.4.10. Fief 认证框架

https://docs.fief.dev/integrate/python/fastapi/