A simple way to build declarative and distributed data pipelines with Python

Overview

unipipeline

A simple way to build declarative and distributed data pipelines.

Why you should use it

  • Declarative strict config
  • Scaffolding
  • Fully typed
  • Python 3.6+ support
  • Broker support
    • kafka
    • rabbitmq
    • in-memory simple pub/sub
  • Interruption handling = safe user-code transactions
  • CLI

How to Install

$ pip3 install unipipeline
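
To verify the installation, a bare import is enough:

$ python3 -c "import unipipeline"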

Example

# dag.yml
---

service:
  name: "example"
  echo_colors: true
  echo_level: error


external:
  service_name: {}


brokers:
  default_broker:
    import_template: "unipipeline.brokers.uni_memory_broker:UniMemoryBroker"

  ender_broker:
    import_template: "example.brokers.uni_log_broker:LogBroker"


messages:
  __default__:
    import_template: "example.messages.{{name}}:{{name|camel}}"

  input_message: {}

  intermediate_message: {}

  ender_message: {}


cron:
  my_super_task:
    worker: my_super_cron_worker
    when: 0/1 * * * *

  my_mega_task:
    worker: my_super_cron_worker
    when: 0/2 * * * *

  my_puper_task:
    worker: my_super_cron_worker
    when: 0/3 * * * *


waitings:
  __default__:
    import_template: "example.waitings.{{name}}_waiting:{{name|camel}}Waiting"

  common_db: {}


workers:
  __default__:
    import_template: "example.workers.{{name}}:{{name|camel}}"

  my_super_cron_worker:
    input_message: uni_cron_message

  input_worker:
    input_message: input_message
    waiting_for:
      - common_db

  intermediate_first_worker:
    input_message: intermediate_message
    output_workers:
      - ender_second_worker
    waiting_for:
      - common_db

  intermediate_second_worker:
    input_message: intermediate_message
    external: service_name
    output_workers:
      - ender_first_worker

  ender_first_worker:
    input_message: ender_message

  ender_second_worker:
    input_message: ender_message
    broker: ender_broker
    waiting_for:
      - common_db

Get Started

  1. create ./unipipeline.yml like the example above

  2. run the cli command

unipipeline -f ./unipipeline.yml scaffold

It creates the whole structure of your workers, brokers, and so on.
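
Based on the import_template values in the example config, the scaffolded layout should look roughly like this (a sketch; exact file contents depend on the generator):

example/
  brokers/uni_log_broker.py               # LogBroker
  messages/input_message.py               # InputMessage
  messages/intermediate_message.py        # IntermediateMessage
  messages/ender_message.py               # EnderMessage
  waitings/common_db_waiting.py           # CommonDbWaiting
  workers/my_super_cron_worker.py         # MySuperCronWorker
  workers/input_worker.py                 # InputWorker
  workers/intermediate_first_worker.py    # IntermediateFirstWorker
  workers/intermediate_second_worker.py   # IntermediateSecondWorker
  workers/ender_first_worker.py           # EnderFirstWorker
  workers/ender_second_worker.py          # EnderSecondWorker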

  3. remove error raising from the workers

  4. correct the message structure to make it more useful

  5. correct the broker connection (if needed)

  6. run the cli command to start your consumer

unipipeline -f ./unipipeline.yml consume --workers input_worker

or with python

from unipipeline import Uni
u = Uni('./unipipeline.yml')
u.init_consumer_worker('input_worker')
u.initialize()
u.start_consuming()

  7. produce some messages to the message broker yourself or with the tools:
unipipeline -f ./unipipeline.yml produce --worker input_worker --data='{"some": "prop"}'

or with python

# main.py
from unipipeline import Uni

u = Uni('./unipipeline.yml')
u.init_producer_worker('input_worker')
u.initialize()
u.send_to('input_worker', dict(some='prop'))

Definition

Service

service:
  name: some_name       # needed for the health-check file name
  echo_level: warning   # level of uni console logs (debug, info, warning, error)
  echo_colors: true     # show colors in console

External

external:
  some_name_of_external_service: {}
  • no props

  • used to declaratively group external workers with their service

Worker

workers:
  __default__:                                        # every worker gets these default props if defined
    retry_max_count: 10
    
  some_worker_name:
    retry_max_count: 3                                # retry counter. the message goes to /dev/null when the limit is reached
    retry_delay_s: 1                                  # delay in seconds before a retry
    topic: "{{name}}"                                 # template string
    error_payload_topic: "{{topic}}__error__payload"  # template string
    error_topic: "{{topic}}__error"                   # template string
    broker: "default_broker"                          # broker name. reference to the message transport
    external: null                                    # name of an external service defined in this config file
    ack_after_success: true                           # ack automatically after the message is processed
    waiting_for:                                      # list of references
      - some_waiting_name                             # name of a waiting block. the worker waits for this external service connection if needed
    output_workers:                                   # list of references
      - some_other_worker_name                        # allows this worker to send messages to that worker

    import_template: "some.module.hierarchy.to.worker.{{name}}:{{name|camel}}OfClass"   # required. module and class name for import

    input_message: "name_of_message"                  # required. reference to the input message type

Waiting

waitings:
  some_blocked_service_name:
    retry_max_count: 3                         # the same semantics as worker.retry_max_count
    retry_delay_s: 10                          # the same semantics as worker.retry_delay_s
    import_template: "some.module:SomeClass"   # required. the same semantics as worker.import_template

Broker

brokers:
  some_name_of_broker:
    retry_max_count: 3                         # the same semantics as worker.retry_max_count
    retry_delay_s: 10                          # the same semantics as worker.retry_delay_s
    content_type: application/json             # content type
    compression: null                          # compression (null, application/x-gzip, application/x-bz2, application/x-lzma)
    import_template: "some.module:SomeClass"   # required. the same semantics as worker.import_template

Message

messages:
  name_of_message:
    import_template: "some.module:SomeClass"   # required. the same semantic as worker.import_template

Built-in messages:

messages:
  uni_cron_message:
    import_template: unipipeline.messages.uni_cron_message:UniCronMessage
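
A message class describes the typed payload a worker receives. A minimal sketch, assuming UniMessage is a typed, pydantic-style model exported by the library (the field matches the produce example above):

# some/module.py -- hypothetical path from import_template
from unipipeline import UniMessage   # assumed public export


class SomeClass(UniMessage):
    some: str   # matches the produce example: {"some": "prop"}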

CLI

unipipeline

usage: unipipeline --help

UNIPIPELINE: a simple way to build declarative and distributed data pipelines. This is the CLI tool for unipipeline.

positional arguments:
  {check,scaffold,init,consume,cron,produce}
                        sub-commands
    check               check loading of all modules
    scaffold            create all modules and classes if they are absent. no args
    init                initialize broker topics for workers
    consume             start consuming workers. connect to brokers and wait for messages
    cron                start cron jobs that are defined in the config file
    produce             publish a message to the broker and send it to a worker

optional arguments:
  -h, --help            show this help message and exit
  --config-file CONFIG_FILE, -f CONFIG_FILE
                        path to unipipeline config file (default: ./unipipeline.yml)
  --verbose [VERBOSE]   verbose output (default: false)

unipipeline check

usage: 
    unipipeline -f ./unipipeline.yml check
    unipipeline -f ./unipipeline.yml --verbose=yes check

check loading of all modules

optional arguments:
  -h, --help  show this help message and exit

unipipeline init

usage: 
    unipipeline -f ./unipipeline.yml init
    unipipeline -f ./unipipeline.yml --verbose=yes init
    unipipeline -f ./unipipeline.yml --verbose=yes init --workers some_worker_name_01 some_worker_name_02

initialize broker topics for workers

optional arguments:
  -h, --help            show this help message and exit
  --workers INIT_WORKERS [INIT_WORKERS ...], -w INIT_WORKERS [INIT_WORKERS ...]
                        workers list for initialization (default: [])

unipipeline scaffold

usage: 
    unipipeline -f ./unipipeline.yml scaffold
    unipipeline -f ./unipipeline.yml --verbose=yes scaffold

create all modules and classes if they are absent. no args

optional arguments:
  -h, --help  show this help message and exit

unipipeline consume

usage: 
    unipipeline -f ./unipipeline.yml consume
    unipipeline -f ./unipipeline.yml --verbose=yes consume
    unipipeline -f ./unipipeline.yml consume --workers some_worker_name_01 some_worker_name_02
    unipipeline -f ./unipipeline.yml --verbose=yes consume --workers some_worker_name_01 some_worker_name_02

start consuming workers. connect to brokers and wait for messages

optional arguments:
  -h, --help            show this help message and exit
  --workers CONSUME_WORKERS [CONSUME_WORKERS ...], -w CONSUME_WORKERS [CONSUME_WORKERS ...]
                        worker list for consuming

unipipeline produce

usage: 
    unipipeline -f ./unipipeline.yml produce --worker some_worker_name_01 --data='{"some": "json", "value": "for worker"}'
    unipipeline -f ./unipipeline.yml --verbose=yes produce --worker some_worker_name_01 --data='{"some": "json", "value": "for worker"}'
    unipipeline -f ./unipipeline.yml produce --alone --worker some_worker_name_01 --data='{"some": "json", "value": "for worker"}'
    unipipeline -f ./unipipeline.yml --verbose=yes produce --alone --worker some_worker_name_01 --data='{"some": "json", "value": "for worker"}'

publish a message to the broker and send it to a worker

optional arguments:
  -h, --help            show this help message and exit
  --alone [PRODUCE_ALONE], -a [PRODUCE_ALONE]
                        message will be sent only if topic is empty
  --worker PRODUCE_WORKER, -w PRODUCE_WORKER
                        worker recipient
  --data PRODUCE_DATA, -d PRODUCE_DATA
                        data for sending

unipipeline cron

usage: 
    unipipeline -f ./unipipeline.yml cron
    unipipeline -f ./unipipeline.yml --verbose=yes cron

start cron jobs that are defined in the config file

optional arguments:
  -h, --help  show this help message and exit

Contributing

TODO LIST

  1. RPC Gateways: http, tcp, udp
  2. Close/Exit uni by call method
  3. Async producer
  4. Common Error Handling
  5. Async get_answer
  6. Server of Message layout
  7. Prometheus api
  8. req/res Sdk
  9. request tasks result registry
  10. Async consumer
  11. Async by default
  12. Multi-threading start with run-groups
Owner
aliaksandr-master