Framework for creating efficient data processing pipelines

Related tags

Miscellaneousaqueduct
Overview

Aqueduct

Framework for creating efficient data processing pipelines.

Contact

Feel free to ask questions in telegram t.me/avito-ml

Key Features

  • Increase RPS (Requests Per Second) for your service
  • All optimisations in one library
  • Uses shared memory for transfer big data between processes

Get started

Simple example how to start with aqueduct using aiohttp. For better examples see examples

web.Application: app = web.Application() app['flow'] = Flow( FlowStep(SumHandler()), ) app.router.add_post('/sum', SumView) app['flow'].start() return app if __name__ == '__main__': web.run_app(prepare_app()) ">
from aiohttp import web
from aqueduct import Flow, FlowStep, BaseTaskHandler, BaseTask


class MyModel:
    """This is CPU bound model example."""
    
    def process(self, number):
        return sum(i * i for i in range(number))

class Task(BaseTask):
    """Container to send arguments to model."""
    def __init__(self, number):
        super().__init__()
        self.number = number
        self.sum = None  # result will be here
    
class SumHandler(BaseTaskHandler):
    """With aqueduct we need to wrap you're model."""
    def __init__(self):
        self._model = None

    def on_start(self):
        """Runs in child process, so memory no memory consumption in parent process."""
        self._model = MyModel()

    def handle(self, *tasks: Task):
        """List of tasks because it can be batching."""
        for task in tasks:
            task.sum = self._model.process(task.number)

            
class SumView(web.View):
    """Simple aiohttp-view handler"""

    async def post(self):
        number = await self.request.read()
        task = Task(int(number))
        await self.request.app['flow'].process(task)
        return web.json_response(data={'result': task.sum})


def prepare_app() -> web.Application:
    app = web.Application()

    app['flow'] = Flow(
        FlowStep(SumHandler()),
    )
    app.router.add_post('/sum', SumView)

    app['flow'].start()
    return app


if __name__ == '__main__':
    web.run_app(prepare_app())
    

Batching

Aqueduct supports the ability to process tasks with batches. Default batch size is one.

np.array: """Always says that there is a cat in the image. The image is represented by a one-dimensional array. The model spends less time for processing batch of images due to GPU optimizations. It's emulated with BATCH_REDUCTION_FACTOR coefficient. """ batch_size = images.shape[0] if batch_size == 1: time.sleep(self.IMAGE_PROCESS_TIME) else: time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR) return np.ones(batch_size, dtype=bool) class CatDetectorHandler(BaseTaskHandler): def handle(self, *tasks: ArrayFieldTask): images = np.array([task.array for task in tasks]) predicts = CatDetector().predict(images) for task, predict in zip(tasks, predicts): task.result = predict def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[BaseTask]: return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)] async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]): await asyncio.gather(*(flow.process(task) for task in tasks)) tasks_batch = get_tasks_batch() flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE)) flow_with_batch_handler.start() # checks if no one result assert not any(task.result for task in tasks_batch) # task handling takes 0.16 secs that is less than sequential task processing with 0.22 secs await asyncio.wait_for( process_tasks(flow_with_batch_handler, tasks_batch), timeout=CatDetector.BATCH_PROCESS_TIME, ) # checks if all results were set assert all(task.result for task in tasks_batch) await flow_with_batch_handler.stop() # if we have batch size more than tasks number, we can limit batch accumulation time # with timeout parameter for processing time optimization tasks_batch = get_tasks_batch() flow_with_batch_handler = Flow( FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE, batch_timeout=0.01) ) flow_with_batch_handler.start() await asyncio.wait_for( process_tasks(flow_with_batch_handler, tasks_batch), timeout=CatDetector.BATCH_PROCESS_TIME + 0.01, ) await flow_with_batch_handler.stop() ">
import asyncio
import time
from typing import List

import numpy as np

from aqueduct.flow import Flow, FlowStep
from aqueduct.handler import BaseTaskHandler
from aqueduct.task import BaseTask

# this constant needs just for example
TASKS_BATCH_SIZE = 20


class ArrayFieldTask(BaseTask):
    def __init__(self, array: np.array, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.array = array
        self.result = None


class CatDetector:
    """GPU model emulator that predicts the presence of the cat in the image."""
    IMAGE_PROCESS_TIME = 0.01
    BATCH_REDUCTION_FACTOR = 0.7
    OVERHEAD_TIME = 0.02
    BATCH_PROCESS_TIME = IMAGE_PROCESS_TIME * TASKS_BATCH_SIZE * BATCH_REDUCTION_FACTOR + OVERHEAD_TIME

    def predict(self, images: np.array) -> np.array:
        """Always says that there is a cat in the image.

        The image is represented by a one-dimensional array.
        The model spends less time for processing batch of images due to GPU optimizations. It's emulated
        with BATCH_REDUCTION_FACTOR coefficient.
        """
        batch_size = images.shape[0]
        if batch_size == 1:
            time.sleep(self.IMAGE_PROCESS_TIME)
        else:
            time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR)
        return np.ones(batch_size, dtype=bool)


class CatDetectorHandler(BaseTaskHandler):
    def handle(self, *tasks: ArrayFieldTask):
        images = np.array([task.array for task in tasks])
        predicts = CatDetector().predict(images)
        for task, predict in zip(tasks, predicts):
            task.result = predict


def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[BaseTask]:
    return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)]


async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]):
    await asyncio.gather(*(flow.process(task) for task in tasks))


tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE))
flow_with_batch_handler.start()

# checks if no one result
assert not any(task.result for task in tasks_batch)
# task handling takes 0.16 secs that is less than sequential task processing with 0.22 secs
await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME,
)
# checks if all results were set
assert all(task.result for task in tasks_batch)

await flow_with_batch_handler.stop()

# if we have batch size more than tasks number, we can limit batch accumulation time 
# with timeout parameter for processing time optimization
tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(
    FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE, batch_timeout=0.01)
)
flow_with_batch_handler.start()

await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME + 0.01,
)

await flow_with_batch_handler.stop()

Sentry

The implementation allows you to receive logger events from the workers and the main process. To integrate with Sentry, you need to write something like this:

import logging
import os

from raven import Client
from raven.handlers.logging import SentryHandler
from raven.transport.http import HTTPTransport

from aqueduct.logger import log


if os.getenv('SENTRY_ENABLED') is True:
    dsn = os.getenv('SENTRY_DSN')
    sentry_handler = SentryHandler(client=Client(dsn=dsn, transport=HTTPTransport), level=logging.ERROR)
    log.addHandler(sentry_handler)
Owner
avito.tech
avito.ru engineering team open source projects
avito.tech
[CVPR 2020] Rethinking Class-Balanced Methods for Long-Tailed Visual Recognition from a Domain Adaptation Perspective

Rethinking Class-Balanced Methods for Long-Tailed Visual Recognition from a Domain Adaptation Perspective [Arxiv] This is PyTorch implementation of th

Abdullah Jamal 22 Nov 19, 2022
Serverless demo showing users how they can capture (and obfuscate) their Lambda payloads in Datadog APM

Serverless-capture-lambda-payload-demo Serverless demo showing users how they can capture (and obfuscate) their Lambda payloads in Datadog APM This wi

Datadog, Inc. 1 Nov 02, 2021
tgEasy | Easy for a Brighter Shine | Monkey Patcher Addon for Pyrogram

tgEasy | Easy for a Brighter Shine | Monkey Patcher Addon for Pyrogram

Jayant Hegde Kageri 35 Nov 12, 2022
Repo created for the purpose of adding any kind of programs and projects

Programs and Project Repository A repository for adding programs and projects of any kind starting from beginners level to expert ones Contributing to

Unicorn Dev Community 3 Nov 02, 2022
Goddard A collection of small, simple strategies for Freqtrade

Goddard A collection of small, simple strategies for Freqtrade. Simply add the strategy you choose in your strategies folder and run. ⚠️ General Crypt

Shane Jones 118 Dec 14, 2022
Telegram bot to upload media to telegra.ph

Telegraph @StarkTelegraphBot A star ⭐ from you means a lot to us ! Telegram bot to upload media to telegra.ph Usage Deploy to Heroku Tap on above butt

Stark Bots 24 Dec 29, 2022
A python script for compiling and executing .cc files

Debug And Run A python script for compiling and executing .cc files Example dbrun fname.cc [DEBUG MODE] Compiling fname.cc with C++17 ------------

1 May 28, 2022
A desktop app to check the unlocked courses bases on previously done courses.

Course Picker A desktop app to check the unlocked courses bases on previously done courses. Table of contents About the Project Built with What it doe

Ahmed Symum Swapno 3 Feb 07, 2022
A tool converting rpk (记乎) to apkg (Anki Package)

RpkConverter This tool is used to convert rpk file to Anki apkg. 如果遇到任何问题,请发起issue,并描述情况。如果转换rpk出现问题,请将文件发到邮箱 ssqyang [AT] outlook.com,我会debug并修复问题。 下

9 Nov 01, 2021
A multi-platform fuzzer for poking at userland binaries and servers

litefuzz A multi-platform fuzzer for poking at userland binaries and servers litefuzz intro why how it works what it does what it doesn't do support p

52 Nov 18, 2022
pybicyclewheel calulates the required spoke length for bicycle wheels

pybicyclewheel pybicyclewheel calulates the required spoke length for bicycle wheels. (under construcion) - homepage further readings wikipedia bicyc

karl 0 Aug 24, 2022
A web-based chat application that enables multiple users to interact with one another

A web-based chat application that enables multiple users to interact with one another, in the same chat room or different ones according to their choosing.

3 Apr 22, 2022
Grouping nucleotide coordinate ranges.

NuclRanger Grouping nucleotide coordinate ranges. A quick pre-processing step for "bedtools getfasta":- https://bedtools.readthedocs.io/en/latest/cont

Sujanavan Tiruvayipati 1 Oct 04, 2022
Some basic sorting algos

Sorting-Algos Some basic sorting algos HacktoberFest 2021 This repository consists of mezzo-level projects that undertake a simple task and perform it

Manthan Ghasadiya 7 Dec 13, 2022
Get a link to the web version of a git-tracked file or directory

githyperlink Get a link to the web version of a git-tracked file or directory. Applies to GitHub and GitLab remotes (and maybe others but those are no

Tomas Fiers 2 Nov 08, 2022
addon for blender to import mocap data from tools like easymocap, frankmocap and Vibe

b3d_mocap_import addon for blender to import mocap data from tools like easymocap, frankmocap and Vibe ==================VIBE================== To use

Carlos Barreto 97 Dec 07, 2022
A practice program to find the LCM i.e Lowest Common Multiplication of two numbers using python without library.

Finding-LCM-using-python-from-scratch Here, I write a practice program to find the LCM i.e Lowest Common Multiplication of two numbers using python wi

Sachin Vinayak Dabhade 4 Sep 24, 2021
Better firefox bookmarks script for rofi

rofi-bookmarks Small python script to open firefox bookmarks with rofi. Features Icons! Only show bookmarks in a specified bookmark folder Show entire

32 Nov 10, 2022
Lookup for interesting stuff in SMB shares

SMBSR - what is that? Well, SMBSR is a python script which given a CIDR/IP/IP_file/HOSTNAME(s) enumerates all the SMB services listening (445) among t

Vincenzo 112 Dec 15, 2022
A clock purely made with python(turtle)...

Clock A clock purely made with python(turtle)... Requirements Pythone3 IDE or any other IDE Installation Clone this repository Running Open this proje

Abhyush 1 Jan 11, 2022