Framework for creating efficient data processing pipelines

Related tags

Miscellaneousaqueduct
Overview

Aqueduct

Framework for creating efficient data processing pipelines.

Contact

Feel free to ask questions in telegram t.me/avito-ml

Key Features

  • Increase RPS (Requests Per Second) for your service
  • All optimisations in one library
  • Uses shared memory for transfer big data between processes

Get started

Simple example how to start with aqueduct using aiohttp. For better examples see examples

web.Application: app = web.Application() app['flow'] = Flow( FlowStep(SumHandler()), ) app.router.add_post('/sum', SumView) app['flow'].start() return app if __name__ == '__main__': web.run_app(prepare_app()) ">
from aiohttp import web
from aqueduct import Flow, FlowStep, BaseTaskHandler, BaseTask


class MyModel:
    """This is CPU bound model example."""
    
    def process(self, number):
        return sum(i * i for i in range(number))

class Task(BaseTask):
    """Container to send arguments to model."""
    def __init__(self, number):
        super().__init__()
        self.number = number
        self.sum = None  # result will be here
    
class SumHandler(BaseTaskHandler):
    """With aqueduct we need to wrap you're model."""
    def __init__(self):
        self._model = None

    def on_start(self):
        """Runs in child process, so memory no memory consumption in parent process."""
        self._model = MyModel()

    def handle(self, *tasks: Task):
        """List of tasks because it can be batching."""
        for task in tasks:
            task.sum = self._model.process(task.number)

            
class SumView(web.View):
    """Simple aiohttp-view handler"""

    async def post(self):
        number = await self.request.read()
        task = Task(int(number))
        await self.request.app['flow'].process(task)
        return web.json_response(data={'result': task.sum})


def prepare_app() -> web.Application:
    app = web.Application()

    app['flow'] = Flow(
        FlowStep(SumHandler()),
    )
    app.router.add_post('/sum', SumView)

    app['flow'].start()
    return app


if __name__ == '__main__':
    web.run_app(prepare_app())
    

Batching

Aqueduct supports the ability to process tasks with batches. Default batch size is one.

np.array: """Always says that there is a cat in the image. The image is represented by a one-dimensional array. The model spends less time for processing batch of images due to GPU optimizations. It's emulated with BATCH_REDUCTION_FACTOR coefficient. """ batch_size = images.shape[0] if batch_size == 1: time.sleep(self.IMAGE_PROCESS_TIME) else: time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR) return np.ones(batch_size, dtype=bool) class CatDetectorHandler(BaseTaskHandler): def handle(self, *tasks: ArrayFieldTask): images = np.array([task.array for task in tasks]) predicts = CatDetector().predict(images) for task, predict in zip(tasks, predicts): task.result = predict def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[BaseTask]: return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)] async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]): await asyncio.gather(*(flow.process(task) for task in tasks)) tasks_batch = get_tasks_batch() flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE)) flow_with_batch_handler.start() # checks if no one result assert not any(task.result for task in tasks_batch) # task handling takes 0.16 secs that is less than sequential task processing with 0.22 secs await asyncio.wait_for( process_tasks(flow_with_batch_handler, tasks_batch), timeout=CatDetector.BATCH_PROCESS_TIME, ) # checks if all results were set assert all(task.result for task in tasks_batch) await flow_with_batch_handler.stop() # if we have batch size more than tasks number, we can limit batch accumulation time # with timeout parameter for processing time optimization tasks_batch = get_tasks_batch() flow_with_batch_handler = Flow( FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE, batch_timeout=0.01) ) flow_with_batch_handler.start() await asyncio.wait_for( process_tasks(flow_with_batch_handler, tasks_batch), timeout=CatDetector.BATCH_PROCESS_TIME + 0.01, ) await flow_with_batch_handler.stop() ">
import asyncio
import time
from typing import List

import numpy as np

from aqueduct.flow import Flow, FlowStep
from aqueduct.handler import BaseTaskHandler
from aqueduct.task import BaseTask

# this constant needs just for example
TASKS_BATCH_SIZE = 20


class ArrayFieldTask(BaseTask):
    def __init__(self, array: np.array, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.array = array
        self.result = None


class CatDetector:
    """GPU model emulator that predicts the presence of the cat in the image."""
    IMAGE_PROCESS_TIME = 0.01
    BATCH_REDUCTION_FACTOR = 0.7
    OVERHEAD_TIME = 0.02
    BATCH_PROCESS_TIME = IMAGE_PROCESS_TIME * TASKS_BATCH_SIZE * BATCH_REDUCTION_FACTOR + OVERHEAD_TIME

    def predict(self, images: np.array) -> np.array:
        """Always says that there is a cat in the image.

        The image is represented by a one-dimensional array.
        The model spends less time for processing batch of images due to GPU optimizations. It's emulated
        with BATCH_REDUCTION_FACTOR coefficient.
        """
        batch_size = images.shape[0]
        if batch_size == 1:
            time.sleep(self.IMAGE_PROCESS_TIME)
        else:
            time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR)
        return np.ones(batch_size, dtype=bool)


class CatDetectorHandler(BaseTaskHandler):
    def handle(self, *tasks: ArrayFieldTask):
        images = np.array([task.array for task in tasks])
        predicts = CatDetector().predict(images)
        for task, predict in zip(tasks, predicts):
            task.result = predict


def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[BaseTask]:
    return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)]


async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]):
    await asyncio.gather(*(flow.process(task) for task in tasks))


tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE))
flow_with_batch_handler.start()

# checks if no one result
assert not any(task.result for task in tasks_batch)
# task handling takes 0.16 secs that is less than sequential task processing with 0.22 secs
await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME,
)
# checks if all results were set
assert all(task.result for task in tasks_batch)

await flow_with_batch_handler.stop()

# if we have batch size more than tasks number, we can limit batch accumulation time 
# with timeout parameter for processing time optimization
tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(
    FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE, batch_timeout=0.01)
)
flow_with_batch_handler.start()

await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME + 0.01,
)

await flow_with_batch_handler.stop()

Sentry

The implementation allows you to receive logger events from the workers and the main process. To integrate with Sentry, you need to write something like this:

import logging
import os

from raven import Client
from raven.handlers.logging import SentryHandler
from raven.transport.http import HTTPTransport

from aqueduct.logger import log


if os.getenv('SENTRY_ENABLED') is True:
    dsn = os.getenv('SENTRY_DSN')
    sentry_handler = SentryHandler(client=Client(dsn=dsn, transport=HTTPTransport), level=logging.ERROR)
    log.addHandler(sentry_handler)
Owner
avito.tech
avito.ru engineering team open source projects
avito.tech
🙌Kart of 210+ projects based on machine learning, deep learning, computer vision, natural language processing and all. Show your support by ✨ this repository.

ML-ProjectKart 📌 Repository This kart showcases the finest collection of all projects based on machine learning, deep learning, computer vision, natu

Prathima Kadari 203 Dec 28, 2022
A simple python script where the user inputs the current ingredients they have in their kitchen into ingredients.txt

A simple python script where the user inputs the current ingredients they have in their kitchen into ingredients.txt and then runs the main.py script, and it will output what recipes can be created b

Jordan Leich 3 Nov 02, 2022
navigation_commander is a ROS package to command the robot to navigate autonomously to each table for food delivery inside a hotel.

navigation_commander navigation_commander is a ROS package to command the robot to navigate autonomously to each table for food delivery inside a hote

ALEENA LENTIN 9 Nov 08, 2021
A Dungeon and Dragons Toolkit using Python

Pythons-Dungeons A Dungeon and Dragons Toolkit using Python Rules: -When you are commiting please don't delete parts of the code that are important -A

2 Oct 21, 2021
This is a database of 180.000+ symbols containing Equities, ETFs, Funds, Indices, Futures, Options, Currencies, Cryptocurrencies and Money Markets.

Finance Database As a private investor, the sheer amount of information that can be found on the internet is rather daunting.

Jeroen Bouma 1.4k Dec 31, 2022
mrcal is a generic toolkit to solve calibration and SFM-like problems originating at NASA/JPL

mrcal is a generic toolkit to solve calibration and SFM-like problems originating at NASA/JPL. Functionality related to these problems is exposed as a set of C and Python libraries and some commandli

Dima Kogan 102 Dec 23, 2022
A Bot that adds YouTube views to your video of choice

YoutubeViews Free Youtube viewer bot A Bot that adds YouTube views to your video of choice Installation git clone https://github.com/davdtheemonk/Yout

ProbablyX 5 Dec 06, 2022
Minutaria is a basic educational Python timer used to learn python and software testing libraries.

minutaria minutaria is a basic educational Python timer. The project is educational, it aims to teach myself programming, python programming, python's

1 Jul 16, 2021
Simple and easy to use python API for the COVID registration booking system of the math department @ unipd (torre archimede)

Simple and easy to use python API for the COVID registration booking system of the math department @ unipd (torre archimede). This API creates an interface with the official browser, with more useful

Guglielmo Camporese 4 Dec 24, 2021
easy_sbatch - Batch submitting Slurm jobs with script templates

easy_sbatch - Batch submitting Slurm jobs with script templates

Wei Shen 13 Oct 11, 2022
SuperCollider library for Python

SuperCollider library for Python This project is a port of core features of SuperCollider's language to Python 3. It is intended to be the same librar

Lucas Samaruga 65 Dec 22, 2022
Purge your likes and wall comments from VKontakte. Set yourself free from your digital footprint.

vk_liberator Regain liberty in the cruel social media world. This program assists you with purging your metadata from Russian social network VKontakte

20 Jun 11, 2021
Insights in greek football league 2020-2021 and bookmaker's accuracy

Greek_Football_League_Analysis_2020_2021 Aim of Project: This project aims in deriving useful insights from greek football league 2020-2021 by mean st

2 Jan 16, 2022
Projeto-menu - This project is designed to learn more about control mechanisms in Python programming

Projeto-menu - This project is designed to learn more about control mechanisms in Python programming

Henrik Ricarte 2 Mar 01, 2022
Users can read others' travel journeys in addition to being able to upload and delete posts detailing their own experiences

Users can read others' travel journeys in addition to being able to upload and delete posts detailing their own experiences! Posts are organized by country and destination within that country.

Christopher Zeas 1 Feb 03, 2022
Traductor de webs desde consola usando el servicio de Google Traductor.

proxiGG Traductor de webs desde consola usando el servicio de Google Traductor. Se adjunta el código fuente para Python3 y un binario compilado en C p

@as_informatico 2 Oct 20, 2021
Student Enrollment Analysis System

SEAS Student Enrollment Analysis System Steps to start working: create a user name "seas", host name: local, password: seas, mark all checkbox - go C

Md. Zakaria Kabir 3 Jul 12, 2022
Demo content - Automate your automation!

Automate-AAP2 Demo Content - Automate your automation! A fully automated Ansible Automation Platform. Context Installing and configuring Ansible Autom

0 Oct 27, 2022
This is a fork of the BakeTool with some improvements that I did to have better workflow.

blender-bake-tool This is a fork of the BakeTool with some improvements that I did to have better workflow. 99.99% of work was done by BakeTool team.

Acvarium 3 Oct 04, 2022
Konomi: Kind and Optimized Next brOadcast watching systeM Infrastructure

Konomi 備考・注意事項 現在 α 版で、まだ実験的なプロダクトです。通常利用には耐えないでしょうし、サポートもできません。 安定しているとは到底言いがたい品質ですが、それでも構わない方のみ導入してください。 使い方などの説明も用意できていないため、自力でトラブルに対処できるエンジニアの方以外に

tsukumi 243 Dec 30, 2022