我的 api 遇到问题,希望有人可以提供帮助。尽管添加了多线程,但性能提升远没有达到我的预期。理想情况下,如果一个线程需要 1 秒来完成一项任务,那么并发运行的 10 个线程也应该需要大约 1 秒(这是我的理解)。然而,我的 api 响应时间仍然很慢。
问题
我正在使用 fastapi 以及 playwright、mongodb 和 threadpoolexecutor 等库。目标是对 cpu 密集型任务使用线程,对 io 密集型任务使用异步等待。尽管如此,我的响应时间并没有像预期的那样改善。
书籍自动化示例
我的项目的一部分涉及使用 playwright 与 epub 查看器交互来自动进行图书查询。以下函数使用 playwright 打开浏览器、导航到书籍页面并执行搜索:
from playwright.async_api import async_playwright import asyncio async def search_with_playwright(search_text: str, book_id: str): async with async_playwright() as p: browser = await p.chromium.launch(headless=true) page = await browser.new_page() book_id = book_id.replace("-1", "") book_url = f"http://localhost:8002/book/{book_id}" await page.goto(book_url) await page.fill("#searchinput", search_text) await page.click("#searchbutton") await page.wait_for_selector("#searchresults") search_results = await page.evaluate(''' () => { let results = []; document.queryselectorall("#searchresults ul li").foreach(item => { let excerptelement = item.queryselector("strong:nth-of-type(1)"); let cfielement = item.queryselector("strong:nth-of-type(2)"); if (excerptelement && cfielement) { let excerpt = excerptelement.nextsibling ? excerptelement.nextsibling.nodevalue.trim() : ""; let cfi = cfielement.nextsibling ? cfielement.nextsibling.nodevalue.trim() : ""; results.push({ excerpt, cfi }); } }); return results; } ''') await browser.close() return search_results
上面的函数是异步的,以避免阻塞其他任务。然而,即使采用这种异步设置,性能仍然达不到预期。
注意:我计算过单本书打开书籍和运行查询所需的时间约为 0.0028s
重构示例
我使用 run_in_executor() 来执行 processpoolexecutor 中的函数,试图避免 gil 并正确管理工作负载。
async def query_mongo(query: str, id: str): query_vector = generate_embedding(query) results = db[id].aggregate([ { "$vectorSearch": { "queryVector": query_vector, "path": "embedding", "numCandidates": 2100, "limit": 50, "index": id } } ]) # Helper function for processing each document def process_document(document): try: chunk = document["chunk"] chapter = document["chapter"] number = document["chapter_number"] book_id = id results = asyncio.run(search_with_playwright(chunk, book_id)) return { "content": chunk, "chapter": chapter, "number": number, "results": results, } except Exception as e: print(f"Error processing document: {e}") return None # Using ThreadPoolExecutor for concurrency all_data = [] with ThreadPoolExecutor() as executor: futures = {executor.submit(process_document, doc): doc for doc in results} for future in as_completed(futures): try: result = future.result() if result: # Append result if it's not None all_data.append(result) except Exception as e: print(f"Error in future processing: {e}") return all_data
问题
即使在这些更改之后,我的 api 仍然很慢。我缺少什么?有没有人在 python 的 gil、线程或异步设置方面遇到过类似的问题?任何建议将不胜感激!