This repository contains a streaming Dataflow pipeline written in Python with Apache Beam, reading data from PubSub.

Overview

Sample streaming Dataflow pipeline written in Python

This repository contains a streaming Dataflow pipeline written in Python with Apache Beam, reading data from PubSub.

For more details, see the following Beam Summit 2021 talk:

To run this pipeline, you need to have the SDK installed, and a project in Google Cloud Platform, even if you run the pipeline locally with the direct runner:

Description of the pipeline

Data input

We are using here a public PubSub topic with data, so we don't need to setup our own to run this pipeline.

The topic is projects/pubsub-public-data/topics/taxirides-realtime.

That topic contains messages from the NYC Taxi Ride dataset. Here is a sample of the data contained in a message in that topic:

{
  "ride_id": "328bec4b-0126-42d4-9381-cb1dbf0e2432",
  "point_idx": 305,
  "latitude": 40.776270000000004,
  "longitude": -73.99111,
  "timestamp": "2020-03-27T21:32:51.48098-04:00",
  "meter_reading": 9.403651,
  "meter_increment": 0.030831642,
  "ride_status": "enroute",
  "passenger_count": 1
}

But the messages also contain metadata, that is useful for streaming pipelines. In this case, the messages contain an attribute of name ts, which contains the same timestamp as the field of name timestamp in the data. Remember that PubSub treats the data as just a string of bytes, so it does not know anything about the data itself. The metadata fields are normally used to publish messages with specific ids and/or timestamps.

To inspect the messages from this topic, you can create a subscription, and then pull some messages.

To create a subscription, use the gcloud cli utility (installed by default in the Cloud Shell):

export TOPIC=projects/pubsub-public-data/topics/taxirides-realtime
gcloud pubsub subscriptions create taxis --topic $TOPIC

To pull messages:

gcloud pubsub subscriptions pull taxis --limit 3

or if you have jq (for pretty printing of JSON)

gcloud pubsub subscriptions pull taxis --limit 3 | grep " {" | cut -f 2 -d ' ' | jq

Pay special attention to the Attributes column (metadata). You will see that the timestamp included as a field in the metadata, as well as in the data. We will leverage that metadata field for the timestamps used in our streaming pipeline.

Data output

This pipeline writes the output to BigQuery, in streaming append-only mode.

The destination tables must exist prior to running the pipeline.

If you have the GCloud cli utility installed (for instance, it is installed by default in the Cloud Shell), you can create the tables from the command line.

You need to create a BigQuery dataset too, in the same region:

After that, you can create the destination tables with the provided script

./scripts/create_tables.sh taxi_rides

Algorithm / business rules

We are using a session window with a gap of 10 seconds. That means that all the messages with the same ride_id will be grouped together, as long as their timestamps are 10 seconds within each other. Any message with a timestamp more than 10 seconds apart will be discarded (for old timestamps) or will open a new window (for newer timestamps).

With the messages inside each window (that is, each different ride_id will be part of a different window), we will calculate the duration of the session, as the difference between the min and max timestamps in the window. We will also calculate the number of events in that session.

We will use a GroupByKey to operate with all the messages in a window. This will load all the messages in the window into memory. This is fine, as in Beam streaming, a window is always processed in a worker (windows cannot be split across different workers).

This is an example of the kind of logic that can be implemented leveraging windows in streaming pipelines. This grouping of messages across ride_id and event timestamps is automatically done by the pipeline, and we just need to express the generic operations to be performed with each window, as part of our pipeline.

Running the pipeline

Prerequirements

You need to have a Google Cloud project, and the gcloud SDK configured to run the pipeline. For instance, you could run it from the Cloud Shell in Google Cloud Platform (gcloud would be automatically configured).

Then you need to create a Google Cloud Storage bucket, with the same name as your project id, and in the same region where you will run Dataflow:

Make sure that you have a Python environment with Python 3 (<3.9). For instance a virtualenv, and install apache-beam[gcp] and python-dateutil in your local environment. For instance, assuming that you are running in a virtualenv:

pip install "apache-beam[gcp]" python-dateutil

Run the pipeline

Once the tables are created and the dependencies installed, edit scripts/launch_dataflow_runner.sh and set your project id and region, and then run it with:

./scripts/launch_dataflow_runner.sh

The outputs will be written to the BigQuery tables, and in the profile directory in your bucket you should see Python gprof files with profiling information.

CPU profiling

Beam uses the Python profiler to produce files in Python gprof format. You will need some scripting to interpret those files and extracts insights out of them.

In this repository, you will find some sample output in data/beam.prof, that you can use to check what the profiling output looks like. Use the following Colab notebook with an example analyzing that sample profiling data:

Refer to this post for more details about how to interpret that file:

License

Copyright 2021 Israel Herraiz

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Owner
Israel Herraiz
Strategic Cloud Engineer @GoogleCloudPlatform
Israel Herraiz
Sprint planner considering JIRA issues and google calendar meetings schedule.

Sprint planner Sprint planner is a Python script for planning your Jira tasks based on your calendar availability. Installation Use the package manage

Apptension 2 Dec 05, 2021
A Python wrapper of Neighbor Retrieval Visualizer (NeRV)

PyNeRV A Python wrapper of the dimensionality reduction algorithm Neighbor Retrieval Visualizer (NeRV) Compile Set up the paths in Makefile then make.

2 Aug 29, 2021
daily report of @arkinvest ETF activity + data collection

ark_invest daily weekday report of @arkinvest ETF activity + data collection This script was created to: Extract and save daily csv's from ARKInvest's

T D 27 Jan 02, 2023
Flipper Zero documentation repo

Flipper Zero Docs Participation To fix a bug or add something new to this repository, you need to open a pull-request. Also, on every page of the site

Flipper Zero (All Repositories will be public soon) 114 Dec 30, 2022
Simple and lightweight Spotify Overlay written in Python.

Simple Spotify Overlay This is a simple yet powerful Spotify Overlay. About I have been looking for something like this ever since I got Spotify. I th

27 Sep 03, 2022
Matplotlib colormaps from the yt project !

cmyt Matplotlib colormaps from the yt project ! Colormaps overview The following colormaps, as well as their respective reversed (*_r) versions are av

The yt project 5 Sep 16, 2022
Squidpy is a tool for the analysis and visualization of spatial molecular data.

Squidpy is a tool for the analysis and visualization of spatial molecular data. It builds on top of scanpy and anndata, from which it inherits modularity and scalability. It provides analysis tools t

Theis Lab 251 Dec 19, 2022
HW 02 for CS40 - matplotlib practice

HW 02 for CS40 - matplotlib practice project instructions https://github.com/mikeizbicki/cmc-csci040/tree/2021fall/hw_02 Drake Lyric Analysis Bar Char

13 Oct 27, 2021
Interactive plotting for Pandas using Vega-Lite

pdvega: Vega-Lite plotting for Pandas Dataframes pdvega is a library that allows you to quickly create interactive Vega-Lite plots from Pandas datafra

Altair 342 Oct 26, 2022
plotly scatterplots which show molecule images on hover!

molplotly Plotly scatterplots which show molecule images on hovering over the datapoints! Required packages: pandas rdkit jupyter_dash ➑️ See example.

150 Dec 28, 2022
Create matplotlib visualizations from the command-line

MatplotCLI Create matplotlib visualizations from the command-line MatplotCLI is a simple utility to quickly create plots from the command-line, levera

Daniel Moura 46 Dec 16, 2022
Leyna's Visualizing Data With Python

Leyna's Visualizing Data Below is information on the number of bilingual students in three school districts in Massachusetts. You will also find infor

11 Oct 28, 2021
simple tool to paint axis x and y

simple tool to paint axis x and y

G705 1 Oct 21, 2021
Interactive Dashboard for Visualizing OSM Data Change

Dashboard and intuitive data downloader for more interactive experience with interpreting osm change data.

1 Feb 20, 2022
Geospatial Data Visualization using PyGMT

Example script to visualize topographic data, earthquake data, and tomographic data on a map

Utpal Kumar 2 Jul 30, 2022
Decision Border Visualizer for Classification Algorithms

dbv Decision Border Visualizer for Classification Algorithms Project description A python package for Machine Learning Engineers who want to visualize

Sven Eschlbeck 1 Nov 01, 2021
Pydrawer: The Python package for visualizing curves and linear transformations in a super simple way

pydrawer πŸ“ The Python package for visualizing curves and linear transformations in a super simple way. ✏️ Installation Install pydrawer package with

Dylan Tintenfich 56 Dec 30, 2022
Python support for Godot 🐍🐍🐍

Godot Python, because you want Python on Godot ! The goal of this project is to provide Python language support as a scripting module for the Godot ga

Emmanuel Leblond 1.4k Jan 04, 2023
Active Transport Analytics Model (ATAM) is a new strategic transport modelling and data visualization framework for Active Transport as well as emerging micro-mobility modes

{ATAM} Active Transport Analytics Model Active Transport Analytics Model (β€œATAM”) is a new strategic transport modelling and data visualization framew

Peter Stephan 0 Jan 12, 2022
A simple interpreted language for creating basic mathematical graphs.

graphr Introduction graphr is a small language written to create basic mathematical graphs. It is an interpreted language written in python and essent

2 Dec 26, 2021