
+++
title = "Python Async Pipelines"
date = 2022-12-09
draft = false

[taxonomies]
tags = ["python"]
+++

I have been using Flink a lot recently. When processing large-scale data in Python, it is hard to keep the code Pythonic, so I went looking for Python streaming-pipeline libraries on GitHub and found pypeln and aiostream.

## pypeln

> Concurrent data pipelines in Python

pypeln is a concurrent data-pipeline library. It fills the gap where Spark, Flink, or Dask would be too heavyweight but plain sequential processing is too slow.

### Installation

```bash
pip install pypeln -i https://pypi.douban.com/simple
```

### Basic usage

```python
import asyncio

## Multiprocessing mode
# import pypeln.process as operator

## Multithreading mode
# import pypeln.thread as operator

# Coroutine mode
import pypeln.task as operator


def before_start_hook(database_uri):
    async def wrapper():
        # The returned dict is injected as keyword arguments into the
        # stage's worker function. MongoClient stands in here for an
        # async database client.
        return {'database': await MongoClient(database_uri)}
    return wrapper


async def on_done_hook(database):
    await database.close()


async def find_url(data_id, database):
    return await database.get_url_by_id(data_id)


async def mock_http(url):
    # Simulate an HTTP request
    return await asyncio.sleep(3, {'url': url})


async def mock_data_store(doc, database):
    await database.insert_one(doc)


async def mock_data_source():
    for i in range(100):
        yield str(i)


pipes = (
    mock_data_source()
    # on_start injects resources into the worker function;
    # on_done is called back when the stage shuts down.
    | operator.map(find_url, on_start=before_start_hook('data_uri'),
                   on_done=on_done_hook, workers=8, maxsize=8)
    | operator.map(mock_http, maxsize=200, workers=200)
    | operator.each(mock_data_store, on_start=before_start_hook('data_uri'),
                    on_done=on_done_hook, workers=8, maxsize=8)
)

# Iterate the final stage to drive the pipeline
for pipe in pipes:
    pass
```

### Problems with pypeln

pypeln handles ordinary concurrent workloads well, but it does not implement a buffer operator, so there is no way to turn the stream into batches for bulk operations; writing to a database or a file then becomes the bottleneck.
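Until something like that lands, one workaround is to regroup a stage's output outside the pipeline, since pypeln task stages are async-iterable. A minimal sketch (the `batched` helper is my own, not pypeln API):

```python
async def batched(source, batch_size):
    """Regroup an async iterable into lists of up to batch_size items
    (hypothetical helper, not part of pypeln)."""
    batch = []
    async for item in source:
        batch.append(item)
        if len(batch) >= batch_size:
            yield batch
            batch = []
    if batch:  # flush the remainder
        yield batch

# The tail of a pipeline can then be drained in batches for bulk writes:
#   async for docs in batched(pipes, 100):
#       await database.insert_many(docs)
```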

## aiostream

> Generator-based operators for asynchronous iteration

aiostream is a generator-based asynchronous stream library. Because it uses a pull model, backpressure comes naturally.
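A quick way to see the pull model at work: the producer below only advances when the slow downstream stage asks for the next item, so a slow consumer throttles the source for free (a toy sketch; only `stream.iterate`, `pipe.map`, and `pipe.print` are used):

```python
import asyncio
from aiostream import stream, pipe

async def producer():
    for i in range(3):
        print(f"produced {i}")  # printed lazily, on demand
        yield i

async def slow(x):
    await asyncio.sleep(1)  # slow downstream stage
    return x

async def main():
    # Each "produced i" appears roughly one second apart: the source
    # is pulled only when the map stage is ready for more.
    await (stream.iterate(producer())
           | pipe.map(slow, task_limit=1)
           | pipe.print())

asyncio.run(main())
```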

### Installation

```bash
pip install aiostream -i https://pypi.douban.com/simple
```

### Basic usage

```python
import asyncio
from aiostream import stream, pipe


async def mock_http(url):
    # Simulate an HTTP request
    return await asyncio.sleep(3, {'url': url})


async def mock_data_source():
    for i in range(100):
        yield str(i)


async def main():
    # get_database() stands in for an async context manager
    # that yields a database handle.
    async with get_database() as database:
        async def find_url(data_id):
            return await database.get_url_by_id(data_id)

        async def mock_data_store(docs):
            await database.insert_many(docs)

        await (stream.iterate(mock_data_source())
               | pipe.map(find_url, task_limit=5)
               | pipe.map(mock_http, task_limit=5)
               | pipe.timeout_buffer(100, 3)
               | pipe.map(mock_data_store, task_limit=2))


asyncio.run(main())
```

The `timeout_buffer` operator used above does not ship with aiostream; the author sketched an implementation in a GitHub issue:

```python
from contextlib import asynccontextmanager

import asyncio
from aiostream import pipe, operator, streamcontext


@asynccontextmanager
async def buffer(streamer, size=1):
    # Decouple the producer from its consumers through a bounded queue.
    queue = asyncio.Queue(maxsize=size)
    sentinel = object()

    async def consume():
        try:
            async for item in streamer:
                await queue.put(item)
        finally:
            await queue.put(sentinel)

    @operator
    async def wrapper():
        while True:
            item = await queue.get()
            if item is sentinel:
                await future
                return
            yield item

    future = asyncio.ensure_future(consume())
    try:
        yield wrapper()
    finally:
        future.cancel()


@operator(pipable=True)
async def catch(source, exc_cls):
    # Swallow exc_cls and end the stream instead of propagating it.
    async with streamcontext(source) as streamer:
        try:
            async for item in streamer:
                yield item
        except exc_cls:
            return


@operator(pipable=True)
async def chunks(source, n, timeout):
    # Group items into lists of up to n, flushing a partial chunk when
    # no new item arrives within `timeout` seconds.
    async with streamcontext(source) as streamer:
        async with buffer(streamer) as buffered:
            async with streamcontext(buffered) as first_streamer:
                async for first in first_streamer:
                    tail = await (
                        buffered
                        | pipe.timeout(timeout)
                        | catch.pipe(asyncio.TimeoutError)
                        | pipe.take(n - 1)
                        | pipe.list()
                    )
                    yield [first, *tail]


# Register the operator so pipelines can call pipe.timeout_buffer(n, timeout)
pipe.timeout_buffer = chunks.pipe
```
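Once registered, the operator drops into a pipeline like any built-in. A small smoke test, reusing `mock_data_source` from the example above:

```python
import asyncio
from aiostream import stream, pipe

async def demo():
    # Chunks of up to 10 items; a partial chunk is flushed after
    # 3 seconds without new data.
    await (stream.iterate(mock_data_source())
           | pipe.timeout_buffer(10, 3)
           | pipe.print())

asyncio.run(demo())
```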

### Problems with aiostream

With a pull model, grouping and splitting streams is awkward: every branch has to be funneled back through the merge operator, and only the merged stream is awaited to run. RxPY, which takes a push model, would be a natural alternative, but since 3.x it no longer officially supports backpressure, and the ReactiveX concepts are hard to internalize, so I set it aside.
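To make the complaint concrete: fanning a stream out into branches means building each branch as its own pipeline and merging everything back before a single `await` runs it. A sketch of what that looks like (note that each branch re-iterates the source independently, since there is no operator to attach two consumers to one running stream):

```python
import asyncio
from aiostream import stream, pipe

async def main():
    source = stream.range(10)
    # Two branches, each filtering its own iteration of the source
    evens = (source
             | pipe.filter(lambda x: x % 2 == 0)
             | pipe.map(lambda x: ("even", x)))
    odds = (source
            | pipe.filter(lambda x: x % 2 == 1)
            | pipe.map(lambda x: ("odd", x)))
    # Only the merged stream is awaited to drive both branches
    merged = stream.merge(evens, odds)
    print(await stream.list(merged))

asyncio.run(main())
```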

## Applications

Besides streaming data processing, aiostream is also a great fit for crawlers. A crawler refactored with aiostream has a much clearer overall structure, which pays off for crawlers that need long-term maintenance. Thanks to the performance of Python's async runtime, both resource utilization and crawl throughput have improved.
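For illustration, this is roughly the pipeline skeleton such a crawler reduces to; `fetch`, `parse`, and `store` are stand-ins for real implementations, and aiohttp is assumed as the HTTP client:

```python
import asyncio
import aiohttp
from aiostream import stream, pipe

async def crawl(seed_urls):
    async with aiohttp.ClientSession() as session:
        async def fetch(url):
            async with session.get(url) as resp:
                return url, await resp.text()

        def parse(item):
            url, html = item
            return {"url": url, "size": len(html)}  # stand-in for real parsing

        async def store(doc):
            print(doc)  # stand-in for a database write

        await (stream.iterate(seed_urls)
               | pipe.map(fetch, task_limit=10)  # concurrent downloads
               | pipe.map(parse)
               | pipe.map(store, task_limit=2))

asyncio.run(crawl(["https://example.com"]))
```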

Some internal projects at the company used to run on Scrapy, but 99% of Scrapy's features went unused: it served only as a fetcher and scheduler, with pipelines writing results to the database. This year most of our crawler projects moved onto Kubernetes clusters, where maintenance no longer depends on Scrapy's process daemon, web console, and similar features. I therefore wrote a simplified framework compatible with part of the Scrapy API; every internal Scrapy crawler can switch to it by swapping the dependency, with no code changes.

Next I plan to use aiostream to rebuild a version of this async Scrapy-compatible framework, cutting the projects' memory and CPU footprint.