simple way to build the declarative and destributed data pipelines with python

Overview

unipipeline

simple way to build the declarative and distributed data pipelines.

Why you should use it

  • Declarative strict config
  • Scaffolding
  • Fully typed
  • Python support 3.6+
  • Brokers support
    • kafka
    • rabbitmq
    • inmemory simple pubsub
  • Interruption handling = safe user code transactions
  • CLI

How to Install

$ pip3 install unipipeline

Example

# dag.yml
---

service:
  name: "example"
  echo_colors: true
  echo_level: error


external:
  service_name: {}


brokers:
  default_broker:
    import_template: "unipipeline.brokers.uni_memory_broker:UniMemoryBroker"

  ender_broker:
    import_template: "example.brokers.uni_log_broker:LogBroker"


messages:
  __default__:
    import_template: "example.messages.{{name}}:{{name|camel}}"

  input_message: {}

  inetermediate_message: {}

  ender_message: {}


cron:
  my_super_task:
    worker: my_super_cron_worker
    when: 0/1 * * * *

  my_mega_task:
    worker: my_super_cron_worker
    when: 0/2 * * * *

  my_puper_task:
    worker: my_super_cron_worker
    when: 0/3 * * * *


waitings:
  __default__:
    import_template: example.waitings.{{name}}_wating:{{name|camel}}Waiting

  common_db: {}


workers:
  __default__:
    import_template: "example.workers.{{name}}:{{name|camel}}"

  my_super_cron_worker:
    input_message: uni_cron_message

  input_worker:
    input_message: input_message
    waiting_for:
      - common_db

  intermediate_first_worker:
    input_message: inetermediate_message
    output_workers:
      - ender_second_worker
    waiting_for:
      - common_db

  intermediate_second_worker:
    input_message: inetermediate_message
    external: service_name
    output_workers:
      - ender_frist_worker

  ender_frist_worker:
    input_message: ender_message

  ender_second_worker:
    input_message: ender_message
    broker: ender_broker
    waiting_for:
      - common_db

Get Started

  1. create ./unipipeline.yml such as example above

  2. run cli command

unipipeline -f ./unipipeline.yml scaffold

It should create all structure of your workers, brokers and so on

  1. remove error raising from workers

  2. correct message structure for make more usefull

  3. correct broker connection (if need)

  4. run cli command to run your consumer

unipipeline -f ./unipipeline.yml consume input_worker

or with python

from unipipeline import Uni
u = Uni(f'./unipipeline.yml')
u.init_consumer_worker(f'input_worker')
u.initialize()
u.start_consuming()
  1. produce some message to the message broker by your self or with tools
unipipeline -f ./unipipeline.yml produce --worker input_worker --data='{"some": "prop"}'

or with python

# main.py
from unipipeline import Uni

u = Uni(f'./unipipeline.yml')
u.init_producer_worker(f'input_worker')
u.initialize()
u.send_to(f'input_worker', dict(some='prop'))

Definition

Service

service:
  name: some_name       # need for health-check file name
  echo_level: warning   # level of uni console logs (debug, info, warning, error)
  echo_colors: true     # show colors in console

External

external:
  some_name_of_external_service: {}
  • no props

  • it needs for declarative grouping the external workers with service

Worker

workers:
  __default__:                                        # each worker get this default props if defined
    retry_max_count: 10
    
  some_worker_name:
    retry_max_count: 3                                # just counter. message move to /dev/null if limit has reached 
    retry_delay_s: 1                                  # delay before retry
    topic: "{{name}}"                                 # template string
    error_payload_topic: "{{topic}}__error__payload"  # template string
    error_topic: "{{topic}}__error"                   # template string
    broker: "default_broker"                          # broker name. reference to message transport 
    external: null                                    # name of external service. reference in this config file 
    ack_after_success: true                           # automatic ack after process message
    waiting_for:                                      # list of references
      - some_waiting_name                             # name of block. this worker must wait for connection of this external service if need
    output_workers:                                   # list of references
      - some_other_worker_name                        # allow worker sending messages to this worker
    
    inport_template: "some.module.hierarchy.to.worker.{{name}}:{{name|camel}}OfClass"   # required module and classname for import

    input_message: "name_of_message"                  # required reference of input message type 

Waiting

waitings:
  some_blocked_service_name:
    retry_max_count: 3                         # the same semantic as worker.retry_max_count
    retry_delay_s: 10                          # the same semantic as worker.retry_delay_s
    import_template: "some.module:SomeClass"   # required. the same semantic as worker.import_template

Broker

brokers:
  some_name_of_broker:
    retry_max_count: 3                         # the same semantic as worker.retry_max_count
    retry_delay_s: 10                          # the same semantic as worker.retry_delay_s
    content_type: application/json             # content type
    compression: null                          # compression (null, application/x-gzip, application/x-bz2, application/x-lzma)
    import_template: "some.module:SomeClass"   # required. the same semantic as worker.import_template

Message

messages:
  name_of_message:
    import_template: "some.module:SomeClass"   # required. the same semantic as worker.import_template

build in messages:

messages:
  uni_cron_message:
    import_template: unipipeline.messages.uni_cron_message:UniCronMessage

CLI

unipipeline

usage: unipipeline --help

UNIPIPELINE: simple way to build the declarative and distributed data pipelines. this is cli tool for unipipeline

positional arguments:
  {check,scaffold,init,consume,cron,produce}
                        sub-commands
    check               check loading of all modules
    scaffold            create all modules and classes if it is absent. no args
    init                initialize broker topics for workers
    consume             start consuming workers. connect to brokers and waiting for messages
    cron                start cron jobs, That defined in config file
    produce             publish message to broker. send it to worker

optional arguments:
  -h, --help            show this help message and exit
  --config-file CONFIG_FILE, -f CONFIG_FILE
                        path to unipipeline config file (default: ./unipipeline.yml)
  --verbose [VERBOSE]   verbose output (default: false)

unipipeline check

usage: 
    unipipeline -f ./unipipeline.yml check
    unipipeline -f ./unipipeline.yml --verbose=yes check

check loading of all modules

optional arguments:
  -h, --help  show this help message and exit

unipipeline init

usage: 
    unipipeline -f ./unipipeline.yml init
    unipipeline -f ./unipipeline.yml --verbose=yes init
    unipipeline -f ./unipipeline.yml --verbose=yes init --workers some_worker_name_01 some_worker_name_02

initialize broker topics for workers

optional arguments:
  -h, --help            show this help message and exit
  --workers INIT_WORKERS [INIT_WORKERS ...], -w INIT_WORKERS [INIT_WORKERS ...]
                        workers list for initialization (default: [])

unipipeline scaffold

usage: 
    unipipeline -f ./unipipeline.yml scaffold
    unipipeline -f ./unipipeline.yml --verbose=yes scaffold

create all modules and classes if it is absent. no args

optional arguments:
  -h, --help  show this help message and exit

unipipeline consume

usage: 
    unipipeline -f ./unipipeline.yml consume
    unipipeline -f ./unipipeline.yml --verbose=yes consume
    unipipeline -f ./unipipeline.yml consume --workers some_worker_name_01 some_worker_name_02
    unipipeline -f ./unipipeline.yml --verbose=yes consume --workers some_worker_name_01 some_worker_name_02

start consuming workers. connect to brokers and waiting for messages

optional arguments:
  -h, --help            show this help message and exit
  --workers CONSUME_WORKERS [CONSUME_WORKERS ...], -w CONSUME_WORKERS [CONSUME_WORKERS ...]
                        worker list for consuming

unipipeline produce

usage: 
    unipipeline -f ./unipipeline.yml produce --worker some_worker_name_01 --data {"some": "json", "value": "for worker"}
    unipipeline -f ./unipipeline.yml --verbose=yes produce --worker some_worker_name_01 --data {"some": "json", "value": "for worker"}
    unipipeline -f ./unipipeline.yml produce --alone --worker some_worker_name_01 --data {"some": "json", "value": "for worker"}
    unipipeline -f ./unipipeline.yml --verbose=yes produce --alone --worker some_worker_name_01 --data {"some": "json", "value": "for worker"}

publish message to broker. send it to worker

optional arguments:
  -h, --help            show this help message and exit
  --alone [PRODUCE_ALONE], -a [PRODUCE_ALONE]
                        message will be sent only if topic is empty
  --worker PRODUCE_WORKER, -w PRODUCE_WORKER
                        worker recipient
  --data PRODUCE_DATA, -d PRODUCE_DATA
                        data for sending

unipipeline cron

usage: 
    unipipeline -f ./unipipeline.yml cron
    unipipeline -f ./unipipeline.yml --verbose=yes cron

start cron jobs, That defined in config file

optional arguments:
  -h, --help  show this help message and exit

Contributing

TODO LIST

  1. RPC Gateways: http, tcp, udp
  2. Close/Exit uni by call method
  3. Async producer
  4. Common Error Handling
  5. Async get_answer
  6. Server of Message layout
  7. Prometheus api
  8. req/res Sdk
  9. request tasks result registry
  10. Async consumer
  11. Async by default
  12. Multi-threading start with run-groups
Owner
aliaksandr-master
aliaksandr-master
BIGDATA SIMULATION ONE PIECE WORLD CENSUS

ONE PIECE is a Japanese manga of great international success. The story turns inhabited in a fictional world, tells the adventures of a young man whose body gained rubber properties after accidentall

Maycon Cypriano 3 Jun 30, 2022
A neural-based binary analysis tool

A neural-based binary analysis tool Introduction This directory contains the demo of a neural-based binary analysis tool. We test the framework using

Facebook Research 208 Dec 22, 2022
Business Intelligence (BI) in Python, OLAP

Open Mining Business Intelligence (BI) Application Server written in Python Requirements Python 2.7 (Backend) Lua 5.2 or LuaJIT 5.1 (OML backend) Mong

Open Mining 1.2k Dec 27, 2022
Lale is a Python library for semi-automated data science.

Lale is a Python library for semi-automated data science. Lale makes it easy to automatically select algorithms and tune hyperparameters of pipelines that are compatible with scikit-learn, in a type-

International Business Machines 293 Dec 29, 2022
Data and code accompanying the paper Politics and Virality in the Time of Twitter

Politics and Virality in the Time of Twitter Data and code accompanying the paper Politics and Virality in the Time of Twitter. In specific: the code

Cardiff NLP 3 Jul 02, 2022
Stochastic Gradient Trees implementation in Python

Stochastic Gradient Trees - Python Stochastic Gradient Trees1 by Henry Gouk, Bernhard Pfahringer, and Eibe Frank implementation in Python. Based on th

John Koumentis 2 Nov 18, 2022
Calculate multilateral price indices in Python (with Pandas and PySpark).

IndexNumCalc Calculate multilateral price indices using the GEKS-T (CCDI), Time Product Dummy (TPD), Time Dummy Hedonic (TDH), Geary-Khamis (GK) metho

Dr. Usman Kayani 3 Apr 27, 2022
nrgpy is the Python package for processing NRG Data Files

nrgpy nrgpy is the Python package for processing NRG Data Files Website and source: https://github.com/nrgpy/nrgpy Documentation: https://nrgpy.github

NRG Tech Services 23 Dec 08, 2022
A library to create multi-page Streamlit applications with ease.

A library to create multi-page Streamlit applications with ease.

Jackson Storm 107 Jan 04, 2023
Statistical & Probabilistic Analysis of Store Sales, University Survey, & Manufacturing data

Statistical_Modelling Statistical & Probabilistic Analysis of Store Sales, University Survey, & Manufacturing data Statistical Methods for Decision Ma

Avnika Mehta 1 Jan 27, 2022
CleanX is an open source python library for exploring, cleaning and augmenting large datasets of X-rays, or certain other types of radiological images.

cleanX CleanX is an open source python library for exploring, cleaning and augmenting large datasets of X-rays, or certain other types of radiological

Candace Makeda Moore, MD 20 Jan 05, 2023
Predictive Modeling & Analytics on Home Equity Line of Credit

Predictive Modeling & Analytics on Home Equity Line of Credit Data (Python) HMEQ Data Set In this assignment we will use Python to examine a data set

Dhaval Patel 1 Jan 09, 2022
Flexible HDF5 saving/loading and other data science tools from the University of Chicago

deepdish Flexible HDF5 saving/loading and other data science tools from the University of Chicago. This repository also host a Deep Learning blog: htt

UChicago - Department of Computer Science 255 Dec 10, 2022
Larch: Applications and Python Library for Data Analysis of X-ray Absorption Spectroscopy (XAS, XANES, XAFS, EXAFS), X-ray Fluorescence (XRF) Spectroscopy and Imaging

Larch: Data Analysis Tools for X-ray Spectroscopy and More Documentation: http://xraypy.github.io/xraylarch Code: http://github.com/xraypy/xraylarch L

xraypy 95 Dec 13, 2022
A multi-platform GUI for bit-based analysis, processing, and visualization

A multi-platform GUI for bit-based analysis, processing, and visualization

Mahlet 529 Dec 19, 2022
A collection of robust and fast processing tools for parsing and analyzing web archive data.

ChatNoir Resiliparse A collection of robust and fast processing tools for parsing and analyzing web archive data. Resiliparse is part of the ChatNoir

ChatNoir 24 Nov 29, 2022
PCAfold is an open-source Python library for generating, analyzing and improving low-dimensional manifolds obtained via Principal Component Analysis (PCA).

PCAfold is an open-source Python library for generating, analyzing and improving low-dimensional manifolds obtained via Principal Component Analysis (PCA).

Burn Research 4 Oct 13, 2022
This project is the implementation template for HW 0 and HW 1 for both the programming and non-programming tracks

This project is the implementation template for HW 0 and HW 1 for both the programming and non-programming tracks

Donald F. Ferguson 4 Mar 06, 2022
Clean and reusable data-sciency notebooks.

KPACUBO KPACUBO is a set Jupyter notebooks focused on the best practices in both software development and data science, namely, code reuse, explicit d

Matvey Morozov 1 Jan 28, 2022
A Python Tools to imaging the shallow seismic structure

ShallowSeismicImaging Tools to imaging the shallow seismic structure, above 10 km, based on the ZH ratio measured from the ambient seismic noise, and

Xiao Xiao 9 Aug 09, 2022