python 的并发编程能力已经显着发展,为开发人员提供了编写高效、并行代码的强大工具。我花了相当多的时间探索这些先进技术,很高兴与您分享我的见解。
使用 asyncio 进行异步编程是 i/o 密集型任务的游戏规则改变者。它允许我们编写非阻塞代码,可以同时处理多个操作,而无需线程开销。下面是一个简单的示例,说明如何使用 asyncio 同时从多个 url 获取数据:
import asyncio import aiohttp async def fetch_url(session, url): async with session.get(url) as response: return await response.text() async def main(): urls = ['http://example.com', 'http://example.org', 'http://example.net'] async with aiohttp.clientsession() as session: tasks = [fetch_url(session, url) for url in urls] results = await asyncio.gather(*tasks) for url, result in zip(urls, results): print(f"content length of {url}: {len(result)}") asyncio.run(main())
这段代码演示了我们如何创建多个协程来同时从不同的 url 获取数据。 asyncio.gather() 函数允许我们等待所有协程完成并收集它们的结果。
虽然 asyncio 非常适合 i/o 密集型任务,但它不适合 cpu 密集型操作。为此,我们转向concurrent.futures模块,它提供了threadpoolexecutor和processpoolexecutor。 threadpoolexecutor 非常适合不释放 gil 的 i/o 密集型任务,而 processpoolexecutor 非常适合 cpu 密集型任务。
下面是使用 threadpoolexecutor 并发下载多个文件的示例:
import concurrent.futures import requests def download_file(url): response = requests.get(url) filename = url.split('/')[-1] with open(filename, 'wb') as f: f.write(response.content) return f"downloaded {filename}" urls = [ 'https://example.com/file1.pdf', 'https://example.com/file2.pdf', 'https://example.com/file3.pdf' ] with concurrent.futures.threadpoolexecutor(max_workers=3) as executor: future_to_url = {executor.submit(download_file, url): url for url in urls} for future in concurrent.futures.as_completed(future_to_url): url = future_to_url[future] try: data = future.result() except exception as exc: print(f"{url} generated an exception: {exc}") else: print(data)
此代码创建一个包含三个工作线程的线程池,并为每个 url 提交一个下载任务。 as_completed() 函数允许我们在结果可用时对其进行处理,而不是等待所有任务完成。
立即学习“Python免费学习笔记(深入)”;
对于 cpu 密集型任务,我们可以使用 processpoolexecutor 来利用多个 cpu 核心。这是并行计算素数的示例:
import concurrent.futures import math def is_prime(n): if n < 2: return false for i in range(2, int(math.sqrt(n)) + 1): if n % i == 0: return false return true def find_primes(start, end): return [n for n in range(start, end) if is_prime(n)] ranges = [(1, 25000), (25001, 50000), (50001, 75000), (75001, 100000)] with concurrent.futures.processpoolexecutor() as executor: results = executor.map(lambda r: find_primes(*r), ranges) all_primes = [prime for sublist in results for prime in sublist] print(f"found {len(all_primes)} prime numbers")
此代码将查找素数的任务分为四个范围,并使用单独的 python 进程并行处理它们。 map() 函数将 find_primes() 函数应用于每个范围并收集结果。
当使用多个进程时,我们经常需要在它们之间共享数据。多处理模块为此提供了多种选项,包括共享内存和队列。这是使用共享内存数组的示例:
from multiprocessing import process, array import numpy as np def worker(shared_array, start, end): for i in range(start, end): shared_array[i] = i * i if __name__ == '__main__': size = 10000000 shared_array = array('d', size) # create 4 processes processes = [] chunk_size = size // 4 for i in range(4): start = i * chunk_size end = start + chunk_size if i < 3 else size p = process(target=worker, args=(shared_array, start, end)) processes.append(p) p.start() # wait for all processes to finish for p in processes: p.join() # convert shared array to numpy array for easy manipulation np_array = np.frombuffer(shared_array.get_obj()) print(f"sum of squares: {np_array.sum()}")
此代码创建一个共享内存数组,并使用四个进程并行计算数字的平方。共享数组允许所有进程写入相同的内存空间,避免了进程间通信的需要。
虽然这些技术很强大,但它们也面临着一系列挑战。竞争条件、死锁和过多的上下文切换都会影响性能和正确性。仔细设计并发代码并在必要时使用适当的同步原语至关重要。
例如,当多个线程或进程需要访问共享资源时,我们可以使用lock来保证线程安全:
from threading import lock, thread class counter: def __init__(self): self.count = 0 self.lock = lock() def increment(self): with self.lock: self.count += 1 def worker(counter, num_increments): for _ in range(num_increments): counter.increment() counter = counter() threads = [] for _ in range(10): t = thread(target=worker, args=(counter, 100000)) threads.append(t) t.start() for t in threads: t.join() print(f"final count: {counter.count}")
此代码演示了当多个线程同时递增共享计数器时,如何使用锁来保护共享计数器免受竞争条件的影响。
另一种先进技术是使用信号量来控制对有限资源的访问。下面是限制并发网络连接数的示例:
import asyncio import aiohttp from asyncio import semaphore async def fetch_url(url, semaphore): async with semaphore: async with aiohttp.clientsession() as session: async with session.get(url) as response: return await response.text() async def main(): urls = [f'http://example.com/{i}' for i in range(100)] semaphore = semaphore(10) # limit to 10 concurrent connections tasks = [fetch_url(url, semaphore) for url in urls] results = await asyncio.gather(*tasks) print(f"fetched {len(results)} urls") asyncio.run(main())
此代码使用信号量将并发网络连接数限制为 10,防止网络或服务器不堪重负。
使用并发代码时,正确处理异常也很重要。 asyncio 模块为 asyncio.gather() 函数提供了一个 return_exceptions 参数,该参数对此很有用:
import asyncio async def risky_operation(i): if i % 2 == 0: raise valueerror(f"even number not allowed: {i}") await asyncio.sleep(1) return i async def main(): tasks = [risky_operation(i) for i in range(10)] results = await asyncio.gather(*tasks, return_exceptions=true) for result in results: if isinstance(result, exception): print(f"got an exception: {result}") else: print(f"got a result: {result}") asyncio.run(main())
此代码演示了如何在不停止其他任务执行的情况下处理并发任务中的异常。
随着我们深入研究并发编程,我们会遇到更高级的概念,例如事件循环和协程链。这是一个演示如何链接协程的示例:
import asyncio async def fetch_data(url): print(f"fetching data from {url}") await asyncio.sleep(2) # simulate network delay return f"data from {url}" async def process_data(data): print(f"processing {data}") await asyncio.sleep(1) # simulate processing time return f"processed {data}" async def save_result(result): print(f"saving {result}") await asyncio.sleep(0.5) # simulate saving delay return f"saved {result}" async def fetch_process_save(url): data = await fetch_data(url) processed = await process_data(data) return await save_result(processed) async def main(): urls = ['http://example.com', 'http://example.org', 'http://example.net'] tasks = [fetch_process_save(url) for url in urls] results = await asyncio.gather(*tasks) for result in results: print(result) asyncio.run(main())
此代码链接了三个协程(fetch_data、process_data 和 save_result),为每个 url 创建一个管道。然后 asyncio.gather() 函数同时运行这些管道。
在处理长时间运行的任务时,通常需要实现取消和超时机制。这是一个演示两者的示例:
import asyncio async def long_running_task(n): print(f"Starting long task {n}") try: await asyncio.sleep(10) print(f"Task {n} completed") return n except asyncio.CancelledError: print(f"Task {n} was cancelled") raise async def main(): tasks = [long_running_task(i) for i in range(5)] try: results = await asyncio.wait_for(asyncio.gather(*tasks), timeout=5) except asyncio.TimeoutError: print("Operation timed out, cancelling remaining tasks") for task in tasks: task.cancel() # Wait for all tasks to finish (they'll raise CancelledError) await asyncio.gather(*tasks, return_exceptions=True) else: print(f"All tasks completed successfully: {results}") asyncio.run(main())
此代码启动五个长时间运行的任务,但设置所有任务的超时时间为 5 秒才能完成。如果达到超时,则会取消所有剩余任务。
总之,python 的并发编程功能为编写高效的并行代码提供了广泛的工具和技术。从使用 asyncio 的异步编程到 cpu 密集型任务的多处理,这些先进技术可以显着提高应用程序的性能。然而,了解基本概念、为每项任务选择正确的工具以及仔细管理共享资源和潜在的竞争条件至关重要。通过实践和精心设计,我们可以利用 python 中并发编程的全部功能来构建快速、可扩展且响应迅速的应用程序。
我们的创作
一定要看看我们的创作:
投资者中心 | 智能生活 | 时代与回声 | 令人费解的谜团 | 印度教 | 精英开发 | JS学校
我们在媒体上
科技考拉洞察 | 时代与回响世界 | 投资者中央媒体 | 令人费解的谜团 | 科学与时代媒介 | 现代印度教