ETL flow framework based on Yaml configs in Python

Overview

logo

ETL framework based on Yaml configs in Python

Supported Python Versions License Code style: black

A light framework for creating data streams. Setting up streams through configuration in the Yaml file. There is a schedule, task pools, concurrency limitation. Works quickly, does not require a lot of resources. Runs on Windows and Linux. Flow run in parallel via threading library. Internally SQLite Database. Native data transformation. There is a web interface.

At the moment there are connectors to sources

  • CSV file
  • SQLite
  • Postgres
  • MySQL
  • Yandex Metrika Management API
  • Yandex Metrika Stats API
  • Yandex Metrika Logs API
  • Yandex Direct API
  • Yandex Direct Report API
  • Criteo
  • Google Sheets

Storages

  • Save to csv file
  • Clickhouse

Documentation

Requirements

  • python >=3.9
  • virtual environment

Settings

It is highly recommended to install in a virtual environment.

Flowmaster needs a home, '{HOME}/FlowMaster' is the default,
but you can lay foundation somewhere else if you prefer
(optional)

For Windows

setx FLOWMASTER_HOME "{YOUR_PATH}"

For Linux

export FLOWMASTER_HOME={YOUR_PATH}

Installing

pip install flowmaster==0.7.1

# For install web UI.
pip install flowmaster[webui]==0.7.1

# Optional libraries.
pip install flowmaster[clickhouse,postgres,mysql,yandexdirect,yandexmetrika,criteo,googlesheets]==0.7.1

Run

flowmaster run --help
flowmaster run

WEB UI

http://localhost:8822

CHANGELOG

Support

Telegram support chat

Author

Pavel Maksimov

My contacts Telegram, Facebook

Удачи тебе, друг! Поставь звездочку ;)

You might also like...
signac-flow - manage workflows with signac
signac-flow - manage workflows with signac

signac-flow - manage workflows with signac The signac framework helps users manage and scale file-based workflows, facilitating data reuse, sharing, a

Elementary is an open-source data reliability framework for modern data teams. The first module of the framework is data lineage.
Elementary is an open-source data reliability framework for modern data teams. The first module of the framework is data lineage.

Data lineage made simple, reliable, and automated. Effortlessly track the flow of data, understand dependencies and analyze impact. Features Visualiza

Randomisation-based inference in Python based on data resampling and permutation.

Randomisation-based inference in Python based on data resampling and permutation.

Karate Club: An API Oriented Open-source Python Framework for Unsupervised Learning on Graphs (CIKM 2020)
Karate Club: An API Oriented Open-source Python Framework for Unsupervised Learning on Graphs (CIKM 2020)

Karate Club is an unsupervised machine learning extension library for NetworkX. Please look at the Documentation, relevant Paper, Promo Video, and Ext

Tuplex is a parallel big data processing framework that runs data science pipelines written in Python at the speed of compiled code

Tuplex is a parallel big data processing framework that runs data science pipelines written in Python at the speed of compiled code. Tuplex has similar Python APIs to Apache Spark or Dask, but rather than invoking the Python interpreter, Tuplex generates optimized LLVM bytecode for the given pipeline and input data set.

BioMASS - A Python Framework for Modeling and Analysis of Signaling Systems
BioMASS - A Python Framework for Modeling and Analysis of Signaling Systems

Mathematical modeling is a powerful method for the analysis of complex biological systems. Although there are many researches devoted on produ

 PyChemia, Python Framework for Materials Discovery and Design
PyChemia, Python Framework for Materials Discovery and Design

PyChemia, Python Framework for Materials Discovery and Design PyChemia is an open-source Python Library for materials structural search. The purpose o

wikirepo is a Python package that provides a framework to easily source and leverage standardized Wikidata information
wikirepo is a Python package that provides a framework to easily source and leverage standardized Wikidata information

Python based Wikidata framework for easy dataframe extraction wikirepo is a Python package that provides a framework to easily source and leverage sta

PLStream: A Framework for Fast Polarity Labelling of Massive Data Streams

PLStream: A Framework for Fast Polarity Labelling of Massive Data Streams Motivation When dataset freshness is critical, the annotating of high speed

Comments
  •  No such file or directory: '/home/ubuntu/FlowMaster/pools.yaml'

    No such file or directory: '/home/ubuntu/FlowMaster/pools.yaml'

    Привет, очень хороший проект, однако столкнулся со следующей проблемой при устанвоке библиотеки

    1. с ванильным python pip такого пакета вообще не видно
    2. при установке через conda установка проходит замечательно, однако при запуске получаю
    (base) [email protected]:~/FlowMaster$ flowmaster run
    Traceback (most recent call last):
      File "/home/ubuntu/miniforge3/bin/flowmaster", line 5, in <module>
        from flowmaster.__main__ import app
      File "/home/ubuntu/miniforge3/lib/python3.9/site-packages/flowmaster/__main__.py", line 9, in <module>
        import flowmaster.cli.notebook
      File "/home/ubuntu/miniforge3/lib/python3.9/site-packages/flowmaster/cli/notebook.py", line 5, in <module>
        from flowmaster.service import (
      File "/home/ubuntu/miniforge3/lib/python3.9/site-packages/flowmaster/service.py", line 11, in <module>
        from flowmaster.operators.etl.policy import ETLNotebook
      File "/home/ubuntu/miniforge3/lib/python3.9/site-packages/flowmaster/operators/etl/__init__.py", line 3, in <module>
        from flowmaster.operators.etl.providers.abstract import ProviderAbstract, ExportAbstract
      File "/home/ubuntu/miniforge3/lib/python3.9/site-packages/flowmaster/operators/etl/providers/__init__.py", line 4, in <module>
        from flowmaster.operators.etl.providers.criteo import CriteoProvider
      File "/home/ubuntu/miniforge3/lib/python3.9/site-packages/flowmaster/operators/etl/providers/criteo/__init__.py", line 2, in <module>
        from flowmaster.operators.etl.providers.criteo.export import (
      File "/home/ubuntu/miniforge3/lib/python3.9/site-packages/flowmaster/operators/etl/providers/criteo/export.py", line 8, in <module>
        from flowmaster.executors import SleepIteration
      File "/home/ubuntu/miniforge3/lib/python3.9/site-packages/flowmaster/executors/__init__.py", line 16, in <module>
        from flowmaster.pool import pools
      File "/home/ubuntu/miniforge3/lib/python3.9/site-packages/flowmaster/pool.py", line 106, in <module>
        pools_dict = YamlHelper.parse_file(str(Settings.POOL_CONFIG_FILEPATH))
      File "/home/ubuntu/miniforge3/lib/python3.9/site-packages/flowmaster/utils/yaml_helper.py", line 14, in parse_file
        with open(path, "rb") as f:
    FileNotFoundError: [Errno 2] No such file or directory: '/home/ubuntu/FlowMaster/pools.yaml'
    

    Что я делаю не так?(

    opened by micweeks 1
Releases(0.7.1)
  • 0.7.1(Aug 29, 2021)

    • prevented planned of tasks from one instance of the operator class
    • fixed error GeneratorExit
    • fixed transform array type for Clickhouse loader
    Source code(tar.gz)
    Source code(zip)
  • 0.6.1(Jun 22, 2021)

    Redesigned executor

    New

    • add politics 'time_limit_seconds_from_worktime', 'soft_time_limit_seconds'.
    • add provider 'flowmaster'

    Fixing

    • fix schedule (interval seconds mode)
    • add logging 'loguru'
    • fix clear_statuses_of_lost_items
    • fix allow_execute_flow
    • change command 'db reset'

    There are backward incompatible changes

    • new field 'expires_utc' in FlowItem
    • rename command 'run' to 'run_local' and rename command 'run_thread' to 'run'
    • add new class ExecutorIterationTask.
    • change, moving and rename class ThreadExecutor to ThreadAsyncExecutor.
    • change and rename class SleepTask to SleepIteration.
    • change and rename class TaskPool to NextIterationInPools.
    • ETLOperator return ExecutorIterationTask.
    • rename func order_flow to ordering_flow_tasks.
    • rename func start_executor to sync_executor.
    • rename field FlowItem.config_hash to FlowItem.notebook_hash
    • change FLOW_CONFIGS_DIR and rename FLOW_CONFIGS_DIR to NOTEBOOKS_DIR
    • rename objects config to notebook
    • add class Settings
    Source code(tar.gz)
    Source code(zip)
  • 0.3.1(May 15, 2021)

  • 0.2.2(May 13, 2021)

Owner
Павел Максимов
Python Data Engineer, Python Developer, ETL, Разработчик рекомендательных систем
Павел Максимов
DefAP is a program developed to facilitate the exploration of a material's defect chemistry

DefAP is a program developed to facilitate the exploration of a material's defect chemistry. A large number of features are provided and rapid exploration is supported through the use of autoplotting

6 Oct 25, 2022
Jupyter notebooks for the book "The Elements of Statistical Learning".

This repository contains Jupyter notebooks implementing the algorithms found in the book and summary of the textbook.

Madiyar 369 Dec 30, 2022
Dbt-core - dbt enables data analysts and engineers to transform their data using the same practices that software engineers use to build applications.

Dbt-core - dbt enables data analysts and engineers to transform their data using the same practices that software engineers use to build applications.

dbt Labs 6.3k Jan 08, 2023
Collections of pydantic models

pydantic-collections The pydantic-collections package provides BaseCollectionModel class that allows you to manipulate collections of pydantic models

Roman Snegirev 20 Dec 26, 2022
PostQF is a user-friendly Postfix queue data filter which operates on data produced by postqueue -j.

PostQF Copyright © 2022 Ralph Seichter PostQF is a user-friendly Postfix queue data filter which operates on data produced by postqueue -j. See the ma

Ralph Seichter 11 Nov 24, 2022
signac-flow - manage workflows with signac

signac-flow - manage workflows with signac The signac framework helps users manage and scale file-based workflows, facilitating data reuse, sharing, a

Glotzer Group 44 Oct 14, 2022
Employee Turnover Analysis

Employee Turnover Analysis Submission to the DataCamp competition "Can you help reduce employee turnover?"

Jannik Wiedenhaupt 1 Feb 13, 2022
PySpark bindings for H3, a hierarchical hexagonal geospatial indexing system

h3-pyspark: Uber's H3 Hexagonal Hierarchical Geospatial Indexing System in PySpark PySpark bindings for the H3 core library. For available functions,

Kevin Schaich 12 Dec 24, 2022
Uses MIT/MEDSL, New York Times, and US Census datasources to analyze per-county COVID-19 deaths.

Covid County Executive summary Setup Install miniconda, then in the command line, run conda create -n covid-county conda activate covid-county conda i

Ahmed Fasih 1 Dec 22, 2021
Visions provides an extensible suite of tools to support common data analysis operations

Visions And these visions of data types, they kept us up past the dawn. Visions provides an extensible suite of tools to support common data analysis

168 Dec 28, 2022
SparseLasso: Sparse Solutions for the Lasso

SparseLasso: Sparse Solutions for the Lasso Introduction SparseLasso provides a Scikit-Learn based estimation of the Lasso with cross-validation tunin

Gabriel Okasa 1 Nov 08, 2021
Orchest is a browser based IDE for Data Science.

Orchest is a browser based IDE for Data Science. It integrates your favorite Data Science tools out of the box, so you don’t have to. The application is easy to use and can run on your laptop as well

Orchest 3.6k Jan 09, 2023
Spaghetti: an open-source Python library for the analysis of network-based spatial data

pysal/spaghetti SPAtial GrapHs: nETworks, Topology, & Inference Spaghetti is an open-source Python library for the analysis of network-based spatial d

Python Spatial Analysis Library 203 Jan 03, 2023
Pipeline to convert a haploid assembly into diploid

HapDup (haplotype duplicator) is a pipeline to convert a haploid long read assembly into a dual diploid assembly. The reconstructed haplotypes

Mikhail Kolmogorov 50 Jan 05, 2023
A multi-platform GUI for bit-based analysis, processing, and visualization

A multi-platform GUI for bit-based analysis, processing, and visualization

Mahlet 529 Dec 19, 2022
This module is used to create Convolutional AutoEncoders for Variational Data Assimilation

VarDACAE This module is used to create Convolutional AutoEncoders for Variational Data Assimilation. A user can define, create and train an AE for Dat

Julian Mack 23 Dec 16, 2022
Program that predicts the NBA mvp based on data from previous years.

NBA MVP Predictor A machine learning model using RandomForest Regression that predicts NBA MVP's using player data. Explore the docs » View Demo · Rep

Muhammad Rabee 1 Jan 21, 2022
Projeto para realizar o RPA Challenge . Utilizando Python e as bibliotecas Selenium e Pandas.

RPA Challenge in Python Projeto para realizar o RPA Challenge (www.rpachallenge.com), utilizando Python. O objetivo deste desafio é criar um fluxo de

Henrique A. Lourenço 1 Apr 12, 2022
2019 Data Science Bowl

Kaggle-2019-Data-Science-Bowl-Solution - Here i present my solution to kaggle 2019 data science bowl and how i improved it to win a silver medal in that competition.

Deepak Nandwani 1 Jan 01, 2022
Python library for creating data pipelines with chain functional programming

PyFunctional Features PyFunctional makes creating data pipelines easy by using chained functional operators. Here are a few examples of what it can do

Pedro Rodriguez 2.1k Jan 05, 2023