Educational project on how to build an ETL (Extract, Transform, Load) data pipeline, orchestrated with Airflow.

Overview

ETL Pipeline with Airflow, Spark, S3, MongoDB and Amazon Redshift

Contents: About, Scenario, Base Concepts, Prerequisites, Set-up, Installation, Airflow Interface, Pipeline Task by Task, Shut Down and Restart Airflow, Learning Resources


About

An AWS S3 bucket is used as a data lake in which JSON files are stored. The data is extracted from a JSON file and parsed (cleaned). It is then transformed/processed with Spark (PySpark) and loaded/stored in either a MongoDB database or an Amazon Redshift data warehouse.

The pipeline architecture - author's interpretation:

Note: Since this project was built for learning purposes and as an example, it functions only for a single scenario and data schema.

The project is built in Python and it has 2 main parts:

  1. The Airflow DAG file, dags/dagRun.py, which orchestrates the data pipeline tasks.
  2. The PySpark data transformation/processing script, sparkFiles/sparkProcess.py.

Note: The code, and especially the comments, in the Python files dags/dagRun.py and sparkFiles/sparkProcess.py are intentionally verbose to make the functionality easier to understand.

Scenario

The Romanian COVID-19 data, provided by https://datelazi.ro/, contains COVID-19 data for each county, including the total COVID numbers from one day to the next. It contains only these running totals, not the difference between days (e.g. for county X there were 7 cases on day 1 and 37 cases on day 2). This data is loaded as a JSON file in the S3 bucket.

The goal is to find the differences between days for all counties (e.g. for county X there were 30 more cases on day 2 than on day 1). If the difference is smaller than 0 (e.g. because of a data recording error), then the difference for that day should be 0.
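The rule above can be sketched in plain Python before moving to Spark. This is a minimal illustration, not the project's PySpark code; the function name `daily_differences` and the input layout (a list of cumulative totals for one county, ordered by day) are assumptions made for the example.

```python
def daily_differences(totals):
    """Return day-over-day differences for one county's cumulative totals.

    Negative differences (e.g. from a data recording error) are clamped to 0.
    The first day has no previous day, so it produces no entry.
    """
    return [max(current - previous, 0)
            for previous, current in zip(totals, totals[1:])]


# County X from the scenario: 7 cases on day 1, 37 on day 2.
print(daily_differences([7, 37]))          # [30]
# A drop in the cumulative total is treated as an error and becomes 0.
print(daily_differences([7, 37, 35, 40]))  # [30, 0, 5]
```

In the pipeline itself this logic lives in sparkFiles/sparkProcess.py and runs on Spark; the sketch only shows the arithmetic.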

Base concepts

Prerequisites

Set-up

Download / pull the repo to your desired location.

You will have to create an AWS user specifically for Airflow to interact with the S3 bucket. The credentials for that user will have to be saved in the s3 file found in the directory /airflow-data/creds:

[airflow-spark1]
aws_access_key_id = 
aws_secret_access_key = 
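The file uses the usual AWS credentials (INI) layout, so it can be sanity-checked with Python's `configparser` before starting Airflow. This is a minimal sketch, assuming the profile name `airflow-spark1` shown above; the helper name `read_profile` and the sample values are hypothetical.

```python
import configparser


def read_profile(text, profile="airflow-spark1"):
    """Parse AWS-style credentials text and return the key pair for a profile."""
    # interpolation=None keeps any '%' characters in the values untouched.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(text)
    section = parser[profile]  # raises KeyError if the profile is missing
    return section["aws_access_key_id"], section["aws_secret_access_key"]


# Placeholder values for illustration only; never commit real credentials.
sample = """
[airflow-spark1]
aws_access_key_id = EXAMPLE_KEY_ID
aws_secret_access_key = EXAMPLE_SECRET
"""
key_id, secret = read_profile(sample)
print(key_id)  # EXAMPLE_KEY_ID
```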

On lines 17 and 18 in dags/dagRun.py you can choose which database system to use, MongoDB (NoSQL) or Amazon Redshift (RDBMS), by commenting/uncommenting one or the other:

# database = 'mongoDB'
database = 'Redshift'

If you want to use MongoDB, you will have to enter the MongoDB connection string (or an environment variable or file containing the string) in dags/dagRun.py, line 22:

client = pymongo.MongoClient('mongoDB_connection_string')
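Hard-coding the connection string in the DAG file is the simplest option, but reading it from an environment variable keeps it out of version control. A minimal sketch using only the standard library; the variable name `MONGODB_URI` and the helper `get_connection_string` are assumptions for this example, not names used by the project.

```python
import os


def get_connection_string(var_name="MONGODB_URI"):
    """Return the MongoDB connection string stored in an environment variable."""
    value = os.environ.get(var_name)
    if not value:
        raise RuntimeError(f"Environment variable {var_name} is not set")
    return value


# In dags/dagRun.py the result could then be passed to the client, e.g.:
# client = pymongo.MongoClient(get_connection_string())
```

For this to work, the variable would have to be visible inside the Airflow containers, for example through the environment section of docker-compose.yml.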

If you want to use a Redshift cluster, you will have to provide your Amazon Redshift database name, host and the rest of the credentials on lines 29 to 34 in dags/dagRun.py:

dbname = 'testairflow'
host = '*******************************.eu-central-1.redshift.amazonaws.com'
port = '****'
user = '*********'
password = '********************'
awsIAMrole = 'arn:aws:iam::************:role/*******'

You will have to change the S3 bucket name and file key (the name of the file saved in the S3 bucket) on lines 148 and 150 in dags/dagRun.py:

# name of the file in the AWS s3 bucket
key = 'countyData.json'
# name of the AWS s3 bucket
bucket = 'renato-airflow-raw'

In the repo directory, execute the following command to create the .env file containing the Airflow UID and GID needed by docker-compose:

echo -e "AIRFLOW_UID=$(id -u)\nAIRFLOW_GID=0" > .env

Installation

Start the installation with:

docker-compose up -d

This command will pull and create the Docker images and containers for Airflow, according to the instructions in the docker-compose.yml file.

After everything has been installed, you can check the status of your containers (if they are healthy) with:

docker ps

Note: it might take up to 30 seconds for the containers to have the healthy flag after starting.

Airflow Interface

You can now access the Airflow web interface by going to http://localhost:8080/. If you have not changed them in the docker-compose.yml file, the default user is airflow and the default password is airflow.

After signing in, the Airflow home page is the DAGs list page. Here you will see all your DAGs and the Airflow example DAGs, sorted alphabetically.

Any Python DAG script saved in the dags/ directory will show up on the DAGs page (e.g. the first DAG, analyze_json_data, is the one built for this project).

Note: If you update the code in the Python DAG script, refresh the Airflow DAGs page to see the changes.

Note: If you do not want to see any Airflow example DAGs, set the AIRFLOW__CORE__LOAD_EXAMPLES flag to false in the docker-compose.yml file before starting the installation.

Click on the name of the DAG to open the DAG details page.

On the Graph View page you can see the DAG running through each task (getLastProcessedDate, getDate, etc.) after it has been unpaused and triggered.

Pipeline Task by Task

Task getLastProcessedDate

Finds the last processed date in the MongoDB database and saves/pushes it to an Airflow XCom.

Task getDate

Grabs the data saved in the XCom and, depending on the value pulled, returns the task id parseJsonFile or the task id endRun.
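The branching idea can be sketched as a plain function of the kind typically passed to Airflow's `BranchPythonOperator`, which continues with the task whose id the callable returns. The condition used here (continue only when a date value was pulled) and the function name `choose_next_task` are assumptions for illustration; the actual rule is in dags/dagRun.py.

```python
def choose_next_task(last_processed_date):
    """Return the id of the task to run next, based on the value pulled from XCom.

    Assumption for this sketch: a missing value (None) ends the run,
    any other value continues to the parsing task.
    """
    if last_processed_date is None:
        return "endRun"
    return "parseJsonFile"


print(choose_next_task("2021-04-07"))  # parseJsonFile
print(choose_next_task(None))          # endRun
```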

Task parseJsonFile

The JSON contains data that is not needed for this case, so it is parsed to extract only the daily total numbers for each county.

If there is any new data to be processed (the date extracted in the task getLastProcessedDate is older than the dates in the data), it is saved in a temporary file in the sparkFiles directory.

e.g. for the county AB, on the 7th of April there were 19046 COVID cases and on the 8th of April there were 19150 cases.

It also returns the task id endRun if there was no new data, or the task id processParsedData otherwise.
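A minimal sketch of the "is there new data?" check, assuming the parsed data is a mapping from ISO date strings to per-county totals. That layout, the function name `select_new_days`, and the sample numbers are hypothetical and do not reflect the real datelazi.ro schema.

```python
from datetime import date


def select_new_days(totals_by_day, last_processed):
    """Keep only the days that are newer than the last processed date.

    Returns the filtered data and the id of the task that should run next.
    """
    new_days = {
        day: counties
        for day, counties in totals_by_day.items()
        if date.fromisoformat(day) > last_processed
    }
    next_task = "processParsedData" if new_days else "endRun"
    return new_days, next_task


# Made-up totals for illustration.
data = {"2021-04-07": {"AB": 100}, "2021-04-08": {"AB": 130}}
print(select_new_days(data, date(2021, 4, 7)))
# ({'2021-04-08': {'AB': 130}}, 'processParsedData')
print(select_new_days(data, date(2021, 4, 8)))
# ({}, 'endRun')
```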

Task processParsedData

Executes the PySpark script sparkFiles/sparkProcess.py.

The parsed data is processed and the result is saved in another temporary file in the sparkFiles directory.

e.g. for the county AB, on the 8th of April there were 104 more cases than on the 7th of April.

Task saveToDB

Saves the processed data either in the MongoDB database or in Redshift.

Note: The Redshift column names are the full names of the counties, because the short versions of some of them conflict with SQL reserved words.
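To illustrate the conflict: the county code for Iași is `IS`, which is also an SQL keyword, so it cannot be used as a bare column name. A minimal sketch of mapping codes to full names; only three counties are listed, and the mapping and the helper `column_name` are illustrative, not taken from the project's code.

```python
# Partial, illustrative mapping from county codes to full county names.
COUNTY_NAMES = {
    "AB": "Alba",
    "IF": "Ilfov",
    "IS": "Iasi",
}


def column_name(county_code):
    """Return the full county name to use as a column name for a county code."""
    return COUNTY_NAMES[county_code]


print(column_name("IS"))  # Iasi
```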

Task endRun

Dummy task used as the end of the pipeline.

Shut Down and Restart Airflow

If you want to make changes to any of the configuration files (docker-compose.yml, Dockerfile, requirements.txt), you will have to shut down the Airflow instance with:

docker-compose down

This command will shut down and delete any containers created/used by Airflow.

For any changes made in the configuration files to be applied, you will have to rebuild the Airflow images with the command:

docker-compose build

Recreate all the containers with:

docker-compose up -d

Learning Resources

These are some useful learning resources for anyone interested in Airflow and Spark:

License

You can check out the full license here

This project is licensed under the terms of the MIT license.

Owner
Renato