Framework for creating efficient data processing pipelines



Feel free to ask questions in Telegram.

Key Features

  • Increases the RPS (requests per second) of your service
  • All optimisations in one library
  • Uses shared memory to transfer large data between processes
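The shared-memory point can be illustrated with the standard library alone. The sketch below is not Aqueduct's own code; it only shows the general technique, and the names `payload`, `block` and `view` are hypothetical. Both sides run in one process here to keep the example short.

```python
from multiprocessing import shared_memory

payload = b"large binary payload" * 1000

# Producer side: allocate a named block and copy the payload into it once.
block = shared_memory.SharedMemory(create=True, size=len(payload))
block.buf[:len(payload)] = payload

# Consumer side: attach by name. Only the short name has to cross
# the process boundary, not the payload itself.
view = shared_memory.SharedMemory(name=block.name)
# The block may be rounded up to a page size, so slice to the payload length.
received = bytes(view.buf[:len(payload)])

# Release both handles, then free the block.
view.close()
block.close()
block.unlink()
```

In a real pipeline the consumer would be a different process that receives `block.name` through a queue or pipe.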

Get started

A simple example of how to get started with aqueduct using aiohttp. For more advanced examples, see the examples.

from aiohttp import web
from aqueduct import Flow, FlowStep, BaseTaskHandler, BaseTask

class MyModel:
    """This is CPU bound model example."""
    def process(self, number):
        return sum(i * i for i in range(number))

class Task(BaseTask):
    """Container to send arguments to model."""
    def __init__(self, number):
        self.number = number
        self.sum = None  # result will be here

class SumHandler(BaseTaskHandler):
    """With aqueduct we need to wrap your model."""
    def __init__(self):
        self._model = None

    def on_start(self):
        """Runs in the child process, so the model consumes no memory in the parent process."""
        self._model = MyModel()

    def handle(self, *tasks: Task):
        """Receives a list of tasks because batching may be enabled."""
        for task in tasks:
            task.sum = self._model.process(task.number)

class SumView(web.View):
    """Simple aiohttp-view handler"""

    async def post(self):
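        # The call after `await` was lost in this copy; reading the raw body is an assumption.
        number = await self.request.read()
        task = Task(int(number))
        # Send the task through the flow; the handler fills in task.sum.
        await self.request.app['flow'].process(task)
        return web.json_response(data={'result': task.sum})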

def prepare_app() -> web.Application:
    app = web.Application()

    app['flow'] = Flow(
        FlowStep(SumHandler()),
    )
    app.router.add_post('/sum', SumView)

    app['flow'].start()
    return app

if __name__ == '__main__':
    web.run_app(prepare_app())
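To try the service, a client can post a number to `/sum`. The sketch below assumes the view reads the number from the raw request body and that the app listens on `http://localhost:8080`; both are assumptions, and `build_sum_request` and `expected_sum` are hypothetical helper names.

```python
import urllib.request


def build_sum_request(number: int, base_url: str = "http://localhost:8080") -> urllib.request.Request:
    """Build a POST request whose raw body is the number (assumed input format)."""
    return urllib.request.Request(f"{base_url}/sum", data=str(number).encode(), method="POST")


def expected_sum(number: int) -> int:
    """Same arithmetic as MyModel.process, for checking a response locally."""
    return sum(i * i for i in range(number))


request = build_sum_request(10)

# With the server running, the request could be sent like this:
# with urllib.request.urlopen(request) as response:
#     print(response.read())
```

The response body should then be a JSON object whose `result` field equals `expected_sum(10)`.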

Batching

Aqueduct can process tasks in batches. The default batch size is one.

import asyncio
import time
from typing import List

import numpy as np

from aqueduct.flow import Flow, FlowStep
from aqueduct.handler import BaseTaskHandler
from aqueduct.task import BaseTask

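# this constant is needed only for the example
# (its value was lost in this copy; 20 is an assumption consistent with the timings quoted below)
TASKS_BATCH_SIZE = 20

class ArrayFieldTask(BaseTask):
    def __init__(self, array: np.array, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.array = array
        self.result = None

class CatDetector:
    """GPU model emulator that predicts the presence of the cat in the image."""
    # Only OVERHEAD_TIME survived in this copy; the other values and the formula are assumptions
    # chosen to match the 0.16 s (batched) and 0.22 s (sequential) timings quoted below.
    IMAGE_PROCESS_TIME = 0.01
    BATCH_REDUCTION_FACTOR = 0.7
    OVERHEAD_TIME = 0.02
    BATCH_PROCESS_TIME = IMAGE_PROCESS_TIME * TASKS_BATCH_SIZE * BATCH_REDUCTION_FACTOR + OVERHEAD_TIME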

    def predict(self, images: np.array) -> np.array:
        """Always says that there is a cat in the image.

        The image is represented by a one-dimensional array.
        The model spends less time processing a batch of images due to GPU optimizations. This is emulated
        with the BATCH_REDUCTION_FACTOR coefficient.
        """
        batch_size = images.shape[0]
        if batch_size == 1:
            time.sleep(self.IMAGE_PROCESS_TIME)
        else:
            time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR)
        return np.ones(batch_size, dtype=bool)

class CatDetectorHandler(BaseTaskHandler):
    def handle(self, *tasks: ArrayFieldTask):
        images = np.array([task.array for task in tasks])
        predicts = CatDetector().predict(images)
        for task, predict in zip(tasks, predicts):
            task.result = predict

def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[BaseTask]:
    return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)]

async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]):
    await asyncio.gather(*(flow.process(task) for task in tasks))

tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE))
flow_with_batch_handler.start()

# check that no result has been set yet
assert not any(task.result for task in tasks_batch)
# task handling takes 0.16 secs, which is less than sequential task processing at 0.22 secs
await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch),
    timeout=CatDetector.BATCH_PROCESS_TIME,
)
# check that all results were set
assert all(task.result for task in tasks_batch)

await flow_with_batch_handler.stop()

# if the batch size is larger than the number of tasks, we can limit the batch accumulation time
# with the batch_timeout parameter to keep processing time low
tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(
    FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE, batch_timeout=0.01)
)
flow_with_batch_handler.start()

await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch),
    timeout=CatDetector.BATCH_PROCESS_TIME + 0.01,
)

await flow_with_batch_handler.stop()
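The interplay of `batch_size` and `batch_timeout` can be pictured with a small asyncio-only sketch. This is not how Aqueduct implements batching internally; `collect_batch` and `demo` are hypothetical names, and the code only illustrates the idea of cutting accumulation short once the timeout expires.

```python
import asyncio
from typing import List


async def collect_batch(queue: asyncio.Queue, batch_size: int, batch_timeout: float) -> List[int]:
    """Wait for the first item, then gather more until the batch is full or time runs out."""
    batch = [await queue.get()]
    loop = asyncio.get_running_loop()
    deadline = loop.time() + batch_timeout
    while len(batch) < batch_size:
        remaining = deadline - loop.time()
        if remaining <= 0:
            break
        try:
            batch.append(await asyncio.wait_for(queue.get(), timeout=remaining))
        except asyncio.TimeoutError:
            # No more items arrived in time: hand over the partial batch.
            break
    return batch


async def demo() -> List[int]:
    queue: asyncio.Queue = asyncio.Queue()
    for number in range(3):
        queue.put_nowait(number)
    # Only 3 items are queued, so a batch of 8 is cut short by the timeout.
    return await collect_batch(queue, batch_size=8, batch_timeout=0.01)


result = asyncio.run(demo())
print(result)
```

Without the timeout, the collector would wait indefinitely for items that never arrive, which is why a timeout helps when the batch size exceeds the number of pending tasks.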


Sentry

Aqueduct lets you receive logger events from both the workers and the main process. To integrate with Sentry, write something like this:

import logging
import os

from raven import Client
from raven.handlers.logging import SentryHandler
from raven.transport.http import HTTPTransport

from aqueduct.logger import log

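# os.getenv returns a string (or None), so compare against text rather than using `is True`
if os.getenv('SENTRY_ENABLED', '').lower() in ('1', 'true'):
    dsn = os.getenv('SENTRY_DSN')
    sentry_handler = SentryHandler(client=Client(dsn=dsn, transport=HTTPTransport), level=logging.ERROR)
    # attach the handler to aqueduct's logger (assumes `log` is a standard logging.Logger)
    log.addHandler(sentry_handler)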
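The same handler mechanism can be tried without a Sentry account. The sketch below uses only the standard `logging` module; `CollectingHandler` is a hypothetical stand-in for `SentryHandler`, and `aqueduct-demo` is a made-up logger name, not Aqueduct's real one.

```python
import logging


class CollectingHandler(logging.Handler):
    """Hypothetical stand-in for SentryHandler: keeps messages in a list."""

    def __init__(self, level: int = logging.NOTSET) -> None:
        super().__init__(level)
        self.messages = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


demo_log = logging.getLogger("aqueduct-demo")  # hypothetical logger name
demo_log.setLevel(logging.DEBUG)
demo_log.propagate = False  # keep the demo output out of the root logger

handler = CollectingHandler(level=logging.ERROR)
demo_log.addHandler(handler)

demo_log.info("worker started")         # below ERROR: ignored by the handler
demo_log.error("model failed to load")  # ERROR: delivered to the handler

print(handler.messages)
```

Because the handler is created with `level=logging.ERROR`, only error-level events reach it, which mirrors the level passed to `SentryHandler` above.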