Overview

Sample streaming Dataflow pipeline written in Python

This repository contains a streaming Dataflow pipeline written in Python with Apache Beam, reading data from PubSub.

For more details, see the following Beam Summit 2021 talk:

To run this pipeline, you need the gcloud SDK installed and a project in Google Cloud Platform, even if you run the pipeline locally with the direct runner.

Description of the pipeline

Data input

We are using a public PubSub topic with data, so we don't need to set up our own topic to run this pipeline.

The topic is projects/pubsub-public-data/topics/taxirides-realtime.

That topic contains messages from the NYC Taxi Ride dataset. Here is a sample of the data contained in a message in that topic:

{
  "ride_id": "328bec4b-0126-42d4-9381-cb1dbf0e2432",
  "point_idx": 305,
  "latitude": 40.776270000000004,
  "longitude": -73.99111,
  "timestamp": "2020-03-27T21:32:51.48098-04:00",
  "meter_reading": 9.403651,
  "meter_increment": 0.030831642,
  "ride_status": "enroute",
  "passenger_count": 1
}

But the messages also carry metadata that is useful for streaming pipelines. In this case, each message has an attribute named ts, which contains the same timestamp as the timestamp field in the data. Remember that PubSub treats the message payload as just a string of bytes, so it knows nothing about the data itself. These metadata attributes are normally used to publish messages with specific ids and/or timestamps.

To inspect the messages from this topic, you can create a subscription, and then pull some messages.

To create a subscription, use the gcloud CLI (installed by default in the Cloud Shell):

export TOPIC=projects/pubsub-public-data/topics/taxirides-realtime
gcloud pubsub subscriptions create taxis --topic $TOPIC

To pull messages:

gcloud pubsub subscriptions pull taxis --limit 3

or, if you have jq installed (for pretty-printing the JSON):

gcloud pubsub subscriptions pull taxis --limit 3 | grep " {" | cut -f 2 -d ' ' | jq

Pay special attention to the Attributes column (the metadata). You will see that the timestamp is included as a field in the metadata, as well as in the data. We will leverage that metadata field for the timestamps used in our streaming pipeline.
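
In the Beam Python SDK, the timestamp_attribute argument of ReadFromPubSub is the standard way to wire this up: it tells Beam to take each element's event timestamp from a message attribute instead of the publish time. A minimal sketch of just the read step (the rest of the pipeline is omitted):

import apache_beam as beam

with beam.Pipeline() as p:
    events = p | "Read" >> beam.io.ReadFromPubSub(
        topic="projects/pubsub-public-data/topics/taxirides-realtime",
        timestamp_attribute="ts")  # take event timestamps from the ts attribute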

Data output

This pipeline writes the output to BigQuery, in streaming append-only mode.

The destination tables must exist prior to running the pipeline.
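
In the Beam Python SDK, this corresponds to WriteToBigQuery with append semantics. A minimal sketch (the table name here is a made-up example, and the repository's actual sink configuration may differ):

import apache_beam as beam

# results is assumed to be a PCollection of dicts matching the table schema
results | "WriteToBQ" >> beam.io.WriteToBigQuery(
    "my-project:taxi_rides.sessions",  # hypothetical destination table
    create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,  # tables must already exist
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)  # append-only writes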

If you have the gcloud CLI installed (for instance, it is installed by default in the Cloud Shell), you can create the tables from the command line.

You need to create a BigQuery dataset too, in the same region:
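
For instance, with the bq command line tool (the dataset name matches the script below; the region is just an example):

bq mk --location=us-central1 --dataset taxi_rides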

After that, you can create the destination tables with the provided script:

./scripts/create_tables.sh taxi_rides

Algorithm / business rules

We are using a session window with a gap of 10 seconds. That means that all the messages with the same ride_id will be grouped together, as long as their timestamps are within 10 seconds of each other. A message whose timestamp falls more than 10 seconds outside an existing session will either be discarded (if its timestamp is older) or will open a new window (if it is newer).

With the messages inside each window (that is, each different ride_id will be part of a different window), we calculate the duration of the session, as the difference between the minimum and maximum timestamps in the window. We also calculate the number of events in that session.

We use a GroupByKey to operate on all the messages in a window. This loads all the messages of the window into memory. That is fine: in Beam streaming, a window is always processed by a single worker (windows cannot be split across different workers).

This is an example of the kind of logic that can be implemented by leveraging windows in streaming pipelines. The grouping of messages by ride_id and event timestamp is done automatically by the pipeline; we only need to express the generic operations to be performed on each window. A sketch of this logic is shown below.
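
Here is a rough sketch of how that windowing and aggregation could look in the Beam Python SDK (the function names and output format are illustrative, not the exact code of this repository):

import json
import apache_beam as beam
from apache_beam.transforms.window import Sessions
from dateutil import parser

def key_by_ride_id(message):
    # Parse the JSON payload and key each event by its ride_id.
    data = json.loads(message.decode("utf-8"))
    return data["ride_id"], parser.isoparse(data["timestamp"])

def session_stats(element):
    # Session duration = max - min event timestamp; also count the events.
    ride_id, timestamps = element
    timestamps = list(timestamps)
    duration = (max(timestamps) - min(timestamps)).total_seconds()
    return {"ride_id": ride_id, "duration": duration, "n_events": len(timestamps)}

with beam.Pipeline() as p:
    (p
     | "Read" >> beam.io.ReadFromPubSub(
           topic="projects/pubsub-public-data/topics/taxirides-realtime",
           timestamp_attribute="ts")  # event timestamps from the ts attribute
     | "KeyByRide" >> beam.Map(key_by_ride_id)
     | "SessionWindow" >> beam.WindowInto(Sessions(gap_size=10))  # 10-second gap
     | "GroupByRide" >> beam.GroupByKey()
     | "SessionStats" >> beam.Map(session_stats))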

Running the pipeline

Prerequisites

You need a Google Cloud project and the gcloud SDK configured to run the pipeline. For instance, you could run it from the Cloud Shell in Google Cloud Platform (where gcloud comes preconfigured).

Then you need to create a Google Cloud Storage bucket, with the same name as your project id, and in the same region where you will run Dataflow:
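
For instance (the region here is just an example):

export PROJECT=$(gcloud config get-value project)
gsutil mb -l us-central1 gs://$PROJECT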

Make sure that you have a Python 3 environment (Python <3.9), for instance a virtualenv, and install apache-beam[gcp] and python-dateutil in it. Assuming that you are running inside a virtualenv:

pip install "apache-beam[gcp]" python-dateutil

Run the pipeline

Once the tables are created and the dependencies installed, edit scripts/launch_dataflow_runner.sh to set your project id and region, and then run it with:

./scripts/launch_dataflow_runner.sh

The outputs will be written to the BigQuery tables, and in the profile directory in your bucket you should see Python gprof files with profiling information.
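
For reference, such a launcher typically ends with an invocation along these lines; this is a generic sketch (the main module name and option values are assumptions, not the literal contents of the script). The --profile_cpu and --profile_location flags are what make Beam write the profiling files mentioned above:

# main.py is a placeholder for the pipeline's entry point
python3 main.py \
  --runner DataflowRunner \
  --project $PROJECT \
  --region $REGION \
  --temp_location gs://$PROJECT/tmp \
  --streaming \
  --profile_cpu \
  --profile_location gs://$PROJECT/profile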

CPU profiling

Beam uses the Python profiler to produce files in Python gprof format. You will need some scripting to interpret those files and extract insights from them.
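
For a quick look, Python's built-in pstats module can load these files, assuming they are standard cProfile dumps (which is what pstats reads):

import pstats

# data/beam.prof is the sample profiling output shipped in this repository
stats = pstats.Stats("data/beam.prof")
stats.sort_stats("cumulative").print_stats(10)  # ten most expensive entries by cumulative time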

In this repository you will find some sample output in data/beam.prof, which you can use to check what the profiling output looks like. Use the following Colab notebook for an example analyzing that sample profiling data:

Refer to this post for more details about how to interpret that file:

License

Copyright 2021 Israel Herraiz

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
