Aqueduct

Framework for creating efficient data processing pipelines.

Contact

Feel free to ask questions in Telegram: t.me/avito-ml

Key Features

  • Increases the RPS (Requests Per Second) of your service
  • All optimisations in one library
  • Uses shared memory to transfer large data between processes
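To illustrate the shared-memory idea from the list above, here is a minimal sketch built on Python's standard `multiprocessing.shared_memory` module. It is not Aqueduct's implementation; the helper names `publish` and `fetch` are hypothetical and exist only for this example. The point is that only the block name and the payload size need to travel between processes, while the payload itself stays in shared memory.

```python
from multiprocessing import shared_memory


def publish(payload: bytes) -> shared_memory.SharedMemory:
    """Copy the payload into a new shared memory block (hypothetical helper)."""
    block = shared_memory.SharedMemory(create=True, size=len(payload))
    block.buf[:len(payload)] = payload
    return block


def fetch(name: str, size: int) -> bytes:
    """Attach to an existing block by name and read `size` bytes (hypothetical helper)."""
    block = shared_memory.SharedMemory(name=name)
    try:
        return bytes(block.buf[:size])
    finally:
        # Detach from the block; the creator is responsible for unlink().
        block.close()
```

In a real multi-process setup, the producer would send `block.name` and the payload size through a queue or pipe, and the consumer would call `fetch` with them. The producer should call `close()` and `unlink()` once the data is no longer needed.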

Get started

A simple example of getting started with Aqueduct using aiohttp. For more complete examples, see the project's examples.

from aiohttp import web
from aqueduct import Flow, FlowStep, BaseTaskHandler, BaseTask


class MyModel:
    """This is CPU bound model example."""
    
    def process(self, number):
        return sum(i * i for i in range(number))

class Task(BaseTask):
    """Container to send arguments to model."""
    def __init__(self, number):
        super().__init__()
        self.number = number
        self.sum = None  # result will be here
    
class SumHandler(BaseTaskHandler):
    """With aqueduct we need to wrap your model."""
    def __init__(self):
        self._model = None

    def on_start(self):
        """Runs in the child process, so the model consumes no memory in the parent process."""
        self._model = MyModel()

    def handle(self, *tasks: Task):
        """Receives several tasks because they may arrive as a batch."""
        for task in tasks:
            task.sum = self._model.process(task.number)

            
class SumView(web.View):
    """Simple aiohttp-view handler"""

    async def post(self):
        number = await self.request.read()
        task = Task(int(number))
        await self.request.app['flow'].process(task)
        return web.json_response(data={'result': task.sum})


def prepare_app() -> web.Application:
    app = web.Application()

    app['flow'] = Flow(
        FlowStep(SumHandler()),
    )
    app.router.add_post('/sum', SumView)

    app['flow'].start()
    return app


if __name__ == '__main__':
    web.run_app(prepare_app())
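
Once the service above is running, it can be called with a plain HTTP POST whose body is the number. The following client is a minimal sketch using only the standard library. The helper names `build_sum_request` and `request_sum` are hypothetical, and the URL assumes the app listens on localhost port 8080 (the usual default of `web.run_app`); adjust it to your setup.

```python
import json
import urllib.request

# Assumption: the example app is reachable at this address.
DEFAULT_URL = 'http://localhost:8080/sum'


def build_sum_request(number: int, url: str = DEFAULT_URL) -> urllib.request.Request:
    """Build a POST request whose raw body is the number, as SumView expects."""
    return urllib.request.Request(url, data=str(number).encode(), method='POST')


def request_sum(number: int, url: str = DEFAULT_URL) -> int:
    """Send the request and return the 'result' field of the JSON response."""
    with urllib.request.urlopen(build_sum_request(number, url)) as response:
        return json.loads(response.read())['result']
```

For instance, `request_sum(4)` should return 14 if the server is running, since the model computes 0 + 1 + 4 + 9.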
    

Batching

Aqueduct can process tasks in batches. The default batch size is one.
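
The benefit of batching can be estimated with simple arithmetic. The sketch below reuses the constants of the CatDetector example in this section. The function names are hypothetical, and the sequential estimate assumes the same fixed overhead is paid once, which appears to be how the 0.22 s figure quoted in that example is obtained.

```python
IMAGE_PROCESS_TIME = 0.01      # seconds per image when processed alone
BATCH_REDUCTION_FACTOR = 0.7   # emulated speed-up from batching
OVERHEAD_TIME = 0.02           # fixed overhead allowed for a run


def batched_time(images: int) -> float:
    """Estimated time to process all images as one batch."""
    return IMAGE_PROCESS_TIME * images * BATCH_REDUCTION_FACTOR + OVERHEAD_TIME


def sequential_time(images: int) -> float:
    """Estimated time to process the images one by one (assumed formula)."""
    return IMAGE_PROCESS_TIME * images + OVERHEAD_TIME
```

For 20 images this gives roughly 0.16 s for the batched run against roughly 0.22 s for the sequential one.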

import asyncio
import time
from typing import List

import numpy as np

from aqueduct.flow import Flow, FlowStep
from aqueduct.handler import BaseTaskHandler
from aqueduct.task import BaseTask

# this constant is needed only for the example
TASKS_BATCH_SIZE = 20


class ArrayFieldTask(BaseTask):
    def __init__(self, array: np.ndarray, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.array = array
        self.result = None


class CatDetector:
    """GPU model emulator that predicts the presence of the cat in the image."""
    IMAGE_PROCESS_TIME = 0.01
    BATCH_REDUCTION_FACTOR = 0.7
    OVERHEAD_TIME = 0.02
    BATCH_PROCESS_TIME = IMAGE_PROCESS_TIME * TASKS_BATCH_SIZE * BATCH_REDUCTION_FACTOR + OVERHEAD_TIME

    def predict(self, images: np.ndarray) -> np.ndarray:
        """Always says that there is a cat in the image.

        Each image is represented by a one-dimensional array.
        The model spends less time per image when processing a batch, due to GPU optimizations.
        This is emulated with the BATCH_REDUCTION_FACTOR coefficient.
        """
        batch_size = images.shape[0]
        if batch_size == 1:
            time.sleep(self.IMAGE_PROCESS_TIME)
        else:
            time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR)
        return np.ones(batch_size, dtype=bool)


class CatDetectorHandler(BaseTaskHandler):
    def handle(self, *tasks: ArrayFieldTask):
        images = np.array([task.array for task in tasks])
        predicts = CatDetector().predict(images)
        for task, predict in zip(tasks, predicts):
            task.result = predict


def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[ArrayFieldTask]:
    return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)]


async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]):
    await asyncio.gather(*(flow.process(task) for task in tasks))


# the statements below use await, so run them inside a coroutine (for example, an async test)
tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE))
flow_with_batch_handler.start()

# check that no results have been set yet
assert not any(task.result for task in tasks_batch)
# batched handling takes about 0.16 s, less than the roughly 0.22 s of sequential processing
await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME,
)
# check that all results were set
assert all(task.result for task in tasks_batch)

await flow_with_batch_handler.stop()

# if the batch size exceeds the number of tasks, limit the batch accumulation time
# with the batch_timeout parameter so the flow does not wait for a full batch
tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(
    FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE, batch_timeout=0.01)
)
flow_with_batch_handler.start()

await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME + 0.01,
)

await flow_with_batch_handler.stop()
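
The interplay of `batch_size` and `batch_timeout` can be pictured with a generic accumulation loop. The sketch below is not Aqueduct's internal code; `collect_batch` is a hypothetical helper over a plain `asyncio.Queue`, shown only to illustrate the idea: wait for the first item, then keep collecting until either the batch is full or the timeout expires.

```python
import asyncio
from typing import Any, List


async def collect_batch(queue: asyncio.Queue, batch_size: int, batch_timeout: float) -> List[Any]:
    """Collect up to batch_size items, waiting at most batch_timeout after the first one."""
    batch = [await queue.get()]  # block until at least one item is available
    loop = asyncio.get_running_loop()
    deadline = loop.time() + batch_timeout
    while len(batch) < batch_size:
        remaining = deadline - loop.time()
        if remaining <= 0:
            break
        try:
            batch.append(await asyncio.wait_for(queue.get(), timeout=remaining))
        except asyncio.TimeoutError:
            break  # timeout expired: hand over a partial batch
    return batch


async def demo(batch_size: int) -> List[int]:
    """Put three items in a queue and collect one batch from it."""
    queue: asyncio.Queue = asyncio.Queue()
    for number in range(3):
        queue.put_nowait(number)
    return await collect_batch(queue, batch_size=batch_size, batch_timeout=0.01)
```

With three queued items, `asyncio.run(demo(5))` returns a partial batch of three after the timeout, while `asyncio.run(demo(2))` returns as soon as two items are collected.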

Sentry

The implementation allows you to receive logger events from the workers and the main process. To integrate with Sentry, you need to write something like this:

import logging
import os

from raven import Client
from raven.handlers.logging import SentryHandler
from raven.transport.http import HTTPTransport

from aqueduct.logger import log


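# os.getenv returns a string (or None), never the boolean True, so compare the text value.
# Which values count as "enabled" is an assumption; adjust it to your configuration.
if os.getenv('SENTRY_ENABLED', '').lower() in ('1', 'true', 'yes'):
    dsn = os.getenv('SENTRY_DSN')
    sentry_handler = SentryHandler(client=Client(dsn=dsn, transport=HTTPTransport), level=logging.ERROR)
    log.addHandler(sentry_handler)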
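
The same pattern can be tried without a Sentry account. The sketch below uses only the standard `logging` module: a handler with level `ERROR` is attached to a logger when an environment flag is set. The names `env_flag`, `ListHandler`, and `attach_error_handler` are hypothetical, and the sketch assumes that `aqueduct.logger.log` behaves like a standard `logging.Logger`, which its `addHandler` call suggests.

```python
import logging
import os
from typing import List, Optional


def env_flag(name: str) -> bool:
    """Interpret an environment variable as a boolean (accepted values are an assumption)."""
    return os.getenv(name, '').strip().lower() in {'1', 'true', 'yes'}


class ListHandler(logging.Handler):
    """Stand-in for SentryHandler: stores messages instead of sending them anywhere."""

    def __init__(self, level: int = logging.NOTSET):
        super().__init__(level)
        self.messages: List[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


def attach_error_handler(logger: logging.Logger, enabled: bool) -> Optional[ListHandler]:
    """Attach an ERROR-level handler to the logger when enabled, otherwise do nothing."""
    if not enabled:
        return None
    handler = ListHandler(level=logging.ERROR)
    logger.addHandler(handler)
    return handler
```

Because the handler level is `ERROR`, records logged at `INFO` or `WARNING` are skipped by this handler while errors are captured, which mirrors the `level=logging.ERROR` argument in the Sentry snippet.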
Owner

avito.tech: avito.ru engineering team open source projects