Framework for creating efficient data processing pipelines

Related tags

Miscellaneousaqueduct
Overview

Aqueduct

Framework for creating efficient data processing pipelines.

Contact

Feel free to ask questions in telegram t.me/avito-ml

Key Features

  • Increase RPS (Requests Per Second) for your service
  • All optimisations in one library
  • Uses shared memory for transfer big data between processes

Get started

Simple example how to start with aqueduct using aiohttp. For better examples see examples

web.Application: app = web.Application() app['flow'] = Flow( FlowStep(SumHandler()), ) app.router.add_post('/sum', SumView) app['flow'].start() return app if __name__ == '__main__': web.run_app(prepare_app()) ">
from aiohttp import web
from aqueduct import Flow, FlowStep, BaseTaskHandler, BaseTask


class MyModel:
    """This is CPU bound model example."""
    
    def process(self, number):
        return sum(i * i for i in range(number))

class Task(BaseTask):
    """Container to send arguments to model."""
    def __init__(self, number):
        super().__init__()
        self.number = number
        self.sum = None  # result will be here
    
class SumHandler(BaseTaskHandler):
    """With aqueduct we need to wrap you're model."""
    def __init__(self):
        self._model = None

    def on_start(self):
        """Runs in child process, so memory no memory consumption in parent process."""
        self._model = MyModel()

    def handle(self, *tasks: Task):
        """List of tasks because it can be batching."""
        for task in tasks:
            task.sum = self._model.process(task.number)

            
class SumView(web.View):
    """Simple aiohttp-view handler"""

    async def post(self):
        number = await self.request.read()
        task = Task(int(number))
        await self.request.app['flow'].process(task)
        return web.json_response(data={'result': task.sum})


def prepare_app() -> web.Application:
    app = web.Application()

    app['flow'] = Flow(
        FlowStep(SumHandler()),
    )
    app.router.add_post('/sum', SumView)

    app['flow'].start()
    return app


if __name__ == '__main__':
    web.run_app(prepare_app())
    

Batching

Aqueduct supports the ability to process tasks with batches. Default batch size is one.

np.array: """Always says that there is a cat in the image. The image is represented by a one-dimensional array. The model spends less time for processing batch of images due to GPU optimizations. It's emulated with BATCH_REDUCTION_FACTOR coefficient. """ batch_size = images.shape[0] if batch_size == 1: time.sleep(self.IMAGE_PROCESS_TIME) else: time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR) return np.ones(batch_size, dtype=bool) class CatDetectorHandler(BaseTaskHandler): def handle(self, *tasks: ArrayFieldTask): images = np.array([task.array for task in tasks]) predicts = CatDetector().predict(images) for task, predict in zip(tasks, predicts): task.result = predict def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[BaseTask]: return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)] async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]): await asyncio.gather(*(flow.process(task) for task in tasks)) tasks_batch = get_tasks_batch() flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE)) flow_with_batch_handler.start() # checks if no one result assert not any(task.result for task in tasks_batch) # task handling takes 0.16 secs that is less than sequential task processing with 0.22 secs await asyncio.wait_for( process_tasks(flow_with_batch_handler, tasks_batch), timeout=CatDetector.BATCH_PROCESS_TIME, ) # checks if all results were set assert all(task.result for task in tasks_batch) await flow_with_batch_handler.stop() # if we have batch size more than tasks number, we can limit batch accumulation time # with timeout parameter for processing time optimization tasks_batch = get_tasks_batch() flow_with_batch_handler = Flow( FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE, batch_timeout=0.01) ) flow_with_batch_handler.start() await asyncio.wait_for( process_tasks(flow_with_batch_handler, tasks_batch), timeout=CatDetector.BATCH_PROCESS_TIME + 0.01, ) await flow_with_batch_handler.stop() ">
import asyncio
import time
from typing import List

import numpy as np

from aqueduct.flow import Flow, FlowStep
from aqueduct.handler import BaseTaskHandler
from aqueduct.task import BaseTask

# this constant needs just for example
TASKS_BATCH_SIZE = 20


class ArrayFieldTask(BaseTask):
    def __init__(self, array: np.array, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.array = array
        self.result = None


class CatDetector:
    """GPU model emulator that predicts the presence of the cat in the image."""
    IMAGE_PROCESS_TIME = 0.01
    BATCH_REDUCTION_FACTOR = 0.7
    OVERHEAD_TIME = 0.02
    BATCH_PROCESS_TIME = IMAGE_PROCESS_TIME * TASKS_BATCH_SIZE * BATCH_REDUCTION_FACTOR + OVERHEAD_TIME

    def predict(self, images: np.array) -> np.array:
        """Always says that there is a cat in the image.

        The image is represented by a one-dimensional array.
        The model spends less time for processing batch of images due to GPU optimizations. It's emulated
        with BATCH_REDUCTION_FACTOR coefficient.
        """
        batch_size = images.shape[0]
        if batch_size == 1:
            time.sleep(self.IMAGE_PROCESS_TIME)
        else:
            time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR)
        return np.ones(batch_size, dtype=bool)


class CatDetectorHandler(BaseTaskHandler):
    def handle(self, *tasks: ArrayFieldTask):
        images = np.array([task.array for task in tasks])
        predicts = CatDetector().predict(images)
        for task, predict in zip(tasks, predicts):
            task.result = predict


def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[BaseTask]:
    return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)]


async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]):
    await asyncio.gather(*(flow.process(task) for task in tasks))


tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE))
flow_with_batch_handler.start()

# checks if no one result
assert not any(task.result for task in tasks_batch)
# task handling takes 0.16 secs that is less than sequential task processing with 0.22 secs
await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME,
)
# checks if all results were set
assert all(task.result for task in tasks_batch)

await flow_with_batch_handler.stop()

# if we have batch size more than tasks number, we can limit batch accumulation time 
# with timeout parameter for processing time optimization
tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(
    FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE, batch_timeout=0.01)
)
flow_with_batch_handler.start()

await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME + 0.01,
)

await flow_with_batch_handler.stop()

Sentry

The implementation allows you to receive logger events from the workers and the main process. To integrate with Sentry, you need to write something like this:

import logging
import os

from raven import Client
from raven.handlers.logging import SentryHandler
from raven.transport.http import HTTPTransport

from aqueduct.logger import log


if os.getenv('SENTRY_ENABLED') is True:
    dsn = os.getenv('SENTRY_DSN')
    sentry_handler = SentryHandler(client=Client(dsn=dsn, transport=HTTPTransport), level=logging.ERROR)
    log.addHandler(sentry_handler)
Owner
avito.tech
avito.ru engineering team open source projects
avito.tech
Dev-meme - A repository that contains memes just for people like us

A repository that contains memes just for people like us. Coders are constantly

Padmashree Jha 4 Oct 31, 2022
PyGo custom language, New but similar language programming

New but similar language programming. Now we are capable to program in a very similar language to Python but at the same time get the efficiency of Go.

Fernando Perez 4 Nov 19, 2022
Replay Felica Exchange For Python

FelicaReplay Replay Felica Exchange Description Standalone Replay Module Usage Save FelicaRelay (=2.0) output to file, then python replay.py [FILE].

3 Jul 14, 2022
From "fixed RAnDom CRashes" to "[FIX] Fixed random crashes."

Clean Commit From fixed RAnDom CRashes to [FIX] Fixed random crashes. Clean commit helps you by auto-formating your commits to make your repos better

Mathias 3 Dec 26, 2021
Tracing and Observability with OpenFaaS

Tracing and Observability with OpenFaaS Today we will walk through how to add OpenTracing or OpenTelemetry with Grafana's Tempo. For this walk-through

Lucas Roesler 8 Nov 17, 2022
A program that analyzes data from inertia measurement units installeed in aircraft and generates g-exceedance curves

A program that analyzes data from inertia measurement units installeed in aircraft and generates g-exceedance curves

Pooya 1 Nov 23, 2021
Search and Find Jobs in Ethiopia

✨ EthioJobs ✨ Search and Find Jobs in Ethiopia Easy start critical warning Use pycharm No vscode No sublime No Vim No nothing when you want to use

Abdimk 12 Nov 09, 2022
Inverted-pendulum-with-fuzzy-control - Inverted pendulum with fuzzy control

Fuzzy Inverted Pendulum Basically, this project consists of an inverted pendulum

Mahan Ahmadvand 1 Aug 25, 2022
Pygments is a generic syntax highlighter written in Python

Welcome to Pygments This is the source of Pygments. It is a generic syntax highlighter written in Python that supports over 500 languages and text for

1.2k Jan 06, 2023
This is a simple SV calling package for diploid assemblies.

dipdiff This is a simple SV calling package for diploid assemblies. It uses a modified version of svim-asm. The package includes its own version minim

Mikhail Kolmogorov 11 Jan 05, 2023
Make after-work Mending More flexible In Python

Mending Make after-work Mending More flexible In Python A Lite Package focuses on making project's after-post mending pythonic and flexible. Certainly

2 Jun 15, 2022
Sacred is a tool to help you configure, organize, log and reproduce experiments developed at IDSIA.

Sacred Every experiment is sacred Every experiment is great If an experiment is wasted God gets quite irate Sacred is a tool to help you configure, or

IDSIA 4k Jan 02, 2023
Hook and simulate global keyboard events on Windows and Linux.

keyboard Take full control of your keyboard with this small Python library. Hook global events, register hotkeys, simulate key presses and much more.

BoppreH 3.2k Jan 01, 2023
An Airflow operator to call the main function from the dbt-core Python package

airflow-dbt-python An Airflow operator to call the main function from the dbt-core Python package Motivation Airflow running in a managed environment

Tomás Farías Santana 93 Jan 08, 2023
python for windows extensions

This is the readme for the Python for Win32 (pywin32) extensions source code. See CHANGES.txt for recent changes. 'setup.py' is a standard distutils

27 Dec 08, 2022
Something like Asteroids but not really, done in CircuitPython

CircuitPython Staroids Something like Asteroids, done in CircuitPython. Works with FunHouse, MacroPad, Pybadge, EdgeBadge, CLUE, and Pygamer. circuitp

Tod E. Kurt 14 May 31, 2022
Hypothesis strategies for generating Python programs, something like CSmith

hypothesmith Hypothesis strategies for generating Python programs, something like CSmith. This is definitely pre-alpha, but if you want to play with i

Zac Hatfield-Dodds 73 Dec 14, 2022
An universal linux port of deezer, supporting both Flatpak and AppImage

Deezer for linux This repo is an UNOFFICIAL linux port of the official windows-only Deezer app. Being based on the windows app, it allows downloading

Aurélien Hamy 154 Jan 06, 2023
Get information about what a Python frame is currently doing, particularly the AST node being executed

executing This mini-package lets you get information about what a frame is currently doing, particularly the AST node being executed. Usage Getting th

Alex Hall 211 Jan 01, 2023
Traductor de webs desde consola usando el servicio de Google Traductor.

proxiGG Traductor de webs desde consola usando el servicio de Google Traductor. Se adjunta el código fuente para Python3 y un binario compilado en C p

@as_informatico 2 Oct 20, 2021