Demonstrates a Dataflow pipeline that saves data from an API into a BigQuery table

Overview

dataflow-mvp provides a basic example pipeline that pulls data from an API and writes it to a BigQuery table using GCP's Dataflow (i.e., Apache Beam).
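As a rough illustration of that flow, here is a minimal sketch of an API-to-BigQuery pipeline. It is not the code in main.py: the function names, the two columns (id, name), and the shape of the API response (a JSON array of objects) are all assumptions made for the example. Apache Beam is imported inside run so that the parsing helper can be tried without Beam installed.

```python
import json
from typing import Dict, Iterator


def parse_records(payload: str) -> Iterator[Dict[str, str]]:
    """Turn a JSON array returned by an API into one dict per BigQuery row."""
    for item in json.loads(payload):
        # Keep only the (hypothetical) columns the table is assumed to have.
        yield {"id": str(item["id"]), "name": str(item["name"])}


def fetch_records(url: str) -> Iterator[Dict[str, str]]:
    """Download the payload from the API and parse it into rows."""
    from urllib.request import urlopen

    with urlopen(url, timeout=30) as response:
        yield from parse_records(response.read().decode("utf-8"))


def run(api_url: str, table: str, pipeline_args=None) -> None:
    """Build and run the pipeline: API URL -> rows -> BigQuery table."""
    # Imported here so the helpers above work without Beam installed.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline:
        (
            pipeline
            | "Seed URL" >> beam.Create([api_url])
            | "Call API" >> beam.FlatMap(fetch_records)
            | "Write rows" >> beam.io.WriteToBigQuery(
                table,  # expected form: "project:dataset.table"
                schema="id:STRING,name:STRING",  # assumed columns
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            )
        )
```

The parsing step is kept separate from the network call so it can be checked locally before anything is deployed to GCP.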

Table of Contents

File        Description
main.py     Main Python code for the Dataflow pipeline. The function defineBQSchema defines the BigQuery table schema.
setup.py    When the pipeline is deployed in GCP as a template, GCP uses setup.py to set up the worker nodes (e.g., install required Python dependencies).
build.bat   Windows batch script to deploy the pipeline as a reusable template in GCP.
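For readers unfamiliar with how a BigQuery schema is handed to Beam, the sketch below shows one possible shape for a schema function. It is an assumption, not the actual defineBQSchema from main.py: the name define_bq_schema and the three columns are hypothetical. Beam's WriteToBigQuery accepts a schema as a dictionary with a "fields" list like this one.

```python
from typing import Dict, List


def define_bq_schema() -> Dict[str, List[Dict[str, str]]]:
    """Return a table schema as a dict of field definitions."""
    # (column name, BigQuery type, mode) - hypothetical columns for illustration.
    columns = [
        ("id", "STRING", "REQUIRED"),
        ("name", "STRING", "NULLABLE"),
        ("fetched_at", "TIMESTAMP", "NULLABLE"),
    ]
    return {"fields": [{"name": n, "type": t, "mode": m} for n, t, m in columns]}
```

Keeping the columns in one list makes it easy to see the whole table layout at a glance and to add a column later.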

Environment

  • Local machine running Microsoft Windows 10 Home
  • Python 3.6.8
    • As of 12/1/21, Apache Beam only supports 3.6, 3.7, and 3.8 (not 3.9). However, orjson only supports 3.6.
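
A quick way to confirm that the active interpreter falls inside the range quoted above is a small check like the following. This is only a sketch: the supported versions are taken from the note above (as of 12/1/21), and the helper name is_supported is made up for the example.

```python
import sys

# Minor versions listed as supported by Apache Beam in the note above.
SUPPORTED_MINORS = {(3, 6), (3, 7), (3, 8)}


def is_supported(version_info=sys.version_info) -> bool:
    """Return True if the (major, minor) pair is in the supported set."""
    return tuple(version_info[:2]) in SUPPORTED_MINORS


if __name__ == "__main__":
    print("Python", sys.version.split()[0], "supported:", is_supported())
```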

Getting Started

Pre-Requisites

The following instructions assume that the project ID is dataflow-mvp and you have owner access to it.

  1. If you don't have it already, install the Google Cloud SDK:
    https://cloud.google.com/sdk/docs/install

  2. Authenticate your Google account:
    gcloud auth login

  3. Create a virtual environment for Python:
    py -3.8 -m venv venv

  4. Activate the virtual environment, upgrade pip, and install the Apache Beam library for GCP:

"./venv/Scripts/activate.bat"
python -m pip install --upgrade pip
python -m pip install apache_beam[gcp]

Run Build

  1. To make our lives easier later, set environment variables for the following:
  • DATAFLOW_BUCKET - the name of the GCS bucket that you will create in the last step of this section
  • DF_TEMPLATE_NAME - the name (of your choosing) for the Dataflow template (e.g., dataflow-mvp-dog)
  • PROJECT_ID - the ID of the GCP project (e.g., dataflow-mvp)
  • GCP_REGION - the GCP region (I like to choose the region closest to me, e.g., us-east1)

For instance, to set the PROJECT_ID variable in the Windows CLI, use:
set PROJECT_ID=dataflow-mvp

On Linux machines, use
export PROJECT_ID=dataflow-mvp

The instructions below assume you're working on a Windows machine. If you're working in a Linux environment, use $PROJECT_ID instead of %PROJECT_ID% (and likewise for the other variables), and end continued lines with \ instead of ^.
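
If you would rather not worry about %VAR% versus $VAR, the same four variables can be read from Python, which behaves identically on Windows and Linux. The sketch below is an illustration only; load_settings is a hypothetical helper and is not part of this repository.

```python
import os
from typing import Dict, Mapping

REQUIRED_VARS = ("DATAFLOW_BUCKET", "DF_TEMPLATE_NAME", "PROJECT_ID", "GCP_REGION")


def load_settings(env: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Collect the required variables, failing early if any is unset or empty."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise KeyError("Missing environment variables: " + ", ".join(missing))
    return {name: env[name] for name in REQUIRED_VARS}
```

Failing early with the full list of missing names is friendlier than discovering an empty %GCP_REGION% halfway through a gcloud command.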

  2. Set the GCP project via config:
    gcloud config set project %PROJECT_ID%
  • You can verify the project is correctly set using:
    gcloud config list
  3. Enable the necessary APIs:
    gcloud services enable dataflow.googleapis.com && ^
    gcloud services enable cloudscheduler.googleapis.com && ^
    gcloud services enable bigquery.googleapis.com && ^
    gcloud services enable cloudresourcemanager.googleapis.com && ^
    gcloud services enable appengine.googleapis.com
  4. Create a service account for the Dataflow runner:
    gcloud iam service-accounts create dataflow-runner --display-name "Dataflow Runner service account"
  5. Add the required IAM roles to the Dataflow runner's service account:
    gcloud projects add-iam-policy-binding %PROJECT_ID% --member serviceAccount:dataflow-runner@%PROJECT_ID%.iam.gserviceaccount.com --role roles/owner
  6. Create a GCS bucket to store Dataflow code, staging files and templates:
    gsutil mb -p %PROJECT_ID% -l %GCP_REGION% gs://%DATAFLOW_BUCKET%
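
The setup commands in this section can also be generated from Python, which sidesteps the shell-specific variable syntax. The following is a sketch under assumptions: setup_commands and run_all are hypothetical helpers, and run_all relies on shutil.which to locate the gcloud and gsutil launchers on the PATH. The commands themselves mirror the ones listed above.

```python
import shutil
import subprocess
from typing import List

APIS = ["dataflow", "cloudscheduler", "bigquery", "cloudresourcemanager", "appengine"]


def setup_commands(project_id: str, region: str, bucket: str) -> List[List[str]]:
    """Return the one-time setup commands as argument lists."""
    account = f"dataflow-runner@{project_id}.iam.gserviceaccount.com"
    commands = [["gcloud", "config", "set", "project", project_id]]
    commands += [
        ["gcloud", "services", "enable", f"{api}.googleapis.com"] for api in APIS
    ]
    commands.append([
        "gcloud", "iam", "service-accounts", "create", "dataflow-runner",
        "--display-name", "Dataflow Runner service account",
    ])
    commands.append([
        "gcloud", "projects", "add-iam-policy-binding", project_id,
        "--member", f"serviceAccount:{account}", "--role", "roles/owner",
    ])
    commands.append(["gsutil", "mb", "-p", project_id, "-l", region, f"gs://{bucket}"])
    return commands


def run_all(commands: List[List[str]]) -> None:
    """Run each command in order, stopping at the first failure."""
    for command in commands:
        # Resolve the launcher (e.g. gcloud.cmd on Windows) before running it.
        executable = shutil.which(command[0]) or command[0]
        subprocess.run([executable] + command[1:], check=True)
```

Printing the result of setup_commands before calling run_all is a cheap way to review exactly what will be executed against the project.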

Build the Dataflow Template

  1. In build.bat, edit the variables in lines 1 through 4:
  • DATAFLOW_BUCKET - the name of the GCS bucket created in the Run Build section
  • DF_TEMPLATE_NAME - the name (of your choosing) for the Dataflow template (e.g., dataflow-mvp-dog)
  • PROJECT_ID - the ID of the GCP project (e.g., dataflow-mvp)
  • GCP_REGION - the GCP region (I like to choose the region closest to me, e.g., us-east1)
  2. Run the build.bat script:
    build.bat

This will create the template for the Dataflow job in the specified GCS bucket.

  3. Verify that the template has been uploaded to the GCS bucket:
    gsutil ls gs://%DATAFLOW_BUCKET%/templates/%DF_TEMPLATE_NAME%
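
The contents of build.bat are not reproduced in this README, so the following is only a sketch of the kind of pipeline options a classic template build typically passes to main.py. The helper name template_args and the staging and temp folder names are assumptions; the templates/ path matches the gsutil ls command above, and the option names are standard Apache Beam pipeline options.

```python
from typing import List


def template_args(project_id: str, region: str, bucket: str, template_name: str) -> List[str]:
    """Return pipeline options that stage the job as a reusable template."""
    root = f"gs://{bucket}"
    return [
        "--runner=DataflowRunner",
        f"--project={project_id}",
        f"--region={region}",
        f"--staging_location={root}/staging",  # assumed folder name
        f"--temp_location={root}/temp",  # assumed folder name
        # With template_location set, the job is staged instead of executed.
        f"--template_location={root}/templates/{template_name}",
        "--setup_file=./setup.py",
    ]


if __name__ == "__main__":
    args = template_args("dataflow-mvp", "us-east1", "my-bucket", "dataflow-mvp-dog")
    print("python main.py " + " ".join(args))
```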

Create the Cloud Scheduler Job

  1. Finally, submit a Cloud Scheduler job to run Dataflow on a desired schedule:
gcloud scheduler jobs create http api-to-gbq-scheduler ^
--schedule="0 */3 * * *" ^
--uri="https://dataflow.googleapis.com/v1b3/projects/%PROJECT_ID%/locations/%GCP_REGION%/templates:launch?gcsPath=gs://%DATAFLOW_BUCKET%/templates/%DF_TEMPLATE_NAME%" ^
--http-method="post" ^
--oauth-service-account-email="dataflow-runner@%PROJECT_ID%.iam.gserviceaccount.com" ^
--oauth-token-scope="https://www.googleapis.com/auth/cloud-platform" ^
--message-body="{""jobName"": ""api-to-bq-df"", ""parameters"": {""region"": ""%GCP_REGION%""}, ""environment"": {""numWorkers"": ""3""}}" ^
--time-zone=America/Chicago 

Notes:

  • Alternatively, you could use the --message-body-from-file argument. However, you'll need to manually specify the GCP region in the file, since environment variables are not expanded within the JSON.
  • The cron string 0 */3 * * * executes the job every 3 hours.
  • The jobName parameter, api-to-bq-df, names the Dataflow job that each run launches. The Cloud Scheduler job itself is named api-to-gbq-scheduler.
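
One way around the quoting and variable-expansion issues in the message body is to generate the URI and JSON from Python. The sketch below is illustrative: build_launch_request is a hypothetical helper, and the body mirrors the fields used in the gcloud command above.

```python
import json
from typing import Tuple


def build_launch_request(
    project_id: str,
    region: str,
    bucket: str,
    template_name: str,
    job_name: str = "api-to-bq-df",
    num_workers: int = 3,
) -> Tuple[str, str]:
    """Return the templates:launch URI and its JSON message body."""
    uri = (
        "https://dataflow.googleapis.com/v1b3"
        f"/projects/{project_id}/locations/{region}/templates:launch"
        f"?gcsPath=gs://{bucket}/templates/{template_name}"
    )
    body = {
        "jobName": job_name,
        "parameters": {"region": region},
        # Kept as a string to match the command above.
        "environment": {"numWorkers": str(num_workers)},
    }
    return uri, json.dumps(body)


if __name__ == "__main__":
    uri, body = build_launch_request(
        "dataflow-mvp", "us-east1", "my-bucket", "dataflow-mvp-dog"
    )
    # Save the body so it can be passed with --message-body-from-file.
    with open("message_body.json", "w", encoding="utf-8") as handle:
        handle.write(body)
    print(uri)
```

Because json.dumps handles the quoting, the doubled double quotes needed in the Windows CLI disappear, and the region is filled in from a variable rather than typed by hand.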

Warranty

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

Owner
Chris Carbonell