Python中的协程与事件循环机制
协程(Coroutine)是一种比线程更轻量级的并发编程方式,它允许在单线程内实现并发操作。协程的核心思想是在执行过程中可以暂停,保存当前的执行状态,然后在适当的时候恢复执行。 协程是一种可以在执行过程中暂停并在稍后恢复的函数。与线程不同,协程的切换是由程序自身控制的,而不是由操作系统调度的。这种方式使得协程的切换开销非常小,适合处理大量的I/O密集型任务。 协程的工作原理基于以下几个关键概念: 使用协程的优势: Python中的协程实现经历了几个阶段的发展: 生成器协程是基于Python的生成器实现的协程,使用 原生协程是Python 3.5+引入的协程实现,使用 在Python 3.4及之前的版本中,需要使用 事件循环是协程执行的核心,它负责调度和执行协程任务,处理I/O操作等。Python的 事件循环是一个无限循环,它不断地从任务队列中取出任务并执行,直到所有任务都完成。事件循环的主要职责包括: 事件循环的工作流程如下: Python的 可以使用 可以使用 可以使用 try-except 语句来捕获和处理协程中的异常: 协程可以嵌套调用,形成协程链: 协程非常适合网络编程,特别是处理大量的并发连接: 协程可以高效地处理大量的并发任务,如数据处理、文件操作等: 协程可以用于异步文件操作,提高I/O密集型任务的性能: 协程可以用于异步数据库操作,提高数据库访问的并发性能: 让我们测试协程与其他并发模型的性能比较: 在使用协程时,可以采取以下策略来优化性能: 我们将使用Python的 异步Web服务器适用于以下场景: 本文详细分析了Python中的协程与事件循环机制,包括: 协程是Python中一种强大的并发编程方式,它通过在单线程内实现并发操作,大大提高了I/O密集型任务的处理效率。与线程和进程相比,协程的上下文切换开销非常小,适合处理大量的并发任务。 在Python 3.5+中,使用 通过本文的学习,读者应该能够: 协程是Python中一种非常有前途的并发编程方式,它为我们提供了一种高效、简洁的方式来处理并发任务。在未来的Python开发中,协程将会被越来越广泛地应用,特别是在网络编程、数据处理等I/O密集型场景中。 协程与事件循环机制是Python中实现高效并发编程的重要工具,它们为我们提供了一种轻量级、高性能的并发处理方式。通过使用协程,我们可以在单线程内实现并发操作,大大提高了I/O密集型任务的处理效率。 本文介绍了协程的基本概念、实现方式、高级特性和应用场景,并通过具体的代码示例和实践案例,展示了协程在实际项目中的应用。希望本文能够帮助读者理解协程的工作原理,掌握协程的使用方法,并在实际项目中有效地应用协程来提高程序的性能和并发能力。 在Python的未来发展中,协程将会扮演越来越重要的角色,特别是随着异步I/O库的不断完善和普及。通过学习和掌握协程,我们可以编写更加高效、简洁的Python代码,应对日益复杂的并发编程需求。Python中的协程与事件循环机制
1. 协程的概念与基本原理
1.1 协程的定义
1.2 协程与其他并发模型的比较
并发模型 优点 缺点 线程 由操作系统调度,使用简单 上下文切换开销大,可能导致竞态条件 进程 完全隔离,安全性高 内存占用大,进程间通信复杂 协程 上下文切换开销小,并发度高 需要显式 yield 控制权,编程复杂度较高 1.3 协程的工作原理
# 协程的基本原理示例
import time
# 简单的协程实现(使用生成器)
def simple_coroutine():
print("协程开始")
value = yield
print(f"协程接收到值:{value}")
value = yield "协程返回值"
print(f"协程接收到第二个值:{value}")
return "协程结束"
# 创建协程对象
coro = simple_coroutine()
# 启动协程
print("启动协程:")
next(coro) # 执行到第一个 yield
# 发送值并恢复协程
print("\n发送第一个值:")
try:
result = coro.send("Hello") # 发送值并执行到第二个 yield
print(f"协程返回值:{result}")
# 发送第二个值
print("\n发送第二个值:")
result = coro.send("World") # 发送值并执行到结束
except StopIteration as e:
print(f"协程结束,返回值:{e.value}")
# 测试协程的暂停与恢复
print("\n测试协程的暂停与恢复:")
def timer_coroutine():
"""计时器协程"""
start = time.time()
while True:
elapsed = time.time() - start
yield elapsed
time.sleep(0.5) # 模拟耗时操作
# 创建计时器协程
timer = timer_coroutine()
# 使用计时器
print("开始计时:")
for i in range(5):
elapsed = next(timer)
print(f"第 {i + 1} 次调用,已过时间:{elapsed:.2f}秒")1.4 协程的优势
2. Python中的协程实现
send()、throw() 和 close() 方法(Python 2.5+)async/await 语法的协程(Python 3.5+)2.1 生成器协程
yield 语句来暂停执行:# 生成器协程示例
def generator_coroutine():
"""生成器协程"""
print("协程开始")
while True:
value = yield
print(f"协程接收到值:{value}")
if value == "exit":
break
print("协程结束")
# 创建协程对象
coro = generator_coroutine()
# 启动协程
next(coro)
# 发送值
coro.send("Hello")
coro.send("World")
coro.send("exit")
# 测试带返回值的生成器协程
def counting_coroutine():
"""计数协程"""
count = 0
while True:
action = yield count
if action == "increment":
count += 1
elif action == "reset":
count = 0
elif action == "exit":
break
return count
# 创建协程对象
coro = counting_coroutine()
# 启动协程
print(f"初始值:{next(coro)}")
# 发送操作
print(f"递增后:{coro.send('increment')}")
print(f"递增后:{coro.send('increment')}")
print(f"重置后:{coro.send('reset')}")
print(f"递增后:{coro.send('increment')}")
# 退出协程
try:
coro.send("exit")
except StopIteration as e:
print(f"协程结束,最终计数:{e.value}")2.2 原生协程
async/await 语法:# 原生协程示例
import asyncio
async def native_coroutine():
"""原生协程"""
print("协程开始")
await asyncio.sleep(1) # 模拟耗时操作
print("协程继续")
await asyncio.sleep(1) # 模拟耗时操作
print("协程结束")
return "协程返回值"
# 运行协程
async def main():
result = await native_coroutine()
print(f"协程返回值:{result}")
# 启动事件循环
print("启动事件循环:")
asyncio.run(main())
# 测试带参数的原生协程
async def greet(name):
"""问候协程"""
print(f"Hello, {name}!")
await asyncio.sleep(1)
print(f"Goodbye, {name}!")
return f"Greeted {name}"
# 运行多个协程
async def main_multiple():
# 并发运行多个协程
task1 = asyncio.create_task(greet("Alice"))
task2 = asyncio.create_task(greet("Bob"))
task3 = asyncio.create_task(greet("Charlie"))
# 等待所有任务完成
results = await asyncio.gather(task1, task2, task3)
print(f"所有协程完成,结果:{results}")
# 启动事件循环
print("\n运行多个协程:")
asyncio.run(main_multiple())2.3 协程装饰器
@asyncio.coroutine 装饰器来标记协程函数:# 协程装饰器示例
import asyncio
@asyncio.coroutine
def decorated_coroutine():
"""使用装饰器的协程"""
print("协程开始")
yield from asyncio.sleep(1) # 模拟耗时操作
print("协程继续")
yield from asyncio.sleep(1) # 模拟耗时操作
print("协程结束")
return "协程返回值"
# 运行协程
@asyncio.coroutine
def main():
result = yield from decorated_coroutine()
print(f"协程返回值:{result}")
# 启动事件循环
print("启动事件循环:")
asyncio.run(main())3. 事件循环的工作原理
asyncio 库提供了事件循环的实现。3.1 事件循环的概念
3.2 事件循环的工作流程
# 事件循环的工作原理示例
import asyncio
import time
async def task1():
"""任务1"""
print("任务1开始")
await asyncio.sleep(2) # 模拟耗时操作
print("任务1结束")
return "任务1返回值"
async def task2():
"""任务2"""
print("任务2开始")
await asyncio.sleep(1) # 模拟耗时操作
print("任务2结束")
return "任务2返回值"
async def task3():
"""任务3"""
print("任务3开始")
await asyncio.sleep(1.5) # 模拟耗时操作
print("任务3结束")
return "任务3返回值"
async def main():
"""主协程"""
print(f"主协程开始,时间:{time.strftime('%H:%M:%S')}")
# 创建任务
task1_obj = asyncio.create_task(task1())
task2_obj = asyncio.create_task(task2())
task3_obj = asyncio.create_task(task3())
# 等待任务完成
results = await asyncio.gather(task1_obj, task2_obj, task3_obj)
print(f"主协程结束,时间:{time.strftime('%H:%M:%S')}")
print(f"任务结果:{results}")
# 启动事件循环
print("启动事件循环:")
asyncio.run(main())
# 测试事件循环的任务调度
async def periodic_task(name, interval):
"""周期性任务"""
for i in range(3):
print(f"{name} 执行,第 {i + 1} 次,时间:{time.strftime('%H:%M:%S')}")
await asyncio.sleep(interval)
async def main_periodic():
"""主协程"""
print(f"主协程开始,时间:{time.strftime('%H:%M:%S')}")
# 创建周期性任务
task1 = asyncio.create_task(periodic_task("任务A", 1))
task2 = asyncio.create_task(periodic_task("任务B", 2))
# 等待任务完成
await asyncio.gather(task1, task2)
print(f"主协程结束,时间:{time.strftime('%H:%M:%S')}")
# 启动事件循环
print("\n测试周期性任务:")
asyncio.run(main_periodic())3.3 事件循环的类型
asyncio 库提供了多种事件循环实现,适用于不同的平台和场景:# 事件循环的类型示例
import asyncio
# 获取当前事件循环
loop = asyncio.get_event_loop()
print(f"当前事件循环:{type(loop).__name__}")
# 测试不同的事件循环策略
print("\n测试事件循环策略:")
# 默认策略
default_policy = asyncio.get_event_loop_policy()
print(f"默认策略:{type(default_policy).__name__}")
# 尝试使用 uvloop
print("\n尝试使用 uvloop:")
try:
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
loop = asyncio.new_event_loop()
print(f"uvloop 事件循环:{type(loop).__name__}")
except ImportError:
print("uvloop 未安装")
# 测试事件循环的关闭
print("\n测试事件循环的关闭:")
async def test_task():
print("测试任务")
await asyncio.sleep(1)
print("测试任务完成")
# 创建并运行事件循环
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(test_task())
finally:
loop.close()
print("事件循环已关闭")4. 协程的高级特性
4.1 协程的取消
cancel() 方法来取消正在执行的协程任务:# 协程的取消示例
import asyncio
import time
async def long_running_task():
"""长时间运行的任务"""
print("长时间运行的任务开始")
try:
for i in range(10):
print(f"任务执行中... {i + 1}/10")
await asyncio.sleep(1)
except asyncio.CancelledError:
print("任务被取消")
raise # 重新抛出异常,确保任务正确结束
finally:
print("任务清理")
return "任务完成"
async def main():
"""主协程"""
# 创建任务
task = asyncio.create_task(long_running_task())
# 等待一段时间后取消任务
await asyncio.sleep(3)
print("取消任务")
task.cancel()
# 等待任务完成
try:
result = await task
print(f"任务结果:{result}")
except asyncio.CancelledError:
print("捕获到任务取消异常")
# 启动事件循环
print("测试协程的取消:")
asyncio.run(main())4.2 协程的超时处理
asyncio.wait_for() 函数来设置协程的超时时间:# 协程的超时处理示例
import asyncio
async def slow_task():
"""慢速任务"""
print("慢速任务开始")
await asyncio.sleep(5) # 模拟耗时操作
print("慢速任务结束")
return "慢速任务返回值"
async def main():
"""主协程"""
print("测试超时处理:")
try:
# 设置超时时间为3秒
result = await asyncio.wait_for(slow_task(), timeout=3)
print(f"任务结果:{result}")
except asyncio.TimeoutError:
print("任务超时")
# 启动事件循环
asyncio.run(main())
# 测试带超时的并行任务
async def task_with_timeout(name, delay):
"""带延迟的任务"""
print(f"任务 {name} 开始,延迟 {delay} 秒")
await asyncio.sleep(delay)
print(f"任务 {name} 结束")
return f"任务 {name} 返回值"
async def main_multiple():
"""主协程"""
print("\n测试带超时的并行任务:")
try:
# 创建任务
task1 = task_with_timeout("A", 2)
task2 = task_with_timeout("B", 4)
task3 = task_with_timeout("C", 1)
# 设置超时时间为3秒
results = await asyncio.wait_for(
asyncio.gather(task1, task2, task3),
timeout=3
)
print(f"任务结果:{results}")
except asyncio.TimeoutError:
print("任务超时")
# 启动事件循环
asyncio.run(main_multiple())4.3 协程的异常处理
# 协程的异常处理示例
import asyncio
async def task_with_exception():
"""会抛出异常的任务"""
print("任务开始")
await asyncio.sleep(1)
raise ValueError("任务执行出错")
async def main():
"""主协程"""
print("测试异常处理:")
try:
result = await task_with_exception()
print(f"任务结果:{result}")
except ValueError as e:
print(f"捕获到异常:{e}")
# 启动事件循环
asyncio.run(main())
# 测试并行任务的异常处理
async def task1():
"""任务1"""
print("任务1开始")
await asyncio.sleep(1)
raise ValueError("任务1出错")
async def task2():
"""任务2"""
print("任务2开始")
await asyncio.sleep(2)
print("任务2结束")
return "任务2返回值"
async def main_multiple():
"""主协程"""
print("\n测试并行任务的异常处理:")
try:
# 创建任务
task1_obj = asyncio.create_task(task1())
task2_obj = asyncio.create_task(task2())
# 等待任务完成
results = await asyncio.gather(task1_obj, task2_obj)
print(f"任务结果:{results}")
except ValueError as e:
print(f"捕获到异常:{e}")
# 启动事件循环
asyncio.run(main_multiple())4.4 协程的嵌套
# 协程的嵌套示例
import asyncio
async def inner_coroutine():
"""内部协程"""
print("内部协程开始")
await asyncio.sleep(1)
print("内部协程结束")
return "内部协程返回值"
async def middle_coroutine():
"""中间协程"""
print("中间协程开始")
result = await inner_coroutine()
print(f"获取内部协程结果:{result}")
await asyncio.sleep(1)
print("中间协程结束")
return f"中间协程返回值,内部结果:{result}"
async def outer_coroutine():
"""外部协程"""
print("外部协程开始")
result = await middle_coroutine()
print(f"获取中间协程结果:{result}")
await asyncio.sleep(1)
print("外部协程结束")
return f"外部协程返回值,中间结果:{result}"
async def main():
"""主协程"""
print("测试协程嵌套:")
result = await outer_coroutine()
print(f"最终结果:{result}")
# 启动事件循环
asyncio.run(main())
# 测试深度嵌套
async def nested_coroutine(depth):
"""深度嵌套的协程"""
if depth > 0:
print(f"嵌套深度 {depth} 开始")
result = await nested_coroutine(depth - 1)
print(f"嵌套深度 {depth} 结束,获取结果:{result}")
return f"深度 {depth} 返回值"
else:
print("嵌套深度 0 开始")
await asyncio.sleep(0.5)
print("嵌套深度 0 结束")
return "深度 0 返回值"
async def main_depth():
"""主协程"""
print("\n测试深度嵌套:")
result = await nested_coroutine(5)
print(f"最终结果:{result}")
# 启动事件循环
asyncio.run(main_depth())5. 协程的应用场景
5.1 网络编程
# 协程在网络编程中的应用
import asyncio
import aiohttp
async def fetch_url(session, url):
"""获取URL内容"""
try:
async with session.get(url) as response:
status = response.status
content_length = response.content_length or 0
print(f"URL: {url}, 状态码: {status}, 内容长度: {content_length}")
# 读取响应内容
await response.read()
return status
except Exception as e:
print(f"URL: {url}, 错误: {e}")
return None
async def main():
"""主协程"""
urls = [
"https://www.example.com",
"https://www.google.com",
"https://www.github.com",
"https://www.python.org",
"https://www.baidu.com",
"https://www.microsoft.com",
"https://www.apple.com",
"https://www.amazon.com",
"https://www.facebook.com",
"https://www.twitter.com"
]
print(f"开始获取 {len(urls)} 个URL")
# 创建会话
async with aiohttp.ClientSession() as session:
# 创建任务
tasks = [fetch_url(session, url) for url in urls]
# 等待所有任务完成
results = await asyncio.gather(*tasks)
print(f"\n所有URL获取完成,成功: {results.count(200)}, 失败: {results.count(None)}")
# 启动事件循环
print("测试协程网络编程:")
asyncio.run(main())5.2 并发任务处理
# 协程在并发任务处理中的应用
import asyncio
import time
async def process_task(task_id, delay):
"""处理任务"""
print(f"任务 {task_id} 开始,延迟 {delay} 秒")
await asyncio.sleep(delay) # 模拟耗时操作
result = task_id * 10
print(f"任务 {task_id} 结束,结果: {result}")
return result
async def main():
"""主协程"""
# 创建任务列表
tasks = [
process_task(1, 2),
process_task(2, 1),
process_task(3, 3),
process_task(4, 1.5),
process_task(5, 2.5),
process_task(6, 0.5),
process_task(7, 1.2),
process_task(8, 2.8),
process_task(9, 0.8),
process_task(10, 1.8)
]
print(f"开始处理 {len(tasks)} 个任务")
start_time = time.time()
# 并发处理所有任务
results = await asyncio.gather(*tasks)
end_time = time.time()
print(f"\n所有任务处理完成,耗时: {end_time - start_time:.2f}秒")
print(f"任务结果: {results}")
print(f"结果总和: {sum(results)}")
# 启动事件循环
print("测试协程并发任务处理:")
asyncio.run(main())
# 测试批量任务处理
async def batch_process(tasks, batch_size=5):
"""批量处理任务"""
results = []
for i in range(0, len(tasks), batch_size):
batch = tasks[i:i + batch_size]
print(f"处理批次 {i//batch_size + 1}, 任务数量: {len(batch)}")
batch_results = await asyncio.gather(*batch)
results.extend(batch_results)
return results
async def main_batch():
"""主协程"""
# 创建大量任务
tasks = [process_task(i, 0.1) for i in range(1, 21)]
print(f"\n开始批量处理 {len(tasks)} 个任务")
start_time = time.time()
# 批量处理任务
results = await batch_process(tasks, batch_size=5)
end_time = time.time()
print(f"\n所有任务处理完成,耗时: {end_time - start_time:.2f}秒")
print(f"任务结果: {results}")
# 启动事件循环
asyncio.run(main_batch())5.3 异步文件操作
# 协程在异步文件操作中的应用
import asyncio
import aiofiles
import time
async def write_file(filename, content):
"""异步写入文件"""
async with aiofiles.open(filename, 'w') as f:
await f.write(content)
print(f"文件 {filename} 写入完成")
async def read_file(filename):
"""异步读取文件"""
async with aiofiles.open(filename, 'r') as f:
content = await f.read()
print(f"文件 {filename} 读取完成,内容长度: {len(content)}")
return content
async def main():
"""主协程"""
# 创建测试文件
files = [f"test{i}.txt" for i in range(1, 6)]
contents = [f"Content for file {i}\n" * 1000 for i in range(1, 6)]
print(f"开始处理 {len(files)} 个文件")
start_time = time.time()
# 异步写入文件
write_tasks = [write_file(files[i], contents[i]) for i in range(len(files))]
await asyncio.gather(*write_tasks)
# 异步读取文件
read_tasks = [read_file(file) for file in files]
read_results = await asyncio.gather(*read_tasks)
end_time = time.time()
print(f"\n所有文件操作完成,耗时: {end_time - start_time:.2f}秒")
print(f"读取的文件数量: {len(read_results)}")
# 启动事件循环
print("测试协程异步文件操作:")
asyncio.run(main())5.4 数据库操作
# 协程在数据库操作中的应用
import asyncio
# 注意:需要安装 aiomysql 库
try:
import aiomysql
async def create_connection():
"""创建数据库连接"""
conn = await aiomysql.connect(
host='localhost',
port=3306,
user='root',
password='password', # 请替换为实际密码
db='test', # 请替换为实际数据库
loop=asyncio.get_event_loop()
)
return conn
async def test_db():
"""测试数据库操作"""
try:
# 创建连接
conn = await create_connection()
cursor = await conn.cursor()
# 创建表
await cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id INT PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(255) NOT NULL,
age INT NOT NULL
)
''')
print("表创建成功")
# 插入数据
users = [('Alice', 30), ('Bob', 25), ('Charlie', 35)]
await cursor.executemany(
'INSERT INTO users (name, age) VALUES (%s, %s)',
users
)
await conn.commit()
print(f"插入 {cursor.rowcount} 条数据")
# 查询数据
await cursor.execute('SELECT * FROM users')
results = await cursor.fetchall()
print("查询结果:")
for row in results:
print(row)
# 清理数据
await cursor.execute('DELETE FROM users')
await conn.commit()
print("数据清理完成")
except Exception as e:
print(f"数据库操作错误:{e}")
finally:
if 'cursor' in locals():
await cursor.close()
if 'conn' in locals():
conn.close()
# 启动事件循环
print("测试协程数据库操作:")
asyncio.run(test_db())
except ImportError:
print("aiomysql 库未安装,跳过数据库测试")6. 协程的性能考虑
6.1 性能测试
# 协程的性能测试
import asyncio
import threading
import time
# 测试函数:模拟I/O操作
def io_operation(delay):
"""模拟I/O操作"""
time.sleep(delay)
async def async_io_operation(delay):
"""异步模拟I/O操作"""
await asyncio.sleep(delay)
# 测试线程性能
def test_threads(count, delay):
"""测试线程性能"""
threads = []
for i in range(count):
t = threading.Thread(target=io_operation, args=(delay,))
threads.append(t)
t.start()
for t in threads:
t.join()
# 测试协程性能
async def test_coroutines(count, delay):
"""测试协程性能"""
tasks = []
for i in range(count):
task = asyncio.create_task(async_io_operation(delay))
tasks.append(task)
await asyncio.gather(*tasks)
# 测试同步性能
def test_sync(count, delay):
"""测试同步性能"""
for i in range(count):
io_operation(delay)
# 运行性能测试
print("协程性能测试:")
count = 1000
delay = 0.01
# 测试同步
print(f"\n测试同步执行 {count} 个任务,每个任务延迟 {delay} 秒")
start = time.time()
test_sync(count, delay)
end = time.time()
print(f"同步执行耗时:{end - start:.4f}秒")
# 测试线程
print(f"\n测试线程执行 {count} 个任务,每个任务延迟 {delay} 秒")
start = time.time()
test_threads(count, delay)
end = time.time()
print(f"线程执行耗时:{end - start:.4f}秒")
# 测试协程
print(f"\n测试协程执行 {count} 个任务,每个任务延迟 {delay} 秒")
start = time.time()
asyncio.run(test_coroutines(count, delay))
end = time.time()
print(f"协程执行耗时:{end - start:.4f}秒")
# 测试更大的任务量
count = 10000
delay = 0.001
print(f"\n测试更大的任务量:{count} 个任务,每个任务延迟 {delay} 秒")
# 测试协程
print("\n测试协程执行:")
start = time.time()
asyncio.run(test_coroutines(count, delay))
end = time.time()
print(f"协程执行耗时:{end - start:.4f}秒")
# 测试线程(注意:线程数量过多可能会导致系统资源耗尽)
print(f"\n测试线程执行(使用 1000 个线程):")
start = time.time()
test_threads(1000, delay * 10) # 减少线程数量,增加每个线程的延迟
end = time.time()
print(f"线程执行耗时:{end - start:.4f}秒")6.2 性能优化策略
# 协程性能优化策略示例
import asyncio
import time
# 测试不同的任务分组方式
async def process_task(task_id):
"""处理任务"""
await asyncio.sleep(0.01) # 模拟耗时操作
return task_id
async def process_batch(tasks):
"""处理批次任务"""
return await asyncio.gather(*tasks)
async def main_no_batching():
"""不使用批次处理"""
tasks = [process_task(i) for i in range(10000)]
results = await asyncio.gather(*tasks)
return results
async def main_with_batching(batch_size=1000):
"""使用批次处理"""
tasks = [process_task(i) for i in range(10000)]
results = []
for i in range(0, len(tasks), batch_size):
batch = tasks[i:i + batch_size]
batch_results = await process_batch(batch)
results.extend(batch_results)
return results
# 运行性能测试
print("协程性能优化策略测试:")
# 测试不使用批次处理
print("\n测试不使用批次处理:")
start = time.time()
asyncio.run(main_no_batching())
end = time.time()
print(f"不使用批次处理耗时:{end - start:.4f}秒")
# 测试使用批次处理
print("\n测试使用批次处理:")
start = time.time()
asyncio.run(main_with_batching())
end = time.time()
print(f"使用批次处理耗时:{end - start:.4f}秒")
# 测试不同批次大小
print("\n测试不同批次大小:")
batch_sizes = [100, 500, 1000, 2000, 5000]
for batch_size in batch_sizes:
start = time.time()
asyncio.run(main_with_batching(batch_size))
end = time.time()
print(f"批次大小 {batch_size}:{end - start:.4f}秒")7. 实践案例:实现一个简单的异步Web服务器
7.1 案例概述
asyncio 和 aiohttp 库来实现一个简单的异步Web服务器,展示协程在网络编程中的应用。7.2 实现代码
# 实现异步Web服务器
import asyncio
from aiohttp import web
import time
# 处理函数:首页
async def handle_index(request):
"""处理首页请求"""
return web.Response(text="Hello, Async Web Server!")
# 处理函数:延迟响应
async def handle_delay(request):
"""处理延迟响应请求"""
# 获取延迟参数
delay = float(request.match_info.get('delay', 1))
print(f"处理延迟请求,延迟 {delay} 秒")
# 模拟耗时操作
await asyncio.sleep(delay)
return web.Response(text=f"Delayed response after {delay} seconds")
# 处理函数:并发测试
async def handle_concurrent(request):
"""处理并发测试请求"""
# 获取并发数参数
count = int(request.match_info.get('count', 10))
print(f"处理并发测试请求,并发数 {count}")
# 创建并发任务
async def task(i):
await asyncio.sleep(0.1) # 模拟耗时操作
return i
# 执行并发任务
tasks = [task(i) for i in range(count)]
results = await asyncio.gather(*tasks)
return web.Response(text=f"Concurrent tasks completed: {results}")
# 处理函数:状态信息
async def handle_status(request):
"""处理状态信息请求"""
# 获取事件循环信息
loop = asyncio.get_event_loop()
stats = {
"loop": type(loop).__name__,
"time": time.strftime('%Y-%m-%d %H:%M:%S'),
"uptime": f"{time.time() - start_time:.2f} seconds"
}
return web.json_response(stats)
# 初始化服务器
async def init_app():
"""初始化应用"""
app = web.Application()
# 注册路由
app.add_routes([
web.get('/', handle_index),
web.get('/delay/{delay}', handle_delay),
web.get('/concurrent/{count}', handle_concurrent),
web.get('/status', handle_status)
])
return app
# 全局变量:服务器启动时间
start_time = time.time()
# 启动服务器
print("启动异步Web服务器:")
print("访问地址:http://localhost:8080")
print("测试路径:")
print(" / - 首页")
print(" /delay/{秒数} - 延迟响应测试")
print(" /concurrent/{数量} - 并发测试")
print(" /status - 服务器状态")
print("\n按 Ctrl+C 停止服务器")
# 运行服务器
web.run_app(init_app(), port=8080)7.3 应用场景
8. 总结
async/await 语法可以更简洁、更清晰地编写协程代码。结合 asyncio 库提供的事件循环和各种异步I/O操作,我们可以构建高性能的异步应用程序。9. 参考文献
10. 结语