ETL flow framework based on YAML configs in Python

Overview


ETL framework based on YAML configs in Python


A lightweight framework for building data flows. Flows are set up through configuration in YAML files, with support for scheduling, task pools, and concurrency limits. It is fast, requires few resources, and runs on Windows and Linux. Flows run in parallel via the threading library, state is stored in an internal SQLite database, data transformation is built in, and a web interface is included.
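For illustration only, here is a rough sketch of what a flow configured in YAML could look like. The field names below are hypothetical and are not taken from flowmaster's real notebook schema; see the Documentation section for the actual format.

# Hypothetical sketch, not the official flowmaster schema.
name: sales_csv_to_clickhouse
provider: csv                    # a source connector from the list below
schedule:
  timezone: Europe/Moscow
  start_time: "01:00:00"
  interval: daily
export:
  file_path: /data/sales.csv
  columns: [date, orders, revenue]
transform:
  rename: {date: event_date}
load:
  storage: clickhouse            # a storage from the list below
  table: analytics.sales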

At the moment there are connectors to the following sources:

  • CSV file
  • SQLite
  • Postgres
  • MySQL
  • Yandex Metrika Management API
  • Yandex Metrika Stats API
  • Yandex Metrika Logs API
  • Yandex Direct API
  • Yandex Direct Report API
  • Criteo
  • Google Sheets

Storages

  • CSV file
  • ClickHouse

Documentation

Requirements

  • python >=3.9
  • virtual environment

Settings

It is highly recommended to install in a virtual environment.

FlowMaster needs a home directory; '{HOME}/FlowMaster' is the default,
but you can choose another location if you prefer
(optional)

For Windows

setx FLOWMASTER_HOME "{YOUR_PATH}"

For Linux

export FLOWMASTER_HOME={YOUR_PATH}
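To keep this setting between shell sessions (optional; this assumes a bash shell that reads ~/.bashrc), the export can be appended to your shell profile:

echo 'export FLOWMASTER_HOME={YOUR_PATH}' >> ~/.bashrc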

Installing

pip install flowmaster==0.7.1

# To install the web UI.
pip install flowmaster[webui]==0.7.1

# Optional libraries.
pip install flowmaster[clickhouse,postgres,mysql,yandexdirect,yandexmetrika,criteo,googlesheets]==0.7.1
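Note: some shells (zsh, for example) treat square brackets as glob patterns, so the extras syntax may need quoting; this is a shell detail, not a flowmaster requirement.

pip install "flowmaster[webui]==0.7.1"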

Run

flowmaster run --help
flowmaster run

WEB UI

http://localhost:8822
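If the page does not open, you can first check from the same machine that something is listening on port 8822 (the port shown above; this assumes curl is installed):

curl -I http://localhost:8822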

CHANGELOG

Support

Telegram support chat

Author

Pavel Maksimov

My contacts: Telegram, Facebook

Good luck, friend! Give the project a star ;)


Comments
  • No such file or directory: '/home/ubuntu/FlowMaster/pools.yaml'

    Hi, great project, but I ran into the following problem when installing the library:

    1. With vanilla python pip, the package is not visible at all.
    2. When installing via conda, installation goes fine, but when I run it I get:
    (base) [email protected]:~/FlowMaster$ flowmaster run
    Traceback (most recent call last):
      File "/home/ubuntu/miniforge3/bin/flowmaster", line 5, in <module>
        from flowmaster.__main__ import app
      File "/home/ubuntu/miniforge3/lib/python3.9/site-packages/flowmaster/__main__.py", line 9, in <module>
        import flowmaster.cli.notebook
      File "/home/ubuntu/miniforge3/lib/python3.9/site-packages/flowmaster/cli/notebook.py", line 5, in <module>
        from flowmaster.service import (
      File "/home/ubuntu/miniforge3/lib/python3.9/site-packages/flowmaster/service.py", line 11, in <module>
        from flowmaster.operators.etl.policy import ETLNotebook
      File "/home/ubuntu/miniforge3/lib/python3.9/site-packages/flowmaster/operators/etl/__init__.py", line 3, in <module>
        from flowmaster.operators.etl.providers.abstract import ProviderAbstract, ExportAbstract
      File "/home/ubuntu/miniforge3/lib/python3.9/site-packages/flowmaster/operators/etl/providers/__init__.py", line 4, in <module>
        from flowmaster.operators.etl.providers.criteo import CriteoProvider
      File "/home/ubuntu/miniforge3/lib/python3.9/site-packages/flowmaster/operators/etl/providers/criteo/__init__.py", line 2, in <module>
        from flowmaster.operators.etl.providers.criteo.export import (
      File "/home/ubuntu/miniforge3/lib/python3.9/site-packages/flowmaster/operators/etl/providers/criteo/export.py", line 8, in <module>
        from flowmaster.executors import SleepIteration
      File "/home/ubuntu/miniforge3/lib/python3.9/site-packages/flowmaster/executors/__init__.py", line 16, in <module>
        from flowmaster.pool import pools
      File "/home/ubuntu/miniforge3/lib/python3.9/site-packages/flowmaster/pool.py", line 106, in <module>
        pools_dict = YamlHelper.parse_file(str(Settings.POOL_CONFIG_FILEPATH))
      File "/home/ubuntu/miniforge3/lib/python3.9/site-packages/flowmaster/utils/yaml_helper.py", line 14, in parse_file
        with open(path, "rb") as f:
    FileNotFoundError: [Errno 2] No such file or directory: '/home/ubuntu/FlowMaster/pools.yaml'
    

    What am I doing wrong? :(

    opened by micweeks 1
Releases(0.7.1)
  • 0.7.1(Aug 29, 2021)

    • prevented planning of tasks from one instance of the operator class
    • fixed GeneratorExit error
    • fixed transformation of the array type for the ClickHouse loader
  • 0.6.1(Jun 22, 2021)

    Redesigned executor

    New

    • add policies 'time_limit_seconds_from_worktime' and 'soft_time_limit_seconds'
    • add provider 'flowmaster'

    Fixes

    • fix schedule (interval seconds mode)
    • add logging 'loguru'
    • fix clear_statuses_of_lost_items
    • fix allow_execute_flow
    • change command 'db reset'

    Backward-incompatible changes

    • new field 'expires_utc' in FlowItem
    • rename command 'run' to 'run_local' and rename command 'run_thread' to 'run'
    • add new class ExecutorIterationTask.
    • change, move, and rename class ThreadExecutor to ThreadAsyncExecutor.
    • change and rename class SleepTask to SleepIteration.
    • change and rename class TaskPool to NextIterationInPools.
    • ETLOperator return ExecutorIterationTask.
    • rename func order_flow to ordering_flow_tasks.
    • rename func start_executor to sync_executor.
    • rename field FlowItem.config_hash to FlowItem.notebook_hash
    • change FLOW_CONFIGS_DIR and rename it to NOTEBOOKS_DIR
    • rename objects config to notebook
    • add class Settings
  • 0.3.1(May 15, 2021)

  • 0.2.2(May 13, 2021)

Owner
Pavel Maksimov
Python Data Engineer, Python Developer, ETL, recommender systems developer