Esse é o meu primeiro repo tratando de fim a fim, uma pipeline de dados abertos do governo brasileiro relacionado a compras de contrato e cronogramas anuais com spark, em pyspark e SQL!

Overview

Olá!

Esse é o meu primeiro repo tratando de fim a fim, uma pipeline de dados abertos do governo brasileiro relacionado a compras de contrato e cronogramas anuais com spark, em pyspark e SQL!

O código se encontra aqui e o dado pode ser obtido por meio desse link

from pyspark.sql import SparkSession

##################################################### VARIABLES #####################################################

PATH_LANDING_ZONE_CSV = '../datalake/landing/comprasnet-contratos-anual-cronogramas-latest.csv'
PATH_PROCESSING_ZONE = '../datalake/processing'
PATH_CURATED_ZONE = '../datalake/curated'

##################################################### QUERY #########################################################

QUERY = """ 

WITH tmp as (
  SELECT 
    cast(id as integer) as id,
    cast(contrato_id as integer) as contrato_id,
    tipo,
    numero,
    receita_despesa,
    observacao,
    mesref,
    anoref,
    cast(vencimento as date) as vencimento,
    retroativo,
    cast(valor as decimal (10,2)) as valor,
    year(vencimento) as year,
    month(vencimento) as month,
    dayofmonth(vencimento) as day
  FROM 
    df
)
SELECT
  *
FROM 
  tmp
WHERE   
  year = 2021 OR 
  year = 2022
ORDER BY
  year desc

"""

##################################################### SCRIPT #########################################################

def csv_to_parquet(spark, path_csv, path_parquet):
  df = spark.read.option('header', True).csv(path_csv)
  return df.write.mode('overwrite').format('parquet').save(path_parquet)

def create_view(spark, path_parquet):
  df = spark.read.parquet(path_parquet) 
  df.createOrReplaceTempView('df')

def write_curated(spark, path_curated):
 
  df2 = spark.sql(QUERY)
    
  (
      df2
      .orderBy('year', ascending=False)
      .orderBy('month', ascending=False)
      .orderBy('day', ascending=False)
      .write.partitionBy('year','month','day')
      .mode('overwrite')
      .format('parquet')
      .save(path_curated)
  )


if __name__ == "__main__":
  
  spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
  )

  spark.sparkContext.setLogLevel("ERROR")
  
  csv_to_parquet(spark, PATH_LANDING_ZONE_CSV, PATH_PROCESSING_ZONE)

  create_view(spark, PATH_PROCESSING_ZONE)
  
  write_curated(spark, PATH_CURATED_ZONE )
  • Basicamente, extraimos os dados para a zona landing, depois, escrevemos o mesmo dado em diferente formato na zona processing, no caso parquet, por se tratar de um formato otimizado e mais leve.
  • Após, criamos uma view do dado recém salvo na zona processing, já em parquet, que otimiza a leitura do spark, aplicamos uma query de transformação que enriquece o schema do dado e seleciona apenas os dados de 2021 e 2022, já pronto para ser consumido.
  • E por fim, escrevemos na zona curated o dado já tratado, enriquecido, particionado por ano, mês e dia e pronto para consumo.

Para rodar o script, basicamente você pode fazer no terminal:

spark-submit etl.py

Você também encontrará o mesmo código e ideia de ETL em notebooks, em versão pyspark ou spark-sql.

Espero que gostem!

Qualquer dúvida, entrar em contato pelo LinkedIn.

:)

Owner
Henrique de Paula
Games e tech!
Henrique de Paula
Python library which makes it possible to dynamically mask/anonymize data using JSON string or python dict rules in a PySpark environment.

pyspark-anonymizer Python library which makes it possible to dynamically mask/anonymize data using JSON string or python dict rules in a PySpark envir

6 Jun 30, 2022
Python implementation of Weng-Lin Bayesian ranking, a better, license-free alternative to TrueSkill

Python implementation of Weng-Lin Bayesian ranking, a better, license-free alternative to TrueSkill This is a port of the amazing openskill.js package

Open Debates Project 156 Dec 14, 2022
Python package for stacking (machine learning technique)

vecstack Python package for stacking (stacked generalization) featuring lightweight functional API and fully compatible scikit-learn API Convenient wa

Igor Ivanov 671 Dec 25, 2022
Automated machine learning: Review of the state-of-the-art and opportunities for healthcare

Automated machine learning: Review of the state-of-the-art and opportunities for healthcare

42 Dec 23, 2022
A Python package to preprocess time series

Disclaimer: This package is WIP. Do not take any APIs for granted. tspreprocess Time series can contain noise, may be sampled under a non fitting rate

Maximilian Christ 57 Dec 17, 2022
List of Data Science Cheatsheets to rule the world

Data Science Cheatsheets List of Data Science Cheatsheets to rule the world. Table of Contents Business Science Business Science Problem Framework Dat

Favio André Vázquez 11.7k Dec 30, 2022
MLBox is a powerful Automated Machine Learning python library.

MLBox is a powerful Automated Machine Learning python library. It provides the following features: Fast reading and distributed data preprocessing/cle

Axel 1.4k Jan 06, 2023
TensorFlow Decision Forests (TF-DF) is a collection of state-of-the-art algorithms for the training, serving and interpretation of Decision Forest models.

TensorFlow Decision Forests (TF-DF) is a collection of state-of-the-art algorithms for the training, serving and interpretation of Decision Forest models. The library is a collection of Keras models

538 Jan 01, 2023
Time series changepoint detection

changepy Changepoint detection in time series in pure python Install pip install changepy Examples from changepy import pelt from cha

Rui Gil 92 Nov 08, 2022
Python bindings for MPI

MPI for Python Overview Welcome to MPI for Python. This package provides Python bindings for the Message Passing Interface (MPI) standard. It is imple

MPI for Python 604 Dec 29, 2022
BioPy is a collection (in-progress) of biologically-inspired algorithms written in Python

BioPy is a collection (in-progress) of biologically-inspired algorithms written in Python. Some of the algorithms included are mor

Jared M. Smith 40 Aug 26, 2022
A classification model capable of accurately predicting the price of secondhand cars

The purpose of this project is create a classification model capable of accurately predicting the price of secondhand cars. The data used for model building is open source and has been added to this

Akarsh Singh 2 Sep 13, 2022
Machine Learning University: Accelerated Natural Language Processing Class

Machine Learning University: Accelerated Natural Language Processing Class This repository contains slides, notebooks and datasets for the Machine Lea

AWS Samples 2k Jan 01, 2023
The easy way to combine mlflow, hydra and optuna into one machine learning pipeline.

mlflow_hydra_optuna_the_easy_way The easy way to combine mlflow, hydra and optuna into one machine learning pipeline. Objective TODO Usage 1. build do

shibuiwilliam 9 Sep 09, 2022
Simple and flexible ML workflow engine.

This is a simple and flexible ML workflow engine. It helps to orchestrate events across a set of microservices and create executable flow to handle requests. Engine is designed to be configurable wit

Katana ML 295 Jan 06, 2023
A repository to index and organize the latest machine learning courses found on YouTube.

📺 ML YouTube Courses At DAIR.AI we ❤️ open education. We are excited to share some of the best and most recent machine learning courses available on

DAIR.AI 9.6k Jan 01, 2023
customer churn prediction prevention in telecom industry using machine learning and survival analysis

Telco Customer Churn Prediction - Plotly Dash Application Description This dash application allows you to predict telco customer churn using machine l

Benaissa Mohamed Fayçal 3 Nov 20, 2021
Relevance Vector Machine implementation using the scikit-learn API.

scikit-rvm scikit-rvm is a Python module implementing the Relevance Vector Machine (RVM) machine learning technique using the scikit-learn API. Quicks

James Ritchie 204 Nov 18, 2022
A Python Package to Tackle the Curse of Imbalanced Datasets in Machine Learning

imbalanced-learn imbalanced-learn is a python package offering a number of re-sampling techniques commonly used in datasets showing strong between-cla

6.2k Jan 01, 2023
Automated Machine Learning Pipeline with Feature Engineering and Hyper-Parameters Tuning

The mljar-supervised is an Automated Machine Learning Python package that works with tabular data. I

MLJAR 2.4k Jan 02, 2023