Overview

Hello!

This is my first repo covering, end to end, a pipeline for Brazilian government open data on contract purchases and annual schedules, built with Spark, in PySpark and SQL!

The code can be found here and the data can be obtained through this link.

from pyspark.sql import SparkSession

##################################################### VARIABLES #####################################################

PATH_LANDING_ZONE_CSV = '../datalake/landing/comprasnet-contratos-anual-cronogramas-latest.csv'
PATH_PROCESSING_ZONE = '../datalake/processing'
PATH_CURATED_ZONE = '../datalake/curated'

##################################################### QUERY #########################################################

QUERY = """ 

WITH tmp as (
  SELECT 
    cast(id as integer) as id,
    cast(contrato_id as integer) as contrato_id,
    tipo,
    numero,
    receita_despesa,
    observacao,
    mesref,
    anoref,
    cast(vencimento as date) as vencimento,
    retroativo,
    cast(valor as decimal(10,2)) as valor,
    year(vencimento) as year,
    month(vencimento) as month,
    dayofmonth(vencimento) as day
  FROM 
    df
)
SELECT
  *
FROM 
  tmp
WHERE   
  year = 2021 OR 
  year = 2022
ORDER BY
  year desc

"""

##################################################### SCRIPT #########################################################

def csv_to_parquet(spark, path_csv, path_parquet):
  # Read the raw CSV from the landing zone and rewrite it as parquet in the processing zone
  df = spark.read.option('header', True).csv(path_csv)
  df.write.mode('overwrite').format('parquet').save(path_parquet)

def create_view(spark, path_parquet):
  # Register the processing-zone parquet as a temp view so the SQL query can reference it as 'df'
  df = spark.read.parquet(path_parquet)
  df.createOrReplaceTempView('df')

def write_curated(spark, path_curated):

  # Apply the transformation query and write the result to the curated zone,
  # partitioned by year/month/day so consumers can prune partitions on read.
  df2 = spark.sql(QUERY)

  (
      df2
      # a single orderBy is needed here: chaining orderBy calls overrides the previous sort
      .orderBy(['year', 'month', 'day'], ascending=False)
      .write.partitionBy('year', 'month', 'day')
      .mode('overwrite')
      .format('parquet')
      .save(path_curated)
  )


if __name__ == "__main__":
  
  spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
  )

  spark.sparkContext.setLogLevel("ERROR")
  
  # 1) landing (csv) -> processing (parquet)
  csv_to_parquet(spark, PATH_LANDING_ZONE_CSV, PATH_PROCESSING_ZONE)

  # 2) register the processing-zone parquet as the temp view 'df'
  create_view(spark, PATH_PROCESSING_ZONE)

  # 3) transform and write the curated, partitioned output
  write_curated(spark, PATH_CURATED_ZONE)

  • Basically, we extract the data into the landing zone and then write the same data to the processing zone in a different format, in this case parquet, since it is an optimized, lighter format.
  • Next, we create a view over the data just saved in the processing zone, already in parquet (which speeds up Spark reads), and apply a transformation query that enriches the data's schema and keeps only the 2021 and 2022 records, ready to be consumed.
  • Finally, we write the treated, enriched data to the curated zone, partitioned by year, month and day and ready for consumption (see the read-back sketch just below this list).
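
As an illustration of how a consumer could read the curated zone back and benefit from the year/month/day partitioning, here is a minimal sketch (the 2022/month filter is just an example, not part of the repo):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

# Filtering on the partition columns lets Spark prune partitions and read only
# the matching year=/month= directories instead of the whole curated dataset.
curated = (
    spark.read.parquet('../datalake/curated')
    .where("year = 2022 AND month = 1")
)

curated.show(10)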

To run the script, you can simply do, in the terminal:

spark-submit etl.py
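
If you want to be explicit about the master and driver resources (the values below are only an example, not a requirement of the repo), spark-submit accepts the usual flags:

spark-submit --master "local[*]" --driver-memory 2g etl.py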

You will also find the same code and ETL idea in notebooks, in PySpark and Spark SQL versions.
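
Just as a reference for the Spark SQL flavour, the same transformation idea can query the processing-zone parquet in place with the parquet.`path` syntax; this is only a minimal sketch, not necessarily how the notebooks in the repo are written:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

# Spark SQL can read parquet files directly, without registering a view first.
df = spark.sql("""
  SELECT
    cast(id as integer)          as id,
    cast(vencimento as date)     as vencimento,
    cast(valor as decimal(10,2)) as valor,
    year(vencimento)             as year
  FROM parquet.`../datalake/processing`
  WHERE year(vencimento) IN (2021, 2022)
""")

df.show(5)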

I hope you like it!

For any questions, feel free to reach out on LinkedIn.

:)
