如果您熟悉 python,您很可能听说过 celery。它通常是异步处理任务的首选,例如图像处理或发送电子邮件。
与一些人交谈时,我开始注意到许多开发人员一开始都觉得 celery 令人印象深刻,但随着他们的项目规模和复杂性的增加,他们的兴奋开始消退。虽然有些人出于正当原因放弃了 celery,但其他人可能只是没有深入探索其核心,无法根据自己的需求进行定制。
在这篇博客中,我想讨论一些开发人员开始寻找替代方案甚至构建自定义后台工作框架的原因之一:公平处理。在用户/租户提交不同大小任务的环境中,一个租户的繁重工作量影响其他租户的风险可能会造成瓶颈并导致挫败感。
我将引导您了解在 celery 中实现公平处理的策略,确保平衡的任务分配,以便没有任何一个租户可以支配您的资源。
问题
让我们深入探讨多租户应用程序面临的常见挑战,特别是那些处理批处理的应用程序。想象一下,您有一个系统,用户可以将其图像处理任务排队,允许他们在短暂等待后收到处理后的图像。此设置不仅可以使您的 api 保持响应,还可以让您根据需要扩展工作线程以有效地处理负载。
一切都运行顺利 – 直到一个租户决定提交大量图像进行处理。您拥有多名工作人员,他们甚至可以自动扩展以满足不断增长的需求,因此您对您的基础设施充满信心。然而,当其他租户尝试对较小的批次(可能只是几张图像)进行排队并突然发现自己面临长时间的等待而没有任何更新时,麻烦就开始了。在您不知不觉中,支持请求开始涌入,用户抱怨您的服务速度缓慢甚至没有响应。
这种情况太常见了,因为 celery 默认情况下按照接收到的顺序处理任务。当一个租户因大量涌入的任务而让您的工作人员不堪重负时,即使是最好的自动扩展策略也可能不足以防止其他租户出现延误。因此,这些用户体验到的服务水平可能达不到承诺或预期的水平。
使用 celery 进行速率限制
确保公平处理的一个有效策略是实施速率限制。它允许您控制每个租户在特定时间范围内可以提交的任务数量。这可以防止任何单个租户垄断您的工人,并确保所有租户都有公平的机会来处理他们的任务。
celery 具有内置的任务级别速率限制功能:
# app.py from celery import celery app = celery("app", broker="redis://localhost:6379/0") @app.task(rate_limit="10/m") # limit to 10 tasks per minute def process_data(data): print(f"processing data: {data}") # call the task if __name__ == "__main__": for i in range(20): process_data.delay(f"data_{i}")
您可以通过执行以下命令来运行工作线程:
celery -a app worker --loglevel=warning --concurrency 1 --prefetch-multiplier 1
现在,运行app.py脚本来触发20个任务:
python app.py
如果您设法在本地运行它,您会注意到每个任务之间存在延迟,以确保执行速率限制。现在您可能认为这并不能真正帮助我们解决问题,您完全正确。 celery 的内置速率限制对于我们的任务可能涉及调用具有严格速率限制的外部服务的场景非常有用。
这个示例强调了内置功能对于复杂场景来说可能过于简单。然而,我们可以通过更深入地探索 celery 的框架来克服这个限制。让我们看看如何为每个租户设置适当的速率限制和自动重试。
我们将使用 redis 来跟踪每个租户的速率限制。 redis 是 celery 的流行数据库和代理,因此让我们利用这个可能已经在您的堆栈中的组件。
让我们导入几个库:
import time import redis from celery import celery, task
现在我们将为我们的速率限制任务实现一个自定义基任务类:
# initialize a redis client redis_client = redis.strictredis(host="localhost", port=6379, db=0) class ratelimitedtask(task): def __init__(self, *args, **kwargs): # set default rate limit if not hasattr(self, "custom_rate_limit"): self.custom_rate_limit = 10 super().__init__(*args, **kwargs) def __call__(self, tenant_id, *args, **kwargs): # rate limiting logic key = f"rate_limit:{tenant_id}:{self.name}" # increment the count for this minute current_count = redis_client.incr(key) if current_count == 1: # set expiration for the key if it's the first request redis_client.expire(key, 10) if current_count > self.custom_rate_limit: print(f"rate limit exceeded for tenant {tenant_id}. retrying...") raise self.retry(countdown=10) return super().__call__(tenant_id, *args, **kwargs)
这个自定义类将跟踪特定租户使用 redis 触发的任务量,并将 ttl 设置为 10 秒。如果超出速率限制,任务将在 10 秒后重试。所以基本上我们的默认速率限制是 10 秒内完成 10 个任务。
让我们定义一个模拟处理的示例任务:
@app.task(base=ratelimitedtask, custom_rate_limit=5) def process(tenant_id: int, data): """ mock processing task that takes 0.3 seconds to complete. """ print(f"processing data: {data} for tenant: {tenant_id}") time.sleep(0.3)
这里我们定义了一个流程任务,你可以看到我可以在任务级别更改custom_rate_limit。如果我们不指定 custom_rate_limit,则将分配默认值 10。 现在我们的速率限制已更改为 10 秒内完成 5 个任务。
现在让我们为不同的租户触发一些任务:
if __name__ == "__main__": for i in range(20): process.apply_async(args=(1, f"data_{i}")) for i in range(10): process.apply_async(args=(2, f"data_{i}"))
我们为租户 id 1 定义 20 个任务,为租户 id 2 定义 10 个任务。
所以我们完整的代码将如下所示:
# app.py import time import redis from celery import celery, task app = celery( "app", broker="redis://localhost:6379/0", broker_connection_retry_on_startup=false, ) # initialize a redis client redis_client = redis.strictredis(host="localhost", port=6379, db=0) class ratelimitedtask(task): def __init__(self, *args, **kwargs): if not hasattr(self, "custom_rate_limit"): self.custom_rate_limit = 10 super().__init__(*args, **kwargs) def __call__(self, tenant_id, *args, **kwargs): # rate limiting logic key = f"rate_limit:{tenant_id}:{self.name}" # increment the count for this minute current_count = redis_client.incr(key) if current_count == 1: # set expiration for the key if it's the first request redis_client.expire(key, 10) if current_count > self.custom_rate_limit: print(f"rate limit exceeded for tenant {tenant_id}. retrying...") raise self.retry(countdown=10) return super().__call__(tenant_id, *args, **kwargs) @app.task(base=ratelimitedtask, custom_rate_limit=5) def process(tenant_id: int, data): """ mock processing task that takes 0.3 seconds to complete. """ print(f"processing data: {data} for tenant: {tenant_id}") time.sleep(0.3) if __name__ == "__main__": for i in range(20): process.apply_async(args=(1, f"data_{i}")) for i in range(10): process.apply_async(args=(2, f"data_{i}"))
让我们运行我们的工作线程:
celery -a app worker --loglevel=warning --concurrency 1 --prefetch-multiplier 1
现在,运行 app.py 脚本来触发任务:
python app.py
如您所见,工作人员处理了第一个租户的 5 个任务,并为所有其他任务设置了重试。然后,它会执行第二个租户的 5 个任务,并为其他任务设置重试,然后继续进行。
这种方法允许您定义每个租户的速率限制,但正如您在我们的示例中看到的,对于运行速度非常快的任务,对速率限制过于严格最终会让工作人员在一段时间内无所事事。微调速率限制参数至关重要,并且取决于具体的任务和数量。不要犹豫,不断尝试,直到找到最佳平衡。
结论
我们探讨了 celery 的默认任务处理如何导致多租户环境中的不公平,以及速率限制如何帮助解决此问题。通过实施特定于租户的速率限制,我们可以防止任何单个租户垄断资源,并确保更公平地分配处理能力。
这种方法为在 celery 中实现公平处理提供了坚实的基础。然而,还有其他值得探索的技术来进一步优化多租户应用程序中的任务处理。虽然我最初计划在一篇文章中涵盖所有内容,但事实证明这个主题非常广泛!为了确保清晰度并保持本文的重点,我决定将其分为两部分。
在本系列的下一部分中,我们将深入研究任务优先级作为增强公平性和效率的另一种机制。这种方法允许您根据不同的标准为任务分配不同的优先级,确保即使在高需求时期也能及时处理关键任务。
敬请期待下期!