Easy to use Google Pub/Sub

Overview

Relé makes integration with Google PubSub straightforward and easy.

Build Status Read the Docs Code Coverage PyPI - Python Version PyPI - Downloads

Motivation and Features

The Publish-Subscribe pattern and specifically the Google Cloud Pub/Sub library are very powerful tools but you can easily cut your fingers on it. Relé makes integration seamless by providing Publisher, Subscriber and Worker classes with the following features:

  • Powerful Publishing API
  • Highly Scalable Worker
  • Intuitive Subscription Management
  • Easily Extensible Middleware
  • Ready to go Django/Flask integration
  • CLI
  • And much more!

What it looks like

# Publish to the topic
import rele

rele.publish(topic='photo-uploaded', data={'customer_id': 123})

# Subscribe to the Pub/Sub topic
from rele import sub

@sub(topic='photo-uploaded')
def photo_uploaded(data, **kwargs):
    print(f"Customer {data['customer_id']} has uploaded an image")

What's in the name

"Relé" is Spanish for relay, a technology that has played a key role in history in the evolution of communication and electrical technology, including the telegraph, telephone, electricity transmission, and transistors.

Install

Relé supports Python 3.6+ and installing via pip

pip install rele

or with Django integration

pip install rele[django]

or with Flask integration

pip install rele[flask]

Quickstart

Please see our documentation to get started.

You can also read more about it here


Running Tests

Does the code actually work?

  make test
Comments
  • Clean up rele.config.setup + Worker() init

    Clean up rele.config.setup + Worker() init

    This relates to #114 & #119

    This makes makes all config variables nullable falling back to standard google envars, without breaking the current api.

    The new apis would look like this is you have GOOGLE_APPLICATION_CREDENTIALS set.

    rele.config.setup()
    
    w = Worker([sub1, sub2])
    w.run_forever()
    

    TBH, I think reading global configs from the environment is easier to reason about than a singleton so I'd suggest that. Any non globals should just be passed into the instances on init.

    :tophat: What?

    Provide a description of what has been implemented.

    :thinking: Why?

    Give an explanation of why.

    :link: Related issue

    Add related issue's number. Example: Fix #1

    opened by craigmulligan 8
  • Runtime error when trying to read project_id from default google creds

    Runtime error when trying to read project_id from default google creds

    I'm getting the following error when trying to run rele.config.setup using default credentials:

    rele.config.setup({
    	"GC_CREDENTIALS_PATH": None,
        "MIDDLEWARE": [
         	"rele.contrib.LoggingMiddleware",
            "rele.contrib.FlaskMiddleware",
         ],
         "APP_NAME": "smart_comms_planner",
    }, flask_app=app)
    

    output:

      File "/Users/matthewbridges/repos/smart-comms-planner/src/__init__.py", line 66, in <module>
        rele.config.setup(settings["rele"], flask_app=app)
      File "/Users/matthewbridges/.local/share/virtualenvs/smart-comms-planner-yrxYHqso/lib/python3.8/site-packages/rele/config.py", line 69, in setup
        init_global_publisher(config)
      File "/Users/matthewbridges/.local/share/virtualenvs/smart-comms-planner-yrxYHqso/lib/python3.8/site-packages/rele/publishing.py", line 10, in init_global_publisher
        gc_project_id=config.gc_project_id,
      File "/Users/matthewbridges/.local/share/virtualenvs/smart-comms-planner-yrxYHqso/lib/python3.8/site-packages/rele/config.py", line 59, in gc_project_id
        return self.credentials.project_id
    AttributeError: 'Credentials' object has no attribute 'project_id'
    

    The code is incorrectly attempting to read project_id off the credentials object, when in fact it is returned as a tuple from get_google_defaults.

    I've added a proposed fix here: https://github.com/mercadona/rele/pull/195

    opened by Itsindigo 7
  • Style guide

    Style guide

    We currently have flake8 in our linting command. We do not have isort in the project style guidelines.

    Shall we add it?

    Or try out something new like black?

    good first issue question 
    opened by andrewgy8 5
  • Configure Publisher Client timeout

    Configure Publisher Client timeout

    I would like to be able to configure the timeout when publishing a message in a blocking fashion. Right now, the default and hard coded way is set to 3.0 seconds.

    I propose adding a configuration so I can declare any number of seconds for the publisher.

    Ex.

    RELE = {
    	...
    	'PUBLISHER_TIMEOUT': 5.0
    }
    
    enhancement 
    opened by andrewgy8 4
  • Add an API to allow passing objects to middleware

    Add an API to allow passing objects to middleware

    I'm adding rele to flask app, and the subscription callbacks need the flask app_context. I've done this by way of middleware:

    eg:

    class FlaskMiddleware(BaseMiddleware):
        def pre_process_message(self, subscription, message):
            from server import app
            self.ctx = app.app_context()
            self.ctx.push()
    
        def post_process_message(self):
            self.ctx.pop()
    

    But to make this reusable across our services and other flask apps, I'd need a way to add arbitrary data to the config that is passed to middleware.setup method or have an easy way to call custom middleware functions.

    eg:

    class FlaskMiddleware(BaseMiddleware):
        def setup(self, config):
            self.app = config["FLASK_APP"]
    
        def pre_process_message(self, subscription, message):
            self.ctx = self.app.app_context()
            self.ctx.push()
    
        def post_process_message(self):
            self.ctx.pop()
    
    enhancement 
    opened by craigmulligan 4
  • Simplify initializing and running the Worker

    Simplify initializing and running the Worker

    One consistent peice of feedback that I have received is simplifying the Worker class. Right now, you must Initialize with the subs, each individual config attribute, run setup, run start, and then sleep.

    Like this:

    worker = Worker(
        [photo_uploaded],
        config.gc_project_id,
        config.credentials,
        config.ack_deadline,
    )
    worker.setup()
    worker.start()
    sleep(120)
    

    I propose we simplify the API to run a worker to look something like this instead:

    worker = Worker([photo_uploaded], config)
    worker.run(sleep=120)
    

    run would call both setup and start, and we could add the standard sleep method. In addition we consolidate the configuration into one attribute.

    This would be backwards compatible since we will be creating a new method. And the change to Worker initialization could also fall back to the declared attributes if defined. Otherwise, use the config object.

    enhancement 
    opened by andrewgy8 4
  • Improve sub decorator

    Improve sub decorator

    The sub decorator can get some ease of use improvements

    1. Use functools.wraps to preserve __name__ and __doc__ of the original function
    2. Inspect callback function signature and raise an exception if the signature is not compatible (ie. missing data)
    3. Log a warning message if the function cannot be discovered (ie. no sub in the path)

    Slightly related to the second point: @sub("topic", filter_by=42) will raise a TypeError at runtime (because filters are used as callables but they are not checked)

    enhancement hacktoberfest 
    opened by tbarbugli 3
  • Breaking with Google PubSub >= 2.0

    Breaking with Google PubSub >= 2.0

    In the 1.0 release, I had to pin pubsub to <2.0. Otherwise, our tests would break. It seems there were major changes, and we need to update some usages in our code.

    To reproduce:

    1. update google pubsub library to >2.0.
    2. Run tests
    good first issue hacktoberfest 
    opened by andrewgy8 3
  • Execution doesn't stop when using debugger

    Execution doesn't stop when using debugger

    Hey 👋 first of all thanks for this great library!

    This might not be a strictly Relé related issue, but when I tried to pause execution to debug some issue (ie dropping a __import__("pdb").set_trace()) inside the subscription message handler function it did not stop, but just kept on processing messages.

    This might be a trivial problem to overcome, but my search engine fu failed me. I'm guessing the problem is that the subscriber handles the message in a thread, so it would somehow need to let the parent know to stop, but not really sure how to go about it.

    question 
    opened by daaain 3
  • Add unrecoverable_middleware

    Add unrecoverable_middleware

    This is useful if you know you will never be able to handle the message and don't have a dead-letter queue setup.

    :tophat: What?

    A custom exception and middleware to automate and ignore bad messages.

    :thinking: Why?

    I've been using this a fair amount in some of our services and thought it may be useful to others.

    opened by craigmulligan 3
  • Add python 3.8 to travis

    Add python 3.8 to travis

    :tophat: What?

    Add python 3.8 to travis.

    :thinking: Why?

    It was officially released on Oct 14, 2019

    This would allow us to officially add Python 3.8. However, it seems that grpcio is not compiled for Python 3.8 and therefore the travis build takes much longer to complete.. Ran for 5 min 40 sec.

    So either we can wait and not support 3.8 yet, or merge this and have the travis build take longer than expected.

    I leave it as a draft until we decide.

    opened by andrewgy8 3
  • [travis] PSQL is not required for tests execution

    [travis] PSQL is not required for tests execution

    :tophat: What?

    Remove PostgreSQL service from Travis pipeline

    :thinking: Why?

    • It's not used
    • It could give a false sensation of security by believing there are tests checking database features (Django DB)
    hacktoberfest 
    opened by Maks3w 0
  • [add] Python 3.10 support

    [add] Python 3.10 support

    :tophat: What?

    • Run the test suite against the minimumPython supported (3.6) and the maximum version supported (3.10)
    • Also update the dist image to bionic because it's the only one with support for all the Python versions.
    • Add the PIP classifiers for Python 3.10

    :thinking: Why?

    Be sure the library is compatible with the latest/current version of Python (3.10)

    hacktoberfest 
    opened by Maks3w 0
  • Add native async support for subscriptions

    Add native async support for subscriptions

    Currently, async functions can't be used with the @sub context manager.

    @sub(topic='my-topic'):
    async def async_handler(data, **kwargs):
        print('data', data)
    

    If you try, you'll get the following error.

    Configuring worker with 1 subscription(s)...
      dicom_uploads - handle_upload
    /usr/local/lib/python3.9/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py:126: RuntimeWarning: coroutine 'handle_upload' was never awaited
      callback(message)
    RuntimeWarning: Enable tracemalloc to get the object allocation traceback
    

    There are some simple workarounds, but it would be nice if this was supported natively.

    @sub(topic='my-topic'):
    def sync_handler(data, **kwargs)
        return asyncio.run(async_handler(data, **kwargs))
    
    async def async_handler(data, **kwargs):
        print('data', data)
    
    opened by csaroff 6
  • Add support for arbitrary deserializers

    Add support for arbitrary deserializers

    :tophat: What?

    Allow custom deserializers other than the default json deserializer.

    :thinking: Why?

    Some of GCP's pub/sub notifications have messages that are simple strings which can't be deserialized as json.

    For example, notifications that a record was inserted into the dicom store comes through with a string indicating the path to that record and nothing more.

    :link: https://github.com/mercadona/rele/issues/229

    opened by csaroff 8
  • Add subscription option to disable json parsing

    Add subscription option to disable json parsing

    Currently rele's subscription model only supports consuming messages that are json deserializable.

    Unfortunately some of GCP's core services publish non-json messages. For example, the gcp dicom store publishes notifications to a topic when a new dicom instance was inserted, but the message field is just the dicom instance path. When attempting to consume these messages through rele, we get the following error.

    Exception raised while processing message for dicom_uploads - handle_upload: JSONDecodeError
    Traceback (most recent call last):
      File "/usr/local/lib/python3.9/site-packages/rele/subscription.py", line 112, in __call__
        data = json.loads(message.data.decode("utf-8"))
      File "/usr/local/lib/python3.9/json/__init__.py", line 346, in loads
        return _default_decoder.decode(s)
      File "/usr/local/lib/python3.9/json/decoder.py", line 337, in decode
        obj, end = self.raw_decode(s, idx=_w(s, 0).end())
      File "/usr/local/lib/python3.9/json/decoder.py", line 355, in raw_decode
        raise JSONDecodeError("Expecting value", s, err.value) from None
    json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
    

    Can we add an option to disable deserialization of these messages? Something like:

    from rele import sub
    
    @sub(topic='my-topic', deserialize_message=False):
    def handle_upload(data, **kwargs):
        print('Handling data', data)
    
    opened by csaroff 2
Releases(1.6.0)
  • 1.6.0(Aug 4, 2022)

  • v1.4.1(Apr 20, 2022)

    What's Changed

    • [Deleted] Delete post-publish-message-failure hook on VerboseLoggingMiddleware. (#220)

    Full Changelog: https://github.com/mercadona/rele/compare/v1.4.0...v1.4.1

    Source code(tar.gz)
    Source code(zip)
  • v1.4.0(Apr 13, 2022)

    What's Changed

    • [Added] Added a VerboseLoggingMiddleware that does not truncate mesage payload. (#218)

    Full Changelog: https://github.com/mercadona/rele/compare/v1.3.0...v1.4.0

    Source code(tar.gz)
    Source code(zip)
  • v1.3.0(Apr 5, 2022)

    What's Changed

    • GC Project Id & Windows support by @chirgjin-les in https://github.com/mercadona/rele/pull/215
    • Release v1.3.0 by @jonasae in https://github.com/mercadona/rele/pull/216

    New Contributors

    • @chirgjin-les made their first contribution in https://github.com/mercadona/rele/pull/215

    Full Changelog: https://github.com/mercadona/rele/compare/v1.2.0...v1.3.0

    Source code(tar.gz)
    Source code(zip)
  • v1.2.0(Dec 15, 2021)

    • [CHANGED] TimeotError from publisher (#212)
    • Added filter_subs_by setting in documentation (#208)
    • Automatic topic creation (#206)
    • Log post publish success (#204)
    Source code(tar.gz)
    Source code(zip)
  • 1.1.1(Jun 28, 2021)

  • 1.1.0(Mar 10, 2021)

    • Google Pubsub 2.0 Compat (#192)
    • Add validations to the sub decorator (#189)
    • Add new post_publish_hook and deprecate the old one (#190)
    • Discover and load settings when publishing (#188)
    • Fix #180: Raise error when the config loads a repeated subscription (#187)
    Source code(tar.gz)
    Source code(zip)
  • 1.0.0(Sep 25, 2020)

  • 0.14.0(Aug 5, 2020)

    • BREAKING CHANGE: Remove GC_CREDENTIALS (#174)
    • DEPRECATE: GC_PROJECT_ID setting (#178)
    • Add changelog to the docs site (#179)
    • Catch TimeoutError and run post_publish_failure when blocking (#172)
    Source code(tar.gz)
    Source code(zip)
  • 0.13.0(Jul 10, 2020)

  • 0.13.dev0(Jun 16, 2020)

  • 0.12.0(Jun 12, 2020)

  • 0.11.0(Jun 4, 2020)

  • 0.10.0(Feb 4, 2020)

    • Adjust default THREADS_PER_SUBSCRIPTION (#152)
    • Add unrecoverable_middleware (#150)
    • Allow multiple filters (#148)
    • Configure timeout from .publish() (#143)
    • Dont crash when subscription topic does not exist (#142)
    Source code(tar.gz)
    Source code(zip)
  • 0.9.1(Jan 2, 2020)

  • 0.9.0(Dec 20, 2019)

    • Flask support via middleware (#127)
    • Add message attributes to metrics log (#128)
    • Specify number of threads per subscriber with Subscription ThreadPoolExecutor (#139)
    • Publishing timeout while blocking (#137)
    • Clean up rele.config.setup + Worker() init (#132)
    Source code(tar.gz)
    Source code(zip)
  • 0.8.1(Nov 25, 2019)

  • 0.8.0(Nov 22, 2019)

    • Worker run method (#118)
    • Add kwargs to setup method passed through to middleware (#123)
    • Add missing worker middleware hooks (#121)
    • Add 3.8 support
    • More Documentation
    Source code(tar.gz)
    Source code(zip)
  • 0.7.0(Oct 21, 2019)

  • 0.6.0(Sep 23, 2019)

    • BREAKING: Remove drf as a dependency (#91)
    • Add message as a parameter for middleware hooks (#99)
    • Check setting.CONN_MAX_AGE and warn when not 0 (#97)
    • More documentation
    Source code(tar.gz)
    Source code(zip)
  • 0.5.0(Aug 9, 2019)

  • 0.4.1(Jun 19, 2019)

  • 0.4.0(Jun 17, 2019)

    • Set DEFAULT_ACK_DEADLINE (#49)
    • Filter by message attributes (#66)
    • BREAKING: All Relé settings are defined in a dict (#60)

    Old structure:

    from google.oauth2 import service_account
    RELE_GC_CREDENTIALS = service_account.Credentials.from_service_account_file(
        'rele/settings/dummy-credentials.json'
    )
    RELE_GC_PROJECT_ID = 'dummy-project-id'
    

    New structure:

    from google.oauth2 import service_account
    RELE = {
        'GC_CREDENTIALS': service_account.Credentials.from_service_account_file(
            'rele/settings/dummy-credentials.json'
        ),
        'GC_PROJECT_ID': 'dummy-project-id',
        'MIDDLEWARE': [
            'rele.contrib.LoggingMiddleware',
            'rele.contrib.DjangoDBMiddleware',
        ],
        'SUB_PREFIX': 'delivery',
        'APP_NAME': 'delivery',
    }
    
    • rele.contrib.middleware (#55)
    • Prefix argument in sub decorator (#47)
    • Add timestamp to the published message (#42)
    • BREAKING: Explicit publisher and subscriber configuration (#43)
    • Sphinx documentation (#27, #34, #40, #41)
    • Contributing guidelines (#32)
    Source code(tar.gz)
    Source code(zip)
  • v0.3.1(Jun 5, 2019)

  • v0.3.0(May 14, 2019)

    • Ability to run in emulator mode (#12)
    • Add Travis-CI builds (#10)
    • More friendly global publish (#11)
    • Non-blocking behaviour when publishing by default (#6)
    Source code(tar.gz)
    Source code(zip)
A discord account nuker with lots of tools that will destroy a discord account

A discord account nuker with lots of tools that will destroy a discord account (token destroyer... and much more).

firexi 10 Apr 28, 2022
Telegram Bot For Screenshot Generation.

Screenshotit_bot Telegram Bot For Screenshot Generation. Description An attempt to implement the screenshot generation of telegram files without downl

1 Nov 06, 2021
Spotify Web API client for Python 3

Welcome to the GitHub repository of Tekore! We provide a client for the Spotify Web API for Python, complete with all available endpoints and authenti

Felix Hildén 186 Dec 22, 2022
Quickly and efficiently delete your entire tweet history with the help of your Twitter archive without worrying about the pointless 3200 tweet limit imposed by Twitter.

Twitter Nuke Quickly and efficiently delete your entire tweet history with the help of your Twitter archive without worrying about the puny and pointl

Mayur Bhoi 73 Dec 12, 2022
Unofficial Coinbase Python Library

Unofficial Coinbase Python Library Python Library for the Coinbase API for use with three legged oAuth2 and classic API key usage Version 0.3.0 Requir

George Sibble 104 Dec 01, 2022
this is a telegram torrent bot

owner of this repo :- AYUSH contact me :- AYUSH Slam Mirror Bot This is a telegram bot writen in python for mirroring files on the internet to our bel

AYUSH 237 Dec 16, 2021
A Python app which retrieves the rank and players' equipped skins during a match

VALORANT rank yoinker About The Project Usage Contributing Contact Acknowledgements Disclaimer About The Project Their Queue Current Skin Current Rank

Isaac Kenyon 270 Jan 04, 2023
Tools for Twitter

Tools for Twitter Data This is a start of a collection of tools to use for collecting data via the Twitter API. If you do not have a Twitter Developer

DiscoverText 36 Oct 13, 2022
This is a Anti Channel Ban Robots

AntiChannelBan This is a Anti Channel Ban Robots delete and ban message sent by channels Heroku Deployment 💜 Heroku is the best way to host ur Projec

BᵣₐyDₑₙ 25 Dec 10, 2021
TG-Streaming-bot - TG Simple Streaming bot

TG Simple Streaming bot telegram video straming bot 🎚️ Features Play youtube li

HyDrix 4 May 05, 2022
🤖 Fast and simple bot to transform links from Amazon into a nice post with your referral link in Telegram 🛒

AmazonBot 🤖 Fast and simple bot to transform links from Amazon into a nice post with your referral link in Telegram 🛒 Prerequisites You need Python

Alternative Profit 3 Dec 25, 2022
Wonderful Phoenix-Bot

Phoenix Bot Discord Phoenix Bot is inspired by Natewong1313's Bird Bot project yet due to lack of activity by their team. We have decided to revive th

Senior Developer 0 Aug 12, 2021
A Python wrapper for the DeepL API

deepl.py A Python wrapper for the DeepL API installing Install and update using pip: pip install deepl.py A simple example. # Sync Sample import deep

grarich 18 Dec 12, 2022
Discord Bot for SurPath Hub's server

Dayong Dayong is dedicated to helping Discord servers build and manage their communities. Multipurpose —lots of features, lots of automation. Self-hos

SurPath Hub 6 Dec 18, 2021
Free and Open Source Machine Translation API. 100% self-hosted, no limits, no ties to proprietary services. Built on top of Argos Translate.

LibreTranslate Try it online! | API Docs Free and Open Source Machine Translation API, entirely self-hosted. Unlike other APIs, it doesn't rely on pro

UAV4GEO 3.5k Jan 03, 2023
A basic Ubisoft API wrapper created in python.

UbisoftAPI A basic Ubisoft API wrapper created in python. I will be updating this with more endpoints as time goes on. Please note that this is my fir

Ethan 2 Oct 31, 2021
Fully Dockerized cryptocurrencies Trading Bot, based on Freqtrade engine. Multi instances.

Cryptocurrencies Trading Bot - Freqtrade Manager This automated Trading Bot is based on the amazing Freqtrade one. It allows you to manage many Freqtr

Cédric Dugat 47 Dec 06, 2022
Connect your Nintendo Switch playing status to Discord!

Disclaimer: Unfortunately, it appears that Nintendo has removed returning self-Presence in their API as of recently, making this project near obsolete

Deltaion Lee 145 Dec 30, 2022
Telegram bot which has truecaller and smsbomber features

Truecaller-telegram_bot Add your telegram bot api key in main.py and you are good to go To get a api key Goto telegram and search BotFather From the c

Rudranag 32 Dec 05, 2022
Telegram üzerinden paylaşılan kısa linkleri geçmenin daha hızlı bir yolu

Telegram Url skipper Telegramda paylaşılan kısa linkleri geçmenin daha hızlı bir yolu · Hata Raporla · Öneri Yap İçerik Tablosu Kurulum Kullanım Lisan

WarForPeace 6 Oct 07, 2022