+++ title = "Python异步管道" date = 2022-12-09 draft = false [taxonomies] tags=["python"] +++ 最近flink使用比较多,使用python处理大规模数据的时,按照`Pythonic`风格编码很难受,在github上找了一下python流式管道的库,发现了[pypeln](https://github.com/cgarciae/pypeln),[aiostream](https://github.com/vxgmichel/aiostream)。 ## pypeln使用 > Concurrent data pipelines in Python >>> pypeln是一个并发数据管道库,当你觉得使用Spark、Flink、Dask过重,直接处理太慢的时候,可以使用它。 ### 安装 ```bash pip install pypeln -i https://pypi.douban.com/simple ``` ### 基本用法 ```python ## 使用多进程模式 # import pypeln.process as operator ## 使用多线程模式 # import pypeln.thread as operator # 使用协程模式 import pypeln.task as operator def before_start_hook(database_uri): async def wrapper(): return {'database':await MongoClient(database_uri)} return wrapper async def on_done_hook(database): await database.close() async def find_url(data_id,database): return await database.get_url_by_id(data_id) async def mock_http(url): # 模拟http请求 return await asyncio.sleep(3,{'url':url}) async def mock_data_store(doc,database): await database.insert_one(doc) async def mock_data_source(): for i in range(100): yield str(i) pipes=(mock_data_source() # on_start:依赖注入到运行函数中 on_done:在结束时回调 |operator.map(find_url,on_start=before_start_hook('data_uri'),on_done=on_done_hook,workers=8,maxsize=8) |operator.map(mock_http,maxsize=200,workers=200) |operator.each(mock_data_store,on_start=before_start_hook('data_uri'),on_done=on_done_hook,workers=8,maxsize=8) ) # 运行 for pipe in pipes: pass ``` ### pypeln的问题 pypeln对于普通的并发任务可以很好的处理,该库没有实现buffer运算符,无法将流转换成批进行批量操作,写数据库和写文件存在瓶颈。 ## aiostream > Generator-based operators for asynchronous iteration aiostream是一个基于生成器的异步库,使用拉模型,天然背压。 ### 安装 ```bash pip install aiostream -i https://pypi.douban.com/simple ``` ### 基本使用 ```python import asyncio from aiostream import stream, pipe async def mock_http(url): # 模拟http请求 return await asyncio.sleep(3,{'url':url}) async def mock_data_store(docs): await database.insert_one(doc) async def mock_data_source(): for i in range(100): yield str(i) async def main(): async with get_database() as database: async def find_url(data_id): return await database.get_url_by_id(data_id) async def mock_data_store(docs): await database.insert_many(docs) await (stream.iterate(mock_data_source()) |stream.map(find_url,task_limit=5) |stream.map(mock_http,task_limit=5) |stream.timeout_buffer(100,3) |stream.map(mock_data_store,task_limit=2) ) asyncio.run(main()) ``` 上面示例代码中`timeout_buffer`操作符官方没有实现,根据github issue中作者给出了样例: ```python from contextlib import asynccontextmanager import asyncio from aiostream import pipe, operator, streamcontext @asynccontextmanager async def buffer(streamer, size=1): queue = asyncio.Queue(maxsize=size) sentinel = object() async def consume(): try: async for item in streamer: await queue.put(item) finally: await queue.put(sentinel) @operator async def wrapper(): while True: item = await queue.get() if item is sentinel: await future return yield item future = asyncio.ensure_future(consume()) try: yield wrapper() finally: future.cancel() @operator(pipable=True) async def catch(source, exc_cls): async with streamcontext(source) as streamer: try: async for item in streamer: yield item except exc_cls: return @operator(pipable=True) async def chunks(source, n, timeout): async with streamcontext(source) as streamer: async with buffer(streamer) as buffered: async with streamcontext(buffered) as first_streamer: async for first in first_streamer: tail = await ( buffered | pipe.timeout(timeout) | catch.pipe(asyncio.TimeoutError) | pipe.take(n - 1) | pipe.list() ) yield [first, *tail] pipe.timeout_buffer = chunks.pipe ``` ### aiostream的问题 拉模型分组分流实现比较麻烦,所有的流使用`merge`操作符汇聚调用`await`方法执行,[RxPY](https://github.com/ReactiveX/RxPY)是一个很好的替代品,采取推模式,但是3.x之后官方不在维护背压(back-pressure),`reactivex`概念难以理解,只能放弃使用。 ## 应用 aiostream除了适合流式处理数据,也特别适合处理爬虫业务,使用aiostream重构后的爬虫,整体结构更加清晰,适合长期维护的爬虫。依靠python异步的性能,资源利用率,数据爬取效率均有一定提升。 之前公司内部部分项目使用`scrapy`,但是99%的`scrapy`特性没有使用,只是将`scrapy`作为爬取器与调度器,然后通过pipeline落库。今年爬虫项目大部分都上了k8s集群维护,不依赖scrapy的进程守护、web查看等功能,因此写了一个简化版本的`scrapy`,兼容部分scrapy api,公司内部所有使用scrapy的爬虫均可以替换依赖的方式兼容,无需修改代码。 后续考虑使用aiostream重构一版异步scrapy兼容框架减少项目内存与CPU资源的占用。