This is my first end-to-end repo: a data pipeline over Brazilian government open data on contract purchases and annual schedules, built with Spark in PySpark and SQL!

Overview

Hello!


The code can be found here, and the data can be downloaded from this link.

from pyspark.sql import SparkSession

##################################################### VARIABLES #####################################################

PATH_LANDING_ZONE_CSV = '../datalake/landing/comprasnet-contratos-anual-cronogramas-latest.csv'
PATH_PROCESSING_ZONE = '../datalake/processing'
PATH_CURATED_ZONE = '../datalake/curated'

##################################################### QUERY #########################################################

QUERY = """ 

WITH tmp as (
  SELECT 
    cast(id as integer) as id,
    cast(contrato_id as integer) as contrato_id,
    tipo,
    numero,
    receita_despesa,
    observacao,
    mesref,
    anoref,
    cast(vencimento as date) as vencimento,
    retroativo,
    cast(valor as decimal(10,2)) as valor,
    year(vencimento) as year,
    month(vencimento) as month,
    dayofmonth(vencimento) as day
  FROM 
    df
)
SELECT
  *
FROM 
  tmp
WHERE
  year IN (2021, 2022)
ORDER BY
  year desc

"""

##################################################### SCRIPT #########################################################

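def csv_to_parquet(spark, path_csv, path_parquet):
  # Read the raw CSV from the landing zone (first line is the header)
  # and rewrite it as Parquet in the processing zone
  df = spark.read.option('header', True).csv(path_csv)
  df.write.mode('overwrite').format('parquet').save(path_parquet)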

def create_view(spark, path_parquet):
  # Register the processing-zone Parquet as the temp view 'df' that QUERY reads from
  df = spark.read.parquet(path_parquet)
  df.createOrReplaceTempView('df')

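def write_curated(spark, path_curated):
  # Apply the transformation query to the 'df' view
  df2 = spark.sql(QUERY)

  # Each orderBy call re-sorts the whole DataFrame, so chaining three of them
  # leaves only the last sort in effect; sort by all three columns in one call
  (
      df2
      .orderBy('year', 'month', 'day', ascending=False)
      .write.partitionBy('year', 'month', 'day')
      .mode('overwrite')
      .format('parquet')
      .save(path_curated)
  )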


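if __name__ == "__main__":

  spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
  )

  spark.sparkContext.setLogLevel("ERROR")

  # landing (CSV) -> processing (Parquet)
  csv_to_parquet(spark, PATH_LANDING_ZONE_CSV, PATH_PROCESSING_ZONE)

  # Expose the processing zone to Spark SQL as the 'df' view
  create_view(spark, PATH_PROCESSING_ZONE)

  # processing -> curated (filtered, enriched, partitioned Parquet)
  write_curated(spark, PATH_CURATED_ZONE)

  spark.stop()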
• First, we extract the data into the landing zone; then we write the same data to the processing zone in a different format, Parquet, because it is an optimized, more compact columnar format.
• Next, we create a view over the data just saved in the processing zone (already in Parquet, which makes Spark reads faster) and apply a transformation query that enriches the schema, casting columns to proper types and adding year, month, and day columns derived from vencimento, and keeps only the 2021 and 2022 records, ready to be consumed.
• Finally, we write the processed, enriched data to the curated zone, partitioned by year, month, and day and ready for consumption.
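To make the three steps above concrete, here is a minimal plain-Python sketch (no Spark involved) of what happens to a single row: the casts and derived columns from QUERY, the 2021/2022 filter, and the subdirectory that partitionBy('year', 'month', 'day') places the row in. The function names transform_row and partition_dir are hypothetical, the ISO date format of vencimento is an assumption about the CSV, and error handling here differs from Spark's.

```python
from datetime import date
from decimal import ROUND_HALF_UP, Decimal
from typing import Optional


def transform_row(row: dict) -> Optional[dict]:
    """Hypothetical plain-Python mirror of QUERY for one CSV row (all values are str)."""
    # Assumption: vencimento is an ISO date string such as '2022-03-05'
    vencimento = date.fromisoformat(row["vencimento"])
    # WHERE year IN (2021, 2022): rows from other years are dropped
    if vencimento.year not in (2021, 2022):
        return None
    out = dict(row)  # columns the query does not cast pass through as strings
    out["id"] = int(row["id"])
    out["contrato_id"] = int(row["contrato_id"])
    out["vencimento"] = vencimento
    # Two decimal places as in decimal(10,2); HALF_UP rounding is this sketch's own choice
    out["valor"] = Decimal(row["valor"]).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
    # Derived columns later used for partitioning
    out["year"] = vencimento.year
    out["month"] = vencimento.month
    out["day"] = vencimento.day
    return out


def partition_dir(row: dict) -> str:
    """Hive-style subdirectory that partitionBy('year', 'month', 'day') writes this row under."""
    return f"year={row['year']}/month={row['month']}/day={row['day']}"


if __name__ == "__main__":
    sample = {"id": "1", "contrato_id": "10", "vencimento": "2022-03-05", "valor": "12.345"}
    print(partition_dir(transform_row(sample)))  # year=2022/month=3/day=5
```

With this layout Spark stores year, month, and day in the directory names rather than inside the Parquet files, which is what lets a reader filtering on, say, year = 2022 skip the other directories entirely.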

To run the script, run the following in a terminal:

spark-submit etl.py

You will also find the same code and ETL approach in notebooks, in both PySpark and Spark SQL versions.

Hope you enjoy it!

If you have any questions, reach out on LinkedIn.

:)

Owner
Henrique de Paula
Games and tech!