Framework for creating efficient data processing pipelines

Related tags

Miscellaneousaqueduct
Overview

Aqueduct

Framework for creating efficient data processing pipelines.

Contact

Feel free to ask questions in telegram t.me/avito-ml

Key Features

  • Increase RPS (Requests Per Second) for your service
  • All optimisations in one library
  • Uses shared memory for transfer big data between processes

Get started

Simple example how to start with aqueduct using aiohttp. For better examples see examples

web.Application: app = web.Application() app['flow'] = Flow( FlowStep(SumHandler()), ) app.router.add_post('/sum', SumView) app['flow'].start() return app if __name__ == '__main__': web.run_app(prepare_app()) ">
from aiohttp import web
from aqueduct import Flow, FlowStep, BaseTaskHandler, BaseTask


class MyModel:
    """This is CPU bound model example."""
    
    def process(self, number):
        return sum(i * i for i in range(number))

class Task(BaseTask):
    """Container to send arguments to model."""
    def __init__(self, number):
        super().__init__()
        self.number = number
        self.sum = None  # result will be here
    
class SumHandler(BaseTaskHandler):
    """With aqueduct we need to wrap you're model."""
    def __init__(self):
        self._model = None

    def on_start(self):
        """Runs in child process, so memory no memory consumption in parent process."""
        self._model = MyModel()

    def handle(self, *tasks: Task):
        """List of tasks because it can be batching."""
        for task in tasks:
            task.sum = self._model.process(task.number)

            
class SumView(web.View):
    """Simple aiohttp-view handler"""

    async def post(self):
        number = await self.request.read()
        task = Task(int(number))
        await self.request.app['flow'].process(task)
        return web.json_response(data={'result': task.sum})


def prepare_app() -> web.Application:
    app = web.Application()

    app['flow'] = Flow(
        FlowStep(SumHandler()),
    )
    app.router.add_post('/sum', SumView)

    app['flow'].start()
    return app


if __name__ == '__main__':
    web.run_app(prepare_app())
    

Batching

Aqueduct supports the ability to process tasks with batches. Default batch size is one.

np.array: """Always says that there is a cat in the image. The image is represented by a one-dimensional array. The model spends less time for processing batch of images due to GPU optimizations. It's emulated with BATCH_REDUCTION_FACTOR coefficient. """ batch_size = images.shape[0] if batch_size == 1: time.sleep(self.IMAGE_PROCESS_TIME) else: time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR) return np.ones(batch_size, dtype=bool) class CatDetectorHandler(BaseTaskHandler): def handle(self, *tasks: ArrayFieldTask): images = np.array([task.array for task in tasks]) predicts = CatDetector().predict(images) for task, predict in zip(tasks, predicts): task.result = predict def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[BaseTask]: return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)] async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]): await asyncio.gather(*(flow.process(task) for task in tasks)) tasks_batch = get_tasks_batch() flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE)) flow_with_batch_handler.start() # checks if no one result assert not any(task.result for task in tasks_batch) # task handling takes 0.16 secs that is less than sequential task processing with 0.22 secs await asyncio.wait_for( process_tasks(flow_with_batch_handler, tasks_batch), timeout=CatDetector.BATCH_PROCESS_TIME, ) # checks if all results were set assert all(task.result for task in tasks_batch) await flow_with_batch_handler.stop() # if we have batch size more than tasks number, we can limit batch accumulation time # with timeout parameter for processing time optimization tasks_batch = get_tasks_batch() flow_with_batch_handler = Flow( FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE, batch_timeout=0.01) ) flow_with_batch_handler.start() await asyncio.wait_for( process_tasks(flow_with_batch_handler, tasks_batch), timeout=CatDetector.BATCH_PROCESS_TIME + 0.01, ) await flow_with_batch_handler.stop() ">
import asyncio
import time
from typing import List

import numpy as np

from aqueduct.flow import Flow, FlowStep
from aqueduct.handler import BaseTaskHandler
from aqueduct.task import BaseTask

# this constant needs just for example
TASKS_BATCH_SIZE = 20


class ArrayFieldTask(BaseTask):
    def __init__(self, array: np.array, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.array = array
        self.result = None


class CatDetector:
    """GPU model emulator that predicts the presence of the cat in the image."""
    IMAGE_PROCESS_TIME = 0.01
    BATCH_REDUCTION_FACTOR = 0.7
    OVERHEAD_TIME = 0.02
    BATCH_PROCESS_TIME = IMAGE_PROCESS_TIME * TASKS_BATCH_SIZE * BATCH_REDUCTION_FACTOR + OVERHEAD_TIME

    def predict(self, images: np.array) -> np.array:
        """Always says that there is a cat in the image.

        The image is represented by a one-dimensional array.
        The model spends less time for processing batch of images due to GPU optimizations. It's emulated
        with BATCH_REDUCTION_FACTOR coefficient.
        """
        batch_size = images.shape[0]
        if batch_size == 1:
            time.sleep(self.IMAGE_PROCESS_TIME)
        else:
            time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR)
        return np.ones(batch_size, dtype=bool)


class CatDetectorHandler(BaseTaskHandler):
    def handle(self, *tasks: ArrayFieldTask):
        images = np.array([task.array for task in tasks])
        predicts = CatDetector().predict(images)
        for task, predict in zip(tasks, predicts):
            task.result = predict


def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[BaseTask]:
    return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)]


async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]):
    await asyncio.gather(*(flow.process(task) for task in tasks))


tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE))
flow_with_batch_handler.start()

# checks if no one result
assert not any(task.result for task in tasks_batch)
# task handling takes 0.16 secs that is less than sequential task processing with 0.22 secs
await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME,
)
# checks if all results were set
assert all(task.result for task in tasks_batch)

await flow_with_batch_handler.stop()

# if we have batch size more than tasks number, we can limit batch accumulation time 
# with timeout parameter for processing time optimization
tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(
    FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE, batch_timeout=0.01)
)
flow_with_batch_handler.start()

await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME + 0.01,
)

await flow_with_batch_handler.stop()

Sentry

The implementation allows you to receive logger events from the workers and the main process. To integrate with Sentry, you need to write something like this:

import logging
import os

from raven import Client
from raven.handlers.logging import SentryHandler
from raven.transport.http import HTTPTransport

from aqueduct.logger import log


if os.getenv('SENTRY_ENABLED') is True:
    dsn = os.getenv('SENTRY_DSN')
    sentry_handler = SentryHandler(client=Client(dsn=dsn, transport=HTTPTransport), level=logging.ERROR)
    log.addHandler(sentry_handler)
Owner
avito.tech
avito.ru engineering team open source projects
avito.tech
Minimalistic Gridworld Environment (MiniGrid)

Minimalistic Gridworld Environment (MiniGrid) There are other gridworld Gym environments out there, but this one is designed to be particularly simple

Maxime Chevalier-Boisvert 1.7k Jan 03, 2023
Find your desired product in Digikala using this app.

Digikala Search Find your desired product in Digikala using this app. با این برنامه محصول مورد نظر خود را در دیجیکالا پیدا کنید. About me Full name: M

Matin Ardestani 17 Sep 15, 2022
Utility/Raiding selfbot made by Shell and Roover.

Utility/Raiding selfbot made by Shell and Roover. We are open to suggestions and ideas.

Shell 2 Dec 08, 2021
Rates how pog a word or user is. Not random and does have *some* kind of algorithm to it.

PogRater :D Rates how pogchamp a word is :D A fun project coded by JBYT27 using Python3 Have you ever wondered how pog a word is? Well, congrats, you

an aspirin 2 Jun 25, 2022
Bots in moderation and a game (for now)

Tutorial: come far funzionare il bot e durarlo per 24/7 (o quasi...) Ci sono 17 passi per seguire: Andare sul sito Replit https://replit.com/ Vedrete

ZacyKing 1 Dec 27, 2021
✨ Udemy Coupon Finder For Discord. Supports Turkish & English Language.

Udemy Course Finder Bot | Udemy Kupon Bulucu Botu This bot finds new udemy coupons and sends to the channel. Before Setup You must have python = 3.6

Penguen 4 May 04, 2022
Provides guideline on how to configure pre-commit hooks in your own python project

Pre-commit Configuration Guide The main aim of this repository is to act as a guide on how to configure the pre-commit hooks in your existing python p

Faraz Ahmed Khan 2 Mar 31, 2022
The bidirectional mapping library for Python.

bidict The bidirectional mapping library for Python. Status bidict: has been used for many years by several teams at Google, Venmo, CERN, Bank of Amer

Joshua Bronson 1.2k Dec 31, 2022
This is a simple SV calling package for diploid assemblies.

dipdiff This is a simple SV calling package for diploid assemblies. It uses a modified version of svim-asm. The package includes its own version minim

Mikhail Kolmogorov 11 Jan 05, 2023
Async Python Circuit Breaker implementation

aiocircuitbreaker This is an async Python implementation of the circuitbreaker library. Installation The project is available on PyPI. Simply run: $ p

5 Sep 05, 2022
A web application which you can search, buy or sell shares with current prices which provided by IEX.

CS50 - Stock Exchange A web application which you can search, buy or sell shares with current prices which provided by IEX. Table of Contents Setup St

1 May 28, 2022
Iris-client - Python client for DFIR-IRIS

Python client dfir_iris_client offers a Python interface to communicate with IRI

DFIR-IRIS 11 Dec 22, 2022
A dot matrix rendered using braille characters.

⣿ dotmatrix A dot matrix rendered using braille characters. Description This library provides class called Matrix which represents a dot matrix that c

Tim Fischer 25 Dec 12, 2022
J MBF - Assalamualaikum Mamang...

★ VISITOR ★ ★ INFORMATION ★ Script Ini DiBuat Oleh YayanXD Script Ini Akan DiPerjual Belikan Tanggal 9 Januari 2022 Jika Mau Beli Script Silahkan Hub

Risky [ Zero Tow ] 5 Apr 08, 2022
JARVIS PC Assistant is an assisting program to make your computer easier to use

JARVIS-PC-Assistant JARVIS PC Assistant is an assisting program to make your computer easier to use Welcome to the J.A.R.V.I.S. PC Assistant help file

Dasun Nethsara 2 Dec 02, 2022
Send notifications created in Frappe or ERPNext as push notication via Firebase Cloud Message(FCM)

FCM Notification for ERPNext Send notifications created in Frappe or ERPNext as push notication via Firebase Cloud Message(FCM) Steps to use the app:

Tridz 9 Nov 14, 2022
A multi-platform fuzzer for poking at userland binaries and servers

litefuzz A multi-platform fuzzer for poking at userland binaries and servers litefuzz intro why how it works what it does what it doesn't do support p

52 Nov 18, 2022
This is a far more in-depth and advanced version of "Write user interface to a file API Sample"

Fusion360-Write-UserInterface This is a far more in-depth and advanced version of "Write user interface to a file API Sample" from https://help.autode

4 Mar 18, 2022
🌍💉 Global COVID-19 vaccination data at the regional level.

COVID-19 vaccination data at subnational level. To ensure its officiality, the source data is carefully verified.

sociepy 61 Sep 21, 2022