在快节奏的 web 开发世界中,性能至关重要。高效的缓存机制可以通过减少冗余计算和数据库查询来显着增强 api 的响应能力。在本文中,我们将探讨如何使用 sqlmodel 和 redis 将 py-cachify 库集成到 fastapi 应用程序中,以实现缓存和并发控制。
目录:
介绍
缓存是一种强大的技术,通过存储昂贵操作的结果并从快速访问存储中提供它们来提高 web 应用程序的性能。借助 py-cachify,我们可以无缝地将缓存添加到 fastapi 应用程序中,并利用 redis 进行存储。此外,py-cachify 提供并发控制工具,防止关键操作期间出现竞争情况。
在本教程中,我们将逐步在 fastapi 应用程序中设置 py-cachify 库,并使用用于 orm 的 sqlmodel 和用于缓存的 redis。
项目设置
让我们从设置项目环境开始。
先决条件
- python 3.12
- 诗歌(您可以使用任何您喜欢的包管理器)
- 本地运行或远程访问的redis服务器
安装依赖项
通过诗歌开始一个新项目:
# create new project poetry new --name app py-cachify-fastapi-demo # enter the directory cd py-cachify-fastapi-demo # point poetry to use python3.12 poetry env use python3.12 # add dependencies poetry add "fastapi[standard]" sqlmodel aiosqlite redis py-cachify
- fastapi:用于构建 api 的 web 框架。
- sqlmodel aiosqlite:结合 sqlalchemy 和 pydantic 进行 orm 和数据验证。
- redis:用于与 redis 交互的 python 客户端。
- py-cachify:缓存和锁定实用程序。
初始化 py-cachify
在使用 py-cachify 之前,我们需要使用 redis 客户端对其进行初始化。我们将使用 fastapi 的寿命参数来完成此操作。
# app/main.py from contextlib import asynccontextmanager from fastapi import fastapi from py_cachify import init_cachify from redis.asyncio import from_url @asynccontextmanager async def lifespan(_: fastapi): init_cachify( # replace with your redis url if it differs async_client=from_url('redis://localhost:6379/0'), ) yield app = fastapi(lifespan=lifespan)
在生命周期内,我们:
- 创建异步 redis 客户端。
- 使用此客户端初始化 py-cachify。
使用 sqlmodel 创建数据库模型
我们将创建一个简单的用户模型来与我们的数据库交互。
# app/db.py from sqlmodel import field, sqlmodel class user(sqlmodel, table=true): id: int | none = field(default=none, primary_key=true) name: str email: str
设置数据库引擎并在生命周期函数中创建表:
# app/db.py # adjust imports from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine # add the following at the end of the file sqlite_file_name = 'database.db' sqlite_url = f'sqlite+aiosqlite:///{sqlite_file_name}' engine = create_async_engine(sqlite_url, echo=true) session_maker = async_sessionmaker(engine) # app/main.py # adjust imports and lifespan function from sqlmodel import sqlmodel from .db import engine @asynccontextmanager async def lifespan(_: fastapi): init_cachify( async_client=from_url('redis://localhost:6379/0'), ) # create sql model tables async with engine.begin() as conn: await conn.run_sync(sqlmodel.metadata.create_all) yield
注意:为了简单起见,我们使用 sqlite,但您可以使用 sqlalchemy 支持的任何数据库。
构建 fastapi 端点
让我们创建端点来与我们的用户模型交互。
# app/main.py # adjust imports from fastapi import depends, fastapi from sqlalchemy.ext.asyncio import asyncsession from .db import user, engine, session_maker # database session dependency async def get_session(): async with session_maker() as session: yield session app = fastapi(lifespan=lifespan) @app.post('/users/') async def create_user(user: user, session: asyncsession = depends(get_session)) -> user: session.add(user) await session.commit() await session.refresh(user) return user @app.get('/users/{user_id}') async def read_user(user_id: int, session: asyncsession = depends(get_session)) -> user | none: return await session.get(user, user_id) @app.put('/users/{user_id}') async def update_user(user_id: int, new_user: user, session: asyncsession = depends(get_session)) -> user | none: user = await session.get(user, user_id) if not user: return none user.name = new_user.name user.email = new_user.email session.add(user) await session.commit() await session.refresh(user) return user
缓存端点结果
现在,让我们缓存 read_user 端点的结果,以避免不必要的数据库查询。
端点代码将如下所示:
# app/main.py # add the import from py_cachify import cached @app.get('/users/{user_id}') @cached('read_user-{user_id}', ttl=300) # new decorator async def read_user(user_id: int, session: asyncsession = depends(get_session)) -> user | none: return await session.get(user, user_id)
使用@cached装饰器:
- 我们使用 user_id 指定一个唯一的键。
- 将 ttl(生存时间)设置为 5 分钟(300 秒)。
- 5 分钟内使用相同 user_id 调用此端点将返回缓存的结果。
更新时重置缓存
当用户的数据更新时,我们需要重置缓存以确保客户端收到最新的信息。为了实现这一点,让我们修改 update_user 端点。
# app/main.py @app.put('/users/{user_id}') async def update_user(user_id: int, new_user: user, session: asyncsession = depends(get_session)) -> user | none: user = await session.get(user, user_id) if not user: return none user.name = new_user.name user.email = new_user.email session.add(user) await session.commit() await session.refresh(user) # reset cache for this user await read_user.reset(user_id=user_id) return user
通过调用 read_user.reset(user_id=user_id),我们:
- 清除特定user_id的缓存数据。
- 确保后续 get 请求从数据库获取新数据。
在下面,缓存的装饰器动态包装您的函数,添加 .reset 方法。此方法模仿函数的签名和类型,这样根据原始函数,它将是同步或异步的,并且将接受相同的参数。
.reset 方法使用缓存装饰器中定义的相同密钥生成逻辑来识别要使哪个缓存条目无效。例如,如果您的缓存键模式是 user-{user_id},则调用await read_user.reset(user_id=123) 将专门定位并删除 user_id=123 的缓存条目。
锁定更新端点的执行
为了防止更新期间的竞争条件,我们将使用一次装饰器来锁定更新端点的执行。
# app/main.py from py_cachify import once from fastapi.responses import jsonresponse @app.put('/users/{user_id}', response_model=user) # add the locking decorator @once('update-user-{user_id}', return_on_locked=jsonresponse(content={'status': 'update in progress'}, status_code=226)) async def update_user(user_id: int, new_user: user, session: asyncsession = depends(get_session)) -> user | none: user = await session.get(user, user_id) user.name = new_user.name user.email = new_user.email session.add(user) await session.commit() await session.refresh(user) # reset cache for this user await read_user.reset(user_id=user_id) return user
曾经:
- 我们根据user_id锁定功能。
- 如果另一个请求尝试同时更新同一用户,它将立即返回带有 226 im 已使用 状态代码的响应。
- 这可以防止同时更新导致数据不一致。
(可选)您可以配置 @once 以引发异常或在已获取锁的情况下返回特定值。
运行应用程序
现在是时候运行和测试我们的应用程序了!
1) 启动 redis 服务器:
确保您的 redis 服务器在本地运行或可远程访问。您可以使用 docker 启动本地 redis 服务器:
docker run --name redis -p 6379:6379 -d redis
2) 运行 fastapi 应用程序:
一切设置完毕后,您可以使用 poetry 启动 fastapi 应用程序。导航到项目的根目录并执行以下命令:
poetry run fastapi dev app/main.py
3) 测试和使用缓存和锁定:
缓存: 在 read_user 函数中添加延迟(例如,使用 asyncio.sleep)以模拟长时间运行的计算。观察结果缓存后响应时间如何显着改善。
示例:
import asyncio async def read_user(user_id: int, session: asyncsession = depends(get_session)) -> user | none: await asyncio.sleep(2) # simulate expensive computation or database call return await session.get(user, user_id)
并发和锁定:同样,在 update_user 函数中引入延迟,以观察并发更新尝试时锁的行为。
示例:
async def update_user(user_id: int, new_user: User, session: AsyncSession = Depends(get_session)) -> User | None: await asyncio.sleep(2) # Simulate update delay # update logic here…
这些延迟可以帮助您了解缓存和锁定机制的有效性,因为由于缓存,后续读取应该更快,并且应该通过锁定有效管理对同一资源的并发写入。
现在,您可以使用 postman 等工具或转到 http://127.0.0.1:8000/docs(当应用程序运行时!)来测试端点,并观察性能改进和并发控制的实际情况。
享受使用增强型 fastapi 应用程序进行实验的乐趣!
结论
通过将 py-cachify 集成到我们的 fastapi 应用程序中,我们释放了众多优势,增强了 api 的性能和可靠性。
让我们回顾一下一些关键优势:
- 增强的性能:缓存重复的函数调用可以减少冗余计算和数据库命中,从而大大缩短响应时间。
- 并发控制:通过内置锁定机制,py-cachify 可以防止竞争条件并确保数据一致性 – 对于高并发访问的应用程序至关重要。
- 灵活性:无论您使用同步还是异步操作,py-cachify 都能无缝适应,使其成为现代 web 应用程序的多功能选择。
- 易于使用:该库与 fastapi 等流行的 python 框架顺利集成,让您可以轻松上手。
- 完整类型注释: py-cachify 具有完全类型注释,有助于以最小的努力编写更好、更易于维护的代码。
- 最小设置: 如本教程所示,添加 py-cachify 只需要在现有设置之上添加几行即可充分利用其功能。
对于那些渴望进一步探索的人,请查看py-cachify 的 github 存储库和官方文档以获取更深入的指导、教程和示例。
您可以在 github 此处访问本教程的完整代码。请随意克隆存储库并尝试实现以满足您项目的需求。
如果您发现 py-cachify 有益,请考虑在 github 上给它一颗星来支持该项目!您的支持有助于推动进一步的改进和新功能。
编码愉快!