掘金 后端 ( ) • 2024-06-21 18:45

FastAPI 是一个现代的、快速(高性能)的 web 框架,用于基于标准 Python 类型提示的 Python 3.7+ 构建 API。它的生态系统和扩展性使得它非常适合构建各种规模的应用程序。本篇文章将详细介绍 FastAPI 的扩展与生态系统,帮助你更深入地理解和使用这个强大的工具。

1. FastAPI 的扩展概述

FastAPI 提供了丰富的扩展和工具,使得开发者能够轻松地集成第三方库和工具,满足不同的需求。以下是一些常见的扩展:

  1. 数据库集成
  2. 认证与授权
  3. 缓存
  4. 测试
  5. 分布式任务队列
  6. 文档与代码生成
  7. 性能监控与日志

2. 数据库集成

FastAPI 支持多种数据库集成,常用的有 SQLAlchemy、Tortoise-ORM 和 MongoDB。下面以 SQLAlchemy 和 MySQL 为例,详细讲解如何集成数据库。

2.1 安装依赖

首先,安装必要的库:

pip install sqlalchemy aiomysql databases

2.2 配置数据库

配置数据库连接和表定义:

from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import sessionmaker, declarative_base

DATABASE_URL = "mysql+pymysql://root:123456@localhost/test_db"

engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()


class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String(50))
    email = Column(String(50))


Base.metadata.create_all(bind=engine)

2.3 CRUD 操作

实现基本的 CRUD 操作:

from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
from pydantic import BaseModel

DATABASE_URL = "mysql+pymysql://root:123456@localhost/test_db"

# 创建数据库引擎
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()


# 定义数据库模型
class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String(50), index=True)
    email = Column(String(100), unique=True, index=True)


# 创建数据库表
Base.metadata.create_all(bind=engine)


# 定义Pydantic模型
class UserCreate(BaseModel):
    name: str
    email: str


class UserResponse(BaseModel):
    id: int
    name: str
    email: str

    class Config:
        from_attributes = True


app = FastAPI()


# 数据库依赖项
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@app.post("/users/", response_model=UserResponse)
async def create_user(user: UserCreate, db: Session = Depends(get_db)):
    db_user = User(name=user.name, email=user.email)
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user


@app.get("/users/{user_id}", response_model=UserResponse)
async def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = db.query(User).filter(User.id == user_id).first()
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user

3. 认证与授权

FastAPI 提供了内置的 OAuth2 支持,可以轻松实现认证与授权。以下是一个使用 OAuth2 密码模式的示例:

3.1 安装依赖

安装认证与授权所需的依赖:

pip install python-jose[cryptography] passlib[bcrypt]

3.2 配置认证

配置认证和授权相关的代码:

from datetime import datetime, timedelta
from typing import Optional

from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel


# 定义模型
class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: Optional[str] = None


class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None


class UserInDB(User):
    hashed_password: str


# 定义密钥、算法和过期时间
SECRET_KEY = "your_secret_key"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# 创建密码哈希上下文
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# 创建 OAuth2PasswordBearer 实例
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# 假数据库
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "[email protected]",
        "hashed_password": pwd_context.hash("secret"),
        "disabled": False,
    }
}

app = FastAPI()


# 验证密码
def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


# 获取密码哈希值
def get_password_hash(password):
    return pwd_context.hash(password)


# 获取用户
def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)
    return None


# 验证用户身份
def authenticate_user(db, username: str, password: str):
    user = get_user(db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


# 创建访问令牌
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


# 认证令牌并获取当前用户
async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user


# 获取当前活跃用户
async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}


@app.get("/users/me", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user

13-基于token的身份认证.png

4. 缓存

首先,确保安装了 fastapi-cache2aioredis

pip install fastapi-cache2 aioredis

以下是一个使用 fastapi-cache2 实现内存缓存的完整示例:

from fastapi import FastAPI
from fastapi_cache import FastAPICache
from fastapi_cache.backends.inmemory import InMemoryBackend
from fastapi_cache.decorator import cache

app = FastAPI()


@app.on_event("startup")
async def startup():
    # Initialize the cache with an in-memory backend
    FastAPICache.init(InMemoryBackend(), prefix="fastapi-cache")


@app.get("/items/{item_id}")
@cache(expire=60)  # Cache the response for 60 seconds
async def read_item(item_id: int):
    return {"item_id": item_id, "message": "This is a cached response"}


@app.get("/clear-cache")
async def clear_cache():
    await FastAPICache.clear(namespace="fastapi-cache")
    return {"message": "Cache cleared"}

5. 测试

测试是确保应用程序稳定和高质量的重要环节。FastAPI 提供了对测试的良好支持。以下是一个简单的测试示例:

5.1 安装依赖

安装测试所需的依赖:

pip install httpx pytest

5.2 编写测试

from fastapi.testclient import TestClient
from .main import app

client = TestClient(app)

def test_read_main():
    response = client.get("/")
    assert response.status_code == 200
    assert response.json() == {"message": "Hello World"}

def test_read_item():
    response = client.get("/items/foo")
    assert response.status_code == 200
    assert response.json() == {"item_id": "foo", "q": None}

6. 分布式任务队列

使用分布式任务队列处理耗时任务,可以显著提高系统性能。以下是一个使用 Celery 的示例:

6.1 安装依赖

pip install celery[redis] redis

6.2 配置 Celery

# 这行代码从 celery 库中导入 Celery 类。Celery 是一个分布式任务队列框架,这个类是创建和配置 Celery 应用的核心。
from celery import Celery

"""
这里创建了一个 Celery 应用实例,并将其赋值给 celery_app 变量。
"worker" 是该 Celery 应用的名称。这只是一个标识,可以随意指定。

broker 参数指定了消息代理(broker)的 URL。在这个例子中,使用的是 Redis 作为消息代理。
redis://localhost:6379/0 表示 Redis 服务器运行在本地(localhost),端口号为 6379,使用数据库 0。
消息代理负责接受和分发任务。

backend 参数指定了结果后端的 URL。在这个例子中,同样使用 Redis 作为结果后端。
结果后端用于存储任务的执行结果。
"""
celery_app = Celery(
    "worker",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
)

"""
这行代码配置了任务路由。任务路由决定了特定任务应该发送到哪个队列。
这是一个任务路由规则,指定了 app.worker.process_task 任务应该发送到名为 tasks 的队列中。
"app.worker.process_task" 是任务的名称(包括模块和任务函数的路径)。
{"queue": "tasks"} 指定了该任务应路由到 tasks 队列。
"""
celery_app.conf.task_routes = {
    "app.worker.process_task": {"queue": "tasks"},
}

"""
celery_app.task 是一个装饰器,用于将函数注册为 Celery 任务。
name="app.worker.process_task" 指定了任务的名称。这个名称用于标识和调用任务。
定义任务函数 process_task。该函数将两个参数 x 和 y 相加,并返回结果。
"""


@celery_app.task(name="app.worker.process_task")
def process_task(x, y):
    return x + y

执行这行命令用于启动 Celery worker 进程,它从指定的 Celery 应用程序实例中获取任务并处理它们。

celery -A app.worker.celery_app worker --loglevel=info

6.3 调用任务

from fastapi import FastAPI, BackgroundTasks
from celery.result import AsyncResult
from worker import celery_app

app = FastAPI()


@app.post("/process/")
async def run_task(background_tasks: BackgroundTasks):
    task = celery_app.send_task("app.worker.process_task", args=[10, 20])
    return {"task_id": task.id}


@app.get("/status/{task_id}")
async def get_status(task_id: str):
    task_result = AsyncResult(task_id, app=celery_app)
    return {"task_id": task_id, "status": task_result.status, "result": task_result.result}

注意:Windows平台上没调明白,以后再试试

7. 性能监控与日志

性能监控和日志记录是确保应用程序稳定性和可维护性的关键。可以使用 Prometheus 和 Grafana 进行性能监控,使用 Loguru 进行日志记录。

7.1 安装依赖

安装性能监控和日志记录所需的依赖:

pip install loguru

7.2 配置日志

配置日志记录:

from loguru import logger
from fastapi import FastAPI

app = FastAPI()

logger.add("file.log", rotation="500 MB")

@app.get("/")
async def read_root():
    logger.info("Root endpoint accessed")
    return {"message": "Hello World"}

总结

FastAPI 是一个功能强大且灵活的框架,适用于构建各种规模的应用程序。通过本文的介绍,你可以了解到 FastAPI 的各种高级特性及其扩展和生态系统的详细用法和最佳实践。希望这些内容能够帮助你更好地理解和使用 FastAPI。如果你有任何问题或需要进一步的帮助,请随时与我联系。